关于sink失败 不消费kafka消息的处理

classic Classic list List threaded Threaded
14 messages Options
Reply | Threaded
Open this post in threaded view
|

关于sink失败 不消费kafka消息的处理

范超
大家好,我现在有个疑问
目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?


多谢大家了

范超
Reply | Threaded
Open this post in threaded view
|

Re: 关于sink失败 不消费kafka消息的处理

Benchao Li-2
这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。

范超 <[hidden email]> 于2020年8月26日周三 上午11:38写道:

> 大家好,我现在有个疑问
> 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
>
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?
>
>
> 多谢大家了
>
> 范超
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

答复: 关于sink失败 不消费kafka消息的处理

范超
感谢,目前也是通过打开checkpoint来改进的,待会测试一下看看是不是可以

-----邮件原件-----
发件人: Benchao Li [mailto:[hidden email]]
发送时间: 2020年8月26日 星期三 12:59
收件人: user-zh <[hidden email]>
主题: Re: 关于sink失败 不消费kafka消息的处理

这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。

范超 <[hidden email]> 于2020年8月26日周三 上午11:38写道:

> 大家好,我现在有个疑问
> 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
>
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?
>
>
> 多谢大家了
>
> 范超
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

答复: 关于sink失败 不消费kafka消息的处理

范超
In reply to this post by Benchao Li-2
您好 BenChao ,不知道是否有可以参考的两阶段提交的Flink 实例或者文档资料

-----邮件原件-----
发件人: Benchao Li [mailto:[hidden email]]
发送时间: 2020年8月26日 星期三 12:59
收件人: user-zh <[hidden email]>
主题: Re: 关于sink失败 不消费kafka消息的处理

这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。

范超 <[hidden email]> 于2020年8月26日周三 上午11:38写道:

> 大家好,我现在有个疑问
> 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
>
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?
>
>
> 多谢大家了
>
> 范超
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

RE: 关于sink失败 不消费kafka消息的处理

venn
可以参考下这个: https://www.cnblogs.com/bethunebtj/p/9168274.html#5-%E4%B8%BA%E6%89%A7%E8%A1%8C%E4%BF%9D%E9%A9%BE%E6%8A%A4%E8%88%AAfault-tolerant%E4%B8%8E%E4%BF%9D%E8%AF%81exactly-once%E8%AF%AD%E4%B9%89

-----Original Message-----
From: user-zh-return-6980-wxchunjhyy=[hidden email] <user-zh-return-6980-wxchunjhyy=[hidden email]> On Behalf Of 范超
Sent: Wednesday, August 26, 2020 2:42 PM
To: [hidden email]
Subject: 答复: 关于sink失败 不消费kafka消息的处理

您好 BenChao ,不知道是否有可以参考的两阶段提交的Flink 实例或者文档资料

-----邮件原件-----
发件人: Benchao Li [mailto:[hidden email]]
发送时间: 2020年8月26日 星期三 12:59
收件人: user-zh <[hidden email]>
主题: Re: 关于sink失败 不消费kafka消息的处理

这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。

范超 <[hidden email]> 于2020年8月26日周三 上午11:38写道:

> 大家好,我现在有个疑问
> 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
>
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?
>
>
> 多谢大家了
>
> 范超
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: 关于sink失败 不消费kafka消息的处理

Eleanore Jin
In reply to this post by Benchao Li-2
Hi Benchao
可以解释一下为什么sink没有两阶段提交,那就是at least once 的语义吗? 比如source和 sink 都是kafka, 如果 sink
不是两段式提交,那么checkpoint 的state 就只是source 的 offset,这种情况下和使用kafka auto commit
offset 看起来似乎没有什么区别

可否具体解释一下? 谢谢!

Eleanore

On Tue, Aug 25, 2020 at 9:59 PM Benchao Li <[hidden email]> wrote:

> 这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。
>
> 范超 <[hidden email]> 于2020年8月26日周三 上午11:38写道:
>
> > 大家好,我现在有个疑问
> > 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
> >
> >
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?
> >
> >
> > 多谢大家了
> >
> > 范超
> >
>
>
> --
>
> Best,
> Benchao Li
>
Reply | Threaded
Open this post in threaded view
|

Re: 关于sink失败 不消费kafka消息的处理

shizk233
Hi Eleanore,这个问题我可以提供一点理解作为参考

1.chk与at least once
checkpoint机制的思想就是做全局一致性的快照,失败恢复时数据的消费位点会回滚到上一次chk n的进度,
然后进行数据重放,这样就保证了数据不会缺失,至少被消费一次。

2. sink2PC
在chk机制下,数据重放时一般sink端的数据不能回滚,就会有重复数据。如果是upsert sink那仍然是一致的,
否则需要通过2PC的预提交来将chk n+1成功前的数据写到临时存储,等chk n+1完成再真正写入的物理存储。如果
在chk n+1之前任务失败回滚了,那临时存储的数据也可以回滚,这样就能保证一致性。

这样的话 chk就是at least once,chk+upsert或者chk+2pc就是exactly once了。

3.kafka auto commit
chk快照的state不仅仅是source的offset,还有数据流中各个算子的state,chk机制会在整个数据流完成同一个chk
n的时候才提交offset。
kafka auto commit不能保障这种全局的一致性,因为auto commit是自动的,不会等待整个数据流上同一chk n的完成。

Eleanore Jin <[hidden email]> 于2020年8月26日周三 下午11:51写道:

> Hi Benchao
> 可以解释一下为什么sink没有两阶段提交,那就是at least once 的语义吗? 比如source和 sink 都是kafka, 如果 sink
> 不是两段式提交,那么checkpoint 的state 就只是source 的 offset,这种情况下和使用kafka auto commit
> offset 看起来似乎没有什么区别
>
> 可否具体解释一下? 谢谢!
>
> Eleanore
>
> On Tue, Aug 25, 2020 at 9:59 PM Benchao Li <[hidden email]> wrote:
>
> > 这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。
> >
> > 范超 <[hidden email]> 于2020年8月26日周三 上午11:38写道:
> >
> > > 大家好,我现在有个疑问
> > > 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
> > >
> > >
> >
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?
> > >
> > >
> > > 多谢大家了
> > >
> > > 范超
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 关于sink失败 不消费kafka消息的处理

Eleanore Jin
Hi shizk233,

非常感谢你的回答! 如果是如下场景:我的DAG 就是从kafka source topic 读取数据, 然后写到kafka sink topic,
中间没有其他stateful operator. 如果sink operator 不是两端提交,就是kafka producer send,
那么如果开启checkpoint, state 就只是source operator kafka offset.

假设,现在message 1-5 正在被sink operator publish, 1-3 已经publish了,但4,5 还没有,
这个时候source operator 成功完成了checkpoint, 这个checkpoint 里面 offset 应该要 > 5, 假设是6.
假如这个时候publish message 4 失败了, 那么job restart from last successful checkpoint,
source operator 就会从6 开始读数据,那么4 和 5 就会丢失了, 这个理解正确吗

谢谢!
Eleanore

On Wed, Aug 26, 2020 at 9:32 AM shizk233 <[hidden email]>
wrote:

> Hi Eleanore,这个问题我可以提供一点理解作为参考
>
> 1.chk与at least once
> checkpoint机制的思想就是做全局一致性的快照,失败恢复时数据的消费位点会回滚到上一次chk n的进度,
> 然后进行数据重放,这样就保证了数据不会缺失,至少被消费一次。
>
> 2. sink2PC
> 在chk机制下,数据重放时一般sink端的数据不能回滚,就会有重复数据。如果是upsert sink那仍然是一致的,
> 否则需要通过2PC的预提交来将chk n+1成功前的数据写到临时存储,等chk n+1完成再真正写入的物理存储。如果
> 在chk n+1之前任务失败回滚了,那临时存储的数据也可以回滚,这样就能保证一致性。
>
> 这样的话 chk就是at least once,chk+upsert或者chk+2pc就是exactly once了。
>
> 3.kafka auto commit
> chk快照的state不仅仅是source的offset,还有数据流中各个算子的state,chk机制会在整个数据流完成同一个chk
> n的时候才提交offset。
> kafka auto commit不能保障这种全局的一致性,因为auto commit是自动的,不会等待整个数据流上同一chk n的完成。
>
> Eleanore Jin <[hidden email]> 于2020年8月26日周三 下午11:51写道:
>
> > Hi Benchao
> > 可以解释一下为什么sink没有两阶段提交,那就是at least once 的语义吗? 比如source和 sink 都是kafka, 如果
> sink
> > 不是两段式提交,那么checkpoint 的state 就只是source 的 offset,这种情况下和使用kafka auto commit
> > offset 看起来似乎没有什么区别
> >
> > 可否具体解释一下? 谢谢!
> >
> > Eleanore
> >
> > On Tue, Aug 25, 2020 at 9:59 PM Benchao Li <[hidden email]> wrote:
> >
> > > 这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。
> > >
> > > 范超 <[hidden email]> 于2020年8月26日周三 上午11:38写道:
> > >
> > > > 大家好,我现在有个疑问
> > > > 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
> > > >
> > > >
> > >
> >
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?
> > > >
> > > >
> > > > 多谢大家了
> > > >
> > > > 范超
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 关于sink失败 不消费kafka消息的处理

Benchao Li-2
Hi Eleanore,shizk233 同学给出的解释已经很全面了。

对于你后面提的这个问题,我感觉这个理解应该不太正确。
开了checkpoint之后,虽然kafka producer没有用两阶段提交,但是也可以保证在checkpoint成功的时候
会将当前的所有数据flush出去。如果flush失败,那应该是会导致checkpoint失败的。所以我理解这里应该是
at least once的语义,也就是数据可能会重复,但是不会丢。

Eleanore Jin <[hidden email]> 于2020年8月27日周四 上午9:53写道:

> Hi shizk233,
>
> 非常感谢你的回答! 如果是如下场景:我的DAG 就是从kafka source topic 读取数据, 然后写到kafka sink topic,
> 中间没有其他stateful operator. 如果sink operator 不是两端提交,就是kafka producer send,
> 那么如果开启checkpoint, state 就只是source operator kafka offset.
>
> 假设,现在message 1-5 正在被sink operator publish, 1-3 已经publish了,但4,5 还没有,
> 这个时候source operator 成功完成了checkpoint, 这个checkpoint 里面 offset 应该要 > 5, 假设是6.
> 假如这个时候publish message 4 失败了, 那么job restart from last successful checkpoint,
> source operator 就会从6 开始读数据,那么4 和 5 就会丢失了, 这个理解正确吗
>
> 谢谢!
> Eleanore
>
> On Wed, Aug 26, 2020 at 9:32 AM shizk233 <[hidden email]>
> wrote:
>
> > Hi Eleanore,这个问题我可以提供一点理解作为参考
> >
> > 1.chk与at least once
> > checkpoint机制的思想就是做全局一致性的快照,失败恢复时数据的消费位点会回滚到上一次chk n的进度,
> > 然后进行数据重放,这样就保证了数据不会缺失,至少被消费一次。
> >
> > 2. sink2PC
> > 在chk机制下,数据重放时一般sink端的数据不能回滚,就会有重复数据。如果是upsert sink那仍然是一致的,
> > 否则需要通过2PC的预提交来将chk n+1成功前的数据写到临时存储,等chk n+1完成再真正写入的物理存储。如果
> > 在chk n+1之前任务失败回滚了,那临时存储的数据也可以回滚,这样就能保证一致性。
> >
> > 这样的话 chk就是at least once,chk+upsert或者chk+2pc就是exactly once了。
> >
> > 3.kafka auto commit
> > chk快照的state不仅仅是source的offset,还有数据流中各个算子的state,chk机制会在整个数据流完成同一个chk
> > n的时候才提交offset。
> > kafka auto commit不能保障这种全局的一致性,因为auto commit是自动的,不会等待整个数据流上同一chk n的完成。
> >
> > Eleanore Jin <[hidden email]> 于2020年8月26日周三 下午11:51写道:
> >
> > > Hi Benchao
> > > 可以解释一下为什么sink没有两阶段提交,那就是at least once 的语义吗? 比如source和 sink 都是kafka, 如果
> > sink
> > > 不是两段式提交,那么checkpoint 的state 就只是source 的 offset,这种情况下和使用kafka auto
> commit
> > > offset 看起来似乎没有什么区别
> > >
> > > 可否具体解释一下? 谢谢!
> > >
> > > Eleanore
> > >
> > > On Tue, Aug 25, 2020 at 9:59 PM Benchao Li <[hidden email]>
> wrote:
> > >
> > > > 这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。
> > > >
> > > > 范超 <[hidden email]> 于2020年8月26日周三 上午11:38写道:
> > > >
> > > > > 大家好,我现在有个疑问
> > > > > 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
> > > > >
> > > > >
> > > >
> > >
> >
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?
> > > > >
> > > > >
> > > > > 多谢大家了
> > > > >
> > > > > 范超
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> > >
> >
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

答复: 关于sink失败 不消费kafka消息的处理

范超
> 假设,现在message 1-5 正在被sink operator publish, 1-3 已经publish了,但4,5 还没有,
> 这个时候source operator成功完成了checkpoint, 这个checkpoint 里面 offset 应该要 > 5, 假设是6.
> 假如这个时候publish message 4 失败了, 那么job restart from last successful
> checkpoint, source operator 就会从6 开始读数据,那么4 和 5 就会丢失了, 这个理解正确吗

按照我个人理解,应该是sink环节的部分失败,会使得sink环节的checkpoint失败,而jobmanager会因为这个sink环节的失败,而标记这个checkpoint的快照整体失败。
从而重启消费会从source的1开始重新消费


-----邮件原件-----
发件人: Benchao Li [mailto:[hidden email]]
发送时间: 2020年8月27日 星期四 10:06
收件人: user-zh <[hidden email]>
主题: Re: 关于sink失败 不消费kafka消息的处理

Hi Eleanore,shizk233 同学给出的解释已经很全面了。

对于你后面提的这个问题,我感觉这个理解应该不太正确。
开了checkpoint之后,虽然kafka producer没有用两阶段提交,但是也可以保证在checkpoint成功的时候
会将当前的所有数据flush出去。如果flush失败,那应该是会导致checkpoint失败的。所以我理解这里应该是
at least once的语义,也就是数据可能会重复,但是不会丢。

Eleanore Jin <[hidden email]> 于2020年8月27日周四 上午9:53写道:

> Hi shizk233,
>
> 非常感谢你的回答! 如果是如下场景:我的DAG 就是从kafka source topic 读取数据, 然后写到kafka sink
> topic,
> 中间没有其他stateful operator. 如果sink operator 不是两端提交,就是kafka producer send,
> 那么如果开启checkpoint, state 就只是source operator kafka offset.
>
> 假设,现在message 1-5 正在被sink operator publish, 1-3 已经publish了,但4,5 还没有,
> 这个时候source operator 成功完成了checkpoint, 这个checkpoint 里面 offset 应该要 > 5, 假设是6.
> 假如这个时候publish message 4 失败了, 那么job restart from last successful
> checkpoint, source operator 就会从6 开始读数据,那么4 和 5 就会丢失了, 这个理解正确吗
>
> 谢谢!
> Eleanore
>
> On Wed, Aug 26, 2020 at 9:32 AM shizk233 <[hidden email]>
> wrote:
>
> > Hi Eleanore,这个问题我可以提供一点理解作为参考
> >
> > 1.chk与at least once
> > checkpoint机制的思想就是做全局一致性的快照,失败恢复时数据的消费位点会回滚到上一次chk n的进度,
> > 然后进行数据重放,这样就保证了数据不会缺失,至少被消费一次。
> >
> > 2. sink2PC
> > 在chk机制下,数据重放时一般sink端的数据不能回滚,就会有重复数据。如果是upsert sink那仍然是一致的,
> > 否则需要通过2PC的预提交来将chk n+1成功前的数据写到临时存储,等chk n+1完成再真正写入的物理存储。如果
> > 在chk n+1之前任务失败回滚了,那临时存储的数据也可以回滚,这样就能保证一致性。
> >
> > 这样的话 chk就是at least once,chk+upsert或者chk+2pc就是exactly once了。
> >
> > 3.kafka auto commit
> > chk快照的state不仅仅是source的offset,还有数据流中各个算子的state,chk机制会在整个数据流完成同一个chk
> > n的时候才提交offset。
> > kafka auto commit不能保障这种全局的一致性,因为auto commit是自动的,不会等待整个数据流上同一chk
> > n的完成。
> >
> > Eleanore Jin <[hidden email]> 于2020年8月26日周三 下午11:51写道:
> >
> > > Hi Benchao
> > > 可以解释一下为什么sink没有两阶段提交,那就是at least once 的语义吗? 比如source和 sink
> > > 都是kafka, 如果
> > sink
> > > 不是两段式提交,那么checkpoint 的state 就只是source 的 offset,这种情况下和使用kafka auto
> commit
> > > offset 看起来似乎没有什么区别
> > >
> > > 可否具体解释一下? 谢谢!
> > >
> > > Eleanore
> > >
> > > On Tue, Aug 25, 2020 at 9:59 PM Benchao Li <[hidden email]>
> wrote:
> > >
> > > > 这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。
> > > >
> > > > 范超 <[hidden email]> 于2020年8月26日周三 上午11:38写道:
> > > >
> > > > > 大家好,我现在有个疑问
> > > > > 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
> > > > >
> > > > >
> > > >
> > >
> >
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提
> 交kafka的消费位移呢?
> > > > >
> > > > >
> > > > > 多谢大家了
> > > > >
> > > > > 范超
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> > >
> >
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: 关于sink失败 不消费kafka消息的处理

Eleanore Jin
感谢大家的回答,

我用的是APACHE BEAM, 然后RUNNER 用的是Flink, 这里是Beam 提供的KAFKA 的CONNECTOR, 如果看source
的话,它是有state checkpointed: Beam KafkaIO KafkaUnboundedReader
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L239>
但是看sink, 它没有任何state,是一个stateless的operator: Beam KafkaIO KafkaWriter
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java>,
所以这就是我想确认如果在sink 没有state 的前提下,那么是不是开启checkpoint, 只有source 记录 offset 和用
kafka auto commit offset 其实是一样的,既不能保证at least once,也不能 exactly once

谢谢!

On Wed, Aug 26, 2020 at 7:31 PM 范超 <[hidden email]> wrote:

> > 假设,现在message 1-5 正在被sink operator publish, 1-3 已经publish了,但4,5 还没有,
> > 这个时候source operator成功完成了checkpoint, 这个checkpoint 里面 offset 应该要 > 5, 假设是6.
> > 假如这个时候publish message 4 失败了, 那么job restart from last successful
> > checkpoint, source operator 就会从6 开始读数据,那么4 和 5 就会丢失了, 这个理解正确吗
>
>
> 按照我个人理解,应该是sink环节的部分失败,会使得sink环节的checkpoint失败,而jobmanager会因为这个sink环节的失败,而标记这个checkpoint的快照整体失败。
> 从而重启消费会从source的1开始重新消费
>
>
> -----邮件原件-----
> 发件人: Benchao Li [mailto:[hidden email]]
> 发送时间: 2020年8月27日 星期四 10:06
> 收件人: user-zh <[hidden email]>
> 主题: Re: 关于sink失败 不消费kafka消息的处理
>
> Hi Eleanore,shizk233 同学给出的解释已经很全面了。
>
> 对于你后面提的这个问题,我感觉这个理解应该不太正确。
> 开了checkpoint之后,虽然kafka producer没有用两阶段提交,但是也可以保证在checkpoint成功的时候
> 会将当前的所有数据flush出去。如果flush失败,那应该是会导致checkpoint失败的。所以我理解这里应该是
> at least once的语义,也就是数据可能会重复,但是不会丢。
>
> Eleanore Jin <[hidden email]> 于2020年8月27日周四 上午9:53写道:
>
> > Hi shizk233,
> >
> > 非常感谢你的回答! 如果是如下场景:我的DAG 就是从kafka source topic 读取数据, 然后写到kafka sink
> > topic,
> > 中间没有其他stateful operator. 如果sink operator 不是两端提交,就是kafka producer send,
> > 那么如果开启checkpoint, state 就只是source operator kafka offset.
> >
> > 假设,现在message 1-5 正在被sink operator publish, 1-3 已经publish了,但4,5 还没有,
> > 这个时候source operator 成功完成了checkpoint, 这个checkpoint 里面 offset 应该要 > 5,
> 假设是6.
> > 假如这个时候publish message 4 失败了, 那么job restart from last successful
> > checkpoint, source operator 就会从6 开始读数据,那么4 和 5 就会丢失了, 这个理解正确吗
> >
> > 谢谢!
> > Eleanore
> >
> > On Wed, Aug 26, 2020 at 9:32 AM shizk233 <[hidden email]>
> > wrote:
> >
> > > Hi Eleanore,这个问题我可以提供一点理解作为参考
> > >
> > > 1.chk与at least once
> > > checkpoint机制的思想就是做全局一致性的快照,失败恢复时数据的消费位点会回滚到上一次chk n的进度,
> > > 然后进行数据重放,这样就保证了数据不会缺失,至少被消费一次。
> > >
> > > 2. sink2PC
> > > 在chk机制下,数据重放时一般sink端的数据不能回滚,就会有重复数据。如果是upsert sink那仍然是一致的,
> > > 否则需要通过2PC的预提交来将chk n+1成功前的数据写到临时存储,等chk n+1完成再真正写入的物理存储。如果
> > > 在chk n+1之前任务失败回滚了,那临时存储的数据也可以回滚,这样就能保证一致性。
> > >
> > > 这样的话 chk就是at least once,chk+upsert或者chk+2pc就是exactly once了。
> > >
> > > 3.kafka auto commit
> > > chk快照的state不仅仅是source的offset,还有数据流中各个算子的state,chk机制会在整个数据流完成同一个chk
> > > n的时候才提交offset。
> > > kafka auto commit不能保障这种全局的一致性,因为auto commit是自动的,不会等待整个数据流上同一chk
> > > n的完成。
> > >
> > > Eleanore Jin <[hidden email]> 于2020年8月26日周三 下午11:51写道:
> > >
> > > > Hi Benchao
> > > > 可以解释一下为什么sink没有两阶段提交,那就是at least once 的语义吗? 比如source和 sink
> > > > 都是kafka, 如果
> > > sink
> > > > 不是两段式提交,那么checkpoint 的state 就只是source 的 offset,这种情况下和使用kafka auto
> > commit
> > > > offset 看起来似乎没有什么区别
> > > >
> > > > 可否具体解释一下? 谢谢!
> > > >
> > > > Eleanore
> > > >
> > > > On Tue, Aug 25, 2020 at 9:59 PM Benchao Li <[hidden email]>
> > wrote:
> > > >
> > > > > 这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。
> > > > >
> > > > > 范超 <[hidden email]> 于2020年8月26日周三 上午11:38写道:
> > > > >
> > > > > > 大家好,我现在有个疑问
> > > > > > 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提
> > 交kafka的消费位移呢?
> > > > > >
> > > > > >
> > > > > > 多谢大家了
> > > > > >
> > > > > > 范超
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Best,
> > > > > Benchao Li
> > > > >
> > > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>
Reply | Threaded
Open this post in threaded view
|

答复: 关于sink失败 不消费kafka消息的处理

范超
In reply to this post by venn
感谢 Venn

-----邮件原件-----
发件人: venn [mailto:[hidden email]]
发送时间: 2020年8月26日 星期三 18:57
收件人: [hidden email]
主题: RE: 关于sink失败 不消费kafka消息的处理

可以参考下这个: https://www.cnblogs.com/bethunebtj/p/9168274.html#5-%E4%B8%BA%E6%89%A7%E8%A1%8C%E4%BF%9D%E9%A9%BE%E6%8A%A4%E8%88%AAfault-tolerant%E4%B8%8E%E4%BF%9D%E8%AF%81exactly-once%E8%AF%AD%E4%B9%89

-----Original Message-----
From: user-zh-return-6980-wxchunjhyy=[hidden email] <user-zh-return-6980-wxchunjhyy=[hidden email]> On Behalf Of 范超
Sent: Wednesday, August 26, 2020 2:42 PM
To: [hidden email]
Subject: 答复: 关于sink失败 不消费kafka消息的处理

您好 BenChao ,不知道是否有可以参考的两阶段提交的Flink 实例或者文档资料

-----邮件原件-----
发件人: Benchao Li [mailto:[hidden email]]
发送时间: 2020年8月26日 星期三 12:59
收件人: user-zh <[hidden email]>
主题: Re: 关于sink失败 不消费kafka消息的处理

这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。

范超 <[hidden email]> 于2020年8月26日周三 上午11:38写道:

> 大家好,我现在有个疑问
> 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
>
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?
>
>
> 多谢大家了
>
> 范超
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: 关于sink失败 不消费kafka消息的处理

shizk233
In reply to this post by Eleanore Jin
Hi Eleanore,

我觉得是不一样的,差别就在于kafka auto commit发生在source算子消费了kafka event时(不会等待数据完成sink写入),
而chk机制提交offset发生在所有节点完成同一chk后。

虽然sink是stateless的,但这不妨碍它做chk。做chk的条件就是算子收到chk的barrier消息并且把barrier消息之前的数据处理完成。
所以chk机制提交offset时,可以保证之前的数据已经写入sink,是at least once的。

Eleanore Jin <[hidden email]> 于2020年8月28日周五 上午1:17写道:

> 感谢大家的回答,
>
> 我用的是APACHE BEAM, 然后RUNNER 用的是Flink, 这里是Beam 提供的KAFKA 的CONNECTOR, 如果看source
> 的话,它是有state checkpointed: Beam KafkaIO KafkaUnboundedReader
> <
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L239
> >
> 但是看sink, 它没有任何state,是一个stateless的operator: Beam KafkaIO KafkaWriter
> <
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
> >,
> 所以这就是我想确认如果在sink 没有state 的前提下,那么是不是开启checkpoint, 只有source 记录 offset 和用
> kafka auto commit offset 其实是一样的,既不能保证at least once,也不能 exactly once
>
> 谢谢!
>
> On Wed, Aug 26, 2020 at 7:31 PM 范超 <[hidden email]> wrote:
>
> > > 假设,现在message 1-5 正在被sink operator publish, 1-3 已经publish了,但4,5 还没有,
> > > 这个时候source operator成功完成了checkpoint, 这个checkpoint 里面 offset 应该要 > 5,
> 假设是6.
> > > 假如这个时候publish message 4 失败了, 那么job restart from last successful
> > > checkpoint, source operator 就会从6 开始读数据,那么4 和 5 就会丢失了, 这个理解正确吗
> >
> >
> >
> 按照我个人理解,应该是sink环节的部分失败,会使得sink环节的checkpoint失败,而jobmanager会因为这个sink环节的失败,而标记这个checkpoint的快照整体失败。
> > 从而重启消费会从source的1开始重新消费
> >
> >
> > -----邮件原件-----
> > 发件人: Benchao Li [mailto:[hidden email]]
> > 发送时间: 2020年8月27日 星期四 10:06
> > 收件人: user-zh <[hidden email]>
> > 主题: Re: 关于sink失败 不消费kafka消息的处理
> >
> > Hi Eleanore,shizk233 同学给出的解释已经很全面了。
> >
> > 对于你后面提的这个问题,我感觉这个理解应该不太正确。
> > 开了checkpoint之后,虽然kafka producer没有用两阶段提交,但是也可以保证在checkpoint成功的时候
> > 会将当前的所有数据flush出去。如果flush失败,那应该是会导致checkpoint失败的。所以我理解这里应该是
> > at least once的语义,也就是数据可能会重复,但是不会丢。
> >
> > Eleanore Jin <[hidden email]> 于2020年8月27日周四 上午9:53写道:
> >
> > > Hi shizk233,
> > >
> > > 非常感谢你的回答! 如果是如下场景:我的DAG 就是从kafka source topic 读取数据, 然后写到kafka sink
> > > topic,
> > > 中间没有其他stateful operator. 如果sink operator 不是两端提交,就是kafka producer send,
> > > 那么如果开启checkpoint, state 就只是source operator kafka offset.
> > >
> > > 假设,现在message 1-5 正在被sink operator publish, 1-3 已经publish了,但4,5 还没有,
> > > 这个时候source operator 成功完成了checkpoint, 这个checkpoint 里面 offset 应该要 > 5,
> > 假设是6.
> > > 假如这个时候publish message 4 失败了, 那么job restart from last successful
> > > checkpoint, source operator 就会从6 开始读数据,那么4 和 5 就会丢失了, 这个理解正确吗
> > >
> > > 谢谢!
> > > Eleanore
> > >
> > > On Wed, Aug 26, 2020 at 9:32 AM shizk233 <[hidden email]>
> > > wrote:
> > >
> > > > Hi Eleanore,这个问题我可以提供一点理解作为参考
> > > >
> > > > 1.chk与at least once
> > > > checkpoint机制的思想就是做全局一致性的快照,失败恢复时数据的消费位点会回滚到上一次chk n的进度,
> > > > 然后进行数据重放,这样就保证了数据不会缺失,至少被消费一次。
> > > >
> > > > 2. sink2PC
> > > > 在chk机制下,数据重放时一般sink端的数据不能回滚,就会有重复数据。如果是upsert sink那仍然是一致的,
> > > > 否则需要通过2PC的预提交来将chk n+1成功前的数据写到临时存储,等chk n+1完成再真正写入的物理存储。如果
> > > > 在chk n+1之前任务失败回滚了,那临时存储的数据也可以回滚,这样就能保证一致性。
> > > >
> > > > 这样的话 chk就是at least once,chk+upsert或者chk+2pc就是exactly once了。
> > > >
> > > > 3.kafka auto commit
> > > > chk快照的state不仅仅是source的offset,还有数据流中各个算子的state,chk机制会在整个数据流完成同一个chk
> > > > n的时候才提交offset。
> > > > kafka auto commit不能保障这种全局的一致性,因为auto commit是自动的,不会等待整个数据流上同一chk
> > > > n的完成。
> > > >
> > > > Eleanore Jin <[hidden email]> 于2020年8月26日周三 下午11:51写道:
> > > >
> > > > > Hi Benchao
> > > > > 可以解释一下为什么sink没有两阶段提交,那就是at least once 的语义吗? 比如source和 sink
> > > > > 都是kafka, 如果
> > > > sink
> > > > > 不是两段式提交,那么checkpoint 的state 就只是source 的 offset,这种情况下和使用kafka auto
> > > commit
> > > > > offset 看起来似乎没有什么区别
> > > > >
> > > > > 可否具体解释一下? 谢谢!
> > > > >
> > > > > Eleanore
> > > > >
> > > > > On Tue, Aug 25, 2020 at 9:59 PM Benchao Li <[hidden email]>
> > > wrote:
> > > > >
> > > > > > 这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。
> > > > > >
> > > > > > 范超 <[hidden email]> 于2020年8月26日周三 上午11:38写道:
> > > > > >
> > > > > > > 大家好,我现在有个疑问
> > > > > > > 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提
> > > 交kafka的消费位移呢?
> > > > > > >
> > > > > > >
> > > > > > > 多谢大家了
> > > > > > >
> > > > > > > 范超
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Best,
> > > > > > Benchao Li
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 关于sink失败 不消费kafka消息的处理

