Data loss when sinking from Kafka to HBase


夜思流年梦
Dear developers:


We are seeing data loss when sinking from Kafka to HBase. With the same SQL, a JDBC sink loses no data. The table DDL and job SQL are below.
Flink version: 1.12


Source table (ingested with canal-json):
create table rt_ods.ods_za_log_member_base_info(
    MemberId bigint COMMENT 'User ID',
    NickName string COMMENT 'User nickname',
    proctime as PROCTIME()
) WITH (
    'properties.bootstrap.servers' = 'XXXX:9092',
    'connector' = 'kafka',
    'format' = 'canal-json',
    'topic' = 'ods_user_info',
    'properties.group.id' = 'rt_ods-ods_za_log_member_base_info',
    'scan.startup.mode' = 'group-offsets'
)
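
To rule out loss upstream of the sink, it can help to count change events per key straight off the source table. A minimal diagnostic sketch, not part of the original job; for a full re-read it assumes a copy of the source table declared with a different 'properties.group.id' and 'scan.startup.mode' = 'earliest-offset', so the job's committed offsets are untouched:

select
      MemberId,
      count(*) as change_event_cnt  -- deletes and update_before events retract from the count
  from rt_ods.ods_za_log_member_base_info
 group by MemberId
;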


Sink table (HBase):
create table rt_dwd.dwd_za_log_user_info(
    memberid STRING,
    f1 ROW<nick_name STRING>,
    PRIMARY KEY (memberid) NOT ENFORCED
) WITH (
    'sink.buffer-flush.max-size' = '0',
    'connector' = 'hbase-1.4',
    'zookeeper.quorum' = '10.51.3.48:2181',
    'sink.buffer-flush.interval' = '3s',
    'sink.buffer-flush.max-rows' = '100',
    'table-name' = 'dwd_user_info',
    'sink.parallelism' = '5'
)
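
If buffering in the HBase sink is the suspect, one way to isolate it is to flush after every single mutation. A sketch of that variant, assuming the Flink 1.12 hbase-1.4 connector semantics where 'sink.buffer-flush.max-rows' = '1' flushes each buffered row immediately (the _nobuf table name is made up for the test):

create table rt_dwd.dwd_za_log_user_info_nobuf(
    memberid STRING,
    f1 ROW<nick_name STRING>,
    PRIMARY KEY (memberid) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'zookeeper.quorum' = '10.51.3.48:2181',
    'table-name' = 'dwd_user_info',
    'sink.buffer-flush.max-rows' = '1'  -- flush every mutation; slow, but removes buffering as a variable
)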


Job SQL:
insert into rt_dwd.dwd_za_log_user_info
select
      cast(t1.MemberId as VARCHAR)
      ,ROW(
      t1.NickName
      )
  from rt_ods.ods_za_log_member_base_info t1
 where t1.MemberId is not null
;


Rows are lost when sinking to HBase, while sinking to MySQL writes the data completely. MySQL table DDL:
create table rt_dwd.dwd_za_log_user_info_mysql(
    memberid string COMMENT 'Primary key ID',
    nick_name string COMMENT 'Nickname',
    primary key(memberid) NOT ENFORCED
) WITH (
    'password' = 'xxxx',
    'connector' = 'jdbc',
    'sink.buffer-flush.interval' = '3s',
    'sink.buffer-flush.max-rows' = '100',
    'table-name' = 'dwd_user_info',
    'url' = 'jdbc:mysql://xxxxx',
    'username' = 'xxx'
)
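
The physical MySQL table behind this JDBC sink is not shown in the post; for reference, a plausible definition, with hypothetical column widths:

-- Hypothetical MySQL-side DDL for the comparison table; not from the original post
CREATE TABLE dwd_user_info (
    memberid  VARCHAR(64)  NOT NULL COMMENT 'Primary key ID',
    nick_name VARCHAR(255)          COMMENT 'Nickname',
    PRIMARY KEY (memberid)
);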


Additional context:
The source is a set of 100 sharded tables. The binlog of those 100 tables is written to Kafka in canal-json format, then consumed and sunk to HBase.
I am not sure whether the HBase sink flush parameters are misconfigured. I also tried setting both
    'sink.buffer-flush.interval' = '3s',
    'sink.buffer-flush.max-rows' = '100',
to 0, but that did not help either (see the sketch below).
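
For completeness, this is the variant that was tried with all flush triggers at 0. As far as I can tell from the 1.12 connector, with all three triggers disabled the sink itself only flushes on checkpoint and on shutdown (the HBase client's own default write buffer may still flush on size), so whether rows reach HBase then depends on the job's checkpoint settings, which are not shown here:

create table rt_dwd.dwd_za_log_user_info(
    memberid STRING,
    f1 ROW<nick_name STRING>,
    PRIMARY KEY (memberid) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'zookeeper.quorum' = '10.51.3.48:2181',
    'table-name' = 'dwd_user_info',
    'sink.parallelism' = '5',
    'sink.buffer-flush.max-size' = '0',  -- size trigger disabled
    'sink.buffer-flush.max-rows' = '0',  -- row-count trigger disabled
    'sink.buffer-flush.interval' = '0'   -- timer trigger disabled
)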