Hi,all:
近日因为用到JdbcDynamicTableSink,发现往mysql插入数据时没有按我指定的primary key更新数据,无意间追踪到org.apache.flink.connector.jdbc.dialect.MySQLDialect类中getUpsertStatement方法: /** * Mysql upsert query use DUPLICATE KEY UPDATE. * * <p>NOTE: It requires Mysql's primary key to be consistent with pkFields. * * <p>We don't use REPLACE INTO, if there are other fields, we can keep their previous values. */ @Override public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) { String updateClause = Arrays.stream(fieldNames) .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")") .collect(Collectors.joining(", ")); return Optional.of(getInsertIntoStatement(tableName, fieldNames) + " ON DUPLICATE KEY UPDATE " + updateClause ); } 该方法中有uniqueKeyFields参数但是没有用到,且我认为updateClause应该是用uniqueKeyFields生成, 代码改成: String updateClause = Arrays.stream(uniqueKeyFields) .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")") .collect(Collectors.joining(", ")); 麻烦各位大佬确认。
junbaozhang
|
应该是没有问题的,
首先你在flink sql中指定的primary key 应该要与mysql中的唯一索引或者主键对应。 其次那个方法里组装出来的语句 类似下面的语句: INSERT INTO `tablename`(`key1`, `key2`, `f1`, `f2`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `key1`=VALUES(`key1`), `key2`=VALUES(`key2`), `f1`=VALUES(`f1`), `f2`=VALUES(`f2`) 里面已经包含了定义的key, 当发生唯一键冲突时,会执行更新。所以无需指定uniqueKeyFields的 ________________________________ 发件人: [hidden email] <[hidden email]> 发送时间: 2020年11月5日 10:58 收件人: [hidden email] <[hidden email]> 主题: Flink1.11.0 sql org.apache.flink.connector.jdbc.dialect.MySQLDialect发现疑似bug Hi,all: 近日因为用到JdbcDynamicTableSink,发现往mysql插入数据时没有按我指定的primary key更新数据,无意间追踪到org.apache.flink.connector.jdbc.dialect.MySQLDialect类中getUpsertStatement方法: /** * Mysql upsert query use DUPLICATE KEY UPDATE. * * <p>NOTE: It requires Mysql's primary key to be consistent with pkFields. * * <p>We don't use REPLACE INTO, if there are other fields, we can keep their previous values. */ @Override public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) { String updateClause = Arrays.stream(fieldNames) .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")") .collect(Collectors.joining(", ")); return Optional.of(getInsertIntoStatement(tableName, fieldNames) + " ON DUPLICATE KEY UPDATE " + updateClause ); } 该方法中有uniqueKeyFields参数但是没有用到,且我认为updateClause应该是用uniqueKeyFields生成, 代码改成: String updateClause = Arrays.stream(uniqueKeyFields) .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")") .collect(Collectors.joining(", ")); 麻烦各位大佬确认。 |
嗯,应该是没问题的,我理解错了,谢谢指正。
________________________________ 发件人: 史 正超 <[hidden email]> 发送时间: 2020年11月5日 19:30 收件人: [hidden email] <[hidden email]> 主题: 回复: Flink1.11.0 sql org.apache.flink.connector.jdbc.dialect.MySQLDialect发现疑似bug 应该是没有问题的, 首先你在flink sql中指定的primary key 应该要与mysql中的唯一索引或者主键对应。 其次那个方法里组装出来的语句 类似下面的语句: INSERT INTO `tablename`(`key1`, `key2`, `f1`, `f2`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `key1`=VALUES(`key1`), `key2`=VALUES(`key2`), `f1`=VALUES(`f1`), `f2`=VALUES(`f2`) 里面已经包含了定义的key, 当发生唯一键冲突时,会执行更新。所以无需指定uniqueKeyFields的 ________________________________ 发件人: [hidden email] <[hidden email]> 发送时间: 2020年11月5日 10:58 收件人: [hidden email] <[hidden email]> 主题: Flink1.11.0 sql org.apache.flink.connector.jdbc.dialect.MySQLDialect发现疑似bug Hi,all: 近日因为用到JdbcDynamicTableSink,发现往mysql插入数据时没有按我指定的primary key更新数据,无意间追踪到org.apache.flink.connector.jdbc.dialect.MySQLDialect类中getUpsertStatement方法: /** * Mysql upsert query use DUPLICATE KEY UPDATE. * * <p>NOTE: It requires Mysql's primary key to be consistent with pkFields. * * <p>We don't use REPLACE INTO, if there are other fields, we can keep their previous values. */ @Override public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) { String updateClause = Arrays.stream(fieldNames) .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")") .collect(Collectors.joining(", ")); return Optional.of(getInsertIntoStatement(tableName, fieldNames) + " ON DUPLICATE KEY UPDATE " + updateClause ); } 该方法中有uniqueKeyFields参数但是没有用到,且我认为updateClause应该是用uniqueKeyFields生成, 代码改成: String updateClause = Arrays.stream(uniqueKeyFields) .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")") .collect(Collectors.joining(", ")); 麻烦各位大佬确认。
junbaozhang
|
Free forum by Nabble | Edit this page |