JDBC sink fails under heavy concurrent writes


JDBC sink fails under heavy concurrent writes

LittleFall
Our test spawned 10 threads, each performing 1,000 writes, 10,000 records in total.

The job fails after a few thousand records have been written.
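For context, a minimal sketch of the kind of load generator described above; the JDBC URL, credentials, table, and column names are hypothetical stand-ins, not our actual setup:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.UUID;

// Hypothetical load generator: 10 threads x 1,000 inserts = 10,000 records
// written to an upstream table that the Flink job consumes.
public class WriteLoadTest {
    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[10];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                try (Connection conn = DriverManager.getConnection(
                        "jdbc:mysql://localhost:3306/test", "user", "pass");
                     PreparedStatement ps = conn.prepareStatement(
                             "INSERT INTO orders (id, amount) VALUES (?, ?)")) {
                    for (int i = 0; i < 1_000; i++) {
                        ps.setString(1, UUID.randomUUID().toString());
                        ps.setInt(2, i);
                        ps.executeUpdate();
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            thread.join(); // wait for all 10 writers to finish
        }
    }
}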

2020-10-29 12:04:55,573 WARN  org.apache.flink.runtime.taskmanager.Task
[] - Join(joinType=[LeftOuterJoin], where=[(ID = ID1)], select=[ID,
PRODUCT_SERVICE, CUSTOMER_NO, CUSTOMER_NAME, CUSTOMER_REQUEST_NO,
EXTERNAL_NO, STATUS, ORDER_DATE, CREATE_TIME, COUPON_AMOUNT, ID0,
CHANNEL_RET_CODE, CHANNEL_RET_MSG, STATUS0, CARD_NO, BANK_PAY_WAY,
CREATE_TIME0, UPDATE_TIME0, PAY_AMOUNT, PAYER_FEE, CNET_BIND_CARD_ID,
PAYER_CUSTOMER_REQUEST_NO, OPERATOR_NAME, CARD_HOLDER_NAME, ID1,
CUSTOMER_BIZ_REQUEST_NO, GOODS_NAME, GOODS_CAT, GOODS_DESC, GOODS_EXT_INFO,
MEMO, EXTEND_INFO], leftInputSpec=[HasUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[ID AS id,
ID0 AS op_id, ORDER_DATE AS order_date, UPDATE_TIME0 AS complete_date,
PAYER_CUSTOMER_REQUEST_NO AS payer_customer_request_no, CREATE_TIME0 AS
pay_time, CUSTOMER_REQUEST_NO AS customer_request_no, EXTERNAL_NO AS
external_no, STATUS0 AS pay_status, STATUS AS order_status, PAY_AMOUNT
AS pay_amount, ABS(PAYER_FEE) AS payer_fee, BANK_PAY_WAY AS bank_pay_way,
GOODS_CAT AS goods_cat, GOODS_NAME AS goods_name, GOODS_DESC AS productdesc,
GOODS_DESC AS goods_desc, CUSTOMER_BIZ_REQUEST_NO AS
customer_biz_request_no, GOODS_EXT_INFO AS goods_ext_info, MEMO AS memo,
EXTEND_INFO AS extend_info, CHANNEL_RET_CODE AS channel_ret_code,
CHANNEL_RET_MSG AS channel_ret_msg, OPERATOR_NAME AS operator,
CUSTOMER_NO AS customer_no, CUSTOMER_NAME AS customer_name, PRODUCT_SERVICE
AS extend, CREATE_TIME0 AS payercreatetime, UPDATE_TIME0 AS payerupdatetime,
CARD_NO AS card_no, CARD_HOLDER_NAME AS card_holder_name, CREATE_TIME AS
create_time, CNET_BIND_CARD_ID AS cnetbindcarid, COUPON_AMOUNT AS
coupon_amount]) -> Sink:
Sink(table=[default_catalog.default_database.wide_table_1], fields=[id,
op_id, order_date, complete_date, payer_customer_request_no, pay_time,
customer_request_no, external_no, pay_status, order_status, pay_amount,
payer_fee, bank_pay_way, goods_cat, goods_name, productdesc, goods_desc,
customer_biz_request_no, goods_ext_info, memo, extend_info,
channel_ret_code, channel_ret_msg, operator, customer_no,
customer_name, extend, payercreatetime, payerupdatetime, card_no,
card_holder_name, create_time, cnetbindcarid, coupon_amount]) (1/1)
(14a0d11067e4779e13ad3e500f2ab29d) switched from RUNNING to FAILED.
java.io.IOException: Writing records to JDBC failed.
        at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:157)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:86)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at StreamExecCalc$147.processElement(Unknown Source) ~[?:?]
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.output(StreamingJoinOperator.java:305)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:278)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:115)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:132)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:99)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:179)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
[flink-dist_2.12-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
[flink-dist_2.12-1.11.2.jar:1.11.2]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_73]
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
        at
org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
~[flink-table_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$1(RowData.java:310)
~[flink-table_2.12-1.11.2.jar:1.11.2]
        at
org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.getPrimaryKey(JdbcDynamicOutputFormatBuilder.java:216)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
        at
org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$createRowKeyExtractor$7(JdbcDynamicOutputFormatBuilder.java:193)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
        at
org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$createKeyedRowExecutor$3fd497bb$1(JdbcDynamicOutputFormatBuilder.java:128)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
        at
org.apache.flink.connector.jdbc.internal.executor.KeyedBatchStatementExecutor.executeBatch(KeyedBatchStatementExecutor.java:71)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
        at
org.apache.flink.connector.jdbc.internal.executor.BufferReduceStatementExecutor.executeBatch(BufferReduceStatementExecutor.java:99)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
        at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:200)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
        at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:171)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
        at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:154)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
        ... 32 more




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: JDBC sink fails under heavy concurrent writes

Jark
Administrator
This looks like the bug below, which has already been fixed for 1.11.3; you can build the release-1.11 branch yourself to pick up the fix.
https://issues.apache.org/jira/browse/FLINK-19423

Best,
Jark
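For anyone matching this trace against their own job, here is one plausible reading of the exception, offered purely as an illustration of the failure mode and not as a description of the exact FLINK-19423 fix: the sink builds field getters against the full row schema but applies them to the key-only rows that deletes carry, so a getter for position 1 indexes past the end of a one-field row.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.VarCharType;

// Illustration only: a FieldGetter created for position 1 of the full sink
// row, applied to a one-field row such as a delete key, fails exactly like
// the trace above (ArrayIndexOutOfBoundsException: 1 in GenericRowData).
public class KeyExtractorFailureSketch {
    public static void main(String[] args) {
        RowData.FieldGetter getter =
                RowData.createFieldGetter(new VarCharType(36), 1);

        // A delete record reduced to its key: only one field at position 0.
        GenericRowData keyOnlyRow =
                GenericRowData.of(StringData.fromString("id-42"));

        getter.getFieldOrNull(keyOnlyRow); // throws ArrayIndexOutOfBoundsException: 1
    }
}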

On Thu, 29 Oct 2020 at 16:18, LittleFall <[hidden email]> wrote:

> Our test spawned 10 threads, each performing 1,000 writes, 10,000 records
> in total.
>
> The job fails after a few thousand records have been written.
>
> [stack trace snipped]
Re: JDBC sink fails under heavy concurrent writes

LittleFall
Can this problem also occur when the workload contains no DELETE statements?



Re: JDBC sink fails under heavy concurrent writes

Jark
Administrator
A LEFT JOIN does generate deletes (retractions).
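To make that concrete, a minimal sketch (not the poster's job; the datagen/print tables are illustrative) of a streaming LEFT JOIN whose changelog contains deletes even though the user never issues DELETE statements:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LeftJoinRetractionSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance()
                        .useBlinkPlanner()   // blink planner, as used in 1.11
                        .inStreamingMode()
                        .build());

        tEnv.executeSql("CREATE TABLE orders (id STRING, amount INT) "
                + "WITH ('connector' = 'datagen', 'rows-per-second' = '1')");
        tEnv.executeSql("CREATE TABLE payments (id STRING, status STRING) "
                + "WITH ('connector' = 'datagen', 'rows-per-second' = '1')");
        tEnv.executeSql("CREATE TABLE sink (id STRING, amount INT, status STRING) "
                + "WITH ('connector' = 'print')");

        // If an orders row arrives before its payments match, the join first
        // emits a null-padded result (+I). When the match arrives, that row
        // is retracted (-D) and replaced (+I); the -D reaches the sink as a
        // delete, which is what tripped the JDBC key extractor in the trace.
        tEnv.executeSql("INSERT INTO sink "
                + "SELECT o.id, o.amount, p.status "
                + "FROM orders o LEFT JOIN payments p ON o.id = p.id");
    }
}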

On Thu, 29 Oct 2020 at 16:36, LittleFall <[hidden email]> wrote:

> Can this problem also occur when the workload contains no DELETE statements?
Re: JDBC sink fails under heavy concurrent writes

LittleFall
Thanks! After switching to the latest release-1.11 build, the problem has not appeared again.

Best wishes!


