Flink Sink function 的 close() 在程序停止时一定会被调用到吗?

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Flink Sink function 的 close() 在程序停止时一定会被调用到吗?

Lei Wang
我自己写了个 Sink 到数据库的 SinkFunction,SinkFunction 中指定只有数据到了一定条数(100)
才执行入库操作。我通过定义了一个 List 缓存需要入库的数据的方式实现。


public class SinkToJDBCWithJDBCStatementBatch extends
RichSinkFunction<JDBCStatement> {

    private List<JDBCStatement> statementList = new
ArrayList<JDBCStatement>();


    @Override
    public void close() throws Exception {
        writeToDatabase();
        this.statementList.clear();
        super.close();
        if (dataSource != null) {
            dataSource.close();
        }
    }

    @Override
    public void invoke(JDBCStatement statement, Context context) throws
Exception {

        if (statementList.size() < 100) {
            statementList.add(statement);
            return;
        }
        writeToDatabase();
        this.statementList.clear();
    }

    public void writeToDatabase(){
        .........
    }
}

我想确认一下 这个 close() 方法在程序停止的时候一定会被调用到吗?是通过怎样的机制实现的呢?

谢谢,
王磊