Re: Writing to ES6 with Flink 1.7.2 sometimes fails with: request retries exceeded max retry timeout [30000]


kcz
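The log shows a timeout. If you have confirmed that it is not really a timeout problem, check the TaskManager log again: see whether an out-of-memory error is preventing the connection to ES, or whether a bug in the insert path keeps generating new classes and overflows the metaspace.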
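
One way to do that check is to scan the TaskManager log for the JVM's out-of-memory messages. The following is only a minimal sketch: the class and method names are made up for illustration, and it relies solely on the fact that the JVM reports these errors as `java.lang.OutOfMemoryError`, followed by the exhausted region (for example `Metaspace` or `Java heap space`).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for scanning TaskManager log lines for JVM out-of-memory errors. */
class OomLogScan {

    /** Returns every line that mentions java.lang.OutOfMemoryError, in original order. */
    static List<String> findOomLines(List<String> logLines) {
        List<String> hits = new ArrayList<>();
        for (String line : logLines) {
            if (line.contains("java.lang.OutOfMemoryError")) {
                hits.add(line);
            }
        }
        return hits;
    }

    /** True if any out-of-memory line names Metaspace as the exhausted region. */
    static boolean hasMetaspaceOom(List<String> logLines) {
        for (String line : findOomLines(logLines)) {
            if (line.contains("Metaspace")) {
                return true;
            }
        }
        return false;
    }
}
```

The lines could come from `java.nio.file.Files.readAllLines` on wherever your deployment writes the TaskManager log; that location depends on your setup and is not something this thread specifies.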





------------------ Original Message ------------------
From: aven.wu <[hidden email]>
Sent: May 13, 2020 16:27
To: [hidden email] <[hidden email]>
Subject: Re: Writing to ES6 with Flink 1.7.2 sometimes fails with: request retries exceeded max retry timeout [30000]



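Hi,
From what you describe, timeouts when Flink writes to ES are usually caused by insufficient indexing throughput on the ES side. Take a look at the official tuning recommendations:
https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
Also, you can handle failed ES writes with a custom ActionRequestFailureHandler, which can, for example, push the failed request onto a failure queue or retry it.
Hope this helps.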
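
To make the custom ActionRequestFailureHandler idea more concrete, here is a rough sketch of the decision logic such a handler might apply. It is deliberately written without Flink or Elasticsearch dependencies so it runs on its own; `FailurePolicy`, its `Action` values, and `MAX_ATTEMPTS` are all hypothetical names and values, not part of the connector.

```java
import java.io.IOException;

/** Hypothetical policy deciding what to do with a failed Elasticsearch request. */
class FailurePolicy {

    enum Action { RETRY, DEAD_LETTER, FAIL }

    /** Assumed cap on re-queue attempts per request; tune it for your job. */
    static final int MAX_ATTEMPTS = 3;

    /**
     * Walks the cause chain of the failure. I/O problems (such as the
     * "request retries exceeded max retry timeout" IOException in this thread)
     * are retried until MAX_ATTEMPTS is reached and then dead-lettered;
     * any other failure is treated as fatal.
     */
    static Action classify(Throwable failure, int attemptsSoFar) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof IOException) {
                return attemptsSoFar < MAX_ATTEMPTS ? Action.RETRY : Action.DEAD_LETTER;
            }
        }
        return Action.FAIL;
    }
}
```

Inside a real handler, `onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)` would map RETRY to `indexer.add(action)`, DEAD_LETTER to writing the request to whatever side store you choose, and FAIL to rethrowing `failure`. The handler is registered with `esSinkBuilder.setFailureHandler(...)`. Note that Flink does not pass an attempt counter to the handler, so you would have to track attempts yourself.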

Best
Aven

From: Yangze Guo
Sent: May 13, 2020 16:21
To: [hidden email]
Subject: Re: Writing to ES6 with Flink 1.7.2 sometimes fails with: request retries exceeded max retry timeout [30000]

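Hello, does your log contain statements like the following?

- Failed Elasticsearch bulk request:
- Failed Elasticsearch item request:
If so, please share them.

Judging from the error, the interaction with ES timed out. Check network connectivity, or increase the timeout; see the documentation [1] for how to do that: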

esSinkBuilder.setRestClientFactory(
  restClientBuilder -> {
    restClientBuilder.setDefaultHeaders(...);
    restClientBuilder.setMaxRetryTimeoutMillis(...);
    restClientBuilder.setPathPrefix(...);
    restClientBuilder.setHttpClientConfigCallback(...);
  }
);

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/elasticsearch.html#elasticsearch-sink
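
For illustration, a filled-in version of the timeout part might look like the sketch below. The helper class `EsTimeouts` and all timeout values are assumptions chosen for the example, not recommendations. The stack trace in the original question passes through `HttpAsyncRequestExecutor.timeout`, which suggests the HTTP client's socket timeout fired, so the sketch raises that together with the max retry timeout. It needs the flink-connector-elasticsearch6 dependency on the classpath.

```java
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

/** Hypothetical helper; the timeout values below are illustrative assumptions. */
class EsTimeouts {

    static <T> void apply(ElasticsearchSink.Builder<T> esSinkBuilder) {
        esSinkBuilder.setRestClientFactory(restClientBuilder -> {
            // Total time budget for one request including retries
            // (the [30000] in the error message is this value in milliseconds).
            restClientBuilder.setMaxRetryTimeoutMillis(60_000);
            // Connect and socket timeouts of the underlying HTTP client.
            restClientBuilder.setRequestConfigCallback(requestConfigBuilder ->
                requestConfigBuilder
                    .setConnectTimeout(5_000)
                    .setSocketTimeout(60_000));
        });
    }
}
```

Raising only the max retry timeout may not be enough if the socket timeout stays at its default, since a single slow request would still be cut off first.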


Best,
Yangze Guo

On Wed, May 13, 2020 at 2:53 PM Jim Chen <[hidden email]> wrote:
>
> Hi all,
>
> When writing to ES6 with Flink 1.7.2, I sometimes get the error "request retries exceeded max retry timeout [30000]". The error output is as follows:
> java.lang.RuntimeException: An error occurred in ElasticsearchSink.
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381)
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:386)
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: request retries exceeded max retry timeout [30000]
>     at org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411)
>     at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398)
>     at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:134)
>     at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419)
>     at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375)
>     at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
>     at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
>     at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
>     at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
>     at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
>     at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
>     at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
>     at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
>     at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
>     ... 1 more
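>
> The volume of data Flink writes is not large, and this error shows up only intermittently. I cannot reproduce it, and the main difficulty is that I cannot pin down where the problem lies. The ES cluster itself seems fine and the data does get written, but I cannot say for sure whether any of it was lost.
>
> Has anyone run into something similar? Any guidance or ideas would be appreciated. Thanks!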