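Hi all, I recently started using Flink CDC to consume the MySQL binlog. Every time I restart or stop the job and start it again, it begins consuming from the first row of the table.
I have checkpointing enabled with a persistent state backend, and the logs report that checkpoints complete successfully. Why does the application always start consuming from the beginning after a restart?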
According to the official definition,
.startupOptions(StartupOptions.initial()) should mean historical (snapshot) data plus incremental (binlog) changes.
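To make that expectation concrete, here is a simplified, hypothetical Java model of the startup decision. It is not the flink-cdc implementation: the class `InitialStartupModel` and the method `startPosition` are made-up names for illustration only. The assumption it encodes is that `initial()` snapshots the table only when the source has no restored binlog offset, and otherwise resumes from the stored offset.

```java
import java.util.Optional;

// Hypothetical model of StartupOptions.initial() semantics (illustrative names,
// NOT the connector's real classes): snapshot first, then follow the binlog,
// unless a binlog offset was restored from Flink state.
public class InitialStartupModel {

    /** Returns where the reader begins: a full snapshot, or a stored binlog offset. */
    public static String startPosition(Optional<Long> restoredBinlogOffset) {
        // No restored state: read the whole table, then switch to the binlog.
        // Restored state: skip the snapshot and continue from the saved offset.
        return restoredBinlogOffset
                .map(offset -> "binlog@" + offset)
                .orElse("snapshot");
    }

    public static void main(String[] args) {
        // Job submitted with no state restored.
        System.out.println(startPosition(Optional.empty()));   // prints "snapshot"
        // Job whose state (offset 1234) was restored from a checkpoint.
        System.out.println(startPosition(Optional.of(1234L))); // prints "binlog@1234"
    }
}
```

Under this model, a job that is started again without its checkpointed state being restored sees no offset and therefore re-reads the table from the first row.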
Flink version: 1.12.2
Flink CDC version: flink-sql-connector-mysql-cdc-1.4-SNAPSHOT.jar
Relevant core code:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend(config.getProperty("stateBackend.path")).getCheckpointBackend());
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.enableCheckpointing(10 * 1000);
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
DebeziumSourceFunction<String> sourceMilApplysLogStream = MySQLSource.<String>builder()
.hostname(config.getProperty("datasource.db"))
.port(Integer.parseInt(config.getProperty("datasource.port")))
.username(config.getProperty("datasource.username"))
.password(config.getProperty("datasource.password"))
.databaseList(config.getProperty("datasource.databaseList"))
.tableList(config.getProperty("datasource.tableList"))
.deserializer(new DebeziumDeserialization())
.serverId(Integer.parseInt(config.getProperty("datasource.server-id")))
.startupOptions(StartupOptions.initial())
.build();