org.apache.flink.table.api.ValidationException error with the blink planner


Kevin Liao
```
tEnv.connect(new Kafka()
        .version("universal")
        .topic("xxx")
        .startFromLatest()
        .property("bootstrap.servers", "xxxx")
        .property("group.id", "xxxx"))
        .withFormat(new Json().failOnMissingField(false).deriveSchema())
        .withSchema(new Schema()
//                .field("logger_name", Types.STRING)
//                .field("host", Types.STRING)
//                .field("@timestamp", Types.SQL_TIMESTAMP)
//                .field("_rowtime", Types.SQL_TIMESTAMP)
//                .rowtime(new Rowtime()
//                    .timestampsFromField("@timestamp")
//                    .watermarksPeriodicBounded(1000))
                .field("doc", Types.POJO(Doc.class))
        )
        .inAppendMode()
        .registerTableSource("xxx");

Table result = tEnv.sqlQuery(
    "SELECT doc.xxx1, doc.xxx2, ... , doc.xxxN as seq FROM xxx");

// result.printSchema();
tEnv.toAppendStream(result,
    new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING,
        STRING, STRING, STRING, STRING, STRING, BOOLEAN, LONG,
        STRING, STRING, STRING, STRING, STRING, STRING, STRING,
        LONG, STRING, INT, STRING, INT)).print();
```



The code above runs fine under the flink planner, but switching to the blink planner fails with the error below:
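
The planner switch itself isn't shown above; for context, here is a minimal sketch of how the two setups are typically selected in Flink 1.9 via EnvironmentSettings (assumed setup, not the exact production code):

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Old (flink) planner -- the working case:
// EnvironmentSettings settings = EnvironmentSettings.newInstance()
//     .useOldPlanner().inStreamingMode().build();

// Blink planner -- the failing case:
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
```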


```

Exception in thread "main" org.apache.flink.table.api.ValidationException:
Type LEGACY(PojoType<com.sogou.qidian.Doc, fields = [(... // omitted)]>) of table field 'doc'
does not match with type PojoType<com.sogou.qidian.Doc, fields = [(... // omitted)]>
of the field 'doc' of the TableSource return type.
        at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
        at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
        at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
        at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
        at com.sogou.qidian.BatchJob.main(BatchJob.java:83)

Execution failed for task ':BatchJob.main()'.
> Process 'command '/Users/liaoxu/.sdkman/candidates/java/current/bin/java'' finished with non-zero exit value 1


```


I compared the two Doc types in the error log carefully; they are identical.


Thanks

Re: org.apache.flink.table.api.ValidationException error with the blink planner

JingsongLee
Hi Kevin,

Which version is this?
Could you provide the complete Doc class? That would make it easier for us to reproduce.

Best,
Jingsong Lee



Re: org.apache.flink.table.api.ValidationException error with the blink planner

Kevin Liao
The Flink version is 1.9.1 (release).

Sorry, I can't share the complete Doc class because it contains business details, but here is a snippet. It is a plain POJO with only one level of fields; every field is a primitive type (or its boxed variant) or String, about 30 fields in total. I don't think the field count matters here.

```

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/**
 * @author liaoxu Date: 2020/1/13 Time: 12:03 PM.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Doc {

  private String suv;
  private Float factor = 1F;
  private String st;
  private String agentId;
  private Long timestamp;

  ... // some fields omitted; getters and setters omitted
}
```

Hope that helps. You can also reach me on DingTalk (ID: ib1x1zy).


Re: org.apache.flink.table.api.ValidationException error with the blink planner

JingsongLee
Thanks.
Could you try the latest 1.9 release, 1.10, or master? Some bugs have been fixed in this area, so I'm not sure whether this one still exists.

Best,
Jingsong Lee



Re: org.apache.flink.table.api.ValidationException error with the blink planner

Kevin Liao
I'm using the official release downloaded from
https://www.apache.org/dyn/closer.lua/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz

I'll try the latest master you mentioned shortly. Thanks.


Re: org.apache.flink.table.api.ValidationException error with the blink planner

Kevin Liao
I rebuilt with the latest master code (packaged version: 1.11-SNAPSHOT).

After changing this block:

```
.withSchema(new Schema()
//                .field("logger_name", Types.STRING)
//                .field("host", Types.STRING)
//                .field("@timestamp", Types.SQL_TIMESTAMP)
//                .field("_rowtime", Types.SQL_TIMESTAMP)
//                .rowtime(new Rowtime()
//                    .timestampsFromField("@timestamp")
//                    .watermarksPeriodicBounded(1000))
        .field("doc", Types.POJO(Doc.class))
)
```


to use DataTypes instead, the job now runs successfully under the blink planner.
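
Roughly, the working schema declaration looks like this (a sketch that lists only the Doc fields shown earlier; the real class has many more fields, so the exact mapping here is illustrative, not my verbatim code):

```
.withSchema(new Schema()
        // Declare doc as a ROW whose fields mirror the Doc POJO, using the
        // new type system (DataTypes) instead of legacy TypeInformation
        // (Types.POJO). Only the fields from the snippet above are shown.
        .field("doc", DataTypes.ROW(
            DataTypes.FIELD("suv", DataTypes.STRING()),
            DataTypes.FIELD("factor", DataTypes.FLOAT()),
            DataTypes.FIELD("st", DataTypes.STRING()),
            DataTypes.FIELD("agentId", DataTypes.STRING()),
            DataTypes.FIELD("timestamp", DataTypes.BIGINT()))))
```

With DataTypes the field type enters the blink planner's new type system directly instead of being wrapped as a LEGACY type, which appears to be what tripped the equality check in TableSourceUtil.computeIndexMapping.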

