|
我自己写了个 Sink 到数据库的 SinkFunction,SinkFunction 中指定只有数据到了一定条数(100)
才执行入库操作。我通过定义了一个 List 缓存需要入库的数据的方式实现。
public class SinkToJDBCWithJDBCStatementBatch extends
RichSinkFunction<JDBCStatement> {
private List<JDBCStatement> statementList = new
ArrayList<JDBCStatement>();
@Override
public void close() throws Exception {
writeToDatabase();
this.statementList.clear();
super.close();
if (dataSource != null) {
dataSource.close();
}
}
@Override
public void invoke(JDBCStatement statement, Context context) throws
Exception {
if (statementList.size() < 100) {
statementList.add(statement);
return;
}
writeToDatabase();
this.statementList.clear();
}
public void writeToDatabase(){
.........
}
}
我想确认一下 这个 close() 方法在程序停止的时候一定会被调用到吗?是通过怎样的机制实现的呢?
谢谢,
王磊
|