使用Flink sql insert 数据 to hive 之乱码问题

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

使用Flink sql insert 数据 to hive 之乱码问题

吕先生
各位大佬,大家好!
帮看一下这个问题:我使用flink sql 基于Hive 进行批计算(目的是替换spark sql 的批计算),具体是从hive 中读数据,然后insert 回hive 的表,然后select 看数据时,出现乱码。
软件版本:hadoop2.9.1和hadoop2.8.5、hive-2.3.3和hive-2.3.4、flink1.10.0、zeppelin0.9.0、Flink SQL gateway 0.1

切换了多个hadoop、hive版本(各版本软件均来自官方下载),以及测试了Flink Sql Cli、Zeppelin、Flink SQL gateway等Flink sql运行环境,均没解决问题。Flink 是Run on Yarn的,下面是测试使用的sql 脚本:

//hive 中
CREATE TABLE IF NOT EXISTS temp_h1(
  id VARCHAR(50),
  lac VARCHAR(50),
  ci VARCHAR(50),
  flux_m VARCHAR(50),
  nums VARCHAR(50),
  sno  VARCHAR(50),
  cdate VARCHAR(50)
)
row format delimited FIELDS TERMINATED BY ',' 
stored as textfile 
LOCATION '/tmp/hive/temp_h1';

CREATE TABLE IF NOT EXISTS temp_h2(
  id VARCHAR(50),
  lac VARCHAR(50),
  ci VARCHAR(50),
  flux_m VARCHAR(50),
  nums VARCHAR(50),
  sno  VARCHAR(50),
  cdate VARCHAR(50)
)
row format delimited FIELDS TERMINATED BY ',' 
stored as textfile 
LOCATION '/tmp/hive/temp_h2';

//测试数据(t.txt)
101,中国,100.02,123.001,1000020000,30,20200316
102,美国,100.02,123.001,1000020000,30,20200316
103,武汉,100.02,123.001,1000020000,30,20200316
104,北京,100.02,123.001,1000020000,30,20200316
105,俄罗斯,100.02,123.001,1000020000,30,20200316
106,海南,100.02,123.001,1000020000,30,20200316
107,香格里拉酒店,100.02,123.001,1000020000,30,20200316

//加载数据
load data local inpath '/home/hadoop/temp/t.txt' into table temp_h1;

//在FLink sql 中
insert into temp_h2 select * from temp_h1;
select * from temp_h2;            //出现乱码,而且数据不全

temp_h2 在hdfs 上的乱码文件 见附件:cp-0-task-0-file-0
Flink sql 运行期间没有报错,yarn上运行的日志见附件:Executor.log

注,flink监控中显示信息:CsvTableSource(read fields: a, b) -> SourceConversion(table=[hive.test.temp_1, source: [CsvTableSource(read fields: a, b)]], fields=[a, b]) -> SinkConversionToRow -> Sink: Unnamed    。我有不清楚为啥使用的是SinkConversionToRow。

大家可以用我上面的代码也测试一下,帮验证一下,在你们的环境中,是否出现乱码问题,谢谢了!




 


