Hi:

1. When Flink parses the JSON data, why is DECIMAL used here rather than BIGINT?
2. When I use connect(), deserializing the JSON array element throws an exception. Is this caused by misuse on my side, or is it a bug?

json:
{"business":"riskt","data":[{"tracking_number":"0180024020920","invoice_no":"2020021025"}],"database":"installmentdb","table":"t_sales_order","ts":1581576074069,"type":"UPDATE","putRowNum":19999}

jsonSchema:
{"type":"object","properties":{"business":{"type":"string"},"data":{"type":"array","items":[{"type":"object","properties":{"tracking_number":{"type":"string"},"invoice_no":{"type":"string"}}}]},"database":{"type":"string"},"table":{"type":"string"},"ts":{"type":"integer"},"type":{"type":"string"},"putRowNum":{"type":"integer"}}}

connect:

streamTableEnv
        .connect(
                new Kafka()
                        .version("0.11")
                        .topic("mysql_binlog_test_str")
                        .startFromEarliest()
                        .property("zookeeper.connect", "localhost:2181")
                        .property("bootstrap.servers", "localhost:9092")
        )
        .withFormat(
                new Json()
                        .jsonSchema("{\"type\":\"object\",\"properties\":{\"business\":{\"type\":\"string\"},\"data\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{\"tracking_number\":{\"type\":\"string\"},\"invoice_no\":{\"type\":\"string\"}}}]},\"database\":{\"type\":\"string\"},\"table\":{\"type\":\"string\"},\"ts\":{\"type\":\"integer\"},\"type\":{\"type\":\"string\"},\"putRowNum\":{\"type\":\"integer\"}}}")
        )
        .withSchema(
                new Schema()
                        .field("business", DataTypes.STRING())
                        .field("data", DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.ROW(
                                DataTypes.FIELD("tracking_number", DataTypes.STRING()),
                                DataTypes.FIELD("invoice_no", DataTypes.STRING())))))
                        .field("database", DataTypes.STRING())
                        .field("table", DataTypes.STRING())
                        .field("ts", DataTypes.DECIMAL(38, 18))
                        .field("type", DataTypes.STRING())
                        .field("putRowNum", DataTypes.DECIMAL(38, 18))
        )
        .createTemporaryTable("Test");

Exception:
Caused by: java.io.IOException: Failed to deserialize JSON object.
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:411)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
    ... 7 more
Hi,
Looking at your data, "data" is of type array<row>, so its schema definition needs to be changed to
ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no", STRING))).
I also suggest removing .jsonSchema(...): starting with 1.10, flink-json can derive the JSON schema automatically from the table schema.

Best,
Jark

On Fri, 20 Mar 2020 at 11:34, 宇张 <[hidden email]> wrote:
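For reference, a minimal sketch of the descriptor with both changes applied, assuming Flink 1.10 and keeping the Kafka settings from the original snippet; using BIGINT for ts and putRowNum is an assumption based on the sample values:

streamTableEnv
        .connect(
                new Kafka()
                        .version("0.11")
                        .topic("mysql_binlog_test_str")
                        .startFromEarliest()
                        .property("zookeeper.connect", "localhost:2181")
                        .property("bootstrap.servers", "localhost:9092")
        )
        .withFormat(
                new Json()  // no .jsonSchema(...): derived from the table schema since 1.10
        )
        .withSchema(
                new Schema()
                        .field("business", DataTypes.STRING())
                        // "data" is a JSON array of objects -> ARRAY(ROW(...))
                        .field("data", DataTypes.ARRAY(DataTypes.ROW(
                                DataTypes.FIELD("tracking_number", DataTypes.STRING()),
                                DataTypes.FIELD("invoice_no", DataTypes.STRING()))))
                        .field("database", DataTypes.STRING())
                        .field("table", DataTypes.STRING())
                        .field("ts", DataTypes.BIGINT())        // assumption: epoch millis fit in BIGINT
                        .field("type", DataTypes.STRING())
                        .field("putRowNum", DataTypes.BIGINT()) // assumption: integer row count
        )
        .createTemporaryTable("Test");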
hi,
Thanks, I tried it. After changing the schema definition of data to ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no", STRING))) and removing .jsonSchema(...), the data is now parsed correctly. But if I keep .jsonSchema(...), the following exception is thrown:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ARRAY<ROW<`tracking_number` STRING, `invoice_no` STRING>> of table field 'data' does not match with the physical type ROW<`f0` ROW<`tracking_number` STRING, `invoice_no` STRING>> of the 'data' field of the TableSource return type.

The reason I want to keep the jsonSchema is that I'd like to store the metadata of this kind of complex JSON source in Hive and use it to derive the table definition, because I don't know how to map the fields of such complex JSON when writing the SQL. Is there an example of mapping complex JSON like this to SQL? Thanks.

On Fri, Mar 20, 2020 at 11:42 AM Jark Wu <[hidden email]> wrote:
hi,
I tried again: when the JSON data contains numeric fields, the program still fails even after changing the schema definition of data to ARRAY(ROW(...)) and removing .jsonSchema(...); when there are no numeric fields it works. Judging from the error output, the two structures do match, but the validation apparently fails.

On Fri, Mar 20, 2020 at 12:08 PM 宇张 <[hidden email]> wrote:
hi,
OK, testing shows that DECIMAL cannot be used, even DECIMAL(38, 18); switching to any other type works. I'm not sure whether this is a bug.

On Fri, Mar 20, 2020 at 2:17 PM 宇张 <[hidden email]> wrote:
Hi,
The images you posted are all broken... Please paste the text directly, or upload the images to an image hosting service first and share the links.

1. What error does DECIMAL throw exactly?
2. If you keep the jsonSchema, you have to make sure the table schema and the JSON schema are consistent, i.e. not only the table schema but also the JSON schema has to be correct. That adds quite a bit of extra effort, so the general recommendation is not to configure a jsonSchema. In theory, the table schema can express any complex format.

Best,
Jark

On Fri, 20 Mar 2020 at 14:48, 宇张 <[hidden email]> wrote:
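To illustrate that last point, here is a sketch of the same source declared purely via SQL DDL, where the nested array maps directly to ARRAY<ROW<...>>. The WITH option keys are an assumption based on Flink 1.10's legacy connector property format and may need adjusting to your setup:

// Sketch: table schema alone describes the nested JSON, no jsonSchema needed.
streamTableEnv.sqlUpdate(
        "CREATE TABLE Test (\n" +
        "  business STRING,\n" +
        "  `data` ARRAY<ROW<tracking_number STRING, invoice_no STRING>>,\n" +
        "  `database` STRING,\n" +
        "  `table` STRING,\n" +
        "  ts BIGINT,\n" +
        "  `type` STRING,\n" +
        "  putRowNum BIGINT\n" +
        ") WITH (\n" +
        "  'connector.type' = 'kafka',\n" +
        "  'connector.version' = '0.11',\n" +
        "  'connector.topic' = 'mysql_binlog_test_str',\n" +
        "  'connector.startup-mode' = 'earliest-offset',\n" +
        "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
        "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
        "  'format.type' = 'json'\n" +
        ")");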
hi,
Got it. Let me re-post it properly:

streamTableEnv
        .connect(
                new Kafka()
                        .version("0.11")
                        .topic("mysql_binlog_test")
                        .startFromEarliest()
                        .property("zookeeper.connect", "localhost:2181")
                        .property("bootstrap.servers", "localhost:9092")
        )
        .withFormat(
                new Json()
        )
        .withSchema(
                new Schema()
                        .field("business", DataTypes.STRING())
                        .field("data", DataTypes.ARRAY(
                                DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT()),
                                        DataTypes.FIELD("vendor_id", DataTypes.DOUBLE()),
                                        DataTypes.FIELD("status", DataTypes.BIGINT()),
                                        DataTypes.FIELD("create_time", DataTypes.BIGINT()),
                                        DataTypes.FIELD("tracking_number", DataTypes.STRING()),
                                        DataTypes.FIELD("invoice_no", DataTypes.STRING()),
                                        DataTypes.FIELD("parent_id", DataTypes.BIGINT()))))
                        .field("database", DataTypes.STRING())
                        .field("old", DataTypes.ARRAY(
                                DataTypes.ROW(DataTypes.FIELD("logistics_status", DataTypes.DECIMAL(38, 18)))))
                        .field("table", DataTypes.STRING())
                        .field("ts", DataTypes.BIGINT())
                        .field("type", DataTypes.STRING())
                        .field("putRowNum", DataTypes.BIGINT())
        )
        .createTemporaryTable("Test");

When the subfield logistics_status inside the composite field "old" is declared as DECIMAL, the exception below is thrown; with any other type it works.

Exception:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ARRAY<ROW<`logistics_status` DECIMAL(38, 18)>> of table field 'old' does not match with the physical type ARRAY<ROW<`logistics_status` LEGACY('DECIMAL', 'DECIMAL')>> of the 'old' field of the TableSource return type.
    at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
    at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277)
    at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254)
    at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157)
    at org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110)
    at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
    at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
    at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232)
    at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
    at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214)
    at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192)
    at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:161)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:51)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlan(StreamExecLimit.scala:51)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
    at com.akulaku.data.main.StreamMain.main(StreamMain.java:58)

On Fri, Mar 20, 2020 at 4:43 PM Jark Wu <[hidden email]> wrote:
Hi 张宇
This looks like a bug in TypeMappingUtils, in the validation of a field's physical type against its logical type.
I have opened an issue: https://issues.apache.org/jira/browse/FLINK-16800

Best Regards,
Zhenghua Gao

On Fri, Mar 20, 2020 at 5:28 PM 宇张 <[hidden email]> wrote: