各位大佬晚上好,有个case麻烦大家看看,比如一个逻辑业务的处理流程为:
步骤1:原始数据source->逻辑处理->生成基础数据tmp; 步骤2:基础数据tmp->转化为结果数据a->落盘; 步骤3:基础数据tmp->窗口统计数据b->落盘。 于是编写了由以上流程组成的DAG,具体如下: 1.val source = ds from KafkaConnector; 2.val broadcast = ruleMap.broacast(); 3.val tmp = source.connect(broadcast).process(); 4.val a = tmp.map().addSink(); 5.val b = tmp.flatmap().keyBy( 8 fields ).window( 5min ).aggregate( sum ).addSink(); 其中,过程3生成tmp有很重的逻辑(很耗时),过程4的sink数据结果条数较少,但过程5的sink数据条数巨大,4和5都是sink to mysql,现在在业务上有个难题,过程5影响到了整个job,过程5的tps很高,流量200W/s,这个case会使得整个任务会产生很严重的背压现象,导致source停止消费数据。现有以下几个疑问: 一、这样编写的代码应该是正确的吧? 二、照理说两个不相干的流,过程5处理慢了不应该影响到过程4? 三、若一成立,是不是因为: 1)过程5窗口聚合处理过于复杂?(flatmap把1条数据展开为500条,真实的tps为:500*200W/s,keyBy有8个维度,aggr为sum) 2)过程5处理滞后,导致job反压,使得1的接收速度会越来越慢,1不接收的话就会使得3无法产生,3不产生,从而会影响到4? 3)过程5的数据量大,但sink目的地为mysql,mysql的写入IO为瓶颈? 四、若三的3)成立,解决办法可以是否用:过程5的sink中换为kafka之类的mq,最后再由mq同步到mysql? 小弟刚接触 flink的时间不长,还请各位大佬多多指点下,感谢万分! |
为啥你说五慢了不影响四。
五先写到kafka然后再增大兵法消费。应该会快很多吧,可以试一下。 ------------------ 原始邮件 ------------------ 发件人: 梁溪 <[hidden email]> 发送时间: 2019年11月9日 20:56 收件人: user-zh <[hidden email]> 主题: 回复:Flink同一DAG里两个并行动态流的问题 各位大佬晚上好,有个case麻烦大家看看,比如一个逻辑业务的处理流程为: 步骤1:原始数据source->逻辑处理->生成基础数据tmp; 步骤2:基础数据tmp->转化为结果数据a->落盘; 步骤3:基础数据tmp->窗口统计数据b->落盘。 于是编写了由以上流程组成的DAG,具体如下: 1.val source = ds from KafkaConnector; 2.val broadcast = ruleMap.broacast(); 3.val tmp = source.connect(broadcast).process(); 4.val a = tmp.map().addSink(); 5.val b = tmp.flatmap().keyBy( 8 fields ).window( 5min ).aggregate( sum ).addSink(); 其中,过程3生成tmp有很重的逻辑(很耗时),过程4的sink数据结果条数较少,但过程5的sink数据条数巨大,4和5都是sink to mysql,现在在业务上有个难题,过程5影响到了整个job,过程5的tps很高,流量200W/s,这个case会使得整个任务会产生很严重的背压现象,导致source停止消费数据。现有以下几个疑问: 一、这样编写的代码应该是正确的吧? 二、照理说两个不相干的流,过程5处理慢了不应该影响到过程4? 三、若一成立,是不是因为: 1)过程5窗口聚合处理过于复杂?(flatmap把1条数据展开为500条,真实的tps为:500*200W/s,keyBy有8个维度,aggr为sum) 2)过程5处理滞后,导致job反压,使得1的接收速度会越来越慢,1不接收的话就会使得3无法产生,3不产生,从而会影响到4? 3)过程5的数据量大,但sink目的地为mysql,mysql的写入IO为瓶颈? 四、若三的3)成立,解决办法可以是否用:过程5的sink中换为kafka之类的mq,最后再由mq同步到mysql? 小弟刚接触 flink的时间不长,还请各位大佬多多指点下,感谢万分! |
4和5不是相互独立的两个流吗?
| | 梁溪 | | 邮箱:[hidden email] | 签名由 网易邮箱大师 定制 在2019年11月09日 21:50,曾耀武 写道: 为啥你说五慢了不影响四。 五先写到kafka然后再增大兵法消费。应该会快很多吧,可以试一下。 ------------------ 原始邮件 ------------------ 发件人: 梁溪 <[hidden email]> 发送时间: 2019年11月9日 20:56 收件人: user-zh <[hidden email]> 主题: 回复:Flink同一DAG里两个并行动态流的问题 各位大佬晚上好,有个case麻烦大家看看,比如一个逻辑业务的处理流程为: 步骤1:原始数据source->逻辑处理->生成基础数据tmp; 步骤2:基础数据tmp->转化为结果数据a->落盘; 步骤3:基础数据tmp->窗口统计数据b->落盘。 于是编写了由以上流程组成的DAG,具体如下: 1.val source = ds from KafkaConnector; 2.val broadcast = ruleMap.broacast(); 3.val tmp = source.connect(broadcast).process(); 4.val a = tmp.map().addSink(); 5.val b = tmp.flatmap().keyBy( 8 fields ).window( 5min ).aggregate( sum ).addSink(); 其中,过程3生成tmp有很重的逻辑(很耗时),过程4的sink数据结果条数较少,但过程5的sink数据条数巨大,4和5都是sink to mysql,现在在业务上有个难题,过程5影响到了整个job,过程5的tps很高,流量200W/s,这个case会使得整个任务会产生很严重的背压现象,导致source停止消费数据。现有以下几个疑问: 一、这样编写的代码应该是正确的吧? 二、照理说两个不相干的流,过程5处理慢了不应该影响到过程4? 三、若一成立,是不是因为: 1)过程5窗口聚合处理过于复杂?(flatmap把1条数据展开为500条,真实的tps为:500*200W/s,keyBy有8个维度,aggr为sum) 2)过程5处理滞后,导致job反压,使得1的接收速度会越来越慢,1不接收的话就会使得3无法产生,3不产生,从而会影响到4? 3)过程5的数据量大,但sink目的地为mysql,mysql的写入IO为瓶颈? 四、若三的3)成立,解决办法可以是否用:过程5的sink中换为kafka之类的mq,最后再由mq同步到mysql? 小弟刚接触 flink的时间不长,还请各位大佬多多指点下,感谢万分! |
如果不影响,那你的那个流一边快一边慢。快的一边已经从source处理了几亿条,慢的才几千条。那中间多的是被换存在慢的那个节点还是source给每个流单独消费一次。
肯定是由最慢的下游决定的速度 。 ------------------ 原始邮件 ------------------ 发件人: 梁溪 <[hidden email]> 发送时间: 2019年11月9日 21:59 收件人: user-zh <[hidden email]> 抄送: user-zh <[hidden email]> 主题: 回复:Flink同一DAG里两个并行动态流的问题 4和5不是相互独立的两个流吗? | | 梁溪 | | 邮箱:[hidden email] | 签名由 网易邮箱大师 定制 在2019年11月09日 21:50,曾耀武 写道: 为啥你说五慢了不影响四。 五先写到kafka然后再增大兵法消费。应该会快很多吧,可以试一下。 ------------------ 原始邮件 ------------------ 发件人: 梁溪 <[hidden email]&gt; 发送时间: 2019年11月9日 20:56 收件人: user-zh <[hidden email]&gt; 主题: 回复:Flink同一DAG里两个并行动态流的问题 各位大佬晚上好,有个case麻烦大家看看,比如一个逻辑业务的处理流程为: 步骤1:原始数据source-&gt;逻辑处理-&gt;生成基础数据tmp; 步骤2:基础数据tmp-&gt;转化为结果数据a-&gt;落盘; 步骤3:基础数据tmp-&gt;窗口统计数据b-&gt;落盘。 于是编写了由以上流程组成的DAG,具体如下: 1.val source = ds from KafkaConnector; 2.val broadcast = ruleMap.broacast(); 3.val tmp = source.connect(broadcast).process();&nbsp; 4.val a = tmp.map().addSink();&nbsp; 5.val b = tmp.flatmap().keyBy( 8 fields ).window( 5min ).aggregate( sum ).addSink(); 其中,过程3生成tmp有很重的逻辑(很耗时),过程4的sink数据结果条数较少,但过程5的sink数据条数巨大,4和5都是sink to mysql,现在在业务上有个难题,过程5影响到了整个job,过程5的tps很高,流量200W/s,这个case会使得整个任务会产生很严重的背压现象,导致source停止消费数据。现有以下几个疑问: 一、这样编写的代码应该是正确的吧? 二、照理说两个不相干的流,过程5处理慢了不应该影响到过程4? 三、若一成立,是不是因为: 1)过程5窗口聚合处理过于复杂?(flatmap把1条数据展开为500条,真实的tps为:500*200W/s,keyBy有8个维度,aggr为sum) 2)过程5处理滞后,导致job反压,使得1的接收速度会越来越慢,1不接收的话就会使得3无法产生,3不产生,从而会影响到4? 3)过程5的数据量大,但sink目的地为mysql,mysql的写入IO为瓶颈? 四、若三的3)成立,解决办法可以是否用:过程5的sink中换为kafka之类的mq,最后再由mq同步到mysql? 小弟刚接触 flink的时间不长,还请各位大佬多多指点下,感谢万分! |
哦哦这样啊,那我这边再看看,谢谢啦!
| | 梁溪 | | 邮箱:[hidden email] | 签名由 网易邮箱大师 定制 在2019年11月09日 22:10,曾耀武 写道: 如果不影响,那你的那个流一边快一边慢。快的一边已经从source处理了几亿条,慢的才几千条。那中间多的是被换存在慢的那个节点还是source给每个流单独消费一次。 肯定是由最慢的下游决定的速度 。 ------------------ 原始邮件 ------------------ 发件人: 梁溪 <[hidden email]> 发送时间: 2019年11月9日 21:59 收件人: user-zh <[hidden email]> 抄送: user-zh <[hidden email]> 主题: 回复:Flink同一DAG里两个并行动态流的问题 4和5不是相互独立的两个流吗? | | 梁溪 | | 邮箱:[hidden email] | 签名由 网易邮箱大师 定制 在2019年11月09日 21:50,曾耀武 写道: 为啥你说五慢了不影响四。 五先写到kafka然后再增大兵法消费。应该会快很多吧,可以试一下。 ------------------ 原始邮件 ------------------ 发件人: 梁溪 <[hidden email]&gt; 发送时间: 2019年11月9日 20:56 收件人: user-zh <[hidden email]&gt; 主题: 回复:Flink同一DAG里两个并行动态流的问题 各位大佬晚上好,有个case麻烦大家看看,比如一个逻辑业务的处理流程为: 步骤1:原始数据source-&gt;逻辑处理-&gt;生成基础数据tmp; 步骤2:基础数据tmp-&gt;转化为结果数据a-&gt;落盘; 步骤3:基础数据tmp-&gt;窗口统计数据b-&gt;落盘。 于是编写了由以上流程组成的DAG,具体如下: 1.val source = ds from KafkaConnector; 2.val broadcast = ruleMap.broacast(); 3.val tmp = source.connect(broadcast).process();&nbsp; 4.val a = tmp.map().addSink();&nbsp; 5.val b = tmp.flatmap().keyBy( 8 fields ).window( 5min ).aggregate( sum ).addSink(); 其中,过程3生成tmp有很重的逻辑(很耗时),过程4的sink数据结果条数较少,但过程5的sink数据条数巨大,4和5都是sink to mysql,现在在业务上有个难题,过程5影响到了整个job,过程5的tps很高,流量200W/s,这个case会使得整个任务会产生很严重的背压现象,导致source停止消费数据。现有以下几个疑问: 一、这样编写的代码应该是正确的吧? 二、照理说两个不相干的流,过程5处理慢了不应该影响到过程4? 三、若一成立,是不是因为: 1)过程5窗口聚合处理过于复杂?(flatmap把1条数据展开为500条,真实的tps为:500*200W/s,keyBy有8个维度,aggr为sum) 2)过程5处理滞后,导致job反压,使得1的接收速度会越来越慢,1不接收的话就会使得3无法产生,3不产生,从而会影响到4? 3)过程5的数据量大,但sink目的地为mysql,mysql的写入IO为瓶颈? 四、若三的3)成立,解决办法可以是否用:过程5的sink中换为kafka之类的mq,最后再由mq同步到mysql? 小弟刚接触 flink的时间不长,还请各位大佬多多指点下,感谢万分! |
Free forum by Nabble | Edit this page |