Hi all:
Why is the close method of my Flink sink executed repeatedly even though open is not called again? Aren't the two supposed to be called in pairs?

Below are the sink implementation and the logs. The sink writes to a database asynchronously, and I log a message in both open and close. The logs show that open was called only once, followed by a large number of close calls. What is the reason? Shouldn't they come in pairs?

Environment: JDK 8, Flink 1.8.2 (flink-1.8.2). Flink is started as a standard cluster (./start-cluster.sh).

Thanks, everyone.

Sink implementation (log message strings are kept in the original Chinese so that they match the logs below; "异步输出" means "async output"):

    import org.apache.flink.configuration.Configuration;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MySqlSink<T> extends CounterRichSinkFunction<T> {
        private static final Logger log = LoggerFactory.getLogger(MySqlSink.class);

        private final MySqlUpsertor<T> upsertor;
        // Created in open() on the task manager, so not serialized with the sink.
        private transient volatile BatchThread<T> batchThread;
        private transient volatile DataBaseUtil dataBaseUtil;
        private final String name;

        public MySqlSink(String name) {
            this(null, name);
        }

        public MySqlSink(MySqlUpsertor<T> upsertor, String name) {
            this.upsertor = upsertor;
            this.name = name;
        }

        @Override
        public String getName() {
            return name;
        }

        @Override
        public void invoke(T value, Context context) throws Exception {
            try {
                this.counter.inc();
                // Hand the record to the background thread that writes in batches.
                batchThread.push(value);
            } catch (Throwable e) {
                this.counterError.inc();
                // "async output error"
                log.info("异步输出异常: {}@{} {}", name, hashCode(), e.toString(), e);
            }
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            log.info("异步输出 {}@{} open", name, hashCode());
            super.open(parameters);
            try {
                if (dataBaseUtil == null) {
                    dataBaseUtil = DataBaseUtil.getInstance();
                }
                if (batchThread == null) {
                    // The last argument is a callback that clears the field.
                    batchThread = new BatchThread<>(getRuntimeContext(), dataBaseUtil, upsertor, name,
                            () -> batchThread = null);
                    batchThread.start();
                }
            } catch (Throwable e) {
                // "error while creating the output thread"
                log.info("创建异常输出线程异常: {}@{} {}", name, hashCode(), e.toString(), e);
            }
        }

        @Override
        public void close() throws Exception {
            log.info("异步输出 {}@{} close", name, hashCode());
            super.close();
        }
    }

Logs:

    2020-02-21 20:29:49 [ main] INFO [util.FlinkUtil ] windowTime: 60000, paramWaterInterval: 40000
    2020-02-21 20:29:50 [ main] WARN [job.alarm.LimitAlarmJob ] 未配置 activityNgKafka 参数
    2020-02-21 20:29:50 [ main] WARN [job.alarm.LimitAlarmJob ] 未配置 remoteKafka 参数
    2020-02-21 20:29:52 [: ng-host (3/5)] INFO [common.MySqlSink ] 异步输出 ng-host@97861042 open
    2020-02-21 20:29:52 [k: ng-url (3/5)] INFO [common.MySqlSink ] 异步输出 ng-url@1582284274 open
    2020-02-21 20:29:52 [: ng-host (1/5)] INFO [common.MySqlSink ] 异步输出 ng-host@1255075694 open
    2020-02-21 20:29:52 [: ng-host (4/5)] INFO [common.MySqlSink ] 异步输出 ng-host@971447764 open
    2020-02-21 20:29:52 [k: ng-url (2/5)] INFO [common.MySqlSink ] 异步输出 ng-url@793060686 open
    2020-02-21 20:29:52 [k: ng-url (5/5)] INFO [common.MySqlSink ] 异步输出 ng-url@275482810 open
    2020-02-21 20:29:52 [k: ng-url (4/5)] INFO [common.MySqlSink ] 异步输出 ng-url@543530928 open
    2020-02-21 20:29:52 [: ng-host (2/5)] INFO [common.MySqlSink ] 异步输出 ng-host@989133776 open
    2020-02-21 20:29:52 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 open
    2020-02-21 20:29:52 [: ng-host (5/5)] INFO [common.MySqlSink ] 异步输出 ng-host@268528771 open
    2020-02-21 20:29:52 [ ng-host] INFO [common.BatchThread ] BatchThread ng-host-1 start
    2020-02-21 20:29:52 [ ng-host] INFO [common.BatchThread ] BatchThread ng-host-4 start
    2020-02-21 20:29:52 [ ng-host] INFO [common.BatchThread ] BatchThread ng-host-2 start
    2020-02-21 20:29:52 [ ng-url] INFO [common.BatchThread ] BatchThread ng-url-3 start
    2020-02-21 20:29:52 [ ng-url] INFO [common.BatchThread ] BatchThread ng-url-5 start
    2020-02-21 20:29:52 [ ng-url] INFO [common.BatchThread ] BatchThread ng-url-6 start
    2020-02-21 20:29:52 [ ng-url] INFO [common.BatchThread ] BatchThread ng-url-7 start
    2020-02-21 20:29:52 [ ng-host] INFO [common.BatchThread ] BatchThread ng-host-8 start
    2020-02-21 20:29:52 [ ng-url] INFO [common.BatchThread ] BatchThread ng-url-9 start
    2020-02-21 20:29:52 [ ng-host] INFO [common.BatchThread ] BatchThread ng-host-10 start
    2020-02-21 20:30:41 [k: ng-url (2/5)] INFO [common.MySqlSink ] 异步输出 ng-url@793060686 close
    2020-02-21 20:30:41 [k: ng-url (4/5)] INFO [common.MySqlSink ] 异步输出 ng-url@543530928 close
    2020-02-21 20:30:41 [k: ng-url (4/5)] INFO [common.MySqlSink ] 异步输出 ng-url@543530928 close
    2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink ] 异步输出 ng-host@1255075694 close
    2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink ] 异步输出 ng-host@1255075694 close
    2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink ] 异步输出 ng-host@1255075694 close
    2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink ] 异步输出 ng-host@1255075694 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [: ng-host (4/5)] INFO [common.MySqlSink ] 异步输出 ng-host@971447764 close
    2020-02-21 20:30:41 [k: ng-url (3/5)] INFO [common.MySqlSink ] 异步输出 ng-url@1582284274 close
    2020-02-21 20:30:41 [: ng-host (4/5)] INFO [common.MySqlSink ] 异步输出 ng-host@971447764 close
    2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink ] 异步输出 ng-url@2092790834 close
    2020-02-21 20:30:41 [: ng-host (5/5)] INFO [common.MySqlSink ] 异步输出 ng-host@268528771 close
    2020-02-21 20:30:41 [: ng-host (3/5)] INFO [common.MySqlSink ] 异步输出 ng-host@97861042 close
    2020-02-21 20:30:41 [k: ng-url (4/5)] INFO [common.MySqlSink ] 异步输出 ng-url@543530928 close
    2020-02-21 20:30:41 [: ng-host (2/5)] INFO [common.MySqlSink ] 异步输出 ng-host@989133776 close
    2020-02-21 20:30:41 [k: ng-url (2/5)] INFO [common.MySqlSink ] 异步输出 ng-url@793060686 close

--
Are there difficult and easy things in this world? If you do them, the difficult becomes easy; if you do not, the easy becomes difficult.
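
One way to narrow down who triggers the extra close calls is to count lifecycle calls per sink instance and print a stack trace for every close after the first. The following is only a minimal sketch under assumptions: `LifecycleTracker`, `recordOpen` and `recordClose` are hypothetical names and not part of Flink. The idea would be to call `recordOpen()` at the top of `open` and `recordClose()` at the top of `close` in `MySqlSink`.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper (not a Flink class): counts open/close calls made on
 * one sink instance so that unbalanced lifecycle calls become visible.
 */
class LifecycleTracker implements Serializable {
    private static final long serialVersionUID = 1L;

    private final AtomicInteger opens = new AtomicInteger();
    private final AtomicInteger closes = new AtomicInteger();

    /** Call at the start of open(); returns the number of open() calls so far. */
    int recordOpen() {
        return opens.incrementAndGet();
    }

    /**
     * Call at the start of close(). Returns true only for the first call,
     * so the caller can release resources once and ignore later calls.
     */
    boolean recordClose() {
        int n = closes.incrementAndGet();
        if (n > 1) {
            // The stack trace shows which code path issued the repeated close().
            new Throwable("repeated close(), call #" + n).printStackTrace();
        }
        return n == 1;
    }

    int openCount() {
        return opens.get();
    }

    int closeCount() {
        return closes.get();
    }

    public static void main(String[] args) {
        LifecycleTracker tracker = new LifecycleTracker();
        tracker.recordOpen();
        tracker.recordClose();
        tracker.recordClose(); // prints a stack trace to stderr
        tracker.recordClose(); // prints a stack trace to stderr
        System.out.println("opens=" + tracker.openCount() + " closes=" + tracker.closeCount());
    }
}
```

If `recordClose()` returns false, the sink could simply skip its cleanup, which would make `close` safe to call more than once whatever the cause turns out to be.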