Question about eventTime with the Kafka table connector



marble.zhong@coinflex.com.INVALID
Hello.
When using the Kafka table connector with event time, how do I enable event time? I couldn't find any relevant samples.

This is how I'm using it:
1. Set the stream environment: setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
2. Declare the watermark in the connector DDL, where transTime is a field in the message:
"    rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, 'yyyy-MM-dd HH:mm:ss')), \n " +
"    WATERMARK FOR rowtime AS rowtime - INTERVAL '10' SECOND \n" +

3. Then apply a DataStream window directly:
ds.keyBy(marketCode).timeWindow(Time.minutes(1L));
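As an illustration of what the two DDL lines in step 2 compute for a single record, here is a simplified model in plain Java. It is not Flink code and does not show how the table connector evaluates the expressions. It assumes transTime holds epoch milliseconds and renders timestamps in UTC; the time zone Flink actually uses for FROM_UNIXTIME may differ. The sample transTime value is hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Simplified model (not Flink code) of the computed column and watermark in step 2.
// Assumptions: transTime is epoch milliseconds; timestamps are rendered in UTC.
class RowtimeSketch {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Mirrors TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, 'yyyy-MM-dd HH:mm:ss')):
    // integer division drops the milliseconds, the seconds are formatted as a
    // string, and the string is parsed back into a timestamp.
    static LocalDateTime rowtime(long transTimeMillis) {
        long seconds = transTimeMillis / 1000;
        String formatted = LocalDateTime.ofEpochSecond(seconds, 0, ZoneOffset.UTC).format(FMT);
        return LocalDateTime.parse(formatted, FMT);
    }

    // Mirrors WATERMARK FOR rowtime AS rowtime - INTERVAL '10' SECOND.
    static LocalDateTime watermarkFor(LocalDateTime rowtime) {
        return rowtime.minusSeconds(10);
    }

    public static void main(String[] args) {
        long transTime = 1604286564123L; // hypothetical sample value
        LocalDateTime rt = rowtime(transTime);
        System.out.println("rowtime " + rt + ", watermark " + watermarkFor(rt));
    }
}
```

For the sample value this model gives a rowtime of 2020-11-02T03:09:24 and a watermark of 2020-11-02T03:09:14; note that the division by 1000 leaves rowtime with only second precision.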

But at runtime the following exception is thrown. I have already defined the watermark in the connector; do I still need to assign timestamps?
java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

Even when I define a strategy on the DataStream,
ds.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)));

I still get the same error.
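A likely reason the second attempt fails, offered as a hedged reading of the Flink 1.11 API rather than a confirmed diagnosis: a WatermarkStrategy built with forBoundedOutOfOrderness(...) alone, without withTimestampAssigner(...), takes each element's event time from the timestamp already attached to the record, so records that carry none still report Long.MIN_VALUE. Separately, assignTimestampsAndWatermarks(...) returns a new DataStream, so the window has to be applied to that returned stream, not to the original ds. The plain-Java model below illustrates only the first point; its TimestampAssigner is a simplified stand-in for Flink's interface, and Trade, "BTC-USD" and the sample transTime are hypothetical.

```java
// Simplified model (not Flink's classes) of how a timestamp assigner picks an
// element's event time. A record that carries no timestamp is marked Long.MIN_VALUE.
class AssignerSketch {
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Stand-in for a timestamp assigner: (element, timestamp already on the record) -> event time.
    interface TimestampAssigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    // Hypothetical record type; transTime is assumed to be epoch milliseconds.
    static final class Trade {
        final String marketCode;
        final long transTime;

        Trade(String marketCode, long transTime) {
            this.marketCode = marketCode;
            this.transTime = transTime;
        }
    }

    // Like a strategy without an explicit assigner: keeps whatever the record already has.
    static final TimestampAssigner<Trade> PASS_THROUGH = (trade, ts) -> ts;

    // Like adding withTimestampAssigner(...): reads event time from a field of the element.
    static final TimestampAssigner<Trade> FROM_FIELD = (trade, ts) -> trade.transTime;

    public static void main(String[] args) {
        Trade trade = new Trade("BTC-USD", 1604286564123L);
        System.out.println(PASS_THROUGH.extractTimestamp(trade, NO_TIMESTAMP)); // still Long.MIN_VALUE
        System.out.println(FROM_FIELD.extractTimestamp(trade, NO_TIMESTAMP));   // the field value
    }
}
```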



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Question about eventTime with the Kafka table connector

hailongwang
Hi marble,
 If you are developing with the DataStream API, see [1] for how to use the Kafka connector and [2] for how to use event time and watermarks.
The Chinese versions of these pages are [3] and [4].
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html
[3] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html
[4] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/event_timestamps_watermarks.html


Best,
Hailong Wang

In reply to the message from [hidden email] sent on 2020-11-03 09:12:42.


Re: Question about eventTime with the Kafka table connector

WeiXubin
In reply to this post by marble.zhong@coinflex.com.INVALID
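Hi, here is a simple example built with the DataStream API that consumes event-time data from Kafka and aggregates it in windows. The Kafka messages look like {"word":"a","count":1,"time":1604286564}. Take a look and see whether this demo helps.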
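The "time" field in this sample is in epoch seconds, which is why the demo multiplies it by 1000 before using it as an event timestamp. As a quick check of which 5-second tumbling window the sample record falls into, here is a small plain-Java sketch (not part of the demo). It assumes a zero window offset and follows the start formula used by Flink 1.11's TimeWindow.getWindowStartWithOffset; the class and method names are illustrative.

```java
// Plain-Java check of the demo's event-time arithmetic (not Flink code).
class WindowSketch {
    // The sample JSON carries seconds; Flink event timestamps are milliseconds.
    static long toMillis(long epochSeconds) {
        return epochSeconds * 1000;
    }

    // Start of the tumbling window containing `timestamp`, with zero offset:
    // timestamp - (timestamp - offset + size) % size, with offset = 0.
    static long windowStart(long timestamp, long windowSizeMillis) {
        return timestamp - (timestamp + windowSizeMillis) % windowSizeMillis;
    }

    public static void main(String[] args) {
        long ts = toMillis(1604286564L);      // the sample record's "time"
        long start = windowStart(ts, 5_000L); // 5-second tumbling window
        System.out.println("[" + start + ", " + (start + 5_000L) + ")");
    }
}
```

For the sample record this prints the window [1604286560000, 1604286565000).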

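import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

public class MyExample {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Set the time characteristic to event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Watermark strategy: tolerate up to 2 s of out-of-order data
        WatermarkStrategy<WC> watermarkStrategy = WatermarkStrategy
                .<WC>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<WC>() {
                    @Override
                    public long extractTimestamp(WC wc, long recordTimestamp) {
                        // "time" in the message is in seconds; Flink expects milliseconds
                        return wc.getEventTime() * 1000;
                    }
                });

        // Kafka configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "<kafka-address>:9092"); // replace with your Kafka address
        properties.setProperty("group.id", "test");

        env.addSource(new FlinkKafkaConsumer<>("flinktest1",
                        new JSONKeyValueDeserializationSchema(true), properties).setStartFromLatest())
                // map: build a WC object from the "value" part of the JSON record
                .map(new MapFunction<ObjectNode, WC>() {
                    @Override
                    public WC map(ObjectNode jsonNode) throws Exception {
                        JsonNode valueNode = jsonNode.get("value");
                        return new WC(valueNode.get("word").asText(),
                                valueNode.get("count").asInt(),
                                valueNode.get("time").asLong());
                    }
                })
                // assign timestamps and watermarks
                .assignTimestampsAndWatermarks(watermarkStrategy)
                .keyBy(WC::getWord)
                // window: 5-second tumbling event-time windows
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // allowed lateness
                .allowedLateness(Time.seconds(2))
                .reduce(new ReduceFunction<WC>() {
                    @Override
                    public WC reduce(WC wc, WC t1) throws Exception {
                        return new WC(wc.getWord(), wc.getCount() + t1.getCount());
                    }
                })
                .print();

        env.execute();
    }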


    // Public, with a public no-arg constructor, so Flink can treat WC as a POJO
    // instead of falling back to generic (Kryo) serialization
    public static class WC {
        public String word;
        public int count;
        public long eventTime; // epoch seconds, as in the Kafka message

        public WC() {
        }

        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }

        public WC(String word, int count, long eventTime) {
            this.word = word;
            this.count = count;
            this.eventTime = eventTime;
        }

        public long getEventTime() {
            return eventTime;
        }

        public void setEventTime(long eventTime) {
            this.eventTime = eventTime;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public int getCount() {
            return count;
        }

        public void setCount(int count) {
            this.count = count;
        }

        @Override
        public String toString() {
            return "WC{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
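To see when the demo's windows produce output, here is a simplified plain-Java model (not Flink code) of its event-time settings: a 2 s out-of-orderness bound, 5 s tumbling windows and 2 s allowed lateness. The watermark formula follows Flink's bounded-out-of-orderness generator (largest timestamp seen, minus the bound, minus 1 ms); the firing and dropping rules are simplified from the default event-time trigger and window operator. Constant and method names are illustrative.

```java
// Simplified model (not Flink code) of the demo's event-time settings.
class FiringSketch {
    static final long OUT_OF_ORDERNESS = 2_000L; // forBoundedOutOfOrderness(Duration.ofSeconds(2))
    static final long ALLOWED_LATENESS = 2_000L; // allowedLateness(Time.seconds(2))

    // Watermark of a bounded-out-of-orderness generator: max seen - bound - 1.
    static long watermark(long maxTimestampSeen) {
        return maxTimestampSeen - OUT_OF_ORDERNESS - 1;
    }

    // A window [start, end) fires once the watermark reaches its last millisecond, end - 1.
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    // Late records still update (and re-fire) the window until the watermark reaches
    // end - 1 + allowed lateness; from then on they are dropped.
    static boolean dropped(long windowEnd, long watermark) {
        return windowEnd - 1 + ALLOWED_LATENESS <= watermark;
    }

    public static void main(String[] args) {
        long end = 1604286565000L; // end of the window [1604286560000, 1604286565000)
        for (long seconds = 1604286565L; seconds <= 1604286569L; seconds++) {
            long wm = watermark(seconds * 1000);
            System.out.println(seconds + " -> watermark " + wm
                    + ", fires=" + fires(end, wm) + ", dropped=" + dropped(end, wm));
        }
    }
}
```

In this model, the window ending at 1604286565000 fires once a record with time 1604286567 has been seen, and late records for it are dropped once a record with time 1604286569 has been seen. In the real job watermarks are emitted periodically, so output can appear slightly later than the model suggests.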


