flink 1.11 streaming file sink to hdfs orc file format could be recognized by hive

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

flink 1.11 streaming file sink to hdfs orc file format could be recognized by hive

大罗
Hi,大家好,我的试验场景是想把kafka的数据直接写到hive(orc format),构建一个实时数仓的理念,但是,通过flink
sql,又因为依赖问题而不能成功,那么我就想可否通过streaming file sink写入hdfs文件,给hive识别呢。我的试验过程如下:

我所使用的版本如下:

Hadoop 3.0.0+cdh6.3.2

HDFS 3.0.0+cdh6.3.2

HBase 2.1.0+cdh6.3.2

Hive 2.1.1+cdh6.3.2

Flink 1.11.1

我在代码中引入依赖'org.apache.flink:flink-orc_2.11:1.11.0', 核心代码如下:

TestStreamFileSinkViaCustomVectorizer.java
<http://apache-flink.147419.n8.nabble.com/file/t909/TestStreamFileSinkViaCustomVectorizer.java>  

然后,的确可以在hdfs相关目录,找到写入后的文件,比如:/tmp/person_orc/dt_day=2020-09-06/dt_hour=16/part-4-14

那么我就想这个文件,是否可以给hive的sql读取呢,我的hive sql如下:

CREATE TABLE ods_person_test_os(
name string, age int)
partitioned by (dt_day string, dt_hour string)
STORED AS ORC
LOCATION 'hdfs://nameservice1/tmp/person_orc/'
TBLPROPERTIES(
 'orc.compress'='SNAPPY'
);

当我手动添加分区"alter table ods_person_test_os add partition(dt_day='2020-09-06',
dt_hour='16')
",
运行查询语句 "select * from ods_person_test_os"后,报错,
hive-error.txt
<http://apache-flink.147419.n8.nabble.com/file/t909/hive-error.txt>  

其中核心的关键字"java.lang.ArrayIndexOutOfBoundsException: 7",很明显,数组越界。

经过仔细的调试后,我认为,问题出现在依赖'orc-core'里。

'org.apache.flink:flink-orc_2.11:1.11.0',会导入依赖'org.apache.orc:orc-core:1.5.6',其中的org.apache.orc.OrcFile.WriterVersion定义如下,而且CURRENT_WRITER
= OrcFile.WriterVersion.ORC_517,也就是第7个。

        ORIGINAL(OrcFile.WriterImplementation.ORC_JAVA, 0),
        HIVE_8732(OrcFile.WriterImplementation.ORC_JAVA, 1),
        HIVE_4243(OrcFile.WriterImplementation.ORC_JAVA, 2),
        HIVE_12055(OrcFile.WriterImplementation.ORC_JAVA, 3),
        HIVE_13083(OrcFile.WriterImplementation.ORC_JAVA, 4),
        ORC_101(OrcFile.WriterImplementation.ORC_JAVA, 5),
        ORC_135(OrcFile.WriterImplementation.ORC_JAVA, 6),
        ORC_517(OrcFile.WriterImplementation.ORC_JAVA, 7),
        ORC_CPP_ORIGINAL(OrcFile.WriterImplementation.ORC_CPP, 6),
        PRESTO_ORIGINAL(OrcFile.WriterImplementation.PRESTO, 6),
        SCRITCHLEY_GO_ORIGINAL(OrcFile.WriterImplementation.SCRITCHLEY_GO,
6),
        FUTURE(OrcFile.WriterImplementation.UNKNOWN, 2147483647);

而我的hive版本使用的orc版本为'org.apache.hive:hive-orc:2.1.1-cdh6.3.2',里面的org.apache.orc.OrcFile.WriterVersion定义如下,而且CURRENT_WRITER
= OrcFile.WriterVersion.HIVE_13083,  并且不存在第7个version。
        ORIGINAL(0),
        HIVE_8732(1),
        HIVE_4243(2),
        HIVE_12055(3),
        HIVE_13083(4),
        FUTURE(2147483647);

所以,当hive解析orc文件时,使用第7个版本就会报错!这是我的分析!

为此,我在想能否通过在引入'org.apache.flink:flink-orc_2.11:1.11.0'时,exclude
'org.apache.orc:orc-core:1.5.6',然后,再引入低版本的,比如'org.apache.orc:orc-core:1.1.2',也就是CURRENT_WRITER
=
OrcFile.WriterVersion.HIVE_13083,但是,又遇到以下两个问题导致初始化OrcBulkWriterFactory的时候出错:
1. 低版本的orc-core中的类'org.apache.orc.TypeDescription' 没有 fromString这个方法;
2. 低版本的orc-core中没有'org.apache.orcPhysicalWriter';

那么,为了匹配不同版本hive使用的orc writer
version,'flink-orc'是否可以在构建OrcBulkWriterFactory时,多一些动态参数呢?






--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 streaming file sink to hdfs orc file format could be recognized by hive

Jingsong Li
Hi,

flink-orc实现的OrcBulkWriterFactory,是有点“ 深入“的,重写了部分ORC的代码,所以没那么好做版本兼容。

你可以考虑使用Hive的streaming写,它使用native的hive orc writer[1],可以对应你需要的那个版本。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_streaming.html#streaming-writing

