Flink 版本:
flink:1.11.1-scala_2.12 连接器 mysql-connector-java-8.0.21 flink-sql-connector-kafka_2.12-1.11.1 flink-connector-jdbc_2.12-1.11.1 Flink SQL: CREATE TABLE source_user_name ( loan_no int, name varchar, PRIMARY KEY (loan_no) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = 'test.username', 'properties.bootstrap.servers' = 'kafka:9092', 'properties.group.id' = 'test_flink_name_group', 'format'='canal-json', 'scan.startup.mode' = 'group-offsets' ); CREATE TABLE test_flink_name_sink ( loan_no int, name varchar, PRIMARY KEY (loan_no) NOT ENFORCED ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://host.docker.internal:3306/test?&rewriteBatchedStatements=true', 'connector.table' = 'username', 'connector.driver' = 'com.mysql.cj.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = '', 'connector.write.flush.max-rows' = '5000', 'connector.write.flush.interval' = '1s' ); insert into test_flink_name_sink (loan_no,name) select loan_no,name from source_user_name; 外部 sql: CREATE TABLE username ( loan_no int PRIMARY KEY, name varchar(10) ); insert into username values (1,'a'); 架构是 mysql-canal-kafka-flink-mysql 同时执行(一次输入两行) UPDATE `username` SET `name` = 'b' WHERE `loan_no` = 1; UPDATE `username` SET `name` = 'a' WHERE `loan_no` = 1; 发现目标数据库中结果丢失,结果稳定复现。 分析原因: ``` 上游一个update下游会落地两个sql 1.insert into after value 2.delete before value 而且insert和delete是分开两个statement batch提交的。先insert batch再delete batch 如果上游同时有两个update,update逻辑id:1,name:a更新为id:1,name:b再更新为id:1,name:a 这个时候就会触发问题 insert batch结束之后数据变成了id:1,name:a 再执行delete batch的第一条before delete:delete id=1 name=a会直接把最终正确的数据删除了 第二条的before delete:delete id=1 name=b删除不到数据。因为数据已经被删除了 ``` 换成新版 JDBC 配置之后没有这个问题。 请问这是已经发现的问题吗?有没有 issue 号 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Administrator
|
Hi,
这个是我们在实现新版 JDBC sink 的时候发现的问题,所以新版中修复了这个问题。 由于旧版 JDBC 修复起来比较麻烦,可能导致不兼容,且旧版在不久的将来会移除,所以目前没有去修复。 Best, Jark On Mon, 14 Sep 2020 at 11:38, LittleFall <[hidden email]> wrote: > Flink 版本: > flink:1.11.1-scala_2.12 > 连接器 > mysql-connector-java-8.0.21 > flink-sql-connector-kafka_2.12-1.11.1 > flink-connector-jdbc_2.12-1.11.1 > > Flink SQL: > > CREATE TABLE source_user_name ( > loan_no int, > name varchar, > PRIMARY KEY (loan_no) NOT ENFORCED > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'test.username', > 'properties.bootstrap.servers' = 'kafka:9092', > 'properties.group.id' = 'test_flink_name_group', > 'format'='canal-json', > 'scan.startup.mode' = 'group-offsets' > ); > > CREATE TABLE test_flink_name_sink ( > loan_no int, > name varchar, > PRIMARY KEY (loan_no) NOT ENFORCED > ) WITH ( > 'connector.type' = 'jdbc', > 'connector.url' = > > 'jdbc:mysql://host.docker.internal:3306/test?&rewriteBatchedStatements=true', > 'connector.table' = 'username', > 'connector.driver' = 'com.mysql.cj.jdbc.Driver', > 'connector.username' = 'root', > 'connector.password' = '', > 'connector.write.flush.max-rows' = '5000', > 'connector.write.flush.interval' = '1s' > ); > > insert into test_flink_name_sink (loan_no,name) > select loan_no,name from source_user_name; > > > 外部 sql: > > CREATE TABLE username ( > loan_no int PRIMARY KEY, > name varchar(10) > ); > > insert into username values (1,'a'); > > 架构是 mysql-canal-kafka-flink-mysql > > 同时执行(一次输入两行) > > UPDATE `username` SET `name` = 'b' WHERE `loan_no` = 1; > UPDATE `username` SET `name` = 'a' WHERE `loan_no` = 1; > > 发现目标数据库中结果丢失,结果稳定复现。 > > 分析原因: > > ``` > 上游一个update下游会落地两个sql > 1.insert into after value > 2.delete before value > 而且insert和delete是分开两个statement batch提交的。先insert batch再delete batch > > 如果上游同时有两个update,update逻辑id:1,name:a更新为id:1,name:b再更新为id:1,name:a > 这个时候就会触发问题 > insert batch结束之后数据变成了id:1,name:a > 再执行delete batch的第一条before delete:delete id=1 name=a会直接把最终正确的数据删除了 > 第二条的before delete:delete id=1 name=b删除不到数据。因为数据已经被删除了 > ``` > > 换成新版 JDBC 配置之后没有这个问题。 > > 请问这是已经发现的问题吗?有没有 issue 号 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > |
In reply to this post by LittleFall
update 怎么触发的 delete 哦?
在 2020-09-14 11:37:07,"LittleFall" <[hidden email]> 写道: >Flink 版本: >flink:1.11.1-scala_2.12 >连接器 >mysql-connector-java-8.0.21 >flink-sql-connector-kafka_2.12-1.11.1 >flink-connector-jdbc_2.12-1.11.1 > >Flink SQL: > >CREATE TABLE source_user_name ( > loan_no int, > name varchar, > PRIMARY KEY (loan_no) NOT ENFORCED >) WITH ( > 'connector' = 'kafka', > 'topic' = 'test.username', > 'properties.bootstrap.servers' = 'kafka:9092', > 'properties.group.id' = 'test_flink_name_group', > 'format'='canal-json', > 'scan.startup.mode' = 'group-offsets' >); > >CREATE TABLE test_flink_name_sink ( > loan_no int, > name varchar, > PRIMARY KEY (loan_no) NOT ENFORCED >) WITH ( > 'connector.type' = 'jdbc', > 'connector.url' = >'jdbc:mysql://host.docker.internal:3306/test?&rewriteBatchedStatements=true', > 'connector.table' = 'username', > 'connector.driver' = 'com.mysql.cj.jdbc.Driver', > 'connector.username' = 'root', > 'connector.password' = '', > 'connector.write.flush.max-rows' = '5000', > 'connector.write.flush.interval' = '1s' >); > >insert into test_flink_name_sink (loan_no,name) >select loan_no,name from source_user_name; > > >外部 sql: > >CREATE TABLE username ( > loan_no int PRIMARY KEY, > name varchar(10) >); > >insert into username values (1,'a'); > >架构是 mysql-canal-kafka-flink-mysql > >同时执行(一次输入两行) > >UPDATE `username` SET `name` = 'b' WHERE `loan_no` = 1; >UPDATE `username` SET `name` = 'a' WHERE `loan_no` = 1; > >发现目标数据库中结果丢失,结果稳定复现。 > >分析原因: > >``` >上游一个update下游会落地两个sql >1.insert into after value >2.delete before value >而且insert和delete是分开两个statement batch提交的。先insert batch再delete batch > >如果上游同时有两个update,update逻辑id:1,name:a更新为id:1,name:b再更新为id:1,name:a >这个时候就会触发问题 >insert batch结束之后数据变成了id:1,name:a >再执行delete batch的第一条before delete:delete id=1 name=a会直接把最终正确的数据删除了 >第二条的before delete:delete id=1 name=b删除不到数据。因为数据已经被删除了 >``` > >换成新版 JDBC 配置之后没有这个问题。 > >请问这是已经发现的问题吗?有没有 issue 号 > > > >-- >Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by LittleFall
update 怎么触发的 delete 哦?
在 2020-09-14 11:37:07,"LittleFall" <[hidden email]> 写道: >Flink 版本: >flink:1.11.1-scala_2.12 >连接器 >mysql-connector-java-8.0.21 >flink-sql-connector-kafka_2.12-1.11.1 >flink-connector-jdbc_2.12-1.11.1 > >Flink SQL: > >CREATE TABLE source_user_name ( > loan_no int, > name varchar, > PRIMARY KEY (loan_no) NOT ENFORCED >) WITH ( > 'connector' = 'kafka', > 'topic' = 'test.username', > 'properties.bootstrap.servers' = 'kafka:9092', > 'properties.group.id' = 'test_flink_name_group', > 'format'='canal-json', > 'scan.startup.mode' = 'group-offsets' >); > >CREATE TABLE test_flink_name_sink ( > loan_no int, > name varchar, > PRIMARY KEY (loan_no) NOT ENFORCED >) WITH ( > 'connector.type' = 'jdbc', > 'connector.url' = >'jdbc:mysql://host.docker.internal:3306/test?&rewriteBatchedStatements=true', > 'connector.table' = 'username', > 'connector.driver' = 'com.mysql.cj.jdbc.Driver', > 'connector.username' = 'root', > 'connector.password' = '', > 'connector.write.flush.max-rows' = '5000', > 'connector.write.flush.interval' = '1s' >); > >insert into test_flink_name_sink (loan_no,name) >select loan_no,name from source_user_name; > > >外部 sql: > >CREATE TABLE username ( > loan_no int PRIMARY KEY, > name varchar(10) >); > >insert into username values (1,'a'); > >架构是 mysql-canal-kafka-flink-mysql > >同时执行(一次输入两行) > >UPDATE `username` SET `name` = 'b' WHERE `loan_no` = 1; >UPDATE `username` SET `name` = 'a' WHERE `loan_no` = 1; > >发现目标数据库中结果丢失,结果稳定复现。 > >分析原因: > >``` >上游一个update下游会落地两个sql >1.insert into after value >2.delete before value >而且insert和delete是分开两个statement batch提交的。先insert batch再delete batch > >如果上游同时有两个update,update逻辑id:1,name:a更新为id:1,name:b再更新为id:1,name:a >这个时候就会触发问题 >insert batch结束之后数据变成了id:1,name:a >再执行delete batch的第一条before delete:delete id=1 name=a会直接把最终正确的数据删除了 >第二条的before delete:delete id=1 name=b删除不到数据。因为数据已经被删除了 >``` > >换成新版 JDBC 配置之后没有这个问题。 > >请问这是已经发现的问题吗?有没有 issue 号 > > > >-- >Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by LittleFall
请问下各位,flink1.8.1,flink sql(非java代码转化)方式下,数据格式'2018-1-2T12:00Z'如何sink到oracle的date字段,to_timestamp函数在1.8中还没有
|
In reply to this post by LittleFall
> 在 2020年9月15日,16:52,LittleFall <[hidden email]> 写道: > > 谢谢,请问有相关的 issue 链接吗 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ To @LItteFall : 没有对应的issue,因为是在修复changlog issue[1]时在这个issue里一起修复的,代码可以看下TableBufferReducedStatementExecutor里reduceBuffer保证是对同key上得不同操作顺序执行的。 To @Michael Ran: update 怎么触发的 delete 哦? LItteFall 是在数据库的表中触发了update操作,然后数据库的binlog通过 CDC工具 canal 以 canal-json 格式写入到kafka的表中,一个update 会对应UPDATE_BEFORE,UPDATE_AFTER两条数据, JDBC connector 对应的处理会生成两条sql, 一条delete和一条insert. 祝好 Leonard [1]https://issues.apache.org/jira/projects/FLINK/issues/FLINK-18461?filter=allissues <https://issues.apache.org/jira/projects/FLINK/issues/FLINK-18461?filter=allissues> |
有主键吗? 有的话不会触发delete 才对
在 2020-09-28 15:54:49,"Leonard Xu" <[hidden email]> 写道: > > >> 在 2020年9月15日,16:52,LittleFall <[hidden email]> 写道: >> >> 谢谢,请问有相关的 issue 链接吗 >> >> >> >> -- >> Sent from: http://apache-flink.147419.n8.nabble.com/ > >To @LItteFall : > >没有对应的issue,因为是在修复changlog issue[1]时在这个issue里一起修复的,代码可以看下TableBufferReducedStatementExecutor里reduceBuffer保证是对同key上得不同操作顺序执行的。 > >To @Michael Ran: > >update 怎么触发的 delete 哦? > >LItteFall 是在数据库的表中触发了update操作,然后数据库的binlog通过 CDC工具 canal 以 canal-json 格式写入到kafka的表中,一个update 会对应UPDATE_BEFORE,UPDATE_AFTER两条数据, JDBC connector 对应的处理会生成两条sql, 一条delete和一条insert. > > >祝好 >Leonard >[1]https://issues.apache.org/jira/projects/FLINK/issues/FLINK-18461?filter=allissues <https://issues.apache.org/jira/projects/FLINK/issues/FLINK-18461?filter=allissues> |
Free forum by Nabble | Edit this page |