Executor.log (49K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: 使用Flink sql insert 数据 to hive 之乱码问题

Jingsong Li
Hi,

- SinkConversionToRow是Flink内部的数据结构转化结果,和结果正确性应该无关,具体看sink的。
- 似乎只有log一个附件,没看到乱码文件。
- 在Flink中试下“select * from temp_h1”看下结果?
- 在Hive中试下“select * from temp_h1”看下结果?
- 在Hive中试下“select * from temp_h2”看下结果?

Best,
Jingsong Lee

On Tue, Mar 17, 2020 at 4:25 PM 吕先生 <[hidden email]> wrote:

> 各位大佬,大家好!
>
> 帮看一下这个问题:我使用flink sql 基于Hive 进行批计算(目的是替换spark sql 的批计算),具体是从hive
> 中读数据,然后insert 回hive 的表,然后select 看数据时,出现乱码。
>
> 软件版本:hadoop2.9.1和hadoop2.8.5、hive-2.3.3和hive-2.3.4、flink1.10.0、zeppelin0.9.0、Flink
> SQL gateway 0.1
>
> 切换了多个hadoop、hive版本(各版本软件均来自官方下载),以及测试了Flink Sql Cli、Zeppelin、Flink SQL
> gateway等Flink sql运行环境,均没解决问题。Flink 是Run on Yarn的,下面是测试使用的sql 脚本:
>
> //hive 中
> CREATE TABLE IF NOT EXISTS temp_h1(
>   id VARCHAR(50),
>   lac VARCHAR(50),
>   ci VARCHAR(50),
>   flux_m VARCHAR(50),
>   nums VARCHAR(50),
>   sno  VARCHAR(50),
>   cdate VARCHAR(50)
> )
> row format delimited FIELDS TERMINATED BY ','
> stored as textfile
> LOCATION '/tmp/hive/temp_h1';
>
> CREATE TABLE IF NOT EXISTS temp_h2(
>   id VARCHAR(50),
>   lac VARCHAR(50),
>   ci VARCHAR(50),
>   flux_m VARCHAR(50),
>   nums VARCHAR(50),
>   sno  VARCHAR(50),
>   cdate VARCHAR(50)
> )
> row format delimited FIELDS TERMINATED BY ','
> stored as textfile
> LOCATION '/tmp/hive/temp_h2';
>
> //测试数据(t.txt)
> 101,中国,100.02,123.001,1000020000,30,20200316
> 102,美国,100.02,123.001,1000020000,30,20200316
> 103,武汉,100.02,123.001,1000020000,30,20200316
> 104,北京,100.02,123.001,1000020000,30,20200316
> 105,俄罗斯,100.02,123.001,1000020000,30,20200316
> 106,海南,100.02,123.001,1000020000,30,20200316
> 107,香格里拉酒店,100.02,123.001,1000020000,30,20200316
>
> //加载数据
> load data local inpath '/home/hadoop/temp/t.txt' into table temp_h1;
>
> //在FLink sql 中
> insert into temp_h2 select * from temp_h1;
> select * from temp_h2;            //出现乱码,而且数据不全
>
> temp_h2 在hdfs 上的乱码文件 见附件:cp-0-task-0-file-0
> Flink sql 运行期间没有报错,yarn上运行的日志见附件:Executor.log
>
> 注,flink监控中显示信息:CsvTableSource(read fields: a, b) ->
> SourceConversion(table=[hive.test.temp_1, source: [CsvTableSource(read
> fields: a, b)]], fields=[a, b]) -> SinkConversionToRow -> Sink: Unnamed
> 。我有不清楚为啥使用的是SinkConversionToRow。
>
> 大家可以用我上面的代码也测试一下,帮验证一下,在你们的环境中,是否出现乱码问题,谢谢了!
>
>
>
>
>
>
>


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re:Re: 使用Flink sql insert 数据 to hive 之乱码问题

吕先生
Hi,

在Hive 和 Flink 中执行 select * from temp_h1 均正常。在Flink sql中,查询异常:(将temp_h2 在hdfs 上的文件,又以附件的形式上传了一次)





在 2020-03-17 17:05:21,"Jingsong Li" <[hidden email]> 写道: >Hi, > >- SinkConversionToRow是Flink内部的数据结构转化结果,和结果正确性应该无关,具体看sink的。 >- 似乎只有log一个附件,没看到乱码文件。 >- 在Flink中试下“select * from temp_h1”看下结果? >- 在Hive中试下“select * from temp_h1”看下结果? >- 在Hive中试下“select * from temp_h2”看下结果? > >Best, >Jingsong Lee > >On Tue, Mar 17, 2020 at 4:25 PM 吕先生 <[hidden email]> wrote: > >> 各位大佬,大家好! >> >> 帮看一下这个问题:我使用flink sql 基于Hive 进行批计算(目的是替换spark sql 的批计算),具体是从hive >> 中读数据,然后insert 回hive 的表,然后select 看数据时,出现乱码。 >> >> 软件版本:hadoop2.9.1和hadoop2.8.5、hive-2.3.3和hive-2.3.4、flink1.10.0、zeppelin0.9.0、Flink >> SQL gateway 0.1 >> >> 切换了多个hadoop、hive版本(各版本软件均来自官方下载),以及测试了Flink Sql Cli、Zeppelin、Flink SQL >> gateway等Flink sql运行环境,均没解决问题。Flink 是Run on Yarn的,下面是测试使用的sql 脚本: >> >> //hive 中 >> CREATE TABLE IF NOT EXISTS temp_h1( >> id VARCHAR(50), >> lac VARCHAR(50), >> ci VARCHAR(50), >> flux_m VARCHAR(50), >> nums VARCHAR(50), >> sno VARCHAR(50), >> cdate VARCHAR(50) >> ) >> row format delimited FIELDS TERMINATED BY ',' >> stored as textfile >> LOCATION '/tmp/hive/temp_h1'; >> >> CREATE TABLE IF NOT EXISTS temp_h2( >> id VARCHAR(50), >> lac VARCHAR(50), >> ci VARCHAR(50), >> flux_m VARCHAR(50), >> nums VARCHAR(50), >> sno VARCHAR(50), >> cdate VARCHAR(50) >> ) >> row format delimited FIELDS TERMINATED BY ',' >> stored as textfile >> LOCATION '/tmp/hive/temp_h2'; >> >> //测试数据(t.txt) >> 101,中国,100.02,123.001,1000020000,30,20200316 >> 102,美国,100.02,123.001,1000020000,30,20200316 >> 103,武汉,100.02,123.001,1000020000,30,20200316 >> 104,北京,100.02,123.001,1000020000,30,20200316 >> 105,俄罗斯,100.02,123.001,1000020000,30,20200316 >> 106,海南,100.02,123.001,1000020000,30,20200316 >> 107,香格里拉酒店,100.02,123.001,1000020000,30,20200316 >> >> //加载数据 >> load data local inpath '/home/hadoop/temp/t.txt' into table temp_h1; >> >> //在FLink sql 中 >> insert into temp_h2 select * from temp_h1; >> select * from temp_h2; //出现乱码,而且数据不全 >> >> temp_h2 在hdfs 上的乱码文件 见附件:cp-0-task-0-file-0 >> Flink sql 运行期间没有报错,yarn上运行的日志见附件:Executor.log >> >> 注,flink监控中显示信息:CsvTableSource(read fields: a, b) -> >> SourceConversion(table=[hive.test.temp_1, source: [CsvTableSource(read >> fields: a, b)]], fields=[a, b]) -> SinkConversionToRow -> Sink: Unnamed >> 。我有不清楚为啥使用的是SinkConversionToRow。 >> >> 大家可以用我上面的代码也测试一下,帮验证一下,在你们的环境中,是否出现乱码问题,谢谢了! >> >> >> >> >> >> >> > > >-- >Best, Jingsong Lee


 

Reply | Threaded
Open this post in threaded view
|

Re:使用Flink sql insert 数据 to hive 之乱码问题 (已解决)

吕先生
In reply to this post by 吕先生
Hi


    经过阿里工程师的协助,现定位到这是一个压缩问题。我的Hive 开启了压缩,以减小磁盘空间的使用。Flink sql在写入压缩文件后(textfile 格式),没有添加后缀名(正常情况下应该有“.snappy”后缀)。所以读的时候,把压缩文件当普通的txt 文件读了,造成乱码。   后面阿里的工程师会跟进这个问题。


解决办法:将hive-site.xml 里面的hive.exec.compress.output 设置成 false,或使用其他存储格式。


另外确认Flink sql 对Parquet、ORC 格式Hive 表的压缩是支持的。


在此,对提供帮助的阿里工程师,表示感谢!







在 2020-03-17 16:24:03,"吕先生" <[hidden email]> 写道:

各位大佬,大家好!
帮看一下这个问题:我使用flink sql 基于Hive 进行批计算(目的是替换spark sql 的批计算),具体是从hive 中读数据,然后insert 回hive 的表,然后select 看数据时,出现乱码。
软件版本:hadoop2.9.1和hadoop2.8.5、hive-2.3.3和hive-2.3.4、flink1.10.0、zeppelin0.9.0、Flink SQL gateway 0.1


切换了多个hadoop、hive版本(各版本软件均来自官方下载),以及测试了Flink Sql Cli、Zeppelin、Flink SQL gateway等Flink sql运行环境,均没解决问题。Flink 是Run on Yarn的,下面是测试使用的sql 脚本:


//hive 中
CREATE TABLE IF NOT EXISTS temp_h1(
  id VARCHAR(50),
  lac VARCHAR(50),
  ci VARCHAR(50),
  flux_m VARCHAR(50),
  nums VARCHAR(50),
  sno  VARCHAR(50),
  cdate VARCHAR(50)
)
row format delimited FIELDS TERMINATED BY ','
stored as textfile
LOCATION '/tmp/hive/temp_h1';


CREATE TABLE IF NOT EXISTS temp_h2(
  id VARCHAR(50),
  lac VARCHAR(50),
  ci VARCHAR(50),
  flux_m VARCHAR(50),
  nums VARCHAR(50),
  sno  VARCHAR(50),
  cdate VARCHAR(50)
)
row format delimited FIELDS TERMINATED BY ','
stored as textfile
LOCATION '/tmp/hive/temp_h2';


//测试数据(t.txt)
101,中国,100.02,123.001,1000020000,30,20200316
102,美国,100.02,123.001,1000020000,30,20200316
103,武汉,100.02,123.001,1000020000,30,20200316
104,北京,100.02,123.001,1000020000,30,20200316
105,俄罗斯,100.02,123.001,1000020000,30,20200316
106,海南,100.02,123.001,1000020000,30,20200316
107,香格里拉酒店,100.02,123.001,1000020000,30,20200316


//加载数据
load data local inpath '/home/hadoop/temp/t.txt' into table temp_h1;


//在FLink sql 中
insert into temp_h2 select * from temp_h1;
select * from temp_h2;            //出现乱码,而且数据不全


temp_h2 在hdfs 上的乱码文件 见附件:cp-0-task-0-file-0
Flink sql 运行期间没有报错,yarn上运行的日志见附件:Executor.log


注,flink监控中显示信息:CsvTableSource(read fields: a, b) -> SourceConversion(table=[hive.test.temp_1, source: [CsvTableSource(read fields: a, b)]], fields=[a, b]) -> SinkConversionToRow -> Sink: Unnamed    。我有不清楚为啥使用的是SinkConversionToRow。


大家可以用我上面的代码也测试一下,帮验证一下,在你们的环境中,是否出现乱码问题,谢谢了!








 
Reply | Threaded
Open this post in threaded view
|

Re: 使用Flink sql insert 数据 to hive 之乱码问题 (已解决)

Jingsong Li
Hi,

会在1.10.1里面修复。
JIRA: https://issues.apache.org/jira/browse/FLINK-16647
谢谢 @[hidden email] <[hidden email]>

Best,
Jingsong Lee

On Wed, Mar 18, 2020 at 4:50 PM 吕先生 <[hidden email]> wrote:

> Hi
>
>
>     经过阿里工程师的协助,现定位到这是一个压缩问题。我的Hive 开启了压缩,以减小磁盘空间的使用。Flink
> sql在写入压缩文件后(textfile 格式),没有添加后缀名(正常情况下应该有“.snappy”后缀)。所以读的时候,把压缩文件当普通的txt
> 文件读了,造成乱码。   后面阿里的工程师会跟进这个问题。
>
>
> 解决办法:将hive-site.xml 里面的hive.exec.compress.output 设置成 false,或使用其他存储格式。
>
>
> 另外确认Flink sql 对Parquet、ORC 格式Hive 表的压缩是支持的。
>
>
> 在此,对提供帮助的阿里工程师,表示感谢!
>
>
>
>
>
>
>
> 在 2020-03-17 16:24:03,"吕先生" <[hidden email]> 写道:
>
> 各位大佬,大家好!
> 帮看一下这个问题:我使用flink sql 基于Hive 进行批计算(目的是替换spark sql 的批计算),具体是从hive
> 中读数据,然后insert 回hive 的表,然后select 看数据时,出现乱码。
> 软件版本:hadoop2.9.1和hadoop2.8.5、hive-2.3.3和hive-2.3.4、flink1.10.0、zeppelin0.9.0、Flink
> SQL gateway 0.1
>
>
> 切换了多个hadoop、hive版本(各版本软件均来自官方下载),以及测试了Flink Sql Cli、Zeppelin、Flink SQL
> gateway等Flink sql运行环境,均没解决问题。Flink 是Run on Yarn的,下面是测试使用的sql 脚本:
>
>
> //hive 中
> CREATE TABLE IF NOT EXISTS temp_h1(
>   id VARCHAR(50),
>   lac VARCHAR(50),
>   ci VARCHAR(50),
>   flux_m VARCHAR(50),
>   nums VARCHAR(50),
>   sno  VARCHAR(50),
>   cdate VARCHAR(50)
> )
> row format delimited FIELDS TERMINATED BY ','
> stored as textfile
> LOCATION '/tmp/hive/temp_h1';
>
>
> CREATE TABLE IF NOT EXISTS temp_h2(
>   id VARCHAR(50),
>   lac VARCHAR(50),
>   ci VARCHAR(50),
>   flux_m VARCHAR(50),
>   nums VARCHAR(50),
>   sno  VARCHAR(50),
>   cdate VARCHAR(50)
> )
> row format delimited FIELDS TERMINATED BY ','
> stored as textfile
> LOCATION '/tmp/hive/temp_h2';
>
>
> //测试数据(t.txt)
> 101,中国,100.02,123.001,1000020000,30,20200316
> 102,美国,100.02,123.001,1000020000,30,20200316
> 103,武汉,100.02,123.001,1000020000,30,20200316
> 104,北京,100.02,123.001,1000020000,30,20200316
> 105,俄罗斯,100.02,123.001,1000020000,30,20200316
> 106,海南,100.02,123.001,1000020000,30,20200316
> 107,香格里拉酒店,100.02,123.001,1000020000,30,20200316
>
>
> //加载数据
> load data local inpath '/home/hadoop/temp/t.txt' into table temp_h1;
>
>
> //在FLink sql 中
> insert into temp_h2 select * from temp_h1;
> select * from temp_h2;            //出现乱码,而且数据不全
>
>
> temp_h2 在hdfs 上的乱码文件 见附件:cp-0-task-0-file-0
> Flink sql 运行期间没有报错,yarn上运行的日志见附件:Executor.log
>
>
> 注,flink监控中显示信息:CsvTableSource(read fields: a, b) ->
> SourceConversion(table=[hive.test.temp_1, source: [CsvTableSource(read
> fields: a, b)]], fields=[a, b]) -> SinkConversionToRow -> Sink: Unnamed
> 。我有不清楚为啥使用的是SinkConversionToRow。
>
>
> 大家可以用我上面的代码也测试一下,帮验证一下,在你们的环境中,是否出现乱码问题,谢谢了!
>
>
>
>
>
>
>
>
>



--
Best, Jingsong Lee