Flink SQL DDL Schema: nested JSON in CSV


amenhub@163.com
hi everyone,

zhangsan|man|28|[hidden email]|{"math":98, "language":{"english":89, "french":95}}|china|beijing

This is a record from a Kafka topic. When I create a Kafka DDL and define a schema for it, the following exception is thrown:
Exception in thread "main" java.lang.IllegalArgumentException: Only simple types are supported in the second level nesting of fields 'alex_1' but was: ROW<`english` INT, `french` INT>
at org.apache.flink.formats.csv.CsvRowSchemaConverter.validateNestedField(CsvRowSchemaConverter.java:220)
at org.apache.flink.formats.csv.CsvRowSchemaConverter.convertType(CsvRowSchemaConverter.java:197)
at org.apache.flink.formats.csv.CsvRowSchemaConverter.convert(CsvRowSchemaConverter.java:145)
at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema$Builder.<init>(CsvRowDataDeserializationSchema.java:98)
at org.apache.flink.formats.csv.CsvFormatFactory$1.createRuntimeDecoder(CsvFormatFactory.java:79)
at org.apache.flink.formats.csv.CsvFormatFactory$1.createRuntimeDecoder(CsvFormatFactory.java:71)
at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(KafkaDynamicSource.java:401)
at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:184)
at org.apache.flink.table.planner.sources.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:262)
at org.apache.flink.table.planner.sources.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:73)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:101)
        ...
        ...

DDL statement:
CREATE TABLE kafka_source (
    name STRING,
    sex STRING,
    age INT,
    mail STRING,
    alex_1 ROW<math INT, `language` ROW<english INT, french INT>>,
    country STRING,
    city STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic5-r1p3',
    'properties.bootstrap.servers' = '10.1.128.63:9092',
    'properties.group.id' = 'group5',
    'format' = 'csv',
    'csv.field-delimiter' = '|',
    'scan.startup.mode' = 'group-offsets'
)

This looks like a ROW-nesting problem, yet when I change the format to json there is no error. How should the schema be defined for JSON data nested inside CSV like this?
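The exception comes from the CSV format itself: `CsvRowSchemaConverter.validateNestedField` only allows simple types at the second nesting level, so a `ROW` inside a `ROW` is rejected regardless of the data. A common workaround, sketched below under the assumption that the JSON column is parsed downstream (e.g. by a scalar UDF, since built-in JSON functions are not available in Flink 1.12), is to declare the nested-JSON column as a plain `STRING` in the CSV source:

CREATE TABLE kafka_source (
    name STRING,
    sex STRING,
    age INT,
    mail STRING,
    alex_1 STRING,  -- raw JSON text; only one level of ROW nesting is supported by the csv format
    country STRING,
    city STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic5-r1p3',
    'properties.bootstrap.servers' = '10.1.128.63:9092',
    'properties.group.id' = 'group5',
    'format' = 'csv',
    'csv.field-delimiter' = '|',
    'scan.startup.mode' = 'group-offsets'
)

The pipe-delimited envelope is then handled by the csv format, and the `{"math":98, "language":{...}}` payload in `alex_1` can be turned into structured fields in a later query with a user-defined function of your own.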

best,
amenhub



Re: Flink SQL DDL Schema: nested JSON in CSV

amenhub@163.com
Flink version: 1.12.0



 
From: [hidden email]
Sent: 2021-01-03 16:09
To: user-zh
Subject: Flink SQL DDL Schema: nested JSON in CSV