Flink 1.10: writing to HBase with Flink SQL fails with "UpsertStreamTableSink requires that Table has a full primary keys if it is updated."

tiantingting5435@163.com
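Hi,
With Flink 1.10, writing to HBase through Flink SQL fails with: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
From what I read online, the primary key of an upsert sink is inferred from the query, and the planner cannot infer a PK from my query, hence the error. The suggested workaround for 1.10 is to add a GROUP BY layer so that a primary key can be inferred from the query.
However, I still get the error after adding a GROUP BY. How can I fix this? And how exactly does the query infer the PK?

Here is my SQL.
The CREATE TABLE statement (it has a lot of fields, sorry about that):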
CREATE TABLE `AssetInfoRiskResultSinkTable` (
  rowkey string,
  d ROW(`id` bigint,
  `user_id` string,
  `income_no` string ,
  `nation` string,
  `card_auth_expiry_time` string ,
  `user_name` string ,
  `login_phone` string ,
  `card_no` string ,
  `address` string ,
  `highest_eduction` string ,
  `is_married` string ,
  `resident_address` string ,
  `resident_province` string ,
  `resident_city` string ,
  `resident_town` string ,
  `profession` string ,
  `job_salary` string ,
  `company_name` string ,
  `company_province` string ,
  `company_city` string ,
  `company_town` string ,
  `company_address` string ,
  `family_name` string ,
  `family_phone` string ,
  `workmate_name` string ,
  `workmate_phone` string ,
  `bank_card_no` string ,
  `bank_phone` string ,
  `device_address` string ,
  `device_ip` string ,
  `contacts` string ,
  `create_user` string ,
  `create_time` string ,
  `update_user` string ,
  `update_time` string,
  `scene_id` string  ,
  `access_type` string  ,
  `status` string  ,
  `label_id` string  ,
  `label_name` string  ,
  `ocr_real_name` string ,
  `ocr_id_card` string ,
  `ocr_id_card_address` string,
  `longitude` string ,
  `latitude` string ,
  `imei` string ,
  `imsi` string ,
  `mac` string ,
  `resident_province2` string ,
  `loan_use` string ,
  `channel_code` string  ,
  `channel_name` string  ,
  `credit_card_number` string  ,
  `product_type_code` string  ,
  `product_type_name` string  ,
  `apply_amount` string  ,
  `income_time` string  ,
  `credit_card_phone` string  ,
  `credit_card_amount` string  ,
  `period` string  ,
  `start_work_time` string  ,
  `register_channel` string  ,
  `register_channel_name` string  ,
  `bank_name` string  ,
  `company_call` string  ,
  `system_tag` string  ,
  `is_trans` string  ,
  `is_dial_confirm` string  ,
  `is_dial_type` string  ,
  `is_dial_typeM` string  ,
  `is_dial_typeHuman` string  ,
  `user_risk_score` string  ,
  `upload_imgs` string ,
  `upload_status` string ,
  `famliy_relationship` string  ,
  `work_relationship` string  ,
  `request_time` string  ,
  `ac_record` string ,
  `profession_station` string  ,
  `mobile_os` string  ,
  `mobile_type` string  ,
  `mobile_brand` string  ,
  `networktype` string  ,
  `jail_break` string  ,
  `open_udid` string  ,
  `simulator` string  ,
  `idfa` string  ,
  `idfv` string  ,
  `device_type` string  ,
  `credit_card_id_card_no` string  ,
  `credit_card_username` string  ,
  `sign_issue_org` string  ,
  `user_level` string  ,
  `channel_request_time` string  ,
  `real_income_no` string ,
  `org_channel_code` string  ,
  `qq` string  ,
  `mail` string  ,
  `pre_grant_credit_amount` string  ,
  `pre_grant_credit_term` string  ,
  `pre_grant_credit_term_unit` string  ,
  `monthly_repay_amount` string  ,
  `total_repay_amount` string  ,
  `xhd_white_list_flag` string  ,
  `white_list_flag` string  ,
  `white_list_level` string ,
  `white_list_type` string ,
  `bus_type` string  ,
  `housing_fund_status` string  ,
  `operator_auth_status` string ,
  `credit_card_status` string  ,
  `pboc_credit_status` string ,
  `bh_url_flag` string,
  `zmxy_auth_expiry_time` string  ,
  `operator_auth_expiry_time` string  ,
  `housing_fund_status_time` string  ,
  `credit_card_expiry_time` string  ,
  `pboc_credit_status_time` string  ,
  `credit_card_bank_name` string  ,
  `due_limit_unit` string  ,
  `due_limit` string  ,
  `loan_amount` string  ,
  `risk_lead_flag` string ,
  `period_unit` string  ,
  `face_score` string  ,
  `notify_url` string  ,
  `org_id` string  ,
  `contract_id` string  ,
  `birthday` string  ,
  `zmxy_status` string  ,
  `is_root` string  ,
  `is_virtualmachine` string  ,
  `appnum` string  ,
  `wifi_ip` string ,
  `blue_mac` string  ,
  `wifi_mac` string ,
  `vpn_ip` string  ,
  `cell_ip` string ,
  `true_ip` string  ,
  `is_helical_accelerator` string  ,
  `bussiness` string  ,
  `manual_check` string  ,
  `has_contacts` string  ,
  `length_of_residence_year` string  ,
  `length_of_residence_month` string  ,
  `gps_province` string  ,
  `gps_city` string  ,
  `gps_area` string  ,
  `gps_detail_address` string  ,
  `positional_titles` string  ,
  `work_years` string  ,
  `work_months` string  ,
  `max_acceptable_monthly_payment` string  ,
  `profession_code` string  ,
  `occupation` string  ,
  `career_status` string  ,
  `work_position` string  ,
  `work_time` string  ,
  `end_result` string  )
  ) with (
   'connector.type' = 'hbase',
   'connector.version' = '1.4.3',
   'connector.table-name' = 'rtest:borrower_related_asset_info_real_time',
   'connector.zookeeper.quorum' = 'fdw6.fengjr.inc,fdw4.fengjr.inc,fdw5.fengjr.inc,fjr-yz-204-11,fjr-yz-204-13',
   'connector.zookeeper.znode.parent' = 'hbase_test',
   'connector.write.buffer-flush.max-size' = '10mb',
   'connector.write.buffer-flush.max-rows' = '1000',
   'connector.write.buffer-flush.interval' = '2s'
  )

The INSERT statement:
INSERT INTO AssetInfoRiskResultSinkTable
select
MD5(real_income_no) as rowkey,
Row(id,
        user_id,
        income_no,
        nation,
        card_auth_expiry_time,
        user_name,
        phone,
        id_card,
        address,
        highest_eduction,
        is_married,
        resident_address,
        resident_province,
        resident_city,
        resident_town,
        profession,
        job_salary,
        company_name,
        company_province,
        company_city,
        company_town,
        company_address,
        family_name,
        family_phone,
        workmate_name,
        workmate_phone,
        bank_card_no,
        bank_phone,
        device_address,
        device_ip,
        contacts,
        create_user,
        create_time,
        update_user,
        update_time,
        scene_id,
        access_type,
        status,
        label_id,
        label_name,
        ocr_real_name,
        ocr_id_card,
        ocr_id_card_address,
        longitude,
        latitude,
        imei,
        imsi,
        mac,
        resident_province2,
        loan_use,
        channel_code,
        channel_name,
        credit_card_number,
        product_type_code,
        product_type_name,
        apply_amount,
        income_time,
        credit_card_phone,
        credit_card_amount,
        `period`,
        start_work_time,
        register_channel,
        register_channel_name,
        bank_name,
        company_call,
        system_tag,
        is_trans,
        is_dial_confirm,
        is_dial_type,
        is_dial_typem,
        is_dial_typeHuman,
        user_risk_score,
        upload_imgs,
        upload_status,
        famliy_relationship,
        work_relationship,
        request_time,
        ac_record,
        profession_station,
        mobile_os,
        mobile_type,
        mobile_brand,
        networktype,
        jail_break,
        open_udid,
        simulator,
        idfa,
        idfv,
        device_type,
        credit_card_id_card_no,
        credit_card_username,
        sign_issue_org,
        user_level,
        channel_request_time,
        real_income_no,
        org_channel_code,
        qq,
        mail,
        pre_grant_credit_amount,
        pre_grant_credit_term,
        pre_grant_credit_term_unit,
        monthly_repay_amount,
        total_repay_amount,
        xhd_white_list_flag,
        white_list_flag,
        white_list_level,
        white_list_type,
        bus_type,
        housing_fund_status,
        operator_auth_status,
        credit_card_status,
        pboc_credit_status,
        bh_url_flag,
        zmxy_auth_expiry_time,
        operator_auth_expiry_time,
        housing_fund_status_time,
        credit_card_expiry_time,
        pboc_credit_status_time,
        credit_card_bank_name,
        due_limit_unit,
        due_limit,
        loan_amount,
        risk_lead_flag,
        period_unit,
        face_score,
        notify_url,
        org_id,
        contract_id,
        birthday,
        zmxy_status,
        is_root,
        is_virtualmachine,
        appnum,
        wifi_ip,
        blue_mac,
        wifi_mac,
        vpn_ip,
        cell_ip,
        true_ip,
        is_helical_accelerator,
        bussiness,
        manual_check,
        has_contacts,
        length_of_residence_year,
        length_of_residence_month,
        gps_province,
        gps_city,
        gps_area,
        gps_detail_address,
        positional_titles,
        work_years,
        work_months,
        max_acceptable_monthly_payment,
        profession_code,
        occupation,
        career_status,
        work_position,
        work_time,
        end_result) as d
from (
SELECT
        real_income_no,
        id,
        user_id,
        income_no,
        nation,
        card_auth_expiry_time,
        if(user_name is not null and user_name <> '',unifedEncryption(user_name),user_name) as user_name,
        if(login_phone is not null and login_phone <> '',unifedEncryption(login_phone),login_phone) as phone,
        if(card_no is not null and card_no <> '',unifedEncryption(card_no),card_no) as id_card,
        address,
        highest_eduction,
        is_married,
        resident_address,
        resident_province,
        resident_city,
        resident_town,
        profession,
        job_salary,
        company_name,
        company_province,
        company_city,
        company_town,
        company_address,
        if(family_name is not null and family_name <> '',unifedEncryption(family_name),family_name) as family_name,
        if(family_phone is not null and family_phone <> '',unifedEncryption(family_phone),family_phone) as family_phone,
        if(workmate_name is not null and workmate_name <> '',unifedEncryption(workmate_name),workmate_name) as workmate_name,
        if(workmate_phone is not null and workmate_phone <> '',unifedEncryption(workmate_phone),workmate_phone) as workmate_phone,
        if(bank_card_no is not null and bank_card_no <> '',unifedEncryption(bank_card_no),bank_card_no) as bank_card_no,
        if(bank_phone is not null and bank_phone <> '',unifedEncryption(bank_phone),bank_phone) as bank_phone,
        device_address,
        device_ip,
        contacts,
        create_user,
        create_time,
        update_user,
        update_time,
        scene_id,
        access_type,
        status,
        label_id,
        label_name,
        if(ocr_real_name is not null and ocr_real_name <> '',unifedEncryption(ocr_real_name),ocr_real_name) as ocr_real_name,
        if(ocr_id_card is not null and ocr_id_card <> '',unifedEncryption(ocr_id_card),ocr_id_card) as ocr_id_card,
        ocr_id_card_address,
        longitude,
        latitude,
        imei,
        imsi,
        mac,
        resident_province2,
        loan_use,
        channel_code,
        channel_name,
        if(credit_card_number is not null and credit_card_number <> '',unifedEncryption(credit_card_number),credit_card_number) as credit_card_number,
        product_type_code,
        product_type_name,
        apply_amount,
        income_time,
        if(credit_card_phone is not null and credit_card_phone <> '',unifedEncryption(credit_card_phone),credit_card_phone) as credit_card_phone,
        credit_card_amount,
        `period`,
        start_work_time,
        register_channel,
        register_channel_name,
        bank_name,
        if(company_call is not null and company_call <> '',unifedEncryption(company_call),company_call) as company_call,
        system_tag,
        is_trans,
        is_dial_confirm,
        is_dial_type,
        is_dial_typeM as is_dial_typem,
        is_dial_typeHuman,
        user_risk_score,
        upload_imgs,
        upload_status,
        famliy_relationship,
        work_relationship,
        request_time,
        ac_record,
        profession_station,
        mobile_os,
        mobile_type,
        mobile_brand,
        networktype,
        jail_break,
        open_udid,
        simulator,
        idfa,
        idfv,
        device_type,
        credit_card_id_card_no,
        credit_card_username,
        sign_issue_org,
        user_level,
        channel_request_time,
        org_channel_code,
        qq,
        mail,
        pre_grant_credit_amount,
        pre_grant_credit_term,
        pre_grant_credit_term_unit,
        monthly_repay_amount,
        total_repay_amount,
        xhd_white_list_flag,
        white_list_flag,
        white_list_level,
        white_list_type,
        bus_type,
        housing_fund_status,
        operator_auth_status,
        credit_card_status,
        pboc_credit_status,
        bh_url_flag,
        zmxy_auth_expiry_time,
        operator_auth_expiry_time,
        housing_fund_status_time,
        credit_card_expiry_time,
        pboc_credit_status_time,
        credit_card_bank_name,
        due_limit_unit,
        due_limit,
        loan_amount,
        risk_lead_flag,
        period_unit,
        face_score,
        notify_url,
        org_id,
        contract_id,
        birthday,
        zmxy_status,
        is_root,
        is_virtualmachine,
        appnum,
        wifi_ip,
        blue_mac,
        wifi_mac,
        vpn_ip,
        cell_ip,
        true_ip,
        is_helical_accelerator,
        bussiness,
        manual_check,
        has_contacts,
        length_of_residence_year,
        length_of_residence_month,
        gps_province,
        gps_city,
        gps_area,
        gps_detail_address,
        positional_titles,
        work_years,
        work_months,
        max_acceptable_monthly_payment,
        profession_code,
        occupation,
        career_status,
        work_position,
        work_time,
        end_result
FROM
        (
                SELECT
                        *, ROW_NUMBER () OVER (
                                PARTITION BY id
                                ORDER BY
                                        update_time DESC
                        ) AS rowNum
                FROM
                        AssetInfoRiskResultTable
        )
WHERE
        rowNum = 1)
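
On the question of how the PK is inferred, here is a hedged sketch based on my understanding of the Flink 1.10 Blink planner, not on a specification: the upsert key is derived from the query plan, for example from the grouping columns of an aggregation or from the PARTITION BY columns of a `ROW_NUMBER() ... rowNum = 1` query, and it only survives if those columns reach the sink as plain top-level columns. In the query above, a per-`id` key exists inside the subquery, but `id` ends up nested inside `d`, and `rowkey` is a newly computed value, so no key reaches the sink. The three queries below reuse the source table `AssetInfoRiskResultTable` with most columns trimmed, and the key annotations in the comments are my assumptions about what the planner derives.

```sql
-- (1) Aggregation: the grouping column becomes the key -> key = {user_id}
SELECT user_id, COUNT(*) AS cnt
FROM AssetInfoRiskResultTable
GROUP BY user_id;

-- (2) Keep the latest row per id: one row per partition -> key = {id}
SELECT id, update_time
FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY id ORDER BY update_time DESC) AS rowNum
    FROM AssetInfoRiskResultTable
)
WHERE rowNum = 1;

-- (3) Same as (2), but id is only nested inside a ROW and the top-level
--     column is newly computed: nothing is known to be unique -> no key,
--     which is the situation that raises the UpsertStreamTableSink error
SELECT MD5(real_income_no) AS rowkey, ROW(id, update_time) AS d
FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY id ORDER BY update_time DESC) AS rowNum
    FROM AssetInfoRiskResultTable
)
WHERE rowNum = 1;
```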



Re: Flink 1.10: writing to HBase with Flink SQL fails with "UpsertStreamTableSink requires that Table has a full primary keys if it is updated."

Leonard Xu
Hello,

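Try moving MD5(real_income_no) as rowkey into the inner layer of the query, and then GROUP BY rowkey directly in the outermost layer. Flink 1.11 and later support declaring the PK on the table, so from 1.11 on no inference is needed.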
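
A sketch of that suggestion, untested and trimmed to three of the ~170 fields (the encryption layer is also left out): `rowkey` is computed in an inner query, and the outermost query groups by it, so the grouping key is `rowkey` and it reaches the sink as a top-level column. Every other field then has to go through an aggregate. The sketch assumes the Blink planner's `LAST_VALUE` aggregate is available in your 1.10 setup; check the built-in function list for your exact version before relying on it.

```sql
-- Sketch only: the real ROW(...) must list every field of `d`,
-- in the sink's order, each wrapped in LAST_VALUE(...).
INSERT INTO AssetInfoRiskResultSinkTable
SELECT
    rowkey,                                   -- grouping key -> upsert key
    ROW(LAST_VALUE(id),
        LAST_VALUE(user_id),
        LAST_VALUE(income_no)) AS d
FROM (
    -- rowkey is now computed in the inner query, not in the outer SELECT
    SELECT MD5(real_income_no) AS rowkey, id, user_id, income_no
    FROM (
        SELECT *, ROW_NUMBER() OVER (
            PARTITION BY id ORDER BY update_time DESC) AS rowNum
        FROM AssetInfoRiskResultTable
    )
    WHERE rowNum = 1
)
GROUP BY rowkey
```

`LAST_VALUE` keeps the most recently received value per `rowkey`, not the one with the latest `update_time`; the `ROW_NUMBER` subquery is kept so that the latest row per `id` is still selected first.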

Best regards,
Leonard Xu


> On July 1, 2020, at 13:51, [hidden email] wrote:
>
> MD5(real_income_no) as rowkey,
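
For the Flink 1.11+ route mentioned in the reply, the key is declared in the sink DDL instead of being inferred. A minimal sketch, assuming the new-style `'connector' = 'hbase-1.4'` options of the 1.11 HBase SQL connector; only two fields of `d` are shown, and the connection values are copied from the original DDL.

```sql
-- Flink 1.11+ sketch: the primary key is declared, not inferred.
CREATE TABLE AssetInfoRiskResultSinkTable (
  rowkey STRING,
  d ROW<id BIGINT, user_id STRING /* , remaining fields */>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'rtest:borrower_related_asset_info_real_time',
  'zookeeper.quorum' = 'fdw6.fengjr.inc,fdw4.fengjr.inc,fdw5.fengjr.inc,fjr-yz-204-11,fjr-yz-204-13',
  -- copied as-is; ZooKeeper paths normally start with '/', e.g. the default '/hbase'
  'zookeeper.znode.parent' = 'hbase_test',
  'sink.buffer-flush.max-size' = '10mb',
  'sink.buffer-flush.max-rows' = '1000',
  'sink.buffer-flush.interval' = '2s'
)
```

With the key declared on the table, the original INSERT should no longer need an extra GROUP BY just to satisfy the sink, although this has not been verified against this exact query.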
