The log shows a timeout. If you have confirmed that it is not actually a timeout problem, check the taskmanager log again: is there an out-of-memory error preventing the connection to ES, or a bug in the insert path that keeps generating new classes and overflows the metaspace?
------------------ Original message ------------------
From: aven.wu <[hidden email]>
Sent: May 13, 2020 16:27
To: [hidden email] <[hidden email]>
Subject: Re: When writing to ES6 with Flink 1.7.2, the error "request retries exceeded max retry timeout [30000]" sometimes occurs

Hi,

In your situation, Flink timing out while writing to ES is usually caused by insufficient ES throughput; take a look at the official tuning advice:
https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html

In addition, for ES write failures you can provide a custom ActionRequestFailureHandler, for example to push failed requests to a failure queue, retry them, and so on.

Hope this helps.

Best
Aven
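A minimal sketch of such a handler against the Flink 1.7 connector API (the class name and the retry policy are illustrative; here only requests rejected by a full ES bulk queue are retried):

    import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.util.ExceptionUtils;
    import org.elasticsearch.action.ActionRequest;
    import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

    // Illustrative handler: re-queue requests that ES rejected because its
    // bulk queue was full, and fail the job for any other error.
    public class RetryOnRejectionFailureHandler implements ActionRequestFailureHandler {

        @Override
        public void onFailure(ActionRequest action, Throwable failure,
                              int restStatusCode, RequestIndexer indexer) throws Throwable {
            if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
                // ES was overloaded: hand the request back to the sink to be retried.
                indexer.add(action);
            } else {
                // Anything else: rethrow so the job fails and restarts from the last checkpoint.
                throw failure;
            }
        }
    }

It would be registered with esSinkBuilder.setFailureHandler(new RetryOnRejectionFailureHandler()). Re-added requests stay pending at checkpoint time, so the sink's at-least-once guarantee should be preserved.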
From: Yangze Guo
Sent: May 13, 2020 16:21
To: [hidden email]
Subject: Re: When writing to ES6 with Flink 1.7.2, the error "request retries exceeded max retry timeout [30000]" sometimes occurs

Hello, do your logs contain statements such as
- Failed Elasticsearch bulk request:
- Failed Elasticsearch item request:
If so, please share them.

Judging from the error, the interaction with ES timed out. Check the network connectivity, or increase the timeout; for how to do that, see the documentation [1]:

    esSinkBuilder.setRestClientFactory(
        restClientBuilder -> {
            restClientBuilder.setDefaultHeaders(...)
            restClientBuilder.setMaxRetryTimeoutMillis(...)
            restClientBuilder.setPathPrefix(...)
            restClientBuilder.setHttpClientConfigCallback(...)
        }
    );

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/elasticsearch.html#elasticsearch-sink

Best,
Yangze Guo
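To make the timeout suggestion concrete, a sketch that raises the REST client's retry timeout, which is the 30 000 ms limit named in the error (the 120 000 ms value is only an example; esSinkBuilder is the ElasticsearchSink.Builder from the snippet above):

    // Raise the low-level REST client's max retry timeout from the
    // default 30 000 ms (the [30000] in the error) to 120 000 ms.
    esSinkBuilder.setRestClientFactory(
        restClientBuilder -> restClientBuilder.setMaxRetryTimeoutMillis(120000)
    );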
On Wed, May 13, 2020 at 2:53 PM Jim Chen <[hidden email]> wrote:
>
> Hi all,
>
> When writing to ES6 with Flink 1.7.2, I sometimes get the error "request retries exceeded max retry timeout [30000]". The error message is as follows:
>
> java.lang.RuntimeException: An error occurred in ElasticsearchSink.
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381)
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:386)
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: request retries exceeded max retry timeout [30000]
>     at org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411)
>     at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398)
>     at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:134)
>     at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419)
>     at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375)
>     at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
>     at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
>     at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
>     at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
>     at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
>     at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
>     at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
>     at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
>     at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
>     ... 1 more
>
> The volume Flink writes is not large, yet this error shows up from time to time. I cannot reproduce it, and the main problem is that I cannot pinpoint where the issue is. The ES cluster seems fine, and data does get written, but whether anything was lost is hard to say.
>
> Has anyone run into a similar situation? Any guidance or ideas would be appreciated. Thanks!
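Since the errors here are intermittent under a small write volume, one more knob that may help is the sink's built-in bulk flush backoff, which retries a temporarily failing bulk request with a delay instead of letting retries pile up against the 30 s client timeout. A sketch against the Flink 1.7 builder API (all values are illustrative):

    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;

    // Retry temporarily failing bulk requests with exponential backoff
    // before the failure is surfaced to the sink.
    esSinkBuilder.setBulkFlushBackoff(true);
    esSinkBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
    esSinkBuilder.setBulkFlushBackoffRetries(3);
    esSinkBuilder.setBulkFlushBackoffDelay(1000); // initial delay in ms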