Flink同一DAG里两个并行动态流的问题

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink同一DAG里两个并行动态流的问题

梁溪
各位大佬晚上好,有个case麻烦大家看看,比如一个逻辑业务的处理流程为:
步骤1:原始数据source->逻辑处理->生成基础数据tmp;
步骤2:基础数据tmp->转化为结果数据a->落盘;
步骤3:基础数据tmp->窗口统计数据b->落盘。


于是编写了由以上流程组成的DAG,具体如下:
1.val source = ds from KafkaConnector;
2.val broadcast = ruleMap.broacast();
3.val tmp = source.connect(broadcast).process();  
4.val a = tmp.map().addSink();  
5.val b = tmp.flatmap().keyBy( 8 fields ).window( 5min ).aggregate( sum ).addSink();


其中,过程3生成tmp有很重的逻辑(很耗时),过程4的sink数据结果条数较少,但过程5的sink数据条数巨大,4和5都是sink to mysql,现在在业务上有个难题,过程5影响到了整个job,过程5的tps很高,流量200W/s,这个case会使得整个任务会产生很严重的背压现象,导致source停止消费数据。现有以下几个疑问:
一、这样编写的代码应该是正确的吧?
二、照理说两个不相干的流,过程5处理慢了不应该影响到过程4?
三、若一成立,是不是因为:
1)过程5窗口聚合处理过于复杂?(flatmap把1条数据展开为500条,真实的tps为:500*200W/s,keyBy有8个维度,aggr为sum)
2)过程5处理滞后,导致job反压,使得1的接收速度会越来越慢,1不接收的话就会使得3无法产生,3不产生,从而会影响到4?
3)过程5的数据量大,但sink目的地为mysql,mysql的写入IO为瓶颈?
四、若三的3)成立,解决办法可以是否用:过程5的sink中换为kafka之类的mq,最后再由mq同步到mysql?


小弟刚接触 flink的时间不长,还请各位大佬多多指点下,感谢万分!











Reply | Threaded
Open this post in threaded view
|

回复:Flink同一DAG里两个并行动态流的问题

曾耀武
为啥你说五慢了不影响四。


五先写到kafka然后再增大兵法消费。应该会快很多吧,可以试一下。





------------------ 原始邮件 ------------------
发件人: 梁溪 <[hidden email]&gt;
发送时间: 2019年11月9日 20:56
收件人: user-zh <[hidden email]&gt;
主题: 回复:Flink同一DAG里两个并行动态流的问题



各位大佬晚上好,有个case麻烦大家看看,比如一个逻辑业务的处理流程为:
步骤1:原始数据source-&gt;逻辑处理-&gt;生成基础数据tmp;
步骤2:基础数据tmp-&gt;转化为结果数据a-&gt;落盘;
步骤3:基础数据tmp-&gt;窗口统计数据b-&gt;落盘。


于是编写了由以上流程组成的DAG,具体如下:
1.val source = ds from KafkaConnector;
2.val broadcast = ruleMap.broacast();
3.val tmp = source.connect(broadcast).process();&nbsp;
4.val a = tmp.map().addSink();&nbsp;
5.val b = tmp.flatmap().keyBy( 8 fields ).window( 5min ).aggregate( sum ).addSink();


其中,过程3生成tmp有很重的逻辑(很耗时),过程4的sink数据结果条数较少,但过程5的sink数据条数巨大,4和5都是sink to mysql,现在在业务上有个难题,过程5影响到了整个job,过程5的tps很高,流量200W/s,这个case会使得整个任务会产生很严重的背压现象,导致source停止消费数据。现有以下几个疑问:
一、这样编写的代码应该是正确的吧?
二、照理说两个不相干的流,过程5处理慢了不应该影响到过程4?
三、若一成立,是不是因为:
1)过程5窗口聚合处理过于复杂?(flatmap把1条数据展开为500条,真实的tps为:500*200W/s,keyBy有8个维度,aggr为sum)
2)过程5处理滞后,导致job反压,使得1的接收速度会越来越慢,1不接收的话就会使得3无法产生,3不产生,从而会影响到4?
3)过程5的数据量大,但sink目的地为mysql,mysql的写入IO为瓶颈?
四、若三的3)成立,解决办法可以是否用:过程5的sink中换为kafka之类的mq,最后再由mq同步到mysql?


小弟刚接触 flink的时间不长,还请各位大佬多多指点下,感谢万分!
Reply | Threaded
Open this post in threaded view
|

回复:Flink同一DAG里两个并行动态流的问题

