Flink Table Schema中Types.SQL_TTIMESTAMP类型json表示问题

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Flink Table Schema中Types.SQL_TTIMESTAMP类型json表示问题

陈帅
Flink Table API Schema定义里面的 Types.SQL_TTIMESTAMP 类型用json表示的话一定要用
yyyy-MM-dd'T'HH:mm:ss.SSS'Z'表示吗?
示例程序如下:
        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        TableConfig tableConfig = tEnv.getConfig();

tableConfig.setIdleStateRetentionTime(Time.minutes(10),Time.minutes(15));

        tEnv.registerFunction("DateUtil",new DateUtil());
        tEnv.connect(
                new Kafka()
                        .version("universal")
                        //   "0.8", "0.9", "0.10", "0.11", and "universal"
                        .topic("jsontest")
                        .property("bootstrap.servers", "localhost:9092")
                        .property("group.id","test")
                        .startFromLatest()
        )
                .withFormat(
                        new Json()
                                .failOnMissingField(false)
                                .deriveSchema()
                )
                .withSchema(

                        new Schema()




*.field("rowtime", Types.SQL_TIMESTAMP)
.rowtime(new Rowtime()
.timestampsFromField("eventtime")
.watermarksPeriodicBounded(2000)                                )*
                                .field("fruit", Types.STRING)
                                .field("number", Types.INT)
                )
                .inAppendMode()
                .registerTableSource("source");

        Table sourceTbl = tEnv.scan("source");
        sourceTbl.printSchema();
        tEnv.toAppendStream(sourceTbl, Row.class).print();

        env.execute();

测试数据 {"eventtime": "2019-12-17T11:11:29.555Z", "fruit": "orange", "number":
45}2019-12-17T11:11:29.555Z", "fruit": "orange", "number": 45}
我想问的是关于 加粗 部分代码的两个问题:
这个示例中我想使用事件时间eventtime,所以 field("rowtime", Types.SQL_TIMESTAMP) 中就一定要使用
rowtime 名称吗?[1]
如果是处理时间processingtime 要如何表示?实际传入的json数据字段却是"eventtime",而且格式方式我试了用 long表示的
epochInMillis不行,改成 yyyy-MM-dd HH:mm:ss也不行,后来看了源码用了
yyyy-MM-dd'T'HH:mm:ss.SSS'Z' 才通过了。想问一下有没有办法指定dateFormat? [2]