在使用flink1.7.2写入ES6的时候,有时会报错:request retries exceeded max retry timeout [30000]

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

在使用flink1.7.2写入ES6的时候,有时会报错:request retries exceeded max retry timeout [30000]

Jim Chen
大家好,

    我在使用flink1.7.2写入ES6的时候,有时会报错:request retries exceeded max retry timeout
[30000],报错信息如下:
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381)
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:386)
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: request retries exceeded max retry timeout
[30000]
at
org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411)
at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398)
at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:134)
at
org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419)
at
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375)
at
org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
at
org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
at
org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
at
org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
at
org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
at
org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at
org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
... 1 more

flink写入的数据量不大,这个错误时不时会出现一下。我没法模拟,主要是定位不出问题在哪里。ES集群那边好像也没啥问题,数据也能写进去,但是丢没丢不好说。

有没有人遇到过类似情况,指导一下,或者给个思路。谢谢!
Reply | Threaded
Open this post in threaded view
|

Re: 在使用flink1.7.2写入ES6的时候,有时会报错:request retries exceeded max retry timeout [30000]

Yangze Guo
您好,请问您的日志中有没有如语句

- Failed Elasticsearch bulk request:
- Failed Elasticsearch item request:
如果有,可以提供一下

从错误上看,应该是和es交互超时了,检查一下网络连通情况,或者将timeout调大,具体方法见文档[1]

esSinkBuilder.setRestClientFactory(
  restClientBuilder -> {
    restClientBuilder.setDefaultHeaders(...)
    restClientBuilder.setMaxRetryTimeoutMillis(...)
    restClientBuilder.setPathPrefix(...)
    restClientBuilder.setHttpClientConfigCallback(...)
  }
);

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/elasticsearch.html#elasticsearch-sink


Best,
Yangze Guo

On Wed, May 13, 2020 at 2:53 PM Jim Chen <[hidden email]> wrote:

>
> 大家好,
>
>     我在使用flink1.7.2写入ES6的时候,有时会报错:request retries exceeded max retry timeout
> [30000],报错信息如下:
> java.lang.RuntimeException: An error occurred in ElasticsearchSink.
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381)
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:386)
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> at
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
> at
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: request retries exceeded max retry timeout
> [30000]
> at
> org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411)
> at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398)
> at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:134)
> at
> org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419)
> at
> org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375)
> at
> org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
> at
> org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
> at
> org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
> at
> org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
> at
> org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
> at
> org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
> at
> org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
> at
> org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
> at
> org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
> ... 1 more
>
> flink写入的数据量不大,这个错误时不时会出现一下。我没法模拟,主要是定位不出问题在哪里。ES集群那边好像也没啥问题,数据也能写进去,但是丢没丢不好说。
>
> 有没有人遇到过类似情况,指导一下,或者给个思路。谢谢!
Reply | Threaded
Open this post in threaded view
|

回复: 在使用flink1.7.2写入ES6的时候,有时会报错:request retries exceeded max retry timeout [30000]

aven.wu
Hi
根据你的情况,flink 写入ES 超时,一般是ES吞吐不足造成的,可以看一下官方的建议
https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
另外,es写入失败可以自定义一个 ActionRequestFailureHandler,你可以加入失败队列或者重试等等。
希望可以帮助到你。

Best
Aven

发件人: Yangze Guo
发送时间: 2020年5月13日 16:21
收件人: [hidden email]
主题: Re: 在使用flink1.7.2写入ES6的时候,有时会报错:request retries exceeded max retry timeout [30000]

您好,请问您的日志中有没有如语句

- Failed Elasticsearch bulk request:
- Failed Elasticsearch item request:
如果有,可以提供一下

从错误上看,应该是和es交互超时了,检查一下网络连通情况,或者将timeout调大,具体方法见文档[1]

esSinkBuilder.setRestClientFactory(
  restClientBuilder -> {
    restClientBuilder.setDefaultHeaders(...)
    restClientBuilder.setMaxRetryTimeoutMillis(...)
    restClientBuilder.setPathPrefix(...)
    restClientBuilder.setHttpClientConfigCallback(...)
  }
);

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/elasticsearch.html#elasticsearch-sink


Best,
Yangze Guo

On Wed, May 13, 2020 at 2:53 PM Jim Chen <[hidden email]> wrote:

>
> 大家好,
>
>     我在使用flink1.7.2写入ES6的时候,有时会报错:request retries exceeded max retry timeout
> [30000],报错信息如下:
> java.lang.RuntimeException: An error occurred in ElasticsearchSink.
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381)
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:386)
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> at
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
> at
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: request retries exceeded max retry timeout
> [30000]
> at
> org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411)
> at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398)
> at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:134)
> at
> org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419)
> at
> org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375)
> at
> org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
> at
> org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
> at
> org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
> at
> org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
> at
> org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
> at
> org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
> at
> org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
> at
> org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
> at
> org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
> ... 1 more
>
> flink写入的数据量不大,这个错误时不时会出现一下。我没法模拟,主要是定位不出问题在哪里。ES集群那边好像也没啥问题,数据也能写进去,但是丢没丢不好说。
>
> 有没有人遇到过类似情况,指导一下,或者给个思路。谢谢!