目前我们的业务数据分析逻辑大部分基于flink sql编写的,
下面是一个例子
任务逻辑如下
CREATE TABLE kafka_source (
`policy_id` BIGINT ,
`plan_code` int ,
`gmt_created` TIMESTAMP(3)
) WITH (
'format.type'='json',
'connector.type'='kafka',
'connector.version'='0.11',
'connector.topic'='data-flink-test-blcs-201912301523',
'connector.properties.bootstrap.servers'='kafka.test.za.net:9092',
'connector.properties.group.id'='local'
);
create view kafka_source_last_value as
select
policy_id,
LAST_VALUE(plan_code) as plan_code,
LAST_VALUE(gmt_created) as gmt_created
from kafka_source
group by `policy_id`
;
create table print_sink (
`date_str` varchar,
`policy_count` bigint,
`plan_sum` bigint,
primary key (date_str)
) with (
'connector.type'='print'
);
insert into print_sink
select
date_format(gmt_created, 'yyyy-MM-dd') as date_str,
count(policy_id) as policy_count,
sum(plan_code) as plan_sum
from kafka_source_last_value
group by date_format(gmt_created, 'yyyy-MM-dd');
输入数据顺序为
{“policy_id”:1, “plan_code”:1000, “gmt_created”:”2020-09-15 12:00:00”}
{“policy_id”:2, “plan_code”:1500, “gmt_created”:”2020-09-15 12:00:03”}
{“policy_id”:1, “plan_code”:1300, “gmt_created”:”2020-09-15 12:00:05”}
期望的输出结果是
(true,2020-09-15,1,1000)
(true,2020-09-15,2,2500)
(true,2020-09-15,2,2800)
实际的输出结果是
(true,2020-09-15,1,1000)
(true,2020-09-15,2,2500)
(true,2020-09-15,1,1500)
(true,2020-09-15,2,2800)
问题点:
在发送第三条数据的时候,因为要取LastValue, flink的数据发出了一条回撤数据,然后再新发一条新的数据更新统计结果
往存储引擎写出数据会存在如下问题:
1.直接写出数据到下游存储引擎,对接存储引擎的应用会查询到中间回撤的数据,业务上不够友好。
2.参考官方的flink-jdbc的实现,SinkFunction上实现CheckpointedFunction接口并实现snapshotState方法,基于checkpoint的触发一批写出数据可以解决前面的问题1,但是checkpoint的触发频率不能开的很快,这将导致了SinkFunction写出数据不够实时,业务上也不够友好。
请问有什么好的解决办法吗?