Writes to Elasticsearch fail and Kafka messages back up every day at midnight


Writes to Elasticsearch fail and Kafka messages back up every day at midnight

云长
hi,


I have a Flink job that writes Kafka data into ES — a pure ETL pipeline.
The problem I'm running into: every day after 00:00, writes to ES start failing and monitoring shows Kafka messages piling up. After restarting the job, the Kafka backlog gradually drains; without a restart, the backlog persists.


How should I go about troubleshooting and fixing this kind of problem?


Flink version: 1.10
ES version: 6.x


Jar in use: flink-sql-connector-elasticsearch6_2.12


One more detail: in the minute between 00:00 and 00:01 a small amount of data is written successfully, but the bulk of writes fail. The index name carries a date suffix,
so midnight does involve switching indexes — but the next day's index is created ahead of time, so writes after midnight should not involve creating a new index.
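
As a sanity check on that (the index name below is a placeholder), fetching the pre-created index shortly before midnight should return its settings and mappings rather than a 404:

GET xxx-2020.04.24

A 404 here would mean the midnight writes do trigger index creation after all.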


The ES exception:


2020-04-18 00:01:31,722 ERROR ElasticsearchSinkBase: Failed Elasticsearch item request: ElasticsearchException[Elasticsearch exception [type=process_cluster_event_timeout_exception, reason=failed to process cluster event (put-mapping) within 30s]]org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=process_cluster_event_timeout_exception, reason=failed to process cluster event (put-mapping) within 30s]
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
    at java.lang.Thread.run(Thread.java:748)



Flink SQL:
CREATE TABLE source_table (
  `time` VARCHAR
  ,`level` VARCHAR
  ,`thread` VARCHAR
  ,`class` VARCHAR
) WITH (
   'connector.type' = 'kafka',
   'connector.version' = 'universal',
   'connector.topic' = 'xxxx',
   'connector.startup-mode' = 'latest-offset',
   'connector.properties.group.id' = 'xxxx',
   'connector.properties.zookeeper.connect' = 'ip:2181',
   'connector.properties.bootstrap.servers' = 'ip:9092',
   'format.type' = 'json',
   'format.derive-schema' = 'true'
);


CREATE TABLE result_table (
  `time` VARCHAR
  ,`level` VARCHAR
  ,`thread` VARCHAR
  ,`class` VARCHAR
) WITH (
  'connector.type' = 'elasticsearch',
  'connector.version' = '6',
  'connector.hosts' = 'xxxx',
  'connector.index' = 'xxxx-yyyy.MM.dd',
  'connector.document-type' = 'doc',
  'update-mode' = 'append',
  'connector.bulk-flush.interval' = '1000',
  'connector.bulk-flush.backoff.type' = 'exponential',
  'connector.bulk-flush.backoff.max-retries' = '10',
  'connector.bulk-flush.backoff.delay' = '60000',
  'connector.failure-handler' = 'ignore',
  'format.type' = 'json'
);


INSERT INTO result_table
SELECT
    `time`,`level`,thread,class
FROM source_table
WHERE `method`='xxxx';

Re: Writes to Elasticsearch fail and Kafka messages back up every day at midnight

Leonard Xu
Hi,

What is the shard configuration of the pre-created index? And what are the cluster's reallocation/relocation settings?
You could start the investigation from that angle.
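
For example, something along these lines (the index name is a placeholder) will show the relevant settings and shard placement:

GET xxx-2020.04.23/_settings
GET _cluster/settings?include_defaults=true&flat_settings=true
GET _cat/shards/xxx-2020.04.23?v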

Best,
Leonard Xu

Re: Writes to Elasticsearch fail and Kafka messages back up every day at midnight

zhisheng
hi,

Was the mapping for the ES index set up in advance?

I see this exception:

> failed to process cluster event (put-mapping) within 30s

It looks like an automatic mapping update timed out.
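
If that's what is happening, the master's cluster-state queue should show put-mapping tasks piling up right after midnight; these standard cluster APIs would confirm it:

GET _cluster/pending_tasks
GET _cat/pending_tasks?v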


Re: Writes to Elasticsearch fail and Kafka messages back up every day at midnight

云长
The mapping comes from a template, in this format:
"xxx-2020.04.23": {
&nbsp; &nbsp; "mappings": {
&nbsp; &nbsp; &nbsp; "doc": {
&nbsp; &nbsp; &nbsp; &nbsp; "dynamic_templates": [
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "string_fields": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "match": "*",
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "match_mapping_type": "string",
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "mapping": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "type": "keyword"
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; ],
&nbsp; &nbsp; &nbsp; &nbsp; "properties": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "cost": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "type": "long"
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; },
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "result": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "type": "keyword"
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; }
&nbsp; }



The settings are as follows:
"xxx-2020.04.23": {
&nbsp; &nbsp; "settings": {
&nbsp; &nbsp; &nbsp; "index": {
&nbsp; &nbsp; &nbsp; &nbsp; "routing": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "allocation": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "total_shards_per_node": "1"
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; },
&nbsp; &nbsp; &nbsp; &nbsp; "refresh_interval": "10s",
&nbsp; &nbsp; &nbsp; &nbsp; "number_of_shards": "2",
&nbsp; &nbsp; &nbsp; &nbsp; "provided_name": "xxx-2020.04.23",
&nbsp; &nbsp; &nbsp; &nbsp; "creation_date": "1587509965602", -- 2020/4/22 6:59:25
&nbsp; &nbsp; &nbsp; &nbsp; "number_of_replicas": "0",
&nbsp; &nbsp; &nbsp; &nbsp; "uuid": "f9OqpCmJQnyqlqTeYpt1Sg",
&nbsp; &nbsp; &nbsp; &nbsp; "version": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "created": "6020499"
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; }
&nbsp; }
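
For reference, a mapping like this would typically be installed through an index template so each day's pre-created index picks it up automatically — a sketch, with the template name and index pattern assumed:

PUT _template/xxx
{
  "index_patterns": ["xxx-*"],
  "settings": {
    "index.routing.allocation.total_shards_per_node": 1,
    "index.refresh_interval": "10s",
    "index.number_of_shards": 2,
    "index.number_of_replicas": 0
  },
  "mappings": {
    "doc": {
      "dynamic_templates": [
        {
          "string_fields": {
            "match": "*",
            "match_mapping_type": "string",
            "mapping": { "type": "keyword" }
          }
        }
      ],
      "properties": {
        "cost": { "type": "long" },
        "result": { "type": "keyword" }
      }
    }
  }
}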






Re: Writes to Elasticsearch fail and Kafka messages back up every day at midnight

云长
In reply to this post by zhisheng
Many thanks to Leonard Xu and zhisheng for the replies.

> Was the mapping for the ES index set up in advance?
It was set up in advance; the pre-created index's mapping is as follows:
{
  "xxx-2020.04.23": {
    "mappings": {
      "doc": {
        "dynamic_templates": [
          {
            "string_fields": {
              "match": "*",
              "match_mapping_type": "string",
              "mapping": {
                "type": "keyword"
              }
            }
          }
        ],
        "properties": {
          "cost": {
            "type": "long"
          },
          "result": {
            "type": "keyword"
          }
        }
      }
    }
  }
}
But the data being written contains far more fields than just cost and result.
The ES official docs describe dynamic_templates as follows: "When putting new dynamic templates through the put mapping API, all existing templates are overwritten." [1]
My guess: because the predefined mapping did not cover all the fields in the data, writes to ES still triggered put-mapping calls to add the missing fields; those cluster-state updates queued up on the master until they exceeded the 30s timeout, producing the exception.

After redoing the new index's mapping to cover every field, the "failed to process cluster event (put-mapping) within 30s" exception disappeared.
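
A minimal sketch of what the full-field mapping looks like, using the fields from the Flink SQL schema plus cost/result (the index name and the keyword/long types here are assumptions):

PUT xxx-2020.04.24
{
  "mappings": {
    "doc": {
      "properties": {
        "time":   { "type": "keyword" },
        "level":  { "type": "keyword" },
        "thread": { "type": "keyword" },
        "class":  { "type": "keyword" },
        "cost":   { "type": "long" },
        "result": { "type": "keyword" }
      }
    }
  }
}

With every field mapped up front, the bulk writes never have to submit put-mapping cluster events, so there is nothing left to queue up on the master at midnight.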

[1] https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html#dynamic-templates
Best,
Oliver yunchang


Re: Writes to Elasticsearch fail and Kafka messages back up every day at midnight

zhisheng
👍👍👍
