flink SQL UpsertTable 语义问题

classic Classic list List threaded Threaded
4 messages Options
hb
Reply | Threaded
Open this post in threaded view
|

flink SQL UpsertTable 语义问题

hb
SQL 如下:
INSERT INTO upsertTable
   SELECT * FROM (
      SELECT cnt0 as id, count(id) as cnt FROM
         (SELECT id, count(*) as cnt0 FROM orderTable GROUP BY id)
      GROUP BY cnt0
   )
   WHERE cnt > 0
输入数据:
1L, "hz"
2L, "hz"
3L, "hz"
1L, "hz"


当最后的WHERE 条件cnt >0(或者不加),结果是:
(true,1,3)       // 会被后面的(true,1,2)覆盖
(true,1,2)    
(true,2,1)
这个结果理解是对的, 最后结果是 (true,1,2)  和 (true,2,1)


但是如果WHERE 条件是 cnt > 2
结果就是
(true,1,3)
这个就不理解了
为什么会输出中间结果(true,1,3),这样就和批处理上的sql结果不一致了
而不应该是没有输出么,或者 先发(true,1,3)再发(false,1,3)





Reply | Threaded
Open this post in threaded view
|

Re: flink SQL UpsertTable 语义问题

Wenlong Lyu
Hi, hb,这个是因为优化器目前的bug,只考虑了sink是upsertSink,没有考虑到还有filter,最后优化的结果变成agg没有发出retract(1,3)的消息了,你可以把你的sink改成RetractSink应该就不会有问题了。

> 在 2019年11月4日,上午11:06,hb <[hidden email]> 写道:
>
> SQL 如下:
> INSERT INTO upsertTable
>   SELECT * FROM (
>      SELECT cnt0 as id, count(id) as cnt FROM
>         (SELECT id, count(*) as cnt0 FROM orderTable GROUP BY id)
>      GROUP BY cnt0
>   )
>   WHERE cnt > 0
> 输入数据:
> 1L, "hz"
> 2L, "hz"
> 3L, "hz"
> 1L, "hz"
>
>
> 当最后的WHERE 条件cnt >0(或者不加),结果是:
> (true,1,3)       // 会被后面的(true,1,2)覆盖
> (true,1,2)    
> (true,2,1)
> 这个结果理解是对的, 最后结果是 (true,1,2)  和 (true,2,1)
>
>
> 但是如果WHERE 条件是 cnt > 2
> 结果就是
> (true,1,3)
> 这个就不理解了
> 为什么会输出中间结果(true,1,3),这样就和批处理上的sql结果不一致了
> 而不应该是没有输出么,或者 先发(true,1,3)再发(false,1,3)
>
>
>
>
>

hb
Reply | Threaded
Open this post in threaded view
|

Re:Re: flink SQL UpsertTable 语义问题

hb


是因为Retract, 相比于upsert会发送很多false的 中间数据, 想用 upsert 对下游(k/v系统)效率高些,
这个bug,下个版本会修复么?

在 2019-11-05 09:07:56,"Wenlong Lyu" <[hidden email]> 写道:

>Hi, hb,这个是因为优化器目前的bug,只考虑了sink是upsertSink,没有考虑到还有filter,最后优化的结果变成agg没有发出retract(1,3)的消息了,你可以把你的sink改成RetractSink应该就不会有问题了。
>
>> 在 2019年11月4日,上午11:06,hb <[hidden email]> 写道:
>>
>> SQL 如下:
>> INSERT INTO upsertTable
>>   SELECT * FROM (
>>      SELECT cnt0 as id, count(id) as cnt FROM
>>         (SELECT id, count(*) as cnt0 FROM orderTable GROUP BY id)
>>      GROUP BY cnt0
>>   )
>>   WHERE cnt > 0
>> 输入数据:
>> 1L, "hz"
>> 2L, "hz"
>> 3L, "hz"
>> 1L, "hz"
>>
>>
>> 当最后的WHERE 条件cnt >0(或者不加),结果是:
>> (true,1,3)       // 会被后面的(true,1,2)覆盖
>> (true,1,2)    
>> (true,2,1)
>> 这个结果理解是对的, 最后结果是 (true,1,2)  和 (true,2,1)
>>
>>
>> 但是如果WHERE 条件是 cnt > 2
>> 结果就是
>> (true,1,3)
>> 这个就不理解了
>> 为什么会输出中间结果(true,1,3),这样就和批处理上的sql结果不一致了
>> 而不应该是没有输出么,或者 先发(true,1,3)再发(false,1,3)
>>
>>
>>
>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: flink SQL UpsertTable 语义问题

Wenlong Lyu
你可以试试在sink上攒个小batch,大部分retract 和后面的add都能合并消除掉,不会对下游产生压力

> 在 2019年11月5日,上午10:18,hb <[hidden email]> 写道:
>
>
>
> 是因为Retract, 相比于upsert会发送很多false的 中间数据, 想用 upsert 对下游(k/v系统)效率高些,
> 这个bug,下个版本会修复么?
>
> 在 2019-11-05 09:07:56,"Wenlong Lyu" <[hidden email]> 写道:
>> Hi, hb,这个是因为优化器目前的bug,只考虑了sink是upsertSink,没有考虑到还有filter,最后优化的结果变成agg没有发出retract(1,3)的消息了,你可以把你的sink改成RetractSink应该就不会有问题了。
>>
>>> 在 2019年11月4日,上午11:06,hb <[hidden email]> 写道:
>>>
>>> SQL 如下:
>>> INSERT INTO upsertTable
>>>  SELECT * FROM (
>>>     SELECT cnt0 as id, count(id) as cnt FROM
>>>        (SELECT id, count(*) as cnt0 FROM orderTable GROUP BY id)
>>>     GROUP BY cnt0
>>>  )
>>>  WHERE cnt > 0
>>> 输入数据:
>>> 1L, "hz"
>>> 2L, "hz"
>>> 3L, "hz"
>>> 1L, "hz"
>>>
>>>
>>> 当最后的WHERE 条件cnt >0(或者不加),结果是:
>>> (true,1,3)       // 会被后面的(true,1,2)覆盖
>>> (true,1,2)    
>>> (true,2,1)
>>> 这个结果理解是对的, 最后结果是 (true,1,2)  和 (true,2,1)
>>>
>>>
>>> 但是如果WHERE 条件是 cnt > 2
>>> 结果就是
>>> (true,1,3)
>>> 这个就不理解了
>>> 为什么会输出中间结果(true,1,3),这样就和批处理上的sql结果不一致了
>>> 而不应该是没有输出么,或者 先发(true,1,3)再发(false,1,3)
>>>
>>>
>>>
>>>
>>>