FlinkSQL Retraction 问题原理咨询

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

FlinkSQL Retraction 问题原理咨询

wanglei2@geekplus.com.cn

自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 RDS, RDS 表没有主键,也没有唯一键。

INSERT INTO table_out select  tms_company,  count(distinct order_id) as order_cnt from
    (select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table group by order_id)
 group by tms_company;


总共发送了 4 条消息,顺序如下:

1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1

2  {"order_id":1,"tms_company":"yuantong"}      数据库1条记录: yuantong 1  (上一条记录被删除了)

3  {"order_id":2,"tms_company":"yuantong"}     数据库2条记录: yuantong 1, yuantong 2  (增加了条记录,没有删除)

4  {"order_id":2,"tms_company":"zhongtong"}   数据库4条记录: yuantong 1, yuantong 2, yuantong 1, zhongtong 1    (增加了两条记录,没有删除)


问题一:
    第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?

问题二:
   第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
   第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?

谢谢,
王磊



[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re:FlinkSQL Retraction 问题原理咨询

Michael Ran



指定的更新键是tms_company?


结果是:
yuantong:2
zhongtong:2












在 2020-04-30 17:08:22,"[hidden email]" <[hidden email]> 写道:

>
>自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 RDS, RDS 表没有主键,也没有唯一键。
>
>INSERT INTO table_out select  tms_company,  count(distinct order_id) as order_cnt from
>    (select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table group by order_id)
> group by tms_company;
>
>
>总共发送了 4 条消息,顺序如下:
>
>1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
>
>2  {"order_id":1,"tms_company":"yuantong"}      数据库1条记录: yuantong 1  (上一条记录被删除了)
>
>3  {"order_id":2,"tms_company":"yuantong"}     数据库2条记录: yuantong 1, yuantong 2  (增加了条记录,没有删除)
>
>4  {"order_id":2,"tms_company":"zhongtong"}   数据库4条记录: yuantong 1, yuantong 2, yuantong 1, zhongtong 1    (增加了两条记录,没有删除)
>
>
>问题一:
>    第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
>
>问题二:
>   第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
>   第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
>
>谢谢,
>王磊
>
>
>
>[hidden email]
Reply | Threaded
Open this post in threaded view
|

回复: Re:FlinkSQL Retraction 问题原理咨询

wanglei2@geekplus.com.cn

更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的 tms_company 是有变化的。
我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。




[hidden email]

发件人: Michael Ran
发送时间: 2020-04-30 17:23
收件人: user-zh
主题: Re:FlinkSQL Retraction 问题原理咨询
 
 
 
指定的更新键是tms_company?
 
 
结果是:
yuantong:2
zhongtong:2
 
 
 
 
 
 
 
 
 
 
 
 
在 2020-04-30 17:08:22,"[hidden email]" <[hidden email]> 写道:

>
>自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 RDS, RDS 表没有主键,也没有唯一键。
>
>INSERT INTO table_out select  tms_company,  count(distinct order_id) as order_cnt from
>    (select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table group by order_id)
> group by tms_company;
>
>
>总共发送了 4 条消息,顺序如下:
>
>1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
>
>2  {"order_id":1,"tms_company":"yuantong"}      数据库1条记录: yuantong 1  (上一条记录被删除了)
>
>3  {"order_id":2,"tms_company":"yuantong"}     数据库2条记录: yuantong 1, yuantong 2  (增加了条记录,没有删除)
>
>4  {"order_id":2,"tms_company":"zhongtong"}   数据库4条记录: yuantong 1, yuantong 2, yuantong 1, zhongtong 1    (增加了两条记录,没有删除)
>
>
>问题一:
>    第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
>
>问题二:
>   第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
>   第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
>
>谢谢,
>王磊
>
>
>
>[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Re:FlinkSQL Retraction 问题原理咨询

Jingsong Li
Hi,

问题一:删除数据可不单单只是retract stream的功能。upsert
stream是当下游具有按key覆盖的功能时的特殊优化,除了按key覆盖外,它也需要在上游retract时删除数据,意思是upsert
stream也有retract的input数据的。JDBC实现的是upsert stream的消费。

问题二:正确数据应该是:
1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
2  {"order_id":1,"tms_company":"yuantong"}      数据库1条记录: yuantong 1  ( 删除
zhongtong 1)
3  {"order_id":2,"tms_company":"yuantong"}     数据库1条记录: yuantong 2  (
删除yuantong 1)
4  {"order_id":2,"tms_company":"zhongtong"}   数据库2条记录: yuantong 1,
zhongtong 1    ( 删除yuantong 2)

你用了什么dialect?是不是mysql?
Flink JDBC的Mysql用了DUPLICATE KEY UPDATE的语法来更新数据。
看起来这个语法在RDS没有建主键或者唯一键时可能不会去覆盖老数据?尝试创建下主键或唯一建?

Best,
Jingsong Lee

On Wed, May 6, 2020 at 10:36 AM [hidden email] <
[hidden email]> wrote:

>
> 更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的
> tms_company 是有变化的。
> 我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。
>
>
>
>
> [hidden email]
>
> 发件人: Michael Ran
> 发送时间: 2020-04-30 17:23
> 收件人: user-zh
> 主题: Re:FlinkSQL Retraction 问题原理咨询
>
>
>
> 指定的更新键是tms_company?
>
>
> 结果是:
> yuantong:2
> zhongtong:2
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-04-30 17:08:22,"[hidden email]" <[hidden email]>
> 写道:
> >
> >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到
> RDS, RDS 表没有主键,也没有唯一键。
> >
> >INSERT INTO table_out select  tms_company,  count(distinct order_id) as
> order_cnt from
> >    (select order_id, LAST_VALUE(tms_company) AS tms_company from
> dwd_table group by order_id)
> > group by tms_company;
> >
> >
> >总共发送了 4 条消息,顺序如下:
> >
> >1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
> >
> >2  {"order_id":1,"tms_company":"yuantong"}      数据库1条记录: yuantong 1
> (上一条记录被删除了)
> >
> >3  {"order_id":2,"tms_company":"yuantong"}     数据库2条记录: yuantong 1,
> yuantong 2  (增加了条记录,没有删除)
> >
> >4  {"order_id":2,"tms_company":"zhongtong"}   数据库4条记录: yuantong 1,
> yuantong 2, yuantong 1, zhongtong 1    (增加了两条记录,没有删除)
> >
> >
> >问题一:
> >    第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
> >
> >问题二:
> >   第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
> >   第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
> >
> >谢谢,
> >王磊
> >
> >
> >
> >[hidden email]
>


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re:回复: Re:FlinkSQL Retraction 问题原理咨询

Michael Ran
In reply to this post by wanglei2@geekplus.com.cn



1.flink 状态或者内存维护了所有结果。

2.当group by  count 结果值(tms_company=1),新来一条记录变成(tms_company=2)

   tms_company=1 (旧,false)
tms_company=2 (新,true)


3. 内存里面就会把旧的舍弃掉,用新的参与后续计算


4.如果存储(mysql 之类的),会生成对应的SQL 进行更新掉







在 2020-05-06 10:36:35,"[hidden email]" <[hidden email]> 写道:

>
>更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的 tms_company 是有变化的。
>我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。
>
>
>
>
>[hidden email]
>
>发件人: Michael Ran
>发送时间: 2020-04-30 17:23
>收件人: user-zh
>主题: Re:FlinkSQL Retraction 问题原理咨询
>
>
>
>指定的更新键是tms_company?
>
>
>结果是:
>yuantong:2
>zhongtong:2
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-04-30 17:08:22,"[hidden email]" <[hidden email]> 写道:
>>
>>自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 RDS, RDS 表没有主键,也没有唯一键。
>>
>>INSERT INTO table_out select  tms_company,  count(distinct order_id) as order_cnt from
>>    (select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table group by order_id)
>> group by tms_company;
>>
>>
>>总共发送了 4 条消息,顺序如下:
>>
>>1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
>>
>>2  {"order_id":1,"tms_company":"yuantong"}      数据库1条记录: yuantong 1  (上一条记录被删除了)
>>
>>3  {"order_id":2,"tms_company":"yuantong"}     数据库2条记录: yuantong 1, yuantong 2  (增加了条记录,没有删除)
>>
>>4  {"order_id":2,"tms_company":"zhongtong"}   数据库4条记录: yuantong 1, yuantong 2, yuantong 1, zhongtong 1    (增加了两条记录,没有删除)
>>
>>
>>问题一:
>>    第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
>>
>>问题二:
>>   第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
>>   第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
>>
>>谢谢,
>>王磊
>>
>>
>>
>>[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Re:FlinkSQL Retraction 问题原理咨询

wanglei2@geekplus.com.cn
In reply to this post by Jingsong Li

Thanks  Jingsong Lee.

我用的是 MySQL,sink 表中没有任何主键或唯一键.
如果 sink 表设主键或唯一键,确实能达到只保留两条记录的效果。

我把 flink sql-client 客户端设置 SET execution.result-mode=changelog 试验了下,左边标上了是第几条 kafka 消息导致的行为:

 +/- tms_company order_cnt 1 + zhongtong 1 2 - zhongtong 1 2 + yuantong 1 3 - yuantong 1 3 + yuantong 2 4 - yuantong 2 4 + yuantong 1 4 + zhongtong 1

第1条消息:执行一个 INSERT
第2条消息:执行了 一个 DELETE, 一个  INSERT
第3条消息:执行了一个  INSERT ON DUPLICATE UPDATE
第4条消息:执行了两个  INSERT ON DUPLICATE UPDATE


我总结这个逻辑应该是如果 回撤导致的结果变成 0 ,就会执行 delete , 否则就是    INSERT ON DUPLICATE UPDATE

不知道我这样理解是否正确。

谢谢,
王磊




[hidden email]

Sender: Jingsong Li
Send Time: 2020-05-06 11:35
Receiver: user-zh
Subject: Re: Re:FlinkSQL Retraction 问题原理咨询
Hi,
 
问题一:删除数据可不单单只是retract stream的功能。upsert
stream是当下游具有按key覆盖的功能时的特殊优化,除了按key覆盖外,它也需要在上游retract时删除数据,意思是upsert
stream也有retract的input数据的。JDBC实现的是upsert stream的消费。
 
问题二:正确数据应该是:
1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
2  {"order_id":1,"tms_company":"yuantong"}      数据库1条记录: yuantong 1  ( 删除
zhongtong 1)
3  {"order_id":2,"tms_company":"yuantong"}     数据库1条记录: yuantong 2  (
删除yuantong 1)
4  {"order_id":2,"tms_company":"zhongtong"}   数据库2条记录: yuantong 1,
zhongtong 1    ( 删除yuantong 2)
 
你用了什么dialect?是不是mysql?
Flink JDBC的Mysql用了DUPLICATE KEY UPDATE的语法来更新数据。
看起来这个语法在RDS没有建主键或者唯一键时可能不会去覆盖老数据?尝试创建下主键或唯一建?
 
Best,
Jingsong Lee
 
On Wed, May 6, 2020 at 10:36 AM [hidden email] <
[hidden email]> wrote:
 

>
> 更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的
> tms_company 是有变化的。
> 我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。
>
>
>
>
> [hidden email]
>
> 发件人: Michael Ran
> 发送时间: 2020-04-30 17:23
> 收件人: user-zh
> 主题: Re:FlinkSQL Retraction 问题原理咨询
>
>
>
> 指定的更新键是tms_company?
>
>
> 结果是:
> yuantong:2
> zhongtong:2
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-04-30 17:08:22,"[hidden email]" <[hidden email]>
> 写道:
> >
> >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到
> RDS, RDS 表没有主键,也没有唯一键。
> >
> >INSERT INTO table_out select  tms_company,  count(distinct order_id) as
> order_cnt from
> >    (select order_id, LAST_VALUE(tms_company) AS tms_company from
> dwd_table group by order_id)
> > group by tms_company;
> >
> >
> >总共发送了 4 条消息,顺序如下:
> >
> >1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
> >
> >2  {"order_id":1,"tms_company":"yuantong"}      数据库1条记录: yuantong 1
> (上一条记录被删除了)
> >
> >3  {"order_id":2,"tms_company":"yuantong"}     数据库2条记录: yuantong 1,
> yuantong 2  (增加了条记录,没有删除)
> >
> >4  {"order_id":2,"tms_company":"zhongtong"}   数据库4条记录: yuantong 1,
> yuantong 2, yuantong 1, zhongtong 1    (增加了两条记录,没有删除)
> >
> >
> >问题一:
> >    第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
> >
> >问题二:
> >   第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
> >   第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
> >
> >谢谢,
> >王磊
> >
> >
> >
> >[hidden email]
>
 
 
--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: Re:FlinkSQL Retraction 问题原理咨询

Jingsong Li
Hi,

> sink 表中没有任何主键或唯一键

这个时候更合理的方式应该是抛出异常,不过实现上可能有些不好搞

> 回撤导致的结果变成 0 ,就会执行 delete , 否则就是update

你理解的完全正确

Best
Jingsong Lee

On Wed, May 6, 2020 at 12:39 PM [hidden email] <
[hidden email]> wrote:

>
> Thanks  Jingsong Lee.
>
> 我用的是 MySQL,sink 表中没有任何主键或唯一键.
> 如果 sink 表设主键或唯一键,确实能达到只保留两条记录的效果。
>
> 我把 flink sql-client 客户端设置 SET execution.result-mode=changelog
> 试验了下,左边标上了是第几条 kafka 消息导致的行为:
>
>  +/- tms_company order_cnt 1 + zhongtong 1 2 - zhongtong 1 2 + yuantong 1
> 3 - yuantong 1 3 + yuantong 2 4 - yuantong 2 4 + yuantong 1 4 + zhongtong 1
>
> 第1条消息:执行一个 INSERT
> 第2条消息:执行了 一个 DELETE, 一个  INSERT
> 第3条消息:执行了一个  INSERT ON DUPLICATE UPDATE
> 第4条消息:执行了两个  INSERT ON DUPLICATE UPDATE
>
>
> 我总结这个逻辑应该是如果 回撤导致的结果变成 0 ,就会执行 delete , 否则就是    INSERT ON DUPLICATE UPDATE
>
> 不知道我这样理解是否正确。
>
> 谢谢,
> 王磊
>
>
>
>
> [hidden email]
>
> Sender: Jingsong Li
> Send Time: 2020-05-06 11:35
> Receiver: user-zh
> Subject: Re: Re:FlinkSQL Retraction 问题原理咨询
> Hi,
>
> 问题一:删除数据可不单单只是retract stream的功能。upsert
> stream是当下游具有按key覆盖的功能时的特殊优化,除了按key覆盖外,它也需要在上游retract时删除数据,意思是upsert
> stream也有retract的input数据的。JDBC实现的是upsert stream的消费。
>
> 问题二:正确数据应该是:
> 1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
> 2  {"order_id":1,"tms_company":"yuantong"}      数据库1条记录: yuantong 1  ( 删除
> zhongtong 1)
> 3  {"order_id":2,"tms_company":"yuantong"}     数据库1条记录: yuantong 2  (
> 删除yuantong 1)
> 4  {"order_id":2,"tms_company":"zhongtong"}   数据库2条记录: yuantong 1,
> zhongtong 1    ( 删除yuantong 2)
>
> 你用了什么dialect?是不是mysql?
> Flink JDBC的Mysql用了DUPLICATE KEY UPDATE的语法来更新数据。
> 看起来这个语法在RDS没有建主键或者唯一键时可能不会去覆盖老数据?尝试创建下主键或唯一建?
>
> Best,
> Jingsong Lee
>
> On Wed, May 6, 2020 at 10:36 AM [hidden email] <
> [hidden email]> wrote:
>
> >
> > 更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的
> > tms_company 是有变化的。
> > 我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。
> >
> >
> >
> >
> > [hidden email]
> >
> > 发件人: Michael Ran
> > 发送时间: 2020-04-30 17:23
> > 收件人: user-zh
> > 主题: Re:FlinkSQL Retraction 问题原理咨询
> >
> >
> >
> > 指定的更新键是tms_company?
> >
> >
> > 结果是:
> > yuantong:2
> > zhongtong:2
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-04-30 17:08:22,"[hidden email]" <
> [hidden email]>
> > 写道:
> > >
> > >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到
> > RDS, RDS 表没有主键,也没有唯一键。
> > >
> > >INSERT INTO table_out select  tms_company,  count(distinct order_id) as
> > order_cnt from
> > >    (select order_id, LAST_VALUE(tms_company) AS tms_company from
> > dwd_table group by order_id)
> > > group by tms_company;
> > >
> > >
> > >总共发送了 4 条消息,顺序如下:
> > >
> > >1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
> > >
> > >2  {"order_id":1,"tms_company":"yuantong"}      数据库1条记录: yuantong 1
> > (上一条记录被删除了)
> > >
> > >3  {"order_id":2,"tms_company":"yuantong"}     数据库2条记录: yuantong 1,
> > yuantong 2  (增加了条记录,没有删除)
> > >
> > >4  {"order_id":2,"tms_company":"zhongtong"}   数据库4条记录: yuantong 1,
> > yuantong 2, yuantong 1, zhongtong 1    (增加了两条记录,没有删除)
> > >
> > >
> > >问题一:
> > >    第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码
> >
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> > 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
> > >
> > >问题二:
> > >   第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
> > >   第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
> > >
> > >谢谢,
> > >王磊
> > >
> > >
> > >
> > >[hidden email]
> >
>
>
> --
> Best, Jingsong Lee
>


--
Best, Jingsong Lee