flink-cdc 简单聚合后再次通过jdbc-connector sink到mysql,SnapshotReader出现报错

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

flink-cdc 简单聚合后再次通过jdbc-connector sink到mysql,SnapshotReader出现报错

chenjb

老哥们好,麻烦帮忙看看,谢谢


场景是:
source是用cdc从mysql读取数据(大概400多万条),用sql写的一个简单group
by场景,然后通过jdbc-connector写到mysql

配置了checkpoint(5min一次)提交到flink
standalone集群上运行,发现运行不久后好像数据流就停了,大概只处理了几万条数据,但任务依然是运行状态

运行过程中sink很慢(大概算了下每秒17条记录),导致source的反压很高

插入taskmanager日志发现有如下报错

2020-12-30 14:31:38,723 INFO  io.debezium.connector.mysql.SnapshotReader                  
[] - Step 8: committing transaction
2020-12-30 14:31:38,723 ERROR io.debezium.connector.mysql.SnapshotReader                  
[] - Failed due to error: Aborting snapshot due to error when last running
'SELECT * FROM `ydy_hsapdb`.`alfk_ydy_qyw_user_detail`': Streaming result
set com.mysql.cj.protocol.a.result.ResultsetRowsStreaming@2c66154c is still
active. No statements may be issued when any streaming result sets are open
and in use on a given connection. Ensure that you have called .close() on
any active streaming result sets before attempting more queries.
org.apache.kafka.connect.errors.ConnectException: Streaming result set
com.mysql.cj.protocol.a.result.ResultsetRowsStreaming@2c66154c is still
active. No statements may be issued when any streaming result sets are open
and in use on a given connection. Ensure that you have called .close() on
any active streaming result sets before attempting more queries. Error code:
0; SQLSTATE: S1000.
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:?]
        at
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:?]
        at
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:?]
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_201]
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_201]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
Caused by: java.sql.SQLException: Streaming result set
com.mysql.cj.protocol.a.result.ResultsetRowsStreaming@2c66154c is still
active. No statements may be issued when any streaming result sets are open
and in use on a given connection. Ensure that you have called .close() on
any active streaming result sets before attempting more queries.
        at
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:1.1.0]
        at
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:1.1.0]
        at
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:1.1.0]
        at com.mysql.cj.jdbc.ConnectionImpl.commit(ConnectionImpl.java:813)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:1.1.0]
        at
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:747)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:?]
        ... 3 more
2020-12-30 14:31:38,729 WARN  io.debezium.connector.mysql.SnapshotReader                  
[] - Failed to close the connection properly
java.sql.SQLException: Streaming result set
com.mysql.cj.protocol.a.result.ResultsetRowsStreaming@2c66154c is still
active. No statements may be issued when any streaming result sets are open
and in use on a given connection. Ensure that you have called .close() on
any active streaming result sets before attempting more queries.
        at
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:1.1.0]
        at
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:1.1.0]
        at
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:1.1.0]
        at
com.mysql.cj.jdbc.ConnectionImpl.rollbackNoChecks(ConnectionImpl.java:1961)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:1.1.0]
        at com.mysql.cj.jdbc.ConnectionImpl.rollback(ConnectionImpl.java:1855)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:1.1.0]
        at com.mysql.cj.jdbc.ConnectionImpl.realClose(ConnectionImpl.java:1720)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:1.1.0]
        at com.mysql.cj.jdbc.ConnectionImpl.close(ConnectionImpl.java:720)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:1.1.0]
        at io.debezium.jdbc.JdbcConnection.close(JdbcConnection.java:849)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:?]
        at
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:850)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:?]
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_201]
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_201]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
2020-12-30 14:35:19,911 INFO  io.debezium.connector.common.BaseSourceTask                
[] - Stopping down connector
2020-12-30 14:35:19,911 INFO  io.debezium.connector.mysql.MySqlConnectorTask              
[] - Stopping MySQL connector task
2020-12-30 14:35:19,911 INFO  io.debezium.connector.mysql.ChainedReader                  
[] - ChainedReader: Stopping the snapshot reader
2020-12-30 14:35:19,912 INFO  io.debezium.connector.mysql.SnapshotReader                  
[] - Discarding 7725 unsent record(s) due to the connector shutting down
2020-12-30 14:35:19,912 INFO  io.debezium.connector.mysql.SnapshotReader                  
[] - Discarding 0 unsent record(s) due to the connector shutting down
2020-12-30 14:35:19,913 INFO  io.debezium.connector.mysql.MySqlConnectorTask              
[] - Connector task finished all work and is now shutdown
2020-12-30 14:35:19,915 ERROR
com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction    [] - Reporting
error:
org.apache.kafka.connect.errors.ConnectException: Streaming result set
com.mysql.cj.protocol.a.result.ResultsetRowsStreaming@2c66154c is still
active. No statements may be issued when any streaming result sets are open
and in use on a given connection. Ensure that you have called .close() on
any active streaming result sets before attempting more queries. Error code:
0; SQLSTATE: S1000.
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:?]
        at
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:?]
        at
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:?]
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_201]
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_201]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
Caused by: java.sql.SQLException: Streaming result set
com.mysql.cj.protocol.a.result.ResultsetRowsStreaming@2c66154c is still
active. No statements may be issued when any streaming result sets are open
and in use on a given connection. Ensure that you have called .close() on
any active streaming result sets before attempting more queries.
        at
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:1.1.0]
        at
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:1.1.0]
        at
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:1.1.0]
        at com.mysql.cj.jdbc.ConnectionImpl.commit(ConnectionImpl.java:813)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:1.1.0]
        at
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:747)
~[blob_p-84a0db5932f0f8db241fae3c0801c7e31718c243-c15224c86e25f0dfba1c2b8da96760fb:?]
        ... 3 more
2020-12-30 14:35:24,827 INFO  io.debezium.embedded.EmbeddedEngine                        
[] - Stopping the embedded engine



另外有个现象是我在本地idea里面调试了一下,把checkpoint去掉了,然后在cdc的table里面加了这2个参数,发现虽然也很慢,但也一直能运行
" 'debezium.min.row.count.to.stream.results' = '1000', \n" +
" 'debezium.snapshot.fetch.size' = '100', \n" +


最后,我看有个老哥也有和我一样的问题
http://apache-flink.147419.n8.nabble.com/Flink-cdc-connector-snapshot-td8966.html
<http://apache-flink.147419.n8.nabble.com/Flink-cdc-connector-snapshot-td8966.html>
,但没有回复,就再问问







--
Sent from: http://apache-flink.147419.n8.nabble.com/