hi everyone,
zhangsan|man|28|[hidden email]|{"math":98, "language":{"english":89, "french":95}}|china|beijing

This is a record from a Kafka message queue. When I create a Kafka DDL to define a schema for it, the following exception is thrown:

Exception in thread "main" java.lang.IllegalArgumentException: Only simple types are supported in the second level nesting of fields 'alex_1' but was: ROW<`english` INT, `french` INT>
    at org.apache.flink.formats.csv.CsvRowSchemaConverter.validateNestedField(CsvRowSchemaConverter.java:220)
    at org.apache.flink.formats.csv.CsvRowSchemaConverter.convertType(CsvRowSchemaConverter.java:197)
    at org.apache.flink.formats.csv.CsvRowSchemaConverter.convert(CsvRowSchemaConverter.java:145)
    at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema$Builder.<init>(CsvRowDataDeserializationSchema.java:98)
    at org.apache.flink.formats.csv.CsvFormatFactory$1.createRuntimeDecoder(CsvFormatFactory.java:79)
    at org.apache.flink.formats.csv.CsvFormatFactory$1.createRuntimeDecoder(CsvFormatFactory.java:71)
    at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(KafkaDynamicSource.java:401)
    at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:184)
    at org.apache.flink.table.planner.sources.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:262)
    at org.apache.flink.table.planner.sources.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:73)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:101)
    ...

DDL statement:

CREATE TABLE kafka_source (
    name STRING,
    sex STRING,
    age INT,
    mail STRING,
    alex_1 ROW<math INT, `language` ROW<english INT, french INT>>,
    country STRING,
    city STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic5-r1p3',
    'properties.bootstrap.servers' = '10.1.128.63:9092',
    'properties.group.id' = 'group5',
    'format' = 'csv',
    'csv.field-delimiter' = '|',
    'scan.startup.mode' = 'group-offsets'
)

It looks like a ROW nesting problem, yet when I switch the format to json the DDL validates without error. How should the schema be defined for JSON data nested inside a CSV record like this?

best,
amenhub
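The exception is raised by the csv format's schema converter, which only accepts simple types at the second nesting level, so the inner ROW<english INT, french INT> under alex_1 is rejected. One possible workaround, sketched below under the assumption that the JSON blob can be carried as opaque text and parsed in a later step (this is not confirmed by the thread), is to declare the JSON column as a plain STRING so the csv format only sees flat fields:

CREATE TABLE kafka_source (
    name STRING,
    sex STRING,
    age INT,
    mail STRING,
    -- keep the embedded JSON as an opaque string; the csv format then
    -- only has to deal with simple, non-nested column types
    alex_1 STRING,
    country STRING,
    city STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic5-r1p3',
    'properties.bootstrap.servers' = '10.1.128.63:9092',
    'properties.group.id' = 'group5',
    'format' = 'csv',
    'csv.field-delimiter' = '|',
    -- assumption: may be needed so the double quotes inside the JSON blob
    -- are not interpreted as CSV quoting; depends on how the producer writes the field
    'csv.disable-quote-character' = 'true',
    'scan.startup.mode' = 'group-offsets'
)

With this schema the table still uses the same csv format and '|' delimiter; the nested scores simply travel through the source as text until a downstream step parses them.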
Flink version: 1.12.0
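Since Flink 1.12 does not ship built-in SQL JSON functions (those arrived in later releases), parsing such a STRING column would typically go through a user-defined scalar function. A minimal sketch, where the function name json_to_scores and its implementing class are hypothetical placeholders and the UDF is assumed to return ROW<math INT, `language` ROW<english INT, french INT>>:

-- register a (hypothetical) Java scalar function that parses the JSON string
CREATE TEMPORARY FUNCTION json_to_scores AS 'com.example.udf.JsonToScores';

-- apply it to the STRING column of the CSV source table
SELECT
    name,
    json_to_scores(alex_1) AS alex_1_parsed
FROM kafka_source;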
From: [hidden email]
Sent: 2021-01-03 16:09
To: user-zh
Subject: Flink SQL DDL schema: nested JSON in CSV