Flink-1.11.1流写filesystem分区提交问题

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink-1.11.1流写filesystem分区提交问题

amenhub@163.com
hi everyone,

最近在验证需求,kafka数据流写hdfs文件系统,使用官网文档Flink-1.11版本的示例demo成功提交到yarn之后,发现如期生成分区目录及文件,但是分区提交有些疑惑想请教下大家。

问题描述:
在15点37分时,查看hdfs如期生成[/user/flink/order/dt=2020-03-13/hour=14][/user/flink/order/dt=2020-03-14/hour=21]等相同格式的诸多分区目录,然后具体进入hour=14目录下发现partfile处于inprogress,官网描述说当前系统时间大于分区创建时间+延迟时间,即提交分区;当我在16点37分、38分再去查看时,hour=14目录下的partfile仍处于inprogress状态,查明原因是因为我在16点07分时又向kafka写入了数据,此时发现所有的分区目录下的partfile创建时间都变成了16点07分,因此之前15点37分就已经创建partfile的分区都还要等到17点07分才能进行提交。(理论上是这个意思吧)

那么问题来了,看如下ddl可知我的分区是基于day+hour,那么我的理解是分区提交时间计算是基于hour分区目录的创建时间来进行的,对吗?如果是这样的话,那为何我16点07分写数据时会影响到之前那些15点37分创建的分区提交呢?而导致全部都需要等到17点07分才能进行提交..

另外,查看了一下我16点07分写数据时,除了这时本身应写入的分区目录下的partfile是16点07分之外,其他所有分区目录下的partfile文件创建时间都被修改成了16点07分,而hour目录却没变化。

描述的有点长可能也有点乱,可能是我对流写文件还不够熟悉还没有理解其中真正的意思,所以希望有大佬能帮忙解答,谢谢!

source ddl:
CREATE TABLE kafka_source (
    order_id STRING,
    order_sales DOUBLE,
    update_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink-kafka',
    'properties.bootstrap.servers' = '10.3.15.128:9092',
    'properties.group.id' = 'kafka_hdfs',
    'format' = 'json',
    'scan.startup.mode' = 'group-offsets'
)
sink ddl:
CREATE TABLE hdfs_sink (
    order_id STRING,
    order_sales DOUBLE,
    dt STRING,
    `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs:///user/flink/order',
    'format' = 'json',
    'sink.partition-commit.delay' = '1h',
    'sink.partition-commit.policy.kind' = 'success-file'
)
transform dml:
INSERT INTO hdfs_sink
SELECT
    order_id,
    order_sales,
    DATE_FORMAT(update_time, 'yyyy-MM-dd'),
    DATE_FORMAT(update_time, 'HH')
FROM kafka_source

best,
amenhub


Reply | Threaded
Open this post in threaded view
|

Re: Flink-1.11.1流写filesystem分区提交问题

amenhub@163.com
完了,现在的问题是发现好像所有的分区都没有提交,一直不提交,这是为什么呢?



 
发件人: [hidden email]
发送时间: 2020-12-24 17:04
收件人: user-zh
主题: Flink-1.11.1流写filesystem分区提交问题
hi everyone,
 
最近在验证需求,kafka数据流写hdfs文件系统,使用官网文档Flink-1.11版本的示例demo成功提交到yarn之后,发现如期生成分区目录及文件,但是分区提交有些疑惑想请教下大家。
 
问题描述:
在15点37分时,查看hdfs如期生成[/user/flink/order/dt=2020-03-13/hour=14][/user/flink/order/dt=2020-03-14/hour=21]等相同格式的诸多分区目录,然后具体进入hour=14目录下发现partfile处于inprogress,官网描述说当前系统时间大于分区创建时间+延迟时间,即提交分区;当我在16点37分、38分再去查看时,hour=14目录下的partfile仍处于inprogress状态,查明原因是因为我在16点07分时又向kafka写入了数据,此时发现所有的分区目录下的partfile创建时间都变成了16点07分,因此之前15点37分就已经创建partfile的分区都还要等到17点07分才能进行提交。(理论上是这个意思吧)
 
那么问题来了,看如下ddl可知我的分区是基于day+hour,那么我的理解是分区提交时间计算是基于hour分区目录的创建时间来进行的,对吗?如果是这样的话,那为何我16点07分写数据时会影响到之前那些15点37分创建的分区提交呢?而导致全部都需要等到17点07分才能进行提交..
 
另外,查看了一下我16点07分写数据时,除了这时本身应写入的分区目录下的partfile是16点07分之外,其他所有分区目录下的partfile文件创建时间都被修改成了16点07分,而hour目录却没变化。
 
描述的有点长可能也有点乱,可能是我对流写文件还不够熟悉还没有理解其中真正的意思,所以希望有大佬能帮忙解答,谢谢!
 
source ddl:
CREATE TABLE kafka_source (
    order_id STRING,
    order_sales DOUBLE,
    update_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink-kafka',
    'properties.bootstrap.servers' = '10.3.15.128:9092',
    'properties.group.id' = 'kafka_hdfs',
    'format' = 'json',
    'scan.startup.mode' = 'group-offsets'
)
sink ddl:
CREATE TABLE hdfs_sink (
    order_id STRING,
    order_sales DOUBLE,
    dt STRING,
    `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs:///user/flink/order',
    'format' = 'json',
    'sink.partition-commit.delay' = '1h',
    'sink.partition-commit.policy.kind' = 'success-file'
)
transform dml:
INSERT INTO hdfs_sink
SELECT
    order_id,
    order_sales,
    DATE_FORMAT(update_time, 'yyyy-MM-dd'),
    DATE_FORMAT(update_time, 'HH')
