Field types of query result and registered TableSink [Result] do not match

Field types of query result and registered TableSink [Result] do not match

zhisheng
hi, all

One field of a table I defined (yidun_score) is of type numeric(5,2), and the yidun_score field in the DDL of the PostgreSQL sink table is also defined as numeric(5,2), yet the job throws the following exception.

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Field types of query result and registered TableSink [Result] do not match.
Query result schema: [user_new_id: Long, total_credit_score: Integer, total_order_count: Integer, loss_total_order_count: Integer, yidun_score: BigDecimal, is_delete: Boolean]
TableSink schema:    [user_new_id: Long, total_credit_score: Integer, total_order_count: Integer, loss_total_order_count: Integer, yidun_score: BigDecimal, is_delete: Boolean]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [Result] do not match.
Query result schema: [user_new_id: Long, total_credit_score: Integer, total_order_count: Integer, loss_total_order_count: Integer, yidun_score: BigDecimal, is_delete: Boolean]
TableSink schema:    [user_new_id: Long, total_credit_score: Integer, total_order_count: Integer, loss_total_order_count: Integer, yidun_score: BigDecimal, is_delete: Boolean]
    at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178)
    at scala.Option.map(Option.scala:146)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)

I later changed the numeric(5,2) type to double everywhere and the exception no longer occurs; data is written to PG normally. Is this case a bug?
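
A minimal sketch of the change, for illustration (the full sink DDL appears later in this thread; the variable name here is just for the sketch):

// before: "    yidun_score numeric(5, 2)\n"  -> validation fails on 1.9
// after:  the same column declared as double -> the job runs, data reaches PG
String yidunScoreColumn = "    yidun_score double\n";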

Re: Field types of query result and registered TableSink [Result] do not match

Jark
Administrator
Hi zhisheng,

Are you on 1.9? Have you tried the 1.10.0 blink planner?

Re: Field types of query result and registered TableSink [Result] do not match

zhisheng
Yes, it's a production job on the 1.9 blink planner. I'll test it with the 1.10 blink planner today.

Re: Field types of query result and registered TableSink [Result] do not match

zhisheng
hi, Jark

I just tested with 1.10.0 and got the following exception:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Type DECIMAL(5, 2) of table field 'yidun_score' does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of the 'yidun_score' field of the TableSink consumed type.
    at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
    at org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:265)
    at org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:254)
    at org.apache.flink.table.types.logical.LegacyTypeInformationType.accept(LegacyTypeInformationType.java:102)
    at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
    at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
    at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:287)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:280)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
    at scala.Option.map(Option.scala:146)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
    at com.zhisheng.sql.blink.stream.example.Test.main(Test.java:49)
Caused by: org.apache.flink.table.api.ValidationException: Legacy decimal type can only be mapped to DECIMAL(38, 18).
    ... 26 more

From the error log it looks like only DECIMAL(38, 18) can be used, otherwise the mapping fails. I then defined the table field as DECIMAL(38, 18) and the error went away.

Looking at the source, the validation is done by checkIfCompatible in the TypeMappingUtils class:

http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-19-012349.png

The caller of this private method is checkPhysicalLogicalTypeCompatible, whose comment says:

Checks whether the given physical field type and logical field type are
compatible at the edges of the table ecosystem. Types are still compatible
if the physical type is a legacy decimal type (converted from
Types#BIG_DEC) and the logical type is DECIMAL(38, 18). This is to support
legacy TypeInformation for TableSource and TableSink.

So it looks like this is there for compatibility with the legacy TypeInformation.
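
A quick way to see this mapping in isolation (a sketch; the class name is mine, and the expected output is inferred from the exception above):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;

public class LegacyDecimalCheck {
    public static void main(String[] args) {
        // Types.BIG_DEC is the legacy TypeInformation for java.math.BigDecimal.
        // It carries no precision/scale, so the bridge into the new type system
        // cannot reconstruct DECIMAL(5, 2) from it.
        DataType bridged = TypeConversions.fromLegacyInfoToDataType(Types.BIG_DEC);
        // Expected (per the exception above): a LEGACY('DECIMAL', ...) data type,
        // which checkIfCompatible only accepts against DECIMAL(38, 18).
        System.out.println(bridged);
    }
}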

Re: Field types of query result and registered TableSink [Result] do not match

Jark
Administrator
Hi zhisheng,

My guess is that your TableSink implementation uses a legacy type, i.e. TypeInformation, rather than DataType. A legacy type cannot express precision, so it can only be mapped to the default (38, 18).
This is a validity check performed by the framework.

Best,
Jark

Re: Field types of query result and registered TableSink [Result] do not match

zhisheng
hi, Jark

I'm only using the flink-jdbc connector. Here are the DDL and SQL from my local test:

String ddlSource = "CREATE TABLE test (\n" +
        "    yidun_score numeric(5, 2)\n" +
        ") WITH (\n" +
        "    'connector.type' = 'kafka',\n" +
        "    'connector.version' = '0.11',\n" +
        "    'connector.topic' = 'test',\n" +
        "    'connector.startup-mode' = 'latest-offset',\n" +
        "    'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
        "    'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
        "    'format.type' = 'json'\n" +
        ")";

String ddlSink = "CREATE TABLE test_aggregate (\n" +
        "    yidun_score numeric(5, 2)\n" +
        ") WITH (\n" +
        "    'connector.type' = 'jdbc',\n" +
        "    'connector.driver' = 'org.postgresql.Driver',\n" +
        "    'connector.url' = 'jdbc:postgresql://localhost:3600/test',\n" +
        "    'connector.table' = 'test_aggregate',\n" +
        "    'connector.username' = 'admin',\n" +
        "    'connector.password' = '1234546',\n" +
        "    'connector.write.flush.max-rows' = '1'\n" +
        ")";

String sql = "insert into test_aggregate select yidun_score from test";

blinkStreamTableEnv.sqlUpdate(ddlSource);
blinkStreamTableEnv.sqlUpdate(ddlSink);
blinkStreamTableEnv.sqlUpdate(sql);

I have not implemented a custom TableSink.
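
(For completeness, blinkStreamTableEnv above is a plain blink-planner streaming TableEnvironment; a minimal construction sketch, assuming the Flink 1.10 Java API:)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Inside main(): set up the streaming environment with the blink planner.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, settings);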



Re: Field types of query result and registered TableSink [Result] do not match

Jark
Administrator
Hi zhisheng,

At the moment both the kafka source & jdbc sink use TypeInformation, so the field can only be declared as DECIMAL(38, 18), or simply as DECIMAL, which defaults to 38, 18.
This will be properly resolved once the connectors move to the new source/sink interfaces (FLIP-95).
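
In DDL terms, a minimal sketch of a declaration that passes this check, reusing the sink table from earlier in the thread (only the column type changes):

String ddlSink = "CREATE TABLE test_aggregate (\n" +
        "    yidun_score DECIMAL(38, 18)\n" +  // or simply DECIMAL
        ") WITH (\n" +
        "    'connector.type' = 'jdbc',\n" +
        "    'connector.driver' = 'org.postgresql.Driver',\n" +
        "    'connector.url' = 'jdbc:postgresql://localhost:3600/test',\n" +
        "    'connector.table' = 'test_aggregate',\n" +
        "    'connector.username' = 'admin',\n" +
        "    'connector.password' = '1234546',\n" +
        "    'connector.write.flush.max-rows' = '1'\n" +
        ")";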

Best,
Jark

Re: Field types of query result and registered TableSink [Result] do not match

zhisheng
Got it, understood. Thanks, Jark!
