试验了一下 Flink-1.11 hive streaming 的功能 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html 创建 kafka 表,通过 SQL 实时写入 Hive. 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。 谢谢, 王磊 [hidden email] |
HI, wanglei
你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。 SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */; 就在你看得这个页面应该有对应的文档说明如何读取hive数据。 祝好, Leonard Xu > 在 2020年7月14日,15:47,[hidden email] 写道: > > > 试验了一下 Flink-1.11 hive streaming 的功能 > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html > > 创建 kafka 表,通过 SQL 实时写入 Hive. > > 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。 > > 谢谢, > 王磊 > > > > [hidden email] > |
我加上了这个 tablehint 。 任务提交上去了,但客户端还是没有任何返回显示。 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 谢谢, 王磊 [hidden email] 发件人: Leonard Xu 发送时间: 2020-07-14 16:17 收件人: user-zh 主题: Re: 不能实时读取实时写入到 Hive 的数据 HI, wanglei 你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。 SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */; 就在你看得这个页面应该有对应的文档说明如何读取hive数据。 祝好, Leonard Xu > 在 2020年7月14日,15:47,[hidden email] 写道: > > > 试验了一下 Flink-1.11 hive streaming 的功能 > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html > > 创建 kafka 表,通过 SQL 实时写入 Hive. > > 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。 > > 谢谢, > 王磊 > > > > [hidden email] > |
你好,
这说明写入的hive文件没有进行rollup,可以贴下SQL么 ------------------------------------------------------------------ 发件人:[hidden email] <[hidden email]> 发送时间:2020年7月14日(星期二) 16:40 收件人:user-zh <[hidden email]>; xbjtdcq <[hidden email]> 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据 我加上了这个 tablehint 。 任务提交上去了,但客户端还是没有任何返回显示。 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 谢谢, 王磊 [hidden email] 发件人: Leonard Xu 发送时间: 2020-07-14 16:17 收件人: user-zh 主题: Re: 不能实时读取实时写入到 Hive 的数据 HI, wanglei 你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。 SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */; 就在你看得这个页面应该有对应的文档说明如何读取hive数据。 祝好, Leonard Xu > 在 2020年7月14日,15:47,[hidden email] 写道: > > > 试验了一下 Flink-1.11 hive streaming 的功能 > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html > > 创建 kafka 表,通过 SQL 实时写入 Hive. > > 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。 > > 谢谢, > 王磊 > > > > [hidden email] > |
应该是我没有理解 partitiion-commit 的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit
CREATE TABLE kafka_ods_wms_pick_order ( order_no STRING, status INT, dispatch_time TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'ods_wms_pick_order', 'properties.bootstrap.servers' = 'xxxx:9092', 'properties.group.id' = 'testGroup', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' ) CREATE TABLE hive_ods_wms_pick_order ( order_no STRING, status INT, dispatch_time TIMESTAMP ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.policy.kind'='metastore,success-file' ); INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, dispatch_time, DATE_FORMAT(dispatch_time, 'yyyy-MM-dd'), DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order; SELECT * FROM hive_ods_wms_pick_order /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-24') */; [hidden email] Sender: 夏帅 Send Time: 2020-07-14 16:43 Receiver: user-zh; xbjtdcq Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据 你好, 这说明写入的hive文件没有进行rollup,可以贴下SQL么 ------------------------------------------------------------------ 发件人:[hidden email] <[hidden email]> 发送时间:2020年7月14日(星期二) 16:40 收件人:user-zh <[hidden email]>; xbjtdcq <[hidden email]> 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据 我加上了这个 tablehint 。 任务提交上去了,但客户端还是没有任何返回显示。 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 谢谢, 王磊 [hidden email] 发件人: Leonard Xu 发送时间: 2020-07-14 16:17 收件人: user-zh 主题: Re: 不能实时读取实时写入到 Hive 的数据 HI, wanglei 你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。 SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */; 就在你看得这个页面应该有对应的文档说明如何读取hive数据。 祝好, Leonard Xu > 在 2020年7月14日,15:47,[hidden email] 写道: > > > 试验了一下 Flink-1.11 hive streaming 的功能 > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html > > 创建 kafka 表,通过 SQL 实时写入 Hive. > > 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。 > > 谢谢, > 王磊 > > > > [hidden email] > |
你好,
可以参考下这个问题的解决 http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html ------------------------------------------------------------------ 发件人:[hidden email] <[hidden email]> 发送时间:2020年7月14日(星期二) 16:50 收件人:user-zh <[hidden email]>; 夏帅 <[hidden email]>; Leonard Xu <[hidden email]> 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据 应该是我没有理解 partitiion-commit 的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit CREATE TABLE kafka_ods_wms_pick_order ( order_no STRING, status INT, dispatch_time TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'ods_wms_pick_order', 'properties.bootstrap.servers' = 'xxxx:9092', 'properties.group.id' = 'testGroup', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' ) CREATE TABLE hive_ods_wms_pick_order ( order_no STRING, status INT, dispatch_time TIMESTAMP ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.policy.kind'='metastore,success-file' ); INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, dispatch_time, DATE_FORMAT(dispatch_time, 'yyyy-MM-dd'), DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order; SELECT * FROM hive_ods_wms_pick_order /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-24') */; [hidden email] Sender: 夏帅 Send Time: 2020-07-14 16:43 Receiver: user-zh; xbjtdcq Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据 你好, 这说明写入的hive文件没有进行rollup,可以贴下SQL么 ------------------------------------------------------------------ 发件人:[hidden email] <[hidden email]> 发送时间:2020年7月14日(星期二) 16:40 收件人:user-zh <[hidden email]>; xbjtdcq <[hidden email]> 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据 我加上了这个 tablehint 。 任务提交上去了,但客户端还是没有任何返回显示。 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 谢谢, 王磊 [hidden email] 发件人: Leonard Xu 发送时间: 2020-07-14 16:17 收件人: user-zh 主题: Re: 不能实时读取实时写入到 Hive 的数据 HI, wanglei 你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。 SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */; 就在你看得这个页面应该有对应的文档说明如何读取hive数据。 祝好, Leonard Xu > 在 2020年7月14日,15:47,[hidden email] 写道: > > > 试验了一下 Flink-1.11 hive streaming 的功能 > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html > > 创建 kafka 表,通过 SQL 实时写入 Hive. > > 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。 > > 谢谢, > 王磊 > > > > [hidden email] > |
Hi, wanglei 这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive 的分区已完成信息(通过metastore或success文件). 你看下夏帅贴的邮件,检查下 checkpoint 和 partition-commit的设置 祝好, Leonard Xu > 在 2020年7月14日,16:59,夏帅 <[hidden email]> 写道: > > 你好, > 可以参考下这个问题的解决 > http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html > > > ------------------------------------------------------------------ > 发件人:[hidden email] <[hidden email]> > 发送时间:2020年7月14日(星期二) 16:50 > 收件人:user-zh <[hidden email]>; 夏帅 <[hidden email]>; Leonard Xu <[hidden email]> > 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据 > > > 应该是我没有理解 partitiion-commit 的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit > > > CREATE TABLE kafka_ods_wms_pick_order ( > order_no STRING, > status INT, > dispatch_time TIMESTAMP(3) > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'ods_wms_pick_order', > 'properties.bootstrap.servers' = 'xxxx:9092', > 'properties.group.id' = 'testGroup', > 'format' = 'json', > 'scan.startup.mode' = 'latest-offset' > ) > > > CREATE TABLE hive_ods_wms_pick_order ( > order_no STRING, > status INT, > dispatch_time TIMESTAMP > ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > 'sink.partition-commit.trigger'='partition-time', > 'sink.partition-commit.delay'='1 h', > 'sink.partition-commit.policy.kind'='metastore,success-file' > ); > > INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, dispatch_time, DATE_FORMAT(dispatch_time, 'yyyy-MM-dd'), DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order; > SELECT * FROM hive_ods_wms_pick_order /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-24') */; > > > > > [hidden email] > > > Sender: 夏帅 > Send Time: 2020-07-14 16:43 > Receiver: user-zh; xbjtdcq > Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据 > 你好, > 这说明写入的hive文件没有进行rollup,可以贴下SQL么 > ------------------------------------------------------------------ > 发件人:[hidden email] <[hidden email]> > 发送时间:2020年7月14日(星期二) 16:40 > 收件人:user-zh <[hidden email]>; xbjtdcq <[hidden email]> > 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据 > 我加上了这个 tablehint 。 > 任务提交上去了,但客户端还是没有任何返回显示。 > 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 > 谢谢, > 王磊 > [hidden email] > 发件人: Leonard Xu > 发送时间: 2020-07-14 16:17 > 收件人: user-zh > 主题: Re: 不能实时读取实时写入到 Hive 的数据 > HI, wanglei > 你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。 > SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */; > 就在你看得这个页面应该有对应的文档说明如何读取hive数据。 > 祝好, > Leonard Xu >> 在 2020年7月14日,15:47,[hidden email] 写道: >> >> >> 试验了一下 Flink-1.11 hive streaming 的功能 >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html >> >> 创建 kafka 表,通过 SQL 实时写入 Hive. >> >> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。 >> >> 谢谢, >> 王磊 >> >> >> >> [hidden email] >> |
还可以在hive那边验证一下数据是否commit了,比如从hive CLI端执行一下show partitions,或者读一点数据
On Tue, Jul 14, 2020 at 5:20 PM Leonard Xu <[hidden email]> wrote: > > Hi, wanglei > > 这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive > 的分区已完成信息(通过metastore或success文件). > > 你看下夏帅贴的邮件,检查下 checkpoint 和 partition-commit的设置 > > 祝好, > Leonard Xu > > > > 在 2020年7月14日,16:59,夏帅 <[hidden email]> 写道: > > > > 你好, > > 可以参考下这个问题的解决 > > > http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html > > > > > > ------------------------------------------------------------------ > > 发件人:[hidden email] <[hidden email]> > > 发送时间:2020年7月14日(星期二) 16:50 > > 收件人:user-zh <[hidden email]>; 夏帅 <[hidden email]>; > Leonard Xu <[hidden email]> > > 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据 > > > > > > 应该是我没有理解 partitiion-commit 的意思,我看这里有文档: > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit > > > > > > CREATE TABLE kafka_ods_wms_pick_order ( > > order_no STRING, > > status INT, > > dispatch_time TIMESTAMP(3) > > ) WITH ( > > 'connector' = 'kafka', > > 'topic' = 'ods_wms_pick_order', > > 'properties.bootstrap.servers' = 'xxxx:9092', > > 'properties.group.id' = 'testGroup', > > 'format' = 'json', > > 'scan.startup.mode' = 'latest-offset' > > ) > > > > > > CREATE TABLE hive_ods_wms_pick_order ( > > order_no STRING, > > status INT, > > dispatch_time TIMESTAMP > > ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( > > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > > 'sink.partition-commit.trigger'='partition-time', > > 'sink.partition-commit.delay'='1 h', > > 'sink.partition-commit.policy.kind'='metastore,success-file' > > ); > > > > INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, > dispatch_time, DATE_FORMAT(dispatch_time, 'yyyy-MM-dd'), > DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order; > > SELECT * FROM hive_ods_wms_pick_order /*+ > OPTIONS('streaming-source.enable'='true', > 'streaming-source.consume-start-offset'='2020-07-24') */; > > > > > > > > > > [hidden email] > > > > > > Sender: 夏帅 > > Send Time: 2020-07-14 16:43 > > Receiver: user-zh; xbjtdcq > > Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据 > > 你好, > > 这说明写入的hive文件没有进行rollup,可以贴下SQL么 > > ------------------------------------------------------------------ > > 发件人:[hidden email] <[hidden email]> > > 发送时间:2020年7月14日(星期二) 16:40 > > 收件人:user-zh <[hidden email]>; xbjtdcq <[hidden email]> > > 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据 > > 我加上了这个 tablehint 。 > > 任务提交上去了,但客户端还是没有任何返回显示。 > > 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 > > 谢谢, > > 王磊 > > [hidden email] > > 发件人: Leonard Xu > > 发送时间: 2020-07-14 16:17 > > 收件人: user-zh > > 主题: Re: 不能实时读取实时写入到 Hive 的数据 > > HI, wanglei > > 你开启了 streaming-source.enable > 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints > 方便地指定参数。 > > SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', > 'streaming-source.consume-start-offset'='2020-05-20') */; > > 就在你看得这个页面应该有对应的文档说明如何读取hive数据。 > > 祝好, > > Leonard Xu > >> 在 2020年7月14日,15:47,[hidden email] 写道: > >> > >> > >> 试验了一下 Flink-1.11 hive streaming 的功能 > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html > >> > >> 创建 kafka 表,通过 SQL 实时写入 Hive. > >> > >> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink > webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。 > >> > >> 谢谢, > >> 王磊 > >> > >> > >> > >> [hidden email] > >> > > -- Best regards! Rui Li |
In reply to this post by Leonard Xu
我把问题简化一下,创建 Hive 表时不带任何参数
CREATE TABLE hive_ods_wms_pick_order ( order_no STRING, status INT, dispatch_time TIMESTAMP ) STORED AS parquet INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, dispatch_time FROM kafka_ods_wms_pick_order; 我用的 sql-client 客户端,15 分钟过去了 hive 表对应的 hdfs 目录为什么还只是有一个大小为 0 的 .part 文件呢? 我在 flink 客户端 SELECT order_no, status, dispatch_time FROM kafka_ods_wms_pick_order 确实是有数据返回的。 我在 flink web ui 看了下这个 job 的 Checkpoint Counts 是 0. 是需要让 job 做 checkpoint 才能写到 hdfs 上吗? 我用 Flink sql-client 客户端怎么设置做 checkpoint 的频率呢? 谢谢, 王磊 [hidden email] 发件人: Leonard Xu 发送时间: 2020-07-14 17:20 收件人: user-zh; 夏帅 抄送: [hidden email] 主题: Re: 不能实时读取实时写入到 Hive 的数据 Hi, wanglei 这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive 的分区已完成信息(通过metastore或success文件). 你看下夏帅贴的邮件,检查下 checkpoint 和 partition-commit的设置 祝好, Leonard Xu 在 2020年7月14日,16:59,夏帅 <[hidden email]> 写道: 你好, 可以参考下这个问题的解决 http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html ------------------------------------------------------------------ 发件人:[hidden email] <[hidden email]> 发送时间:2020年7月14日(星期二) 16:50 收件人:user-zh <[hidden email]>; 夏帅 <[hidden email]>; Leonard Xu <[hidden email]> 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据 应该是我没有理解 partitiion-commit 的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit CREATE TABLE kafka_ods_wms_pick_order ( order_no STRING, status INT, dispatch_time TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'ods_wms_pick_order', 'properties.bootstrap.servers' = 'xxxx:9092', 'properties.group.id' = 'testGroup', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' ) CREATE TABLE hive_ods_wms_pick_order ( order_no STRING, status INT, dispatch_time TIMESTAMP ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.policy.kind'='metastore,success-file' ); INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, dispatch_time, DATE_FORMAT(dispatch_time, 'yyyy-MM-dd'), DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order; SELECT * FROM hive_ods_wms_pick_order /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-24') */; [hidden email] Sender: 夏帅 Send Time: 2020-07-14 16:43 Receiver: user-zh; xbjtdcq Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据 你好, 这说明写入的hive文件没有进行rollup,可以贴下SQL么 ------------------------------------------------------------------ 发件人:[hidden email] <[hidden email]> 发送时间:2020年7月14日(星期二) 16:40 收件人:user-zh <[hidden email]>; xbjtdcq <[hidden email]> 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据 我加上了这个 tablehint 。 任务提交上去了,但客户端还是没有任何返回显示。 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 谢谢, 王磊 [hidden email] 发件人: Leonard Xu 发送时间: 2020-07-14 16:17 收件人: user-zh 主题: Re: 不能实时读取实时写入到 Hive 的数据 HI, wanglei 你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。 SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */; 就在你看得这个页面应该有对应的文档说明如何读取hive数据。 祝好, Leonard Xu 在 2020年7月14日,15:47,[hidden email] 写道: 试验了一下 Flink-1.11 hive streaming 的功能 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html 创建 kafka 表,通过 SQL 实时写入 Hive. 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。 谢谢, 王磊 [hidden email] |
流数据写hive时,不管是分区表还是非分区表,commit都是通过checkpoint触发的。用SQL
client的话可以在flink-conf.yaml里设置execution.checkpointing.interval来开启checkpoint On Tue, Jul 14, 2020 at 5:49 PM [hidden email] < [hidden email]> wrote: > 我把问题简化一下,创建 Hive 表时不带任何参数 > > CREATE TABLE hive_ods_wms_pick_order ( > order_no STRING, > status INT, > dispatch_time TIMESTAMP > ) STORED AS parquet > > INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, > dispatch_time FROM kafka_ods_wms_pick_order; > > 我用的 sql-client 客户端,15 分钟过去了 hive 表对应的 hdfs 目录为什么还只是有一个大小为 0 的 .part 文件呢? > 我在 flink 客户端 SELECT order_no, status, dispatch_time FROM > kafka_ods_wms_pick_order 确实是有数据返回的。 > > 我在 flink web ui 看了下这个 job 的 Checkpoint Counts 是 0. > 是需要让 job 做 checkpoint 才能写到 hdfs 上吗? > 我用 Flink sql-client 客户端怎么设置做 checkpoint 的频率呢? > > 谢谢, > 王磊 > > > > > [hidden email] > > > 发件人: Leonard Xu > 发送时间: 2020-07-14 17:20 > 收件人: user-zh; 夏帅 > 抄送: [hidden email] > 主题: Re: 不能实时读取实时写入到 Hive 的数据 > > Hi, wanglei > > 这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive > 的分区已完成信息(通过metastore或success文件). > > 你看下夏帅贴的邮件,检查下 checkpoint 和 partition-commit的设置 > > 祝好, > Leonard Xu > > > 在 2020年7月14日,16:59,夏帅 <[hidden email]> 写道: > > 你好, > 可以参考下这个问题的解决 > > http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html > > > ------------------------------------------------------------------ > 发件人:[hidden email] <[hidden email]> > 发送时间:2020年7月14日(星期二) 16:50 > 收件人:user-zh <[hidden email]>; 夏帅 <[hidden email]>; > Leonard Xu <[hidden email]> > 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据 > > > 应该是我没有理解 partitiion-commit 的意思,我看这里有文档: > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit > > > CREATE TABLE kafka_ods_wms_pick_order ( > order_no STRING, > status INT, > dispatch_time TIMESTAMP(3) > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'ods_wms_pick_order', > 'properties.bootstrap.servers' = 'xxxx:9092', > 'properties.group.id' = 'testGroup', > 'format' = 'json', > 'scan.startup.mode' = 'latest-offset' > ) > > > CREATE TABLE hive_ods_wms_pick_order ( > order_no STRING, > status INT, > dispatch_time TIMESTAMP > ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > 'sink.partition-commit.trigger'='partition-time', > 'sink.partition-commit.delay'='1 h', > 'sink.partition-commit.policy.kind'='metastore,success-file' > ); > > INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, > dispatch_time, DATE_FORMAT(dispatch_time, 'yyyy-MM-dd'), > DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order; > SELECT * FROM hive_ods_wms_pick_order /*+ > OPTIONS('streaming-source.enable'='true', > 'streaming-source.consume-start-offset'='2020-07-24') */; > > > > > [hidden email] > > > Sender: 夏帅 > Send Time: 2020-07-14 16:43 > Receiver: user-zh; xbjtdcq > Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据 > 你好, > 这说明写入的hive文件没有进行rollup,可以贴下SQL么 > ------------------------------------------------------------------ > 发件人:[hidden email] <[hidden email]> > 发送时间:2020年7月14日(星期二) 16:40 > 收件人:user-zh <[hidden email]>; xbjtdcq <[hidden email]> > 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据 > 我加上了这个 tablehint 。 > 任务提交上去了,但客户端还是没有任何返回显示。 > 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 > 谢谢, > 王磊 > [hidden email] > 发件人: Leonard Xu > 发送时间: 2020-07-14 16:17 > 收件人: user-zh > 主题: Re: 不能实时读取实时写入到 Hive 的数据 > HI, wanglei > 你开启了 streaming-source.enable > 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints > 方便地指定参数。 > SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', > 'streaming-source.consume-start-offset'='2020-05-20') */; > 就在你看得这个页面应该有对应的文档说明如何读取hive数据。 > 祝好, > Leonard Xu > 在 2020年7月14日,15:47,[hidden email] 写道: > > > 试验了一下 Flink-1.11 hive streaming 的功能 > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html > > 创建 kafka 表,通过 SQL 实时写入 Hive. > > 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink > webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。 > > 谢谢, > 王磊 > > > > [hidden email] > > > -- Best regards! Rui Li |
谢谢,根本原因就是 flink sql-client 客户端默认没有设置 checkpoint 导致的。 [hidden email] Sender: Rui Li Send Time: 2020-07-14 18:29 Receiver: user-zh cc: Leonard Xu; 夏帅 Subject: Re: Re: 不能实时读取实时写入到 Hive 的数据 流数据写hive时,不管是分区表还是非分区表,commit都是通过checkpoint触发的。用SQL client的话可以在flink-conf.yaml里设置execution.checkpointing.interval来开启checkpoint On Tue, Jul 14, 2020 at 5:49 PM [hidden email] < [hidden email]> wrote: > 我把问题简化一下,创建 Hive 表时不带任何参数 > > CREATE TABLE hive_ods_wms_pick_order ( > order_no STRING, > status INT, > dispatch_time TIMESTAMP > ) STORED AS parquet > > INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, > dispatch_time FROM kafka_ods_wms_pick_order; > > 我用的 sql-client 客户端,15 分钟过去了 hive 表对应的 hdfs 目录为什么还只是有一个大小为 0 的 .part 文件呢? > 我在 flink 客户端 SELECT order_no, status, dispatch_time FROM > kafka_ods_wms_pick_order 确实是有数据返回的。 > > 我在 flink web ui 看了下这个 job 的 Checkpoint Counts 是 0. > 是需要让 job 做 checkpoint 才能写到 hdfs 上吗? > 我用 Flink sql-client 客户端怎么设置做 checkpoint 的频率呢? > > 谢谢, > 王磊 > > > > > [hidden email] > > > 发件人: Leonard Xu > 发送时间: 2020-07-14 17:20 > 收件人: user-zh; 夏帅 > 抄送: [hidden email] > 主题: Re: 不能实时读取实时写入到 Hive 的数据 > > Hi, wanglei > > 这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive > 的分区已完成信息(通过metastore或success文件). > > 你看下夏帅贴的邮件,检查下 checkpoint 和 partition-commit的设置 > > 祝好, > Leonard Xu > > > 在 2020年7月14日,16:59,夏帅 <[hidden email]> 写道: > > 你好, > 可以参考下这个问题的解决 > > http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html > > > ------------------------------------------------------------------ > 发件人:[hidden email] <[hidden email]> > 发送时间:2020年7月14日(星期二) 16:50 > 收件人:user-zh <[hidden email]>; 夏帅 <[hidden email]>; > Leonard Xu <[hidden email]> > 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据 > > > 应该是我没有理解 partitiion-commit 的意思,我看这里有文档: > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit > > > CREATE TABLE kafka_ods_wms_pick_order ( > order_no STRING, > status INT, > dispatch_time TIMESTAMP(3) > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'ods_wms_pick_order', > 'properties.bootstrap.servers' = 'xxxx:9092', > 'properties.group.id' = 'testGroup', > 'format' = 'json', > 'scan.startup.mode' = 'latest-offset' > ) > > > CREATE TABLE hive_ods_wms_pick_order ( > order_no STRING, > status INT, > dispatch_time TIMESTAMP > ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > 'sink.partition-commit.trigger'='partition-time', > 'sink.partition-commit.delay'='1 h', > 'sink.partition-commit.policy.kind'='metastore,success-file' > ); > > INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, > dispatch_time, DATE_FORMAT(dispatch_time, 'yyyy-MM-dd'), > DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order; > SELECT * FROM hive_ods_wms_pick_order /*+ > OPTIONS('streaming-source.enable'='true', > 'streaming-source.consume-start-offset'='2020-07-24') */; > > > > > [hidden email] > > > Sender: 夏帅 > Send Time: 2020-07-14 16:43 > Receiver: user-zh; xbjtdcq > Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据 > 你好, > 这说明写入的hive文件没有进行rollup,可以贴下SQL么 > ------------------------------------------------------------------ > 发件人:[hidden email] <[hidden email]> > 发送时间:2020年7月14日(星期二) 16:40 > 收件人:user-zh <[hidden email]>; xbjtdcq <[hidden email]> > 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据 > 我加上了这个 tablehint 。 > 任务提交上去了,但客户端还是没有任何返回显示。 > 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 > 谢谢, > 王磊 > [hidden email] > 发件人: Leonard Xu > 发送时间: 2020-07-14 16:17 > 收件人: user-zh > 主题: Re: 不能实时读取实时写入到 Hive 的数据 > HI, wanglei > 你开启了 streaming-source.enable > 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints > 方便地指定参数。 > SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', > 'streaming-source.consume-start-offset'='2020-05-20') */; > 就在你看得这个页面应该有对应的文档说明如何读取hive数据。 > 祝好, > Leonard Xu > 在 2020年7月14日,15:47,[hidden email] 写道: > > > 试验了一下 Flink-1.11 hive streaming 的功能 > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html > > 创建 kafka 表,通过 SQL 实时写入 Hive. > > 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink > webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。 > > 谢谢, > 王磊 > > > > [hidden email] > > > -- Best regards! Rui Li |
In reply to this post by Rui Li
您好,请问一下,flink在写hive的时候,会rollup很多这样的文件:part-73a2bc68-d4f9-4987-984c-f5546f351637-0-101
控制这些文件的数量生成是通过什么参数达成的? 谢谢 |
Free forum by Nabble | Edit this page |