对于kafka partition 设置时间戳及watermark

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

对于kafka partition 设置时间戳及watermark

张锴
我按官网操作,重写了序列化方式

val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema,
props)kafkaSource.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor[MyType] {
    def extractAscendingTimestamp(element: MyType): Long =
element.eventTimestamp})
val stream: DataStream[MyType] = env.addSource(kafkaSource)

*有个疑问,这样写完之后是不是不用设置*setAutoWatermarkInterval 呢?
Reply | Threaded
Open this post in threaded view
|

Re: 对于kafka partition 设置时间戳及watermark

r pp
是的

张锴 <[hidden email]> 于2020年12月19日周六 下午5:45写道:

> 我按官网操作,重写了序列化方式
>
> val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema,
> props)kafkaSource.assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor[MyType] {
>     def extractAscendingTimestamp(element: MyType): Long =
> element.eventTimestamp})
> val stream: DataStream[MyType] = env.addSource(kafkaSource)
>
> *有个疑问,这样写完之后是不是不用设置*setAutoWatermarkInterval 呢?
>
Reply | Threaded
Open this post in threaded view
|

Re: 对于kafka partition 设置时间戳及watermark

nobleyd
setAutoWatermarkInterval这个只是设置interval。决定你那个抽取ts的函数的执行频率的。

r pp <[hidden email]> 于2020年12月20日周日 上午10:49写道:

> 是的
>
> 张锴 <[hidden email]> 于2020年12月19日周六 下午5:45写道:
>
> > 我按官网操作,重写了序列化方式
> >
> > val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema,
> > props)kafkaSource.assignTimestampsAndWatermarks(new
> > AscendingTimestampExtractor[MyType] {
> >     def extractAscendingTimestamp(element: MyType): Long =
> > element.eventTimestamp})
> > val stream: DataStream[MyType] = env.addSource(kafkaSource)
> >
> > *有个疑问,这样写完之后是不是不用设置*setAutoWatermarkInterval 呢?
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 对于kafka partition 设置时间戳及watermark

张锴
我现在用的flink 版本1.10.1 ,我点开 setAutoWatermarkInterval 看到private long
autoWatermarkInterval = 0;
是否代表它默认的执行频率是0,我理解的意思抽取的时间戳同时生成watermark,它们是一一对应的,不知道我的理解是否正确

赵一旦 <[hidden email]> 于2020年12月20日周日 下午11:15写道:

> setAutoWatermarkInterval这个只是设置interval。决定你那个抽取ts的函数的执行频率的。
>
> r pp <[hidden email]> 于2020年12月20日周日 上午10:49写道:
>
> > 是的
> >
> > 张锴 <[hidden email]> 于2020年12月19日周六 下午5:45写道:
> >
> > > 我按官网操作,重写了序列化方式
> > >
> > > val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema,
> > > props)kafkaSource.assignTimestampsAndWatermarks(new
> > > AscendingTimestampExtractor[MyType] {
> > >     def extractAscendingTimestamp(element: MyType): Long =
> > > element.eventTimestamp})
> > > val stream: DataStream[MyType] = env.addSource(kafkaSource)
> > >
> > > *有个疑问,这样写完之后是不是不用设置*setAutoWatermarkInterval 呢?
> > >
> >
>