FROM kafka_source
 
best,
amenhub
 
 
Reply | Threaded
Open this post in threaded view
|

Re: Flink-1.11.1流写filesystem分区提交问题

冯嘉伟
有开启checkpoint吗?

Part files can be in one of three states:

In-progress : The part file that is currently being written to is
in-progress
Pending : Closed (due to the specified rolling policy) in-progress files
that are waiting to be committed
Finished : On successful checkpoints (STREAMING) or at the end of input
(BATCH) pending files transition to “Finished”

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html
<https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html>  



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink-1.11.1流写filesystem分区提交问题

amenhub@163.com
一语点醒梦中人,谢谢回复@冯嘉伟

因为我是先在sql-client中进行的提交测试,因此忽略了这个问题,谢谢

best,
amenhub



 
发件人: 冯嘉伟
发送时间: 2020-12-24 18:39
收件人: user-zh
主题: Re: Flink-1.11.1流写filesystem分区提交问题
有开启checkpoint吗?
 
Part files can be in one of three states:
 
In-progress : The part file that is currently being written to is
in-progress
Pending : Closed (due to the specified rolling policy) in-progress files
that are waiting to be committed
Finished : On successful checkpoints (STREAMING) or at the end of input
(BATCH) pending files transition to “Finished”
 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html
<https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html>  
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink-1.11.1流写filesystem分区提交问题

amenhub@163.com
想请问下,写filesystem的时候依赖checkpoint进行commit,那么做完一次checkpoint的时候可提交的文件数是由并行度parallelism数决定的吗?我发现我的文件提交数都是3个3个的当每次chk结束后。



 
发件人: [hidden email]
发送时间: 2020-12-24 18:47
收件人: user-zh
主题: Re: Re: Flink-1.11.1流写filesystem分区提交问题
一语点醒梦中人,谢谢回复@冯嘉伟
 
因为我是先在sql-client中进行的提交测试,因此忽略了这个问题,谢谢
 
best,
amenhub
 
 
 
发件人: 冯嘉伟
发送时间: 2020-12-24 18:39
收件人: user-zh
主题: Re: Flink-1.11.1流写filesystem分区提交问题
有开启checkpoint吗?
Part files can be in one of three states:
In-progress : The part file that is currently being written to is
in-progress
Pending : Closed (due to the specified rolling policy) in-progress files
that are waiting to be committed
Finished : On successful checkpoints (STREAMING) or at the end of input
(BATCH) pending files transition to “Finished”
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html
<https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html>  
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Flink-1.11.1流写filesystem分区提交问题

zhuxiaoshang

Hi,
文件数是和并发有关的,一个并发一次至少写一个文件,还和文件滚动大小有关。


> 2020年12月25日 下午2:10,[hidden email] 写道:
>
> 想请问下,写filesystem的时候依赖checkpoint进行commit,那么做完一次checkpoint的时候可提交的文件数是由并行度parallelism数决定的吗?我发现我的文件提交数都是3个3个的当每次chk结束后。
>
>
>
>
> 发件人: [hidden email]
> 发送时间: 2020-12-24 18:47
> 收件人: user-zh
> 主题: Re: Re: Flink-1.11.1流写filesystem分区提交问题
> 一语点醒梦中人,谢谢回复@冯嘉伟
>
> 因为我是先在sql-client中进行的提交测试,因此忽略了这个问题,谢谢
>
> best,
> amenhub
>
>
>
> 发件人: 冯嘉伟
> 发送时间: 2020-12-24 18:39
> 收件人: user-zh
> 主题: Re: Flink-1.11.1流写filesystem分区提交问题
> 有开启checkpoint吗?
> Part files can be in one of three states:
> In-progress : The part file that is currently being written to is
> in-progress
> Pending : Closed (due to the specified rolling policy) in-progress files
> that are waiting to be committed
> Finished : On successful checkpoints (STREAMING) or at the end of input
> (BATCH) pending files transition to “Finished”
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html>  
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/