Upsert Kafka 的format 为什么要求是INSERT-ONLY的

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Upsert Kafka 的format 为什么要求是INSERT-ONLY的

刘首维
Hi all,



    最近在测试Upsert Kafka,在验证的过程中,发现Validation的时候要求format的changelog-mode 必须是insert-only的,请问这是什么原因呢。

    如果不是的话,请直接指正我,谢谢。





Flink version 1.12.1
Reply | Threaded
Open this post in threaded view
|

Re: Upsert Kafka 的format 为什么要求是INSERT-ONLY的

Shengkai Fang
Hi.

当初的设计是基于kafka的compacted topic设计的,而compacted
topic有自身的表达changelog的语法,例如:使用value 为 null 表示tombstone
message。从这个角度出发的话,我们仅从kafka的角度去理解数据,而非从format的角度去解析数据。

这当然引入了一些问题,例如当利用upsert-kafka读取数据的时候需要维护一个state以记住读取的所有的key。

Best,
Shengkai

刘首维 <[hidden email]> 于2021年3月15日周一 上午11:48写道:

> Hi all,
>
>
>
>     最近在测试Upsert Kafka,在验证的过程中,发现Validation的时候要求format的changelog-mode
> 必须是insert-only的,请问这是什么原因呢。
>
>     如果不是的话,请直接指正我,谢谢。
>
>
>
>
>
> Flink version 1.12.1
>
Reply | Threaded
Open this post in threaded view
|

答复: Upsert Kafka 的format 为什么要求是INSERT-ONLY的

刘首维
Hi Shengkai,


感谢回复


让我理解一下:

       在ChangelogNormalize中

          1.  Rowkind是未生效的

          2.  null表达墓碑

          3.  保存全量数据的overhead


 如果我的理解是对的,那么假设遇到了不论是+u i,还是-u,d 都会被理解为是一次insert,从而促使下游emit record?

我们现在有若干自定义的Format,如果为了适配Upsert Kafka,format需要对d,(-u) 事件发射value == null的Record吗




________________________________
发件人: Shengkai Fang <[hidden email]>
发送时间: 2021年3月15日 14:21:31
收件人: [hidden email]
主题: Re: Upsert Kafka 的format 为什么要求是INSERT-ONLY的

Hi.

当初的设计是基于kafka的compacted topic设计的,而compacted
topic有自身的表达changelog的语法,例如:使用value 为 null 表示tombstone
message。从这个角度出发的话,我们仅从kafka的角度去理解数据,而非从format的角度去解析数据。

这当然引入了一些问题,例如当利用upsert-kafka读取数据的时候需要维护一个state以记住读取的所有的key。

Best,
Shengkai

刘首维 <[hidden email]> 于2021年3月15日周一 上午11:48写道:

> Hi all,
>
>
>
>     最近在测试Upsert Kafka,在验证的过程中,发现Validation的时候要求format的changelog-mode
> 必须是insert-only的,请问这是什么原因呢。
>
>     如果不是的话,请直接指正我,谢谢。
>
>
>
>
>
> Flink version 1.12.1
>
Reply | Threaded
Open this post in threaded view
|

Re: Upsert Kafka 的format 为什么要求是INSERT-ONLY的

Shengkai Fang
Hi.

对于table scan而言
- +I和+U都是被认为是insert消息, changelog normalize 则是会将消息处理为正确的类型;
- 我们在scan的时候看到 tombstone的消息的value部分是空,因此直接将类型设置为delete,在changelog
normalize的时候会补全value部分的值。
- -u消息是不会存入到upsert-kafka之中的

详细的可以参考下这里的ppt[1]

Best,
Shengkai

[1] https://flink-learning.org.cn/developers/flink-training-course3/


刘首维 <[hidden email]> 于2021年3月15日周一 下午2:39写道:

> Hi Shengkai,
>
>
> 感谢回复
>
>
> 让我理解一下:
>
>        在ChangelogNormalize中
>
>           1.  Rowkind是未生效的
>
>           2.  null表达墓碑
>
>           3.  保存全量数据的overhead
>
>
>  如果我的理解是对的,那么假设遇到了不论是+u i,还是-u,d 都会被理解为是一次insert,从而促使下游emit record?
>
> 我们现在有若干自定义的Format,如果为了适配Upsert Kafka,format需要对d,(-u) 事件发射value ==
> null的Record吗
>
>
>
>
> ________________________________
> 发件人: Shengkai Fang <[hidden email]>
> 发送时间: 2021年3月15日 14:21:31
> 收件人: [hidden email]
> 主题: Re: Upsert Kafka 的format 为什么要求是INSERT-ONLY的
>
> Hi.
>
> 当初的设计是基于kafka的compacted topic设计的,而compacted
> topic有自身的表达changelog的语法,例如:使用value 为 null 表示tombstone
> message。从这个角度出发的话,我们仅从kafka的角度去理解数据,而非从format的角度去解析数据。
>
> 这当然引入了一些问题,例如当利用upsert-kafka读取数据的时候需要维护一个state以记住读取的所有的key。
>
> Best,
> Shengkai
>
> 刘首维 <[hidden email]> 于2021年3月15日周一 上午11:48写道:
>
> > Hi all,
> >
> >
> >
> >     最近在测试Upsert Kafka,在验证的过程中,发现Validation的时候要求format的changelog-mode
> > 必须是insert-only的,请问这是什么原因呢。
> >
> >     如果不是的话,请直接指正我,谢谢。
> >
> >
> >
> >
> >
> > Flink version 1.12.1
> >
>