梁溪
4和5不是相互独立的两个流吗?




| |
梁溪
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制

在2019年11月09日 21:50,曾耀武 写道:
为啥你说五慢了不影响四。


五先写到kafka然后再增大兵法消费。应该会快很多吧,可以试一下。





------------------ 原始邮件 ------------------
发件人: 梁溪 <[hidden email]&gt;
发送时间: 2019年11月9日 20:56
收件人: user-zh <[hidden email]&gt;
主题: 回复:Flink同一DAG里两个并行动态流的问题



各位大佬晚上好,有个case麻烦大家看看,比如一个逻辑业务的处理流程为:
步骤1:原始数据source-&gt;逻辑处理-&gt;生成基础数据tmp;
步骤2:基础数据tmp-&gt;转化为结果数据a-&gt;落盘;
步骤3:基础数据tmp-&gt;窗口统计数据b-&gt;落盘。


于是编写了由以上流程组成的DAG,具体如下:
1.val source = ds from KafkaConnector;
2.val broadcast = ruleMap.broacast();
3.val tmp = source.connect(broadcast).process();&nbsp;
4.val a = tmp.map().addSink();&nbsp;
5.val b = tmp.flatmap().keyBy( 8 fields ).window( 5min ).aggregate( sum ).addSink();


其中,过程3生成tmp有很重的逻辑(很耗时),过程4的sink数据结果条数较少,但过程5的sink数据条数巨大,4和5都是sink to mysql,现在在业务上有个难题,过程5影响到了整个job,过程5的tps很高,流量200W/s,这个case会使得整个任务会产生很严重的背压现象,导致source停止消费数据。现有以下几个疑问:
一、这样编写的代码应该是正确的吧?
二、照理说两个不相干的流,过程5处理慢了不应该影响到过程4?
三、若一成立,是不是因为:
1)过程5窗口聚合处理过于复杂?(flatmap把1条数据展开为500条,真实的tps为:500*200W/s,keyBy有8个维度,aggr为sum)
2)过程5处理滞后,导致job反压,使得1的接收速度会越来越慢,1不接收的话就会使得3无法产生,3不产生,从而会影响到4?
3)过程5的数据量大,但sink目的地为mysql,mysql的写入IO为瓶颈?
四、若三的3)成立,解决办法可以是否用:过程5的sink中换为kafka之类的mq,最后再由mq同步到mysql?


小弟刚接触 flink的时间不长,还请各位大佬多多指点下,感谢万分!
Reply | Threaded
Open this post in threaded view
|

回复:Flink同一DAG里两个并行动态流的问题

曾耀武
如果不影响,那你的那个流一边快一边慢。快的一边已经从source处理了几亿条,慢的才几千条。那中间多的是被换存在慢的那个节点还是source给每个流单独消费一次。


肯定是由最慢的下游决定的速度




------------------ 原始邮件 ------------------
发件人: 梁溪 <[hidden email]&gt;
发送时间: 2019年11月9日 21:59
收件人: user-zh <[hidden email]&gt;
抄送: user-zh <[hidden email]&gt;
主题: 回复:Flink同一DAG里两个并行动态流的问题



4和5不是相互独立的两个流吗?




| |
梁溪
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制

在2019年11月09日 21:50,曾耀武 写道:
为啥你说五慢了不影响四。


五先写到kafka然后再增大兵法消费。应该会快很多吧,可以试一下。





------------------ 原始邮件 ------------------
发件人: 梁溪 <[hidden email]&amp;gt;
发送时间: 2019年11月9日 20:56
收件人: user-zh <[hidden email]&amp;gt;
主题: 回复:Flink同一DAG里两个并行动态流的问题



各位大佬晚上好,有个case麻烦大家看看,比如一个逻辑业务的处理流程为:
步骤1:原始数据source-&amp;gt;逻辑处理-&amp;gt;生成基础数据tmp;
步骤2:基础数据tmp-&amp;gt;转化为结果数据a-&amp;gt;落盘;
步骤3:基础数据tmp-&amp;gt;窗口统计数据b-&amp;gt;落盘。


于是编写了由以上流程组成的DAG,具体如下:
1.val source = ds from KafkaConnector;
2.val broadcast = ruleMap.broacast();
3.val tmp = source.connect(broadcast).process();&amp;nbsp;
4.val a = tmp.map().addSink();&amp;nbsp;
5.val b = tmp.flatmap().keyBy( 8 fields ).window( 5min ).aggregate( sum ).addSink();


