Failed to convert Flink Table to DataStream


Failed to convert Flink Table to DataStream

Dream-底限
Hi, I'm converting a Flink Table to a DataStream, and it throws the exception below. It looks like a bug...

table.printSchema();
streamTableEnv.toRetractStream(table,
Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes()))).print();



root
 |-- register_id: BIGINT
 |-- asi_uid: BIGINT
 |-- person_uuid: BIGINT
 |-- app_id: BIGINT
 |-- country_id: BIGINT
 |-- channel_id: STRING
 |-- device_id: STRING
 |-- adjust_id: STRING
 |-- google_adid: STRING
 |-- referrer: BIGINT
 |-- login_pwd: STRING
 |-- sync_data_flag: INT
 |-- register_phone_number: STRING
 |-- device_type: INT
 |-- imei: STRING
 |-- device_model: STRING
 |-- os_version: STRING
 |-- app_name: STRING
 |-- app_version: STRING
 |-- app_package_name: STRING
 |-- network_type: STRING
 |-- wifi_mac: STRING
 |-- longitude: DECIMAL(38, 18)
 |-- latitude: DECIMAL(38, 18)
 |-- geo_hash7: STRING
 |-- ip: STRING
 |-- register_time: BIGINT
 |-- etl_time: BIGINT NOT NULL


org.apache.flink.table.api.TableException: BIGINT and
VARCHAR(2147483647) does not have common type now
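
As an aside, the same conversion can be requested without hand-building legacy type information; a minimal sketch, assuming the intent is just a retract stream of Rows (uses org.apache.flink.types.Row):

    // Sketch: let the bridge derive the output type itself instead of
    // passing TypeInformation converted from the table schema.
    streamTableEnv.toRetractStream(table, Row.class).print();

Note that the TableException is raised while the plan is optimized (see the JoinConditionTypeCoerceRule frames in the full trace later in this thread), so it happens with either overload; the requested output type is not the cause.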

Re: Failed to convert Flink Table to DataStream

hailongwang
Hi Dream,
Could you share your complete program? I suspect this is caused by mismatched types in a JOIN ON condition; seeing the full program would help us take a look.
Best,
Hailong Wang
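
To illustrate the suspicion, a minimal sketch of the kind of join condition that makes the planner raise this exact error; the table and column names here are hypothetical, not taken from the job:

    -- a.id is BIGINT, b.ref_id is STRING: the planner finds no common
    -- type for the equality and fails with the same TableException.
    SELECT a.id, b.name
    FROM t_a a
    LEFT JOIN t_b b ON a.id = b.ref_id

    -- Casting one side explicitly avoids it:
    -- LEFT JOIN t_b b ON a.id = CAST(b.ref_id AS BIGINT)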


Re: Failed to convert Flink Table to DataStream

Dream-底限
Hi,
I checked, and the join condition types look the same to me. The job runs fine when the downstream sink is JDBC, but it fails when converting to a DataStream. The program and exception are below:

streamTableEnv.executeSql(kafkaDDL); // the DDL statements are shown in the log below


Table table = streamTableEnv.sqlQuery(
    "SELECT cast(t1.id as bigint) as register_id, cast(t1.uid as bigint) as asi_uid,
            cast(null as bigint) as person_uuid, cast(t1.app_id as bigint) as app_id,
            cast(t1.country_id as bigint) as country_id, t2.channel_id as channel_id,
            t2.device_id as device_id, t2.adjust_id as adjust_id, t2.google_adid as google_adid,
            cast(t3.referrer as bigint) as referrer, t3.login_pwd as login_pwd,
            cast(t1.sync_data_flag as int) as sync_data_flag, t3.phone_number as register_phone_number,
            cast(t2.device_type as int) as device_type, t2.imei as imei, t2.device_model as device_model,
            t2.os_version as os_version, t2.app_name as app_name, t2.app_version as app_version,
            t2.app_package_name as app_package_name, cast(t2.network_type as string) as network_type,
            t2.wifi_mac as wifi_mac, t2.lgt as longitude, t2.lat as latitude,
            cast(null as string) as geo_hash7, t2.ip as ip,
            unix_timestamp(t1.create_time,'yyyy-MM-dd HH:mm:ss') as register_time,
            UNIX_TIMESTAMP() as etl_time
     from (SELECT `uid`,`update_time`,`adjust_id`,`create_time`,`source_type`,`sync_data_flag`,`id`,`app_id`,`country_id`
           FROM (SELECT `rowData`.`uid`,`rowData`.`update_time`,`rowData`.`adjust_id`,`rowData`.`create_time`,
                        `rowData`.`source_type`,`rowData`.`sync_data_flag`,`rowData`.`id`,`rowData`.`app_id`,
                        `rowData`.`country_id`,`binlogTime`,
                        ROW_NUMBER() OVER (PARTITION BY `rowData`.`uid`,`rowData`.`id` ORDER BY `binlogTime` desc) AS rownum
                 FROM asi_user_user_service_t_user_register_source)
           WHERE rownum = 1) t1
     left join (SELECT `refer_id`,`device_id`,`channel_id`,`device_type`,`imei`,`adjust_id`,`google_adid`,
                       `device_model`,`os_version`,`app_name`,`app_package_name`,`app_version`,`ip`,
                       `network_type`,`wifi_mac`,`lgt`,`lat`,`event_id`,`country_id`,`uid`,`create_time`,`api_version`
                FROM (SELECT `data`.`data`.`refer_id`,`data`.`data`.`device_id`,`data`.`data`.`channel_id`,
                             `data`.`data`.`device_type`,`data`.`data`.`imei`,`data`.`data`.`adjust_id`,
                             `data`.`data`.`google_adid`,`data`.`data`.`device_model`,`data`.`data`.`os_version`,
                             `data`.`data`.`app_name`,`data`.`data`.`app_package_name`,`data`.`data`.`app_version`,
                             `data`.`data`.`ip`,`data`.`data`.`network_type`,`data`.`data`.`wifi_mac`,
                             `data`.`data`.`lgt`,`data`.`data`.`lat`,`data`.`event_id`,`data`.`country_id`,
                             `data`.`uid`,`data`.`create_time`,`data`.`api_version`,
                             ROW_NUMBER() OVER (PARTITION BY `data`.`data`.`refer_id`,`data`.`event_id` ORDER BY createTime desc) AS rownum
                      FROM eventDeviceInfo where `data`.`event_id`=1002)
                WHERE rownum = 1) t2 on t1.uid = t2.refer_id
     left join (SELECT `register_channel_source`,`last_login_time`,`create_time`,`language`,`avatar`,`login_pwd`,
                       `email_status`,`storage_source`,`uid`,`referrer`,`update_time`,`nickname`,`phone_number`,
                       `sync_data_flag`,`id`,`country_id`,`email`,`status`
                FROM (SELECT `rowData`.`register_channel_source`,`rowData`.`last_login_time`,`rowData`.`create_time`,
                             `rowData`.`language`,`rowData`.`avatar`,`rowData`.`login_pwd`,`rowData`.`email_status`,
                             `rowData`.`storage_source`,`rowData`.`uid`,`rowData`.`referrer`,`rowData`.`update_time`,
                             `rowData`.`nickname`,`rowData`.`phone_number`,`rowData`.`sync_data_flag`,`rowData`.`id`,
                             `rowData`.`country_id`,`rowData`.`email`,`rowData`.`status`,`binlogTime`,
                             ROW_NUMBER() OVER (PARTITION BY `rowData`.`uid` ORDER BY `binlogTime` desc) AS rownum
                      FROM asi_user_user_service_t_user)
                WHERE rownum = 1) t3 on t1.uid=t3.uid");

table.printSchema();
streamTableEnv.toRetractStream(table,
Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes()))).print();
streamExecEnv.execute("kafka-json-test");




CREATE TABLE eventDeviceInfo (
  `data` ROW<`event_id` BIGINT, `country_id` BIGINT, `uid` BIGINT, `create_time` BIGINT,
             `data` ROW<`refer_id` STRING, `device_id` STRING, `channel_id` STRING,
                        `device_type` BIGINT, `imei` STRING, `adjust_id` STRING,
                        `google_adid` STRING, `device_model` STRING, `os_version` STRING,
                        `app_name` STRING, `app_package_name` STRING, `app_version` STRING,
                        `ip` STRING, `network_type` BIGINT, `wifi_mac` STRING,
                        `lgt` DECIMAL(38,18), `lat` DECIMAL(38,18)>,
             `api_version` STRING>,
  `createTime` BIGINT
) WITH (
  'connector' = 'kafka-0.11', 'topic' = 'eventDeviceInfo',
  'properties.bootstrap.servers' = '127.0.0.1:9092', 'properties.group.id' = 'test',
  'properties.max.poll.records' = '1000', 'properties.flink.poll-timeout' = '10',
  'scan.startup.mode' = 'earliest-offset', 'format' = 'json',
  'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true'
)

CREATE TABLE asi_user_user_service_t_user (
  `binlogTime` BIGINT,
  `rowData` ROW<`register_channel_source` STRING, `last_login_time` STRING, `create_time` STRING,
                `language` STRING, `avatar` STRING, `login_pwd` STRING, `email_status` STRING,
                `storage_source` STRING, `uid` STRING, `referrer` STRING, `update_time` STRING,
                `nickname` STRING, `phone_number` STRING, `sync_data_flag` STRING, `id` STRING,
                `country_id` STRING, `email` STRING, `status` STRING>
) WITH (
  'connector' = 'kafka-0.11', 'topic' = 'asi_user_user_service_t_user',
  'properties.bootstrap.servers' = '127.0.0.1:9092', 'properties.group.id' = 'test',
  'properties.max.poll.records' = '1000', 'properties.flink.poll-timeout' = '10',
  'scan.startup.mode' = 'earliest-offset', 'format' = 'json',
  'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true'
)

CREATE TABLE asi_user_user_service_t_user_register_source (
  `binlogTime` BIGINT,
  `rowData` ROW<`uid` BIGINT, `update_time` STRING, `adjust_id` STRING, `create_time` STRING,
                `source_type` STRING, `sync_data_flag` STRING, `id` STRING, `app_id` STRING,
                `country_id` BIGINT>
) WITH (
  'connector' = 'kafka-0.11', 'topic' = 'asi_user_user_service_t_user_register_source',
  'properties.bootstrap.servers' = '127.0.0.1:9092', 'properties.group.id' = 'test',
  'properties.max.poll.records' = '1000', 'properties.flink.poll-timeout' = '10',
  'scan.startup.mode' = 'earliest-offset', 'format' = 'json',
  'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true'
)
root
 |-- register_id: BIGINT
 |-- asi_uid: BIGINT
 |-- person_uuid: BIGINT
 |-- app_id: BIGINT
 |-- country_id: BIGINT
 |-- channel_id: STRING
 |-- device_id: STRING
 |-- adjust_id: STRING
 |-- google_adid: STRING
 |-- referrer: BIGINT
 |-- login_pwd: STRING
 |-- sync_data_flag: INT
 |-- register_phone_number: STRING
 |-- device_type: INT
 |-- imei: STRING
 |-- device_model: STRING
 |-- os_version: STRING
 |-- app_name: STRING
 |-- app_version: STRING
 |-- app_package_name: STRING
 |-- network_type: STRING
 |-- wifi_mac: STRING
 |-- longitude: DECIMAL(38, 18)
 |-- latitude: DECIMAL(38, 18)
 |-- geo_hash7: STRING
 |-- ip: STRING
 |-- register_time: BIGINT
 |-- etl_time: BIGINT NOT NULL


org.apache.flink.table.api.TableException: BIGINT and
VARCHAR(2147483647) does not have common type now

        at org.apache.flink.table.planner.plan.rules.logical.JoinConditionTypeCoerceRule$$anonfun$onMatch$1.apply(JoinConditionTypeCoerceRule.scala:76)
        at org.apache.flink.table.planner.plan.rules.logical.JoinConditionTypeCoerceRule$$anonfun$onMatch$1.apply(JoinConditionTypeCoerceRule.scala:65)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.flink.table.planner.plan.rules.logical.JoinConditionTypeCoerceRule.onMatch(JoinConditionTypeCoerceRule.scala:65)
        at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562)
        at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427)
        at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264)
        at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223)
        at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
        at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80)
        at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:331)
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:307)
        at com.akulaku.blade.source.BladeKafkaJoinCodeTest.eventDeviceInfo(BladeKafkaJoinCodeTest.java:265)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
        at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
        at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)


Process finished with exit code 255
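
As a side note, one way to track a mismatch like this down is to build each join input as its own Table and dump its resolved field types; a small sketch, where t2Query is a hypothetical String holding just the t2 subquery from above:

    // Sketch: inspect one join input in isolation so a BIGINT-vs-STRING
    // mismatch on a join key is visible at a glance.
    Table t2 = streamTableEnv.sqlQuery(t2Query);
    TableSchema schema = t2.getSchema();
    for (int i = 0; i < schema.getFieldCount(); i++) {
        System.out.println(schema.getFieldName(i).get() + " -> " + schema.getFieldDataType(i).get());
    }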



Re: Re: Failed to convert Flink Table to DataStream

hailongwang
Hi,
I see one of the conditions is `t1.uid = t2.refer_id`,
where uid is BIGINT and refer_id is VARCHAR.
Could you double-check?


Best,
Hailong Wang
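
Once the mismatch is confirmed, a minimal sketch of one way to align the condition (casting in the ON clause; casting refer_id inside the t2 subquery instead would work equally well):

    -- Cast the STRING side of the key to BIGINT so both sides share a type:
    ... ) t2 ON t1.uid = CAST(t2.refer_id AS BIGINT)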





Re: Re: Failed to convert Flink Table to DataStream

Dream-底限
Hi,
Yes, some of the types didn't match. After I fixed the types the program runs fine. Thanks!
