在Flink 写ES,当ES集群繁忙时,会有如下异常:
2019-09-17 16:01:02 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase afterBulk 430 Failed Elasticsearch bulk request: Connection closed org.apache.http.ConnectionClosedException: Connection closed at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:345) at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261) at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81) at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39) at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114) at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162) at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337) at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315) at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276) at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588) at java.lang.Thread.run(Thread.java:745) 2019-09-17 16:01:02 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase afterBulk 430 Failed Elasticsearch bulk request: Connection closed org.apache.http.ConnectionClosedException: Connection closed at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:345) at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261) at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81) at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39) at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114) at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162) at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337) at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315) at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276) at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588) at java.lang.Thread.run(Thread.java:745) Flink 用ElasticsearchSink 写ES时遇到这种异常后,不会自动恢复连接。 目前已知: ElasticsearchSink中的异常处理,需要通过ElasticsearchSink的setFailureHandler方法来定义各种失败处理方式(如丢弃数据或将数据重新加入队列),但这里不能实现当ConnectionClosedException时,重新打开ES连接。 希望Flink遇到这种异常后,能自动重新打开ES的连接,该如何实现? 感谢 |
Free forum by Nabble | Edit this page |