flink时间窗口

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

flink时间窗口

爱吃鱼
你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下
SingleOutputStreamOperator<Tuple2<warningPojo, String>> operator = flatMap.keyBy(0,1)
                .timeWindow(Time.minutes(1))
                .process(new ProcessWindowFunction)
当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。
Reply | Threaded
Open this post in threaded view
|

回复:flink时间窗口

Yichao Yang
Hi,


根据你的keyby字段来看,你是根据 warningPojo + String 进行了keyby,可以看下是否相同的key只有一条相同数据。
并且可以看下使用到的是处理时间还是事件时间?
如果是事件时间,可以看下 timestamp assigner 是否正确,上游数据和时间戳是否符合预期。


Best,
Yichao Yang




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"爱吃鱼"<[hidden email]&gt;;
发送时间:&nbsp;2020年7月9日(星期四) 中午11:37
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;flink时间窗口



你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下
SingleOutputStreamOperator<Tuple2<warningPojo, String&gt;&gt; operator = flatMap.keyBy(0,1)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .timeWindow(Time.minutes(1))
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .process(new ProcessWindowFunction)
当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。
Reply | Threaded
Open this post in threaded view
|

Re:回复:flink时间窗口

爱吃鱼



Hi,






因为业务原因具体的keyby字段没有写清楚,我是根据warningPojo类里面的字段进行排序,源数据
是从kafka实时流传输过来的,每一分钟滑动窗口计算一次


SingleOutputStreamOperator<Tuple2<warningPojo, String>> operator1 = env.addSource(stringFlinkKafkaConsumerBase)
                .filter((String s) -> (s.split(",", -1).length == 34))
                .flatMap(new RichFlatMapFunction<String, warningPojo>() {
                .keyBy("src", "msg")
                .timeWindow(Time.minutes(1))
                .process(new ProcessWindowFunction<warningPojo, Tuple2<warningPojo, String>, Tuple, TimeWindow>()
                .setParallelism(1);




每次执行这段流代码就只有第一次的一分钟时间窗口有数据传输到es,之后就没有数据了。





在 2020-07-09 13:09:32,"Yichao Yang" <[hidden email]> 写道:

>Hi,
>
>
>根据你的keyby字段来看,你是根据 warningPojo + String 进行了keyby,可以看下是否相同的key只有一条相同数据。
>并且可以看下使用到的是处理时间还是事件时间?
>如果是事件时间,可以看下 timestamp assigner 是否正确,上游数据和时间戳是否符合预期。
>
>
>Best,
>Yichao Yang
>
>
>
>
>------------------&nbsp;原始邮件&nbsp;------------------
>发件人:&nbsp;"爱吃鱼"<[hidden email]&gt;;
>发送时间:&nbsp;2020年7月9日(星期四) 中午11:37
>收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
>主题:&nbsp;flink时间窗口
>
>
>
>你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下
>SingleOutputStreamOperator<Tuple2<warningPojo, String&gt;&gt; operator = flatMap.keyBy(0,1)
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .timeWindow(Time.minutes(1))
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .process(new ProcessWindowFunction)
>当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。
Reply | Threaded
Open this post in threaded view
|

Re: 回复:flink时间窗口

Congxian Qiu
对于 window 来说,你需要判断下是没有数据进来,还是有数据进来但是 window 没有触发。
如果是数据没有进来,那么需要看 window 节点之前的逻辑,如果是数据进来了,但是没有触发,需要看下 wateramrk 是不是符合预期

Best,
Congxian


爱吃鱼 <[hidden email]> 于2020年7月9日周四 下午1:42写道:

>
>
>
> Hi,
>
>
>
>
>
>
> 因为业务原因具体的keyby字段没有写清楚,我是根据warningPojo类里面的字段进行排序,源数据
> 是从kafka实时流传输过来的,每一分钟滑动窗口计算一次
>
>
> SingleOutputStreamOperator<Tuple2<warningPojo, String>> operator1 =
> env.addSource(stringFlinkKafkaConsumerBase)
>                 .filter((String s) -> (s.split(",", -1).length == 34))
>                 .flatMap(new RichFlatMapFunction<String, warningPojo>() {
>                 .keyBy("src", "msg")
>                 .timeWindow(Time.minutes(1))
>                 .process(new ProcessWindowFunction<warningPojo,
> Tuple2<warningPojo, String>, Tuple, TimeWindow>()
>                 .setParallelism(1);
>
>
>
>
> 每次执行这段流代码就只有第一次的一分钟时间窗口有数据传输到es,之后就没有数据了。
>
>
>
>
>
> 在 2020-07-09 13:09:32,"Yichao Yang" <[hidden email]> 写道:
> >Hi,
> >
> >
> >根据你的keyby字段来看,你是根据 warningPojo + String 进行了keyby,可以看下是否相同的key只有一条相同数据。
> >并且可以看下使用到的是处理时间还是事件时间?
> >如果是事件时间,可以看下 timestamp assigner 是否正确,上游数据和时间戳是否符合预期。
> >
> >
> >Best,
> >Yichao Yang
> >
> >
> >
> >
> >------------------&nbsp;原始邮件&nbsp;------------------
> >发件人:&nbsp;"爱吃鱼"<[hidden email]&gt;;
> >发送时间:&nbsp;2020年7月9日(星期四) 中午11:37
> >收件人:&nbsp;"user-zh"<[hidden email]&gt;;
> >
> >主题:&nbsp;flink时间窗口
> >
> >
> >
> >你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下
> >SingleOutputStreamOperator<Tuple2<warningPojo, String&gt;&gt; operator =
> flatMap.keyBy(0,1)
> >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .timeWindow(Time.minutes(1))
> >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .process(new ProcessWindowFunction)
> >当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。
>
Reply | Threaded
Open this post in threaded view
|

回复:flink时间窗口

忝忝向仧
In reply to this post by 爱吃鱼
new ProcessWindowFunction是怎么处理的?
是否设置了水印?




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年7月9日(星期四) 中午11:37
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;flink时间窗口



你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下
SingleOutputStreamOperator<Tuple2<warningPojo, String&gt;&gt; operator = flatMap.keyBy(0,1)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .timeWindow(Time.minutes(1))
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .process(new ProcessWindowFunction)
当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。
Reply | Threaded
Open this post in threaded view
|

Re: flink时间窗口

Congxian Qiu
Hi

对于这个问题,可以尝试看添加相关日志能否在线上(或者测试环境)排查,另外可以使用 watermark 相关的 metric[1] 查看下是否符合预期
如果上面的不行,可以尝试看能否在 IDE 中进行复现,这样可以 debug 进行追查

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io
Best,
Congxian


忝忝向仧 <[hidden email]> 于2020年7月9日周四 下午11:36写道:

> new ProcessWindowFunction是怎么处理的?
> 是否设置了水印?
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
>                                                   "user-zh"
>                                                                     <
> [hidden email]&gt;;
> 发送时间:&nbsp;2020年7月9日(星期四) 中午11:37
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;flink时间窗口
>
>
>
> 你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下
> SingleOutputStreamOperator<Tuple2<warningPojo, String&gt;&gt; operator =
> flatMap.keyBy(0,1)
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .timeWindow(Time.minutes(1))
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .process(new ProcessWindowFunction)
> 当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。