Dear developers:

We are seeing data loss when sinking from Kafka to HBase. With the same SQL, sinking via JDBC loses no data. The table DDLs and job SQL are below.

Flink version: 1.12

Source table (ingested with canal-json):
create table rt_ods.ods_za_log_member_base_info(
MemberId bigint COMMENT 'User ID',
NickName string COMMENT 'User nickname',
proctime as PROCTIME()
) WITH (
'properties.bootstrap.servers' = 'XXXX:9092',
'connector' = 'kafka',
'format' = 'canal-json',
'topic' = 'ods_user_info',
'properties.group.id' = 'rt_ods-ods_za_log_member_base_info',
'scan.startup.mode' = 'group-offsets'
)
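For context, a canal-json change event on this topic looks roughly like the sample below (values and the shard table name are made up for illustration). Flink's canal-json format decodes an UPDATE event like this into an UPDATE_BEFORE/UPDATE_AFTER pair on the changelog:

{
  "data": [{"MemberId": 123456, "NickName": "new_nick"}],
  "old": [{"NickName": "old_nick"}],
  "type": "UPDATE",
  "database": "member_db",
  "table": "member_base_info_03",
  "es": 1620000000000,
  "ts": 1620000000123
}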
Sink table (HBase):
create table rt_dwd.dwd_za_log_user_info(
memberid STRING,
f1 ROW <
nick_name string >,
PRIMARY KEY (memberid) NOT ENFORCED
) WITH (
'sink.buffer-flush.max-size' = '0',
'connector' = 'hbase-1.4',
'zookeeper.quorum' = '10.51.3.48:2181',
'sink.buffer-flush.interval' = '3s',
'sink.buffer-flush.max-rows' = '100',
'table-name' = 'dwd_user_info',
'sink.parallelism' = '5'
)
Job SQL:
insert into rt_dwd.dwd_za_log_user_info
select
cast(t1.MemberId as VARCHAR)
,ROW(
t1.NickName
)
from rt_ods.ods_za_log_member_base_info t1
where t1.MemberId is not null
;
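For reference, one way to inspect the changelog that actually reaches the sink is to mirror the same query into a print sink (a sketch only; the print table name is arbitrary):

create table rt_dwd.dwd_za_log_user_info_print(
memberid STRING,
f1 ROW<nick_name STRING>
) WITH (
'connector' = 'print'
);
insert into rt_dwd.dwd_za_log_user_info_print
select cast(t1.MemberId as VARCHAR), ROW(t1.NickName)
from rt_ods.ods_za_log_member_base_info t1
where t1.MemberId is not null;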
Data is lost when sinking to HBase, but when sinking to MySQL the data is written completely. The MySQL DDL:
create table rt_dwd.dwd_za_log_user_info_mysql(
memberid string COMMENT 'Primary key ID',
nick_name string COMMENT 'Nickname',
primary key(memberid) NOT ENFORCED
) WITH (
'password' = 'xxxx',
'connector' = 'jdbc',
'sink.buffer-flush.interval' = '3s',
'sink.buffer-flush.max-rows' = '100',
'table-name' = 'dwd_user_info',
'url' = 'jdbc:mysql://xxxxx',
'username' = 'xxx'
)
Additional context:
The data source is 100 sharded tables: the binlog of all 100 tables is written to Kafka in canal-json format, and this job consumes it and sinks to HBase.
I am not sure whether the HBase sink flush parameters are the problem; I set both
'sink.buffer-flush.interval' = '3s',
'sink.buffer-flush.max-rows' = '100'
to 0, but that made no difference.
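Concretely, the flush-disabled variant of the HBase sink's WITH clause looked like this (everything else unchanged from the DDL above):

'connector' = 'hbase-1.4',
'zookeeper.quorum' = '10.51.3.48:2181',
'table-name' = 'dwd_user_info',
'sink.parallelism' = '5',
'sink.buffer-flush.max-size' = '0',
'sink.buffer-flush.interval' = '0',
'sink.buffer-flush.max-rows' = '0'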