不能实时读取实时写入到 Hive 的数据

classic Classic list List threaded Threaded
12 messages Options
Reply | Threaded
Open this post in threaded view
|

不能实时读取实时写入到 Hive 的数据

wanglei2@geekplus.com.cn

试验了一下 Flink-1.11  hive streaming 的功能
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html

创建 kafka 表,通过 SQL 实时写入 Hive.

但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。

谢谢,
王磊



[hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: 不能实时读取实时写入到 Hive 的数据

Leonard Xu
HI, wanglei

你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。

SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;

就在你看得这个页面应该有对应的文档说明如何读取hive数据。

祝好,
Leonard Xu

> 在 2020年7月14日,15:47,[hidden email] 写道:
>
>
> 试验了一下 Flink-1.11  hive streaming 的功能
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
>
> 创建 kafka 表,通过 SQL 实时写入 Hive.
>
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。
>
> 谢谢,
> 王磊
>
>
>
> [hidden email]
>

Reply | Threaded
Open this post in threaded view
|

回复: Re: 不能实时读取实时写入到 Hive 的数据

wanglei2@geekplus.com.cn

我加上了这个 tablehint 。
任务提交上去了,但客户端还是没有任何返回显示。
我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。

谢谢,
王磊



[hidden email]

发件人: Leonard Xu
发送时间: 2020-07-14 16:17
收件人: user-zh
主题: Re: 不能实时读取实时写入到 Hive 的数据
HI, wanglei
 
你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。
 
SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;
 
就在你看得这个页面应该有对应的文档说明如何读取hive数据。
 
祝好,
Leonard Xu
 

> 在 2020年7月14日,15:47,[hidden email] 写道:
>
>
> 试验了一下 Flink-1.11  hive streaming 的功能
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
>
> 创建 kafka 表,通过 SQL 实时写入 Hive.
>
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。
>
> 谢谢,
> 王磊
>
>
>
> [hidden email]
>
 
Reply | Threaded
Open this post in threaded view
|

回复:Re: 不能实时读取实时写入到 Hive 的数据

Shuai Xia
你好,
这说明写入的hive文件没有进行rollup,可以贴下SQL么


------------------------------------------------------------------
发件人:[hidden email] <[hidden email]>
发送时间:2020年7月14日(星期二) 16:40
收件人:user-zh <[hidden email]>; xbjtdcq <[hidden email]>
主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据


我加上了这个 tablehint 。
任务提交上去了,但客户端还是没有任何返回显示。
我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。

谢谢,
王磊



[hidden email]

发件人: Leonard Xu
发送时间: 2020-07-14 16:17
收件人: user-zh
主题: Re: 不能实时读取实时写入到 Hive 的数据
HI, wanglei

你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。

SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;

就在你看得这个页面应该有对应的文档说明如何读取hive数据。

祝好,
Leonard Xu

> 在 2020年7月14日,15:47,[hidden email] 写道:
>
>
> 试验了一下 Flink-1.11  hive streaming 的功能
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
>
> 创建 kafka 表,通过 SQL 实时写入 Hive.
>
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。
>
> 谢谢,
> 王磊
>
>
>
> [hidden email]
>

Reply | Threaded
Open this post in threaded view
|

Re: 回复: 不能实时读取实时写入到 Hive 的数据

wanglei2@geekplus.com.cn
应该是我没有理解 partitiion-commit 的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit


CREATE TABLE kafka_ods_wms_pick_order (
 order_no STRING,
 status INT,
 dispatch_time TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'ods_wms_pick_order',
 'properties.bootstrap.servers' = 'xxxx:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'scan.startup.mode' = 'latest-offset'
)


CREATE TABLE hive_ods_wms_pick_order (
  order_no STRING,
  status INT,
  dispatch_time TIMESTAMP
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, dispatch_time, DATE_FORMAT(dispatch_time, 'yyyy-MM-dd'), DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
SELECT * FROM hive_ods_wms_pick_order /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-24') */;




[hidden email]

 
Sender: 夏帅
Send Time: 2020-07-14 16:43
Receiver: user-zh; xbjtdcq
Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
你好,
这说明写入的hive文件没有进行rollup,可以贴下SQL么
 
 
------------------------------------------------------------------
发件人:[hidden email] <[hidden email]>
发送时间:2020年7月14日(星期二) 16:40
收件人:user-zh <[hidden email]>; xbjtdcq <[hidden email]>
主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
 
 
我加上了这个 tablehint 。
任务提交上去了,但客户端还是没有任何返回显示。
我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。
 
谢谢,
王磊
 
 
 
[hidden email]
 
发件人: Leonard Xu
发送时间: 2020-07-14 16:17
收件人: user-zh
主题: Re: 不能实时读取实时写入到 Hive 的数据
HI, wanglei
 
你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。
 
SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;
 
就在你看得这个页面应该有对应的文档说明如何读取hive数据。
 
祝好,
Leonard Xu
 

> 在 2020年7月14日,15:47,[hidden email] 写道:
>
>
> 试验了一下 Flink-1.11  hive streaming 的功能
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
>
> 创建 kafka 表,通过 SQL 实时写入 Hive.
>
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。
>
> 谢谢,
> 王磊
>
>
>
> [hidden email]
>
 
Reply | Threaded
Open this post in threaded view
|

回复:回复: 不能实时读取实时写入到 Hive 的数据

Shuai Xia
你好,
可以参考下这个问题的解决
http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html


------------------------------------------------------------------
发件人:[hidden email] <[hidden email]>
发送时间:2020年7月14日(星期二) 16:50
收件人:user-zh <[hidden email]>; 夏帅 <[hidden email]>; Leonard Xu <[hidden email]>
主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据


应该是我没有理解 partitiion-commit 的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit


CREATE TABLE kafka_ods_wms_pick_order (
 order_no STRING,
 status INT,
 dispatch_time TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'ods_wms_pick_order',
 'properties.bootstrap.servers' = 'xxxx:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'scan.startup.mode' = 'latest-offset'
)


CREATE TABLE hive_ods_wms_pick_order (
  order_no STRING,
  status INT,
  dispatch_time TIMESTAMP
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, dispatch_time, DATE_FORMAT(dispatch_time, 'yyyy-MM-dd'), DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
SELECT * FROM hive_ods_wms_pick_order /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-24') */;




[hidden email]


Sender: 夏帅
Send Time: 2020-07-14 16:43
Receiver: user-zh; xbjtdcq
Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
你好,
这说明写入的hive文件没有进行rollup,可以贴下SQL么
------------------------------------------------------------------
发件人:[hidden email] <[hidden email]>
发送时间:2020年7月14日(星期二) 16:40
收件人:user-zh <[hidden email]>; xbjtdcq <[hidden email]>
主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
我加上了这个 tablehint 。
任务提交上去了,但客户端还是没有任何返回显示。
我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。
谢谢,
王磊
[hidden email]
发件人: Leonard Xu
发送时间: 2020-07-14 16:17
收件人: user-zh
主题: Re: 不能实时读取实时写入到 Hive 的数据
HI, wanglei
你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。
SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;
就在你看得这个页面应该有对应的文档说明如何读取hive数据。
祝好,
Leonard Xu

> 在 2020年7月14日,15:47,[hidden email] 写道:
>
>
> 试验了一下 Flink-1.11  hive streaming 的功能
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
>
> 创建 kafka 表,通过 SQL 实时写入 Hive.
>
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。
>
> 谢谢,
> 王磊
>
>
>
> [hidden email]
>
Reply | Threaded
Open this post in threaded view
|

Re: 不能实时读取实时写入到 Hive 的数据

Leonard Xu

Hi, wanglei

这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive 的分区已完成信息(通过metastore或success文件).

你看下夏帅贴的邮件,检查下 checkpoint 和  partition-commit的设置

祝好,
Leonard Xu


> 在 2020年7月14日,16:59,夏帅 <[hidden email]> 写道:
>
> 你好,
> 可以参考下这个问题的解决
> http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html
>
>
> ------------------------------------------------------------------
> 发件人:[hidden email] <[hidden email]>
> 发送时间:2020年7月14日(星期二) 16:50
> 收件人:user-zh <[hidden email]>; 夏帅 <[hidden email]>; Leonard Xu <[hidden email]>
> 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据
>
>
> 应该是我没有理解 partitiion-commit 的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit
>
>
> CREATE TABLE kafka_ods_wms_pick_order (
> order_no STRING,
> status INT,
> dispatch_time TIMESTAMP(3)
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'ods_wms_pick_order',
> 'properties.bootstrap.servers' = 'xxxx:9092',
> 'properties.group.id' = 'testGroup',
> 'format' = 'json',
> 'scan.startup.mode' = 'latest-offset'
> )
>
>
> CREATE TABLE hive_ods_wms_pick_order (
>  order_no STRING,
>  status INT,
>  dispatch_time TIMESTAMP
> ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
>  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>  'sink.partition-commit.trigger'='partition-time',
>  'sink.partition-commit.delay'='1 h',
>  'sink.partition-commit.policy.kind'='metastore,success-file'
> );
>
> INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, dispatch_time, DATE_FORMAT(dispatch_time, 'yyyy-MM-dd'), DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
> SELECT * FROM hive_ods_wms_pick_order /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-24') */;
>
>
>
>
> [hidden email]
>
>
> Sender: 夏帅
> Send Time: 2020-07-14 16:43
> Receiver: user-zh; xbjtdcq
> Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
> 你好,
> 这说明写入的hive文件没有进行rollup,可以贴下SQL么
> ------------------------------------------------------------------
> 发件人:[hidden email] <[hidden email]>
> 发送时间:2020年7月14日(星期二) 16:40
> 收件人:user-zh <[hidden email]>; xbjtdcq <[hidden email]>
> 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
> 我加上了这个 tablehint 。
> 任务提交上去了,但客户端还是没有任何返回显示。
> 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。
> 谢谢,
> 王磊
> [hidden email]
> 发件人: Leonard Xu
> 发送时间: 2020-07-14 16:17
> 收件人: user-zh
> 主题: Re: 不能实时读取实时写入到 Hive 的数据
> HI, wanglei
> 你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。
> SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;
> 就在你看得这个页面应该有对应的文档说明如何读取hive数据。
> 祝好,
> Leonard Xu
>> 在 2020年7月14日,15:47,[hidden email] 写道:
>>
>>
>> 试验了一下 Flink-1.11  hive streaming 的功能
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
>>
>> 创建 kafka 表,通过 SQL 实时写入 Hive.
>>
>> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。
>>
>> 谢谢,
>> 王磊
>>
>>
>>
>> [hidden email]
>>

Reply | Threaded
Open this post in threaded view
|

Re: 不能实时读取实时写入到 Hive 的数据

Rui Li
还可以在hive那边验证一下数据是否commit了,比如从hive CLI端执行一下show partitions,或者读一点数据

On Tue, Jul 14, 2020 at 5:20 PM Leonard Xu <[hidden email]> wrote:

>
> Hi, wanglei
>
> 这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive
> 的分区已完成信息(通过metastore或success文件).
>
> 你看下夏帅贴的邮件,检查下 checkpoint 和  partition-commit的设置
>
> 祝好,
> Leonard Xu
>
>
> > 在 2020年7月14日,16:59,夏帅 <[hidden email]> 写道:
> >
> > 你好,
> > 可以参考下这个问题的解决
> >
> http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html
> >
> >
> > ------------------------------------------------------------------
> > 发件人:[hidden email] <[hidden email]>
> > 发送时间:2020年7月14日(星期二) 16:50
> > 收件人:user-zh <[hidden email]>; 夏帅 <[hidden email]>;
> Leonard Xu <[hidden email]>
> > 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据
> >
> >
> > 应该是我没有理解 partitiion-commit 的意思,我看这里有文档:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit
> >
> >
> > CREATE TABLE kafka_ods_wms_pick_order (
> > order_no STRING,
> > status INT,
> > dispatch_time TIMESTAMP(3)
> > ) WITH (
> > 'connector' = 'kafka',
> > 'topic' = 'ods_wms_pick_order',
> > 'properties.bootstrap.servers' = 'xxxx:9092',
> > 'properties.group.id' = 'testGroup',
> > 'format' = 'json',
> > 'scan.startup.mode' = 'latest-offset'
> > )
> >
> >
> > CREATE TABLE hive_ods_wms_pick_order (
> >  order_no STRING,
> >  status INT,
> >  dispatch_time TIMESTAMP
> > ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
> >  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> >  'sink.partition-commit.trigger'='partition-time',
> >  'sink.partition-commit.delay'='1 h',
> >  'sink.partition-commit.policy.kind'='metastore,success-file'
> > );
> >
> > INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status,
> dispatch_time, DATE_FORMAT(dispatch_time, 'yyyy-MM-dd'),
> DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
> > SELECT * FROM hive_ods_wms_pick_order /*+
> OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-07-24') */;
> >
> >
> >
> >
> > [hidden email]
> >
> >
> > Sender: 夏帅
> > Send Time: 2020-07-14 16:43
> > Receiver: user-zh; xbjtdcq
> > Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
> > 你好,
> > 这说明写入的hive文件没有进行rollup,可以贴下SQL么
> > ------------------------------------------------------------------
> > 发件人:[hidden email] <[hidden email]>
> > 发送时间:2020年7月14日(星期二) 16:40
> > 收件人:user-zh <[hidden email]>; xbjtdcq <[hidden email]>
> > 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
> > 我加上了这个 tablehint 。
> > 任务提交上去了,但客户端还是没有任何返回显示。
> > 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。
> > 谢谢,
> > 王磊
> > [hidden email]
> > 发件人: Leonard Xu
> > 发送时间: 2020-07-14 16:17
> > 收件人: user-zh
> > 主题: Re: 不能实时读取实时写入到 Hive 的数据
> > HI, wanglei
> > 你开启了 streaming-source.enable
> 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints
> 方便地指定参数。
> > SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-05-20') */;
> > 就在你看得这个页面应该有对应的文档说明如何读取hive数据。
> > 祝好,
> > Leonard Xu
> >> 在 2020年7月14日,15:47,[hidden email] 写道:
> >>
> >>
> >> 试验了一下 Flink-1.11  hive streaming 的功能
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
> >>
> >> 创建 kafka 表,通过 SQL 实时写入 Hive.
> >>
> >> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink
> webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。
> >>
> >> 谢谢,
> >> 王磊
> >>
> >>
> >>
> >> [hidden email]
> >>
>
>

--
Best regards!
Rui Li
Reply | Threaded
Open this post in threaded view
|

回复: Re: 不能实时读取实时写入到 Hive 的数据

wanglei2@geekplus.com.cn
In reply to this post by Leonard Xu
我把问题简化一下,创建 Hive 表时不带任何参数

CREATE TABLE hive_ods_wms_pick_order (
  order_no STRING,
  status INT,
  dispatch_time TIMESTAMP
)  STORED AS parquet

INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, dispatch_time FROM kafka_ods_wms_pick_order;

我用的 sql-client 客户端,15 分钟过去了 hive 表对应的 hdfs 目录为什么还只是有一个大小为 0 的 .part 文件呢?
我在 flink 客户端 SELECT order_no, status, dispatch_time FROM kafka_ods_wms_pick_order 确实是有数据返回的。

我在 flink web ui 看了下这个 job 的 Checkpoint Counts 是 0.
是需要让 job 做 checkpoint 才能写到 hdfs 上吗?
我用 Flink sql-client 客户端怎么设置做 checkpoint 的频率呢?

谢谢,
王磊




[hidden email]

 
发件人: Leonard Xu
发送时间: 2020-07-14 17:20
收件人: user-zh; 夏帅
抄送: [hidden email]
主题: Re: 不能实时读取实时写入到 Hive 的数据

Hi, wanglei

这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive 的分区已完成信息(通过metastore或success文件).

你看下夏帅贴的邮件,检查下 checkpoint 和  partition-commit的设置

祝好,
Leonard Xu


在 2020年7月14日,16:59,夏帅 <[hidden email]> 写道:

你好,
可以参考下这个问题的解决
http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html


------------------------------------------------------------------
发件人:[hidden email] <[hidden email]>
发送时间:2020年7月14日(星期二) 16:50
收件人:user-zh <[hidden email]>; 夏帅 <[hidden email]>; Leonard Xu <[hidden email]>
主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据


应该是我没有理解 partitiion-commit 的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit


CREATE TABLE kafka_ods_wms_pick_order (
order_no STRING,
status INT,
dispatch_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'ods_wms_pick_order',
'properties.bootstrap.servers' = 'xxxx:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)


CREATE TABLE hive_ods_wms_pick_order (
 order_no STRING,
 status INT,
 dispatch_time TIMESTAMP
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
 'sink.partition-commit.trigger'='partition-time',
 'sink.partition-commit.delay'='1 h',
 'sink.partition-commit.policy.kind'='metastore,success-file'
);

INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, dispatch_time, DATE_FORMAT(dispatch_time, 'yyyy-MM-dd'), DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
SELECT * FROM hive_ods_wms_pick_order /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-24') */;




[hidden email]


Sender: 夏帅
Send Time: 2020-07-14 16:43
Receiver: user-zh; xbjtdcq
Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
你好,
这说明写入的hive文件没有进行rollup,可以贴下SQL么
------------------------------------------------------------------
发件人:[hidden email] <[hidden email]>
发送时间:2020年7月14日(星期二) 16:40
收件人:user-zh <[hidden email]>; xbjtdcq <[hidden email]>
主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
我加上了这个 tablehint 。
任务提交上去了,但客户端还是没有任何返回显示。
我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。
谢谢,
王磊
[hidden email]
发件人: Leonard Xu
发送时间: 2020-07-14 16:17
收件人: user-zh
主题: Re: 不能实时读取实时写入到 Hive 的数据
HI, wanglei
你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。
SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;
就在你看得这个页面应该有对应的文档说明如何读取hive数据。
祝好,
Leonard Xu
在 2020年7月14日,15:47,[hidden email] 写道:


试验了一下 Flink-1.11  hive streaming 的功能
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html

创建 kafka 表,通过 SQL 实时写入 Hive.

但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。

谢谢,
王磊



[hidden email]


Reply | Threaded
Open this post in threaded view
|

Re: Re: 不能实时读取实时写入到 Hive 的数据

Rui Li
流数据写hive时,不管是分区表还是非分区表,commit都是通过checkpoint触发的。用SQL
client的话可以在flink-conf.yaml里设置execution.checkpointing.interval来开启checkpoint

On Tue, Jul 14, 2020 at 5:49 PM [hidden email] <
[hidden email]> wrote:

> 我把问题简化一下,创建 Hive 表时不带任何参数
>
> CREATE TABLE hive_ods_wms_pick_order (
>   order_no STRING,
>   status INT,
>   dispatch_time TIMESTAMP
> )  STORED AS parquet
>
> INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status,
> dispatch_time FROM kafka_ods_wms_pick_order;
>
> 我用的 sql-client 客户端,15 分钟过去了 hive 表对应的 hdfs 目录为什么还只是有一个大小为 0 的 .part 文件呢?
> 我在 flink 客户端 SELECT order_no, status, dispatch_time FROM
> kafka_ods_wms_pick_order 确实是有数据返回的。
>
> 我在 flink web ui 看了下这个 job 的 Checkpoint Counts 是 0.
> 是需要让 job 做 checkpoint 才能写到 hdfs 上吗?
> 我用 Flink sql-client 客户端怎么设置做 checkpoint 的频率呢?
>
> 谢谢,
> 王磊
>
>
>
>
> [hidden email]
>
>
> 发件人: Leonard Xu
> 发送时间: 2020-07-14 17:20
> 收件人: user-zh; 夏帅
> 抄送: [hidden email]
> 主题: Re: 不能实时读取实时写入到 Hive 的数据
>
> Hi, wanglei
>
> 这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive
> 的分区已完成信息(通过metastore或success文件).
>
> 你看下夏帅贴的邮件,检查下 checkpoint 和  partition-commit的设置
>
> 祝好,
> Leonard Xu
>
>
> 在 2020年7月14日,16:59,夏帅 <[hidden email]> 写道:
>
> 你好,
> 可以参考下这个问题的解决
>
> http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html
>
>
> ------------------------------------------------------------------
> 发件人:[hidden email] <[hidden email]>
> 发送时间:2020年7月14日(星期二) 16:50
> 收件人:user-zh <[hidden email]>; 夏帅 <[hidden email]>;
> Leonard Xu <[hidden email]>
> 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据
>
>
> 应该是我没有理解 partitiion-commit 的意思,我看这里有文档:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit
>
>
> CREATE TABLE kafka_ods_wms_pick_order (
> order_no STRING,
> status INT,
> dispatch_time TIMESTAMP(3)
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'ods_wms_pick_order',
> 'properties.bootstrap.servers' = 'xxxx:9092',
> 'properties.group.id' = 'testGroup',
> 'format' = 'json',
> 'scan.startup.mode' = 'latest-offset'
> )
>
>
> CREATE TABLE hive_ods_wms_pick_order (
>  order_no STRING,
>  status INT,
>  dispatch_time TIMESTAMP
> ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
>  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>  'sink.partition-commit.trigger'='partition-time',
>  'sink.partition-commit.delay'='1 h',
>  'sink.partition-commit.policy.kind'='metastore,success-file'
> );
>
> INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status,
> dispatch_time, DATE_FORMAT(dispatch_time, 'yyyy-MM-dd'),
> DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
> SELECT * FROM hive_ods_wms_pick_order /*+
> OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-07-24') */;
>
>
>
>
> [hidden email]
>
>
> Sender: 夏帅
> Send Time: 2020-07-14 16:43
> Receiver: user-zh; xbjtdcq
> Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
> 你好,
> 这说明写入的hive文件没有进行rollup,可以贴下SQL么
> ------------------------------------------------------------------
> 发件人:[hidden email] <[hidden email]>
> 发送时间:2020年7月14日(星期二) 16:40
> 收件人:user-zh <[hidden email]>; xbjtdcq <[hidden email]>
> 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
> 我加上了这个 tablehint 。
> 任务提交上去了,但客户端还是没有任何返回显示。
> 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。
> 谢谢,
> 王磊
> [hidden email]
> 发件人: Leonard Xu
> 发送时间: 2020-07-14 16:17
> 收件人: user-zh
> 主题: Re: 不能实时读取实时写入到 Hive 的数据
> HI, wanglei
> 你开启了 streaming-source.enable
> 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints
> 方便地指定参数。
> SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-05-20') */;
> 就在你看得这个页面应该有对应的文档说明如何读取hive数据。
> 祝好,
> Leonard Xu
> 在 2020年7月14日,15:47,[hidden email] 写道:
>
>
> 试验了一下 Flink-1.11  hive streaming 的功能
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
>
> 创建 kafka 表,通过 SQL 实时写入 Hive.
>
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink
> webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。
>
> 谢谢,
> 王磊
>
>
>
> [hidden email]
>
>
>

--
Best regards!
Rui Li
Reply | Threaded
Open this post in threaded view
|

Re: Re: 不能实时读取实时写入到 Hive 的数据

wanglei2@geekplus.com.cn

谢谢,根本原因就是  flink sql-client 客户端默认没有设置 checkpoint 导致的。



[hidden email]

 
Sender: Rui Li
Send Time: 2020-07-14 18:29
Receiver: user-zh
cc: Leonard Xu; 夏帅
Subject: Re: Re: 不能实时读取实时写入到 Hive 的数据
流数据写hive时,不管是分区表还是非分区表,commit都是通过checkpoint触发的。用SQL
client的话可以在flink-conf.yaml里设置execution.checkpointing.interval来开启checkpoint
 
On Tue, Jul 14, 2020 at 5:49 PM [hidden email] <
[hidden email]> wrote:
 

> 我把问题简化一下,创建 Hive 表时不带任何参数
>
> CREATE TABLE hive_ods_wms_pick_order (
>   order_no STRING,
>   status INT,
>   dispatch_time TIMESTAMP
> )  STORED AS parquet
>
> INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status,
> dispatch_time FROM kafka_ods_wms_pick_order;
>
> 我用的 sql-client 客户端,15 分钟过去了 hive 表对应的 hdfs 目录为什么还只是有一个大小为 0 的 .part 文件呢?
> 我在 flink 客户端 SELECT order_no, status, dispatch_time FROM
> kafka_ods_wms_pick_order 确实是有数据返回的。
>
> 我在 flink web ui 看了下这个 job 的 Checkpoint Counts 是 0.
> 是需要让 job 做 checkpoint 才能写到 hdfs 上吗?
> 我用 Flink sql-client 客户端怎么设置做 checkpoint 的频率呢?
>
> 谢谢,
> 王磊
>
>
>
>
> [hidden email]
>
>
> 发件人: Leonard Xu
> 发送时间: 2020-07-14 17:20
> 收件人: user-zh; 夏帅
> 抄送: [hidden email]
> 主题: Re: 不能实时读取实时写入到 Hive 的数据
>
> Hi, wanglei
>
> 这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive
> 的分区已完成信息(通过metastore或success文件).
>
> 你看下夏帅贴的邮件,检查下 checkpoint 和  partition-commit的设置
>
> 祝好,
> Leonard Xu
>
>
> 在 2020年7月14日,16:59,夏帅 <[hidden email]> 写道:
>
> 你好,
> 可以参考下这个问题的解决
>
> http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html
>
>
> ------------------------------------------------------------------
> 发件人:[hidden email] <[hidden email]>
> 发送时间:2020年7月14日(星期二) 16:50
> 收件人:user-zh <[hidden email]>; 夏帅 <[hidden email]>;
> Leonard Xu <[hidden email]>
> 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据
>
>
> 应该是我没有理解 partitiion-commit 的意思,我看这里有文档:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit
>
>
> CREATE TABLE kafka_ods_wms_pick_order (
> order_no STRING,
> status INT,
> dispatch_time TIMESTAMP(3)
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'ods_wms_pick_order',
> 'properties.bootstrap.servers' = 'xxxx:9092',
> 'properties.group.id' = 'testGroup',
> 'format' = 'json',
> 'scan.startup.mode' = 'latest-offset'
> )
>
>
> CREATE TABLE hive_ods_wms_pick_order (
>  order_no STRING,
>  status INT,
>  dispatch_time TIMESTAMP
> ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
>  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>  'sink.partition-commit.trigger'='partition-time',
>  'sink.partition-commit.delay'='1 h',
>  'sink.partition-commit.policy.kind'='metastore,success-file'
> );
>
> INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status,
> dispatch_time, DATE_FORMAT(dispatch_time, 'yyyy-MM-dd'),
> DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
> SELECT * FROM hive_ods_wms_pick_order /*+
> OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-07-24') */;
>
>
>
>
> [hidden email]
>
>
> Sender: 夏帅
> Send Time: 2020-07-14 16:43
> Receiver: user-zh; xbjtdcq
> Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
> 你好,
> 这说明写入的hive文件没有进行rollup,可以贴下SQL么
> ------------------------------------------------------------------
> 发件人:[hidden email] <[hidden email]>
> 发送时间:2020年7月14日(星期二) 16:40
> 收件人:user-zh <[hidden email]>; xbjtdcq <[hidden email]>
> 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
> 我加上了这个 tablehint 。
> 任务提交上去了,但客户端还是没有任何返回显示。
> 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。
> 谢谢,
> 王磊
> [hidden email]
> 发件人: Leonard Xu
> 发送时间: 2020-07-14 16:17
> 收件人: user-zh
> 主题: Re: 不能实时读取实时写入到 Hive 的数据
> HI, wanglei
> 你开启了 streaming-source.enable
> 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints
> 方便地指定参数。
> SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-05-20') */;
> 就在你看得这个页面应该有对应的文档说明如何读取hive数据。
> 祝好,
> Leonard Xu
> 在 2020年7月14日,15:47,[hidden email] 写道:
>
>
> 试验了一下 Flink-1.11  hive streaming 的功能
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
>
> 创建 kafka 表,通过 SQL 实时写入 Hive.
>
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink
> webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。
>
> 谢谢,
> 王磊
>
>
>
> [hidden email]
>
>
>
 
--
Best regards!
Rui Li
Reply | Threaded
Open this post in threaded view
|

Re: Re: 不能实时读取实时写入到 Hive 的数据

rachmaninoff
In reply to this post by Rui Li
您好,请问一下,flink在写hive的时候,会rollup很多这样的文件:part-73a2bc68-d4f9-4987-984c-f5546f351637-0-101

控制这些文件的数量生成是通过什么参数达成的?

谢谢