flink timerservice注册的timer定时器只有少部分触发

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

flink timerservice注册的timer定时器只有少部分触发

钟昊(zhonghao01)-商业智能部
hi,everyone:

由于业务需求,需要对一条流中的数据分4个步骤处理,处理完第一步后,再过15分钟进行第二步,第二步完成后再过30分钟处理第三步,然后60分钟后进行第四步,我这边通过timerservice设置定时器的方式进行的实现,但是运行过程中发现第二步15分钟的定时器只有非常少部分消息到时间触发了定时任务onTimer中的逻辑,部分代码实现如下:


DataStream<String> dataStream = env.addSource(…);
final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> missPingback = …;
final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level1Tag = …;
final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level2Tag = …;
final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level3Tag = …;

SingleOutputStreamOperator<FinalItemFeature> missDs = dataStream
            .map(…)
            .filter(…)
            .assignTimestampsAndWatermarks(new AssignWaterMark())
            .keyBy(0)
            .timeWindow(Time.seconds(winSize))
            .process(new BatchMergeProcessFunction(missPingback));

SingleOutputStreamOperator<FinalItemFeature> level1MissedDs =
             missDs
            .getSideOutput(missPingback)
            .keyBy(0)
            // ********** 这一步中,处理类中15分钟的定时器只有非常少部分数据的定时器timer触发了,触发比例10%不到。 ********
            .process(new HbaseTimerProcessFunc(level1Tag, 15));

SingleOutputStreamOperator<FinalItemFeature> level2MissedDs =
             level1MissedDs
            .keyBy(0)
            .process(new HbaseTimerProcessFunc(level2Tag, 30));

SingleOutputStreamOperator<FinalItemFeature> level3MissedDs =
             level2MissedDs
            .keyBy(0)
            .process(new HbaseTimerProcessFunc(level3Tag, 60));
DataStream<FinalItemFeature> l1Ds = level1MissedDs.getSideOutput(level1Tag);
DataStream<FinalItemFeature> l2Ds = level2MissedDs.getSideOutput(level2Tag);
DataStream<FinalItemFeature> l3Ds = level3MissedDs.getSideOutput(level3Tag);

missDs.union(l1Ds).union(l2Ds).union(l3Ds).addSink(…);


———————————
 —> HbaseTimerProcessFunc.class 中定时器设置方法
@Override
public void processElement(SimplifiedPingbackMsg value, Context context, Collector<SimplifiedPingbackMsg> out) throws Exception {
    long ts = System.currentTimeMillis() + 60 * 1000 * timeout;
    pbState.put(value.getEventId() + ts, value);
    context.timerService().registerEventTimeTimer(ts);
    MetricModifyUtils.modifyPerDay(timerTotalCounter, 1);
}
———————————



// 水印提取类,类似于直接用了ProcessTime来提取的
class AssignWaterMark implements AssignerWithPeriodicWatermarks<Tuple2<String, SimplifiedPingbackMsg>> {

     private long maxOutOfOrderness = 60 * 1000 * 5;
     private long currentMaxTimestamp = 0L;

     @Nullable
     @Override
     public Watermark getCurrentWatermark() {
         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
     }

     @Override
     public long extractTimestamp(Tuple2<String, SimplifiedPingbackMsg> element, long previousElementTimestamp) {
         long timestamp = System.currentTimeMillis();
         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
         return timestamp;
     }
}



所以为啥这里的实现会出现此类情况呢,这种情况该如何去排查为啥会有绝大多数timer不触发呢,我看了webui中水印生成是正常的。

Reply | Threaded
Open this post in threaded view
|

Re: flink timerservice注册的timer定时器只有少部分触发

shizk233
具体原因不太清楚,但建议使用context.timeService().currentEventTime()和currentProcessingTime()来获取当前的时间。

排查方法的话,不知道你有没有做算子的单元测试,如果还没有的话可以通过flink test util[1][2]做单元测试来debug排查,
可以比较明确的观察到timeService上的Timer状态。

[1]
https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html

钟昊(zhonghao01)-商业智能部 <[hidden email]> 于2020年8月6日周四 下午5:31写道:

> hi,everyone:
>
>
> 由于业务需求,需要对一条流中的数据分4个步骤处理,处理完第一步后,再过15分钟进行第二步,第二步完成后再过30分钟处理第三步,然后60分钟后进行第四步,我这边通过timerservice设置定时器的方式进行的实现,但是运行过程中发现第二步15分钟的定时器只有非常少部分消息到时间触发了定时任务onTimer中的逻辑,部分代码实现如下:
>
>
> DataStream<String> dataStream = env.addSource(…);
> final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> missPingback = …;
> final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level1Tag = …;
> final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level2Tag = …;
> final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level3Tag = …;
>
> SingleOutputStreamOperator<FinalItemFeature> missDs = dataStream
>             .map(…)
>             .filter(…)
>             .assignTimestampsAndWatermarks(new AssignWaterMark())
>             .keyBy(0)
>             .timeWindow(Time.seconds(winSize))
>             .process(new BatchMergeProcessFunction(missPingback));
>
> SingleOutputStreamOperator<FinalItemFeature> level1MissedDs =
>              missDs
>             .getSideOutput(missPingback)
>             .keyBy(0)
>             // **********
> 这一步中,处理类中15分钟的定时器只有非常少部分数据的定时器timer触发了,触发比例10%不到。 ********
>             .process(new HbaseTimerProcessFunc(level1Tag, 15));
>
> SingleOutputStreamOperator<FinalItemFeature> level2MissedDs =
>              level1MissedDs
>             .keyBy(0)
>             .process(new HbaseTimerProcessFunc(level2Tag, 30));
>
> SingleOutputStreamOperator<FinalItemFeature> level3MissedDs =
>              level2MissedDs
>             .keyBy(0)
>             .process(new HbaseTimerProcessFunc(level3Tag, 60));
> DataStream<FinalItemFeature> l1Ds =
> level1MissedDs.getSideOutput(level1Tag);
> DataStream<FinalItemFeature> l2Ds =
> level2MissedDs.getSideOutput(level2Tag);
> DataStream<FinalItemFeature> l3Ds =
> level3MissedDs.getSideOutput(level3Tag);
>
> missDs.union(l1Ds).union(l2Ds).union(l3Ds).addSink(…);
>
>
> ———————————
>  —> HbaseTimerProcessFunc.class 中定时器设置方法
> @Override
> public void processElement(SimplifiedPingbackMsg value, Context context,
> Collector<SimplifiedPingbackMsg> out) throws Exception {
>     long ts = System.currentTimeMillis() + 60 * 1000 * timeout;
>     pbState.put(value.getEventId() + ts, value);
>     context.timerService().registerEventTimeTimer(ts);
>     MetricModifyUtils.modifyPerDay(timerTotalCounter, 1);
> }
> ———————————
>
> …
>
> // 水印提取类,类似于直接用了ProcessTime来提取的
> class AssignWaterMark implements
> AssignerWithPeriodicWatermarks<Tuple2<String, SimplifiedPingbackMsg>> {
>
>      private long maxOutOfOrderness = 60 * 1000 * 5;
>      private long currentMaxTimestamp = 0L;
>
>      @Nullable
>      @Override
>      public Watermark getCurrentWatermark() {
>          return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>      }
>
>      @Override
>      public long extractTimestamp(Tuple2<String, SimplifiedPingbackMsg>
> element, long previousElementTimestamp) {
>          long timestamp = System.currentTimeMillis();
>          currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>          return timestamp;
>      }
> }
>
>
>
> 所以为啥这里的实现会出现此类情况呢,这种情况该如何去排查为啥会有绝大多数timer不触发呢,我看了webui中水印生成是正常的。
>
>
Reply | Threaded
Open this post in threaded view
|

Re: flink timerservice注册的timer定时器只有少部分触发

jsqf
In reply to this post by 钟昊(zhonghao01)-商业智能部
Reply | Threaded
Open this post in threaded view
|

Re: flink timerservice注册的timer定时器只有少部分触发

Congxian Qiu
Hi
   对于 event time 的处理来说,不建议注册 timer 的时候使用 System.currentTimeMillis()
这种系统时间,这两个时间可能会不一样,可以使用 TimerService 中的 currentWatermark 表示当前的 event time

Best,
Congxian


jsqf <[hidden email]> 于2020年8月6日周四 下午9:53写道:
Reply | Threaded
Open this post in threaded view
|

Re: flink timerservice注册的timer定时器只有少部分触发

钟昊(zhonghao01)-商业智能部
In reply to this post by 钟昊(zhonghao01)-商业智能部
不好意思,邮件被公司邮箱屏蔽了刚接收到。十分感谢解答,按照建议改了context.timeService().currentEventTime(),目前情况还是这样没有啥改变,后面再补充一些单元测试吧。

钟昊
IIG-BI
 
爱奇艺公司  iQIYI, Inc
地址:上海市长宁区临虹路365号爱奇艺创新大厦3
邮编:201103
手机:+86 150 0080 1440
邮箱:[hidden email]
网址:www.iQIYI.com


2020年8月6日 下午6:14,shizk233 <[hidden email]> 写道:

具体原因不太清楚,但建议使用context.timeService().currentEventTime()和currentProcessingTime()来获取当前的时间。

排查方法的话,不知道你有没有做算子的单元测试,如果还没有的话可以通过flink test util[1][2]做单元测试来debug排查,
可以比较明确的观察到timeService上的Timer状态。

[1]
https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html

钟昊(zhonghao01)-商业智能部 <[hidden email]> 于2020年8月6日周四 下午5:31写道:

hi,everyone:


由于业务需求,需要对一条流中的数据分4个步骤处理,处理完第一步后,再过15分钟进行第二步,第二步完成后再过30分钟处理第三步,然后60分钟后进行第四步,我这边通过timerservice设置定时器的方式进行的实现,但是运行过程中发现第二步15分钟的定时器只有非常少部分消息到时间触发了定时任务onTimer中的逻辑,部分代码实现如下:


DataStream<String> dataStream = env.addSource(…);
final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> missPingback = …;
final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level1Tag = …;
final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level2Tag = …;
final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level3Tag = …;

SingleOutputStreamOperator<FinalItemFeature> missDs = dataStream
           .map(…)
           .filter(…)
           .assignTimestampsAndWatermarks(new AssignWaterMark())
           .keyBy(0)
           .timeWindow(Time.seconds(winSize))
           .process(new BatchMergeProcessFunction(missPingback));

SingleOutputStreamOperator<FinalItemFeature> level1MissedDs =
            missDs
           .getSideOutput(missPingback)
           .keyBy(0)
           // **********
这一步中,处理类中15分钟的定时器只有非常少部分数据的定时器timer触发了,触发比例10%不到。 ********
           .process(new HbaseTimerProcessFunc(level1Tag, 15));

SingleOutputStreamOperator<FinalItemFeature> level2MissedDs =
            level1MissedDs
           .keyBy(0)
           .process(new HbaseTimerProcessFunc(level2Tag, 30));

SingleOutputStreamOperator<FinalItemFeature> level3MissedDs =
            level2MissedDs
           .keyBy(0)
           .process(new HbaseTimerProcessFunc(level3Tag, 60));
DataStream<FinalItemFeature> l1Ds =
level1MissedDs.getSideOutput(level1Tag);
DataStream<FinalItemFeature> l2Ds =
level2MissedDs.getSideOutput(level2Tag);
DataStream<FinalItemFeature> l3Ds =
level3MissedDs.getSideOutput(level3Tag);

missDs.union(l1Ds).union(l2Ds).union(l3Ds).addSink(…);


———————————
—> HbaseTimerProcessFunc.class 中定时器设置方法
@Override
public void processElement(SimplifiedPingbackMsg value, Context context,
Collector<SimplifiedPingbackMsg> out) throws Exception {
   long ts = System.currentTimeMillis() + 60 * 1000 * timeout;
   pbState.put(value.getEventId() + ts, value);
   context.timerService().registerEventTimeTimer(ts);
   MetricModifyUtils.modifyPerDay(timerTotalCounter, 1);
}
———————————



// 水印提取类,类似于直接用了ProcessTime来提取的
class AssignWaterMark implements
AssignerWithPeriodicWatermarks<Tuple2<String, SimplifiedPingbackMsg>> {

    private long maxOutOfOrderness = 60 * 1000 * 5;
    private long currentMaxTimestamp = 0L;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(Tuple2<String, SimplifiedPingbackMsg>
element, long previousElementTimestamp) {
        long timestamp = System.currentTimeMillis();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }
}



所以为啥这里的实现会出现此类情况呢,这种情况该如何去排查为啥会有绝大多数timer不触发呢,我看了webui中水印生成是正常的。