hi, all
我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL yidun_score 字段也是定义的 numeric(5,2) 类型,结果会报异常。 org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Field types of query result and registered TableSink [Result] do not match. Query result schema: [user_new_id: Long, total_credit_score: Integer, total_order_count: Integer, loss_total_order_count: Integer, yidun_score: BigDecimal, is_delete: Boolean] TableSink schema: [user_new_id: Long, total_credit_score: Integer, total_order_count: Integer, loss_total_order_count: Integer, yidun_score: BigDecimal, is_delete: Boolean] at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) Caused by: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [Result] do not match. Query result schema: [user_new_id: Long, total_credit_score: Integer, total_order_count: Integer, loss_total_order_count: Integer, yidun_score: BigDecimal, is_delete: Boolean] TableSink schema: [user_new_id: Long, total_credit_score: Integer, total_order_count: Integer, loss_total_order_count: Integer, yidun_score: BigDecimal, is_delete: Boolean] at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178) at scala.Option.map(Option.scala:146) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) 我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个 case 是不是一个 bug? |
Administrator
|
Hi zhisheng,
你用的是1.9吗? 试过 1.10.0 blink planner 么? On Wed, 18 Mar 2020 at 22:21, zhisheng <[hidden email]> wrote: > hi, all > > 我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL yidun_score > 字段也是定义的 numeric(5,2) 类型,结果会报异常。 > > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error: Field types of query result and registered TableSink > [Result] do not match. > Query result schema: [user_new_id: Long, total_credit_score: Integer, > total_order_count: Integer, loss_total_order_count: Integer, yidun_score: > BigDecimal, is_delete: Boolean] > TableSink schema: [user_new_id: Long, total_credit_score: Integer, > total_order_count: Integer, loss_total_order_count: Integer, yidun_score: > BigDecimal, is_delete: Boolean] > at > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593) > at > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) > at > > org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83) > at > > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) > at > > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122) > at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) > at > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) > at > > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) > at > > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) > Caused by: org.apache.flink.table.api.ValidationException: Field types of > query result and registered TableSink [Result] do not match. > Query result schema: [user_new_id: Long, total_credit_score: Integer, > total_order_count: Integer, loss_total_order_count: Integer, yidun_score: > BigDecimal, is_delete: Boolean] > TableSink schema: [user_new_id: Long, total_credit_score: Integer, > total_order_count: Integer, loss_total_order_count: Integer, yidun_score: > BigDecimal, is_delete: Boolean] > at > > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69) > at > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179) > at > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178) > at scala.Option.map(Option.scala:146) > at > > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178) > at > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) > at > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > 我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个 case 是不是一个 > bug? > |
对的,这是生产的一个 1.9 blink planner 作业,今天我测试一下 1.10 blink planner
Jark Wu <[hidden email]> 于2020年3月18日周三 下午11:47写道: > Hi zhisheng, > > 你用的是1.9吗? 试过 1.10.0 blink planner 么? > > On Wed, 18 Mar 2020 at 22:21, zhisheng <[hidden email]> wrote: > > > hi, all > > > > 我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL > yidun_score > > 字段也是定义的 numeric(5,2) 类型,结果会报异常。 > > > > org.apache.flink.client.program.ProgramInvocationException: The main > method > > caused an error: Field types of query result and registered TableSink > > [Result] do not match. > > Query result schema: [user_new_id: Long, total_credit_score: Integer, > > total_order_count: Integer, loss_total_order_count: Integer, yidun_score: > > BigDecimal, is_delete: Boolean] > > TableSink schema: [user_new_id: Long, total_credit_score: Integer, > > total_order_count: Integer, loss_total_order_count: Integer, yidun_score: > > BigDecimal, is_delete: Boolean] > > at > > > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593) > > at > > > > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) > > at > > > > > org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83) > > at > > > > > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) > > at > > > > > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122) > > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227) > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) > > at > > > > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) > > at > > > > > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) > > at java.security.AccessController.doPrivileged(Native Method) > > at javax.security.auth.Subject.doAs(Subject.java:422) > > at > > > > > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) > > at > > > > > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) > > Caused by: org.apache.flink.table.api.ValidationException: Field types of > > query result and registered TableSink [Result] do not match. > > Query result schema: [user_new_id: Long, total_credit_score: Integer, > > total_order_count: Integer, loss_total_order_count: Integer, yidun_score: > > BigDecimal, is_delete: Boolean] > > TableSink schema: [user_new_id: Long, total_credit_score: Integer, > > total_order_count: Integer, loss_total_order_count: Integer, yidun_score: > > BigDecimal, is_delete: Boolean] > > at > > > > > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69) > > at > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179) > > at > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178) > > at scala.Option.map(Option.scala:146) > > at > > > > > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178) > > at > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) > > at > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) > > at > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > at > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > > > 我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个 case 是不是一个 > > bug? > > > |
hi, Jark
我刚使用 1.10.0 测试,报错异常如下: Exception in thread "main" org.apache.flink.table.api.ValidationException: Type DECIMAL(5, 2) of table field 'yidun_score' does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of the 'yidun_score' field of the TableSink consumed type. at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164) at org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:265) at org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:254) at org.apache.flink.table.types.logical.LegacyTypeInformationType.accept(LegacyTypeInformationType.java:102) at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254) at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:287) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:280) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190) at scala.Option.map(Option.scala:146) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) at com.zhisheng.sql.blink.stream.example.Test.main(Test.java:49) Caused by: org.apache.flink.table.api.ValidationException: Legacy decimal type can only be mapped to DECIMAL(38, 18). ... 26 more 看报错日志像是只能使用 DECIMAL(38, 18),不然映射会失败,然后我将表定义成 DECIMAL(38, 18) 类型,就不报错了。 看了下源码是 TypeMappingUtils 类中的 checkIfCompatible 做了校验 http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-19-012349.png 看了下这个 private 方法的上层是 checkPhysicalLogicalTypeCompatible 方法,这个方法的注释说: Checks whether the given physical field type and logical field type are compatible at the edges of the table ecosystem. Types are still compatible if the physical type is a legacy decimal type (converted from Types#BIG_DEC) and the logical type is DECIMAL(38, 18). This is to support legacy TypeInformation for TableSource and TableSink. 看起来像是在兼容旧的 TypeInformation zhisheng <[hidden email]> 于2020年3月19日周四 上午8:31写道: > 对的,这是生产的一个 1.9 blink planner 作业,今天我测试一下 1.10 blink planner > > Jark Wu <[hidden email]> 于2020年3月18日周三 下午11:47写道: > >> Hi zhisheng, >> >> 你用的是1.9吗? 试过 1.10.0 blink planner 么? >> >> On Wed, 18 Mar 2020 at 22:21, zhisheng <[hidden email]> wrote: >> >> > hi, all >> > >> > 我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL >> yidun_score >> > 字段也是定义的 numeric(5,2) 类型,结果会报异常。 >> > >> > org.apache.flink.client.program.ProgramInvocationException: The main >> method >> > caused an error: Field types of query result and registered TableSink >> > [Result] do not match. >> > Query result schema: [user_new_id: Long, total_credit_score: Integer, >> > total_order_count: Integer, loss_total_order_count: Integer, >> yidun_score: >> > BigDecimal, is_delete: Boolean] >> > TableSink schema: [user_new_id: Long, total_credit_score: Integer, >> > total_order_count: Integer, loss_total_order_count: Integer, >> yidun_score: >> > BigDecimal, is_delete: Boolean] >> > at >> > >> > >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593) >> > at >> > >> > >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) >> > at >> > >> > >> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83) >> > at >> > >> > >> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) >> > at >> > >> > >> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122) >> > at >> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227) >> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) >> > at >> > >> > >> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) >> > at >> > >> > >> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) >> > at java.security.AccessController.doPrivileged(Native Method) >> > at javax.security.auth.Subject.doAs(Subject.java:422) >> > at >> > >> > >> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) >> > at >> > >> > >> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) >> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) >> > Caused by: org.apache.flink.table.api.ValidationException: Field types >> of >> > query result and registered TableSink [Result] do not match. >> > Query result schema: [user_new_id: Long, total_credit_score: Integer, >> > total_order_count: Integer, loss_total_order_count: Integer, >> yidun_score: >> > BigDecimal, is_delete: Boolean] >> > TableSink schema: [user_new_id: Long, total_credit_score: Integer, >> > total_order_count: Integer, loss_total_order_count: Integer, >> yidun_score: >> > BigDecimal, is_delete: Boolean] >> > at >> > >> > >> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69) >> > at >> > >> > >> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179) >> > at >> > >> > >> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178) >> > at scala.Option.map(Option.scala:146) >> > at >> > >> > >> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178) >> > at >> > >> > >> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) >> > at >> > >> > >> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) >> > at >> > >> > >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> > at >> > >> > >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> > at scala.collection.Iterator$class.foreach(Iterator.scala:891) >> > >> > 我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个 case 是不是一个 >> > bug? >> > >> > |
Administrator
|
Hi zhisheng,
我猜测你的 TableSink 的实现用了 legacy type, i.e. TypeInformation, 而不是 DataType。 legacy type 是无法表达精度的,所以只能 mapping 到默认的 38, 18 上。 这是框架做的一个合法性校验。 Best, Jark On Thu, 19 Mar 2020 at 09:33, zhisheng <[hidden email]> wrote: > hi, Jark > > 我刚使用 1.10.0 测试,报错异常如下: > > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Type DECIMAL(5, 2) of table field 'yidun_score' does not match with the > physical type LEGACY('DECIMAL', 'DECIMAL') of the 'yidun_score' field of > the TableSink consumed type. > at > > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164) > at > > org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:265) > at > > org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:254) > at > > org.apache.flink.table.types.logical.LegacyTypeInformationType.accept(LegacyTypeInformationType.java:102) > at > > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254) > at > > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160) > at > > org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:287) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) > at > > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:280) > at > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194) > at > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190) > at scala.Option.map(Option.scala:146) > at > > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190) > at > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > at com.zhisheng.sql.blink.stream.example.Test.main(Test.java:49) > Caused by: org.apache.flink.table.api.ValidationException: Legacy decimal > type can only be mapped to DECIMAL(38, 18). > ... 26 more > > 看报错日志像是只能使用 DECIMAL(38, 18),不然映射会失败,然后我将表定义成 DECIMAL(38, 18) 类型,就不报错了。 > > 看了下源码是 TypeMappingUtils 类中的 checkIfCompatible 做了校验 > > http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-19-012349.png > > 看了下这个 private 方法的上层是 checkPhysicalLogicalTypeCompatible 方法,这个方法的注释说: > > Checks whether the given physical field type and logical field type are > compatible at the edges of the table ecosystem. Types are still compatible > if the physical type is a legacy decimal type (converted from > Types#BIG_DEC) and the logical type is DECIMAL(38, 18). This is to support > legacy TypeInformation for TableSource and TableSink. > > 看起来像是在兼容旧的 TypeInformation > > zhisheng <[hidden email]> 于2020年3月19日周四 上午8:31写道: > > > 对的,这是生产的一个 1.9 blink planner 作业,今天我测试一下 1.10 blink planner > > > > Jark Wu <[hidden email]> 于2020年3月18日周三 下午11:47写道: > > > >> Hi zhisheng, > >> > >> 你用的是1.9吗? 试过 1.10.0 blink planner 么? > >> > >> On Wed, 18 Mar 2020 at 22:21, zhisheng <[hidden email]> wrote: > >> > >> > hi, all > >> > > >> > 我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL > >> yidun_score > >> > 字段也是定义的 numeric(5,2) 类型,结果会报异常。 > >> > > >> > org.apache.flink.client.program.ProgramInvocationException: The main > >> method > >> > caused an error: Field types of query result and registered TableSink > >> > [Result] do not match. > >> > Query result schema: [user_new_id: Long, total_credit_score: Integer, > >> > total_order_count: Integer, loss_total_order_count: Integer, > >> yidun_score: > >> > BigDecimal, is_delete: Boolean] > >> > TableSink schema: [user_new_id: Long, total_credit_score: Integer, > >> > total_order_count: Integer, loss_total_order_count: Integer, > >> yidun_score: > >> > BigDecimal, is_delete: Boolean] > >> > at > >> > > >> > > >> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593) > >> > at > >> > > >> > > >> > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) > >> > at > >> > > >> > > >> > org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83) > >> > at > >> > > >> > > >> > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) > >> > at > >> > > >> > > >> > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122) > >> > at > >> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227) > >> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) > >> > at > >> > > >> > > >> > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) > >> > at > >> > > >> > > >> > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) > >> > at java.security.AccessController.doPrivileged(Native Method) > >> > at javax.security.auth.Subject.doAs(Subject.java:422) > >> > at > >> > > >> > > >> > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) > >> > at > >> > > >> > > >> > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > >> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) > >> > Caused by: org.apache.flink.table.api.ValidationException: Field types > >> of > >> > query result and registered TableSink [Result] do not match. > >> > Query result schema: [user_new_id: Long, total_credit_score: Integer, > >> > total_order_count: Integer, loss_total_order_count: Integer, > >> yidun_score: > >> > BigDecimal, is_delete: Boolean] > >> > TableSink schema: [user_new_id: Long, total_credit_score: Integer, > >> > total_order_count: Integer, loss_total_order_count: Integer, > >> yidun_score: > >> > BigDecimal, is_delete: Boolean] > >> > at > >> > > >> > > >> > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69) > >> > at > >> > > >> > > >> > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179) > >> > at > >> > > >> > > >> > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178) > >> > at scala.Option.map(Option.scala:146) > >> > at > >> > > >> > > >> > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178) > >> > at > >> > > >> > > >> > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) > >> > at > >> > > >> > > >> > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) > >> > at > >> > > >> > > >> > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > >> > at > >> > > >> > > >> > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > >> > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > >> > > >> > 我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个 case > 是不是一个 > >> > bug? > >> > > >> > > > |
hi, Jark
我只是使用了 flink-jdbc 这个 connector,发下我本地测试的 DDL 和 SQL 如下: String ddlSource = "CREATE TABLE test (\n" + " yidun_score numeric(5, 2)\n" + ") WITH (\n" + " 'connector.type' = 'kafka',\n" + " 'connector.version' = '0.11',\n" + " 'connector.topic' = 'test',\n" + " 'connector.startup-mode' = 'latest-offset',\n" + " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + " 'format.type' = 'json'\n" + ")"; String ddlSink = "CREATE TABLE test_aggregate (\n" + " yidun_score numeric(5, 2)\n" + ") WITH (\n" + " 'connector.type' = 'jdbc',\n" + " 'connector.driver' = 'org.postgresql.Driver',\n" + " 'connector.url' = 'jdbc:postgresql://localhost:3600/test',\n" + " 'connector.table' = 'test_aggregate', \n" + " 'connector.username' = 'admin', \n" + " 'connector.password' = '1234546',\n" + " 'connector.write.flush.max-rows' = '1' \n" + ")"; String sql = "insert into test_aggregate select yidun_score from test"; blinkStreamTableEnv.sqlUpdate(ddlSource); blinkStreamTableEnv.sqlUpdate(ddlSink); blinkStreamTableEnv.sqlUpdate(sql); 没有自定义过 TableSink Jark Wu <[hidden email]> 于2020年3月19日周四 上午9:43写道: > Hi zhisheng, > > 我猜测你的 TableSink 的实现用了 legacy type, i.e. TypeInformation, 而不是 DataType。 > legacy type 是无法表达精度的,所以只能 mapping 到默认的 38, 18 上。 > 这是框架做的一个合法性校验。 > > Best, > Jark > > On Thu, 19 Mar 2020 at 09:33, zhisheng <[hidden email]> wrote: > > > hi, Jark > > > > 我刚使用 1.10.0 测试,报错异常如下: > > > > Exception in thread "main" > org.apache.flink.table.api.ValidationException: > > Type DECIMAL(5, 2) of table field 'yidun_score' does not match with the > > physical type LEGACY('DECIMAL', 'DECIMAL') of the 'yidun_score' field of > > the TableSink consumed type. > > at > > > > > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164) > > at > > > > > org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:265) > > at > > > > > org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:254) > > at > > > > > org.apache.flink.table.types.logical.LegacyTypeInformationType.accept(LegacyTypeInformationType.java:102) > > at > > > > > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254) > > at > > > > > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160) > > at > > > > > org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:287) > > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) > > at > > > > > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:280) > > at > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194) > > at > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190) > > at scala.Option.map(Option.scala:146) > > at > > > > > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190) > > at > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > > at > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > > at > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > at > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > > at > > > > > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) > > at > > > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > > at > > > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > > at com.zhisheng.sql.blink.stream.example.Test.main(Test.java:49) > > Caused by: org.apache.flink.table.api.ValidationException: Legacy decimal > > type can only be mapped to DECIMAL(38, 18). > > ... 26 more > > > > 看报错日志像是只能使用 DECIMAL(38, 18),不然映射会失败,然后我将表定义成 DECIMAL(38, 18) 类型,就不报错了。 > > > > 看了下源码是 TypeMappingUtils 类中的 checkIfCompatible 做了校验 > > > > http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-19-012349.png > > > > 看了下这个 private 方法的上层是 checkPhysicalLogicalTypeCompatible 方法,这个方法的注释说: > > > > Checks whether the given physical field type and logical field type are > > compatible at the edges of the table ecosystem. Types are still > compatible > > if the physical type is a legacy decimal type (converted from > > Types#BIG_DEC) and the logical type is DECIMAL(38, 18). This is to > support > > legacy TypeInformation for TableSource and TableSink. > > > > 看起来像是在兼容旧的 TypeInformation > > > > zhisheng <[hidden email]> 于2020年3月19日周四 上午8:31写道: > > > > > 对的,这是生产的一个 1.9 blink planner 作业,今天我测试一下 1.10 blink planner > > > > > > Jark Wu <[hidden email]> 于2020年3月18日周三 下午11:47写道: > > > > > >> Hi zhisheng, > > >> > > >> 你用的是1.9吗? 试过 1.10.0 blink planner 么? > > >> > > >> On Wed, 18 Mar 2020 at 22:21, zhisheng <[hidden email]> > wrote: > > >> > > >> > hi, all > > >> > > > >> > 我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL > > >> yidun_score > > >> > 字段也是定义的 numeric(5,2) 类型,结果会报异常。 > > >> > > > >> > org.apache.flink.client.program.ProgramInvocationException: The main > > >> method > > >> > caused an error: Field types of query result and registered > TableSink > > >> > [Result] do not match. > > >> > Query result schema: [user_new_id: Long, total_credit_score: > Integer, > > >> > total_order_count: Integer, loss_total_order_count: Integer, > > >> yidun_score: > > >> > BigDecimal, is_delete: Boolean] > > >> > TableSink schema: [user_new_id: Long, total_credit_score: > Integer, > > >> > total_order_count: Integer, loss_total_order_count: Integer, > > >> yidun_score: > > >> > BigDecimal, is_delete: Boolean] > > >> > at > > >> > > > >> > > > >> > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593) > > >> > at > > >> > > > >> > > > >> > > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) > > >> > at > > >> > > > >> > > > >> > > > org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83) > > >> > at > > >> > > > >> > > > >> > > > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) > > >> > at > > >> > > > >> > > > >> > > > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122) > > >> > at > > >> > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227) > > >> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) > > >> > at > > >> > > > >> > > > >> > > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) > > >> > at > > >> > > > >> > > > >> > > > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) > > >> > at java.security.AccessController.doPrivileged(Native Method) > > >> > at javax.security.auth.Subject.doAs(Subject.java:422) > > >> > at > > >> > > > >> > > > >> > > > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) > > >> > at > > >> > > > >> > > > >> > > > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > > >> > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) > > >> > Caused by: org.apache.flink.table.api.ValidationException: Field > types > > >> of > > >> > query result and registered TableSink [Result] do not match. > > >> > Query result schema: [user_new_id: Long, total_credit_score: > Integer, > > >> > total_order_count: Integer, loss_total_order_count: Integer, > > >> yidun_score: > > >> > BigDecimal, is_delete: Boolean] > > >> > TableSink schema: [user_new_id: Long, total_credit_score: > Integer, > > >> > total_order_count: Integer, loss_total_order_count: Integer, > > >> yidun_score: > > >> > BigDecimal, is_delete: Boolean] > > >> > at > > >> > > > >> > > > >> > > > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69) > > >> > at > > >> > > > >> > > > >> > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179) > > >> > at > > >> > > > >> > > > >> > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178) > > >> > at scala.Option.map(Option.scala:146) > > >> > at > > >> > > > >> > > > >> > > > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178) > > >> > at > > >> > > > >> > > > >> > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) > > >> > at > > >> > > > >> > > > >> > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) > > >> > at > > >> > > > >> > > > >> > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > >> > at > > >> > > > >> > > > >> > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > >> > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > >> > > > >> > 我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个 case > > 是不是一个 > > >> > bug? > > >> > > > >> > > > > > > |
Administrator
|
Hi zhisheng,
目前 kafka source & jdbc sink 都是用的 TypeInformation ,所以都只能声明成 38, 18 或者直接写 DECIMAL ,默认就是 38, 18。 这个问题会在升级到 new source/sink interface (FLIP-95)后有效解决。 Best, Jark On Thu, 19 Mar 2020 at 10:31, zhisheng <[hidden email]> wrote: > hi, Jark > > 我只是使用了 flink-jdbc 这个 connector,发下我本地测试的 DDL 和 SQL 如下: > > String ddlSource = "CREATE TABLE test (\n" + > " yidun_score numeric(5, 2)\n" + > ") WITH (\n" + > " 'connector.type' = 'kafka',\n" + > " 'connector.version' = '0.11',\n" + > " 'connector.topic' = 'test',\n" + > " 'connector.startup-mode' = 'latest-offset',\n" + > " 'connector.properties.zookeeper.connect' = > 'localhost:2181',\n" + > " 'connector.properties.bootstrap.servers' = > 'localhost:9092',\n" + > " 'format.type' = 'json'\n" + > ")"; > > String ddlSink = "CREATE TABLE test_aggregate (\n" + > " yidun_score numeric(5, 2)\n" + > ") WITH (\n" + > " 'connector.type' = 'jdbc',\n" + > " 'connector.driver' = 'org.postgresql.Driver',\n" + > " 'connector.url' = > 'jdbc:postgresql://localhost:3600/test',\n" + > " 'connector.table' = 'test_aggregate', \n" + > " 'connector.username' = 'admin', \n" + > " 'connector.password' = '1234546',\n" + > " 'connector.write.flush.max-rows' = '1' \n" + > ")"; > > String sql = "insert into test_aggregate select yidun_score from > test"; > > blinkStreamTableEnv.sqlUpdate(ddlSource); > blinkStreamTableEnv.sqlUpdate(ddlSink); > blinkStreamTableEnv.sqlUpdate(sql); > > 没有自定义过 TableSink > > > > Jark Wu <[hidden email]> 于2020年3月19日周四 上午9:43写道: > > > Hi zhisheng, > > > > 我猜测你的 TableSink 的实现用了 legacy type, i.e. TypeInformation, 而不是 DataType。 > > legacy type 是无法表达精度的,所以只能 mapping 到默认的 38, 18 上。 > > 这是框架做的一个合法性校验。 > > > > Best, > > Jark > > > > On Thu, 19 Mar 2020 at 09:33, zhisheng <[hidden email]> wrote: > > > > > hi, Jark > > > > > > 我刚使用 1.10.0 测试,报错异常如下: > > > > > > Exception in thread "main" > > org.apache.flink.table.api.ValidationException: > > > Type DECIMAL(5, 2) of table field 'yidun_score' does not match with the > > > physical type LEGACY('DECIMAL', 'DECIMAL') of the 'yidun_score' field > of > > > the TableSink consumed type. > > > at > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164) > > > at > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:265) > > > at > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:254) > > > at > > > > > > > > > org.apache.flink.table.types.logical.LegacyTypeInformationType.accept(LegacyTypeInformationType.java:102) > > > at > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254) > > > at > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160) > > > at > > > > > > > > > org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:287) > > > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) > > > at > > > > > > > > > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:280) > > > at > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194) > > > at > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190) > > > at scala.Option.map(Option.scala:146) > > > at > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190) > > > at > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > > > at > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > > > at > > > > > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > > at > > > > > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > > > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > > > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > > > at > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) > > > at > > > > > > > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > > > at > > > > > > > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > > > at com.zhisheng.sql.blink.stream.example.Test.main(Test.java:49) > > > Caused by: org.apache.flink.table.api.ValidationException: Legacy > decimal > > > type can only be mapped to DECIMAL(38, 18). > > > ... 26 more > > > > > > 看报错日志像是只能使用 DECIMAL(38, 18),不然映射会失败,然后我将表定义成 DECIMAL(38, 18) 类型,就不报错了。 > > > > > > 看了下源码是 TypeMappingUtils 类中的 checkIfCompatible 做了校验 > > > > > > > http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-19-012349.png > > > > > > 看了下这个 private 方法的上层是 checkPhysicalLogicalTypeCompatible 方法,这个方法的注释说: > > > > > > Checks whether the given physical field type and logical field type are > > > compatible at the edges of the table ecosystem. Types are still > > compatible > > > if the physical type is a legacy decimal type (converted from > > > Types#BIG_DEC) and the logical type is DECIMAL(38, 18). This is to > > support > > > legacy TypeInformation for TableSource and TableSink. > > > > > > 看起来像是在兼容旧的 TypeInformation > > > > > > zhisheng <[hidden email]> 于2020年3月19日周四 上午8:31写道: > > > > > > > 对的,这是生产的一个 1.9 blink planner 作业,今天我测试一下 1.10 blink planner > > > > > > > > Jark Wu <[hidden email]> 于2020年3月18日周三 下午11:47写道: > > > > > > > >> Hi zhisheng, > > > >> > > > >> 你用的是1.9吗? 试过 1.10.0 blink planner 么? > > > >> > > > >> On Wed, 18 Mar 2020 at 22:21, zhisheng <[hidden email]> > > wrote: > > > >> > > > >> > hi, all > > > >> > > > > >> > 我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL > > > >> yidun_score > > > >> > 字段也是定义的 numeric(5,2) 类型,结果会报异常。 > > > >> > > > > >> > org.apache.flink.client.program.ProgramInvocationException: The > main > > > >> method > > > >> > caused an error: Field types of query result and registered > > TableSink > > > >> > [Result] do not match. > > > >> > Query result schema: [user_new_id: Long, total_credit_score: > > Integer, > > > >> > total_order_count: Integer, loss_total_order_count: Integer, > > > >> yidun_score: > > > >> > BigDecimal, is_delete: Boolean] > > > >> > TableSink schema: [user_new_id: Long, total_credit_score: > > Integer, > > > >> > total_order_count: Integer, loss_total_order_count: Integer, > > > >> yidun_score: > > > >> > BigDecimal, is_delete: Boolean] > > > >> > at > > > >> > > > > >> > > > > >> > > > > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593) > > > >> > at > > > >> > > > > >> > > > > >> > > > > > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) > > > >> > at > > > >> > > > > >> > > > > >> > > > > > > org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83) > > > >> > at > > > >> > > > > >> > > > > >> > > > > > > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) > > > >> > at > > > >> > > > > >> > > > > >> > > > > > > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122) > > > >> > at > > > >> > > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227) > > > >> > at > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) > > > >> > at > > > >> > > > > >> > > > > >> > > > > > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) > > > >> > at > > > >> > > > > >> > > > > >> > > > > > > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) > > > >> > at java.security.AccessController.doPrivileged(Native Method) > > > >> > at javax.security.auth.Subject.doAs(Subject.java:422) > > > >> > at > > > >> > > > > >> > > > > >> > > > > > > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) > > > >> > at > > > >> > > > > >> > > > > >> > > > > > > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > > > >> > at > > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) > > > >> > Caused by: org.apache.flink.table.api.ValidationException: Field > > types > > > >> of > > > >> > query result and registered TableSink [Result] do not match. > > > >> > Query result schema: [user_new_id: Long, total_credit_score: > > Integer, > > > >> > total_order_count: Integer, loss_total_order_count: Integer, > > > >> yidun_score: > > > >> > BigDecimal, is_delete: Boolean] > > > >> > TableSink schema: [user_new_id: Long, total_credit_score: > > Integer, > > > >> > total_order_count: Integer, loss_total_order_count: Integer, > > > >> yidun_score: > > > >> > BigDecimal, is_delete: Boolean] > > > >> > at > > > >> > > > > >> > > > > >> > > > > > > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69) > > > >> > at > > > >> > > > > >> > > > > >> > > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179) > > > >> > at > > > >> > > > > >> > > > > >> > > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178) > > > >> > at scala.Option.map(Option.scala:146) > > > >> > at > > > >> > > > > >> > > > > >> > > > > > > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178) > > > >> > at > > > >> > > > > >> > > > > >> > > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) > > > >> > at > > > >> > > > > >> > > > > >> > > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) > > > >> > at > > > >> > > > > >> > > > > >> > > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > > >> > at > > > >> > > > > >> > > > > >> > > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > > >> > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > > >> > > > > >> > 我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个 case > > > 是不是一个 > > > >> > bug? > > > >> > > > > >> > > > > > > > > > > |
好的,了解了,多谢 Jark
Jark Wu <[hidden email]> 于2020年3月19日周四 上午10:39写道: > Hi zhisheng, > > 目前 kafka source & jdbc sink 都是用的 TypeInformation ,所以都只能声明成 38, 18 或者直接写 > DECIMAL ,默认就是 38, 18。 > 这个问题会在升级到 new source/sink interface (FLIP-95)后有效解决。 > > Best, > Jark > > On Thu, 19 Mar 2020 at 10:31, zhisheng <[hidden email]> wrote: > > > hi, Jark > > > > 我只是使用了 flink-jdbc 这个 connector,发下我本地测试的 DDL 和 SQL 如下: > > > > String ddlSource = "CREATE TABLE test (\n" + > > " yidun_score numeric(5, 2)\n" + > > ") WITH (\n" + > > " 'connector.type' = 'kafka',\n" + > > " 'connector.version' = '0.11',\n" + > > " 'connector.topic' = 'test',\n" + > > " 'connector.startup-mode' = 'latest-offset',\n" + > > " 'connector.properties.zookeeper.connect' = > > 'localhost:2181',\n" + > > " 'connector.properties.bootstrap.servers' = > > 'localhost:9092',\n" + > > " 'format.type' = 'json'\n" + > > ")"; > > > > String ddlSink = "CREATE TABLE test_aggregate (\n" + > > " yidun_score numeric(5, 2)\n" + > > ") WITH (\n" + > > " 'connector.type' = 'jdbc',\n" + > > " 'connector.driver' = 'org.postgresql.Driver',\n" + > > " 'connector.url' = > > 'jdbc:postgresql://localhost:3600/test',\n" + > > " 'connector.table' = 'test_aggregate', \n" + > > " 'connector.username' = 'admin', \n" + > > " 'connector.password' = '1234546',\n" + > > " 'connector.write.flush.max-rows' = '1' \n" + > > ")"; > > > > String sql = "insert into test_aggregate select yidun_score from > > test"; > > > > blinkStreamTableEnv.sqlUpdate(ddlSource); > > blinkStreamTableEnv.sqlUpdate(ddlSink); > > blinkStreamTableEnv.sqlUpdate(sql); > > > > 没有自定义过 TableSink > > > > > > > > Jark Wu <[hidden email]> 于2020年3月19日周四 上午9:43写道: > > > > > Hi zhisheng, > > > > > > 我猜测你的 TableSink 的实现用了 legacy type, i.e. TypeInformation, 而不是 DataType。 > > > legacy type 是无法表达精度的,所以只能 mapping 到默认的 38, 18 上。 > > > 这是框架做的一个合法性校验。 > > > > > > Best, > > > Jark > > > > > > On Thu, 19 Mar 2020 at 09:33, zhisheng <[hidden email]> wrote: > > > > > > > hi, Jark > > > > > > > > 我刚使用 1.10.0 测试,报错异常如下: > > > > > > > > Exception in thread "main" > > > org.apache.flink.table.api.ValidationException: > > > > Type DECIMAL(5, 2) of table field 'yidun_score' does not match with > the > > > > physical type LEGACY('DECIMAL', 'DECIMAL') of the 'yidun_score' field > > of > > > > the TableSink consumed type. > > > > at > > > > > > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164) > > > > at > > > > > > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:265) > > > > at > > > > > > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:254) > > > > at > > > > > > > > > > > > > > org.apache.flink.table.types.logical.LegacyTypeInformationType.accept(LegacyTypeInformationType.java:102) > > > > at > > > > > > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254) > > > > at > > > > > > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160) > > > > at > > > > > > > > > > > > > > org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:287) > > > > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) > > > > at > > > > > > > > > > > > > > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:280) > > > > at > > > > > > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194) > > > > at > > > > > > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190) > > > > at scala.Option.map(Option.scala:146) > > > > at > > > > > > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190) > > > > at > > > > > > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > > > > at > > > > > > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > > > > at > > > > > > > > > > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > > > at > > > > > > > > > > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > > > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > > > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > > > > at > > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > > > > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > > > > at > > > > > > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) > > > > at > > > > > > > > > > > > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > > > > at > > > > > > > > > > > > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > > > > at com.zhisheng.sql.blink.stream.example.Test.main(Test.java:49) > > > > Caused by: org.apache.flink.table.api.ValidationException: Legacy > > decimal > > > > type can only be mapped to DECIMAL(38, 18). > > > > ... 26 more > > > > > > > > 看报错日志像是只能使用 DECIMAL(38, 18),不然映射会失败,然后我将表定义成 DECIMAL(38, 18) > 类型,就不报错了。 > > > > > > > > 看了下源码是 TypeMappingUtils 类中的 checkIfCompatible 做了校验 > > > > > > > > > > http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-19-012349.png > > > > > > > > 看了下这个 private 方法的上层是 checkPhysicalLogicalTypeCompatible 方法,这个方法的注释说: > > > > > > > > Checks whether the given physical field type and logical field type > are > > > > compatible at the edges of the table ecosystem. Types are still > > > compatible > > > > if the physical type is a legacy decimal type (converted from > > > > Types#BIG_DEC) and the logical type is DECIMAL(38, 18). This is to > > > support > > > > legacy TypeInformation for TableSource and TableSink. > > > > > > > > 看起来像是在兼容旧的 TypeInformation > > > > > > > > zhisheng <[hidden email]> 于2020年3月19日周四 上午8:31写道: > > > > > > > > > 对的,这是生产的一个 1.9 blink planner 作业,今天我测试一下 1.10 blink planner > > > > > > > > > > Jark Wu <[hidden email]> 于2020年3月18日周三 下午11:47写道: > > > > > > > > > >> Hi zhisheng, > > > > >> > > > > >> 你用的是1.9吗? 试过 1.10.0 blink planner 么? > > > > >> > > > > >> On Wed, 18 Mar 2020 at 22:21, zhisheng <[hidden email]> > > > wrote: > > > > >> > > > > >> > hi, all > > > > >> > > > > > >> > 我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL > > > > >> yidun_score > > > > >> > 字段也是定义的 numeric(5,2) 类型,结果会报异常。 > > > > >> > > > > > >> > org.apache.flink.client.program.ProgramInvocationException: The > > main > > > > >> method > > > > >> > caused an error: Field types of query result and registered > > > TableSink > > > > >> > [Result] do not match. > > > > >> > Query result schema: [user_new_id: Long, total_credit_score: > > > Integer, > > > > >> > total_order_count: Integer, loss_total_order_count: Integer, > > > > >> yidun_score: > > > > >> > BigDecimal, is_delete: Boolean] > > > > >> > TableSink schema: [user_new_id: Long, total_credit_score: > > > Integer, > > > > >> > total_order_count: Integer, loss_total_order_count: Integer, > > > > >> yidun_score: > > > > >> > BigDecimal, is_delete: Boolean] > > > > >> > at > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593) > > > > >> > at > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) > > > > >> > at > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83) > > > > >> > at > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) > > > > >> > at > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122) > > > > >> > at > > > > >> > > > > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227) > > > > >> > at > > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) > > > > >> > at > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) > > > > >> > at > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) > > > > >> > at java.security.AccessController.doPrivileged(Native Method) > > > > >> > at javax.security.auth.Subject.doAs(Subject.java:422) > > > > >> > at > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) > > > > >> > at > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > > > > >> > at > > > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) > > > > >> > Caused by: org.apache.flink.table.api.ValidationException: Field > > > types > > > > >> of > > > > >> > query result and registered TableSink [Result] do not match. > > > > >> > Query result schema: [user_new_id: Long, total_credit_score: > > > Integer, > > > > >> > total_order_count: Integer, loss_total_order_count: Integer, > > > > >> yidun_score: > > > > >> > BigDecimal, is_delete: Boolean] > > > > >> > TableSink schema: [user_new_id: Long, total_credit_score: > > > Integer, > > > > >> > total_order_count: Integer, loss_total_order_count: Integer, > > > > >> yidun_score: > > > > >> > BigDecimal, is_delete: Boolean] > > > > >> > at > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69) > > > > >> > at > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179) > > > > >> > at > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178) > > > > >> > at scala.Option.map(Option.scala:146) > > > > >> > at > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178) > > > > >> > at > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) > > > > >> > at > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) > > > > >> > at > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > > > >> > at > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > > > >> > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > > > >> > > > > > >> > 我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个 > case > > > > 是不是一个 > > > > >> > bug? > > > > >> > > > > > >> > > > > > > > > > > > > > > > |
Free forum by Nabble | Edit this page |