hi everyone,
最近在验证需求,kafka数据流写hdfs文件系统,使用官网文档Flink-1.11版本的示例demo成功提交到yarn之后,发现如期生成分区目录及文件,但是分区提交有些疑惑想请教下大家。 问题描述: 在15点37分时,查看hdfs如期生成[/user/flink/order/dt=2020-03-13/hour=14][/user/flink/order/dt=2020-03-14/hour=21]等相同格式的诸多分区目录,然后具体进入hour=14目录下发现partfile处于inprogress,官网描述说当前系统时间大于分区创建时间+延迟时间,即提交分区;当我在16点37分、38分再去查看时,hour=14目录下的partfile仍处于inprogress状态,查明原因是因为我在16点07分时又向kafka写入了数据,此时发现所有的分区目录下的partfile创建时间都变成了16点07分,因此之前15点37分就已经创建partfile的分区都还要等到17点07分才能进行提交。(理论上是这个意思吧) 那么问题来了,看如下ddl可知我的分区是基于day+hour,那么我的理解是分区提交时间计算是基于hour分区目录的创建时间来进行的,对吗?如果是这样的话,那为何我16点07分写数据时会影响到之前那些15点37分创建的分区提交呢?而导致全部都需要等到17点07分才能进行提交.. 另外,查看了一下我16点07分写数据时,除了这时本身应写入的分区目录下的partfile是16点07分之外,其他所有分区目录下的partfile文件创建时间都被修改成了16点07分,而hour目录却没变化。 描述的有点长可能也有点乱,可能是我对流写文件还不够熟悉还没有理解其中真正的意思,所以希望有大佬能帮忙解答,谢谢! source ddl: CREATE TABLE kafka_source ( order_id STRING, order_sales DOUBLE, update_time TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'flink-kafka', 'properties.bootstrap.servers' = '10.3.15.128:9092', 'properties.group.id' = 'kafka_hdfs', 'format' = 'json', 'scan.startup.mode' = 'group-offsets' ) sink ddl: CREATE TABLE hdfs_sink ( order_id STRING, order_sales DOUBLE, dt STRING, `hour` STRING ) PARTITIONED BY (dt, `hour`) WITH ( 'connector' = 'filesystem', 'path' = 'hdfs:///user/flink/order', 'format' = 'json', 'sink.partition-commit.delay' = '1h', 'sink.partition-commit.policy.kind' = 'success-file' ) transform dml: INSERT INTO hdfs_sink SELECT order_id, order_sales, DATE_FORMAT(update_time, 'yyyy-MM-dd'), DATE_FORMAT(update_time, 'HH') FROM kafka_source best, amenhub |
完了,现在的问题是发现好像所有的分区都没有提交,一直不提交,这是为什么呢?
发件人: [hidden email] 发送时间: 2020-12-24 17:04 收件人: user-zh 主题: Flink-1.11.1流写filesystem分区提交问题 hi everyone, 最近在验证需求,kafka数据流写hdfs文件系统,使用官网文档Flink-1.11版本的示例demo成功提交到yarn之后,发现如期生成分区目录及文件,但是分区提交有些疑惑想请教下大家。 问题描述: 在15点37分时,查看hdfs如期生成[/user/flink/order/dt=2020-03-13/hour=14][/user/flink/order/dt=2020-03-14/hour=21]等相同格式的诸多分区目录,然后具体进入hour=14目录下发现partfile处于inprogress,官网描述说当前系统时间大于分区创建时间+延迟时间,即提交分区;当我在16点37分、38分再去查看时,hour=14目录下的partfile仍处于inprogress状态,查明原因是因为我在16点07分时又向kafka写入了数据,此时发现所有的分区目录下的partfile创建时间都变成了16点07分,因此之前15点37分就已经创建partfile的分区都还要等到17点07分才能进行提交。(理论上是这个意思吧) 那么问题来了,看如下ddl可知我的分区是基于day+hour,那么我的理解是分区提交时间计算是基于hour分区目录的创建时间来进行的,对吗?如果是这样的话,那为何我16点07分写数据时会影响到之前那些15点37分创建的分区提交呢?而导致全部都需要等到17点07分才能进行提交.. 另外,查看了一下我16点07分写数据时,除了这时本身应写入的分区目录下的partfile是16点07分之外,其他所有分区目录下的partfile文件创建时间都被修改成了16点07分,而hour目录却没变化。 描述的有点长可能也有点乱,可能是我对流写文件还不够熟悉还没有理解其中真正的意思,所以希望有大佬能帮忙解答,谢谢! source ddl: CREATE TABLE kafka_source ( order_id STRING, order_sales DOUBLE, update_time TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'flink-kafka', 'properties.bootstrap.servers' = '10.3.15.128:9092', 'properties.group.id' = 'kafka_hdfs', 'format' = 'json', 'scan.startup.mode' = 'group-offsets' ) sink ddl: CREATE TABLE hdfs_sink ( order_id STRING, order_sales DOUBLE, dt STRING, `hour` STRING ) PARTITIONED BY (dt, `hour`) WITH ( 'connector' = 'filesystem', 'path' = 'hdfs:///user/flink/order', 'format' = 'json', 'sink.partition-commit.delay' = '1h', 'sink.partition-commit.policy.kind' = 'success-file' ) transform dml: INSERT INTO hdfs_sink SELECT order_id, order_sales, DATE_FORMAT(update_time, 'yyyy-MM-dd'), DATE_FORMAT(update_time, 'HH') FROM kafka_source best, amenhub |
有开启checkpoint吗?
Part files can be in one of three states: In-progress : The part file that is currently being written to is in-progress Pending : Closed (due to the specified rolling policy) in-progress files that are waiting to be committed Finished : On successful checkpoints (STREAMING) or at the end of input (BATCH) pending files transition to “Finished” https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html> -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
一语点醒梦中人,谢谢回复@冯嘉伟
因为我是先在sql-client中进行的提交测试,因此忽略了这个问题,谢谢 best, amenhub 发件人: 冯嘉伟 发送时间: 2020-12-24 18:39 收件人: user-zh 主题: Re: Flink-1.11.1流写filesystem分区提交问题 有开启checkpoint吗? Part files can be in one of three states: In-progress : The part file that is currently being written to is in-progress Pending : Closed (due to the specified rolling policy) in-progress files that are waiting to be committed Finished : On successful checkpoints (STREAMING) or at the end of input (BATCH) pending files transition to “Finished” https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html> -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
想请问下,写filesystem的时候依赖checkpoint进行commit,那么做完一次checkpoint的时候可提交的文件数是由并行度parallelism数决定的吗?我发现我的文件提交数都是3个3个的当每次chk结束后。
发件人: [hidden email] 发送时间: 2020-12-24 18:47 收件人: user-zh 主题: Re: Re: Flink-1.11.1流写filesystem分区提交问题 一语点醒梦中人,谢谢回复@冯嘉伟 因为我是先在sql-client中进行的提交测试,因此忽略了这个问题,谢谢 best, amenhub 发件人: 冯嘉伟 发送时间: 2020-12-24 18:39 收件人: user-zh 主题: Re: Flink-1.11.1流写filesystem分区提交问题 有开启checkpoint吗? Part files can be in one of three states: In-progress : The part file that is currently being written to is in-progress Pending : Closed (due to the specified rolling policy) in-progress files that are waiting to be committed Finished : On successful checkpoints (STREAMING) or at the end of input (BATCH) pending files transition to “Finished” https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html> -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hi, 文件数是和并发有关的,一个并发一次至少写一个文件,还和文件滚动大小有关。 > 2020年12月25日 下午2:10,[hidden email] 写道: > > 想请问下,写filesystem的时候依赖checkpoint进行commit,那么做完一次checkpoint的时候可提交的文件数是由并行度parallelism数决定的吗?我发现我的文件提交数都是3个3个的当每次chk结束后。 > > > > > 发件人: [hidden email] > 发送时间: 2020-12-24 18:47 > 收件人: user-zh > 主题: Re: Re: Flink-1.11.1流写filesystem分区提交问题 > 一语点醒梦中人,谢谢回复@冯嘉伟 > > 因为我是先在sql-client中进行的提交测试,因此忽略了这个问题,谢谢 > > best, > amenhub > > > > 发件人: 冯嘉伟 > 发送时间: 2020-12-24 18:39 > 收件人: user-zh > 主题: Re: Flink-1.11.1流写filesystem分区提交问题 > 有开启checkpoint吗? > Part files can be in one of three states: > In-progress : The part file that is currently being written to is > in-progress > Pending : Closed (due to the specified rolling policy) in-progress files > that are waiting to be committed > Finished : On successful checkpoints (STREAMING) or at the end of input > (BATCH) pending files transition to “Finished” > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html > <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html> > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |