flink 聚合 job 重启问题

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

flink 聚合 job 重启问题

郑斌斌
hi all :

     请教个问题,我通过程序拉取kafka消息后,注册为flink流表。然后执行sql: "select user_id, count(*)cnt from 流表", 将结果写入到mysql 聚合表中(SINK组件为:flink1.11版本JdbcUpsertTableSink)。
但问题是,每次JOB重启后,之前mysql 聚合表结果会被清空。我设置了checkpoint和racksdbbackendstate.

Thanks&Regards


Reply | Threaded
Open this post in threaded view
|

Re:flink 聚合 job 重启问题

hechuan
伪代码发下看看?看下jdbc sink的配置,是不是支持删除记录,更新的时候旧记录被删除了


在 2020-07-27 11:33:31,"郑斌斌" <[hidden email]> 写道:
>hi all :
>
>     请教个问题,我通过程序拉取kafka消息后,注册为flink流表。然后执行sql: "select user_id, count(*)cnt from 流表", 将结果写入到mysql 聚合表中(SINK组件为:flink1.11版本JdbcUpsertTableSink)。
>但问题是,每次JOB重启后,之前mysql 聚合表结果会被清空。我设置了checkpoint和racksdbbackendstate.
>
>Thanks&Regards
>
>
Reply | Threaded
Open this post in threaded view
|

回复:flink 聚合 job 重启问题

郑斌斌
 需要通过checkpoint恢复启动才没有问题,不知道为什么是这样
------------------------------------------------------------------
发件人:RS <[hidden email]>
发送时间:2020年7月27日(星期一) 15:50
收件人:[hidden email] <[hidden email]>; 郑斌斌 <[hidden email]>
主 题:Re:flink 聚合 job 重启问题

伪代码发下看看?看下jdbc sink的配置,是不是支持删除记录,更新的时候旧记录被删除了

在 2020-07-27 11:33:31,"郑斌斌" <[hidden email]> 写道:
>hi all :
>
>     请教个问题,我通过程序拉取kafka消息后,注册为flink流表。然后执行sql: "select user_id, count(*)cnt from 流表", 将结果写入到mysql 聚合表中(SINK组件为:flink1.11版本JdbcUpsertTableSink)。
>但问题是,每次JOB重启后,之前mysql 聚合表结果会被清空。我设置了checkpoint和racksdbbackendstate.
>
>Thanks&Regards
>
>