hi,all: event time这个时间戳是在什么时候打到数据上面去的,看api是在flink source收到数据之后再标注的,并不是真正的数据源携带过来的(比如手机终端)?使用kafka source的话根据文档的定义kafka携带的时间戳也仅仅是kafka收到数据的时候打上的时间戳。 那么有个问题:以kafka为例,数据到队列的时候按「顺序」打上时间戳,那么如果数据是「乱序到达」的也被打上了「递增的时间戳」,后续基于event-time的处理都是基于这个时间戳来进行,那不就丧失了真实世界的定义吗? 不知道有哪里是我理解不对的地方望指教! 祝好~ |
Hi,
可以看下事件时间戳的生成,https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_timestamps_watermarks.html 下面例子里时间戳都是来自element里面的时间字段。还有一个AscendingTimestampExtractor。 /** * This generator generates watermarks assuming that elements arrive out of order, * but only to a certain degree. The latest elements for a certain timestamp t will arrive * at most n milliseconds after the earliest elements for timestamp t. */publicclassBoundedOutOfOrdernessGeneratorimplementsAssignerWithPeriodicWatermarks<MyEvent>{privatefinallongmaxOutOfOrderness=3500;// 3.5 secondsprivatelongcurrentMaxTimestamp;@OverridepubliclongextractTimestamp(MyEventelement,longpreviousElementTimestamp){longtimestamp=element.getCreationTime();currentMaxTimestamp=Math.max(timestamp,currentMaxTimestamp);returntimestamp;}@OverridepublicWatermarkgetCurrentWatermark(){// return the watermark as current highest timestamp minus the out-of-orderness boundreturnnewWatermark(currentMaxTimestamp-maxOutOfOrderness);}}/** * This generator generates watermarks that are lagging behind processing time by a fixed amount. * It assumes that elements arrive in Flink after a bounded delay. */publicclassTimeLagWatermarkGeneratorimplementsAssignerWithPeriodicWatermarks<MyEvent>{privatefinallongmaxTimeLag=5000;// 5 seconds@OverridepubliclongextractTimestamp(MyEventelement,longpreviousElementTimestamp){returnelement.getCreationTime();}@OverridepublicWatermarkgetCurrentWatermark(){// return the watermark as current time minus the maximum time lagreturnnewWatermark(System.currentTimeMillis()-maxTimeLag);}} publicclassPunctuatedAssignerimplementsAssignerWithPunctuatedWatermarks<MyEvent>{@OverridepubliclongextractTimestamp(MyEventelement,longpreviousElementTimestamp){returnelement.getCreationTime();}@OverridepublicWatermarkcheckAndGetNextWatermark(MyEventlastElement,longextractedTimestamp){returnlastElement.hasWatermarkMarker()?newWatermark(extractedTimestamp):null;}} 希望能有所帮助。 DataStream和kafkaSource后面都可以调用assignTimestampsAndWatermarks。 kafkaSource.assignTimestampsAndWatermarks(newAscendingTimestampExtractor<MyType>(){@OverridepubliclongextractAscendingTimestamp(MyTypeelement){returnelement.eventTimestamp();}}); 在 2019-08-07 15:47:41,"xiaohei.info" <[hidden email]> 写道: > >hi,all: > event time这个时间戳是在什么时候打到数据上面去的,看api是在flink source收到数据之后再标注的,并不是真正的数据源携带过来的(比如手机终端)?使用kafka source的话根据文档的定义kafka携带的时间戳也仅仅是kafka收到数据的时候打上的时间戳。 > 那么有个问题:以kafka为例,数据到队列的时候按「顺序」打上时间戳,那么如果数据是「乱序到达」的也被打上了「递增的时间戳」,后续基于event-time的处理都是基于这个时间戳来进行,那不就丧失了真实世界的定义吗? > 不知道有哪里是我理解不对的地方望指教! > 祝好~ |
In reply to this post by xiaohei.info
hi,all: event time这个时间戳是在什么时候打到数据上面去的,看api是在flink source收到数据之后再标注的,并不是真正的数据源携带过来的(比如手机终端)?使用kafka source的话根据文档的定义kafka携带的时间戳也仅仅是kafka收到数据的时候打上的时间戳。 那么有个问题:以kafka为例,数据到队列的时候按「顺序」打上时间戳,那么如果数据是「乱序到达」的也被打上了「递增的时间戳」,后续基于event-time的处理都是基于这个时间戳来进行,那不就丧失了真实世界的定义吗? 不知道有哪里是我理解不对的地方望指教! 祝好~ |
In reply to this post by xiaohei.info
Hi,
Q: event time这个时间戳是在什么时候打到数据上面去的, A: event time按字面意思理解为event发生的时间, 如果产生数据的设备提供了记录时间的字段, 并且业务逻辑也需要使用这个时间, 则可以将该时间作为event time. 更多信息可以参考 https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html 关于event time, processing time的描述 [hidden email] <[hidden email]> 于2019年8月8日周四 下午4:36写道: > > hi,all: > event time这个时间戳是在什么时候打到数据上面去的,看api是在flink > source收到数据之后再标注的,并不是真正的数据源携带过来的(比如手机终端)?使用kafka > source的话根据文档的定义kafka携带的时间戳也仅仅是kafka收到数据的时候打上的时间戳。 > > 那么有个问题:以kafka为例,数据到队列的时候按「顺序」打上时间戳,那么如果数据是「乱序到达」的也被打上了「递增的时间戳」,后续基于event-time的处理都是基于这个时间戳来进行,那不就丧失了真实世界的定义吗? > 不知道有哪里是我理解不对的地方望指教! > 祝好~ > |
Free forum by Nabble | Edit this page |