Hi all,
With the Flink 1.11 filesystem connector, the partition commit trigger [1] is left at its default values, so a partition can be committed multiple times. The scenario is this: we consume Kafka data and write it to HDFS, partitioned by day + hour extracted from the event time. If data arrives late, say it is 19:00 now and a record with a 17:00 event time comes in, will that record still be written correctly into the 17:00 partition? Or will it land in the 19:00 partition, or be dropped?
Does anyone know, or has anyone verified this in practice?
Thanks.

The simplified SQL is attached:

CREATE TABLE kafka (
  a STRING,
  b STRING,
  c BIGINT,
  process_time BIGINT,
  e STRING,
  f STRING,
  g STRING,
  h INT,
  i STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic',
  'properties.bootstrap.servers' = 'x',
  'properties.group.id' = 'test-1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'properties.flink.partition-discovery.interval-millis' = '300000'
);

CREATE TABLE filesystem (
  `day` STRING,
  `hour` STRING,
  a STRING,
  b STRING,
  c BIGINT,
  d BIGINT,
  e STRING,
  f STRING,
  g STRING,
  h INT,
  i STRING
) PARTITIONED BY (`day`, `hour`) WITH (
  'connector' = 'filesystem',
  'format' = 'parquet',
  'path' = 'hdfs://xx',
  'parquet.compression' = 'SNAPPY',
  'sink.partition-commit.policy.kind' = 'success-file'
);

insert into filesystem
select
  from_unixtime(process_time, 'yyyy-MM-dd') as `day`,
  from_unixtime(process_time, 'HH') as `hour`,
  a,
  b,
  c,
  process_time as d,  -- the source defines process_time rather than d; mapped to the sink's d column here
  e,
  f,
  g,
  h,
  i
from kafka;

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger
Hi,
My understanding is that with partition-time commit, a partition is committed once the current watermark > partition time + commit delay, so it depends on how long your sink.partition-commit.delay is set to; once that point has passed, the data should be dropped by default, I think.

https://cloud.tencent.com/developer/article/1707182

This link is worth a look.

On 2020-11-12 11:58:22, "admin" <[hidden email]> wrote:
> With the Flink 1.11 filesystem connector, the partition commit trigger [1] is left at its default values, so a partition can be committed multiple times.
> If data arrives late, say it is 19:00 now and a record with a 17:00 event time comes in, will that record still be written correctly into the 17:00 partition? Or will it land in the 19:00 partition, or be dropped?
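For reference, a minimal sketch of what a partition-time setup could look like on the sink side, using the option names from the Flink 1.11 filesystem connector docs. The one-hour delay and the timestamp pattern are illustrative choices, and the source table must provide a watermark:

CREATE TABLE filesystem (
  `day` STRING,
  `hour` STRING,
  a STRING
  -- remaining columns as in the original DDL
) PARTITIONED BY (`day`, `hour`) WITH (
  'connector' = 'filesystem',
  'format' = 'parquet',
  'path' = 'hdfs://xx',
  -- commit a partition once watermark > time extracted from partition values + delay
  'sink.partition-commit.trigger' = 'partition-time',
  -- how to turn the day/hour partition values back into a timestamp
  'partition.time-extractor.timestamp-pattern' = '$day $hour:00:00',
  -- illustrative: commit once the watermark passes the partition's start plus one hour
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'success-file'
);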
Hi kandy,
I am not committing partitions based on partition time; I am on the default process-time trigger, so partitions can indeed be committed multiple times. I understand that out-of-order data within the current partition will still be committed, but can data that is late by a larger margin (like the example above) still be committed into its corresponding partition?

> On 12 Nov 2020, at 12:46 PM, kandy.wang <[hidden email]> wrote:
>
> My understanding is that with partition-time commit, a partition is committed once the current watermark > partition time + commit delay, so it depends on how long your sink.partition-commit.delay is set to; once that point has passed, the data should be dropped by default, I think.
sink.partition-commit.trigger <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#sink-partition-commit-trigger>
    Default: process-time    Type: String
    Trigger type for partition commit: 'process-time': based on the time of the machine, it neither requires partition time extraction nor watermark generation. Commit partition once the 'current system time' passes 'partition creation system time' plus 'delay'. 'partition-time': based on the time that extracted from partition values, it requires watermark generation. Commit partition once the 'watermark' passes 'time extracted from partition values' plus 'delay'.
sink.partition-commit.delay <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#sink-partition-commit-delay>
    Default: 0 s    Type: Duration
    The partition will not commit until the delay time has passed. For a daily partition this should be '1 d'; for an hourly partition, '1 h'.

Neither of these two options is set in my job; both are at their defaults.

> On 12 Nov 2020, at 2:15 PM, admin <[hidden email]> wrote:
>
> Hi kandy,
> I am not committing partitions based on partition time; I am on the default process-time trigger, so partitions can indeed be committed multiple times. I understand that out-of-order data within the current partition will still be committed, but can data that is late by a larger margin still be committed into its corresponding partition?
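For comparison, spelling those defaults out explicitly in the sink DDL would look roughly like the sketch below (column list shortened; the behaviour should be identical to leaving the options unset):

CREATE TABLE filesystem (
  `day` STRING,
  `hour` STRING,
  a STRING
  -- remaining columns as in the original DDL
) PARTITIONED BY (`day`, `hour`) WITH (
  'connector' = 'filesystem',
  'format' = 'parquet',
  'path' = 'hdfs://xx',
  -- defaults made explicit: commit based on machine time, with no extra delay
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '0 s',
  'sink.partition-commit.policy.kind' = 'success-file'
);

With these settings a partition is committed shortly after it is created, roughly at the next successful checkpoint, and the same partition can be committed again later when more data for it arrives.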
To add the reason I am not using the partition-time trigger: partition time is watermark-based, and when data is severely delayed it would be dropped, I assume, which is not acceptable in our case.

> On 12 Nov 2020, at 2:15 PM, admin <[hidden email]> wrote:
>
> Hi kandy,
> I am not committing partitions based on partition time; I am on the default process-time trigger, so partitions can indeed be committed multiple times.
Hi admin,
Data will not be dropped; the partition will simply be committed again (which is why partition commits are now all idempotent operations).

On Thu, Nov 12, 2020 at 3:11 PM admin <[hidden email]> wrote:

> To add the reason I am not using the partition-time trigger: partition time is watermark-based, and when data is severely delayed it would be dropped, I assume, which is not acceptable in our case.

--
Best, Jingsong Lee
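As a side note, partition-time commit does require an event-time attribute and a watermark on the source. A minimal sketch of how the Kafka DDL from the original mail might declare one, assuming process_time holds epoch seconds rather than milliseconds (column list shortened):

CREATE TABLE kafka (
  a STRING,
  process_time BIGINT,
  -- derive an event-time column; assumes process_time is epoch seconds, not milliseconds
  ts AS TO_TIMESTAMP(FROM_UNIXTIME(process_time)),
  -- allow records up to 5 minutes out of order before the watermark passes them;
  -- later records are still written, their partition is simply committed again
  WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic',
  'properties.bootstrap.servers' = 'x',
  'properties.group.id' = 'test-1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);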
Hi Jingsong,
So with partition-time, even data that is very late will still trigger a re-commit of its partition and nothing is lost, correct?
In that case, for an hourly-partitioned scenario, what is the best configuration to make a partition queryable as early as possible? Something like:
sink.partition-commit.trigger = partition-time
sink.partition-commit.delay = 10 min

> On 12 Nov 2020, at 3:22 PM, Jingsong Li <[hidden email]> wrote:
>
> Hi admin,
> Data will not be dropped; the partition will simply be committed again (which is why partition commits are now all idempotent operations).
For the earliest possible visibility, just set the delay to 0 (leave everything else at the defaults).

On Thu, Nov 12, 2020 at 5:17 PM admin <[hidden email]> wrote:

> Hi Jingsong,
> So with partition-time, even data that is very late will still trigger a re-commit of its partition and nothing is lost, correct?
> In that case, for an hourly-partitioned scenario, what is the best configuration to make a partition queryable as early as possible?

--
Best, Jingsong Lee
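The reply above keeps the default trigger with a zero delay. If one instead keeps the partition-time idea from the question, an earliest-visibility variant might look like the sketch below; combining partition-time with a zero delay is an assumption here, not something stated above, and the timestamp pattern is illustrative:

CREATE TABLE filesystem (
  `day` STRING,
  `hour` STRING,
  a STRING
  -- remaining columns as in the original DDL
) PARTITIONED BY (`day`, `hour`) WITH (
  'connector' = 'filesystem',
  'format' = 'parquet',
  'path' = 'hdfs://xx',
  'parquet.compression' = 'SNAPPY',
  'sink.partition-commit.trigger' = 'partition-time',
  'partition.time-extractor.timestamp-pattern' = '$day $hour:00:00',
  -- with a zero delay the partition is committed as soon as the watermark passes
  -- the start of the hour; late records cause the partition to be committed again
  'sink.partition-commit.delay' = '0 s',
  'sink.partition-commit.policy.kind' = 'success-file'
);

Downstream consumers then need to tolerate a partition whose contents can still grow after the first _SUCCESS file appears.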