About Flink SQL CDC

About Flink SQL CDC

Kyle Zhang
Hi all,
  I ran into a problem with SQL CDC today. Version 1.11.2, running in IDEA. My DDL is:
CREATE TABLE mysql_binlog (
 id INT NOT NULL,
 emp_name STRING,
 age INT
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'xxx',
 'port' = '3306',
 'username' = 'root',
 'password' = 'root',
 'database-name' = 'test',
 'table-name' = 'empoylee1'
);
I simply print the results with a print table; after running for a while it fails with:
19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader [] - Error during binlog processing. Last offset stored = null, binlog reader near position = binlog.001254/132686776
19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader [] - Failed due to error: Error processing binlog event
org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT INTO execution_flows (project_id, flow_id, version, status, submit_time, submit_user, update_time) values (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for processing, binlog probably contains events generated with statement or mixed based replication format
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130) [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978) [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581) [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860) [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT INTO execution_flows (project_id, flow_id, version, status, submit_time, submit_user, update_time) values (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for processing, binlog probably contains events generated with statement or mixed based replication format
        at io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
        ... 5 more

Does SQL CDC also parse my other tables and then fail on them? Has anyone run into a similar problem?

Best,
Kyle Zhang
Re: About Flink SQL CDC

Kyle Zhang
There is hardly anything in the code:

import java.time.ZoneId;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CDC {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

        String ddl = "CREATE TABLE mysql_binlog (\n" +
                " id INT NOT NULL,\n" +
                " emp_name STRING,\n" +
                " age INT\n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'xxx',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'root',\n" +
                " 'database-name' = 'eric',\n" +
                " 'table-name' = 'employee1'\n" +
                ")";
        tEnv.executeSql(ddl);

        tEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print')\n" +
                "LIKE mysql_binlog (EXCLUDING ALL)");

        tEnv.executeSql("insert into print_table select * from mysql_binlog");
    }
}


08:30:19,945 INFO  io.debezium.connector.mysql.SnapshotReader [] - Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10
08:30:19,964 INFO  io.debezium.connector.mysql.SnapshotReader [] - Step 1: flush and obtain global read lock to prevent writes to database
08:30:19,982 INFO  io.debezium.connector.mysql.SnapshotReader [] - Step 2: start transaction with consistent snapshot
08:30:19,985 INFO  io.debezium.connector.mysql.SnapshotReader [] - Step 3: read binlog position of MySQL master
08:30:19,989 INFO  io.debezium.connector.mysql.SnapshotReader [] - using binlog 'binlog.001254' at position '152522471' and gtid ''
08:30:19,989 INFO  io.debezium.connector.mysql.SnapshotReader [] - Step 4: read list of available databases
08:30:19,995 INFO  io.debezium.connector.mysql.SnapshotReader [] - list of available databases is: [a bunch of databases]
08:30:19,995 INFO  io.debezium.connector.mysql.SnapshotReader [] - Step 5: read list of available tables in each database

... (a bunch of tables)

08:30:20,198 INFO  io.debezium.connector.mysql.SnapshotReader [] - Step 6: generating DROP and CREATE statements to reflect current database schemas:
08:30:20,918 INFO  io.debezium.connector.mysql.SnapshotReader [] - Step 7: releasing global read lock to enable MySQL writes
08:30:20,926 INFO  io.debezium.connector.mysql.SnapshotReader [] - Step 7: blocked writes to MySQL for a total of 00:00:00.948
08:30:20,927 INFO  io.debezium.connector.mysql.SnapshotReader [] - Step 8: scanning contents of 1 tables while still in transaction
08:30:20,937 INFO  io.debezium.connector.mysql.SnapshotReader [] - Step 8: - scanning table 'eric.employee1' (1 of 1 tables)
08:30:20,937 INFO  io.debezium.connector.mysql.SnapshotReader [] - For table 'eric.employee1' using select statement: 'SELECT * FROM `eric`.`employee1`'
08:30:20,949 INFO  io.debezium.connector.mysql.SnapshotReader [] - Step 8: - Completed scanning a total of 5 rows from table 'eric.employee1' after 00:00:00.012
08:30:20,950 INFO  io.debezium.connector.mysql.SnapshotReader [] - Step 8: scanned 5 rows in 1 tables in 00:00:00.022
08:30:20,950 INFO  io.debezium.connector.mysql.SnapshotReader [] - Step 9: committing transaction
08:30:20,954 INFO  io.debezium.connector.mysql.SnapshotReader [] - Completed snapshot in 00:00:01.03
08:30:21,391 INFO  com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer [] - Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.
+I(1,Eric,23)
+I(2,Jack,22)
+I(3,Amy,33)
+I(4,Dell,12)
08:30:21,392 INFO  com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer [] - Received record from streaming binlog phase, released checkpoint lock.
+I(5,Hello,44)

After that it starts failing on UPDATEs or INSERTs against other tables:

09:18:54,326 ERROR io.debezium.connector.mysql.BinlogReader [] - Failed due to error: Error processing binlog event
org.apache.kafka.connect.errors.ConnectException: Received DML 'UPDATE
triggers SET trigger_source='SimpleTimeTrigger',
modify_time=1601428734343, enc_type=2,
data=x'1F8B0800000000000000E553C98E1A3110FD179F120925CD9209C38D214D42C4C088E5308A46C8B80B7070DB1DDB4D60D0FC7BAA7A61114CA228C7D0175CAFB657AF6ACF5C3A8FA59F3AB0ACC5F8F39ACFB96615E63CF7A943D3286C7F7A448305077EA8C36D222DA0DDDB14C82F0B9FC8986CD50F41F33668DE3403FC2128F5C2B0D6FE05DD4C6A05B98C659C2820FF8995CB2556AD30C872B685974663C96F4F15CC9E811DA3234966CC426ED884CB5EEC8E3B29284D6705620D76567D071BAEDEBCC57C223751AA7DF9F8EAF22432BA1A8C511EDFCF4653936D27F9FBF18AEBE58A4BE2E14620526BA55E9E305FCBE4813B9FE57047FB42DAC2F8CA54346CCFF19BA0DAA8078D124FC04A436DD68398FADA2570A567F6F21BE8E94F55305818EDD127D7A798778FCA366A47B94B910EEC72ADFE4297DABFE852FB0F75E1873BD817FFCB19BD72AAB0C52190DF302922C95508D4248CB954C5492532012535F461038AB574AA548511DEB1C0BDA19C112C78AA3CCB9683060BDAE74911944B6D6C36488C49CB7B45A0DB1BF4C65F669DE968140E26FDC7D9683A18F4069F8FAE791BC30DA05411925E70E5004761BC5CEC86BA4BCA7473D70378C6E022B4241322775ADA9CCD8F14D2925F80D595F9F9C02D8FC1675B478B5ED6ECF3CB9291747CAE20CAE775D63B99303AB1E63B089F9D499D955A65CF5C05E862CD6232873D08AF40D4DB80671B30575CACFBD2F9F14E0B762852A064BC238F7B2E4E76EB32272D974256F7264286A7DB556B7EAC37F0632FBF0077C290D8EF050000'
WHERE trigger_id=42' for processing, binlog probably contains events
generated with statement or mixed based replication format
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600) ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130) [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978) [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581) [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860) [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]


On Tue, Sep 29, 2020 at 7:39 PM Kyle Zhang <[hidden email]> wrote:


Re: About Flink SQL CDC

Kyle Zhang
In reply to this post by Kyle Zhang
show variables like '%binlog_format%' confirms it is indeed ROW
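The check above only shows one scope, and binlog_format exists at both GLOBAL and SESSION scope, so checking a single scope can be misleading. A sketch of a fuller check (the binlog file name is the one from the error log, so adjust it to whatever your server is currently writing):

```sql
-- Server-wide default (what new sessions inherit)
SHOW GLOBAL VARIABLES LIKE 'binlog_format';   -- expected: ROW
-- Value for the current connection only; sessions can override the global
SHOW SESSION VARIABLES LIKE 'binlog_format';
-- Inspect recent events in the binlog file named in the error;
-- QUERY events carrying raw DML indicate statement/mixed-format writes
SHOW BINLOG EVENTS IN 'binlog.001254' LIMIT 20;
```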

On Tue, Sep 29, 2020 at 7:39 PM Kyle Zhang <[hidden email]> wrote:


Re: About Flink SQL CDC

谢治平
Can you remove my email address from the list? I want to unsubscribe.




谢治平
Email: [hidden email]
Signature customized by 网易邮箱大师 (NetEase Mail Master)

On Sep 30, 2020 at 09:24, Kyle Zhang wrote:
show variables like '%binlog_format%' confirms it is indeed ROW

On Tue, Sep 29, 2020 at 7:39 PM Kyle Zhang <[hidden email]> wrote:


Re: Re: About Flink SQL CDC

史 正超
HI, Kyle Zhang, I just reproduced your problem. Although your MySQL binlog setting is ROW format, that does not rule out another session having changed binlog_format. Steps to reproduce:

  1.  Log in to the MySQL client (note: log in via cmd) and execute: SET SESSION binlog_format='MIXED'; SET SESSION tx_isolation='REPEATABLE-READ'; COMMIT;
  2.  Run any UPDATE or INSERT statement.

Then you get exactly the same error as yours:
2020-09-30 10:46:37.607 [debezium-engine] ERROR com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction  - Reporting error:
org.apache.kafka.connect.errors.ConnectException: Received DML 'update orders set product_id = 1122 where order_number = 10001' for processing, binlog probably contains events generated with statement or mixed based replication format
at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML 'update orders set product_id = 1122 where order_number = 10001' for processing, binlog probably contains events generated with statement or mixed based replication format
at io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
... 5 common frames omitted
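The reproduction steps above, written out as the SQL that was run (the table name and values are taken from the error message, so treat them as illustrative):

```sql
-- Run in a separate MySQL client session; this overrides the server's
-- ROW default for this connection only
SET SESSION binlog_format = 'MIXED';
SET SESSION tx_isolation  = 'REPEATABLE-READ';
COMMIT;

-- In MIXED mode this DML may be logged as a statement event,
-- which the Debezium binlog reader cannot decode
UPDATE orders SET product_id = 1122 WHERE order_number = 10001;
```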

So it was most likely another session that changed the binlog_format, with the transaction isolation level set to REPEATABLE-READ.
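If that is indeed the cause, one possible remedy, assuming you have the privileges to change server settings, is to pin the global default and compare the two scopes:

```sql
-- Requires SUPER (or SYSTEM_VARIABLES_ADMIN on MySQL 8.0); sessions
-- opened earlier keep their old value until they reconnect
SET GLOBAL binlog_format = 'ROW';

-- A mismatch here means some session has overridden the global default
SELECT @@global.binlog_format, @@session.binlog_format;
```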
Hope this helps,
best,
shizhengchao
________________________________
From: 谢治平 <[hidden email]>
Sent: September 30, 2020 1:25
To: user-zh <[hidden email]>
Cc: user-zh <[hidden email]>
Subject: Re: About Flink SQL CDC