Eleanore Jin
Hi shizk233,

非常感谢你的解答,困扰我多时的问题终于明白了! 谢谢!


On Thu, Aug 27, 2020 at 10:28 PM shizk233 <[hidden email]>
wrote:

> Hi Eleanore,
>
> 我觉得是不一样的,差别就在于kafka auto commit发生在source算子消费了kafka event时(不会等待数据完成sink写入),
> 而chk机制提交offset发生在所有节点完成同一chk后。
>
>
> 虽然sink是stateless的,但这不妨碍它做chk。做chk的条件就是算子收到chk的barrier消息并且把barrier消息之前的数据处理完成。
> 所以chk机制提交offset时,可以保证之前的数据已经写入sink,是at least once的。
>
> Eleanore Jin <[hidden email]> 于2020年8月28日周五 上午1:17写道:
>
> > 感谢大家的回答,
> >
> > 我用的是APACHE BEAM, 然后RUNNER 用的是Flink, 这里是Beam 提供的KAFKA 的CONNECTOR,
> 如果看source
> > 的话,它是有state checkpointed: Beam KafkaIO KafkaUnboundedReader
> > <
> >
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L239
> > >
> > 但是看sink, 它没有任何state,是一个stateless的operator: Beam KafkaIO KafkaWriter
> > <
> >
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
> > >,
> > 所以这就是我想确认如果在sink 没有state 的前提下,那么是不是开启checkpoint, 只有source 记录 offset 和用
> > kafka auto commit offset 其实是一样的,既不能保证at least once,也不能 exactly once
> >
> > 谢谢!
> >
> > On Wed, Aug 26, 2020 at 7:31 PM 范超 <[hidden email]> wrote:
> >
> > > > 假设,现在message 1-5 正在被sink operator publish, 1-3 已经publish了,但4,5 还没有,
> > > > 这个时候source operator成功完成了checkpoint, 这个checkpoint 里面 offset 应该要 > 5,
> > 假设是6.
> > > > 假如这个时候publish message 4 失败了, 那么job restart from last successful
> > > > checkpoint, source operator 就会从6 开始读数据,那么4 和 5 就会丢失了, 这个理解正确吗
> > >
> > >
> > >
> >
> 按照我个人理解,应该是sink环节的部分失败,会使得sink环节的checkpoint失败,而jobmanager会因为这个sink环节的失败,而标记这个checkpoint的快照整体失败。
> > > 从而重启消费会从source的1开始重新消费
> > >
> > >
> > > -----邮件原件-----
> > > 发件人: Benchao Li [mailto:[hidden email]]
> > > 发送时间: 2020年8月27日 星期四 10:06
> > > 收件人: user-zh <[hidden email]>
> > > 主题: Re: 关于sink失败 不消费kafka消息的处理
> > >
> > > Hi Eleanore,shizk233 同学给出的解释已经很全面了。
> > >
> > > 对于你后面提的这个问题,我感觉这个理解应该不太正确。
> > > 开了checkpoint之后,虽然kafka producer没有用两阶段提交,但是也可以保证在checkpoint成功的时候
> > > 会将当前的所有数据flush出去。如果flush失败,那应该是会导致checkpoint失败的。所以我理解这里应该是
> > > at least once的语义,也就是数据可能会重复,但是不会丢。
> > >
> > > Eleanore Jin <[hidden email]> 于2020年8月27日周四 上午9:53写道:
> > >
> > > > Hi shizk233,
> > > >
> > > > 非常感谢你的回答! 如果是如下场景:我的DAG 就是从kafka source topic 读取数据, 然后写到kafka sink
> > > > topic,
> > > > 中间没有其他stateful operator. 如果sink operator 不是两端提交,就是kafka producer
> send,
> > > > 那么如果开启checkpoint, state 就只是source operator kafka offset.
> > > >
> > > > 假设,现在message 1-5 正在被sink operator publish, 1-3 已经publish了,但4,5 还没有,
> > > > 这个时候source operator 成功完成了checkpoint, 这个checkpoint 里面 offset 应该要 > 5,
> > > 假设是6.
> > > > 假如这个时候publish message 4 失败了, 那么job restart from last successful
> > > > checkpoint, source operator 就会从6 开始读数据,那么4 和 5 就会丢失了, 这个理解正确吗
> > > >
> > > > 谢谢!
> > > > Eleanore
> > > >
> > > > On Wed, Aug 26, 2020 at 9:32 AM shizk233 <
> [hidden email]>
> > > > wrote:
> > > >
> > > > > Hi Eleanore,这个问题我可以提供一点理解作为参考
> > > > >
> > > > > 1.chk与at least once
> > > > > checkpoint机制的思想就是做全局一致性的快照,失败恢复时数据的消费位点会回滚到上一次chk n的进度,
> > > > > 然后进行数据重放,这样就保证了数据不会缺失,至少被消费一次。
> > > > >
> > > > > 2. sink2PC
> > > > > 在chk机制下,数据重放时一般sink端的数据不能回滚,就会有重复数据。如果是upsert sink那仍然是一致的,
> > > > > 否则需要通过2PC的预提交来将chk n+1成功前的数据写到临时存储,等chk n+1完成再真正写入的物理存储。如果
> > > > > 在chk n+1之前任务失败回滚了,那临时存储的数据也可以回滚,这样就能保证一致性。
> > > > >
> > > > > 这样的话 chk就是at least once,chk+upsert或者chk+2pc就是exactly once了。
> > > > >
> > > > > 3.kafka auto commit
> > > > > chk快照的state不仅仅是source的offset,还有数据流中各个算子的state,chk机制会在整个数据流完成同一个chk
> > > > > n的时候才提交offset。
> > > > > kafka auto commit不能保障这种全局的一致性,因为auto commit是自动的,不会等待整个数据流上同一chk
> > > > > n的完成。
> > > > >
> > > > > Eleanore Jin <[hidden email]> 于2020年8月26日周三 下午11:51写道:
> > > > >
> > > > > > Hi Benchao
> > > > > > 可以解释一下为什么sink没有两阶段提交,那就是at least once 的语义吗? 比如source和 sink
> > > > > > 都是kafka, 如果
> > > > > sink
> > > > > > 不是两段式提交,那么checkpoint 的state 就只是source 的 offset,这种情况下和使用kafka auto
> > > > commit
> > > > > > offset 看起来似乎没有什么区别
> > > > > >
> > > > > > 可否具体解释一下? 谢谢!
> > > > > >
> > > > > > Eleanore
> > > > > >
> > > > > > On Tue, Aug 25, 2020 at 9:59 PM Benchao Li <[hidden email]
> >
> > > > wrote:
> > > > > >
> > > > > > > 这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。
> > > > > > >
> > > > > > > 范超 <[hidden email]> 于2020年8月26日周三 上午11:38写道:
> > > > > > >
> > > > > > > > 大家好,我现在有个疑问
> > > > > > > > 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提
> > > > 交kafka的消费位移呢?
> > > > > > > >
> > > > > > > >
> > > > > > > > 多谢大家了
> > > > > > > >
> > > > > > > > 范超
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Best,
> > > > > > > Benchao Li
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>