关于Watermark的使用调试问题

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

关于Watermark的使用调试问题

Xavier
   想问下社区,watermark必须加在数据源上吗?顺便想问下一般用什么方式来调watermark,我自己本地有发现加在map
function之后,watermark会自动重置为默认值的情况。
    谢谢!
--

Best Regards,
*Xavier*
Reply | Threaded
Open this post in threaded view
|

Re: 关于Watermark的使用调试问题

tison
可以中途产生,走这个接口

org.apache.flink.streaming.api.datastream.DataStream#assignTimestampsAndWatermarks(org.apache.flink.api.common.eventtime.WatermarkStrategy<T>)

麻烦贴一下你加 watermark 的代码和 pipeline 看一下啥情况

Best,
tison.


Xavier <[hidden email]> 于2021年3月7日周日 下午7:51写道:

>    想问下社区,watermark必须加在数据源上吗?顺便想问下一般用什么方式来调watermark,我自己本地有发现加在map
> function之后,watermark会自动重置为默认值的情况。
>     谢谢!
> --
>
> Best Regards,
> *Xavier*
>
Reply | Threaded
Open this post in threaded view
|

Re: 关于Watermark的使用调试问题

Xavier

是这样的,可以产生,问题从watermarksState里面拿出来的时间戳,会变成是默认值,全局除过这里有设置过,再无任何关于watermark的逻辑。

val dataLoadStream = data
.map(new EventMapFunction(config))
// Add watermark
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[EventData](Duration.ofMinutes(1))
.withTimestampAssigner(new SerializableTimestampAssigner[EventData] {
override def extractTimestamp(element: EventData, recordTimestamp: Long)
: Long = element.getEventTimestamp
})
)

On Sun, Mar 7, 2021 at 10:38 PM tison <[hidden email]> wrote:
可以中途产生,走这个接口

org.apache.flink.streaming.api.datastream.DataStream#assignTimestampsAndWatermarks(org.apache.flink.api.common.eventtime.WatermarkStrategy<T>)

麻烦贴一下你加 watermark 的代码和 pipeline 看一下啥情况

Best,
tison.


Xavier <[hidden email]> 于2021年3月7日周日 下午7:51写道:

>    想问下社区,watermark必须加在数据源上吗?顺便想问下一般用什么方式来调watermark,我自己本地有发现加在map
> function之后,watermark会自动重置为默认值的情况。
>     谢谢!
> --
>
> Best Regards,
> *Xavier*
>


--

Best Regards,
Xavier