你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下
SingleOutputStreamOperator<Tuple2<warningPojo, String>> operator = flatMap.keyBy(0,1) .timeWindow(Time.minutes(1)) .process(new ProcessWindowFunction) 当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。 |
Hi,
根据你的keyby字段来看,你是根据 warningPojo + String 进行了keyby,可以看下是否相同的key只有一条相同数据。 并且可以看下使用到的是处理时间还是事件时间? 如果是事件时间,可以看下 timestamp assigner 是否正确,上游数据和时间戳是否符合预期。 Best, Yichao Yang ------------------ 原始邮件 ------------------ 发件人: "爱吃鱼"<[hidden email]>; 发送时间: 2020年7月9日(星期四) 中午11:37 收件人: "user-zh"<[hidden email]>; 主题: flink时间窗口 你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下 SingleOutputStreamOperator<Tuple2<warningPojo, String>> operator = flatMap.keyBy(0,1) .timeWindow(Time.minutes(1)) .process(new ProcessWindowFunction) 当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。 |
Hi, 因为业务原因具体的keyby字段没有写清楚,我是根据warningPojo类里面的字段进行排序,源数据 是从kafka实时流传输过来的,每一分钟滑动窗口计算一次 SingleOutputStreamOperator<Tuple2<warningPojo, String>> operator1 = env.addSource(stringFlinkKafkaConsumerBase) .filter((String s) -> (s.split(",", -1).length == 34)) .flatMap(new RichFlatMapFunction<String, warningPojo>() { .keyBy("src", "msg") .timeWindow(Time.minutes(1)) .process(new ProcessWindowFunction<warningPojo, Tuple2<warningPojo, String>, Tuple, TimeWindow>() .setParallelism(1); 每次执行这段流代码就只有第一次的一分钟时间窗口有数据传输到es,之后就没有数据了。 在 2020-07-09 13:09:32,"Yichao Yang" <[hidden email]> 写道: >Hi, > > >根据你的keyby字段来看,你是根据 warningPojo + String 进行了keyby,可以看下是否相同的key只有一条相同数据。 >并且可以看下使用到的是处理时间还是事件时间? >如果是事件时间,可以看下 timestamp assigner 是否正确,上游数据和时间戳是否符合预期。 > > >Best, >Yichao Yang > > > > >------------------ 原始邮件 ------------------ >发件人: "爱吃鱼"<[hidden email]>; >发送时间: 2020年7月9日(星期四) 中午11:37 >收件人: "user-zh"<[hidden email]>; > >主题: flink时间窗口 > > > >你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下 >SingleOutputStreamOperator<Tuple2<warningPojo, String>> operator = flatMap.keyBy(0,1) > .timeWindow(Time.minutes(1)) > .process(new ProcessWindowFunction) >当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。 |
对于 window 来说,你需要判断下是没有数据进来,还是有数据进来但是 window 没有触发。
如果是数据没有进来,那么需要看 window 节点之前的逻辑,如果是数据进来了,但是没有触发,需要看下 wateramrk 是不是符合预期 Best, Congxian 爱吃鱼 <[hidden email]> 于2020年7月9日周四 下午1:42写道: > > > > Hi, > > > > > > > 因为业务原因具体的keyby字段没有写清楚,我是根据warningPojo类里面的字段进行排序,源数据 > 是从kafka实时流传输过来的,每一分钟滑动窗口计算一次 > > > SingleOutputStreamOperator<Tuple2<warningPojo, String>> operator1 = > env.addSource(stringFlinkKafkaConsumerBase) > .filter((String s) -> (s.split(",", -1).length == 34)) > .flatMap(new RichFlatMapFunction<String, warningPojo>() { > .keyBy("src", "msg") > .timeWindow(Time.minutes(1)) > .process(new ProcessWindowFunction<warningPojo, > Tuple2<warningPojo, String>, Tuple, TimeWindow>() > .setParallelism(1); > > > > > 每次执行这段流代码就只有第一次的一分钟时间窗口有数据传输到es,之后就没有数据了。 > > > > > > 在 2020-07-09 13:09:32,"Yichao Yang" <[hidden email]> 写道: > >Hi, > > > > > >根据你的keyby字段来看,你是根据 warningPojo + String 进行了keyby,可以看下是否相同的key只有一条相同数据。 > >并且可以看下使用到的是处理时间还是事件时间? > >如果是事件时间,可以看下 timestamp assigner 是否正确,上游数据和时间戳是否符合预期。 > > > > > >Best, > >Yichao Yang > > > > > > > > > >------------------ 原始邮件 ------------------ > >发件人: "爱吃鱼"<[hidden email]>; > >发送时间: 2020年7月9日(星期四) 中午11:37 > >收件人: "user-zh"<[hidden email]>; > > > >主题: flink时间窗口 > > > > > > > >你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下 > >SingleOutputStreamOperator<Tuple2<warningPojo, String>> operator = > flatMap.keyBy(0,1) > > > .timeWindow(Time.minutes(1)) > > > .process(new ProcessWindowFunction) > >当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。 > |
In reply to this post by 爱吃鱼
new ProcessWindowFunction是怎么处理的?
是否设置了水印? ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年7月9日(星期四) 中午11:37 收件人: "user-zh"<[hidden email]>; 主题: flink时间窗口 你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下 SingleOutputStreamOperator<Tuple2<warningPojo, String>> operator = flatMap.keyBy(0,1) .timeWindow(Time.minutes(1)) .process(new ProcessWindowFunction) 当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。 |
Hi
对于这个问题,可以尝试看添加相关日志能否在线上(或者测试环境)排查,另外可以使用 watermark 相关的 metric[1] 查看下是否符合预期 如果上面的不行,可以尝试看能否在 IDE 中进行复现,这样可以 debug 进行追查 [1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io Best, Congxian 忝忝向仧 <[hidden email]> 于2020年7月9日周四 下午11:36写道: > new ProcessWindowFunction是怎么处理的? > 是否设置了水印? > > > > > ------------------ 原始邮件 ------------------ > 发件人: > "user-zh" > < > [hidden email]>; > 发送时间: 2020年7月9日(星期四) 中午11:37 > 收件人: "user-zh"<[hidden email]>; > > 主题: flink时间窗口 > > > > 你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下 > SingleOutputStreamOperator<Tuple2<warningPojo, String>> operator = > flatMap.keyBy(0,1) > > .timeWindow(Time.minutes(1)) > > .process(new ProcessWindowFunction) > 当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。 |
Free forum by Nabble | Edit this page |