|
Flink Table API Schema定义里面的 Types.SQL_TTIMESTAMP 类型用json表示的话一定要用
yyyy-MM-dd'T'HH:mm:ss.SSS'Z'表示吗?
示例程序如下:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
TableConfig tableConfig = tEnv.getConfig();
tableConfig.setIdleStateRetentionTime(Time.minutes(10),Time.minutes(15));
tEnv.registerFunction("DateUtil",new DateUtil());
tEnv.connect(
new Kafka()
.version("universal")
// "0.8", "0.9", "0.10", "0.11", and "universal"
.topic("jsontest")
.property("bootstrap.servers", "localhost:9092")
.property("group.id","test")
.startFromLatest()
)
.withFormat(
new Json()
.failOnMissingField(false)
.deriveSchema()
)
.withSchema(
new Schema()
*.field("rowtime", Types.SQL_TIMESTAMP)
.rowtime(new Rowtime()
.timestampsFromField("eventtime")
.watermarksPeriodicBounded(2000) )*
.field("fruit", Types.STRING)
.field("number", Types.INT)
)
.inAppendMode()
.registerTableSource("source");
Table sourceTbl = tEnv.scan("source");
sourceTbl.printSchema();
tEnv.toAppendStream(sourceTbl, Row.class).print();
env.execute();
测试数据 {"eventtime": "2019-12-17T11:11:29.555Z", "fruit": "orange", "number":
45}2019-12-17T11:11:29.555Z", "fruit": "orange", "number": 45}
我想问的是关于 加粗 部分代码的两个问题:
这个示例中我想使用事件时间eventtime,所以 field("rowtime", Types.SQL_TIMESTAMP) 中就一定要使用
rowtime 名称吗?[1]
如果是处理时间processingtime 要如何表示?实际传入的json数据字段却是"eventtime",而且格式方式我试了用 long表示的
epochInMillis不行,改成 yyyy-MM-dd HH:mm:ss也不行,后来看了源码用了
yyyy-MM-dd'T'HH:mm:ss.SSS'Z' 才通过了。想问一下有没有办法指定dateFormat? [2]
|