Hi,大家好,我的试验场景是想把kafka的数据直接写到hive(orc format),构建一个实时数仓的理念,但是,通过flink
sql,又因为依赖问题而不能成功,那么我就想可否通过streaming file sink写入hdfs文件,给hive识别呢。我的试验过程如下: 我所使用的版本如下: Hadoop 3.0.0+cdh6.3.2 HDFS 3.0.0+cdh6.3.2 HBase 2.1.0+cdh6.3.2 Hive 2.1.1+cdh6.3.2 Flink 1.11.1 我在代码中引入依赖'org.apache.flink:flink-orc_2.11:1.11.0', 核心代码如下: TestStreamFileSinkViaCustomVectorizer.java <http://apache-flink.147419.n8.nabble.com/file/t909/TestStreamFileSinkViaCustomVectorizer.java> 然后,的确可以在hdfs相关目录,找到写入后的文件,比如:/tmp/person_orc/dt_day=2020-09-06/dt_hour=16/part-4-14 那么我就想这个文件,是否可以给hive的sql读取呢,我的hive sql如下: CREATE TABLE ods_person_test_os( name string, age int) partitioned by (dt_day string, dt_hour string) STORED AS ORC LOCATION 'hdfs://nameservice1/tmp/person_orc/' TBLPROPERTIES( 'orc.compress'='SNAPPY' ); 当我手动添加分区"alter table ods_person_test_os add partition(dt_day='2020-09-06', dt_hour='16') ", 运行查询语句 "select * from ods_person_test_os"后,报错, hive-error.txt <http://apache-flink.147419.n8.nabble.com/file/t909/hive-error.txt> 其中核心的关键字"java.lang.ArrayIndexOutOfBoundsException: 7",很明显,数组越界。 经过仔细的调试后,我认为,问题出现在依赖'orc-core'里。 'org.apache.flink:flink-orc_2.11:1.11.0',会导入依赖'org.apache.orc:orc-core:1.5.6',其中的org.apache.orc.OrcFile.WriterVersion定义如下,而且CURRENT_WRITER = OrcFile.WriterVersion.ORC_517,也就是第7个。 ORIGINAL(OrcFile.WriterImplementation.ORC_JAVA, 0), HIVE_8732(OrcFile.WriterImplementation.ORC_JAVA, 1), HIVE_4243(OrcFile.WriterImplementation.ORC_JAVA, 2), HIVE_12055(OrcFile.WriterImplementation.ORC_JAVA, 3), HIVE_13083(OrcFile.WriterImplementation.ORC_JAVA, 4), ORC_101(OrcFile.WriterImplementation.ORC_JAVA, 5), ORC_135(OrcFile.WriterImplementation.ORC_JAVA, 6), ORC_517(OrcFile.WriterImplementation.ORC_JAVA, 7), ORC_CPP_ORIGINAL(OrcFile.WriterImplementation.ORC_CPP, 6), PRESTO_ORIGINAL(OrcFile.WriterImplementation.PRESTO, 6), SCRITCHLEY_GO_ORIGINAL(OrcFile.WriterImplementation.SCRITCHLEY_GO, 6), FUTURE(OrcFile.WriterImplementation.UNKNOWN, 2147483647); 而我的hive版本使用的orc版本为'org.apache.hive:hive-orc:2.1.1-cdh6.3.2',里面的org.apache.orc.OrcFile.WriterVersion定义如下,而且CURRENT_WRITER = OrcFile.WriterVersion.HIVE_13083, 并且不存在第7个version。 ORIGINAL(0), HIVE_8732(1), HIVE_4243(2), HIVE_12055(3), HIVE_13083(4), FUTURE(2147483647); 所以,当hive解析orc文件时,使用第7个版本就会报错!这是我的分析! 为此,我在想能否通过在引入'org.apache.flink:flink-orc_2.11:1.11.0'时,exclude 'org.apache.orc:orc-core:1.5.6',然后,再引入低版本的,比如'org.apache.orc:orc-core:1.1.2',也就是CURRENT_WRITER = OrcFile.WriterVersion.HIVE_13083,但是,又遇到以下两个问题导致初始化OrcBulkWriterFactory的时候出错: 1. 低版本的orc-core中的类'org.apache.orc.TypeDescription' 没有 fromString这个方法; 2. 低版本的orc-core中没有'org.apache.orcPhysicalWriter'; 那么,为了匹配不同版本hive使用的orc writer version,'flink-orc'是否可以在构建OrcBulkWriterFactory时,多一些动态参数呢? -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hi,
flink-orc实现的OrcBulkWriterFactory,是有点“ 深入“的,重写了部分ORC的代码,所以没那么好做版本兼容。 你可以考虑使用Hive的streaming写,它使用native的hive orc writer[1],可以对应你需要的那个版本。 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_streaming.html#streaming-writing Best, Jingsong On Mon, Sep 7, 2020 at 2:11 PM 大罗 <[hidden email]> wrote: > Hi,大家好,我的试验场景是想把kafka的数据直接写到hive(orc format),构建一个实时数仓的理念,但是,通过flink > sql,又因为依赖问题而不能成功,那么我就想可否通过streaming file sink写入hdfs文件,给hive识别呢。我的试验过程如下: > > 我所使用的版本如下: > > Hadoop 3.0.0+cdh6.3.2 > > HDFS 3.0.0+cdh6.3.2 > > HBase 2.1.0+cdh6.3.2 > > Hive 2.1.1+cdh6.3.2 > > Flink 1.11.1 > > 我在代码中引入依赖'org.apache.flink:flink-orc_2.11:1.11.0', 核心代码如下: > > TestStreamFileSinkViaCustomVectorizer.java > < > http://apache-flink.147419.n8.nabble.com/file/t909/TestStreamFileSinkViaCustomVectorizer.java> > > > > 然后,的确可以在hdfs相关目录,找到写入后的文件,比如:/tmp/person_orc/dt_day=2020-09-06/dt_hour=16/part-4-14 > > 那么我就想这个文件,是否可以给hive的sql读取呢,我的hive sql如下: > > CREATE TABLE ods_person_test_os( > name string, age int) > partitioned by (dt_day string, dt_hour string) > STORED AS ORC > LOCATION 'hdfs://nameservice1/tmp/person_orc/' > TBLPROPERTIES( > 'orc.compress'='SNAPPY' > ); > > 当我手动添加分区"alter table ods_person_test_os add partition(dt_day='2020-09-06', > dt_hour='16') > ", > 运行查询语句 "select * from ods_person_test_os"后,报错, > hive-error.txt > <http://apache-flink.147419.n8.nabble.com/file/t909/hive-error.txt> > > 其中核心的关键字"java.lang.ArrayIndexOutOfBoundsException: 7",很明显,数组越界。 > > 经过仔细的调试后,我认为,问题出现在依赖'orc-core'里。 > > > 'org.apache.flink:flink-orc_2.11:1.11.0',会导入依赖'org.apache.orc:orc-core:1.5.6',其中的org.apache.orc.OrcFile.WriterVersion定义如下,而且CURRENT_WRITER > = OrcFile.WriterVersion.ORC_517,也就是第7个。 > > ORIGINAL(OrcFile.WriterImplementation.ORC_JAVA, 0), > HIVE_8732(OrcFile.WriterImplementation.ORC_JAVA, 1), > HIVE_4243(OrcFile.WriterImplementation.ORC_JAVA, 2), > HIVE_12055(OrcFile.WriterImplementation.ORC_JAVA, 3), > HIVE_13083(OrcFile.WriterImplementation.ORC_JAVA, 4), > ORC_101(OrcFile.WriterImplementation.ORC_JAVA, 5), > ORC_135(OrcFile.WriterImplementation.ORC_JAVA, 6), > ORC_517(OrcFile.WriterImplementation.ORC_JAVA, 7), > ORC_CPP_ORIGINAL(OrcFile.WriterImplementation.ORC_CPP, 6), > PRESTO_ORIGINAL(OrcFile.WriterImplementation.PRESTO, 6), > SCRITCHLEY_GO_ORIGINAL(OrcFile.WriterImplementation.SCRITCHLEY_GO, > 6), > FUTURE(OrcFile.WriterImplementation.UNKNOWN, 2147483647); > > > 而我的hive版本使用的orc版本为'org.apache.hive:hive-orc:2.1.1-cdh6.3.2',里面的org.apache.orc.OrcFile.WriterVersion定义如下,而且CURRENT_WRITER > = OrcFile.WriterVersion.HIVE_13083, 并且不存在第7个version。 > ORIGINAL(0), > HIVE_8732(1), > HIVE_4243(2), > HIVE_12055(3), > HIVE_13083(4), > FUTURE(2147483647); > > 所以,当hive解析orc文件时,使用第7个版本就会报错!这是我的分析! > > 为此,我在想能否通过在引入'org.apache.flink:flink-orc_2.11:1.11.0'时,exclude > > 'org.apache.orc:orc-core:1.5.6',然后,再引入低版本的,比如'org.apache.orc:orc-core:1.1.2',也就是CURRENT_WRITER > = > > OrcFile.WriterVersion.HIVE_13083,但是,又遇到以下两个问题导致初始化OrcBulkWriterFactory的时候出错: > 1. 低版本的orc-core中的类'org.apache.orc.TypeDescription' 没有 fromString这个方法; > 2. 低版本的orc-core中没有'org.apache.orcPhysicalWriter'; > > 那么,为了匹配不同版本hive使用的orc writer > version,'flink-orc'是否可以在构建OrcBulkWriterFactory时,多一些动态参数呢? > > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Jingsong Lee |
Free forum by Nabble | Edit this page |