Hello,

I'm on Flink 1.10, writing to HBase with Flink SQL, and I get this error: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.

From what I've read online, the primary key of an upsert sink is inferred from the query, and the error occurs because no PK can be inferred from my query. The suggested temporary workaround for 1.10 is to add a GROUP BY layer so that a primary key can be inferred from the query. However, I still get the error after adding the GROUP BY. How can I solve this? And how exactly is the PK inferred from a query?

My SQL statements are below.

CREATE TABLE statement (there are a lot of fields, apologies):

CREATE TABLE `AssetInfoRiskResultSinkTable` (
  rowkey string,
  d ROW(
    `id` bigint, `user_id` string, `income_no` string, `nation` string, `card_auth_expiry_time` string,
    `user_name` string, `login_phone` string, `card_no` string, `address` string, `highest_eduction` string,
    `is_married` string, `resident_address` string, `resident_province` string, `resident_city` string,
    `resident_town` string, `profession` string, `job_salary` string, `company_name` string,
    `company_province` string, `company_city` string, `company_town` string, `company_address` string,
    `family_name` string, `family_phone` string, `workmate_name` string, `workmate_phone` string,
    `bank_card_no` string, `bank_phone` string, `device_address` string, `device_ip` string,
    `contacts` string, `create_user` string, `create_time` string, `update_user` string, `update_time` string,
    `scene_id` string, `access_type` string, `status` string, `label_id` string, `label_name` string,
    `ocr_real_name` string, `ocr_id_card` string, `ocr_id_card_address` string, `longitude` string,
    `latitude` string, `imei` string, `imsi` string, `mac` string, `resident_province2` string,
    `loan_use` string, `channel_code` string, `channel_name` string, `credit_card_number` string,
    `product_type_code` string, `product_type_name` string, `apply_amount` string, `income_time` string,
    `credit_card_phone` string, `credit_card_amount` string, `period` string, `start_work_time` string,
    `register_channel` string, `register_channel_name` string, `bank_name` string, `company_call` string,
    `system_tag` string, `is_trans` string, `is_dial_confirm` string, `is_dial_type` string,
    `is_dial_typeM` string, `is_dial_typeHuman` string, `user_risk_score` string, `upload_imgs` string,
    `upload_status` string, `famliy_relationship` string, `work_relationship` string, `request_time` string,
    `ac_record` string, `profession_station` string, `mobile_os` string, `mobile_type` string,
    `mobile_brand` string, `networktype` string, `jail_break` string, `open_udid` string, `simulator` string,
    `idfa` string, `idfv` string, `device_type` string, `credit_card_id_card_no` string,
    `credit_card_username` string, `sign_issue_org` string, `user_level` string,
    `channel_request_time` string, `real_income_no` string, `org_channel_code` string, `qq` string,
    `mail` string, `pre_grant_credit_amount` string, `pre_grant_credit_term` string,
    `pre_grant_credit_term_unit` string, `monthly_repay_amount` string, `total_repay_amount` string,
    `xhd_white_list_flag` string, `white_list_flag` string, `white_list_level` string,
    `white_list_type` string, `bus_type` string, `housing_fund_status` string,
    `operator_auth_status` string, `credit_card_status` string, `pboc_credit_status` string,
    `bh_url_flag` string, `zmxy_auth_expiry_time` string, `operator_auth_expiry_time` string,
    `housing_fund_status_time` string, `credit_card_expiry_time` string, `pboc_credit_status_time` string,
    `credit_card_bank_name` string, `due_limit_unit` string, `due_limit` string, `loan_amount` string,
    `risk_lead_flag` string, `period_unit` string, `face_score` string, `notify_url` string,
    `org_id` string, `contract_id` string, `birthday` string, `zmxy_status` string, `is_root` string,
    `is_virtualmachine` string, `appnum` string, `wifi_ip` string, `blue_mac` string, `wifi_mac` string,
    `vpn_ip` string, `cell_ip` string, `true_ip` string, `is_helical_accelerator` string,
    `bussiness` string, `manual_check` string, `has_contacts` string, `length_of_residence_year` string,
    `length_of_residence_month` string, `gps_province` string, `gps_city` string, `gps_area` string,
    `gps_detail_address` string, `positional_titles` string, `work_years` string, `work_months` string,
    `max_acceptable_monthly_payment` string, `profession_code` string, `occupation` string,
    `career_status` string, `work_position` string, `work_time` string, `end_result` string
  )
) with (
  'connector.type' = 'hbase',
  'connector.version' = '1.4.3',
  'connector.table-name' = 'rtest:borrower_related_asset_info_real_time',
  'connector.zookeeper.quorum' = 'fdw6.fengjr.inc,fdw4.fengjr.inc,fdw5.fengjr.inc,fjr-yz-204-11,fjr-yz-204-13',
  'connector.zookeeper.znode.parent' = 'hbase_test',
  'connector.write.buffer-flush.max-size' = '10mb',
  'connector.write.buffer-flush.max-rows' = '1000',
  'connector.write.buffer-flush.interval' = '2s'
)

INSERT statement:

INSERT INTO AssetInfoRiskResultSinkTable
select
  MD5(real_income_no) as rowkey,
  Row(
    id, user_id, income_no, nation, card_auth_expiry_time, user_name, phone, id_card, address,
    highest_eduction, is_married, resident_address, resident_province, resident_city, resident_town,
    profession, job_salary, company_name, company_province, company_city, company_town, company_address,
    family_name, family_phone, workmate_name, workmate_phone, bank_card_no, bank_phone, device_address,
    device_ip, contacts, create_user, create_time, update_user, update_time, scene_id, access_type,
    status, label_id, label_name, ocr_real_name, ocr_id_card, ocr_id_card_address, longitude, latitude,
    imei, imsi, mac, resident_province2, loan_use, channel_code, channel_name, credit_card_number,
    product_type_code, product_type_name, apply_amount, income_time, credit_card_phone,
    credit_card_amount, `period`, start_work_time, register_channel, register_channel_name, bank_name,
    company_call, system_tag, is_trans, is_dial_confirm, is_dial_type, is_dial_typem, is_dial_typeHuman,
    user_risk_score, upload_imgs, upload_status, famliy_relationship, work_relationship, request_time,
    ac_record, profession_station, mobile_os, mobile_type, mobile_brand, networktype, jail_break,
    open_udid, simulator, idfa, idfv, device_type, credit_card_id_card_no, credit_card_username,
    sign_issue_org, user_level, channel_request_time, real_income_no, org_channel_code, qq, mail,
    pre_grant_credit_amount, pre_grant_credit_term, pre_grant_credit_term_unit, monthly_repay_amount,
    total_repay_amount, xhd_white_list_flag, white_list_flag, white_list_level, white_list_type,
    bus_type, housing_fund_status, operator_auth_status, credit_card_status, pboc_credit_status,
    bh_url_flag, zmxy_auth_expiry_time, operator_auth_expiry_time, housing_fund_status_time,
    credit_card_expiry_time, pboc_credit_status_time, credit_card_bank_name, due_limit_unit, due_limit,
    loan_amount, risk_lead_flag, period_unit, face_score, notify_url, org_id, contract_id, birthday,
    zmxy_status, is_root, is_virtualmachine, appnum, wifi_ip, blue_mac, wifi_mac, vpn_ip, cell_ip,
    true_ip, is_helical_accelerator, bussiness, manual_check, has_contacts, length_of_residence_year,
    length_of_residence_month, gps_province, gps_city, gps_area, gps_detail_address, positional_titles,
    work_years, work_months, max_acceptable_monthly_payment, profession_code, occupation,
    career_status, work_position, work_time, end_result
  ) as d
from (
  SELECT
    real_income_no, id, user_id, income_no, nation, card_auth_expiry_time,
    if(user_name is not null and user_name <> '', unifedEncryption(user_name), user_name) as user_name,
    if(login_phone is not null and login_phone <> '', unifedEncryption(login_phone), login_phone) as phone,
    if(card_no is not null and card_no <> '', unifedEncryption(card_no), card_no) as id_card,
    address, highest_eduction, is_married, resident_address, resident_province, resident_city,
    resident_town, profession, job_salary, company_name, company_province, company_city, company_town,
    company_address,
    if(family_name is not null and family_name <> '', unifedEncryption(family_name), family_name) as family_name,
    if(family_phone is not null and family_phone <> '', unifedEncryption(family_phone), family_phone) as family_phone,
    if(workmate_name is not null and workmate_name <> '', unifedEncryption(workmate_name), workmate_name) as workmate_name,
    if(workmate_phone is not null and workmate_phone <> '', unifedEncryption(workmate_phone), workmate_phone) as workmate_phone,
    if(bank_card_no is not null and bank_card_no <> '', unifedEncryption(bank_card_no), bank_card_no) as bank_card_no,
    if(bank_phone is not null and bank_phone <> '', unifedEncryption(bank_phone), bank_phone) as bank_phone,
    device_address, device_ip, contacts, create_user, create_time, update_user, update_time, scene_id,
    access_type, status, label_id, label_name,
    if(ocr_real_name is not null and ocr_real_name <> '', unifedEncryption(ocr_real_name), ocr_real_name) as ocr_real_name,
    if(ocr_id_card is not null and ocr_id_card <> '', unifedEncryption(ocr_id_card), ocr_id_card) as ocr_id_card,
    ocr_id_card_address, longitude, latitude, imei, imsi, mac, resident_province2, loan_use,
    channel_code, channel_name,
    if(credit_card_number is not null and credit_card_number <> '', unifedEncryption(credit_card_number), credit_card_number) as credit_card_number,
    product_type_code, product_type_name, apply_amount, income_time,
    if(credit_card_phone is not null and credit_card_phone <> '', unifedEncryption(credit_card_phone), credit_card_phone) as credit_card_phone,
    credit_card_amount, `period`, start_work_time, register_channel, register_channel_name, bank_name,
    if(company_call is not null and company_call <> '', unifedEncryption(company_call), company_call) as company_call,
    system_tag, is_trans, is_dial_confirm, is_dial_type, is_dial_typeM as is_dial_typem,
    is_dial_typeHuman, user_risk_score, upload_imgs, upload_status, famliy_relationship,
    work_relationship, request_time, ac_record, profession_station, mobile_os, mobile_type,
    mobile_brand, networktype, jail_break, open_udid, simulator, idfa, idfv, device_type,
    credit_card_id_card_no, credit_card_username, sign_issue_org, user_level, channel_request_time,
    org_channel_code, qq, mail,
    pre_grant_credit_amount, pre_grant_credit_term, pre_grant_credit_term_unit, monthly_repay_amount,
    total_repay_amount, xhd_white_list_flag, white_list_flag, white_list_level, white_list_type,
    bus_type, housing_fund_status, operator_auth_status, credit_card_status, pboc_credit_status,
    bh_url_flag, zmxy_auth_expiry_time, operator_auth_expiry_time, housing_fund_status_time,
    credit_card_expiry_time, pboc_credit_status_time, credit_card_bank_name, due_limit_unit, due_limit,
    loan_amount, risk_lead_flag, period_unit, face_score, notify_url, org_id, contract_id, birthday,
    zmxy_status, is_root, is_virtualmachine, appnum, wifi_ip, blue_mac, wifi_mac, vpn_ip, cell_ip,
    true_ip, is_helical_accelerator, bussiness, manual_check, has_contacts, length_of_residence_year,
    length_of_residence_month, gps_province, gps_city, gps_area, gps_detail_address, positional_titles,
    work_years, work_months, max_acceptable_monthly_payment, profession_code, occupation,
    career_status, work_position, work_time, end_result
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC) AS rowNum
    FROM AssetInfoRiskResultTable
  )
  WHERE rowNum = 1
)

[hidden email]
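Hello,

Try moving MD5(real_income_no) as rowkey into the inner query, and have the outermost GROUP BY use rowkey directly. From Flink 1.11 onward you can declare the PK in the table definition, so after 1.11 it no longer has to be inferred.

Best,
Leonard Xu

> On July 1, 2020, at 13:51, [hidden email] wrote:
>
> MD5(real_income_no) as rowkey,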
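The suggestion above could look roughly like the following. This is an untested sketch, not a confirmed fix: the column list is cut down to `id` and `user_id` for readability, the use of `MAX(...)` to carry the non-key columns through the `GROUP BY` is an assumption (the reply only says to group by `rowkey`), and the Flink 1.11 option names are those of the 1.11 HBase connector rather than anything stated in this thread.

```sql
-- Flink 1.10 sketch: compute rowkey in the inner query, then GROUP BY rowkey
-- so that rowkey may be inferred as the unique key of the result.
-- Assumption: MAX() is used only to satisfy GROUP BY for the non-key columns.
INSERT INTO AssetInfoRiskResultSinkTable
SELECT rowkey, ROW(id, user_id) AS d
FROM (
  SELECT rowkey, MAX(id) AS id, MAX(user_id) AS user_id
  FROM (
    SELECT MD5(real_income_no) AS rowkey, id, user_id
    FROM (
      SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC) AS rowNum
      FROM AssetInfoRiskResultTable
    )
    WHERE rowNum = 1
  )
  GROUP BY rowkey
);

-- Flink 1.11+ sketch: declare the primary key in the DDL instead of relying on inference.
-- Assumption: shortened column list; option names follow the 1.11 'hbase-1.4' connector.
CREATE TABLE AssetInfoRiskResultSinkTable (
  rowkey STRING,
  d ROW<id BIGINT, user_id STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'rtest:borrower_related_asset_info_real_time',
  'zookeeper.quorum' = 'fdw6.fengjr.inc,fdw4.fengjr.inc,fdw5.fengjr.inc',
  'zookeeper.znode.parent' = 'hbase_test'
);
```

In the 1.10 variant the aggregation is kept in its own subquery so that the outer `ROW(...)` only references plain columns. With the full column list, every field other than `rowkey` would need the same treatment.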