水位线问题

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

水位线问题

沉醉寒風
flink1.12版本, kafka有3个分区, flink对应设置了3个并发. 发现个问题, 如果有个分区一直没有数据, 其他分区一直有新的数据进去,会导致水位线停止不动, 这个有什么办法吗?
Reply | Threaded
Open this post in threaded view
|

回复: 水位线问题

amenhub@163.com
hi, 可以尝试设置idle source空闲数据源检测[1]

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources

best,
amenhub



 
发件人: 沉醉寒風
发送时间: 2021-02-01 17:20
收件人: user-zh
主题: 水位线问题
flink1.12版本, kafka有3个分区, flink对应设置了3个并发. 发现个问题, 如果有个分区一直没有数据, 其他分区一直有新的数据进去,会导致水位线停止不动, 这个有什么办法吗?
Reply | Threaded
Open this post in threaded view
|

回复:回复: 水位线问题

沉醉寒風
你好, 我是这么设置的 Configuration configuration = streamTableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.source.idle-timeout","1000 ms"); 但是没有生效




------------------ 原始邮件 ------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2021年2月1日(星期一) 下午5:52
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;回复: 水位线问题



hi, 可以尝试设置idle source空闲数据源检测[1]

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources

best,
amenhub



&nbsp;
发件人: 沉醉寒風
发送时间: 2021-02-01 17:20
收件人: user-zh
主题: 水位线问题
flink1.12版本, kafka有3个分区, flink对应设置了3个并发. 发现个问题, 如果有个分区一直没有数据, 其他分区一直有新的数据进去,会导致水位线停止不动, 这个有什么办法吗?
Reply | Threaded
Open this post in threaded view
|

回复: 回复: 水位线问题

amenhub@163.com
请问Flink使用版本是多少呢



 
发件人: 沉醉寒風
发送时间: 2021-02-01 17:56
收件人: user-zh
主题: 回复:回复: 水位线问题
你好, 我是这么设置的 Configuration configuration = streamTableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.source.idle-timeout","1000 ms"); 但是没有生效
 
 
 
 
------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2021年2月1日(星期一) 下午5:52
收件人:&nbsp;"user-zh"<[hidden email]&gt;;
 
主题:&nbsp;回复: 水位线问题
 
 
 
hi, 可以尝试设置idle source空闲数据源检测[1]
 
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
 
best,
amenhub
 
 
 
&nbsp;
发件人: 沉醉寒風
发送时间: 2021-02-01 17:20
收件人: user-zh
主题: 水位线问题
flink1.12版本, kafka有3个分区, flink对应设置了3个并发. 发现个问题, 如果有个分区一直没有数据, 其他分区一直有新的数据进去,会导致水位线停止不动, 这个有什么办法吗?
Reply | Threaded
Open this post in threaded view
|

回复:回复: 回复: 水位线问题

沉醉寒風
你好,是1.12版本的 EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(executionEnvironment, settings);
Configuration configuration = streamTableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.source.idle-timeout","1000 ms");




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2021年2月1日(星期一) 晚上6:03
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;回复: 回复: 水位线问题



请问Flink使用版本是多少呢



&nbsp;
发件人: 沉醉寒風
发送时间: 2021-02-01 17:56
收件人: user-zh
主题: 回复:回复: 水位线问题
你好, 我是这么设置的 Configuration configuration = streamTableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.source.idle-timeout","1000 ms"); 但是没有生效
&nbsp;
&nbsp;
&nbsp;
&nbsp;
------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
发件人:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <[hidden email]&amp;gt;;
发送时间:&amp;nbsp;2021年2月1日(星期一) 下午5:52
收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
&nbsp;
主题:&amp;nbsp;回复: 水位线问题
&nbsp;
&nbsp;
&nbsp;
hi, 可以尝试设置idle source空闲数据源检测[1]
&nbsp;
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
&nbsp;
best,
amenhub
&nbsp;
&nbsp;
&nbsp;
&amp;nbsp;
发件人: 沉醉寒風
发送时间: 2021-02-01 17:20
收件人: user-zh
主题: 水位线问题
flink1.12版本, kafka有3个分区, flink对应设置了3个并发. 发现个问题, 如果有个分区一直没有数据, 其他分区一直有新的数据进去,会导致水位线停止不动, 这个有什么办法吗?
Reply | Threaded
Open this post in threaded view
|

回复: 水位线问题

MOBIN
请问这个不生效是Flink版本不支持吗?Flink SQL中set是生效的
谢谢


| |
MOBIN
|
签名由网易邮箱大师定制


在2021年02月1日 18:06,沉醉寒風<[hidden email]> 写道:
你好,是1.12版本的 EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(executionEnvironment, settings);
Configuration configuration = streamTableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.source.idle-timeout","1000 ms");




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2021年2月1日(星期一) 晚上6:03
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;回复: 回复: 水位线问题



请问Flink使用版本是多少呢



&nbsp;
发件人: 沉醉寒風
发送时间: 2021-02-01 17:56
收件人: user-zh
主题: 回复:回复: 水位线问题
你好, 我是这么设置的 Configuration configuration = streamTableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.source.idle-timeout","1000 ms"); 但是没有生效
&nbsp;
&nbsp;
&nbsp;
&nbsp;
------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
发件人:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <[hidden email]&amp;gt;;
发送时间:&amp;nbsp;2021年2月1日(星期一) 下午5:52
收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
&nbsp;
主题:&amp;nbsp;回复: 水位线问题
&nbsp;
&nbsp;
&nbsp;
hi, 可以尝试设置idle source空闲数据源检测[1]
&nbsp;
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
&nbsp;
best,
amenhub
&nbsp;
&nbsp;
&nbsp;
&amp;nbsp;
发件人: 沉醉寒風
发送时间: 2021-02-01 17:20
收件人: user-zh
主题: 水位线问题
flink1.12版本, kafka有3个分区, flink对应设置了3个并发. 发现个问题, 如果有个分区一直没有数据, 其他分区一直有新的数据进去,会导致水位线停止不动, 这个有什么办法吗?