Flink1.11.0 sql org.apache.flink.connector.jdbc.dialect.MySQLDialect发现疑似bug

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink1.11.0 sql org.apache.flink.connector.jdbc.dialect.MySQLDialect发现疑似bug

junbaozhang
Hi,all:
       近日因为用到JdbcDynamicTableSink,发现往mysql插入数据时没有按我指定的primary key更新数据,无意间追踪到org.apache.flink.connector.jdbc.dialect.MySQLDialect类中getUpsertStatement方法:


/**
*
 Mysql upsert query use DUPLICATE KEY UPDATE.
*
*
 <p>NOTE: It requires Mysql's primary key to be consistent with pkFields.
*
*
 <p>We don't use REPLACE INTO, if there are other fields, we can keep their previous values.
*/
@Override
public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields)
 {
String
 updateClause = Arrays.stream(fieldNames)
.map(f ->
 quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
.collect(Collectors.joining(", "));
return Optional.of(getInsertIntoStatement(tableName,
 fieldNames) +
"
 ON DUPLICATE KEY UPDATE " + updateClause
);
}

该方法中有uniqueKeyFields参数但是没有用到,且我认为updateClause应该是用uniqueKeyFields生成, 代码改成:

String updateClause = Arrays.stream(uniqueKeyFields)
   .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
   .collect(Collectors.joining(", "));

麻烦各位大佬确认。
junbaozhang
Reply | Threaded
Open this post in threaded view
|

回复: Flink1.11.0 sql org.apache.flink.connector.jdbc.dialect.MySQLDialect发现疑似bug

史 正超
应该是没有问题的,
首先你在flink sql中指定的primary key 应该要与mysql中的唯一索引或者主键对应。
其次那个方法里组装出来的语句 类似下面的语句:
INSERT INTO `tablename`(`key1`, `key2`, `f1`, `f2`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `key1`=VALUES(`key1`), `key2`=VALUES(`key2`), `f1`=VALUES(`f1`), `f2`=VALUES(`f2`)

里面已经包含了定义的key, 当发生唯一键冲突时,会执行更新。所以无需指定uniqueKeyFields的
________________________________
发件人: [hidden email] <[hidden email]>
发送时间: 2020年11月5日 10:58
收件人: [hidden email] <[hidden email]>
主题: Flink1.11.0 sql org.apache.flink.connector.jdbc.dialect.MySQLDialect发现疑似bug

Hi,all:
       近日因为用到JdbcDynamicTableSink,发现往mysql插入数据时没有按我指定的primary key更新数据,无意间追踪到org.apache.flink.connector.jdbc.dialect.MySQLDialect类中getUpsertStatement方法:


/**
*
 Mysql upsert query use DUPLICATE KEY UPDATE.
*
*
 <p>NOTE: It requires Mysql's primary key to be consistent with pkFields.
*
*
 <p>We don't use REPLACE INTO, if there are other fields, we can keep their previous values.
*/
@Override
public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields)
 {
String
 updateClause = Arrays.stream(fieldNames)
.map(f ->
 quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
.collect(Collectors.joining(", "));
return Optional.of(getInsertIntoStatement(tableName,
 fieldNames) +
"
 ON DUPLICATE KEY UPDATE " + updateClause
);
}

该方法中有uniqueKeyFields参数但是没有用到,且我认为updateClause应该是用uniqueKeyFields生成, 代码改成:

String updateClause = Arrays.stream(uniqueKeyFields)
   .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
   .collect(Collectors.joining(", "));

麻烦各位大佬确认。
Reply | Threaded
Open this post in threaded view
|

回复: Flink1.11.0 sql org.apache.flink.connector.jdbc.dialect.MySQLDialect发现疑似bug

junbaozhang
嗯,应该是没问题的,我理解错了,谢谢指正。
________________________________
发件人: 史 正超 <[hidden email]>
发送时间: 2020年11月5日 19:30
收件人: [hidden email] <[hidden email]>
主题: 回复: Flink1.11.0 sql org.apache.flink.connector.jdbc.dialect.MySQLDialect发现疑似bug

应该是没有问题的,
首先你在flink sql中指定的primary key 应该要与mysql中的唯一索引或者主键对应。
其次那个方法里组装出来的语句 类似下面的语句:
INSERT INTO `tablename`(`key1`, `key2`, `f1`, `f2`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `key1`=VALUES(`key1`), `key2`=VALUES(`key2`), `f1`=VALUES(`f1`), `f2`=VALUES(`f2`)

里面已经包含了定义的key, 当发生唯一键冲突时,会执行更新。所以无需指定uniqueKeyFields的
________________________________
发件人: [hidden email] <[hidden email]>
发送时间: 2020年11月5日 10:58
收件人: [hidden email] <[hidden email]>
主题: Flink1.11.0 sql org.apache.flink.connector.jdbc.dialect.MySQLDialect发现疑似bug

Hi,all:
       近日因为用到JdbcDynamicTableSink,发现往mysql插入数据时没有按我指定的primary key更新数据,无意间追踪到org.apache.flink.connector.jdbc.dialect.MySQLDialect类中getUpsertStatement方法:


/**
*
 Mysql upsert query use DUPLICATE KEY UPDATE.
*
*
 <p>NOTE: It requires Mysql's primary key to be consistent with pkFields.
*
*
 <p>We don't use REPLACE INTO, if there are other fields, we can keep their previous values.
*/
@Override
public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields)
 {
String
 updateClause = Arrays.stream(fieldNames)
.map(f ->
 quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
.collect(Collectors.joining(", "));
return Optional.of(getInsertIntoStatement(tableName,
 fieldNames) +
"
 ON DUPLICATE KEY UPDATE " + updateClause
);
}

该方法中有uniqueKeyFields参数但是没有用到,且我认为updateClause应该是用uniqueKeyFields生成, 代码改成:

String updateClause = Arrays.stream(uniqueKeyFields)
   .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
   .collect(Collectors.joining(", "));

麻烦各位大佬确认。
junbaozhang