你好。
在用kafka table connector时如果使用eventTime,需要怎么启用这个eventTime, 没有找到一些相应的sample,
我是这样用的,
1. 设置Stream环境setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
2. 在connector里指定watermark,其中transTime是消息里的字段
" rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, 'yyyy-MM-dd
HH:mm:ss')), \n " +
" WATERMARK FOR rowtime AS rowtime - INTERVAL '10' SECOND \n" +
3. 然后直接用datastream的window
ds.keyBy(marketCode).timeWindow(Time.minutes(1L));
但在运行时会报以下exception, 已经 在connector里定义了,还需要assign吗?
java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
timestamp marker). Is the time characteristic set to 'ProcessingTime', or
did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
即使我在datastream里定义了strategy ,
ds.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)));
也还是报上面一样的错。
--
Sent from:
http://apache-flink.147419.n8.nabble.com/