
Re: org.apache.flink.table.api.ValidationException error with the blink planner

Posted by JingsongLee on Jan 14, 2020; 3:25am
URL: http://apache-flink.370.s1.nabble.com/blink-planner-org-apache-flink-table-api-ValidationException-tp1492p1502.html

Hi Kevin,

Which Flink version is this?
Could you share the complete Doc class? That would make it easier for us to reproduce the problem.

Best,
Jingsong Lee


------------------------------------------------------------------
From: Kevin Liao <[hidden email]>
Send Time: Monday, January 13, 2020, 17:37
To: user-zh <[hidden email]>
Subject: org.apache.flink.table.api.ValidationException error with the blink planner

tEnv.connect(new Kafka()
        .version("universal")
        .topic("xxx")
        .startFromLatest()
        .property("bootstrap.servers",
            "xxxx")
        .property("group.id", "xxxx"))
        .withFormat(new Json().failOnMissingField(false).deriveSchema())
        .withSchema(new Schema()
//                .field("logger_name", Types.STRING)
//                .field("host", Types.STRING)
//            .field("@timestamp", Types.SQL_TIMESTAMP)
//                .field("_rowtime", Types.SQL_TIMESTAMP)
//                .rowtime(new Rowtime()
//                    .timestampsFromField("@timestamp")
//                    .watermarksPeriodicBounded(1000))
                .field("doc", Types.POJO(Doc.class))
        )
        .inAppendMode()
        .registerTableSource("xxx");

    Table result = tEnv.sqlQuery(
        "SELECT doc.xxx1, doc.xxx2,  ... , doc.xxxN as seq FROM xxx");

//    result.printSchema();
    tEnv.toAppendStream(result,
        new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING,
            STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING, STRING, STRING, STRING,
            STRING, LONG, STRING, INT, STRING, INT)).print();



The code above runs fine under the flink planner, but after switching to the blink planner it fails with the error below (a sketch of how the planner is switched follows the stack trace):


```

Exception in thread "main"
org.apache.flink.table.api.ValidationException: Type
LEGACY(PojoType<com.sogou.qidian.Doc, fields = [(.... // omitted)]>) of
table field 'doc' does not match with type
PojoType<com.sogou.qidian.Doc, fields = [(.... // omitted)]> of the field
'doc' of the TableSource return type.
 at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
 at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
 at org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
 at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
 at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
 at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
 at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
 at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
 at com.sogou.qidian.BatchJob.main(BatchJob.java:83)

Execution failed for task ':BatchJob.main()'.
> Process 'command '/Users/liaoxu/.sdkman/candidates/java/current/bin/java'' finished with non-zero exit value 1


```


I compared the two Doc types in the error message carefully, and they are identical.
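
For reference, the planner switch only changes how the table environment is created; below is a minimal sketch of the blink-planner setup (assuming Flink 1.9/1.10-style EnvironmentSettings APIs; the exact wiring in my job may differ slightly):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Sketch: select the blink planner when creating the table environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()   // the working run uses .useOldPlanner() instead
        .inStreamingMode()
        .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

Everything else in the job (the connect/withSchema/sqlQuery code above) is unchanged between the two runs.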


Thanks