source: mysql-cdc
sink: elasticsearch

Problem description:
After syncing a table from MySQL into Elasticsearch, a row that was inserted and then deleted ran into a problem and caused the sink to fail (no primary key was declared). The checkpoint failed; after the job automatically recovered and restarted, checkpoints succeeded, but Elasticsearch ended up holding twice as many rows as the MySQL table, i.e. the data was synced twice over.
Shouldn't the automatic recovery resume from the binlog position recorded in the current checkpoint? Why does it sync everything again from the beginning?
(The server-id is hard-coded in the DDL:
" 'table-name' = '"+ table +"'," +
" 'server-id' = '"+ serverId +"'" + )
(See also the sink DDL sketch after the log excerpt at the end of this post.)

Logs:
2020-12-31 17:41:22,189 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 2020-12-31 17:41:22,191 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 1: flush and obtain global read lock to prevent writes to database 2020-12-31 17:41:22,192 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 2: start transaction with consistent snapshot 2020-12-31 17:41:22,193 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 3: read binlog position of MySQL master 2020-12-31 17:41:22,195 INFO io.debezium.connector.mysql.SnapshotReader [] - using binlog 'mysql-bin.000063' at position '6914476' and gtid '' 2020-12-31 17:41:22,195 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 4: read list of available databases 2020-12-31 17:41:22,197 INFO io.debezium.connector.mysql.SnapshotReader [] - list of available databases is: [information_schema, 10bei_portal, badminton, dubbo_monitor, grafana, mysql, new_jianghu, noahark, performance_schema, qy_portal, sequelize, sevendays, shadow, short_url, test, vehicle_backend_api, wxb_jobs, zh_portal, zkdash] 2020-12-31 17:41:22,197 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 5: read list of available tables in each database ··· (the per-database/per-table scan log lines are omitted here) 2020-12-31 17:41:22,331 INFO io.debezium.connector.mysql.SnapshotReader [] - snapshot continuing with database(s): [zh_portal] 2020-12-31 17:41:22,331 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 6: generating DROP and CREATE statements to reflect current database schemas: 2020-12-31 17:41:23,093 WARN io.debezium.connector.mysql.MySqlSchema [] - The Kafka Connect schema name 'mysql-binlog-source.zh_portal.fishpool_login_logging.Value' is not a valid Avro schema name, so replacing with 'mysql_binlog_source.zh_portal.fishpool_login_logging.Value' 2020-12-31 17:41:23,093 WARN io.debezium.connector.mysql.MySqlSchema [] - The Kafka Connect schema name 'mysql-binlog-source.zh_portal.fishpool_login_logging.Key' is not a valid Avro schema name, so replacing with 'mysql_binlog_source.zh_portal.fishpool_login_logging.Key' 2020-12-31 17:41:23,100 WARN io.debezium.connector.mysql.MySqlSchema [] - The Kafka Connect schema name 'mysql-binlog-source.zh_portal.fishpool_login_logging.Envelope' is not a valid Avro schema name, so replacing with 'mysql_binlog_source.zh_portal.fishpool_login_logging.Envelope' 2020-12-31 17:41:23,111 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 7: releasing global read lock to enable MySQL writes 2020-12-31 17:41:23,112 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 7: blocked writes to MySQL for a total of 00:00:00.919 2020-12-31 17:41:23,113 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 8: scanning contents of 1 tables while still in transaction 2020-12-31 17:41:23,115 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 8: - scanning table 'zh_portal.fishpool_login_logging' (1 of 1 tables) 2020-12-31 17:41:23,115 INFO io.debezium.connector.mysql.SnapshotReader [] - For table 'zh_portal.fishpool_login_logging' using select statement: 'SELECT * FROM `zh_portal`.`fishpool_login_logging`' 2020-12-31
17:41:23,647 INFO com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer [] - Database snapshot phase can't perform checkpoint, acquired Checkpoint lock. 2020-12-31 17:41:23,698 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 8: - 10000 of 14377 rows scanned from table 'zh_portal.fishpool_login_logging' after 00:00:00.583 2020-12-31 17:41:25,128 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 8: - Completed scanning a total of 14866 rows from table 'zh_portal.fishpool_login_logging' after 00:00:02.012 2020-12-31 17:41:25,130 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 8: scanned 14866 rows in 1 tables in 00:00:02.016 2020-12-31 17:41:25,130 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 9: committing transaction 2020-12-31 17:41:25,131 INFO io.debezium.connector.mysql.SnapshotReader [] - Completed snapshot in 00:00:02.962 2020-12-31 17:41:25,935 INFO com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer [] - Received record from streaming binlog phase, released checkpoint lock. 2020-12-31 17:41:25,935 INFO io.debezium.connector.mysql.ChainedReader [] - Transitioning from the snapshot reader to the binlog reader 2020-12-31 17:41:25,945 INFO io.debezium.util.Threads [] - Creating thread debezium-mysqlconnector-mysql-binlog-source-binlog-client 2020-12-31 17:41:25,948 INFO io.debezium.util.Threads [] - Creating thread debezium-mysqlconnector-mysql-binlog-source-binlog-client 2020-12-31 17:41:26,089 INFO io.debezium.connector.mysql.BinlogReader [] - Connected to MySQL binlog at 192.168.1.189:3306, starting at binlog file 'mysql-bin.000063', pos=6914476, skipping 0 events plus 0 rows 2020-12-31 17:41:26,090 INFO io.debezium.connector.mysql.BinlogReader [] - Waiting for keepalive thread to start 2020-12-31 17:41:26,090 INFO io.debezium.util.Threads [] - Creating thread debezium-mysqlconnector-mysql-binlog-source-binlog-client 2020-12-31 17:41:26,190 INFO io.debezium.connector.mysql.BinlogReader [] - Keepalive thread is running 2020-12-31 17:42:33,643 INFO io.debezium.connector.mysql.BinlogReader [] - 1 records sent during previous 00:01:07.697, last recorded offset: {ts_sec=1609407753, file=mysql-bin.000063, pos=6914476, row=1, server_id=151, event=2} 2020-12-31 17:43:13,142 INFO io.debezium.connector.mysql.BinlogReader [] - 1 records sent during previous 00:00:39.5, last recorded offset: {ts_sec=1609407793, file=mysql-bin.000063, pos=6914873, row=1, server_id=151, event=2} 2020-12-31 17:43:13,346 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase [] - Failed Elasticsearch bulk request: Validation Failed: 1: id is missing; org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: id is missing; at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:393) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.performRequestAsync(RestHighLevelClient.java:1609) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.performRequestAsyncAndParseEntity(RestHighLevelClient.java:1580) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.bulkAsync(RestHighLevelClient.java:509) 
~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.lambda$createBulkProcessorBuilder$0(Elasticsearch7ApiCallBridge.java:83) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.Retry$RetryHandler.execute(Retry.java:205) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.Retry.withBackoff(Retry.java:59) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:62) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.access$400(BulkProcessor.java:54) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor$Flush.run(BulkProcessor.java:504) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.threadpool.Scheduler$ReschedulingRunnable.doRun(Scheduler.java:223) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_181] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_181] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_181] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181] 2020-12-31 17:43:17,376 INFO org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - Could not complete snapshot 12 for operator Source: TableSourceScan(table=[[default_catalog, default_database, fishpool_login_logging]], fields=[id, action_user_id, user_name, action_day, action_time, sn, app_type, online_action_type, remark, ip, mac, create_time, deleted_id]) -> Sink: Sink(table=[default_catalog.default_database.fishpool_login_logging_es], fields=[id, action_user_id, user_name, action_day, action_time, sn, app_type, online_action_type, remark, ip, mac, create_time, deleted_id]) (1/1). Failure reason: Checkpoint was declined. 
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 12 for operator Source: TableSourceScan(table=[[default_catalog, default_database, fishpool_login_logging]], fields=[id, action_user_id, user_name, action_day, action_time, sn, app_type, online_action_type, remark, ip, mac, create_time, deleted_id]) -> Sink: Sink(table=[default_catalog.default_database.fishpool_login_logging_es], fields=[id, action_user_id, user_name, action_day, action_time, sn, app_type, online_action_type, remark, ip, mac, create_time, deleted_id]) (1/1). Failure reason: Checkpoint was declined. at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:892) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:882) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:813) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$4(StreamTask.java:789) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) [flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) [flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282) [flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190) [flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) [flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) [flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) [flink-dist_2.11-1.11.1.jar:1.11.1] at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181] Caused by: org.apache.flink.util.SerializedThrowable: An error occurred in ElasticsearchSink. at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:383) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:388) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.snapshotState(ElasticsearchSinkBase.java:320) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186) ~[flink-dist_2.11-1.11.1.jar:1.11.1] ... 21 more Caused by: org.apache.flink.util.SerializedThrowable: Validation Failed: 1: id is missing; at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:393) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.performRequestAsync(RestHighLevelClient.java:1609) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.performRequestAsyncAndParseEntity(RestHighLevelClient.java:1580) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.bulkAsync(RestHighLevelClient.java:509) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.lambda$createBulkProcessorBuilder$0(Elasticsearch7ApiCallBridge.java:83) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.Retry$RetryHandler.execute(Retry.java:205) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.Retry.withBackoff(Retry.java:59) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:62) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at 
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.access$400(BulkProcessor.java:54) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor$Flush.run(BulkProcessor.java:504) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.threadpool.Scheduler$ReschedulingRunnable.doRun(Scheduler.java:223) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_181] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_181] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181] ... 1 more 2020-12-31 17:43:17,384 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask [] - Error during disposal of stream operator. java.lang.RuntimeException: An error occurred in ElasticsearchSink. at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:383) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:345) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:703) [flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:635) [flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:542) [flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181] Caused by: org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: id is missing; at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:393) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.performRequestAsync(RestHighLevelClient.java:1609) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.performRequestAsyncAndParseEntity(RestHighLevelClient.java:1580) 
~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.bulkAsync(RestHighLevelClient.java:509) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.lambda$createBulkProcessorBuilder$0(Elasticsearch7ApiCallBridge.java:83) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.Retry$RetryHandler.execute(Retry.java:205) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.Retry.withBackoff(Retry.java:59) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:62) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.access$400(BulkProcessor.java:54) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor$Flush.run(BulkProcessor.java:504) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.threadpool.Scheduler$ReschedulingRunnable.doRun(Scheduler.java:223) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) ~[flink-sql-connector-elasticsearch7_2.11-1.11.1.jar:1.11.1] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_181] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_181] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181] ... 
1 more 2020-12-31 17:43:17,385 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping the embedded engine 2020-12-31 17:43:17,385 INFO io.debezium.embedded.EmbeddedEngine [] - Waiting for PT5M for connector to stop 2020-12-31 17:43:18,141 INFO io.debezium.connector.common.BaseSourceTask [] - Stopping down connector 2020-12-31 17:43:18,142 INFO io.debezium.connector.mysql.MySqlConnectorTask [] - Stopping MySQL connector task 2020-12-31 17:43:18,142 INFO io.debezium.connector.mysql.ChainedReader [] - ChainedReader: Stopping the binlog reader 2020-12-31 17:43:18,142 INFO io.debezium.connector.mysql.BinlogReader [] - Discarding 0 unsent record(s) due to the connector shutting down 2020-12-31 17:43:18,143 INFO io.debezium.connector.mysql.BinlogReader [] - Discarding 0 unsent record(s) due to the connector shutting down 2020-12-31 17:43:18,144 INFO io.debezium.connector.mysql.BinlogReader [] - Stopped reading binlog after 2 events, last recorded offset: {ts_sec=1609407793, file=mysql-bin.000063, pos=6914873, row=1, server_id=151, event=2} ··· (the checkpoint-failure log repeats several times; after the job restarts:) 2020-12-31 17:50:56,088 INFO com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction [] - Consumer subtask 0 restored offset state: null. 2020-12-31 17:50:56,088 INFO com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction [] - Consumer subtask 0 restored history records state: 8177e25b-7d25-498c-b7e8-95144da02b6e with 3 records. 2020-12-31 17:50:56,089 INFO com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction [] - Debezium Properties: database.server.name = mysql-binlog-source value.converter.schemas.enable = false offset.flush.interval.ms = 9223372036854775807 database.history.instance.name = 8177e25b-7d25-498c-b7e8-95144da02b6e database.history = com.alibaba.ververica.cdc.debezium.internal.FlinkDatabaseHistory table.whitelist = zh_portal.fishpool_login_logging database.hostname = 192.168.1.189 offset.storage = com.alibaba.ververica.cdc.debezium.internal.FlinkOffsetBackingStore connector.class = io.debezium.connector.mysql.MySqlConnector database.password = 123456 database.server.id = 1234 include.schema.changes = false database.history.store.only.monitored.tables.ddl = true name = engine database.port = 3306 tombstones.on.delete = false key.converter.schemas.enable = false database.user = root database.whitelist = zh_portal 2020-12-31 17:50:56,091 INFO org.apache.kafka.connect.json.JsonConverterConfig [] - JsonConverterConfig values: converter.type = key decimal.format = BASE64 schemas.cache.size = 1000 schemas.enable = true 2020-12-31 17:50:56,091 INFO org.apache.kafka.connect.json.JsonConverterConfig [] - JsonConverterConfig values: converter.type = value decimal.format = BASE64 schemas.cache.size = 1000 schemas.enable = false 2020-12-31 17:50:56,091 INFO io.debezium.embedded.EmbeddedEngine$EmbeddedConfig [] - EmbeddedConfig values: access.control.allow.methods = access.control.allow.origin = admin.listeners = null bootstrap.servers = [localhost:9092] client.dns.lookup = default config.providers = [] connector.client.config.override.policy = None header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter internal.key.converter = class org.apache.kafka.connect.json.JsonConverter internal.value.converter = class org.apache.kafka.connect.json.JsonConverter key.converter = class org.apache.kafka.connect.json.JsonConverter listeners = null metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 offset.flush.interval.ms = 9223372036854775807
offset.flush.timeout.ms = 5000 offset.storage.file.filename = offset.storage.partitions = null offset.storage.replication.factor = null offset.storage.topic = plugin.path = null rest.advertised.host.name = null rest.advertised.listener = null rest.advertised.port = null rest.extension.classes = [] rest.host.name = null rest.port = 8083 ssl.client.auth = none task.shutdown.graceful.timeout.ms = 5000 topic.tracking.allow.reset = true topic.tracking.enable = true value.converter = class org.apache.kafka.connect.json.JsonConverter 2020-12-31 17:50:56,091 INFO org.apache.kafka.connect.runtime.WorkerConfig [] - Worker configuration property 'internal.key.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration. 2020-12-31 17:50:56,092 INFO org.apache.kafka.connect.runtime.WorkerConfig [] - Worker configuration property 'internal.value.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration. 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - Starting MySqlConnectorTask with configuration: 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - connector.class = io.debezium.connector.mysql.MySqlConnector 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - database.user = root 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - offset.storage = com.alibaba.ververica.cdc.debezium.internal.FlinkOffsetBackingStore 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - database.server.id = 1234 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - database.server.name = mysql-binlog-source 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - include.schema.changes = false 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - database.port = 3306 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - table.whitelist = zh_portal.fishpool_login_logging 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - offset.flush.interval.ms = 9223372036854775807 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - key.converter.schemas.enable = false 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - tombstones.on.delete = false 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - database.hostname = 192.168.1.189 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - database.password = ******** 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - value.converter.schemas.enable = false 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - name = engine 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - database.history.store.only.monitored.tables.ddl = true 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - database.whitelist = zh_portal 2020-12-31 17:50:56,094 INFO io.debezium.connector.common.BaseSourceTask [] - database.history.instance.name = 8177e25b-7d25-498c-b7e8-95144da02b6e 2020-12-31 17:50:56,094 INFO 
io.debezium.connector.common.BaseSourceTask [] - database.history = com.alibaba.ververica.cdc.debezium.internal.FlinkDatabaseHistory 2020-12-31 17:50:56,106 INFO io.debezium.connector.mysql.MySqlConnectorTask [] - Found no existing offset, so preparing to perform a snapshot 2020-12-31 17:50:56,109 INFO io.debezium.util.Threads [] - Requested thread factory for connector MySqlConnector, id = mysql-binlog-source named = binlog-client 2020-12-31 17:50:56,111 INFO io.debezium.util.Threads [] - Requested thread factory for connector MySqlConnector, id = mysql-binlog-source named = snapshot 2020-12-31 17:50:56,111 INFO io.debezium.util.Threads [] - Creating thread debezium-mysqlconnector-mysql-binlog-source-snapshot 2020-12-31 17:50:56,112 INFO io.debezium.connector.mysql.SnapshotReader [] - Starting snapshot for jdbc:mysql://192.168.1.189:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'root' with locking mode 'minimal' 2020-12-31 17:50:56,114 INFO io.debezium.connector.mysql.SnapshotReader [] - Snapshot is using user 'root' with these MySQL grants: 2020-12-31 17:50:56,114 INFO io.debezium.connector.mysql.SnapshotReader [] - GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY PASSWORD '*6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9' WITH GRANT OPTION 2020-12-31 17:50:56,114 INFO io.debezium.connector.mysql.SnapshotReader [] - MySQL server variables related to change data capture: 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - binlog_cache_size = 32768 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - binlog_checksum = CRC32 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - binlog_direct_non_transactional_updates = OFF 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - binlog_error_action = IGNORE_ERROR 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - binlog_format = ROW 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - binlog_gtid_simple_recovery = OFF 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - binlog_max_flush_queue_time = 0 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - binlog_order_commits = ON 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - binlog_row_image = FULL 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - binlog_rows_query_log_events = OFF 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - binlog_stmt_cache_size = 32768 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - binlogging_impossible_mode = IGNORE_ERROR 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - character_set_client = utf8mb4 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - character_set_connection = utf8mb4 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - character_set_database = utf8mb4 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - character_set_filesystem = binary 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - character_set_results = utf8 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - character_set_server = utf8mb4 2020-12-31 17:50:56,117 INFO 
io.debezium.connector.mysql.SnapshotReader [] - character_set_system = utf8 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - character_sets_dir = /usr/share/mysql/charsets/ 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - collation_connection = utf8mb4_general_ci 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - collation_database = utf8mb4_unicode_ci 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - collation_server = utf8mb4_unicode_ci 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - enforce_gtid_consistency = OFF 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - gtid_executed = 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - gtid_mode = OFF 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - gtid_next = AUTOMATIC 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - gtid_owned = 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - gtid_purged = 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - innodb_api_enable_binlog = OFF 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - innodb_locks_unsafe_for_binlog = OFF 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - innodb_version = 5.6.40 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - max_binlog_cache_size = 18446744073709547520 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - max_binlog_size = 1073741824 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - max_binlog_stmt_cache_size = 18446744073709547520 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - protocol_version = 10 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - simplified_binlog_gtid_recovery = OFF 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - slave_type_conversions = 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - sync_binlog = 0 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - system_time_zone = CST 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - time_zone = SYSTEM 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - tx_isolation = READ-COMMITTED 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - tx_read_only = OFF 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - version = 5.6.40-log 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - version_comment = MySQL Community Server (GPL) 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - version_compile_machine = x86_64 2020-12-31 17:50:56,117 INFO io.debezium.connector.mysql.SnapshotReader [] - version_compile_os = Linux 2020-12-31 17:50:56,118 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 2020-12-31 17:50:56,119 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 1: flush and obtain global read lock to prevent writes to database 2020-12-31 17:50:56,120 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 2: start transaction with consistent snapshot 2020-12-31 17:50:56,120 INFO 
io.debezium.connector.mysql.SnapshotReader [] - Step 3: read binlog position of MySQL master 2020-12-31 17:50:56,121 INFO io.debezium.connector.mysql.SnapshotReader [] - using binlog 'mysql-bin.000063' at position '6915270' and gtid '' 2020-12-31 17:50:56,121 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 4: read list of available databases 2020-12-31 17:50:56,122 INFO io.debezium.connector.mysql.SnapshotReader [] - list of available databases is: [information_schema, 10bei_portal, badminton, dubbo_monitor, grafana, mysql, new_jianghu, noahark, performance_schema, qy_portal, sequelize, sevendays, shadow, short_url, test, vehicle_backend_api, wxb_jobs, zh_portal, zkdash] 2020-12-31 17:50:56,122 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 5: read list of available tables in each database 2020-12-31 17:50:56,193 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 6: generating DROP and CREATE statements to reflect current database schemas: 2020-12-31 17:50:56,205 WARN io.debezium.connector.mysql.MySqlSchema [] - The Kafka Connect schema name 'mysql-binlog-source.zh_portal.fishpool_login_logging.Value' is not a valid Avro schema name, so replacing with 'mysql_binlog_source.zh_portal.fishpool_login_logging.Value' 2020-12-31 17:50:56,205 WARN io.debezium.connector.mysql.MySqlSchema [] - The Kafka Connect schema name 'mysql-binlog-source.zh_portal.fishpool_login_logging.Key' is not a valid Avro schema name, so replacing with 'mysql_binlog_source.zh_portal.fishpool_login_logging.Key' 2020-12-31 17:50:56,205 WARN io.debezium.connector.mysql.MySqlSchema [] - The Kafka Connect schema name 'mysql-binlog-source.zh_portal.fishpool_login_logging.Envelope' is not a valid Avro schema name, so replacing with 'mysql_binlog_source.zh_portal.fishpool_login_logging.Envelope' 2020-12-31 17:50:56,205 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 7: releasing global read lock to enable MySQL writes 2020-12-31 17:50:56,206 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 7: blocked writes to MySQL for a total of 00:00:00.086 2020-12-31 17:50:56,206 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 8: scanning contents of 1 tables while still in transaction 2020-12-31 17:50:56,207 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 8: - scanning table 'zh_portal.fishpool_login_logging' (1 of 1 tables) 2020-12-31 17:50:56,207 INFO io.debezium.connector.mysql.SnapshotReader [] - For table 'zh_portal.fishpool_login_logging' using select statement: 'SELECT * FROM `zh_portal`.`fishpool_login_logging`' 2020-12-31 17:50:56,610 INFO com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer [] - Database snapshot phase can't perform checkpoint, acquired Checkpoint lock. 
2020-12-31 17:50:56,641 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 8: - 10000 of 14377 rows scanned from table 'zh_portal.fishpool_login_logging' after 00:00:00.433 2020-12-31 17:50:57,274 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 8: - Completed scanning a total of 14866 rows from table 'zh_portal.fishpool_login_logging' after 00:00:01.067 2020-12-31 17:50:57,275 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 8: scanned 14866 rows in 1 tables in 00:00:01.068 2020-12-31 17:50:57,275 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 9: committing transaction 2020-12-31 17:50:57,275 INFO io.debezium.connector.mysql.SnapshotReader [] - Completed snapshot in 00:00:01.164 2020-12-31 17:50:58,173 INFO com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer [] - Received record from streaming binlog phase, released checkpoint lock. 2020-12-31 17:50:58,174 INFO io.debezium.connector.mysql.ChainedReader [] - Transitioning from the snapshot reader to the binlog reader 2020-12-31 17:50:58,178 INFO io.debezium.util.Threads [] - Creating thread debezium-mysqlconnector-mysql-binlog-source-binlog-client 2020-12-31 17:50:58,179 INFO io.debezium.util.Threads [] - Creating thread debezium-mysqlconnector-mysql-binlog-source-binlog-client 2020-12-31 17:50:58,300 INFO io.debezium.connector.mysql.BinlogReader [] - Connected to MySQL binlog at 192.168.1.189:3306, starting at binlog file 'mysql-bin.000063', pos=6915270, skipping 0 events plus 0 rows 2020-12-31 17:50:58,300 INFO io.debezium.util.Threads [] - Creating thread debezium-mysqlconnector-mysql-binlog-source-binlog-client 2020-12-31 17:50:58,300 INFO io.debezium.connector.mysql.BinlogReader [] - Waiting for keepalive thread to start 2020-12-31 17:50:58,400 INFO io.debezium.connector.mysql.BinlogReader [] - Keepalive thread is running -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
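
(Sink DDL sketch referenced above.) Both symptoms in the post — the "id is missing" bulk validation error on the DELETE and the doubled row count after recovery — are what an Elasticsearch sink without a primary key tends to produce: the connector writes in append mode, so a delete has no document id and a re-sync simply adds every row again. A minimal sketch of a sink DDL with a declared key is below; the host, index name, column types, and the trimmed column list are illustrative assumptions, not the actual job configuration:

CREATE TABLE fishpool_login_logging_es (
  id BIGINT,
  action_user_id BIGINT,
  user_name STRING,
  action_day STRING,
  action_time TIMESTAMP(3),
  create_time TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED            -- used as the Elasticsearch document id for upsert/delete requests
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://192.168.1.190:9200',   -- assumed address, not taken from the original post
  'index' = 'fishpool_login_logging'
);

With the key declared, the sink issues upserts and deletes keyed by id, so replaying the snapshot after a restart overwrites existing documents instead of appending a second copy.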
When the job automatically restarted and recovered, the binlog may already have been purged by the MySQL server, which causes the Debezium connector to take a new snapshot.
Reference: https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-purges-binlog-files_debezium

--
Name: 谢波
Mobile: 13764228893

On 2021-01-04 10:38:30, "lingchanhu" <[hidden email]> wrote:
>source: mysql-cdc
>sink: elasticsearch
>
>Problem description:
>After syncing a table from MySQL into Elasticsearch, a row that was inserted and then deleted ran into a problem and caused the sink to fail (no primary key was declared). The checkpoint failed; after the job automatically recovered and restarted, checkpoints succeeded, but Elasticsearch ended up holding twice as many rows as the MySQL table, i.e. the data was synced twice over.
>Shouldn't the automatic recovery resume from the binlog position recorded in the current checkpoint? Why does it sync everything again from the beginning?
>(The server-id is hard-coded in the DDL:
> " 'table-name' = '"+ table +"'," +
> " 'server-id' = '"+ serverId +"'" + )
>
>Logs: (omitted in the quote)
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
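
If binlog purging is the suspicion, it can be checked directly on the MySQL server; a rough check with plain MySQL statements (nothing here is specific to this job):

SHOW BINARY LOGS;                        -- is 'mysql-bin.000063', the file in the recorded offset, still listed?
SHOW VARIABLES LIKE 'expire_logs_days';  -- binlog retention on MySQL 5.6 (the server above reports version 5.6.40)
-- on MySQL 8.0 the equivalent setting is 'binlog_expire_logs_seconds'

For reference, the restart log above shows "restored offset state: null" followed by "Found no existing offset, so preparing to perform a snapshot", which is the point where the second full snapshot begins.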