大家好 kafka table connector eventTime的问题

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

大家好 kafka table connector eventTime的问题

marble.zhong@coinflex.com.INVALID
你好。在用kafka table connector时如果使用eventTime,需要怎么启用这个eventTime,
没有找到一些相应的sample,我是这样用的,1.
设置Stream环境setStreamTimeCharacteristic(TimeCharacteristic.EventTime);2.
在connector里指定watermark,其中transTime是消息里的字段"    rowtime AS
TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, 'yyyy-MM-dd HH:mm:ss')), \n "
+"    WATERMARK FOR rowtime AS rowtime - INTERVAL '10' SECOND \n" +3.
然后直接用datastream的windowds.keyBy(marketCode).timeWindow(Time.minutes(1L));
但在运行时会报以下exception, 已经 在connector里定义了,还需要assign吗?java.lang.RuntimeException:
Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time
characteristic set to 'ProcessingTime', or did you forget to call
'DataStream.assignTimestampsAndWatermarks(...)'?即使我在datastream里定义了strategy
,ds.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)));也还是报上面一样的错。



--
Sent from: http://apache-flink.147419.n8.nabble.com/