JDBC Sink参数connector.write.max-retries 在Oracle中的bug

classic Classic list List threaded Threaded
2 messages Options
111
Reply | Threaded
Open this post in threaded view
|

JDBC Sink参数connector.write.max-retries 在Oracle中的bug

111
Hi,
在使用jdbc sink时,底层使用oracle驱动会出现bug。
出现的现象:当max-retries参数设置为1时,任务能正常报错;当max-retries参数大于1时,虽然程序内部报错,但是任务总是正常结束。


在JDBCUpsertOutputFormat.java中的flush()方法中,设计了重试机制:
public synchronized void flush() throws Exception {
   checkFlushException();

   for (int i = 1; i <= maxRetryTimes; i++) {
try {
jdbcWriter.executeBatch();
batchCount = 0;
         break;
} catch (SQLException e) {
LOG.error("JDBC executeBatch error, retry times = {}", i, e);
         if (i >= maxRetryTimes) {
//throw e;
System.exit(-1);
}
         Thread.sleep(1000 * i);
}
   }
}
但是当executeBatch出现异常时,会进入异常捕获,并清空rank信息(ojdbc14的5339行):
} catch (SQLException var17) {
this.clearBatch();
    this.needToParse = true;
    if (this.sqlKind != 1 && this.sqlKind != 4) {
for(var3 = 0; var3 < var4.length; ++var3) {
            var4[var3] = -3;
}
    }

    DatabaseError.throwBatchUpdateException(var17, this.sqlKind != 1 && this.sqlKind != 4 ? var4.length : var3, var4);
} finally {
下一次执行executeBatch时,由于rank为0,会直接跳过插入操作,返回成功。
public int[] executeBatch() throws SQLException {
synchronized(this.connection) {
int[] var10000;
        synchronized(this) {
int var3 = 0;
            this.setJdbcBatchStyle();
            int[] var4 = new int[this.currentRank];
            if (this.currentRank > 0) {
this.ensureOpen();
从而导致第二次重试的时候直接跳过插入操作,成功返回。





Reply | Threaded
Open this post in threaded view
|

Re: JDBC Sink参数connector.write.max-retries 在Oracle中的bug

Leonard Xu
Hi, xinghalo
这是jdbc sink 的AppenOnlyWriter的一个已知bug,在1.10.1里已经修复[1],社区近期在准备1.10.1的发布,
建议等1.10.1发布后升级即可。

Best,
Leonard

[1]https://issues.apache.org/jira/browse/FLINK-16281 <https://issues.apache.org/jira/browse/FLINK-16281>

> 在 2020年3月24日,18:32,111 <[hidden email]> 写道:
>
> Hi,
> 在使用jdbc sink时,底层使用oracle驱动会出现bug。
> 出现的现象:当max-retries参数设置为1时,任务能正常报错;当max-retries参数大于1时,虽然程序内部报错,但是任务总是正常结束。
>
>
> 在JDBCUpsertOutputFormat.java中的flush()方法中,设计了重试机制:
> public synchronized void flush() throws Exception {
>   checkFlushException();
>
>   for (int i = 1; i <= maxRetryTimes; i++) {
> try {
> jdbcWriter.executeBatch();
> batchCount = 0;
>         break;
> } catch (SQLException e) {
> LOG.error("JDBC executeBatch error, retry times = {}", i, e);
>         if (i >= maxRetryTimes) {
> //throw e;
> System.exit(-1);
> }
>         Thread.sleep(1000 * i);
> }
>   }
> }
> 但是当executeBatch出现异常时,会进入异常捕获,并清空rank信息(ojdbc14的5339行):
> } catch (SQLException var17) {
> this.clearBatch();
>    this.needToParse = true;
>    if (this.sqlKind != 1 && this.sqlKind != 4) {
> for(var3 = 0; var3 < var4.length; ++var3) {
>            var4[var3] = -3;
> }
>    }
>
>    DatabaseError.throwBatchUpdateException(var17, this.sqlKind != 1 && this.sqlKind != 4 ? var4.length : var3, var4);
> } finally {
> 下一次执行executeBatch时,由于rank为0,会直接跳过插入操作,返回成功。
> public int[] executeBatch() throws SQLException {
> synchronized(this.connection) {
> int[] var10000;
>        synchronized(this) {
> int var3 = 0;
>            this.setJdbcBatchStyle();
>            int[] var4 = new int[this.currentRank];
>            if (this.currentRank > 0) {
> this.ensureOpen();
> 从而导致第二次重试的时候直接跳过插入操作,成功返回。
>
>
>
>
>