场景:
实时统计每天的GMV,但是订单金额是会修改的。 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000. 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. 这时如果不减去上午已经统计的金额。那么总金额就是错的。 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 刚入坑实时处理,请大神赐教 |
我也遇到过这种业务场景,flink要去做update太难了,只能做累加。
我的处理方案是将update转为insert。 先插入一条“负”记录,抵消原来的那条,然后插入一条新记录,即向kafa插入2条消息: 消息1:009订单 金额:-1000 消息2:009订单 金额:500 这样flink累加的金额就是对的。 ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年9月8日(星期二) 下午5:56 收件人: "user-zh"<[hidden email]>; 主题: flink实时统计GMV,如果订单金额下午变了该怎么处理 场景: &nbsp; &nbsp;实时统计每天的GMV,但是订单金额是会修改的。 &nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 &nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000. &nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. 这时如果不减去上午已经统计的金额。那么总金额就是错的。&nbsp;&nbsp; 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 &nbsp; 刚入坑实时处理,请大神赐教 |
开始我也是想这样考虑的。但是还有其他问题。如果改的不是订单金额,而是订单状态。这是也是会生成一条到kafka的。
这时就不能减去这个金额了,但是这个就会涉及订单金额比较了,算法又麻烦一大截。好困惑哦 ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年9月8日(星期二) 晚上6:18 收件人: "user-zh"<[hidden email]>; 主题: 回复:flink实时统计GMV,如果订单金额下午变了该怎么处理 我也遇到过这种业务场景,flink要去做update太难了,只能做累加。 我的处理方案是将update转为insert。 先插入一条“负”记录,抵消原来的那条,然后插入一条新记录,即向kafa插入2条消息: 消息1:009订单&nbsp; 金额:-1000 消息2:009订单&nbsp; 金额:500 这样flink累加的金额就是对的。 ------------------&nbsp;原始邮件&nbsp;------------------ 发件人: "user-zh" <[hidden email]&gt;; 发送时间:&nbsp;2020年9月8日(星期二) 下午5:56 收件人:&nbsp;"user-zh"<[hidden email]&gt;; 主题:&nbsp;flink实时统计GMV,如果订单金额下午变了该怎么处理 场景: &amp;nbsp; &amp;nbsp;实时统计每天的GMV,但是订单金额是会修改的。 &amp;nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 &amp;nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000. &amp;nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. 这时如果不减去上午已经统计的金额。那么总金额就是错的。&amp;nbsp;&amp;nbsp; 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 &amp;nbsp; 刚入坑实时处理,请大神赐教 |
如果是canal获取的mysql数据的话,你可以对比修改前数据是什么样子的,这样就可以规避状态的问题了
| | 引领 | | [hidden email] | 签名由网易邮箱大师定制 在2020年09月8日 18:24,徐振华<[hidden email]> 写道: 开始我也是想这样考虑的。但是还有其他问题。如果改的不是订单金额,而是订单状态。这是也是会生成一条到kafka的。 这时就不能减去这个金额了,但是这个就会涉及订单金额比较了,算法又麻烦一大截。好困惑哦 ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年9月8日(星期二) 晚上6:18 收件人: "user-zh"<[hidden email]>; 主题: 回复:flink实时统计GMV,如果订单金额下午变了该怎么处理 我也遇到过这种业务场景,flink要去做update太难了,只能做累加。 我的处理方案是将update转为insert。 先插入一条“负”记录,抵消原来的那条,然后插入一条新记录,即向kafa插入2条消息: 消息1:009订单&nbsp; 金额:-1000 消息2:009订单&nbsp; 金额:500 这样flink累加的金额就是对的。 ------------------&nbsp;原始邮件&nbsp;------------------ 发件人: "user-zh" <[hidden email]&gt;; 发送时间:&nbsp;2020年9月8日(星期二) 下午5:56 收件人:&nbsp;"user-zh"<[hidden email]&gt;; 主题:&nbsp;flink实时统计GMV,如果订单金额下午变了该怎么处理 场景: &amp;nbsp; &amp;nbsp;实时统计每天的GMV,但是订单金额是会修改的。 &amp;nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 &amp;nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000. &amp;nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. 这时如果不减去上午已经统计的金额。那么总金额就是错的。&amp;nbsp;&amp;nbsp; 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 &amp;nbsp; 刚入坑实时处理,请大神赐教 |
如果是状态变化,可不可以只获取最后一条
如果只是金额字段变化,比如在某个窗口内只能相减了 还有其他好的办法么? ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年9月8日(星期二) 晚上7:08 收件人: "[hidden email]"<[hidden email]>; 主题: 回复:flink实时统计GMV,如果订单金额下午变了该怎么处理 如果是canal获取的mysql数据的话,你可以对比修改前数据是什么样子的,这样就可以规避状态的问题了 | | 引领 | | [hidden email] | 签名由网易邮箱大师定制 在2020年09月8日 18:24,徐振华<[hidden email]> 写道: 开始我也是想这样考虑的。但是还有其他问题。如果改的不是订单金额,而是订单状态。这是也是会生成一条到kafka的。 这时就不能减去这个金额了,但是这个就会涉及订单金额比较了,算法又麻烦一大截。好困惑哦 &nbsp; ------------------&nbsp;原始邮件&nbsp;------------------ 发件人: "user-zh" <[hidden email]&gt;; 发送时间:&nbsp;2020年9月8日(星期二) 晚上6:18 收件人:&nbsp;"user-zh"<[hidden email]&gt;; 主题:&nbsp;回复:flink实时统计GMV,如果订单金额下午变了该怎么处理 我也遇到过这种业务场景,flink要去做update太难了,只能做累加。 我的处理方案是将update转为insert。 先插入一条“负”记录,抵消原来的那条,然后插入一条新记录,即向kafa插入2条消息: 消息1:009订单&amp;nbsp; 金额:-1000 消息2:009订单&amp;nbsp; 金额:500 这样flink累加的金额就是对的。 ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------ 发件人:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <[hidden email]&amp;gt;; 发送时间:&amp;nbsp;2020年9月8日(星期二) 下午5:56 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;; 主题:&amp;nbsp;flink实时统计GMV,如果订单金额下午变了该怎么处理 场景: &amp;amp;nbsp; &amp;amp;nbsp;实时统计每天的GMV,但是订单金额是会修改的。 &amp;amp;nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 &amp;amp;nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000. &amp;amp;nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. 这时如果不减去上午已经统计的金额。那么总金额就是错的。&amp;amp;nbsp;&amp;amp;nbsp; 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 &amp;amp;nbsp; 刚入坑实时处理,请大神赐教 |
O(∩_∩)O谢谢大家,看来取变化前后的值对比是最好的方式了
------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年9月8日(星期二) 晚上11:23 收件人: "[hidden email]"<[hidden email]>; 主题: 回复:flink实时统计GMV,如果订单金额下午变了该怎么处理 如果是状态变化,可不可以只获取最后一条 如果只是金额字段变化,比如在某个窗口内只能相减了 还有其他好的办法么? ------------------&nbsp;原始邮件&nbsp;------------------ 发件人: "user-zh" <[hidden email]&gt;; 发送时间:&nbsp;2020年9月8日(星期二) 晚上7:08 收件人:&nbsp;"[hidden email]"<[hidden email]&gt;; 主题:&nbsp;回复:flink实时统计GMV,如果订单金额下午变了该怎么处理 如果是canal获取的mysql数据的话,你可以对比修改前数据是什么样子的,这样就可以规避状态的问题了 | | 引领 | | [hidden email] | 签名由网易邮箱大师定制 在2020年09月8日 18:24,徐振华<[hidden email]&gt; 写道: 开始我也是想这样考虑的。但是还有其他问题。如果改的不是订单金额,而是订单状态。这是也是会生成一条到kafka的。 这时就不能减去这个金额了,但是这个就会涉及订单金额比较了,算法又麻烦一大截。好困惑哦 &amp;nbsp; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------ 发件人:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <[hidden email]&amp;gt;; 发送时间:&amp;nbsp;2020年9月8日(星期二) 晚上6:18 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;; 主题:&amp;nbsp;回复:flink实时统计GMV,如果订单金额下午变了该怎么处理 我也遇到过这种业务场景,flink要去做update太难了,只能做累加。 我的处理方案是将update转为insert。 先插入一条“负”记录,抵消原来的那条,然后插入一条新记录,即向kafa插入2条消息: 消息1:009订单&amp;amp;nbsp; 金额:-1000 消息2:009订单&amp;amp;nbsp; 金额:500 这样flink累加的金额就是对的。 ------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------ 发件人:&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "user-zh"&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; <[hidden email]&amp;amp;gt;; 发送时间:&amp;amp;nbsp;2020年9月8日(星期二) 下午5:56 收件人:&amp;amp;nbsp;"user-zh"<[hidden email]&amp;amp;gt;; 主题:&amp;amp;nbsp;flink实时统计GMV,如果订单金额下午变了该怎么处理 场景: &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;实时统计每天的GMV,但是订单金额是会修改的。 &amp;amp;amp;nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 &amp;amp;amp;nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000. &amp;amp;amp;nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. 这时如果不减去上午已经统计的金额。那么总金额就是错的。&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 &amp;amp;amp;nbsp; 刚入坑实时处理,请大神赐教 |
In reply to this post by xuzh
不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩:
1. 首先版本是1.11+, 可以直接用binlog format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink 内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by yyy这种,那这个sum指标会自动做好这件事。 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1] 将append数据流转成retract数据流,这样下游再用同样的 聚合逻辑,效果也是一样的。 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication xuzh <[hidden email]> 于2020年9月8日周二 下午5:56写道: > 场景: > 实时统计每天的GMV,但是订单金额是会修改的。 > 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 > 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000. > 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. > 这时如果不减去上午已经统计的金额。那么总金额就是错的。 > 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 > > > 刚入坑实时处理,请大神赐教 -- Best, Benchao Li |
请问第1点是有实际的案例使用了么?
意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录? 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的 谢谢. ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年9月9日(星期三) 中午1:09 收件人: "user-zh"<[hidden email]>; 主题: Re: flink实时统计GMV,如果订单金额下午变了该怎么处理 不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩: 1. 首先版本是1.11+, 可以直接用binlog format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink 内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by yyy这种,那这个sum指标会自动做好这件事。 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1] 将append数据流转成retract数据流,这样下游再用同样的 聚合逻辑,效果也是一样的。 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication xuzh <[hidden email]> 于2020年9月8日周二 下午5:56写道: > 场景: > &nbsp; &nbsp;实时统计每天的GMV,但是订单金额是会修改的。 > &nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 > &nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000. > &nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. > 这时如果不减去上午已经统计的金额。那么总金额就是错的。&nbsp;&nbsp; > 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 > > > &nbsp; 刚入坑实时处理,请大神赐教 -- Best, Benchao Li |
直接根据订单的id进行retract(使用last_value group by id
),然后sum就可以了吧。只要你设置的状态保存期是的大于你订单金额的冷却时间就行。 忝忝向仧 <[hidden email]> 于2020年9月9日周三 下午10:54写道: > 请问第1点是有实际的案例使用了么? > 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录? > 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的 > 谢谢. > > > > > ------------------ 原始邮件 ------------------ > 发件人: > "user-zh" > < > [hidden email]>; > 发送时间: 2020年9月9日(星期三) 中午1:09 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: flink实时统计GMV,如果订单金额下午变了该怎么处理 > > > > 不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩: > 1. 首先版本是1.11+, 可以直接用binlog > format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink > 内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by > yyy这种,那这个sum指标会自动做好这件事。 > 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1] 将append数据流转成retract数据流,这样下游再用同样的 > 聚合逻辑,效果也是一样的。 > > [1] > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication > > > xuzh <[hidden email]> 于2020年9月8日周二 下午5:56写道: > > > 场景: > > &nbsp; &nbsp;实时统计每天的GMV,但是订单金额是会修改的。 > > &nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 > > &nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000. > > &nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. > > 这时如果不减去上午已经统计的金额。那么总金额就是错的。&nbsp;&nbsp; > > 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 > > > > > > &nbsp; 刚入坑实时处理,请大神赐教 > > > > -- > > Best, > Benchao Li |
In reply to this post by 忝忝向仧
1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。
如果还有自己的binlog格式,也可以自定义format来实现。 只要source端产生了changelog数据,后面的算子是可以自动处理update消息的,简单理解,你可以认为 1. append / update_after 消息会累加到聚合指标上 2. delete / update_before 消息会从聚合指标上进行retract [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/canal.html [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html 忝忝向仧 <[hidden email]> 于2020年9月9日周三 下午10:54写道: > 请问第1点是有实际的案例使用了么? > 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录? > 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的 > 谢谢. > > > > > ------------------ 原始邮件 ------------------ > 发件人: > "user-zh" > < > [hidden email]>; > 发送时间: 2020年9月9日(星期三) 中午1:09 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: flink实时统计GMV,如果订单金额下午变了该怎么处理 > > > > 不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩: > 1. 首先版本是1.11+, 可以直接用binlog > format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink > 内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by > yyy这种,那这个sum指标会自动做好这件事。 > 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1] 将append数据流转成retract数据流,这样下游再用同样的 > 聚合逻辑,效果也是一样的。 > > [1] > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication > > > xuzh <[hidden email]> 于2020年9月8日周二 下午5:56写道: > > > 场景: > > &nbsp; &nbsp;实时统计每天的GMV,但是订单金额是会修改的。 > > &nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 > > &nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000. > > &nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. > > 这时如果不减去上午已经统计的金额。那么总金额就是错的。&nbsp;&nbsp; > > 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 > > > > > > &nbsp; 刚入坑实时处理,请大神赐教 > > > > -- > > Best, > Benchao Li -- Best, Benchao Li |
上述说的这种特性,应该也是要依赖于状态把。如果变化的间隔时间超过了状态的保存时长,还能生效吗?
感觉底层和 last_value() group by id是一样的。 Benchao Li <[hidden email]> 于2020年9月10日周四 上午10:34写道: > > 1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。 > 如果还有自己的binlog格式,也可以自定义format来实现。 > > 只要source端产生了changelog数据,后面的算子是可以自动处理update消息的,简单理解,你可以认为 > 1. append / update_after 消息会累加到聚合指标上 > 2. delete / update_before 消息会从聚合指标上进行retract > > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/canal.html > [2] > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html > > 忝忝向仧 <[hidden email]> 于2020年9月9日周三 下午10:54写道: > > > 请问第1点是有实际的案例使用了么? > > 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录? > > 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的 > > 谢谢. > > > > > > > > > > ------------------ 原始邮件 ------------------ > > 发件人: > > "user-zh" > > < > > [hidden email]>; > > 发送时间: 2020年9月9日(星期三) 中午1:09 > > 收件人: "user-zh"<[hidden email]>; > > > > 主题: Re: flink实时统计GMV,如果订单金额下午变了该怎么处理 > > > > > > > > 不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩: > > 1. 首先版本是1.11+, 可以直接用binlog > > format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink > > 内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by > > yyy这种,那这个sum指标会自动做好这件事。 > > 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1] > 将append数据流转成retract数据流,这样下游再用同样的 > > 聚合逻辑,效果也是一样的。 > > > > [1] > > > > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication > > > > > > xuzh <[hidden email]> 于2020年9月8日周二 下午5:56写道: > > > > > 场景: > > > &nbsp; &nbsp;实时统计每天的GMV,但是订单金额是会修改的。 > > > &nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 > > > &nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000. > > > &nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. > > > 这时如果不减去上午已经统计的金额。那么总金额就是错的。&nbsp;&nbsp; > > > 请问是不是根据 update /delete > 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 > > > > > > > > > &nbsp; 刚入坑实时处理,请大神赐教 > > > > > > > > -- > > > > Best, > > Benchao Li > > > > -- > > Best, > Benchao Li > |
sql 算子内部会自动处理这些状态。 这个状态只是聚合的中间结果,并不需要保留原始数据。
当然这个聚合的中间结果状态,也可以指定state retention time来清理一些过期的状态。 last_value只是一个聚合函数,没啥特殊的地方,而且只是按照处理时间获取最后一条数据的聚合函数。 lec ssmi <[hidden email]> 于2020年9月10日周四 下午2:35写道: > 上述说的这种特性,应该也是要依赖于状态把。如果变化的间隔时间超过了状态的保存时长,还能生效吗? > 感觉底层和 last_value() group by id是一样的。 > > Benchao Li <[hidden email]> 于2020年9月10日周四 上午10:34写道: > > > > > > 1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。 > > 如果还有自己的binlog格式,也可以自定义format来实现。 > > > > 只要source端产生了changelog数据,后面的算子是可以自动处理update消息的,简单理解,你可以认为 > > 1. append / update_after 消息会累加到聚合指标上 > > 2. delete / update_before 消息会从聚合指标上进行retract > > > > > > [1] > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/canal.html > > [2] > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html > > > > 忝忝向仧 <[hidden email]> 于2020年9月9日周三 下午10:54写道: > > > > > 请问第1点是有实际的案例使用了么? > > > 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录? > > > 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的 > > > 谢谢. > > > > > > > > > > > > > > > ------------------ 原始邮件 ------------------ > > > 发件人: > > > "user-zh" > > > < > > > [hidden email]>; > > > 发送时间: 2020年9月9日(星期三) 中午1:09 > > > 收件人: "user-zh"<[hidden email]>; > > > > > > 主题: Re: flink实时统计GMV,如果订单金额下午变了该怎么处理 > > > > > > > > > > > > 不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩: > > > 1. 首先版本是1.11+, 可以直接用binlog > > > format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink > > > 内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by > > > yyy这种,那这个sum指标会自动做好这件事。 > > > 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1] > > 将append数据流转成retract数据流,这样下游再用同样的 > > > 聚合逻辑,效果也是一样的。 > > > > > > [1] > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication > > > > > > > > > xuzh <[hidden email]> 于2020年9月8日周二 下午5:56写道: > > > > > > > 场景: > > > > &nbsp; &nbsp;实时统计每天的GMV,但是订单金额是会修改的。 > > > > &nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 > > > > &nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka > ,GMV实时统计为1000. > > > > &nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. > > > > 这时如果不减去上午已经统计的金额。那么总金额就是错的。&nbsp;&nbsp; > > > > 请问是不是根据 update /delete > > 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 > > > > > > > > > > > > &nbsp; 刚入坑实时处理,请大神赐教 > > > > > > > > > > > > -- > > > > > > Best, > > > Benchao Li > > > > > > > > -- > > > > Best, > > Benchao Li > > > -- Best, Benchao Li |
如果只是聚合的结果,像sum这种函数,可以先减去原来的值,然后再加上更新后的值。但如果是count(distinct)呢?还是需要把具体的每个值都存起来把。
Benchao Li <[hidden email]> 于2020年9月10日周四 下午3:26写道: > sql 算子内部会自动处理这些状态。 这个状态只是聚合的中间结果,并不需要保留原始数据。 > 当然这个聚合的中间结果状态,也可以指定state retention time来清理一些过期的状态。 > > last_value只是一个聚合函数,没啥特殊的地方,而且只是按照处理时间获取最后一条数据的聚合函数。 > > lec ssmi <[hidden email]> 于2020年9月10日周四 下午2:35写道: > > > 上述说的这种特性,应该也是要依赖于状态把。如果变化的间隔时间超过了状态的保存时长,还能生效吗? > > 感觉底层和 last_value() group by id是一样的。 > > > > Benchao Li <[hidden email]> 于2020年9月10日周四 上午10:34写道: > > > > > > > > > > > 1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。 > > > 如果还有自己的binlog格式,也可以自定义format来实现。 > > > > > > 只要source端产生了changelog数据,后面的算子是可以自动处理update消息的,简单理解,你可以认为 > > > 1. append / update_after 消息会累加到聚合指标上 > > > 2. delete / update_before 消息会从聚合指标上进行retract > > > > > > > > > [1] > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/canal.html > > > [2] > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html > > > > > > 忝忝向仧 <[hidden email]> 于2020年9月9日周三 下午10:54写道: > > > > > > > 请问第1点是有实际的案例使用了么? > > > > 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录? > > > > 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的 > > > > 谢谢. > > > > > > > > > > > > > > > > > > > > ------------------ 原始邮件 ------------------ > > > > 发件人: > > > > "user-zh" > > > > < > > > > [hidden email]>; > > > > 发送时间: 2020年9月9日(星期三) 中午1:09 > > > > 收件人: "user-zh"<[hidden email]>; > > > > > > > > 主题: Re: flink实时统计GMV,如果订单金额下午变了该怎么处理 > > > > > > > > > > > > > > > > 不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩: > > > > 1. 首先版本是1.11+, 可以直接用binlog > > > > format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink > > > > 内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by > > > > yyy这种,那这个sum指标会自动做好这件事。 > > > > 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1] > > > 将append数据流转成retract数据流,这样下游再用同样的 > > > > 聚合逻辑,效果也是一样的。 > > > > > > > > [1] > > > > > > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication > > > > > > > > > > > > xuzh <[hidden email]> 于2020年9月8日周二 下午5:56写道: > > > > > > > > > 场景: > > > > > &nbsp; &nbsp;实时统计每天的GMV,但是订单金额是会修改的。 > > > > > &nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 > > > > > &nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka > > ,GMV实时统计为1000. > > > > > &nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. > > > > > 这时如果不减去上午已经统计的金额。那么总金额就是错的。&nbsp;&nbsp; > > > > > 请问是不是根据 update /delete > > > 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 > > > > > > > > > > > > > > > &nbsp; 刚入坑实时处理,请大神赐教 > > > > > > > > > > > > > > > > -- > > > > > > > > Best, > > > > Benchao Li > > > > > > > > > > > > -- > > > > > > Best, > > > Benchao Li > > > > > > > > -- > > Best, > Benchao Li > |
In reply to this post by xuzh
个人理解有几种实现方案
1、通过主键加LAST_VALUE()使用最新的记录进行计算 2、通过flink-cdc connector source 3、自己根据操作类型写计算逻辑 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
经过测试flink_cdc 可以达到效果
flink + debezium 也可是可以的。如Benchao Li-2 解释的 谢谢大家,学习了 ------------------ 原始邮件 ------------------ 发件人: "silence"<[hidden email]>; 发送时间: 2020年9月10日(星期四) 下午4:15 收件人: "user-zh"<[hidden email]>; 主题: Re: flink实时统计GMV,如果订单金额下午变了该怎么处理 个人理解有几种实现方案 1、通过主键加LAST_VALUE()使用最新的记录进行计算 2、通过flink-cdc connector source 3、自己根据操作类型写计算逻辑 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |