hi,everyone:
由于业务需求,需要对一条流中的数据分4个步骤处理,处理完第一步后,再过15分钟进行第二步,第二步完成后再过30分钟处理第三步,然后60分钟后进行第四步,我这边通过timerservice设置定时器的方式进行的实现,但是运行过程中发现第二步15分钟的定时器只有非常少部分消息到时间触发了定时任务onTimer中的逻辑,部分代码实现如下: DataStream<String> dataStream = env.addSource(…); final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> missPingback = …; final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level1Tag = …; final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level2Tag = …; final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level3Tag = …; SingleOutputStreamOperator<FinalItemFeature> missDs = dataStream .map(…) .filter(…) .assignTimestampsAndWatermarks(new AssignWaterMark()) .keyBy(0) .timeWindow(Time.seconds(winSize)) .process(new BatchMergeProcessFunction(missPingback)); SingleOutputStreamOperator<FinalItemFeature> level1MissedDs = missDs .getSideOutput(missPingback) .keyBy(0) // ********** 这一步中,处理类中15分钟的定时器只有非常少部分数据的定时器timer触发了,触发比例10%不到。 ******** .process(new HbaseTimerProcessFunc(level1Tag, 15)); SingleOutputStreamOperator<FinalItemFeature> level2MissedDs = level1MissedDs .keyBy(0) .process(new HbaseTimerProcessFunc(level2Tag, 30)); SingleOutputStreamOperator<FinalItemFeature> level3MissedDs = level2MissedDs .keyBy(0) .process(new HbaseTimerProcessFunc(level3Tag, 60)); DataStream<FinalItemFeature> l1Ds = level1MissedDs.getSideOutput(level1Tag); DataStream<FinalItemFeature> l2Ds = level2MissedDs.getSideOutput(level2Tag); DataStream<FinalItemFeature> l3Ds = level3MissedDs.getSideOutput(level3Tag); missDs.union(l1Ds).union(l2Ds).union(l3Ds).addSink(…); ——————————— —> HbaseTimerProcessFunc.class 中定时器设置方法 @Override public void processElement(SimplifiedPingbackMsg value, Context context, Collector<SimplifiedPingbackMsg> out) throws Exception { long ts = System.currentTimeMillis() + 60 * 1000 * timeout; pbState.put(value.getEventId() + ts, value); context.timerService().registerEventTimeTimer(ts); MetricModifyUtils.modifyPerDay(timerTotalCounter, 1); } ——————————— … // 水印提取类,类似于直接用了ProcessTime来提取的 class AssignWaterMark implements AssignerWithPeriodicWatermarks<Tuple2<String, SimplifiedPingbackMsg>> { private long maxOutOfOrderness = 60 * 1000 * 5; private long currentMaxTimestamp = 0L; @Nullable @Override public Watermark getCurrentWatermark() { return new Watermark(currentMaxTimestamp - maxOutOfOrderness); } @Override public long extractTimestamp(Tuple2<String, SimplifiedPingbackMsg> element, long previousElementTimestamp) { long timestamp = System.currentTimeMillis(); currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); return timestamp; } } 所以为啥这里的实现会出现此类情况呢,这种情况该如何去排查为啥会有绝大多数timer不触发呢,我看了webui中水印生成是正常的。 |
具体原因不太清楚,但建议使用context.timeService().currentEventTime()和currentProcessingTime()来获取当前的时间。
排查方法的话,不知道你有没有做算子的单元测试,如果还没有的话可以通过flink test util[1][2]做单元测试来debug排查, 可以比较明确的观察到timeService上的Timer状态。 [1] https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html 钟昊(zhonghao01)-商业智能部 <[hidden email]> 于2020年8月6日周四 下午5:31写道: > hi,everyone: > > > 由于业务需求,需要对一条流中的数据分4个步骤处理,处理完第一步后,再过15分钟进行第二步,第二步完成后再过30分钟处理第三步,然后60分钟后进行第四步,我这边通过timerservice设置定时器的方式进行的实现,但是运行过程中发现第二步15分钟的定时器只有非常少部分消息到时间触发了定时任务onTimer中的逻辑,部分代码实现如下: > > > DataStream<String> dataStream = env.addSource(…); > final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> missPingback = …; > final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level1Tag = …; > final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level2Tag = …; > final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level3Tag = …; > > SingleOutputStreamOperator<FinalItemFeature> missDs = dataStream > .map(…) > .filter(…) > .assignTimestampsAndWatermarks(new AssignWaterMark()) > .keyBy(0) > .timeWindow(Time.seconds(winSize)) > .process(new BatchMergeProcessFunction(missPingback)); > > SingleOutputStreamOperator<FinalItemFeature> level1MissedDs = > missDs > .getSideOutput(missPingback) > .keyBy(0) > // ********** > 这一步中,处理类中15分钟的定时器只有非常少部分数据的定时器timer触发了,触发比例10%不到。 ******** > .process(new HbaseTimerProcessFunc(level1Tag, 15)); > > SingleOutputStreamOperator<FinalItemFeature> level2MissedDs = > level1MissedDs > .keyBy(0) > .process(new HbaseTimerProcessFunc(level2Tag, 30)); > > SingleOutputStreamOperator<FinalItemFeature> level3MissedDs = > level2MissedDs > .keyBy(0) > .process(new HbaseTimerProcessFunc(level3Tag, 60)); > DataStream<FinalItemFeature> l1Ds = > level1MissedDs.getSideOutput(level1Tag); > DataStream<FinalItemFeature> l2Ds = > level2MissedDs.getSideOutput(level2Tag); > DataStream<FinalItemFeature> l3Ds = > level3MissedDs.getSideOutput(level3Tag); > > missDs.union(l1Ds).union(l2Ds).union(l3Ds).addSink(…); > > > ——————————— > —> HbaseTimerProcessFunc.class 中定时器设置方法 > @Override > public void processElement(SimplifiedPingbackMsg value, Context context, > Collector<SimplifiedPingbackMsg> out) throws Exception { > long ts = System.currentTimeMillis() + 60 * 1000 * timeout; > pbState.put(value.getEventId() + ts, value); > context.timerService().registerEventTimeTimer(ts); > MetricModifyUtils.modifyPerDay(timerTotalCounter, 1); > } > ——————————— > > … > > // 水印提取类,类似于直接用了ProcessTime来提取的 > class AssignWaterMark implements > AssignerWithPeriodicWatermarks<Tuple2<String, SimplifiedPingbackMsg>> { > > private long maxOutOfOrderness = 60 * 1000 * 5; > private long currentMaxTimestamp = 0L; > > @Nullable > @Override > public Watermark getCurrentWatermark() { > return new Watermark(currentMaxTimestamp - maxOutOfOrderness); > } > > @Override > public long extractTimestamp(Tuple2<String, SimplifiedPingbackMsg> > element, long previousElementTimestamp) { > long timestamp = System.currentTimeMillis(); > currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); > return timestamp; > } > } > > > > 所以为啥这里的实现会出现此类情况呢,这种情况该如何去排查为啥会有绝大多数timer不触发呢,我看了webui中水印生成是正常的。 > > |
In reply to this post by 钟昊(zhonghao01)-商业智能部
|
Hi
对于 event time 的处理来说,不建议注册 timer 的时候使用 System.currentTimeMillis() 这种系统时间,这两个时间可能会不一样,可以使用 TimerService 中的 currentWatermark 表示当前的 event time Best, Congxian jsqf <[hidden email]> 于2020年8月6日周四 下午9:53写道: > 试试 重写 onTimer 方法 > 可以参考 > > https://github.com/JSQF/flink10_learn/blob/master/src/main/scala/com/yyb/flink10/DataStream/ProcessFunction/OperatorProcessFunctionDemo.java > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > |
In reply to this post by 钟昊(zhonghao01)-商业智能部
不好意思,邮件被公司邮箱屏蔽了刚接收到。十分感谢解答,按照建议改了context.timeService().currentEventTime(),目前情况还是这样没有啥改变,后面再补充一些单元测试吧。
钟昊
IIG-BI
爱奇艺公司 iQIYI, Inc
地址:上海市长宁区临虹路365号爱奇艺创新大厦3层
邮编:201103
手机:+86 150 0080 1440
|
Free forum by Nabble | Edit this page |