关于event-time的定义与产生时��戳位置的问题。

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

关于event-time的定义与产生时��戳位置的问题。

xiaohei.info

hi,all:
  event time这个时间戳是在什么时候打到数据上面去的,看api是在flink source收到数据之后再标注的,并不是真正的数据源携带过来的(比如手机终端)?使用kafka source的话根据文档的定义kafka携带的时间戳也仅仅是kafka收到数据的时候打上的时间戳。
  那么有个问题:以kafka为例,数据到队列的时候按「顺序」打上时间戳,那么如果数据是「乱序到达」的也被打上了「递增的时间戳」,后续基于event-time的处理都是基于这个时间戳来进行,那不就丧失了真实世界的定义吗?
  不知道有哪里是我理解不对的地方望指教!
  祝好~
Reply | Threaded
Open this post in threaded view
|

Re:关于event-time的定义与产生时间戳位置的问题。

邵志鹏
Hi,
可以看下事件时间戳的生成,https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_timestamps_watermarks.html
下面例子里时间戳都是来自element里面的时间字段。还有一个AscendingTimestampExtractor。
/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */publicclassBoundedOutOfOrdernessGeneratorimplementsAssignerWithPeriodicWatermarks<MyEvent>{privatefinallongmaxOutOfOrderness=3500;// 3.5 secondsprivatelongcurrentMaxTimestamp;@OverridepubliclongextractTimestamp(MyEventelement,longpreviousElementTimestamp){longtimestamp=element.getCreationTime();currentMaxTimestamp=Math.max(timestamp,currentMaxTimestamp);returntimestamp;}@OverridepublicWatermarkgetCurrentWatermark(){// return the watermark as current highest timestamp minus the out-of-orderness boundreturnnewWatermark(currentMaxTimestamp-maxOutOfOrderness);}}/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */publicclassTimeLagWatermarkGeneratorimplementsAssignerWithPeriodicWatermarks<MyEvent>{privatefinallongmaxTimeLag=5000;// 5 seconds@OverridepubliclongextractTimestamp(MyEventelement,longpreviousElementTimestamp){returnelement.getCreationTime();}@OverridepublicWatermarkgetCurrentWatermark(){// return the watermark as current time minus the maximum time lagreturnnewWatermark(System.currentTimeMillis()-maxTimeLag);}}
publicclassPunctuatedAssignerimplementsAssignerWithPunctuatedWatermarks<MyEvent>{@OverridepubliclongextractTimestamp(MyEventelement,longpreviousElementTimestamp){returnelement.getCreationTime();}@OverridepublicWatermarkcheckAndGetNextWatermark(MyEventlastElement,longextractedTimestamp){returnlastElement.hasWatermarkMarker()?newWatermark(extractedTimestamp):null;}}
希望能有所帮助。


DataStream和kafkaSource后面都可以调用assignTimestampsAndWatermarks。


kafkaSource.assignTimestampsAndWatermarks(newAscendingTimestampExtractor<MyType>(){@OverridepubliclongextractAscendingTimestamp(MyTypeelement){returnelement.eventTimestamp();}});







在 2019-08-07 15:47:41,"xiaohei.info" <[hidden email]> 写道:
>
>hi,all:
>  event time这个时间戳是在什么时候打到数据上面去的,看api是在flink source收到数据之后再标注的,并不是真正的数据源携带过来的(比如手机终端)?使用kafka source的话根据文档的定义kafka携带的时间戳也仅仅是kafka收到数据的时候打上的时间戳。
>  那么有个问题:以kafka为例,数据到队列的时候按「顺序」打上时间戳,那么如果数据是「乱序到达」的也被打上了「递增的时间戳」,后续基于event-time的处理都是基于这个时间戳来进行,那不就丧失了真实世界的定义吗?
>  不知道有哪里是我理解不对的地方望指教!
>  祝好~
Reply | Threaded
Open this post in threaded view
|

Re: 关于event-time的定义与产生时间戳位置的问题。

zhaoheng.zhaoheng@qq.com
In reply to this post by xiaohei.info

hi,all:
  event time这个时间戳是在什么时候打到数据上面去的,看api是在flink source收到数据之后再标注的,并不是真正的数据源携带过来的(比如手机终端)?使用kafka source的话根据文档的定义kafka携带的时间戳也仅仅是kafka收到数据的时候打上的时间戳。
  那么有个问题:以kafka为例,数据到队列的时候按「顺序」打上时间戳,那么如果数据是「乱序到达」的也被打上了「递增的时间戳」,后续基于event-time的处理都是基于这个时间戳来进行,那不就丧失了真实世界的定义吗?
  不知道有哪里是我理解不对的地方望指教!
  祝好~
Reply | Threaded
Open this post in threaded view
|

Re: 关于event-time的定义与产生时间戳位置的问题。

Alec Chen
In reply to this post by xiaohei.info
Hi,
Q: event time这个时间戳是在什么时候打到数据上面去的,
A: event time按字面意思理解为event发生的时间, 如果产生数据的设备提供了记录时间的字段, 并且业务逻辑也需要使用这个时间,
则可以将该时间作为event time. 更多信息可以参考
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
关于event
time, processing time的描述

[hidden email] <[hidden email]> 于2019年8月8日周四 下午4:36写道:

>
> hi,all:
>   event time这个时间戳是在什么时候打到数据上面去的,看api是在flink
> source收到数据之后再标注的,并不是真正的数据源携带过来的(比如手机终端)?使用kafka
> source的话根据文档的定义kafka携带的时间戳也仅仅是kafka收到数据的时候打上的时间戳。
>
> 那么有个问题:以kafka为例,数据到队列的时候按「顺序」打上时间戳,那么如果数据是「乱序到达」的也被打上了「递增的时间戳」,后续基于event-time的处理都是基于这个时间戳来进行,那不就丧失了真实世界的定义吗?
>   不知道有哪里是我理解不对的地方望指教!
>   祝好~
>