关于1.9版本的flinksql中的csv format的问题

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

关于1.9版本的flinksql中的csv format的问题

往事Ψ随风
大佬们好,
有问题想请教:
1、使用1.9版本的flinksql时,如果以kafka作为数据源,那么当中的Csv format部分(标青色的部分)该如何写才能正确匹配上Schema中的字段?【因为大部分资料中都是以json format比较多,所以没法参考】

tableEnv.connect(


new Kafka()


.version("0.10")


.topic("topic1")


.property("bootstrap.servers", "node1:9092")


.property("group.id","test")


.startFromLatest()


)

.withFormat(


new Csv()


.schema(Types.ROW(。。。。。))        

#我填“.schema(Types.ROW(Types.SQL_TIMESTAMP, Types.STRING, Types.INT)) ”会报错“Caused by: org.apache.flink.table.api.ValidationException: Could not map the schema field 'fruit' to a field from source. Please specify the source field from which it can be derived.”





.fieldDelimiter(',')


.lineDelimiter("\r\n")


)

.withSchema(


new Schema()


.field("rowtime",Types.SQL_TIMESTAMP)


.rowtime(new Rowtime()


.timestampsFromField("eventtime")


.watermarksPeriodicBounded(0)


)


.field("fruit", Types.STRING)


.field("number", Types.INT)


)

.inAppendMode()

.registerTableSource("sourceTable")




期待大佬的指导!谢谢!