关联join出错

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

关联join出错

Kevin

HI ALL :
    打扰大家了。我用 Flink SQL 做关联(join)查询时出现了如下报错,有人遇到过么?谢谢。
a.sales_dept_name是string类型
p.second_province也是string类型

日志报错:
2020-04-24 16:35:14,873 INFO  org.apache.flink.table.module.ModuleManager                   - Cannot find FunctionDefinition is not null from any loaded modules
2020-04-24 16:35:14,873 INFO  org.apache.flink.table.module.ModuleManager                   - Cannot find FunctionDefinition current_timestamp from any loaded modules
2020-04-24 16:35:14,881 WARN  org.apache.flink.table.client.cli.CliClient                   - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL statement.
        at org.apache.flink.table.client.gateway.local.LocalExecutor.createTable(LocalExecutor.java:678)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:612)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:477)
        at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:510)
        at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:295)
        at java.util.Optional.ifPresent(Optional.java:159)
        at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200)
        at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
Caused by: java.lang.RuntimeException: while converting `a`.`sales_dept_name` = `p`.`second_province`
        at org.apache.calcite.sql2rel.ReflectiveConvertletTable.lambda$registerNodeTypeMethod$0(ReflectiveConvertletTable.java:86)
        at org.apache.calcite.sql2rel.SqlNodeToRexConverterImpl.convertCall(SqlNodeToRexConverterImpl.java:63)
        at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4787)
        at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4092)
        at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
        at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.convertExpression(SqlToRelConverter.java:4656)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoinCondition(SqlToRelConverter.java:2708)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2100)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$createTable$16(LocalExecutor.java:675)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:231)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.createTable(LocalExecutor.java:675)
        ... 9 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.calcite.sql2rel.ReflectiveConvertletTable.lambda$registerNodeTypeMethod$0(ReflectiveConvertletTable.java:83)
        ... 32 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
        at java.util.ArrayList.rangeCheck(ArrayList.java:653)
        at java.util.ArrayList.get(ArrayList.java:429)
        at org.apache.calcite.sql2rel.SqlToRelConverter$LookupContext.findRel(SqlToRelConverter.java:5300)
        at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.lookup(SqlToRelConverter.java:4424)
        at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.lookupExp(SqlToRelConverter.java:4369)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:3720)
        at org.apache.calcite.sql2rel.SqlToRelConverter.access$2200(SqlToRelConverter.java:217)
        at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4796)
        at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4092)
        at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
        at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.convertExpression(SqlToRelConverter.java:4656)
        at org.apache.calcite.sql2rel.StandardConvertletTable.convertExpressionList(StandardConvertletTable.java:793)
        at org.apache.calcite.sql2rel.StandardConvertletTable.convertCall(StandardConvertletTable.java:769)
        at org.apache.calcite.sql2rel.StandardConvertletTable.convertCall(StandardConvertletTable.java:756)
        ... 36 more
Reply | Threaded
Open this post in threaded view
|

Re: 关联join出错

Benchao Li
Hi,
能否提供一下完整的DDL以及query呢?
此外,你用的是哪个版本,哪个planner?

Kevin <[hidden email]> 于2020年4月24日周五 下午4:36写道:

>
> HI ALL :
>     打扰大家了。我用flink sql 关联数据。出现了如下报错。有人遇到过么 谢谢
> a.sales_dept_name是string类型
> p.second_province也是string类型
>
> 日志报错:
> 2020-04-24 16:35:14,873 INFO  org.apache.flink.table.module.ModuleManager
>                  - Cannot find FunctionDefinition is not null from any
> loaded modules
> 2020-04-24 16:35:14,873 INFO  org.apache.flink.table.module.ModuleManager
>                  - Cannot find FunctionDefinition current_timestamp from
> any loaded modules
> 2020-04-24 16:35:14,881 WARN  org.apache.flink.table.client.cli.CliClient
>                  - Could not execute SQL statement.
> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL
> statement.
>         at
> org.apache.flink.table.client.gateway.local.LocalExecutor.createTable(LocalExecutor.java:678)
>         at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:612)
>         at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:477)
>         at
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:510)
>         at
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:295)
>         at java.util.Optional.ifPresent(Optional.java:159)
>         at
> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200)
>         at
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>         at
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: java.lang.RuntimeException: while converting
> `a`.`sales_dept_name` = `p`.`second_province`
>         at
> org.apache.calcite.sql2rel.ReflectiveConvertletTable.lambda$registerNodeTypeMethod$0(ReflectiveConvertletTable.java:86)
>         at
> org.apache.calcite.sql2rel.SqlNodeToRexConverterImpl.convertCall(SqlNodeToRexConverterImpl.java:63)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4787)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4092)
>         at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.convertExpression(SqlToRelConverter.java:4656)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertJoinCondition(SqlToRelConverter.java:2708)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2100)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>         at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>         at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>         at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>         at
> org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$createTable$16(LocalExecutor.java:675)
>         at
> org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:231)
>         at
> org.apache.flink.table.client.gateway.local.LocalExecutor.createTable(LocalExecutor.java:675)
>         ... 9 more
> Caused by: java.lang.reflect.InvocationTargetException
>         at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
> org.apache.calcite.sql2rel.ReflectiveConvertletTable.lambda$registerNodeTypeMethod$0(ReflectiveConvertletTable.java:83)
>         ... 32 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
>         at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>         at java.util.ArrayList.get(ArrayList.java:429)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$LookupContext.findRel(SqlToRelConverter.java:5300)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.lookup(SqlToRelConverter.java:4424)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.lookupExp(SqlToRelConverter.java:4369)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:3720)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.access$2200(SqlToRelConverter.java:217)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4796)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4092)
>         at
> org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.convertExpression(SqlToRelConverter.java:4656)
>         at
> org.apache.calcite.sql2rel.StandardConvertletTable.convertExpressionList(StandardConvertletTable.java:793)
>         at
> org.apache.calcite.sql2rel.StandardConvertletTable.convertCall(StandardConvertletTable.java:769)
>         at
> org.apache.calcite.sql2rel.StandardConvertletTable.convertCall(StandardConvertletTable.java:756)
>         ... 36 more



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: 关联join出错

Benchao Li
In reply to this post by Kevin
我对legacy planner不是很了解。
你是否可以尝试一下blink planner呢?看下在blink planner是否也存在这个问题。
BTW,从1.11开始,blink planner应该就是默认的planner了。

Kevin <[hidden email]> 于2020年4月24日周五 下午6:33写道:

> 老师你好,又来麻烦你了。planner用的是默认的。
> 我感觉是不是因为我的表left join太多了才导致的?
>
> DDL语句:
> DROP TABLE IF EXISTS t1_oa_loan_requests;
> CREATE TABLE t1_oa_loan_requests (
> `id` BIGINT,
> `customer_name` VARCHAR,
> `ex_customer_name` VARCHAR,
> `credential` VARCHAR,
> `cert_start_date` VARCHAR,
> `cert_end_date` VARCHAR,
> `issue_dept` VARCHAR,
> `ex_credential` VARCHAR,
> `customer_phone` VARCHAR,
> `cust_income` VARCHAR,
> `cust_company` VARCHAR,
> `ex_customer_phone` VARCHAR,
> `ex_relation` VARCHAR,
> `married` VARCHAR,
> `nation` VARCHAR,
> `household_register` VARCHAR,
> `autochthon` VARCHAR,
> `household_addr_detail` VARCHAR,
> `car_type` VARCHAR,
> `mustang_type` VARCHAR,
> `newcar` VARCHAR,
> `install_plate_addr` VARCHAR,
> `addr_province` VARCHAR,
> `addr_city` VARCHAR,
> `addr_country` VARCHAR,
> `delivery_date` VARCHAR,
> `register_date` VARCHAR,
> `capacity` VARCHAR,
> `auto` VARCHAR,
> `km` VARCHAR,
> `skylight` VARCHAR,
> `color` VARCHAR,
> `config` VARCHAR,
> `valuation_channel` VARCHAR,
> `price_car300` VARCHAR,
> `loan_amount_eval` VARCHAR,
> `low_loan` VARCHAR,
> `first_payment` VARCHAR,
> `appraiser_amount_eval` VARCHAR,
> `loan_amount` VARCHAR,
> `loan_years` VARCHAR,
> `interest_rate` VARCHAR,
> `guaranty` VARCHAR,
> `month_pay` VARCHAR,
> `fee_bx` VARCHAR,
> `fee_za` VARCHAR,
> `fee_da` VARCHAR,
> `fee_di` VARCHAR,
> `fee_lv` VARCHAR,
> `fee_gps` VARCHAR,
> `return_amount` VARCHAR,
> `gps` VARCHAR,
> `area_group` VARCHAR,
> `back_point` VARCHAR,
> `actual_pay` VARCHAR,
> `payer_name` VARCHAR,
> `loan_time` VARCHAR,
> `bank_loan_time` VARCHAR,
> `bank_loan_amount` VARCHAR,
> `payment_time` VARCHAR,
> `advance_name` VARCHAR,
> `advance_phone` VARCHAR,
> `advance_bank` VARCHAR,
> `advance_account` VARCHAR,
> `advance_company` VARCHAR,
> `checked_gps` VARCHAR,
> `checked_pic` VARCHAR,
> `checked_paper` VARCHAR,
> `checked_home_visits` VARCHAR,
> `checked_household` VARCHAR,
> `checked_copy_video` VARCHAR,
> `audit_opinion` VARCHAR,
> `ex_audit_opinion` VARCHAR,
> `memo1` VARCHAR,
> `memo2` VARCHAR,
> `bank_time` VARCHAR,
> `receipt1` VARCHAR,
> `receipt2` VARCHAR,
> `receipt3` VARCHAR,
> `area` VARCHAR,
> `salesman` VARCHAR,
> `loan_status` VARCHAR,
> `insurance_status` VARCHAR,
> `create_time` VARCHAR,
> `update_time` VARCHAR,
> `created_by` VARCHAR,
> `department_id` VARCHAR,
> `process_id` VARCHAR,
> `area_code` VARCHAR,
> `area_province` VARCHAR,
> `area_city` VARCHAR,
> `area_country` VARCHAR,
> `contact_name1` VARCHAR,
> `contact_phone1` VARCHAR,
> `contact_name2` VARCHAR,
> `contact_phone2` VARCHAR,
> `eval_user_id` VARCHAR,
> `couple_name` VARCHAR,
> `couple_credential` VARCHAR,
> `couple_income` VARCHAR,
> `couple_phone` VARCHAR,
> `guarantor_name` VARCHAR,
> `guarantor_credential` VARCHAR,
> `guarantor_phone` VARCHAR,
> `payee_name` VARCHAR,
> `payee_account` VARCHAR,
> `account_opening_bank` VARCHAR,
> `payee_phone` VARCHAR,
> `insurance_company` VARCHAR,
> `financia_charges` VARCHAR,
> `auitor_updated` VARCHAR,
> `system_type_select` VARCHAR,
> `system_type` VARCHAR,
> `appraisal_company` VARCHAR,
> `appraisal_time` VARCHAR,
> `auitor_type` VARCHAR,
> `add_modify_log` VARCHAR,
> `add_modif_time` VARCHAR,
> `updated` VARCHAR,
> `supplement` VARCHAR,
> `supplement_user_id` VARCHAR,
> `supplement_time` VARCHAR,
> `revise_user_id` VARCHAR,
> `revise_time` VARCHAR,
> `license_code` VARCHAR,
> `license_index` VARCHAR,
> `add_license_time` VARCHAR,
> `cust_change` VARCHAR,
> `change_time` VARCHAR,
> `apply_loan_amount` VARCHAR,
> `replace_buy_flag` VARCHAR,
> `driver_license_flag` VARCHAR,
> `checked_insurance` VARCHAR,
> `apply_loan_amount_type` VARCHAR,
> `bank_trade_card_no` VARCHAR,
> `bank_detail_id` VARCHAR,
> `is_new` VARCHAR,
> `ignore_bank_detail_id` VARCHAR,
> `video_status` VARCHAR,
> `qingniu_id` VARCHAR,
> `loan_channel` BIGINT,
> `teller_pay_time` VARCHAR,
> `ocs_card_date` VARCHAR,
> `ocs_card_no` VARCHAR,
> `ocs_new_card_no` VARCHAR,
> `ocs_detail_id` VARCHAR,
> `ocs_ignore_detai_id` VARCHAR,
> `ocf_refuse_reason` VARCHAR,
> `ocf_detail_id` VARCHAR,
> `ocf_ignore_detai_id` VARCHAR,
> `mater_audit_result` VARCHAR,
> `mater_repaired` VARCHAR,
> `material_upt_flag` VARCHAR,
> `cloudroom_id` VARCHAR,
> `price` VARCHAR,
> `pg_price` VARCHAR,
> `city` VARCHAR,
> `year_check_date` VARCHAR,
> `buess_check_date` VARCHAR,
> `strong_risk_date` VARCHAR,
> `transfer_times` VARCHAR,
> `car_des` VARCHAR,
> `car_dealer` VARCHAR,
> `gear_box` VARCHAR,
> `brand` VARCHAR,
> `car_model` VARCHAR,
> `environment` VARCHAR,
> `vin` VARCHAR,
> `skylight_type` VARCHAR,
> `pg_user_id` VARCHAR,
> `pg_user_ame` VARCHAR,
> `pg_time` VARCHAR,
> `car_age` VARCHAR,
> `use_type` VARCHAR,
> `car_condition` VARCHAR,
> `city_name` VARCHAR,
> `autohomeid` VARCHAR,
> `js_car_model` VARCHAR,
> `province` VARCHAR,
> `back_count` VARCHAR,
> `regdate` VARCHAR,
> `material_upt_segno` VARCHAR,
> `customer_name_pinyin` VARCHAR,
> `terminal_id` VARCHAR,
> `actual_finance_cost` VARCHAR,
> `restatus` VARCHAR,
> `resstatus_text` VARCHAR,
> `receive_time` VARCHAR,
> `audit_time` VARCHAR,
> `receive_audit_time` VARCHAR,
> `other_time` VARCHAR,
> `to_copy_time` VARCHAR,
> `sales_dept_name` VARCHAR,
> `sales_dept_manager` VARCHAR,
> `created_by_province` VARCHAR,
> `created_by_city` VARCHAR,
> `car_id` VARCHAR,
> `car_from` VARCHAR,
> `mater_full` VARCHAR,
> `need_check_full` VARCHAR,
> `finance_group_no` VARCHAR,
> `finance_group_name_for_no` VARCHAR,
> `car_seller_name` VARCHAR,
> `car_seller_credential` VARCHAR,
> `loan_type` VARCHAR,
> `is_area_audit` VARCHAR,
> `is_picc` VARCHAR,
> `e_stages` VARCHAR,
> `e_stages_result` VARCHAR,
> `ignore_estages_result` VARCHAR,
> `snapshot` VARCHAR,
> `icbc_base_id` BIGINT,
> `order_type` VARCHAR,
> `is_bank_first_trial_printed` VARCHAR,
> `is_bank_save_file_printed` VARCHAR,
> `table_type` VARCHAR,
> `table_update` VARCHAR
>
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'oa_worklist_loan_requests',
>   'connector.startup-mode' = 'earliest-offset',
>   'connector.properties.bootstrap.servers' = '172.18.254.150:9092',
>   'connector.properties.zookeeper.connect' = '172.18.254.150:2181',
>
>   'connector.properties.key.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
>
>   'connector.properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
>   'connector.properties.group.id' = 'testGroup1006',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type'='json',
>   'update-mode' = 'append',
>   'format.derive-schema' = 'true'
> );
>
> DROP TABLE IF EXISTS `t3_province_relationship_map`;
> CREATE TABLE `t3_province_relationship_map`  (
>   `first_province` VARCHAR   , --  '一级省区',
>   `second_province` VARCHAR   , --  '二级省区',
>   `res_dep` VARCHAR   , --  '责任部门',
>   `person_liable` VARCHAR   , --  '责任人',
>   `table_type` VARCHAR,
>   `table_update` VARCHAR
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'prd_worklist_t3_province_relationship_map',
>   'connector.startup-mode' = 'earliest-offset',
>   'connector.properties.bootstrap.servers' = '172.18.254.150:9092',
>   'connector.properties.zookeeper.connect' = '172.18.254.150:2181',
>
>   'connector.properties.key.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
>
>   'connector.properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
>   'connector.properties.group.id' = 'testGroup1006',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type'='json',
>   'update-mode' = 'append',
>   'format.derive-schema' = 'true'
> );
>
>
> -- 创建视图 t3_province_relationship_map_view
> DROP VIEW t3_province_relationship_map_view;
> CREATE VIEW t3_province_relationship_map_view AS SELECT
> cast(first_province as VARCHAR) as first_province,
> cast(second_province as VARCHAR) as second_province,
> cast(res_dep as VARCHAR) as res_dep,
> cast(person_liable as VARCHAR) as person_liable
> FROM
> t3_province_relationship_map
> GROUP BY
> first_province,second_province,res_dep,person_liable;
>
>
> -- 创建视图 t1_oa_loan_requests_view
> DROP VIEW t1_oa_loan_requests_view;
> CREATE VIEW t1_oa_loan_requests_view AS
> SELECT a.id
>  ,a.create_time ,a.e_stages_result,a.icbc_base_id,a.sales_dept_name,a.loan_channel FROM t1_oa_loan_requests a INNER JOIN (SELECT
> c.id,max( c.table_update ) AS table_update FROM(
> SELECT d.* FROM t1_oa_loan_requests d LEFT JOIN (
> SELECT id FROM t1_oa_loan_requests WHERE table_type = 'delete' ) e ON d.id
>  = e.id WHERE e.id IS NULL ) c  GROUP BY c.id ) b ON a.id = b.id
>  AND a.table_update = b.table_update where   a.customer_name NOT LIKE '%测试%'  AND a.sales_dept_name NOT LIKE '%测试%';
>
> 查询语句:
> SELECT
> DATE_FORMAT(a.create_time,'yyyy-MM-dd') as create_time,
> a.loan_channel as loan_channel,
>
> case when p.first_province is null then '其它' else p.first_province end as province,
> count(*) as request_num,
>
> sum(case when (f.business_id is not null ) then 1 else 0 end )                                  as e_total,
>
> sum(case when (f.business_id is not null and a.e_stages_result in (1)) then 1 else 0 end)       as e_pass,
>
> sum(case when (f.business_id is not null and a.e_stages_result in (2,3,9))  then 1 else 0 end)  as e_reject,
>
> sum(case when (f.business_id is not null and a.e_stages_result is null ) then 1 else 0 end)     as e_noresulte,
>
> sum(case when b.stage_reject_result is null or substr(b.stage_reject_result,1,1) not in ('A','B','C','D','E') then 1 else 0 end) as tm_score_other,
> 0 as tm_score_A1,
> 0 as tm_score_A2,
> 0 as tm_score_B1,
> 0 as tm_score_B2,
> 0 as tm_score_C1,
> 0 as tm_score_C2,
> 0 as tm_score_C3,
> 0 as tm_score_D1,
> 0 as tm_score_D2,
> 0 as tm_score_E1,
>
> sum(case when c.business_id is not null then 1 else 0 end) as audit_first_num,
> CURRENT_TIMESTAMP  data_dt
> FROM t1_oa_loan_requests_view a
> LEFT JOIN t1_oa_icbc_base_info_view b on a.icbc_base_id=b.id
> LEFT JOIN t1_oa_flow_inst_task_segment_no_5_view c ON a.id = c.business_id
> LEFT JOIN t1_oa_flow_inst_task_segment_no_520_view c1 ON a.id
>  = c1.business_id
> LEFT JOIN t1_oa_flow_inst_task_segment_no_6120_view d ON a.id
>  = d.business_id
> LEFT JOIN t1_oa_flow_inst_task_segment_no_15_view f ON a.id
>  = f.business_id
> LEFT JOIN t2_crm_tm_loan_view g ON a.id=g.LA_ID
>
> LEFT JOIN t3_province_relationship_map p ON a.sales_dept_name = p.second_province
> where cast(a.create_time as date)<=cast(current_date as date)
> and DATE_FORMAT(a.create_time,'yyyy-MM-dd')<>'0000-00-00'
> and DATE_FORMAT(a.create_time,'yyyy-MM-dd')>='2018-01-01'
> group by DATE_FORMAT (a.create_time,'yyyy-MM-dd'),a.loan_channel,
>
> (CASE WHEN p.first_province IS NULL THEN '其它' else p.first_province end) limit 10;
>
>
> ------------------------------------------------------------------
> 发件人:Benchao Li <[hidden email]>
> 发送时间:2020年4月24日(星期五) 16:43
> 收件人:user-zh <[hidden email]>; Kevin <[hidden email]>
> 主 题:Re: 关联join出错
>
> Hi,
> 能否提供一下完整的DDL以及query呢?
> 此外,你用的是哪个版本,哪个planner?
>
> Kevin <[hidden email]> 于2020年4月24日周五 下午4:36写道:
>
> HI ALL :
>     打扰大家了。我用flink sql 关联数据。出现了如下报错。有人遇到过么 谢谢
> a.sales_dept_name是string类型
> p.second_province也是string类型
>
> 日志报错:
> 2020-04-24 16:35:14,873 INFO  org.apache.flink.table.module.ModuleManager
>                  - Cannot find FunctionDefinition is not null from any
> loaded modules
> 2020-04-24 16:35:14,873 INFO  org.apache.flink.table.module.ModuleManager
>                  - Cannot find FunctionDefinition current_timestamp from
> any loaded modules
> 2020-04-24 16:35:14,881 WARN  org.apache.flink.table.client.cli.CliClient
>                  - Could not execute SQL statement.
> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL
> statement.
>         at
> org.apache.flink.table.client.gateway.local.LocalExecutor.createTable(LocalExecutor.java:678)
>         at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:612)
>         at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:477)
>         at
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:510)
>         at
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:295)
>         at java.util.Optional.ifPresent(Optional.java:159)
>         at
> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200)
>         at
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>         at
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: java.lang.RuntimeException: while converting
> `a`.`sales_dept_name` = `p`.`second_province`
>         at
> org.apache.calcite.sql2rel.ReflectiveConvertletTable.lambda$registerNodeTypeMethod$0(ReflectiveConvertletTable.java:86)
>         at
> org.apache.calcite.sql2rel.SqlNodeToRexConverterImpl.convertCall(SqlNodeToRexConverterImpl.java:63)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4787)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4092)
>         at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.convertExpression(SqlToRelConverter.java:4656)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertJoinCondition(SqlToRelConverter.java:2708)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2100)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>         at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>         at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>         at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>         at
> org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$createTable$16(LocalExecutor.java:675)
>         at
> org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:231)
>         at
> org.apache.flink.table.client.gateway.local.LocalExecutor.createTable(LocalExecutor.java:675)
>         ... 9 more
> Caused by: java.lang.reflect.InvocationTargetException
>         at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
> org.apache.calcite.sql2rel.ReflectiveConvertletTable.lambda$registerNodeTypeMethod$0(ReflectiveConvertletTable.java:83)
>         ... 32 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
>         at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>         at java.util.ArrayList.get(ArrayList.java:429)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$LookupContext.findRel(SqlToRelConverter.java:5300)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.lookup(SqlToRelConverter.java:4424)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.lookupExp(SqlToRelConverter.java:4369)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:3720)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.access$2200(SqlToRelConverter.java:217)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4796)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4092)
>         at
> org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.convertExpression(SqlToRelConverter.java:4656)
>         at
> org.apache.calcite.sql2rel.StandardConvertletTable.convertExpressionList(StandardConvertletTable.java:793)
>         at
> org.apache.calcite.sql2rel.StandardConvertletTable.convertCall(StandardConvertletTable.java:769)
>         at
> org.apache.calcite.sql2rel.StandardConvertletTable.convertCall(StandardConvertletTable.java:756)
>         ... 36 more
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: [hidden email]; [hidden email]
>
>
>

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]