source.map(......).addSink(new MySQLSink())
MySQLSink 就是接收前面算子生成的要执行的 SQL 并执行。
@Override
public void invoke(JDBCStatement statement, Context context) throws Exception {
log.info(statement.getSql());
log.info(statement.getParasMap().toString());
try {
namedTemplate.update(statement.getSql(), statement.getParasMap());
} catch (Exception e) {
e.printStackTrace();
}
}
Flink 能保证 namedTemplate.update(statement.getSql(), statement.getParasMap()) 一定执行成功吗?
谢谢,
王磊
[hidden email]