Hi,我这边正在将 Flink Table 转换为 DataStream,现在抛出如下异常,貌似是个 bug:
table.printSchema();
streamTableEnv.toRetractStream(table, Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes()))).print();

root
 |-- register_id: BIGINT
 |-- asi_uid: BIGINT
 |-- person_uuid: BIGINT
 |-- app_id: BIGINT
 |-- country_id: BIGINT
 |-- channel_id: STRING
 |-- device_id: STRING
 |-- adjust_id: STRING
 |-- google_adid: STRING
 |-- referrer: BIGINT
 |-- login_pwd: STRING
 |-- sync_data_flag: INT
 |-- register_phone_number: STRING
 |-- device_type: INT
 |-- imei: STRING
 |-- device_model: STRING
 |-- os_version: STRING
 |-- app_name: STRING
 |-- app_version: STRING
 |-- app_package_name: STRING
 |-- network_type: STRING
 |-- wifi_mac: STRING
 |-- longitude: DECIMAL(38, 18)
 |-- latitude: DECIMAL(38, 18)
 |-- geo_hash7: STRING
 |-- ip: STRING
 |-- register_time: BIGINT
 |-- etl_time: BIGINT NOT NULL

org.apache.flink.table.api.TableException: BIGINT and VARCHAR(2147483647) does not have common type now |
Hi Dream,
可以分享下你完整的程序吗,我感觉这个是因为 JOIN ON 条件上类型不一致引起的,可以分享下你完整的程序看下。 Best, Hailong Wang 在 2020-10-19 09:50:33,"Dream-底限" <[hidden email]> 写道: >hi、我这面正在将flinktable转换为datastream,现在抛出如下异常,貌似是个bug。。。 > >table.printSchema(); >streamTableEnv.toRetractStream(table, >Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes()))).print(); > > > >root > |-- register_id: BIGINT > |-- asi_uid: BIGINT > |-- person_uuid: BIGINT > |-- app_id: BIGINT > |-- country_id: BIGINT > |-- channel_id: STRING > |-- device_id: STRING > |-- adjust_id: STRING > |-- google_adid: STRING > |-- referrer: BIGINT > |-- login_pwd: STRING > |-- sync_data_flag: INT > |-- register_phone_number: STRING > |-- device_type: INT > |-- imei: STRING > |-- device_model: STRING > |-- os_version: STRING > |-- app_name: STRING > |-- app_version: STRING > |-- app_package_name: STRING > |-- network_type: STRING > |-- wifi_mac: STRING > |-- longitude: DECIMAL(38, 18) > |-- latitude: DECIMAL(38, 18) > |-- geo_hash7: STRING > |-- ip: STRING > |-- register_time: BIGINT > |-- etl_time: BIGINT NOT NULL > > >org.apache.flink.table.api.TableException: BIGINT and >VARCHAR(2147483647) does not have common type now |
hi、
我查看了一下,join条件类型是一样的,我这面下游sink使用jdbc时候是可以运行的,但是转换为datastream时候失败了,下面是程序及异常: streamTableEnv.executeSql(kafkaDDL);//ddl语句见下面日志 Table table = streamTableEnv.sqlQuery("SELECT cast(t1.id as bigint) as register_id,cast(t1.uid as bigint) as asi_uid,cast(null as bigint) as person_uuid,cast(t1.app_id as bigint) as app_id,cast(t1.country_id as bigint) as country_id,t2.channel_id as channel_id,t2.device_id as device_id,t2.adjust_id as adjust_id,t2.google_adid as google_adid,cast(t3.referrer as bigint) as referrer,t3.login_pwd as login_pwd,cast(t1.sync_data_flag as int) as sync_data_flag,t3.phone_number as register_phone_number,cast(t2.device_type as int) as device_type,t2.imei as imei,t2.device_model as device_model,t2.os_version as os_version,t2.app_name as app_name,t2.app_version as app_version,t2.app_package_name as app_package_name,cast(t2.network_type as string) as network_type,t2.wifi_mac as wifi_mac,t2.lgt as longitude,t2.lat as latitude,cast(null as string) as geo_hash7,t2.ip as ip,unix_timestamp(t1.create_time,'yyyy-MM-dd HH:mm:ss') as register_time,UNIX_TIMESTAMP() as etl_time from (SELECT `uid`,`update_time`,`adjust_id`,`create_time`,`source_type`,`sync_data_flag`,`id`,`app_id`,`country_id` FROM (SELECT `rowData`.`uid`,`rowData`.`update_time`,`rowData`.`adjust_id`,`rowData`.`create_time`,`rowData`.`source_type`,`rowData`.`sync_data_flag`,`rowData`.`id`,`rowData`.`app_id`,`rowData`.`country_id`,`binlogTime`,ROW_NUMBER() OVER (PARTITION BY `rowData`.`uid`,`rowData`.`id` ORDER BY `binlogTime` desc) AS rownum FROM asi_user_user_service_t_user_register_source) WHERE rownum = 1) t1 left join (SELECT `refer_id`,`device_id`,`channel_id`,`device_type`,`imei`,`adjust_id`,`google_adid`,`device_model`,`os_version`,`app_name`,`app_package_name`,`app_version`,`ip`,`network_type`,`wifi_mac`,`lgt`,`lat`,`event_id`,`country_id`,`uid`,`create_time`,`api_version` FROM (SELECT 
`data`.`data`.`refer_id`,`data`.`data`.`device_id`,`data`.`data`.`channel_id`,`data`.`data`.`device_type`,`data`.`data`.`imei`,`data`.`data`.`adjust_id`,`data`.`data`.`google_adid`,`data`.`data`.`device_model`,`data`.`data`.`os_version`,`data`.`data`.`app_name`,`data`.`data`.`app_package_name`,`data`.`data`.`app_version`,`data`.`data`.`ip`,`data`.`data`.`network_type`,`data`.`data`.`wifi_mac`,`data`.`data`.`lgt`,`data`.`data`.`lat`,`data`.`event_id`,`data`.`country_id`,`data`.`uid`,`data`.`create_time`,`data`.`api_version`,ROW_NUMBER() OVER (PARTITION BY `data`.`data`.`refer_id`,`data`.`event_id` ORDER BY createTime desc) AS rownum FROM eventDeviceInfo where `data`.`event_id`=1002) WHERE rownum = 1) t2 on t1.uid = t2.refer_id left join (SELECT `register_channel_source`,`last_login_time`,`create_time`,`language`,`avatar`,`login_pwd`,`email_status`,`storage_source`,`uid`,`referrer`,`update_time`,`nickname`,`phone_number`,`sync_data_flag`,`id`,`country_id`,`email`,`status` FROM (SELECT `rowData`.`register_channel_source`,`rowData`.`last_login_time`,`rowData`.`create_time`,`rowData`.`language`,`rowData`.`avatar`,`rowData`.`login_pwd`,`rowData`.`email_status`,`rowData`.`storage_source`,`rowData`.`uid`,`rowData`.`referrer`,`rowData`.`update_time`,`rowData`.`nickname`,`rowData`.`phone_number`,`rowData`.`sync_data_flag`,`rowData`.`id`,`rowData`.`country_id`,`rowData`.`email`,`rowData`.`status`,`binlogTime`,ROW_NUMBER() OVER (PARTITION BY `rowData`.`uid` ORDER BY `binlogTime` desc) AS rownum FROM asi_user_user_service_t_user) WHERE rownum = 1) t3 on t1.uid=t3.uid"); table.printSchema(); streamTableEnv.toRetractStream(table, Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes()))).print(); streamExecEnv.execute("kafka-json-test"); CREATE TABLE eventDeviceInfo (`data` ROW<`event_id` BIGINT,`country_id` BIGINT,`uid` BIGINT,`create_time` BIGINT,`data` ROW<`refer_id` STRING,`device_id` STRING,`channel_id` STRING,`device_type` BIGINT,`imei` 
STRING,`adjust_id` STRING,`google_adid` STRING,`device_model` STRING,`os_version` STRING,`app_name` STRING,`app_package_name` STRING,`app_version` STRING,`ip` STRING,`network_type` BIGINT,`wifi_mac` STRING,`lgt` DECIMAL(38,18),`lat` DECIMAL(38,18)>,`api_version` STRING>,`createTime` BIGINT) WITH ('connector' = 'kafka-0.11','topic' = 'eventDeviceInfo','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'test','properties.max.poll.records' = '1000','properties.flink.poll-timeout' = '10','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true') CREATE TABLE asi_user_user_service_t_user (`binlogTime` BIGINT,`rowData` ROW<`register_channel_source` STRING,`last_login_time` STRING,`create_time` STRING,`language` STRING,`avatar` STRING,`login_pwd` STRING,`email_status` STRING,`storage_source` STRING,`uid` STRING,`referrer` STRING,`update_time` STRING,`nickname` STRING,`phone_number` STRING,`sync_data_flag` STRING,`id` STRING,`country_id` STRING,`email` STRING,`status` STRING>) WITH ('connector' = 'kafka-0.11','topic' = 'asi_user_user_service_t_user','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'test','properties.max.poll.records' = '1000','properties.flink.poll-timeout' = '10','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true') CREATE TABLE asi_user_user_service_t_user_register_source (`binlogTime` BIGINT,`rowData` ROW<`uid` BIGINT,`update_time` STRING,`adjust_id` STRING,`create_time` STRING,`source_type` STRING,`sync_data_flag` STRING,`id` STRING,`app_id` STRING,`country_id` BIGINT>) WITH ('connector' = 'kafka-0.11','topic' = 'asi_user_user_service_t_user_register_source','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'test','properties.max.poll.records' = '1000','properties.flink.poll-timeout' = '10','scan.startup.mode' = 
'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true') root |-- register_id: BIGINT |-- asi_uid: BIGINT |-- person_uuid: BIGINT |-- app_id: BIGINT |-- country_id: BIGINT |-- channel_id: STRING |-- device_id: STRING |-- adjust_id: STRING |-- google_adid: STRING |-- referrer: BIGINT |-- login_pwd: STRING |-- sync_data_flag: INT |-- register_phone_number: STRING |-- device_type: INT |-- imei: STRING |-- device_model: STRING |-- os_version: STRING |-- app_name: STRING |-- app_version: STRING |-- app_package_name: STRING |-- network_type: STRING |-- wifi_mac: STRING |-- longitude: DECIMAL(38, 18) |-- latitude: DECIMAL(38, 18) |-- geo_hash7: STRING |-- ip: STRING |-- register_time: BIGINT |-- etl_time: BIGINT NOT NULL org.apache.flink.table.api.TableException: BIGINT and VARCHAR(2147483647) does not have common type now at org.apache.flink.table.planner.plan.rules.logical.JoinConditionTypeCoerceRule$$anonfun$onMatch$1.apply(JoinConditionTypeCoerceRule.scala:76) at org.apache.flink.table.planner.plan.rules.logical.JoinConditionTypeCoerceRule$$anonfun$onMatch$1.apply(JoinConditionTypeCoerceRule.scala:65) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.flink.table.planner.plan.rules.logical.JoinConditionTypeCoerceRule.onMatch(JoinConditionTypeCoerceRule.scala:65) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427) at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264) at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223) at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.immutable.Range.foreach(Range.scala:160) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:331) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:307) at 
com.akulaku.blade.source.BladeKafkaJoinCodeTest.eventDeviceInfo(BladeKafkaJoinCodeTest.java:265) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) Process finished with exit code 255 hailongwang <[hidden email]> 于2020年10月19日周一 下午8:35写道: > Hi Dream, > 可以分享下你完整的程序吗,我感觉这个是因为 JOIN ON 条件上类型不一致引起的,可以分享下你完整的程序看下。 > Best, > 
Hailong Wang > > 在 2020-10-19 09:50:33,"Dream-底限" <[hidden email]> 写道: > >hi、我这面正在将flinktable转换为datastream,现在抛出如下异常,貌似是个bug。。。 > > > >table.printSchema(); > >streamTableEnv.toRetractStream(table, > > >Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes()))).print(); > > > > > > > >root > > |-- register_id: BIGINT > > |-- asi_uid: BIGINT > > |-- person_uuid: BIGINT > > |-- app_id: BIGINT > > |-- country_id: BIGINT > > |-- channel_id: STRING > > |-- device_id: STRING > > |-- adjust_id: STRING > > |-- google_adid: STRING > > |-- referrer: BIGINT > > |-- login_pwd: STRING > > |-- sync_data_flag: INT > > |-- register_phone_number: STRING > > |-- device_type: INT > > |-- imei: STRING > > |-- device_model: STRING > > |-- os_version: STRING > > |-- app_name: STRING > > |-- app_version: STRING > > |-- app_package_name: STRING > > |-- network_type: STRING > > |-- wifi_mac: STRING > > |-- longitude: DECIMAL(38, 18) > > |-- latitude: DECIMAL(38, 18) > > |-- geo_hash7: STRING > > |-- ip: STRING > > |-- register_time: BIGINT > > |-- etl_time: BIGINT NOT NULL > > > > > >org.apache.flink.table.api.TableException: BIGINT and > >VARCHAR(2147483647) does not have common type now > |
Hi,
我看其中一个 condition 是 `t1.uid = t2.refer_id` 其中 uid 是 bigint 类型,refer_id 是 varchar 类型。 你再确认下? Best, Hailong Wang At 2020-10-20 08:55:34, "Dream-底限" <[hidden email]> wrote: >hi、 >我查看了一下,join条件类型是一样的,我这面下游sink使用jdbc时候是可以运行的,但是转换为datastream时候失败了,下面是程序及异常: > >streamTableEnv.executeSql(kafkaDDL);//ddl语句见下面日志 > > >Table table = streamTableEnv.sqlQuery("SELECT cast(t1.id as bigint) as >register_id,cast(t1.uid as bigint) as asi_uid,cast(null as bigint) as >person_uuid,cast(t1.app_id as bigint) as app_id,cast(t1.country_id as >bigint) as country_id,t2.channel_id as channel_id,t2.device_id as >device_id,t2.adjust_id as adjust_id,t2.google_adid as >google_adid,cast(t3.referrer as bigint) as referrer,t3.login_pwd as >login_pwd,cast(t1.sync_data_flag as int) as >sync_data_flag,t3.phone_number as >register_phone_number,cast(t2.device_type as int) as >device_type,t2.imei as imei,t2.device_model as >device_model,t2.os_version as os_version,t2.app_name as >app_name,t2.app_version as app_version,t2.app_package_name as >app_package_name,cast(t2.network_type as string) as >network_type,t2.wifi_mac as wifi_mac,t2.lgt as longitude,t2.lat as >latitude,cast(null as string) as geo_hash7,t2.ip as >ip,unix_timestamp(t1.create_time,'yyyy-MM-dd HH:mm:ss') as >register_time,UNIX_TIMESTAMP() as etl_time from (SELECT >`uid`,`update_time`,`adjust_id`,`create_time`,`source_type`,`sync_data_flag`,`id`,`app_id`,`country_id` >FROM (SELECT `rowData`.`uid`,`rowData`.`update_time`,`rowData`.`adjust_id`,`rowData`.`create_time`,`rowData`.`source_type`,`rowData`.`sync_data_flag`,`rowData`.`id`,`rowData`.`app_id`,`rowData`.`country_id`,`binlogTime`,ROW_NUMBER() >OVER (PARTITION BY `rowData`.`uid`,`rowData`.`id` ORDER BY >`binlogTime` desc) AS rownum FROM >asi_user_user_service_t_user_register_source) WHERE rownum = 1) t1 >left join (SELECT 
>`refer_id`,`device_id`,`channel_id`,`device_type`,`imei`,`adjust_id`,`google_adid`,`device_model`,`os_version`,`app_name`,`app_package_name`,`app_version`,`ip`,`network_type`,`wifi_mac`,`lgt`,`lat`,`event_id`,`country_id`,`uid`,`create_time`,`api_version` >FROM (SELECT `data`.`data`.`refer_id`,`data`.`data`.`device_id`,`data`.`data`.`channel_id`,`data`.`data`.`device_type`,`data`.`data`.`imei`,`data`.`data`.`adjust_id`,`data`.`data`.`google_adid`,`data`.`data`.`device_model`,`data`.`data`.`os_version`,`data`.`data`.`app_name`,`data`.`data`.`app_package_name`,`data`.`data`.`app_version`,`data`.`data`.`ip`,`data`.`data`.`network_type`,`data`.`data`.`wifi_mac`,`data`.`data`.`lgt`,`data`.`data`.`lat`,`data`.`event_id`,`data`.`country_id`,`data`.`uid`,`data`.`create_time`,`data`.`api_version`,ROW_NUMBER() >OVER (PARTITION BY `data`.`data`.`refer_id`,`data`.`event_id` ORDER BY >createTime desc) AS rownum FROM eventDeviceInfo where >`data`.`event_id`=1002) WHERE rownum = 1) t2 on t1.uid = t2.refer_id >left join (SELECT >`register_channel_source`,`last_login_time`,`create_time`,`language`,`avatar`,`login_pwd`,`email_status`,`storage_source`,`uid`,`referrer`,`update_time`,`nickname`,`phone_number`,`sync_data_flag`,`id`,`country_id`,`email`,`status` >FROM (SELECT `rowData`.`register_channel_source`,`rowData`.`last_login_time`,`rowData`.`create_time`,`rowData`.`language`,`rowData`.`avatar`,`rowData`.`login_pwd`,`rowData`.`email_status`,`rowData`.`storage_source`,`rowData`.`uid`,`rowData`.`referrer`,`rowData`.`update_time`,`rowData`.`nickname`,`rowData`.`phone_number`,`rowData`.`sync_data_flag`,`rowData`.`id`,`rowData`.`country_id`,`rowData`.`email`,`rowData`.`status`,`binlogTime`,ROW_NUMBER() >OVER (PARTITION BY `rowData`.`uid` ORDER BY `binlogTime` desc) AS >rownum FROM asi_user_user_service_t_user) WHERE rownum = 1) t3 on >t1.uid=t3.uid"); > >table.printSchema(); >streamTableEnv.toRetractStream(table, 
>Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes()))).print(); >streamExecEnv.execute("kafka-json-test"); > > > > >CREATE TABLE eventDeviceInfo (`data` ROW<`event_id` >BIGINT,`country_id` BIGINT,`uid` BIGINT,`create_time` BIGINT,`data` >ROW<`refer_id` STRING,`device_id` STRING,`channel_id` >STRING,`device_type` BIGINT,`imei` STRING,`adjust_id` >STRING,`google_adid` STRING,`device_model` STRING,`os_version` >STRING,`app_name` STRING,`app_package_name` STRING,`app_version` >STRING,`ip` STRING,`network_type` BIGINT,`wifi_mac` STRING,`lgt` >DECIMAL(38,18),`lat` DECIMAL(38,18)>,`api_version` >STRING>,`createTime` BIGINT) WITH ('connector' = 'kafka-0.11','topic' >= 'eventDeviceInfo','properties.bootstrap.servers' = >'127.0.0.1:9092','properties.group.id' = >'test','properties.max.poll.records' = >'1000','properties.flink.poll-timeout' = '10','scan.startup.mode' = >'earliest-offset','format' = 'json','json.fail-on-missing-field' = >'false','json.ignore-parse-errors' = 'true') >CREATE TABLE asi_user_user_service_t_user (`binlogTime` >BIGINT,`rowData` ROW<`register_channel_source` >STRING,`last_login_time` STRING,`create_time` STRING,`language` >STRING,`avatar` STRING,`login_pwd` STRING,`email_status` >STRING,`storage_source` STRING,`uid` STRING,`referrer` >STRING,`update_time` STRING,`nickname` STRING,`phone_number` >STRING,`sync_data_flag` STRING,`id` STRING,`country_id` STRING,`email` >STRING,`status` STRING>) WITH ('connector' = 'kafka-0.11','topic' = >'asi_user_user_service_t_user','properties.bootstrap.servers' = >'127.0.0.1:9092','properties.group.id' = >'test','properties.max.poll.records' = >'1000','properties.flink.poll-timeout' = '10','scan.startup.mode' = >'earliest-offset','format' = 'json','json.fail-on-missing-field' = >'false','json.ignore-parse-errors' = 'true') >CREATE TABLE asi_user_user_service_t_user_register_source >(`binlogTime` BIGINT,`rowData` ROW<`uid` BIGINT,`update_time` >STRING,`adjust_id` 
STRING,`create_time` STRING,`source_type` >STRING,`sync_data_flag` STRING,`id` STRING,`app_id` >STRING,`country_id` BIGINT>) WITH ('connector' = 'kafka-0.11','topic' >= 'asi_user_user_service_t_user_register_source','properties.bootstrap.servers' >= '127.0.0.1:9092','properties.group.id' = >'test','properties.max.poll.records' = >'1000','properties.flink.poll-timeout' = '10','scan.startup.mode' = >'earliest-offset','format' = 'json','json.fail-on-missing-field' = >'false','json.ignore-parse-errors' = 'true') >root > |-- register_id: BIGINT > |-- asi_uid: BIGINT > |-- person_uuid: BIGINT > |-- app_id: BIGINT > |-- country_id: BIGINT > |-- channel_id: STRING > |-- device_id: STRING > |-- adjust_id: STRING > |-- google_adid: STRING > |-- referrer: BIGINT > |-- login_pwd: STRING > |-- sync_data_flag: INT > |-- register_phone_number: STRING > |-- device_type: INT > |-- imei: STRING > |-- device_model: STRING > |-- os_version: STRING > |-- app_name: STRING > |-- app_version: STRING > |-- app_package_name: STRING > |-- network_type: STRING > |-- wifi_mac: STRING > |-- longitude: DECIMAL(38, 18) > |-- latitude: DECIMAL(38, 18) > |-- geo_hash7: STRING > |-- ip: STRING > |-- register_time: BIGINT > |-- etl_time: BIGINT NOT NULL > > >org.apache.flink.table.api.TableException: BIGINT and >VARCHAR(2147483647) does not have common type now > > at org.apache.flink.table.planner.plan.rules.logical.JoinConditionTypeCoerceRule$$anonfun$onMatch$1.apply(JoinConditionTypeCoerceRule.scala:76) > at org.apache.flink.table.planner.plan.rules.logical.JoinConditionTypeCoerceRule$$anonfun$onMatch$1.apply(JoinConditionTypeCoerceRule.scala:65) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at 
org.apache.flink.table.planner.plan.rules.logical.JoinConditionTypeCoerceRule.onMatch(JoinConditionTypeCoerceRule.scala:65) > at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328) > at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562) > at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427) > at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264) > at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) > at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223) > at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210) > at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) > at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) > at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) > at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) > at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60) > at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55) > at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.immutable.Range.foreach(Range.scala:160) > at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) > at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164) > at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80) > at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) > at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) > at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:331) > at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:307) > at com.akulaku.blade.source.BladeKafkaJoinCodeTest.eventDeviceInfo(BladeKafkaJoinCodeTest.java:265) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at 
org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) > at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) > > >Process finished with exit code 255 > > >hailongwang <[hidden email]> 于2020年10月19日周一 下午8:35写道: > >> Hi Dream, >> 可以分享下你完整的程序吗,我感觉这个是因为 JOIN ON 条件上类型不一致引起的,可以分享下你完整的程序看下。 >> Best, >> Hailong Wang >> >> 在 2020-10-19 09:50:33,"Dream-底限" <[hidden email]> 写道: >> >hi、我这面正在将flinktable转换为datastream,现在抛出如下异常,貌似是个bug。。。 >> > >> >table.printSchema(); >> >streamTableEnv.toRetractStream(table, >> >> >Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes()))).print(); >> > >> > >> > >> >root >> > |-- register_id: BIGINT >> > |-- asi_uid: BIGINT >> > |-- person_uuid: BIGINT >> > |-- app_id: BIGINT >> > |-- country_id: BIGINT >> > |-- channel_id: STRING >> > |-- device_id: STRING >> > |-- adjust_id: STRING >> > |-- google_adid: STRING >> > |-- referrer: BIGINT >> > |-- login_pwd: STRING >> > |-- sync_data_flag: INT >> > |-- register_phone_number: STRING >> > |-- device_type: INT >> > |-- imei: STRING >> > |-- device_model: STRING >> > |-- os_version: STRING >> > |-- app_name: STRING >> > |-- app_version: STRING >> > |-- app_package_name: STRING >> > |-- network_type: STRING >> > |-- wifi_mac: STRING >> > |-- longitude: DECIMAL(38, 18) >> > |-- latitude: DECIMAL(38, 18) >> > |-- geo_hash7: STRING >> > |-- ip: STRING >> > |-- register_time: BIGINT >> > |-- etl_time: BIGINT NOT NULL >> > >> > >> >org.apache.flink.table.api.TableException: BIGINT and >> >VARCHAR(2147483647) does not have common type now >> |
Hi,
是的,类型部分不匹配,类型改完之后程序运行正常了,感谢 hailongwang <[hidden email]> 于2020年10月20日周二 下午4:13写道: > Hi, > 我看其中一个 condition 是 `t1.uid = t2.refer_id` > 其中 uid 是 bigint 类型,refer_id 是 varchar 类型。 > 你再确认下? > > > Best, > Hailong Wang > > > > > At 2020-10-20 08:55:34, "Dream-底限" <[hidden email]> wrote: > >hi、 > >我查看了一下,join条件类型是一样的,我这面下游sink使用jdbc时候是可以运行的,但是转换为datastream时候失败了,下面是程序及异常: > > > >streamTableEnv.executeSql(kafkaDDL);//ddl语句见下面日志 > > > > > >Table table = streamTableEnv.sqlQuery("SELECT cast(t1.id as bigint) as > >register_id,cast(t1.uid as bigint) as asi_uid,cast(null as bigint) as > >person_uuid,cast(t1.app_id as bigint) as app_id,cast(t1.country_id as > >bigint) as country_id,t2.channel_id as channel_id,t2.device_id as > >device_id,t2.adjust_id as adjust_id,t2.google_adid as > >google_adid,cast(t3.referrer as bigint) as referrer,t3.login_pwd as > >login_pwd,cast(t1.sync_data_flag as int) as > >sync_data_flag,t3.phone_number as > >register_phone_number,cast(t2.device_type as int) as > >device_type,t2.imei as imei,t2.device_model as > >device_model,t2.os_version as os_version,t2.app_name as > >app_name,t2.app_version as app_version,t2.app_package_name as > >app_package_name,cast(t2.network_type as string) as > >network_type,t2.wifi_mac as wifi_mac,t2.lgt as longitude,t2.lat as > >latitude,cast(null as string) as geo_hash7,t2.ip as > >ip,unix_timestamp(t1.create_time,'yyyy-MM-dd HH:mm:ss') as > >register_time,UNIX_TIMESTAMP() as etl_time from (SELECT > > >`uid`,`update_time`,`adjust_id`,`create_time`,`source_type`,`sync_data_flag`,`id`,`app_id`,`country_id` > >FROM (SELECT > `rowData`.`uid`,`rowData`.`update_time`,`rowData`.`adjust_id`,`rowData`.`create_time`,`rowData`.`source_type`,`rowData`.`sync_data_flag`,`rowData`.`id`,`rowData`.`app_id`,`rowData`.`country_id`,`binlogTime`,ROW_NUMBER() > >OVER (PARTITION BY `rowData`.`uid`,`rowData`.`id` ORDER BY > >`binlogTime` desc) AS rownum FROM > >asi_user_user_service_t_user_register_source) WHERE rownum = 1) t1 > >left join (SELECT 
> > >`refer_id`,`device_id`,`channel_id`,`device_type`,`imei`,`adjust_id`,`google_adid`,`device_model`,`os_version`,`app_name`,`app_package_name`,`app_version`,`ip`,`network_type`,`wifi_mac`,`lgt`,`lat`,`event_id`,`country_id`,`uid`,`create_time`,`api_version` > >FROM (SELECT > `data`.`data`.`refer_id`,`data`.`data`.`device_id`,`data`.`data`.`channel_id`,`data`.`data`.`device_type`,`data`.`data`.`imei`,`data`.`data`.`adjust_id`,`data`.`data`.`google_adid`,`data`.`data`.`device_model`,`data`.`data`.`os_version`,`data`.`data`.`app_name`,`data`.`data`.`app_package_name`,`data`.`data`.`app_version`,`data`.`data`.`ip`,`data`.`data`.`network_type`,`data`.`data`.`wifi_mac`,`data`.`data`.`lgt`,`data`.`data`.`lat`,`data`.`event_id`,`data`.`country_id`,`data`.`uid`,`data`.`create_time`,`data`.`api_version`,ROW_NUMBER() > >OVER (PARTITION BY `data`.`data`.`refer_id`,`data`.`event_id` ORDER BY > >createTime desc) AS rownum FROM eventDeviceInfo where > >`data`.`event_id`=1002) WHERE rownum = 1) t2 on t1.uid = t2.refer_id > >left join (SELECT > > >`register_channel_source`,`last_login_time`,`create_time`,`language`,`avatar`,`login_pwd`,`email_status`,`storage_source`,`uid`,`referrer`,`update_time`,`nickname`,`phone_number`,`sync_data_flag`,`id`,`country_id`,`email`,`status` > >FROM (SELECT > `rowData`.`register_channel_source`,`rowData`.`last_login_time`,`rowData`.`create_time`,`rowData`.`language`,`rowData`.`avatar`,`rowData`.`login_pwd`,`rowData`.`email_status`,`rowData`.`storage_source`,`rowData`.`uid`,`rowData`.`referrer`,`rowData`.`update_time`,`rowData`.`nickname`,`rowData`.`phone_number`,`rowData`.`sync_data_flag`,`rowData`.`id`,`rowData`.`country_id`,`rowData`.`email`,`rowData`.`status`,`binlogTime`,ROW_NUMBER() > >OVER (PARTITION BY `rowData`.`uid` ORDER BY `binlogTime` desc) AS > >rownum FROM asi_user_user_service_t_user) WHERE rownum = 1) t3 on > >t1.uid=t3.uid"); > > > >table.printSchema(); > >streamTableEnv.toRetractStream(table, > > 
>Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes()))).print(); > >streamExecEnv.execute("kafka-json-test"); > > > > > > > > > >CREATE TABLE eventDeviceInfo (`data` ROW<`event_id` > >BIGINT,`country_id` BIGINT,`uid` BIGINT,`create_time` BIGINT,`data` > >ROW<`refer_id` STRING,`device_id` STRING,`channel_id` > >STRING,`device_type` BIGINT,`imei` STRING,`adjust_id` > >STRING,`google_adid` STRING,`device_model` STRING,`os_version` > >STRING,`app_name` STRING,`app_package_name` STRING,`app_version` > >STRING,`ip` STRING,`network_type` BIGINT,`wifi_mac` STRING,`lgt` > >DECIMAL(38,18),`lat` DECIMAL(38,18)>,`api_version` > >STRING>,`createTime` BIGINT) WITH ('connector' = 'kafka-0.11','topic' > >= 'eventDeviceInfo','properties.bootstrap.servers' = > >'127.0.0.1:9092','properties.group.id' = > >'test','properties.max.poll.records' = > >'1000','properties.flink.poll-timeout' = '10','scan.startup.mode' = > >'earliest-offset','format' = 'json','json.fail-on-missing-field' = > >'false','json.ignore-parse-errors' = 'true') > >CREATE TABLE asi_user_user_service_t_user (`binlogTime` > >BIGINT,`rowData` ROW<`register_channel_source` > >STRING,`last_login_time` STRING,`create_time` STRING,`language` > >STRING,`avatar` STRING,`login_pwd` STRING,`email_status` > >STRING,`storage_source` STRING,`uid` STRING,`referrer` > >STRING,`update_time` STRING,`nickname` STRING,`phone_number` > >STRING,`sync_data_flag` STRING,`id` STRING,`country_id` STRING,`email` > >STRING,`status` STRING>) WITH ('connector' = 'kafka-0.11','topic' = > >'asi_user_user_service_t_user','properties.bootstrap.servers' = > >'127.0.0.1:9092','properties.group.id' = > >'test','properties.max.poll.records' = > >'1000','properties.flink.poll-timeout' = '10','scan.startup.mode' = > >'earliest-offset','format' = 'json','json.fail-on-missing-field' = > >'false','json.ignore-parse-errors' = 'true') > >CREATE TABLE asi_user_user_service_t_user_register_source > >(`binlogTime` 
BIGINT,`rowData` ROW<`uid` BIGINT,`update_time` > >STRING,`adjust_id` STRING,`create_time` STRING,`source_type` > >STRING,`sync_data_flag` STRING,`id` STRING,`app_id` > >STRING,`country_id` BIGINT>) WITH ('connector' = 'kafka-0.11','topic' > >= > 'asi_user_user_service_t_user_register_source','properties.bootstrap.servers' > >= '127.0.0.1:9092','properties.group.id' = > >'test','properties.max.poll.records' = > >'1000','properties.flink.poll-timeout' = '10','scan.startup.mode' = > >'earliest-offset','format' = 'json','json.fail-on-missing-field' = > >'false','json.ignore-parse-errors' = 'true') > >root > > |-- register_id: BIGINT > > |-- asi_uid: BIGINT > > |-- person_uuid: BIGINT > > |-- app_id: BIGINT > > |-- country_id: BIGINT > > |-- channel_id: STRING > > |-- device_id: STRING > > |-- adjust_id: STRING > > |-- google_adid: STRING > > |-- referrer: BIGINT > > |-- login_pwd: STRING > > |-- sync_data_flag: INT > > |-- register_phone_number: STRING > > |-- device_type: INT > > |-- imei: STRING > > |-- device_model: STRING > > |-- os_version: STRING > > |-- app_name: STRING > > |-- app_version: STRING > > |-- app_package_name: STRING > > |-- network_type: STRING > > |-- wifi_mac: STRING > > |-- longitude: DECIMAL(38, 18) > > |-- latitude: DECIMAL(38, 18) > > |-- geo_hash7: STRING > > |-- ip: STRING > > |-- register_time: BIGINT > > |-- etl_time: BIGINT NOT NULL > > > > > >org.apache.flink.table.api.TableException: BIGINT and > >VARCHAR(2147483647) does not have common type now > > > > at > org.apache.flink.table.planner.plan.rules.logical.JoinConditionTypeCoerceRule$$anonfun$onMatch$1.apply(JoinConditionTypeCoerceRule.scala:76) > > at > org.apache.flink.table.planner.plan.rules.logical.JoinConditionTypeCoerceRule$$anonfun$onMatch$1.apply(JoinConditionTypeCoerceRule.scala:65) > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > > at > 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > > at > org.apache.flink.table.planner.plan.rules.logical.JoinConditionTypeCoerceRule.onMatch(JoinConditionTypeCoerceRule.scala:65) > > at > org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328) > > at > org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562) > > at > org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427) > > at > org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264) > > at > org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) > > at > org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223) > > at > org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210) > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) > > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > > at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > > at > 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60) > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55) > > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > > at scala.collection.immutable.Range.foreach(Range.scala:160) > > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > > at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > > at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > > at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > > at > 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164) > > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80) > > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) > > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) > > at > org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:331) > > at > org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:307) > > at > com.akulaku.blade.source.BladeKafkaJoinCodeTest.eventDeviceInfo(BladeKafkaJoinCodeTest.java:265) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > > 
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > > at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) > > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) > > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) > > > > > >Process finished with exit code 255 > > > > > >hailongwang <[hidden email]> 于2020年10月19日周一 下午8:35写道: > > > >> Hi Dream, > >> 可以分享下你完整的程序吗,我感觉这个是因为 JOIN ON 条件上类型不一致引起的,可以分享下你完整的程序看下。 > >> Best, > >> Hailong Wang > >> > >> 在 2020-10-19 09:50:33,"Dream-底限" <[hidden email]> 写道: > >> >hi、我这面正在将flinktable转换为datastream,现在抛出如下异常,貌似是个bug。。。 > >> > > >> >table.printSchema(); > >> >streamTableEnv.toRetractStream(table, > >> > >> > >Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes()))).print(); > >> > > >> > > >> > > >> >root > >> > |-- register_id: BIGINT > >> > |-- asi_uid: BIGINT > >> > |-- person_uuid: BIGINT > >> > |-- app_id: BIGINT > >> > |-- country_id: BIGINT > >> > |-- channel_id: STRING > >> > |-- device_id: STRING > >> > |-- adjust_id: STRING > >> > |-- google_adid: STRING > >> > |-- referrer: BIGINT > >> > |-- login_pwd: STRING > >> > |-- sync_data_flag: INT > >> > |-- register_phone_number: STRING > >> > |-- device_type: INT > >> > |-- imei: STRING > >> > |-- device_model: STRING > >> > |-- os_version: STRING > >> > |-- app_name: STRING > >> > |-- app_version: STRING > >> > |-- app_package_name: 
STRING > >> > |-- network_type: STRING > >> > |-- wifi_mac: STRING > >> > |-- longitude: DECIMAL(38, 18) > >> > |-- latitude: DECIMAL(38, 18) > >> > |-- geo_hash7: STRING > >> > |-- ip: STRING > >> > |-- register_time: BIGINT > >> > |-- etl_time: BIGINT NOT NULL > >> > > >> > > >> >org.apache.flink.table.api.TableException: BIGINT and > >> >VARCHAR(2147483647) does not have common type now > >> > |
Free forum by Nabble | Edit this page |