统计数据含有中间回撤数据的问题

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

统计数据含有中间回撤数据的问题

xushanshan
This post was updated on .
目前我们的业务数据分析逻辑大部分基于flink sql编写的, 下面是一个例子 任务逻辑如下
CREATE TABLE kafka_source (
  `policy_id`   BIGINT ,
  `plan_code`   int ,
  `gmt_created` TIMESTAMP(3)
) WITH (
  'format.type'='json',
  'connector.type'='kafka',
  'connector.version'='0.11',
  'connector.topic'='data-flink-test-blcs-201912301523',
  'connector.properties.bootstrap.servers'='kafka.test.za.net:9092',
  'connector.properties.group.id'='local'
);

create view kafka_source_last_value as
select
    policy_id,
    LAST_VALUE(plan_code)   as plan_code,
    LAST_VALUE(gmt_created) as gmt_created
from kafka_source
group by `policy_id`
;

create table print_sink (
  `date_str`         varchar,
  `policy_count`     bigint,
  `plan_sum`         bigint,
  primary key (date_str)
) with (
  'connector.type'='print'
);

insert into print_sink
select
date_format(gmt_created, 'yyyy-MM-dd') as date_str,
count(policy_id) as policy_count,
sum(plan_code) as plan_sum
from kafka_source_last_value
group by date_format(gmt_created, 'yyyy-MM-dd');
输入数据顺序为
{“policy_id”:1, “plan_code”:1000, “gmt_created”:”2020-09-15 12:00:00”}
{“policy_id”:2, “plan_code”:1500, “gmt_created”:”2020-09-15 12:00:03”}
{“policy_id”:1, “plan_code”:1300, “gmt_created”:”2020-09-15 12:00:05”}
期望的输出结果是
(true,2020-09-15,1,1000)
(true,2020-09-15,2,2500)
(true,2020-09-15,2,2800)
实际的输出结果是
(true,2020-09-15,1,1000)
(true,2020-09-15,2,2500)
(true,2020-09-15,1,1500)
(true,2020-09-15,2,2800)

问题点:

在发送第三条数据的时候,因为要取LastValue, flink的数据发出了一条回撤数据,然后再新发一条新的数据更新统计结果

往存储引擎写出数据会存在如下问题:

1.直接写出数据到下游存储引擎,对接存储引擎的应用会查询到中间回撤的数据,业务上不够友好。

2.参考官方的flink-jdbc的实现,SinkFunction上实现CheckpointedFunction接口并实现snapshotState方法,基于checkpoint的触发一批写出数据可以解决前面的问题1,但是checkpoint的触发频率不能开的很快,这将导致了SinkFunction写出数据不够实时,业务上也不够友好。

请问有什么好的解决办法吗?

Reply | Threaded
Open this post in threaded view
|

Re: 统计数据含有中间回撤数据的问题

Evan
hello 你的图片挂了,可以找个图床工具贴上去,这里附上链接
或者直接文字描述,图片一般是不会显示的



 
发件人: 1337220620
发送时间: 2020-09-16 13:14
收件人: user-zh
主题: 统计数据含有中间回撤数据的问题
目前我们的业务数据分析逻辑大部分基于flink sql编写的,
下面是一个例子
任务逻辑如下
输入数据顺序为
{“policy_id”:1, “plan_code”:1000, “gmt_created”:”2020-09-15 12:00:00”}
{“policy_id”:2, “plan_code”:1500, “gmt_created”:”2020-09-15 12:00:03”}
{“policy_id”:1, “plan_code”:1300, “gmt_created”:”2020-09-15 12:00:05”}
 
期望的输出结果是
 
实际的输出结果是
 
问题点:在发送第三条数据的时候,因为要取LastValue, flink的数据发出了一条回撤数据,然后再新发一条新的数据更新统计结果
 往存储引擎写出数据会存在如下问题:
1.直接写出数据到下游存储引擎,对接存储引擎的应用会查询到中间回撤的数据,业务上不够友好。
2.参考官方的flink-jdbc的实现,SinkFunction上实现CheckpointedFunction接口并实现snapshotState方法,基于checkpoint的触发一批写出数据可以解决前面的问题1,但是checkpoint的触发频率不能开的很快,这将导致了SinkFunction写出数据不够实时,业务上也不够友好。
 请问有什么好的解决办法吗?
Reply | Threaded
Open this post in threaded view
|

Re: 统计数据含有中间回撤数据的问题

xushanshan
问题内容已修改补充完成



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 统计数据含有中间回撤数据的问题

Jark
Administrator
开启 minibatch 可以基本解决中间结果的问题:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation

Best,
Jark

On Fri, 18 Sep 2020 at 11:57, xushanshan <[hidden email]> wrote:

> 问题内容已修改补充完成
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
Reply | Threaded
Open this post in threaded view
|

Re: 统计数据含有中间回撤数据的问题

xushanshan
hi, Jark
 开启 minibatch
是将中间数据按一批次处理,如果中间回撤数据和后续的更新数据分到两个minibatch里了,还是不能避免下游系统查询到中间结果的问题



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 统计数据含有中间回撤数据的问题

xushanshan
In reply to this post by xushanshan
关于这个问题的后续案例和我司的解决方案在这个链接内,因为nabble的显示排版问题,内容没能发布在讨论区,望大家谅解,也欢迎大家给出这个问题的宝贵建议
http://xushanshan.gitee.io/piclab/flink/checkpoint/transactionBarrier.html
<http://xushanshan.gitee.io/piclab/flink/checkpoint/transactionBarrier.html>  



--
Sent from: http://apache-flink.147419.n8.nabble.com/