Flink version: 1.12.0
在使用 Flink 执行 Flink SQL 流表 join 维表, 运行报错(流表SQL 和维表SQL单独运行都没有问题), 错误堆栈信息如下: Exception in thread "main" java.lang.RuntimeException: org.apache.flink.table.planner.codegen.CodeGenException: Unable to find common type of GeneratedExpression(field$18,isNull$17,,STRING,None) and ArrayBuffer(GeneratedExpression(((int) 4),false,,INT NOT NULL,Some(4)), GeneratedExpression(((int) 8),false,,INT NOT NULL,Some(8))). at com.hmd.stream.SqlSubmit.main(SqlSubmit.java:47) Caused by: org.apache.flink.table.planner.codegen.CodeGenException: Unable to find common type of GeneratedExpression(field$18,isNull$17,,STRING,None) and ArrayBuffer(GeneratedExpression(((int) 4),false,,INT NOT NULL,Some(4)), GeneratedExpression(((int) 8),false,,INT NOT NULL,Some(8))). at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.$anonfun$generateIn$2(ScalarOperatorGens.scala:307) at scala.Option.orElse(Option.scala:289) at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateIn(ScalarOperatorGens.scala:307) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:724) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:507) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56) at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56) at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155) at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:143) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:143) at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:190) at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:84) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:38) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlan(StreamExecLookupJoin.scala:38) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43) at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1267) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:675) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:759) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:665) at com.hmd.stream.SqlSubmit.callInsertInto(SqlSubmit.java:110) at com.hmd.stream.SqlSubmit.callCommand(SqlSubmit.java:85) at com.hmd.stream.SqlSubmit.run(SqlSubmit.java:71) at com.hmd.stream.SqlSubmit.main(SqlSubmit.java:45) 运行的 SQL 如下: -- 流表 source kafka CREATE TABLE `t_Order_Order` ( id BIGINT, type INT, amount VARCHAR, receivedAmount VARCHAR, channelType VARCHAR, accountId BIGINT, isCreditPeriod VARCHAR, isCyclePeriod VARCHAR, originalOrderId VARCHAR, status VARCHAR, insertTime VARCHAR, `mark` INT, isNegotiation VARCHAR, statusTime1 VARCHAR, statusTime2 VARCHAR, statusTime4 VARCHAR, statusTime8 VARCHAR, statusTime16 VARCHAR, isFirstOrder VARCHAR, isReturn VARCHAR , orgCode VARCHAR , proctime AS PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'ods_Homedo_t_Order_Order', 'properties.bootstrap.servers' = '10.0.15.130:9092', 'properties.group.id' = 'test-homodo', 'format' = 'json', 'scan.startup.mode' = 'latest-offset', 'json.ignore-parse-errors' = 'true' ); -- dim_finance_account_fortest CREATE TABLE `dim_finance_account_fortest`( `id` BIGINT, `mark` INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector'='jdbc', 'table-name'='t_finance_account_fortest', 'url'='jdbc:mysql://10.0.0.29:3306/hmd_ods', 'username'='read', 'password'='si7v3#,a', 'lookup.cache.max-rows'='5000', 'lookup.cache.ttl'='600s' ); -- sink CREATE TABLE `sink_kafka` ( order_id BIGINT , account_id BIGINT , original_order_id VARCHAR , org_code VARCHAR , order_type_id INT, order_status_id VARCHAR, order_status_name VARCHAR, channel_type_id VARCHAR, channel_type_name VARCHAR, is_credit_order INT, is_credit_period VARCHAR, is_cycle_period VARCHAR, is_negotiation VARCHAR, is_firstorder VARCHAR, is_return VARCHAR, order_gmv VARCHAR, reder_total_received_amount VARCHAR, order_date VARCHAR, order_time VARCHAR, order_cancel_time VARCHAR, order_complete_time VARCHAR, order_pend_payment VARCHAR, order_confirm_time VARCHAR ) WITH ( 'connector' = 'print' ); INSERT INTO `sink_kafka` SELECT oo.id AS order_id , oo.accountId AS account_id, oo.originalOrderId AS original_order_id, oo.orgCode AS org_code, oo.type AS order_type_id, oo.status AS order_status_id, ( case when oo.status = 1 then '交易取消' when oo.status = 2 then '交易完成' when oo.status = 4 then '等待客服处理' when oo.status = 8 then '等待客户付款' when oo.status =16 then '等待订单发出' when oo.status =32 then '等待客户收货' else '其他' end ) AS order_status_name, oo.channelType AS channel_type_id, ( case oo.channelType when 1 then 'OMS' when 2 then 'PC' when 4 then 'M站' when 8 then 'APP' when 16 then 'APP' when 32 then '小程序' else '其他' end ) AS channel_type_name, ( case when oo.isCreditPeriod =1 and oo.isCyclePeriod =1 then 1 else 0 end ) AS is_credit_order, oo.isCreditPeriod AS is_credit_period, oo.isCyclePeriod AS is_cycle_period, oo.isNegotiation AS is_negotiation, oo.isFirstOrder AS is_firstorder, oo.isReturn AS is_return, oo.amount AS order_gmv, oo.receivedAmount AS reder_total_received_amount, SUBSTR(oo.insertTime,1,10) AS order_date, SUBSTR(oo.insertTime,1,19) AS order_time, oo.statusTime1 AS order_cancel_time, oo.statusTime2 AS order_complete_time, ( case when oo.isCreditPeriod =1 and oo.isCyclePeriod =1 then oo.statusTime4 else oo.statusTime8 end ) AS order_pend_payment, oo.statusTime16 AS order_confirm_time FROM (SELECT id , type , amount , receivedAmount , channelType , accountId , isCreditPeriod , isCyclePeriod , originalOrderId , status, insertTime , isNegotiation , statusTime1 , statusTime2 , statusTime4 , statusTime8 , statusTime16 , isFirstOrder , isReturn , orgCode, proctime FROM `t_Order_Order` WHERE mark > 0) AS oo LEFT JOIN `dim_finance_account_fortest` FOR SYSTEM_TIME AS OF oo.proctime AS dfat ON oo.accountId = dfat.id <http://dfat.id/>; |
Free forum by Nabble | Edit this page |