其中,过程3生成tmp有很重的逻辑(很耗时),过程4的sink数据结果条数较少,但过程5的sink数据条数巨大,4和5都是sink to mysql,现在在业务上有个难题,过程5影响到了整个job,过程5的tps很高,流量200W/s,这个case会使得整个任务会产生很严重的背压现象,导致source停止消费数据。现有以下几个疑问:
一、这样编写的代码应该是正确的吧?
二、照理说两个不相干的流,过程5处理慢了不应该影响到过程4?
三、若一成立,是不是因为:
1)过程5窗口聚合处理过于复杂?(flatmap把1条数据展开为500条,真实的tps为:500*200W/s,keyBy有8个维度,aggr为sum)
2)过程5处理滞后,导致job反压,使得1的接收速度会越来越慢,1不接收的话就会使得3无法产生,3不产生,从而会影响到4?
3)过程5的数据量大,但sink目的地为mysql,mysql的写入IO为瓶颈?
四、若三的3)成立,解决办法可以是否用:过程5的sink中换为kafka之类的mq,最后再由mq同步到mysql?


小弟刚接触 flink的时间不长,还请各位大佬多多指点下,感谢万分!
Reply | Threaded
Open this post in threaded view
|

回复:Flink同一DAG里两个并行动态流的问题

梁溪
哦哦这样啊,那我这边再看看,谢谢啦!




| |
梁溪
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制

在2019年11月09日 22:10,曾耀武 写道:
如果不影响,那你的那个流一边快一边慢。快的一边已经从source处理了几亿条,慢的才几千条。那中间多的是被换存在慢的那个节点还是source给每个流单独消费一次。


肯定是由最慢的下游决定的速度




------------------ 原始邮件 ------------------
发件人: 梁溪 <[hidden email]&gt;
发送时间: 2019年11月9日 21:59
收件人: user-zh <[hidden email]&gt;
抄送: user-zh <[hidden email]&gt;
主题: 回复:Flink同一DAG里两个并行动态流的问题



4和5不是相互独立的两个流吗?




| |
梁溪
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制

在2019年11月09日 21:50,曾耀武 写道:
为啥你说五慢了不影响四。


五先写到kafka然后再增大兵法消费。应该会快很多吧,可以试一下。





------------------ 原始邮件 ------------------
发件人: 梁溪 <[hidden email]&amp;gt;
发送时间: 2019年11月9日 20:56
收件人: user-zh <[hidden email]&amp;gt;
主题: 回复:Flink同一DAG里两个并行动态流的问题



各位大佬晚上好,有个case麻烦大家看看,比如一个逻辑业务的处理流程为:
步骤1:原始数据source-&amp;gt;逻辑处理-&amp;gt;生成基础数据tmp;
步骤2:基础数据tmp-&amp;gt;转化为结果数据a-&amp;gt;落盘;
步骤3:基础数据tmp-&amp;gt;窗口统计数据b-&amp;gt;落盘。


于是编写了由以上流程组成的DAG,具体如下:
1.val source = ds from KafkaConnector;
2.val broadcast = ruleMap.broacast();
3.val tmp = source.connect(broadcast).process();&amp;nbsp;
4.val a = tmp.map().addSink();&amp;nbsp;
5.val b = tmp.flatmap().keyBy( 8 fields ).window( 5min ).aggregate( sum ).addSink();


其中,过程3生成tmp有很重的逻辑(很耗时),过程4的sink数据结果条数较少,但过程5的sink数据条数巨大,4和5都是sink to mysql,现在在业务上有个难题,过程5影响到了整个job,过程5的tps很高,流量200W/s,这个case会使得整个任务会产生很严重的背压现象,导致source停止消费数据。现有以下几个疑问:
一、这样编写的代码应该是正确的吧?
二、照理说两个不相干的流,过程5处理慢了不应该影响到过程4?
三、若一成立,是不是因为:
1)过程5窗口聚合处理过于复杂?(flatmap把1条数据展开为500条,真实的tps为:500*200W/s,keyBy有8个维度,aggr为sum)
2)过程5处理滞后,导致job反压,使得1的接收速度会越来越慢,1不接收的话就会使得3无法产生,3不产生,从而会影响到4?
3)过程5的数据量大,但sink目的地为mysql,mysql的写入IO为瓶颈?
四、若三的3)成立,解决办法可以是否用:过程5的sink中换为kafka之类的mq,最后再由mq同步到mysql?


小弟刚接触 flink的时间不长,还请各位大佬多多指点下,感谢万分!