flink1.11 mysql cdc: duplicate data after the job auto-recovers from a checkpoint failure

lingchanhu
source: mysql-cdc
sink: elasticsearch

Problem description:
While syncing a table from MySQL to Elasticsearch, a row that was inserted and then deleted broke the sink (no primary key was declared), so the sink failed and the checkpoint failed. After the job automatically recovered and restarted, checkpoints succeeded again, but Elasticsearch then held twice as many rows as the MySQL table: the data had been synced twice.
Shouldn't automatic recovery resume from the binlog position recorded in the checkpoint? Why does it re-sync everything from the beginning?
(The server-id is hard-coded in the DDL:
                "  'table-name' = '"+ table +"'," +
                "  'server-id' = '"+ serverId +"'" + )


Log:

2020-12-31 17:41:22,189 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10
2020-12-31 17:41:22,191 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 1: flush and obtain global read lock to prevent writes to database
2020-12-31 17:41:22,192 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 2: start transaction with consistent snapshot
2020-12-31 17:41:22,193 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 3: read binlog position of MySQL master
2020-12-31 17:41:22,195 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - using binlog 'mysql-bin.000063' at position '6914476' and gtid ''
2020-12-31 17:41:22,195 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 4: read list of available databases
2020-12-31 17:41:22,197 INFO  io.debezium.connector.mysql.SnapshotReader                   [] -   list of available databases is: [information_schema, 10bei_portal, badminton, dubbo_monitor, grafana, mysql, new_jianghu, noahark, performance_schema, qy_portal, sequelize, sevendays, shadow, short_url, test, vehicle_backend_api, wxb_jobs, zh_portal, zkdash]
2020-12-31 17:41:22,197 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 5: read list of available tables in each database
··· (per-database/table scan log lines omitted)
2020-12-31 17:41:22,331 INFO  io.debezium.connector.mysql.SnapshotReader                   [] -  snapshot continuing with database(s): [zh_portal]
2020-12-31 17:41:22,331 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 6: generating DROP and CREATE statements to reflect current database schemas:
2020-12-31 17:41:23,093 WARN  io.debezium.connector.mysql.MySqlSchema                      [] - The Kafka Connect schema name 'mysql-binlog-source.zh_portal.fishpool_login_logging.Value' is not a valid Avro schema name, so replacing with 'mysql_binlog_source.zh_portal.fishpool_login_logging.Value'
2020-12-31 17:41:23,093 WARN  io.debezium.connector.mysql.MySqlSchema                      [] - The Kafka Connect schema name 'mysql-binlog-source.zh_portal.fishpool_login_logging.Key' is not a valid Avro schema name, so replacing with 'mysql_binlog_source.zh_portal.fishpool_login_logging.Key'
2020-12-31 17:41:23,100 WARN  io.debezium.connector.mysql.MySqlSchema                      [] - The Kafka Connect schema name 'mysql-binlog-source.zh_portal.fishpool_login_logging.Envelope' is not a valid Avro schema name, so replacing with 'mysql_binlog_source.zh_portal.fishpool_login_logging.Envelope'
2020-12-31 17:41:23,111 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 7: releasing global read lock to enable MySQL writes
2020-12-31 17:41:23,112 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 7: blocked writes to MySQL for a total of 00:00:00.919
2020-12-31 17:41:23,113 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 8: scanning contents of 1 tables while still in transaction
2020-12-31 17:41:23,115 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 8: - scanning table 'zh_portal.fishpool_login_logging' (1 of 1 tables)
2020-12-31 17:41:23,115 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - For table 'zh_portal.fishpool_login_logging' using select statement: 'SELECT * FROM `zh_portal`.`fishpool_login_logging`'
2020-12-31 17:41:23,647 INFO  com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer [] - Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.
2020-12-31 17:41:23,698 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 8: - 10000 of 14377 rows scanned from table 'zh_portal.fishpool_login_logging' after 00:00:00.583
2020-12-31 17:41:25,128 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 8: - Completed scanning a total of 14866 rows from table 'zh_portal.fishpool_login_logging' after 00:00:02.012
2020-12-31 17:41:25,130 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 8: scanned 14866 rows in 1 tables in 00:00:02.016
2020-12-31 17:41:25,130 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 9: committing transaction
2020-12-31 17:41:25,131 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Completed snapshot in 00:00:02.962
2020-12-31 17:41:25,935 INFO  com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer [] - Received record from streaming binlog phase, released checkpoint lock.
2020-12-31 17:41:25,935 INFO  io.debezium.connector.mysql.ChainedReader                    [] - Transitioning from the snapshot reader to the binlog reader
2020-12-31 17:41:25,945 INFO  io.debezium.util.Threads                                     [] - Creating thread debezium-mysqlconnector-mysql-binlog-source-binlog-client
2020-12-31 17:41:25,948 INFO  io.debezium.util.Threads                                     [] - Creating thread debezium-mysqlconnector-mysql-binlog-source-binlog-client
2020-12-31 17:41:26,089 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Connected to MySQL binlog at 192.168.1.189:3306, starting at binlog file 'mysql-bin.000063', pos=6914476, skipping 0 events plus 0 rows
2020-12-31 17:41:26,090 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Waiting for keepalive thread to start
2020-12-31 17:41:26,090 INFO  io.debezium.util.Threads                                     [] - Creating thread debezium-mysqlconnector-mysql-binlog-source-binlog-client
2020-12-31 17:41:26,190 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Keepalive thread is running
2020-12-31 17:42:33,643 INFO  io.debezium.connector.mysql.BinlogReader                     [] - 1 records sent during previous 00:01:07.697, last recorded offset: {ts_sec=1609407753, file=mysql-bin.000063, pos=6914476, row=1, server_id=151, event=2}
2020-12-31 17:43:13,142 INFO  io.debezium.connector.mysql.BinlogReader                     [] - 1 records sent during previous 00:00:39.5, last recorded offset: {ts_sec=1609407793, file=mysql-bin.000063, pos=6914873, row=1, server_id=151, event=2}
2020-12-31 17:43:13,346 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase [] - Failed Elasticsearch bulk request: Validation Failed: 1: id is missing;
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: id is missing;
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:393) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.performRequestAsync(RestHighLevelClient.java:1609) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.performRequestAsyncAndParseEntity(RestHighLevelClient.java:1580) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.bulkAsync(RestHighLevelClient.java:509) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.lambda$createBulkProcessorBuilder$0(Elasticsearch7ApiCallBridge.java:83) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.Retry$RetryHandler.execute(Retry.java:205) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.Retry.withBackoff(Retry.java:59) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:62) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.access$400(BulkProcessor.java:54) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor$Flush.run(BulkProcessor.java:504) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.threadpool.Scheduler$ReschedulingRunnable.doRun(Scheduler.java:223) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_181]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_181]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
2020-12-31 17:43:17,376 INFO  org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - Could not complete snapshot 12 for operator Source: TableSourceScan(table=[[default_catalog, default_database, fishpool_login_logging]], fields=[id, action_user_id, user_name, action_day, action_time, sn, app_type, online_action_type, remark, ip, mac, create_time, deleted_id]) -> Sink: Sink(table=[default_catalog.default_database.fishpool_login_logging_es], fields=[id, action_user_id, user_name, action_day, action_time, sn, app_type, online_action_type, remark, ip, mac, create_time, deleted_id]) (1/1). Failure reason: Checkpoint was declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 12 for operator Source: TableSourceScan(table=[[default_catalog, default_database, fishpool_login_logging]], fields=[id, action_user_id, user_name, action_day, action_time, sn, app_type, online_action_type, remark, ip, mac, create_time, deleted_id]) -> Sink: Sink(table=[default_catalog.default_database.fishpool_login_logging_es], fields=[id, action_user_id, user_name, action_day, action_time, sn, app_type, online_action_type, remark, ip, mac, create_time, deleted_id]) (1/1). Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:892) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:882) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:813) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$4(StreamTask.java:789) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: org.apache.flink.util.SerializedThrowable: An error occurred in ElasticsearchSink.
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:383) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:388) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.snapshotState(ElasticsearchSinkBase.java:320) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
... 21 more
Caused by: org.apache.flink.util.SerializedThrowable: Validation Failed: 1: id is missing;
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:393) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.performRequestAsync(RestHighLevelClient.java:1609) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.performRequestAsyncAndParseEntity(RestHighLevelClient.java:1580) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.bulkAsync(RestHighLevelClient.java:509) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.lambda$createBulkProcessorBuilder$0(Elasticsearch7ApiCallBridge.java:83) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.Retry$RetryHandler.execute(Retry.java:205) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.Retry.withBackoff(Retry.java:59) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:62) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.access$400(BulkProcessor.java:54) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor$Flush.run(BulkProcessor.java:504) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.threadpool.Scheduler$ReschedulingRunnable.doRun(Scheduler.java:223) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_181]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
... 1 more
2020-12-31 17:43:17,384 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Error during disposal of stream operator.
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:383) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:345) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:703) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:635) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:542) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: id is missing;
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:393) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.performRequestAsync(RestHighLevelClient.java:1609) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.performRequestAsyncAndParseEntity(RestHighLevelClient.java:1580) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.bulkAsync(RestHighLevelClient.java:509) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.lambda$createBulkProcessorBuilder$0(Elasticsearch7ApiCallBridge.java:83) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.Retry$RetryHandler.execute(Retry.java:205) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.Retry.withBackoff(Retry.java:59) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:62) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.access$400(BulkProcessor.java:54) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor$Flush.run(BulkProcessor.java:504) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.threadpool.Scheduler$ReschedulingRunnable.doRun(Scheduler.java:223) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_181]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
... 1 more
2020-12-31 17:43:17,385 INFO  io.debezium.embedded.EmbeddedEngine                          [] - Stopping the embedded engine
2020-12-31 17:43:17,385 INFO  io.debezium.embedded.EmbeddedEngine                          [] - Waiting for PT5M for connector to stop
2020-12-31 17:43:18,141 INFO  io.debezium.connector.common.BaseSourceTask                  [] - Stopping down connector
2020-12-31 17:43:18,142 INFO  io.debezium.connector.mysql.MySqlConnectorTask               [] - Stopping MySQL connector task
2020-12-31 17:43:18,142 INFO  io.debezium.connector.mysql.ChainedReader                    [] - ChainedReader: Stopping the binlog reader
2020-12-31 17:43:18,142 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Discarding 0 unsent record(s) due to the connector shutting down
2020-12-31 17:43:18,143 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Discarding 0 unsent record(s) due to the connector shutting down
2020-12-31 17:43:18,144 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Stopped reading binlog after 2 events, last recorded offset: {ts_sec=1609407793, file=mysql-bin.000063, pos=6914873, row=1, server_id=151, event=2}
··· (after the checkpoint-failure logs repeat several more times:)
2020-12-31 17:50:56,088 INFO  com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction    [] - Consumer subtask 0 restored offset state: null.
2020-12-31 17:50:56,088 INFO  com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction    [] - Consumer subtask 0 restored history records state: 8177e25b-7d25-498c-b7e8-95144da02b6e with 3 records.
2020-12-31 17:50:56,089 INFO  com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction    [] - Debezium Properties:
        database.server.name = mysql-binlog-source
        value.converter.schemas.enable = false
        offset.flush.interval.ms = 9223372036854775807
        database.history.instance.name = 8177e25b-7d25-498c-b7e8-95144da02b6e
        database.history = com.alibaba.ververica.cdc.debezium.internal.FlinkDatabaseHistory
        table.whitelist = zh_portal.fishpool_login_logging
        database.hostname = 192.168.1.189
        offset.storage = com.alibaba.ververica.cdc.debezium.internal.FlinkOffsetBackingStore
        connector.class = io.debezium.connector.mysql.MySqlConnector
        database.password = 123456
        database.server.id = 1234
        include.schema.changes = false
        database.history.store.only.monitored.tables.ddl = true
        name = engine
        database.port = 3306
        tombstones.on.delete = false
        key.converter.schemas.enable = false
        database.user = root
        database.whitelist = zh_portal

2020-12-31 17:50:56,091 INFO  org.apache.kafka.connect.json.JsonConverterConfig            [] - JsonConverterConfig values:
        converter.type = key
        decimal.format = BASE64
        schemas.cache.size = 1000
        schemas.enable = true

2020-12-31 17:50:56,091 INFO  org.apache.kafka.connect.json.JsonConverterConfig            [] - JsonConverterConfig values:
        converter.type = value
        decimal.format = BASE64
        schemas.cache.size = 1000
        schemas.enable = false

2020-12-31 17:50:56,091 INFO  io.debezium.embedded.EmbeddedEngine$EmbeddedConfig           [] - EmbeddedConfig values:
        access.control.allow.methods =
        access.control.allow.origin =
        admin.listeners = null
        bootstrap.servers = [localhost:9092]
        client.dns.lookup = default
        config.providers = []
        connector.client.config.override.policy = None
        header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
        internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
        internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
        key.converter = class org.apache.kafka.connect.json.JsonConverter
        listeners = null
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        offset.flush.interval.ms = 9223372036854775807
        offset.flush.timeout.ms = 5000
        offset.storage.file.filename =
        offset.storage.partitions = null
        offset.storage.replication.factor = null
        offset.storage.topic =
        plugin.path = null
        rest.advertised.host.name = null
        rest.advertised.listener = null
        rest.advertised.port = null
        rest.extension.classes = []
        rest.host.name = null
        rest.port = 8083
        ssl.client.auth = none
        task.shutdown.graceful.timeout.ms = 5000
        topic.tracking.allow.reset = true
        topic.tracking.enable = true
        value.converter = class org.apache.kafka.connect.json.JsonConverter

2020-12-31 17:50:56,091 INFO  org.apache.kafka.connect.runtime.WorkerConfig                [] - Worker configuration property 'internal.key.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-12-31 17:50:56,092 INFO  org.apache.kafka.connect.runtime.WorkerConfig                [] - Worker configuration property 'internal.value.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] - Starting MySqlConnectorTask with configuration:
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    connector.class = io.debezium.connector.mysql.MySqlConnector
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    database.user = root
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    offset.storage = com.alibaba.ververica.cdc.debezium.internal.FlinkOffsetBackingStore
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    database.server.id = 1234
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    database.server.name = mysql-binlog-source
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    include.schema.changes = false
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    database.port = 3306
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    table.whitelist = zh_portal.fishpool_login_logging
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    offset.flush.interval.ms = 9223372036854775807
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    key.converter.schemas.enable = false
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    tombstones.on.delete = false
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    database.hostname = 192.168.1.189
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    database.password = ********
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    value.converter.schemas.enable = false
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    name = engine
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    database.history.store.only.monitored.tables.ddl = true
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    database.whitelist = zh_portal
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    database.history.instance.name = 8177e25b-7d25-498c-b7e8-95144da02b6e
2020-12-31 17:50:56,094 INFO  io.debezium.connector.common.BaseSourceTask                  [] -    database.history = com.alibaba.ververica.cdc.debezium.internal.FlinkDatabaseHistory
2020-12-31 17:50:56,106 INFO  io.debezium.connector.mysql.MySqlConnectorTask               [] - Found no existing offset, so preparing to perform a snapshot
2020-12-31 17:50:56,109 INFO  io.debezium.util.Threads                                     [] - Requested thread factory for connector MySqlConnector, id = mysql-binlog-source named = binlog-client
2020-12-31 17:50:56,111 INFO  io.debezium.util.Threads                                     [] - Requested thread factory for connector MySqlConnector, id = mysql-binlog-source named = snapshot
2020-12-31 17:50:56,111 INFO  io.debezium.util.Threads                                     [] - Creating thread debezium-mysqlconnector-mysql-binlog-source-snapshot
2020-12-31 17:50:56,112 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Starting snapshot for jdbc:mysql://192.168.1.189:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'root' with locking mode 'minimal'
2020-12-31 17:50:56,114 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Snapshot is using user 'root' with these MySQL grants:
2020-12-31 17:50:56,114 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY PASSWORD '*6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9' WITH GRANT OPTION
2020-12-31 17:50:56,114 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - MySQL server variables related to change data capture:
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - binlog_cache_size                             = 32768                                        
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - binlog_checksum                               = CRC32                                        
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - binlog_direct_non_transactional_updates       = OFF                                          
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - binlog_error_action                           = IGNORE_ERROR                                
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - binlog_format                                 = ROW                                          
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - binlog_gtid_simple_recovery                   = OFF                                          
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - binlog_max_flush_queue_time                   = 0                                            
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - binlog_order_commits                          = ON                                          
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - binlog_row_image                              = FULL                                        
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - binlog_rows_query_log_events                  = OFF                                          
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - binlog_stmt_cache_size                        = 32768                                        
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - binlogging_impossible_mode                    = IGNORE_ERROR                                
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - character_set_client                          = utf8mb4                                      
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - character_set_connection                      = utf8mb4                                      
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - character_set_database                        = utf8mb4                                      
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - character_set_filesystem                      = binary                                      
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - character_set_results                         = utf8                                        
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - character_set_server                          = utf8mb4                                      
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - character_set_system                          = utf8                                        
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - character_sets_dir                            = /usr/share/mysql/charsets/                  
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - collation_connection                          = utf8mb4_general_ci                          
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - collation_database                            = utf8mb4_unicode_ci                          
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - collation_server                              = utf8mb4_unicode_ci                          
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - enforce_gtid_consistency                      = OFF                                          
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - gtid_executed                                 =                                              
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - gtid_mode                                     = OFF                                          
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - gtid_next                                     = AUTOMATIC                                    
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - gtid_owned                                    =                                              
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - gtid_purged                                   =                                              
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - innodb_api_enable_binlog                      = OFF                                          
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - innodb_locks_unsafe_for_binlog                = OFF                                          
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - innodb_version                                = 5.6.40                                      
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - max_binlog_cache_size                         = 18446744073709547520                        
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - max_binlog_size                               = 1073741824                                  
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - max_binlog_stmt_cache_size                    = 18446744073709547520                        
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - protocol_version                              = 10                                          
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - simplified_binlog_gtid_recovery               = OFF                                          
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - slave_type_conversions                        =                                              
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - sync_binlog                                   = 0                                            
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - system_time_zone                              = CST                                          
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - time_zone                                     = SYSTEM                                      
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - tx_isolation                                  = READ-COMMITTED                              
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - tx_read_only                                  = OFF                                          
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - version                                       = 5.6.40-log                                  
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - version_comment                               = MySQL Community Server (GPL)                
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - version_compile_machine                       = x86_64                                      
2020-12-31 17:50:56,117 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - version_compile_os                            = Linux                                        
2020-12-31 17:50:56,118 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10
2020-12-31 17:50:56,119 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 1: flush and obtain global read lock to prevent writes to database
2020-12-31 17:50:56,120 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 2: start transaction with consistent snapshot
2020-12-31 17:50:56,120 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 3: read binlog position of MySQL master
2020-12-31 17:50:56,121 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - using binlog 'mysql-bin.000063' at position '6915270' and gtid ''
2020-12-31 17:50:56,121 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 4: read list of available databases
2020-12-31 17:50:56,122 INFO  io.debezium.connector.mysql.SnapshotReader                   [] -   list of available databases is: [information_schema, 10bei_portal, badminton, dubbo_monitor, grafana, mysql, new_jianghu, noahark, performance_schema, qy_portal, sequelize, sevendays, shadow, short_url, test, vehicle_backend_api, wxb_jobs, zh_portal, zkdash]
2020-12-31 17:50:56,122 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 5: read list of available tables in each database
2020-12-31 17:50:56,193 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 6: generating DROP and CREATE statements to reflect current database schemas:
2020-12-31 17:50:56,205 WARN  io.debezium.connector.mysql.MySqlSchema                      [] - The Kafka Connect schema name 'mysql-binlog-source.zh_portal.fishpool_login_logging.Value' is not a valid Avro schema name, so replacing with 'mysql_binlog_source.zh_portal.fishpool_login_logging.Value'
2020-12-31 17:50:56,205 WARN  io.debezium.connector.mysql.MySqlSchema                      [] - The Kafka Connect schema name 'mysql-binlog-source.zh_portal.fishpool_login_logging.Key' is not a valid Avro schema name, so replacing with 'mysql_binlog_source.zh_portal.fishpool_login_logging.Key'
2020-12-31 17:50:56,205 WARN  io.debezium.connector.mysql.MySqlSchema                      [] - The Kafka Connect schema name 'mysql-binlog-source.zh_portal.fishpool_login_logging.Envelope' is not a valid Avro schema name, so replacing with 'mysql_binlog_source.zh_portal.fishpool_login_logging.Envelope'
2020-12-31 17:50:56,205 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 7: releasing global read lock to enable MySQL writes
2020-12-31 17:50:56,206 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 7: blocked writes to MySQL for a total of 00:00:00.086
2020-12-31 17:50:56,206 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 8: scanning contents of 1 tables while still in transaction
2020-12-31 17:50:56,207 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 8: - scanning table 'zh_portal.fishpool_login_logging' (1 of 1 tables)
2020-12-31 17:50:56,207 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - For table 'zh_portal.fishpool_login_logging' using select statement: 'SELECT * FROM `zh_portal`.`fishpool_login_logging`'
2020-12-31 17:50:56,610 INFO  com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer [] - Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.
2020-12-31 17:50:56,641 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 8: - 10000 of 14377 rows scanned from table 'zh_portal.fishpool_login_logging' after 00:00:00.433
2020-12-31 17:50:57,274 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 8: - Completed scanning a total of 14866 rows from table 'zh_portal.fishpool_login_logging' after 00:00:01.067
2020-12-31 17:50:57,275 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 8: scanned 14866 rows in 1 tables in 00:00:01.068
2020-12-31 17:50:57,275 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Step 9: committing transaction
2020-12-31 17:50:57,275 INFO  io.debezium.connector.mysql.SnapshotReader                   [] - Completed snapshot in 00:00:01.164
2020-12-31 17:50:58,173 INFO  com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer [] - Received record from streaming binlog phase, released checkpoint lock.
2020-12-31 17:50:58,174 INFO  io.debezium.connector.mysql.ChainedReader                    [] - Transitioning from the snapshot reader to the binlog reader
2020-12-31 17:50:58,178 INFO  io.debezium.util.Threads                                     [] - Creating thread debezium-mysqlconnector-mysql-binlog-source-binlog-client
2020-12-31 17:50:58,179 INFO  io.debezium.util.Threads                                     [] - Creating thread debezium-mysqlconnector-mysql-binlog-source-binlog-client
2020-12-31 17:50:58,300 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Connected to MySQL binlog at 192.168.1.189:3306, starting at binlog file 'mysql-bin.000063', pos=6915270, skipping 0 events plus 0 rows
2020-12-31 17:50:58,300 INFO  io.debezium.util.Threads                                     [] - Creating thread debezium-mysqlconnector-mysql-binlog-source-binlog-client
2020-12-31 17:50:58,300 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Waiting for keepalive thread to start
2020-12-31 17:50:58,400 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Keepalive thread is running

--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink1.11 mysql cdc: duplicate data after the job auto-recovers from a checkpoint failure

smailxie

When the job automatically restarted and recovered, the binlog may already have been purged by the MySQL server, which forces the Debezium connector to read a new snapshot.
Reference: https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-purges-binlog-files_debezium
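One way to check that hypothesis is with plain MySQL statements on the source server (a sketch, nothing Flink-specific):

-- If the binlog file recorded in the checkpoint ('mysql-bin.000063'
-- above) is no longer listed, the offset cannot be replayed and
-- Debezium falls back to a fresh snapshot.
SHOW BINARY LOGS;

-- Retention window before automatic purge (MySQL 5.6/5.7).
SHOW VARIABLES LIKE 'expire_logs_days';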

--

Name:谢波
Mobile:13764228893
