Flink CDC multi-table joins have very high processing latency

Flink CDC multi-table joins have very high processing latency

丁浩浩
I am using Flink CDC to run join queries over MySQL tables, and I found that Flink can only join two tables first and then join that result with the next table, so the latency of the final write to the database ends up very high.
Is there a good optimization approach that can mitigate this?

Re: Flink CDC multi-table joins have very high processing latency

Jark
Administrator
Can you share your code? Are you using the lookup (dimension table) join syntax (FOR SYSTEM_TIME AS OF)?
We first need to pin down exactly which operator is slow.
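For reference, a minimal sketch of the lookup/temporal join syntax Jark is referring to; the orders and course_dim tables, their columns, and the proc_time processing-time attribute are made-up placeholders, not tables from this thread:

SELECT o.order_number, d.course_name
FROM orders AS o
JOIN course_dim FOR SYSTEM_TIME AS OF o.proc_time AS d
    ON o.course_number = d.number

With this form each incoming row probes the dimension table, instead of both sides being held in join state.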

Re: Flink CDC multi-table joins have very high processing latency

丁浩浩
SELECT
    ri.sub_clazz_number,
    prcrs.rounds,
    COUNT(*) AS num
FROM subclazz gs
JOIN (
    SELECT gce.number, MIN(gce.extension_value) AS grade
    FROM course_extension gce
    WHERE gce.isdel = 0 AND gce.extension_type = 4
    GROUP BY gce.number
) AS temp
    ON temp.number = gs.course_number AND temp.grade > 30
JOIN right_info ri
    ON gs.number = ri.sub_clazz_number
JOIN wide_subclazz ws
    ON ws.number = ri.sub_clazz_number
JOIN course gc
    ON gc.number = ws.course_number AND gc.course_category_id IN (30, 40)
LEFT JOIN performance_regular_can_renewal_sign prcrs
    ON prcrs.order_number = ri.order_number AND prcrs.rounds IN (1, 2)
WHERE ri.is_del = 0 AND ri.end_idx = -1 AND prcrs.rounds IS NOT NULL
  AND NOT EXISTS (
      SELECT 1 FROM internal_staff gis WHERE gis.user_id = ri.user_id
  )
  AND NOT EXISTS (
      SELECT 1 FROM clazz_extension ce
      WHERE ws.clazz_number = ce.number
        AND ce.extension_type = 3 AND ce.isdel = 0
        AND ce.extension_value IN (1, 3, 4, 7, 8, 11)
  )
GROUP BY ri.sub_clazz_number, prcrs.rounds
That is the SQL.
The bottleneck is in all of the join operators; every time, the operators whose checkpoints cannot complete are the join operators.

Re: Flink CDC multi-table joins have very high processing latency

Jark
Administrator
The bottleneck is most likely the two NOT EXISTS predicates: NOT EXISTS can currently only run with a parallelism of 1, so its performance cannot be scaled out horizontally.
Consider replacing the NOT EXISTS with another approach, e.g. a UDF or a lookup (dimension table) join.
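As an illustration, one possible rewrite of the first NOT EXISTS as a LEFT JOIN anti-join; this is only a sketch, and whether the planner actually parallelizes it better depends on the Flink version and the resulting plan:

SELECT ri.sub_clazz_number
FROM right_info ri
LEFT JOIN internal_staff gis
    ON gis.user_id = ri.user_id
WHERE gis.user_id IS NULL   -- keep only rows with no matching internal_staff record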

Best,
Jark

Re: Flink CDC multi-table joins have very high processing latency

丁浩浩
Even after I rewrote the NOT EXISTS predicates as joins, the join operators' checkpoints still cannot complete. Is the checkpoint interval I configured too short? I trigger a checkpoint every half hour, and the checkpoint timeout is also half an hour.
Screenshots below (for some reason the images I upload never display):
https://imgchr.com/i/DeqixU
https://imgchr.com/i/DeqP2T
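For reference, the half-hour interval and timeout described above correspond roughly to settings like the following in flink-conf.yaml (they could equally be set in code; the values are simply the ones mentioned in this message):

execution.checkpointing.interval: 30min   # how often a checkpoint is triggered
execution.checkpointing.timeout: 30min    # how long a checkpoint may run before it is declared failed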

Re: Flink CDC multi-table joins have very high processing latency

Jark
Administrator
My guess is that your full-snapshot data set is too large while the join operators process it too slowly, so the full snapshot is not finished within half an hour and the checkpoint times out.
Note that no checkpoint can complete during the full-snapshot phase. See point 4 of this article for details:
https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ

The article also describes the workaround:

Workaround: in flink-conf.yaml, configure the number of failed checkpoints to tolerate and a restart strategy, as follows:

execution.checkpointing.interval: 10min   # checkpoint interval
execution.checkpointing.tolerable-failed-checkpoints: 100   # number of failed checkpoints to tolerate
restart-strategy: fixed-delay   # restart strategy
restart-strategy.fixed-delay.attempts: 2147483647   # number of restart attempts

Best,
Jark

Re: Flink CDC multi-table joins have very high processing latency

Jark
Administrator
Also, you can increase the parallelism of the join operators to improve join throughput.
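As a rough sketch, raising the default parallelism for a SQL job usually means adjusting settings like the following; the option names are real Flink settings, but the value 8 is only a placeholder that should be sized to your cluster:

parallelism.default: 8                           # default parallelism for all operators
table.exec.resource.default-parallelism: 8       # default parallelism for Table/SQL operators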

Re: Re: Flink CDC multi-table joins have very high processing latency

hailongwang
Judging from your SQL and the operator names in the screenshots, you are using regular stream joins [1] rather than lookup (temporal table) joins [2], right?
Does your use case need the stream data and the CDC MySQL data to be joined with each other, or does the stream only need to look up the CDC MySQL data on one side?
If it is a regular stream join, also check whether a skewed join key is overloading a single task and causing the checkpoint to fail.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#regular-joins
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#join-with-a-temporal-table
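To check for skew, a simple sketch is to count rows per join key directly on the source database (assuming MySQL access; sub_clazz_number is just one of the join keys from the query above):

SELECT sub_clazz_number, COUNT(*) AS cnt
FROM right_info
GROUP BY sub_clazz_number
ORDER BY cnt DESC
LIMIT 20;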

Re: Re: Re: Flink CDC multi-table joins have very high processing latency

hailongwang
Sorry, I worded that badly.
Does your use case join the stream data with the CDC MySQL stream data (stream-to-stream), or does the CDC MySQL stream look up dimension table data?

Re: Flink CDC multi-table joins have very high processing latency

丁浩浩
CDC-to-CDC joins.


18579099920
Email: [hidden email]
(Signature customized with NetEase Mail Master)

Re: Flink CDC multi-table joins have very high processing latency

丁浩浩
In reply to this post by hailongwang
Everything I set up is CDC MySQL tables joined with each other.


Re: Flink CDC multi-table joins have very high processing latency

jindy_liu
Jumping in on this thread: our scenario is very similar, joining CDC streams with other CDC streams into a wide table.

I have run into this kind of problem too with CDC-to-CDC joins. Once the data volume gets large it breaks easily (my checkpoint interval is set to 2 hours; I had set the timeout to only 10 minutes and the tolerated-failure count very high; with the timeout made very long, 2 hours, the data basically stops flowing).

1. During the snapshot phase there are constantly I/O problems. The log on the Flink side looks like this:
./flink--taskexecutor-0-flink-taskmanager-v1-11-2-fcf8f675f-gn8q8.log.5:146619:2020-11-14
00:19:53,578 ERROR io.debezium.connector.mysql.SnapshotReader                  
[] - Failed due to error: Aborting snapshot due to error when last running
'SELECT * FROM `low_db`.`t_low_media`': Streaming result set
com.mysql.cj.protocol.a.result.ResultsetRowsStreaming@3d208504 is still
active. No statements may be issued when any streaming result sets are open
and in use on a given connection. Ensure that you have called .close() on
any active streaming result sets before attempting more queries.

On the MySQL side, show processlist no longer shows the 'SELECT * FROM `low_db`.`t_low_media`' statement.

My guess is that the backpressure is so severe that no data is transferred over the connection for too long, it sits idle, and then it dies.

2. When the joined view is written to the sink, the joins can in some cases cause the output to arrive out of order (the retract stream's records may be routed to different subtasks), so the sink results are wrong. Unless the parallelism is set to 1, out-of-order output is very likely!





