单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

jindy_liu
请问下,有没有大佬做过类似的事情?

另外,flink side out功能,可以将单流分成多流,但是不是分成多流后,但两条流sink的时候,是不是没法保证sink时候的时序?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
lgs
Reply | Threaded
Open this post in threaded view
|

Re: 单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

lgs
我也有类似的需求。
期望第一个sink能先执行,然后第二个sink再执行。因为第二个sink要去读第一个sink保存的数据。



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

Jark
Administrator
你可以先用 map 再用 addSink,这样他们的调用被 chain 在一起,可以达到先写入 mysql ,再写入 kafka 的目的。

 datastream.map(new MySQLSinkMapFunction()).addSink(new
FlinkKafkaProducer()).

也就是将 mysql sink 伪装成了一个 MapFunction,里面先做了 写 mysql 的动作,写成功后再将数据输出到下游。

另外,如果要在 SQL 中解决这个需求的话,会比较麻烦,因为标准语法中没有这么个语法支持这个功能。

Best,
Jark

On Fri, 10 Jul 2020 at 16:12, lgs <[hidden email]> wrote:

> 我也有类似的需求。
> 期望第一个sink能先执行,然后第二个sink再执行。因为第二个sink要去读第一个sink保存的数据。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

jindy_liu


如果可以chain在一起,这个可以保证顺序性,我去试试。

这里再追问下,实际中,如里单流源里的数据也要顺序处理,可以设置并行度为1;

这里可能也要考虑下mysqlSinkFunction里的sink效率问题,需要积累一些数据再sink,这里可以做到跟kafka
sink的batch_size和linger.ms配合联动吗?比如满1000条,或超过1s,就同时sink?

谢谢~




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

Jark
Administrator
你可以在 mysqlSinkFunction 中攒 buffer,在 timer trigger 或者 checkpoint 时 flush
mysql database,以及 output。

On Mon, 13 Jul 2020 at 15:36, jindy_liu <[hidden email]> wrote:

>
>
> 如果可以chain在一起,这个可以保证顺序性,我去试试。
>
> 这里再追问下,实际中,如里单流源里的数据也要顺序处理,可以设置并行度为1;
>
> 这里可能也要考虑下mysqlSinkFunction里的sink效率问题,需要积累一些数据再sink,这里可以做到跟kafka
> sink的batch_size和linger.ms配合联动吗?比如满1000条,或超过1s,就同时sink?
>
> 谢谢~
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

jindy_liu
本来还想尽最大可能的复用源码,看了下JdbcOutputFormat的源码实现,batch size是sql语句的个数据,kafka的batch
size是字节数,两个协调不好,两个各sink各自的时间阈值也同步不了。

我准备按你的说的方式,用RichFlatMapFunction,里面实现实现一个buffer。
等buffer达阈值或定时时间条件满足时,一次性手动调用JdbcOutputFormat(可以设置更大的buffer值)的writeRecord和flush;不满足的时候,RichFlatMapFunction里不输出元素;
这样kafka的batch sinka节奏应该就不用管了,两者的batch条件相互独立。
我自己初步看了下,应该可以?
初学者,望大佬提点,还有其它的注意事项要注意不?






--
Sent from: http://apache-flink.147419.n8.nabble.com/