flink 1.10 将流设置为 idle

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

flink 1.10 将流设置为 idle

Akisaya
flink 1.12 的 watermark strategy 重构之后,添加了一个 withIdleness 方法能将一个流在一定时间没有数据产生时设置为 idle 流
但是在 1.10 中没有这样的方法可以设置。

看了下 1.10 的代码发现在 StreamSourceContext 里是可以根据参数设置 idleTimeout 的,但是在 StreamSource 的 run 方法里实际使用该方法的时候直接将 idleTimeout 写死成 -1 了

StreamSource

请问下,为啥不考虑将这个参数开放出来供用户使用。

我通过继承 flink 内置的 kakfa connector,使用反射修改了 idleTimeout 参数,经验证是可以实现自动检测 kafka 流是否有数据并设置其为 idle

Reply | Threaded
Open this post in threaded view
|

Re: flink 1.10 将流设置为 idle

Akisaya
图片好像发送不出去,这里贴一下代码



StreamSourceContexts#getSourceContext
public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
TimeCharacteristic timeCharacteristic,
ProcessingTimeService processingTimeService,
Object checkpointLock,
StreamStatusMaintainer streamStatusMaintainer,
Output<StreamRecord<OUT>> output,
long watermarkInterval,
long idleTimeout) {



StreamSource#run
this.ctx = StreamSourceContexts.getSourceContext(
timeCharacteristic,
getProcessingTimeService(),
lockingObject,
streamStatusMaintainer,
collector,
watermarkInterval,
-1);

Akisaya <[hidden email]> 于2021年1月11日周一 下午7:17写道:
flink 1.12 的 watermark strategy 重构之后,添加了一个 withIdleness 方法能将一个流在一定时间没有数据产生时设置为 idle 流
但是在 1.10 中没有这样的方法可以设置。

看了下 1.10 的代码发现在 StreamSourceContext 里是可以根据参数设置 idleTimeout 的,但是在 StreamSource 的 run 方法里实际使用该方法的时候直接将 idleTimeout 写死成 -1 了

StreamSource

请问下,为啥不考虑将这个参数开放出来供用户使用。

我通过继承 flink 内置的 kakfa connector,使用反射修改了 idleTimeout 参数,经验证是可以实现自动检测 kafka 流是否有数据并设置其为 idle