I'm using Flink CDC to run join queries across MySQL tables. Flink can only join two tables first and then join that result with the next table, so the latency before results land in the target database is very high.
Is there a good optimization that can mitigate this?
Jark Wu replied:

Can you share your code? Are you using the dimension (temporal) table join syntax (FOR SYSTEM TIME AS OF)?
We first need to pin down which operator is actually slow.
丁浩浩 replied:

Here is the SQL:

select
    ri.sub_clazz_number,
    prcrs.rounds,
    count(*) as num
from subclazz gs
join (
    select gce.number, min(gce.extension_value) as grade
    from course_extension gce
    where gce.isdel = 0 and gce.extension_type = 4
    group by gce.number
) as temp
    on temp.number = gs.course_number and temp.grade > 30
join right_info ri
    on gs.number = ri.sub_clazz_number
join wide_subclazz ws
    on ws.number = ri.sub_clazz_number
join course gc
    on gc.number = ws.course_number and gc.course_category_id in (30, 40)
left join performance_regular_can_renewal_sign prcrs
    on prcrs.order_number = ri.order_number and prcrs.rounds in (1, 2)
where ri.is_del = 0
  and ri.end_idx = -1
  and prcrs.rounds is not null
  and not exists (
      select 1 from internal_staff gis where gis.user_id = ri.user_id
  )
  and not exists (
      select 1 from clazz_extension ce
      where ws.clazz_number = ce.number
        and ce.extension_type = 3
        and ce.isdel = 0
        and ce.extension_value in (1, 3, 4, 7, 8, 11)
  )
group by ri.sub_clazz_number, prcrs.rounds

The bottleneck is at all of the join operators: every time, the operators that fail to complete the checkpoint are the join nodes.
Jark Wu replied:

The bottleneck is most likely the two NOT EXISTS predicates: NOT EXISTS currently has to run with a parallelism of 1, so it cannot be scaled out horizontally.
Consider replacing the NOT EXISTS with another approach, e.g. a UDF or a dimension (lookup) table join.

Best,
Jark
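One common rewrite of a streaming NOT EXISTS is a left anti-join expressed as LEFT JOIN plus an IS NULL filter, which hash-partitions on the join key and can therefore run in parallel. A minimal sketch using the tables from the query above (a sketch only; the retraction semantics of this pattern should be verified for your Flink version):

```sql
-- Sketch: replace
--   not exists (select 1 from internal_staff gis where gis.user_id = ri.user_id)
-- with a parallelizable left join + IS NULL filter.
select ri.*
from right_info ri
left join internal_staff gis
    on gis.user_id = ri.user_id
where gis.user_id is null;  -- keep only rows with no matching staff record
```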
丁浩浩 replied:

Even after I rewrote the NOT EXISTS as joins, the join operators' checkpoints still cannot complete. Is my checkpoint interval set too short? I trigger a checkpoint every half hour, and the timeout is also half an hour.
Screenshots below (image uploads here never seem to work for me):
https://imgchr.com/i/DeqixU
https://imgchr.com/i/DeqP2T
Jark Wu replied:

My guess is that your initial snapshot is too large and the join operators process it too slowly, so the full snapshot has not finished within half an hour and the checkpoint times out.
Note that no checkpoint can complete during the snapshot (full-sync) phase. See point 4 of this article for details:
https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ

The article also describes the workaround: configure a tolerated number of failed checkpoints and a restart strategy in flink-conf.yaml, like so:

execution.checkpointing.interval: 10min                    # checkpoint interval
execution.checkpointing.tolerable-failed-checkpoints: 100  # tolerated checkpoint failures
restart-strategy: fixed-delay                              # restart strategy
restart-strategy.fixed-delay.attempts: 2147483647          # max restart attempts

Best,
Jark
Jark Wu added:

Also, you can increase the parallelism of the join operators to improve join throughput.
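When the job is submitted through the SQL client, the default operator parallelism (which the join operators inherit) can be raised with a table config option. A sketch; the option name is real but the SET syntax and whether it applies to your deployment may vary by Flink version:

```sql
-- Sketch: raise the default parallelism for all SQL operators, joins included.
-- The value 8 is illustrative; size it to your cluster.
SET table.exec.resource.default-parallelism = 8;
```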
hailongwang replied:

Judging from your SQL and the operator names in the screenshots, you are using regular stream-stream JOINs [1], not dimension (temporal) table JOINs [2], right?
Does your use case require the streams to be joined with the CDC MySQL data bidirectionally, or does one stream only need to look up the CDC MySQL data one-sidedly?
If it is a stream-stream JOIN, also check whether a skewed join key is overloading a single task and causing the checkpoint to fail.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#regular-joins
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#join-with-a-temporal-table
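For reference, a dimension (lookup) join uses the FOR SYSTEM_TIME AS OF syntax from [2]. A minimal sketch, assuming `wide_subclazz` has a processing-time attribute `proc_time` declared in its DDL and `course` is backed by a lookup-capable connector (e.g. JDBC); column usage mirrors the query above but is illustrative:

```sql
-- Sketch: look up the course row valid at processing time, instead of
-- keeping both sides in join state as a regular stream-stream join does.
select ws.number, gc.course_category_id
from wide_subclazz as ws
join course for system_time as of ws.proc_time as gc
    on gc.number = ws.course_number;
```

Unlike a regular join, this only reacts to updates on the probe side, which keeps state small but means late changes to `course` do not retroactively update earlier output.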
hailongwang followed up:

Sorry, I phrased that wrong. What I meant to ask: does your use case join CDC MySQL streams with each other, or does a CDC MySQL stream join against dimension table data?
18579099920 replied:

A CDC-to-CDC join.
丁浩浩 replied:

All of my joins are between CDC MySQL tables.
Another user chimed in:

Borrowing this thread, since our scenario is very similar: joining CDC streams with CDC streams into a wide table.
I have hit this problem too. CDC-to-CDC joins tend to break down once the data volume gets large. (My checkpoint interval is 2 hours with the timeout set to only 10 minutes and a huge tolerated-failure count; if the timeout is set too long, e.g. 2 hours, data basically stops flowing.)

1. During the snapshot phase, I keep hitting I/O problems. The Flink-side log looks like this:

./flink--taskexecutor-0-flink-taskmanager-v1-11-2-fcf8f675f-gn8q8.log.5:146619:2020-11-14 00:19:53,578 ERROR io.debezium.connector.mysql.SnapshotReader [] - Failed due to error: Aborting snapshot due to error when last running 'SELECT * FROM `low_db`.`t_low_media`': Streaming result set com.mysql.cj.protocol.a.result.ResultsetRowsStreaming@3d208504 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.

On the MySQL side, `show processlist` no longer shows the 'SELECT * FROM `low_db`.`t_low_media`' statement.
My guess is that severe backpressure left the connection with no data flowing for too long, so the idle connection was killed.

2. When sinking the joined view, the join can emit results out of order in some cases (records of the retract stream may land on different subtasks), so the sink results come out wrong. Unless the parallelism is set to 1, out-of-order output is very likely.
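One mitigation sometimes used for the out-of-order sink results in point 2 is declaring a primary key on the sink table, so that Flink applies upsert semantics and the rows for one key converge to a single final value. This is a sketch only: table and column names are illustrative, the connection properties are placeholders, and whether upserts fully resolve cross-subtask ordering depends on the Flink version and connector:

```sql
-- Sketch: an upsert sink keyed on the group-by columns, so retract/upsert
-- records for the same key overwrite one row instead of appending.
create table wide_sink (
    sub_clazz_number string,
    rounds int,
    num bigint,
    primary key (sub_clazz_number, rounds) not enforced
) with (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://localhost:3306/mydb',  -- placeholder
    'table-name' = 'wide_sink',
    'username'   = 'user',                              -- placeholder
    'password'   = 'pass'                               -- placeholder
);
```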