Hello.

When using the Kafka table connector with event time, how do I enable event time? I could not find a sample for this. Here is what I do:

1. Set the time characteristic on the stream environment:
       setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
2. Declare the watermark in the connector DDL, where transTime is a field in the message:
       " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, 'yyyy-MM-dd HH:mm:ss')), \n " +
       " WATERMARK FOR rowtime AS rowtime - INTERVAL '10' SECOND \n" +
3. Then use a DataStream window directly:
       ds.keyBy(marketCode).timeWindow(Time.minutes(1L));

At runtime this throws the exception below. The watermark is already defined in the connector; do I still need to assign one?

    java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

Even when I define a strategy on the DataStream,

    ds.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)));

I still get the same error.

--
Sent from: http://apache-flink.147419.n8.nabble.com/
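For reference, the two DDL expressions in step 2 amount to simple arithmetic on the message field. The sketch below mirrors that arithmetic in plain Java, assuming transTime is epoch milliseconds (suggested by the division by 1000) and ignoring the time-zone handling of FROM_UNIXTIME and TO_TIMESTAMP. It does not use Flink classes, and the name RowtimeSketch is made up for illustration.

```java
// Plain-Java arithmetic mirroring the DDL in the question; not Flink code.
final class RowtimeSketch {

    // "transTime / 1000" drops the millisecond part, so rowtime ends up
    // with whole-second precision. Time-zone handling is ignored here.
    static long rowtimeMillis(long transTimeMillis) {
        return Math.floorDiv(transTimeMillis, 1000L) * 1000L;
    }

    // WATERMARK FOR rowtime AS rowtime - INTERVAL '10' SECOND
    static long watermarkMillis(long rowtimeMillis) {
        return rowtimeMillis - 10_000L;
    }

    public static void main(String[] args) {
        long transTime = 1604286564123L; // sample value, epoch milliseconds
        long rowtime = rowtimeMillis(transTime);
        System.out.println("rowtime   = " + rowtime);
        System.out.println("watermark = " + watermarkMillis(rowtime));
    }
}
```

The watermark trails each row's rowtime by ten seconds, which is the amount of out-of-order arrival the table is declared to tolerate.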
Hi marble,

If you are developing with the DataStream API, see [1] for using the Kafka connector and [2] for event time and watermarks. The corresponding Chinese documentation is at [3] and [4].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html
[3] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html
[4] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/event_timestamps_watermarks.html

Best,
Hailong Wang

In reply to the message sent by [hidden email] on 2020-11-03 09:12:42.
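One possible explanation for the error persisting after the call to assignTimestampsAndWatermarks, offered as an assumption because the full job is not shown: WatermarkStrategy.forBoundedOutOfOrderness(...) without withTimestampAssigner(...) keeps whatever timestamp the record already carries, and a record with no timestamp is reported as Long.MIN_VALUE. The plain-Java model below illustrates the difference between keeping the existing timestamp and reading it from the message. It does not use Flink classes, and the names TimestampAssignerSketch, Assigner, KEEP_EXISTING and FROM_FIELD are made up for illustration.

```java
// Plain-Java model of timestamp assignment; not Flink code.
final class TimestampAssignerSketch {

    // Value reported for a record that carries no timestamp.
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Hypothetical stand-in for a timestamp assigner: it receives the
    // event's transTime field and the timestamp the record had before.
    interface Assigner {
        long extract(long transTimeMillis, long previousTimestamp);
    }

    // Models a strategy with no explicit assigner: keep the old timestamp.
    static final Assigner KEEP_EXISTING = (transTime, previous) -> previous;

    // Models an assigner that reads the event-time field from the message.
    static final Assigner FROM_FIELD = (transTime, previous) -> transTime;

    // An event-time window can only accept a record with a real timestamp.
    static boolean acceptedByWindow(long timestamp) {
        return timestamp != NO_TIMESTAMP;
    }

    public static void main(String[] args) {
        long transTime = 1604286564123L; // sample value, epoch milliseconds
        long kept = KEEP_EXISTING.extract(transTime, NO_TIMESTAMP);
        long read = FROM_FIELD.extract(transTime, NO_TIMESTAMP);
        System.out.println("keep existing, accepted: " + acceptedByWindow(kept));
        System.out.println("from field, accepted:    " + acceptedByWindow(read));
    }
}
```

In the real job, the corresponding change would be to add a timestamp assigner that returns transTime, and to build the window on the stream returned by assignTimestampsAndWatermarks rather than on the original ds, since that method returns a new stream instead of modifying ds in place.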
In reply to this post by marble.zhong@coinflex.com.INVALID
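Hi, here is a simple DataStream example that consumes event-time data from Kafka and performs a windowed aggregation. The Kafka records look like {"word":"a","count":1,"time":1604286564}. See whether this demo helps.

    // Import paths as of Flink 1.11.
    import java.time.Duration;
    import java.util.Properties;

    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

    public class MyExample {

        public static void main(String[] args) throws Exception {
            // Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // Set the time characteristic to event time
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // Watermark strategy: tolerate 2 seconds of out-of-order data and
            // read the timestamp from the record itself
            WatermarkStrategy<WC> watermarkStrategy = WatermarkStrategy
                    .<WC>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                    .withTimestampAssigner(new SerializableTimestampAssigner<WC>() {
                        @Override
                        public long extractTimestamp(WC wc, long previousTimestamp) {
                            // "time" is in seconds; Flink expects milliseconds
                            return wc.getEventTime() * 1000;
                        }
                    });

            // Kafka configuration
            Properties properties = new Properties();
            // Replace kafka-host with your broker address
            properties.setProperty("bootstrap.servers", "kafka-host:9092");
            properties.setProperty("group.id", "test");

            env.addSource(new FlinkKafkaConsumer<>("flinktest1",
                            new JSONKeyValueDeserializationSchema(true), properties)
                            .setStartFromLatest())
                    // Map each JSON record to a WC object
                    .map(new MapFunction<ObjectNode, WC>() {
                        @Override
                        public WC map(ObjectNode jsonNode) throws Exception {
                            JsonNode valueNode = jsonNode.get("value");
                            return new WC(valueNode.get("word").asText(),
                                    valueNode.get("count").asInt(),
                                    valueNode.get("time").asLong());
                        }
                    })
                    // Assign timestamps and watermarks
                    .assignTimestampsAndWatermarks(watermarkStrategy)
                    .keyBy(WC::getWord)
                    // Tumbling event-time window of 5 seconds
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    // Accept late records for up to 2 more seconds
                    .allowedLateness(Time.seconds(2))
                    .reduce(new ReduceFunction<WC>() {
                        @Override
                        public WC reduce(WC wc, WC t1) throws Exception {
                            // Sum the counts; the event time is not carried over
                            return new WC(wc.getWord(), wc.getCount() + t1.getCount());
                        }
                    })
                    .print();

            env.execute();
        }

        public static class WC {
            public String word;
            public int count;
            public long eventTime;

            // Public no-arg constructor so Flink can treat WC as a POJO
            public WC() {
            }

            public WC(String word, int count) {
                this.word = word;
                this.count = count;
            }

            public WC(String word, int count, long eventTime) {
                this.word = word;
                this.count = count;
                this.eventTime = eventTime;
            }

            public long getEventTime() {
                return eventTime;
            }

            public void setEventTime(long eventTime) {
                this.eventTime = eventTime;
            }

            public String getWord() {
                return word;
            }

            public void setWord(String word) {
                this.word = word;
            }

            public int getCount() {
                return count;
            }

            public void setCount(int count) {
                this.count = count;
            }

            @Override
            public String toString() {
                return "WC{" + "word='" + word + '\'' + ", count=" + count + '}';
            }
        }
    }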
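To see how the demo's numbers interact, the sketch below works through the arithmetic for the sample record with "time":1604286564, a 5-second tumbling window, and a 2-second out-of-orderness bound. It is a plain-Java model and not Flink code; the formulas reflect my understanding of Flink's behavior (the watermark trails the largest timestamp seen by the bound plus 1 ms, and a window fires once the watermark reaches the window end minus 1 ms), and the name EventTimeWindowSketch is made up for illustration.

```java
// Plain-Java model of tumbling-window and watermark arithmetic; not Flink code.
final class EventTimeWindowSketch {

    // Start of the tumbling window (no offset) that contains the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Bounded-out-of-orderness watermark for the largest timestamp seen so far.
    static long watermark(long maxSeenTimestampMillis, long outOfOrdernessMillis) {
        return maxSeenTimestampMillis - outOfOrdernessMillis - 1;
    }

    // A window [start, end) fires once the watermark reaches end - 1.
    static boolean windowFires(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long size = 5_000L;            // 5-second window, as in the demo
        long bound = 2_000L;           // 2-second out-of-orderness, as in the demo
        long event = 1604286564L * 1000; // sample record, seconds to milliseconds

        long start = windowStart(event, size);
        long end = start + size;
        System.out.println("window = [" + start + ", " + end + ")");

        // A later record 3 seconds after the sample pushes the watermark far enough.
        long later = event + 3_000L;
        System.out.println("fires after later record: "
                + windowFires(end, watermark(later, bound)));
    }
}
```

With these settings the window containing the sample record only prints its result after a record with an event time at least 2 seconds past the window end arrives, which is worth keeping in mind when testing the demo with a handful of messages.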