各位大佬,大家好!
//hive 中 CREATE TABLE IF NOT EXISTS temp_h1( id VARCHAR(50), lac VARCHAR(50), ci VARCHAR(50), flux_m VARCHAR(50), nums VARCHAR(50), sno VARCHAR(50), cdate VARCHAR(50) ) row format delimited FIELDS TERMINATED BY ',' stored as textfile LOCATION '/tmp/hive/temp_h1'; CREATE TABLE IF NOT EXISTS temp_h2( id VARCHAR(50), lac VARCHAR(50), ci VARCHAR(50), flux_m VARCHAR(50), nums VARCHAR(50), sno VARCHAR(50), cdate VARCHAR(50) ) row format delimited FIELDS TERMINATED BY ',' stored as textfile LOCATION '/tmp/hive/temp_h2'; //测试数据(t.txt) 101,中国,100.02,123.001,1000020000,30,20200316 102,美国,100.02,123.001,1000020000,30,20200316 103,武汉,100.02,123.001,1000020000,30,20200316 104,北京,100.02,123.001,1000020000,30,20200316 105,俄罗斯,100.02,123.001,1000020000,30,20200316 106,海南,100.02,123.001,1000020000,30,20200316 107,香格里拉酒店,100.02,123.001,1000020000,30,20200316 //加载数据 load data local inpath '/home/hadoop/temp/t.txt' into table temp_h1; //在FLink sql 中 insert into temp_h2 select * from temp_h1; select * from temp_h2; //出现乱码,而且数据不全 temp_h2 在hdfs 上的乱码文件 见附件:cp-0-task-0-file-0 Flink sql 运行期间没有报错,yarn上运行的日志见附件:Executor.log 注,flink监控中显示信息:CsvTableSource(read fields: a, b) -> SourceConversion(table=[hive.test.temp_1, source: [CsvTableSource(read fields: a, b)]], fields=[a, b]) -> SinkConversionToRow -> Sink: Unnamed 。我有不清楚为啥使用的是SinkConversionToRow。 大家可以用我上面的代码也测试一下,帮验证一下,在你们的环境中,是否出现乱码问题,谢谢了!
Executor.log (49K) Download Attachment |
Hi,
- SinkConversionToRow是Flink内部的数据结构转化结果,和结果正确性应该无关,具体看sink的。 - 似乎只有log一个附件,没看到乱码文件。 - 在Flink中试下“select * from temp_h1”看下结果? - 在Hive中试下“select * from temp_h1”看下结果? - 在Hive中试下“select * from temp_h2”看下结果? Best, Jingsong Lee On Tue, Mar 17, 2020 at 4:25 PM 吕先生 <[hidden email]> wrote: > 各位大佬,大家好! > > 帮看一下这个问题:我使用flink sql 基于Hive 进行批计算(目的是替换spark sql 的批计算),具体是从hive > 中读数据,然后insert 回hive 的表,然后select 看数据时,出现乱码。 > > 软件版本:hadoop2.9.1和hadoop2.8.5、hive-2.3.3和hive-2.3.4、flink1.10.0、zeppelin0.9.0、Flink > SQL gateway 0.1 > > 切换了多个hadoop、hive版本(各版本软件均来自官方下载),以及测试了Flink Sql Cli、Zeppelin、Flink SQL > gateway等Flink sql运行环境,均没解决问题。Flink 是Run on Yarn的,下面是测试使用的sql 脚本: > > //hive 中 > CREATE TABLE IF NOT EXISTS temp_h1( > id VARCHAR(50), > lac VARCHAR(50), > ci VARCHAR(50), > flux_m VARCHAR(50), > nums VARCHAR(50), > sno VARCHAR(50), > cdate VARCHAR(50) > ) > row format delimited FIELDS TERMINATED BY ',' > stored as textfile > LOCATION '/tmp/hive/temp_h1'; > > CREATE TABLE IF NOT EXISTS temp_h2( > id VARCHAR(50), > lac VARCHAR(50), > ci VARCHAR(50), > flux_m VARCHAR(50), > nums VARCHAR(50), > sno VARCHAR(50), > cdate VARCHAR(50) > ) > row format delimited FIELDS TERMINATED BY ',' > stored as textfile > LOCATION '/tmp/hive/temp_h2'; > > //测试数据(t.txt) > 101,中国,100.02,123.001,1000020000,30,20200316 > 102,美国,100.02,123.001,1000020000,30,20200316 > 103,武汉,100.02,123.001,1000020000,30,20200316 > 104,北京,100.02,123.001,1000020000,30,20200316 > 105,俄罗斯,100.02,123.001,1000020000,30,20200316 > 106,海南,100.02,123.001,1000020000,30,20200316 > 107,香格里拉酒店,100.02,123.001,1000020000,30,20200316 > > //加载数据 > load data local inpath '/home/hadoop/temp/t.txt' into table temp_h1; > > //在FLink sql 中 > insert into temp_h2 select * from temp_h1; > select * from temp_h2; //出现乱码,而且数据不全 > > temp_h2 在hdfs 上的乱码文件 见附件:cp-0-task-0-file-0 > Flink sql 运行期间没有报错,yarn上运行的日志见附件:Executor.log > > 注,flink监控中显示信息:CsvTableSource(read fields: a, b) -> > SourceConversion(table=[hive.test.temp_1, source: [CsvTableSource(read > fields: a, b)]], fields=[a, b]) -> SinkConversionToRow -> Sink: Unnamed > 。我有不清楚为啥使用的是SinkConversionToRow。 > > 大家可以用我上面的代码也测试一下,帮验证一下,在你们的环境中,是否出现乱码问题,谢谢了! > > > > > > > -- Best, Jingsong Lee |
Hi, 在Hive 和 Flink 中执行 select * from temp_h1 均正常。在Flink sql中,查询异常:(将temp_h2 在hdfs 上的文件,又以附件的形式上传了一次)
|
In reply to this post by 吕先生
Hi
经过阿里工程师的协助,现定位到这是一个压缩问题。我的Hive 开启了压缩,以减小磁盘空间的使用。Flink sql在写入压缩文件后(textfile 格式),没有添加后缀名(正常情况下应该有“.snappy”后缀)。所以读的时候,把压缩文件当普通的txt 文件读了,造成乱码。 后面阿里的工程师会跟进这个问题。 解决办法:将hive-site.xml 里面的hive.exec.compress.output 设置成 false,或使用其他存储格式。 另外确认Flink sql 对Parquet、ORC 格式Hive 表的压缩是支持的。 在此,对提供帮助的阿里工程师,表示感谢! 在 2020-03-17 16:24:03,"吕先生" <[hidden email]> 写道: 各位大佬,大家好! 帮看一下这个问题:我使用flink sql 基于Hive 进行批计算(目的是替换spark sql 的批计算),具体是从hive 中读数据,然后insert 回hive 的表,然后select 看数据时,出现乱码。 软件版本:hadoop2.9.1和hadoop2.8.5、hive-2.3.3和hive-2.3.4、flink1.10.0、zeppelin0.9.0、Flink SQL gateway 0.1 切换了多个hadoop、hive版本(各版本软件均来自官方下载),以及测试了Flink Sql Cli、Zeppelin、Flink SQL gateway等Flink sql运行环境,均没解决问题。Flink 是Run on Yarn的,下面是测试使用的sql 脚本: //hive 中 CREATE TABLE IF NOT EXISTS temp_h1( id VARCHAR(50), lac VARCHAR(50), ci VARCHAR(50), flux_m VARCHAR(50), nums VARCHAR(50), sno VARCHAR(50), cdate VARCHAR(50) ) row format delimited FIELDS TERMINATED BY ',' stored as textfile LOCATION '/tmp/hive/temp_h1'; CREATE TABLE IF NOT EXISTS temp_h2( id VARCHAR(50), lac VARCHAR(50), ci VARCHAR(50), flux_m VARCHAR(50), nums VARCHAR(50), sno VARCHAR(50), cdate VARCHAR(50) ) row format delimited FIELDS TERMINATED BY ',' stored as textfile LOCATION '/tmp/hive/temp_h2'; //测试数据(t.txt) 101,中国,100.02,123.001,1000020000,30,20200316 102,美国,100.02,123.001,1000020000,30,20200316 103,武汉,100.02,123.001,1000020000,30,20200316 104,北京,100.02,123.001,1000020000,30,20200316 105,俄罗斯,100.02,123.001,1000020000,30,20200316 106,海南,100.02,123.001,1000020000,30,20200316 107,香格里拉酒店,100.02,123.001,1000020000,30,20200316 //加载数据 load data local inpath '/home/hadoop/temp/t.txt' into table temp_h1; //在FLink sql 中 insert into temp_h2 select * from temp_h1; select * from temp_h2; //出现乱码,而且数据不全 temp_h2 在hdfs 上的乱码文件 见附件:cp-0-task-0-file-0 Flink sql 运行期间没有报错,yarn上运行的日志见附件:Executor.log 注,flink监控中显示信息:CsvTableSource(read fields: a, b) -> SourceConversion(table=[hive.test.temp_1, source: [CsvTableSource(read fields: a, b)]], fields=[a, b]) -> SinkConversionToRow -> Sink: Unnamed 。我有不清楚为啥使用的是SinkConversionToRow。 大家可以用我上面的代码也测试一下,帮验证一下,在你们的环境中,是否出现乱码问题,谢谢了! |
Hi,
会在1.10.1里面修复。 JIRA: https://issues.apache.org/jira/browse/FLINK-16647 谢谢 @[hidden email] <[hidden email]> Best, Jingsong Lee On Wed, Mar 18, 2020 at 4:50 PM 吕先生 <[hidden email]> wrote: > Hi > > > 经过阿里工程师的协助,现定位到这是一个压缩问题。我的Hive 开启了压缩,以减小磁盘空间的使用。Flink > sql在写入压缩文件后(textfile 格式),没有添加后缀名(正常情况下应该有“.snappy”后缀)。所以读的时候,把压缩文件当普通的txt > 文件读了,造成乱码。 后面阿里的工程师会跟进这个问题。 > > > 解决办法:将hive-site.xml 里面的hive.exec.compress.output 设置成 false,或使用其他存储格式。 > > > 另外确认Flink sql 对Parquet、ORC 格式Hive 表的压缩是支持的。 > > > 在此,对提供帮助的阿里工程师,表示感谢! > > > > > > > > 在 2020-03-17 16:24:03,"吕先生" <[hidden email]> 写道: > > 各位大佬,大家好! > 帮看一下这个问题:我使用flink sql 基于Hive 进行批计算(目的是替换spark sql 的批计算),具体是从hive > 中读数据,然后insert 回hive 的表,然后select 看数据时,出现乱码。 > 软件版本:hadoop2.9.1和hadoop2.8.5、hive-2.3.3和hive-2.3.4、flink1.10.0、zeppelin0.9.0、Flink > SQL gateway 0.1 > > > 切换了多个hadoop、hive版本(各版本软件均来自官方下载),以及测试了Flink Sql Cli、Zeppelin、Flink SQL > gateway等Flink sql运行环境,均没解决问题。Flink 是Run on Yarn的,下面是测试使用的sql 脚本: > > > //hive 中 > CREATE TABLE IF NOT EXISTS temp_h1( > id VARCHAR(50), > lac VARCHAR(50), > ci VARCHAR(50), > flux_m VARCHAR(50), > nums VARCHAR(50), > sno VARCHAR(50), > cdate VARCHAR(50) > ) > row format delimited FIELDS TERMINATED BY ',' > stored as textfile > LOCATION '/tmp/hive/temp_h1'; > > > CREATE TABLE IF NOT EXISTS temp_h2( > id VARCHAR(50), > lac VARCHAR(50), > ci VARCHAR(50), > flux_m VARCHAR(50), > nums VARCHAR(50), > sno VARCHAR(50), > cdate VARCHAR(50) > ) > row format delimited FIELDS TERMINATED BY ',' > stored as textfile > LOCATION '/tmp/hive/temp_h2'; > > > //测试数据(t.txt) > 101,中国,100.02,123.001,1000020000,30,20200316 > 102,美国,100.02,123.001,1000020000,30,20200316 > 103,武汉,100.02,123.001,1000020000,30,20200316 > 104,北京,100.02,123.001,1000020000,30,20200316 > 105,俄罗斯,100.02,123.001,1000020000,30,20200316 > 106,海南,100.02,123.001,1000020000,30,20200316 > 107,香格里拉酒店,100.02,123.001,1000020000,30,20200316 > > > //加载数据 > load data local inpath '/home/hadoop/temp/t.txt' into table temp_h1; > > > //在FLink sql 中 > insert into temp_h2 select * from temp_h1; > select * from temp_h2; //出现乱码,而且数据不全 > > > temp_h2 在hdfs 上的乱码文件 见附件:cp-0-task-0-file-0 > Flink sql 运行期间没有报错,yarn上运行的日志见附件:Executor.log > > > 注,flink监控中显示信息:CsvTableSource(read fields: a, b) -> > SourceConversion(table=[hive.test.temp_1, source: [CsvTableSource(read > fields: a, b)]], fields=[a, b]) -> SinkConversionToRow -> Sink: Unnamed > 。我有不清楚为啥使用的是SinkConversionToRow。 > > > 大家可以用我上面的代码也测试一下,帮验证一下,在你们的环境中,是否出现乱码问题,谢谢了! > > > > > > > > > -- Best, Jingsong Lee |
Free forum by Nabble | Edit this page |