|
大佬们好,
有问题想请教:
1、使用1.9版本的flinksql时,如果以kafka作为数据源,那么当中的Csv format部分(标青色的部分)该如何写才能正确匹配上Schema中的字段?【因为大部分资料中都是以json format比较多,所以没法参考】
tableEnv.connect(
new Kafka()
.version("0.10")
.topic("topic1")
.property("bootstrap.servers", "node1:9092")
.property("group.id","test")
.startFromLatest()
)
.withFormat(
new Csv()
.schema(Types.ROW(。。。。。))
#我填“.schema(Types.ROW(Types.SQL_TIMESTAMP, Types.STRING, Types.INT)) ”会报错“Caused by: org.apache.flink.table.api.ValidationException: Could not map the schema field 'fruit' to a field from source. Please specify the source field from which it can be derived.”
.fieldDelimiter(',')
.lineDelimiter("\r\n")
)
.withSchema(
new Schema()
.field("rowtime",Types.SQL_TIMESTAMP)
.rowtime(new Rowtime()
.timestampsFromField("eventtime")
.watermarksPeriodicBounded(0)
)
.field("fruit", Types.STRING)
.field("number", Types.INT)
)
.inAppendMode()
.registerTableSource("sourceTable")
期待大佬的指导!谢谢!
|