Best,
Jingsong

On Mon, Sep 7, 2020 at 2:11 PM 大罗 <[hidden email]> wrote:

> Hi,大家好,我的试验场景是想把kafka的数据直接写到hive(orc format),构建一个实时数仓的理念,但是,通过flink
> sql,又因为依赖问题而不能成功,那么我就想可否通过streaming file sink写入hdfs文件,给hive识别呢。我的试验过程如下:
>
> 我所使用的版本如下:
>
> Hadoop 3.0.0+cdh6.3.2
>
> HDFS 3.0.0+cdh6.3.2
>
> HBase 2.1.0+cdh6.3.2
>
> Hive 2.1.1+cdh6.3.2
>
> Flink 1.11.1
>
> 我在代码中引入依赖'org.apache.flink:flink-orc_2.11:1.11.0', 核心代码如下:
>
> TestStreamFileSinkViaCustomVectorizer.java
> <
> http://apache-flink.147419.n8.nabble.com/file/t909/TestStreamFileSinkViaCustomVectorizer.java>
>
>
>
> 然后,的确可以在hdfs相关目录,找到写入后的文件,比如:/tmp/person_orc/dt_day=2020-09-06/dt_hour=16/part-4-14
>
> 那么我就想这个文件,是否可以给hive的sql读取呢,我的hive sql如下:
>
> CREATE TABLE ods_person_test_os(
> name string, age int)
> partitioned by (dt_day string, dt_hour string)
> STORED AS ORC
> LOCATION 'hdfs://nameservice1/tmp/person_orc/'
> TBLPROPERTIES(
>  'orc.compress'='SNAPPY'
> );
>
> 当我手动添加分区"alter table ods_person_test_os add partition(dt_day='2020-09-06',
> dt_hour='16')
> ",
> 运行查询语句 "select * from ods_person_test_os"后,报错,
> hive-error.txt
> <http://apache-flink.147419.n8.nabble.com/file/t909/hive-error.txt>
>
> 其中核心的关键字"java.lang.ArrayIndexOutOfBoundsException: 7",很明显,数组越界。
>
> 经过仔细的调试后,我认为,问题出现在依赖'orc-core'里。
>
>
> 'org.apache.flink:flink-orc_2.11:1.11.0',会导入依赖'org.apache.orc:orc-core:1.5.6',其中的org.apache.orc.OrcFile.WriterVersion定义如下,而且CURRENT_WRITER
> = OrcFile.WriterVersion.ORC_517,也就是第7个。
>
>         ORIGINAL(OrcFile.WriterImplementation.ORC_JAVA, 0),
>         HIVE_8732(OrcFile.WriterImplementation.ORC_JAVA, 1),
>         HIVE_4243(OrcFile.WriterImplementation.ORC_JAVA, 2),
>         HIVE_12055(OrcFile.WriterImplementation.ORC_JAVA, 3),
>         HIVE_13083(OrcFile.WriterImplementation.ORC_JAVA, 4),
>         ORC_101(OrcFile.WriterImplementation.ORC_JAVA, 5),
>         ORC_135(OrcFile.WriterImplementation.ORC_JAVA, 6),
>         ORC_517(OrcFile.WriterImplementation.ORC_JAVA, 7),
>         ORC_CPP_ORIGINAL(OrcFile.WriterImplementation.ORC_CPP, 6),
>         PRESTO_ORIGINAL(OrcFile.WriterImplementation.PRESTO, 6),
>         SCRITCHLEY_GO_ORIGINAL(OrcFile.WriterImplementation.SCRITCHLEY_GO,
> 6),
>         FUTURE(OrcFile.WriterImplementation.UNKNOWN, 2147483647);
>
>
> 而我的hive版本使用的orc版本为'org.apache.hive:hive-orc:2.1.1-cdh6.3.2',里面的org.apache.orc.OrcFile.WriterVersion定义如下,而且CURRENT_WRITER
> = OrcFile.WriterVersion.HIVE_13083,  并且不存在第7个version。
>         ORIGINAL(0),
>         HIVE_8732(1),
>         HIVE_4243(2),
>         HIVE_12055(3),
>         HIVE_13083(4),
>         FUTURE(2147483647);
>
> 所以,当hive解析orc文件时,使用第7个版本就会报错!这是我的分析!
>
> 为此,我在想能否通过在引入'org.apache.flink:flink-orc_2.11:1.11.0'时,exclude
>
> 'org.apache.orc:orc-core:1.5.6',然后,再引入低版本的,比如'org.apache.orc:orc-core:1.1.2',也就是CURRENT_WRITER
> =
>
> OrcFile.WriterVersion.HIVE_13083,但是,又遇到以下两个问题导致初始化OrcBulkWriterFactory的时候出错:
> 1. 低版本的orc-core中的类'org.apache.orc.TypeDescription' 没有 fromString这个方法;
> 2. 低版本的orc-core中没有'org.apache.orcPhysicalWriter';
>
> 那么,为了匹配不同版本hive使用的orc writer
> version,'flink-orc'是否可以在构建OrcBulkWriterFactory时,多一些动态参数呢?
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


--
Best, Jingsong Lee