自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 RDS, RDS 表没有主键,也没有唯一键。 INSERT INTO table_out select tms_company, count(distinct order_id) as order_cnt from (select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table group by order_id) group by tms_company; 总共发送了 4 条消息,顺序如下: 1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1 2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 (上一条记录被删除了) 3 {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1, yuantong 2 (增加了条记录,没有删除) 4 {"order_id":2,"tms_company":"zhongtong"} 数据库4条记录: yuantong 1, yuantong 2, yuantong 1, zhongtong 1 (增加了两条记录,没有删除) 问题一: 第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢? 问题二: 第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢? 第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢? 谢谢, 王磊 [hidden email] |
指定的更新键是tms_company? 结果是: yuantong:2 zhongtong:2 在 2020-04-30 17:08:22,"[hidden email]" <[hidden email]> 写道: > >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 RDS, RDS 表没有主键,也没有唯一键。 > >INSERT INTO table_out select tms_company, count(distinct order_id) as order_cnt from > (select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table group by order_id) > group by tms_company; > > >总共发送了 4 条消息,顺序如下: > >1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1 > >2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 (上一条记录被删除了) > >3 {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1, yuantong 2 (增加了条记录,没有删除) > >4 {"order_id":2,"tms_company":"zhongtong"} 数据库4条记录: yuantong 1, yuantong 2, yuantong 1, zhongtong 1 (增加了两条记录,没有删除) > > >问题一: > 第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢? > >问题二: > 第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢? > 第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢? > >谢谢, >王磊 > > > >[hidden email] |
更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的 tms_company 是有变化的。 我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。 [hidden email] 发件人: Michael Ran 发送时间: 2020-04-30 17:23 收件人: user-zh 主题: Re:FlinkSQL Retraction 问题原理咨询 指定的更新键是tms_company? 结果是: yuantong:2 zhongtong:2 在 2020-04-30 17:08:22,"[hidden email]" <[hidden email]> 写道: > >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 RDS, RDS 表没有主键,也没有唯一键。 > >INSERT INTO table_out select tms_company, count(distinct order_id) as order_cnt from > (select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table group by order_id) > group by tms_company; > > >总共发送了 4 条消息,顺序如下: > >1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1 > >2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 (上一条记录被删除了) > >3 {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1, yuantong 2 (增加了条记录,没有删除) > >4 {"order_id":2,"tms_company":"zhongtong"} 数据库4条记录: yuantong 1, yuantong 2, yuantong 1, zhongtong 1 (增加了两条记录,没有删除) > > >问题一: > 第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢? > >问题二: > 第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢? > 第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢? > >谢谢, >王磊 > > > >[hidden email] |
Hi,
问题一:删除数据可不单单只是retract stream的功能。upsert stream是当下游具有按key覆盖的功能时的特殊优化,除了按key覆盖外,它也需要在上游retract时删除数据,意思是upsert stream也有retract的input数据的。JDBC实现的是upsert stream的消费。 问题二:正确数据应该是: 1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1 2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 ( 删除 zhongtong 1) 3 {"order_id":2,"tms_company":"yuantong"} 数据库1条记录: yuantong 2 ( 删除yuantong 1) 4 {"order_id":2,"tms_company":"zhongtong"} 数据库2条记录: yuantong 1, zhongtong 1 ( 删除yuantong 2) 你用了什么dialect?是不是mysql? Flink JDBC的Mysql用了DUPLICATE KEY UPDATE的语法来更新数据。 看起来这个语法在RDS没有建主键或者唯一键时可能不会去覆盖老数据?尝试创建下主键或唯一建? Best, Jingsong Lee On Wed, May 6, 2020 at 10:36 AM [hidden email] < [hidden email]> wrote: > > 更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的 > tms_company 是有变化的。 > 我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。 > > > > > [hidden email] > > 发件人: Michael Ran > 发送时间: 2020-04-30 17:23 > 收件人: user-zh > 主题: Re:FlinkSQL Retraction 问题原理咨询 > > > > 指定的更新键是tms_company? > > > 结果是: > yuantong:2 > zhongtong:2 > > > > > > > > > > > > > 在 2020-04-30 17:08:22,"[hidden email]" <[hidden email]> > 写道: > > > >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 > RDS, RDS 表没有主键,也没有唯一键。 > > > >INSERT INTO table_out select tms_company, count(distinct order_id) as > order_cnt from > > (select order_id, LAST_VALUE(tms_company) AS tms_company from > dwd_table group by order_id) > > group by tms_company; > > > > > >总共发送了 4 条消息,顺序如下: > > > >1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1 > > > >2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 > (上一条记录被删除了) > > > >3 {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1, > yuantong 2 (增加了条记录,没有删除) > > > >4 {"order_id":2,"tms_company":"zhongtong"} 数据库4条记录: yuantong 1, > yuantong 2, yuantong 1, zhongtong 1 (增加了两条记录,没有删除) > > > > > >问题一: > > 第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 > https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc > 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢? > > > >问题二: > > 第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢? > > 第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢? > > > >谢谢, > >王磊 > > > > > > > >[hidden email] > -- Best, Jingsong Lee |
In reply to this post by wanglei2@geekplus.com.cn
1.flink 状态或者内存维护了所有结果。 2.当group by count 结果值(tms_company=1),新来一条记录变成(tms_company=2) tms_company=1 (旧,false) tms_company=2 (新,true) 3. 内存里面就会把旧的舍弃掉,用新的参与后续计算 4.如果存储(mysql 之类的),会生成对应的SQL 进行更新掉 在 2020-05-06 10:36:35,"[hidden email]" <[hidden email]> 写道: > >更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的 tms_company 是有变化的。 >我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。 > > > > >[hidden email] > >发件人: Michael Ran >发送时间: 2020-04-30 17:23 >收件人: user-zh >主题: Re:FlinkSQL Retraction 问题原理咨询 > > > >指定的更新键是tms_company? > > >结果是: >yuantong:2 >zhongtong:2 > > > > > > > > > > > > >在 2020-04-30 17:08:22,"[hidden email]" <[hidden email]> 写道: >> >>自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 RDS, RDS 表没有主键,也没有唯一键。 >> >>INSERT INTO table_out select tms_company, count(distinct order_id) as order_cnt from >> (select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table group by order_id) >> group by tms_company; >> >> >>总共发送了 4 条消息,顺序如下: >> >>1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1 >> >>2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 (上一条记录被删除了) >> >>3 {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1, yuantong 2 (增加了条记录,没有删除) >> >>4 {"order_id":2,"tms_company":"zhongtong"} 数据库4条记录: yuantong 1, yuantong 2, yuantong 1, zhongtong 1 (增加了两条记录,没有删除) >> >> >>问题一: >> 第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢? >> >>问题二: >> 第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢? >> 第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢? >> >>谢谢, >>王磊 >> >> >> >>[hidden email] |
In reply to this post by Jingsong Li
Thanks Jingsong Lee. 我用的是 MySQL,sink 表中没有任何主键或唯一键. 如果 sink 表设主键或唯一键,确实能达到只保留两条记录的效果。 我把 flink sql-client 客户端设置 SET execution.result-mode=changelog 试验了下,左边标上了是第几条 kafka 消息导致的行为: +/- tms_company order_cnt 1 + zhongtong 1 2 - zhongtong 1 2 + yuantong 1 3 - yuantong 1 3 + yuantong 2 4 - yuantong 2 4 + yuantong 1 4 + zhongtong 1 第1条消息:执行一个 INSERT 第2条消息:执行了 一个 DELETE, 一个 INSERT 第3条消息:执行了一个 INSERT ON DUPLICATE UPDATE 第4条消息:执行了两个 INSERT ON DUPLICATE UPDATE 我总结这个逻辑应该是如果 回撤导致的结果变成 0 ,就会执行 delete , 否则就是 INSERT ON DUPLICATE UPDATE 不知道我这样理解是否正确。 谢谢, 王磊 [hidden email] Sender: Jingsong Li Send Time: 2020-05-06 11:35 Receiver: user-zh Subject: Re: Re:FlinkSQL Retraction 问题原理咨询 Hi, 问题一:删除数据可不单单只是retract stream的功能。upsert stream是当下游具有按key覆盖的功能时的特殊优化,除了按key覆盖外,它也需要在上游retract时删除数据,意思是upsert stream也有retract的input数据的。JDBC实现的是upsert stream的消费。 问题二:正确数据应该是: 1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1 2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 ( 删除 zhongtong 1) 3 {"order_id":2,"tms_company":"yuantong"} 数据库1条记录: yuantong 2 ( 删除yuantong 1) 4 {"order_id":2,"tms_company":"zhongtong"} 数据库2条记录: yuantong 1, zhongtong 1 ( 删除yuantong 2) 你用了什么dialect?是不是mysql? Flink JDBC的Mysql用了DUPLICATE KEY UPDATE的语法来更新数据。 看起来这个语法在RDS没有建主键或者唯一键时可能不会去覆盖老数据?尝试创建下主键或唯一建? Best, Jingsong Lee On Wed, May 6, 2020 at 10:36 AM [hidden email] < [hidden email]> wrote: > > 更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的 > tms_company 是有变化的。 > 我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。 > > > > > [hidden email] > > 发件人: Michael Ran > 发送时间: 2020-04-30 17:23 > 收件人: user-zh > 主题: Re:FlinkSQL Retraction 问题原理咨询 > > > > 指定的更新键是tms_company? > > > 结果是: > yuantong:2 > zhongtong:2 > > > > > > > > > > > > > 在 2020-04-30 17:08:22,"[hidden email]" <[hidden email]> > 写道: > > > >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 > RDS, RDS 表没有主键,也没有唯一键。 > > > >INSERT INTO table_out select tms_company, count(distinct order_id) as > order_cnt from > > (select order_id, LAST_VALUE(tms_company) AS tms_company from > dwd_table group by order_id) > > group by tms_company; > > > > > >总共发送了 4 条消息,顺序如下: > > > >1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1 > > > >2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 > (上一条记录被删除了) > > > >3 {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1, > yuantong 2 (增加了条记录,没有删除) > > > >4 {"order_id":2,"tms_company":"zhongtong"} 数据库4条记录: yuantong 1, > yuantong 2, yuantong 1, zhongtong 1 (增加了两条记录,没有删除) > > > > > >问题一: > > 第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 > https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc > 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢? > > > >问题二: > > 第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢? > > 第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢? > > > >谢谢, > >王磊 > > > > > > > >[hidden email] > -- Best, Jingsong Lee |
Hi,
> sink 表中没有任何主键或唯一键 这个时候更合理的方式应该是抛出异常,不过实现上可能有些不好搞 > 回撤导致的结果变成 0 ,就会执行 delete , 否则就是update 你理解的完全正确 Best Jingsong Lee On Wed, May 6, 2020 at 12:39 PM [hidden email] < [hidden email]> wrote: > > Thanks Jingsong Lee. > > 我用的是 MySQL,sink 表中没有任何主键或唯一键. > 如果 sink 表设主键或唯一键,确实能达到只保留两条记录的效果。 > > 我把 flink sql-client 客户端设置 SET execution.result-mode=changelog > 试验了下,左边标上了是第几条 kafka 消息导致的行为: > > +/- tms_company order_cnt 1 + zhongtong 1 2 - zhongtong 1 2 + yuantong 1 > 3 - yuantong 1 3 + yuantong 2 4 - yuantong 2 4 + yuantong 1 4 + zhongtong 1 > > 第1条消息:执行一个 INSERT > 第2条消息:执行了 一个 DELETE, 一个 INSERT > 第3条消息:执行了一个 INSERT ON DUPLICATE UPDATE > 第4条消息:执行了两个 INSERT ON DUPLICATE UPDATE > > > 我总结这个逻辑应该是如果 回撤导致的结果变成 0 ,就会执行 delete , 否则就是 INSERT ON DUPLICATE UPDATE > > 不知道我这样理解是否正确。 > > 谢谢, > 王磊 > > > > > [hidden email] > > Sender: Jingsong Li > Send Time: 2020-05-06 11:35 > Receiver: user-zh > Subject: Re: Re:FlinkSQL Retraction 问题原理咨询 > Hi, > > 问题一:删除数据可不单单只是retract stream的功能。upsert > stream是当下游具有按key覆盖的功能时的特殊优化,除了按key覆盖外,它也需要在上游retract时删除数据,意思是upsert > stream也有retract的input数据的。JDBC实现的是upsert stream的消费。 > > 问题二:正确数据应该是: > 1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1 > 2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 ( 删除 > zhongtong 1) > 3 {"order_id":2,"tms_company":"yuantong"} 数据库1条记录: yuantong 2 ( > 删除yuantong 1) > 4 {"order_id":2,"tms_company":"zhongtong"} 数据库2条记录: yuantong 1, > zhongtong 1 ( 删除yuantong 2) > > 你用了什么dialect?是不是mysql? > Flink JDBC的Mysql用了DUPLICATE KEY UPDATE的语法来更新数据。 > 看起来这个语法在RDS没有建主键或者唯一键时可能不会去覆盖老数据?尝试创建下主键或唯一建? > > Best, > Jingsong Lee > > On Wed, May 6, 2020 at 10:36 AM [hidden email] < > [hidden email]> wrote: > > > > > 更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的 > > tms_company 是有变化的。 > > 我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。 > > > > > > > > > > [hidden email] > > > > 发件人: Michael Ran > > 发送时间: 2020-04-30 17:23 > > 收件人: user-zh > > 主题: Re:FlinkSQL Retraction 问题原理咨询 > > > > > > > > 指定的更新键是tms_company? > > > > > > 结果是: > > yuantong:2 > > zhongtong:2 > > > > > > > > > > > > > > > > > > > > > > > > > > 在 2020-04-30 17:08:22,"[hidden email]" < > [hidden email]> > > 写道: > > > > > >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 > > RDS, RDS 表没有主键,也没有唯一键。 > > > > > >INSERT INTO table_out select tms_company, count(distinct order_id) as > > order_cnt from > > > (select order_id, LAST_VALUE(tms_company) AS tms_company from > > dwd_table group by order_id) > > > group by tms_company; > > > > > > > > >总共发送了 4 条消息,顺序如下: > > > > > >1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1 > > > > > >2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 > > (上一条记录被删除了) > > > > > >3 {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1, > > yuantong 2 (增加了条记录,没有删除) > > > > > >4 {"order_id":2,"tms_company":"zhongtong"} 数据库4条记录: yuantong 1, > > yuantong 2, yuantong 1, zhongtong 1 (增加了两条记录,没有删除) > > > > > > > > >问题一: > > > 第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 > > > https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc > > 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢? > > > > > >问题二: > > > 第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢? > > > 第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢? > > > > > >谢谢, > > >王磊 > > > > > > > > > > > >[hidden email] > > > > > -- > Best, Jingsong Lee > -- Best, Jingsong Lee |
Free forum by Nabble | Edit this page |