hi,
I have a job that uses Flink to write Kafka data into ES; it is a pure ETL pipeline.
The problem: every day after midnight, writes to ES fail, and at the same time monitoring shows Kafka messages starting to pile up. After restarting the job the Kafka backlog gradually drains; without a restart the backlog persists.

I'd like to ask how this kind of problem should be investigated and handled.

Flink version: 1.10
ES version: 6.x

Jar in use: flink-sql-connector-elasticsearch6_2.12

Additional detail: in the minute from 00:00 to 00:01 a small amount of data is written successfully, but the bulk of the writes fail. The index name carries a date suffix, so midnight does involve switching indices; however, the next day's index is created in advance, so writes after midnight do not involve creating a new index.

The ES exception:

2020-04-18 00:01:31,722 ERROR ElasticsearchSinkBase: Failed Elasticsearch item request: ElasticsearchException[Elasticsearch exception [type=process_cluster_event_timeout_exception, reason=failed to process cluster event (put-mapping) within 30s]]
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=process_cluster_event_timeout_exception, reason=failed to process cluster event (put-mapping) within 30s]
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
    at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
    at java.lang.Thread.run(Thread.java:748)

flinkSQL:

CREATE TABLE source_table (
  `time` VARCHAR
  ,`level` VARCHAR
  ,`thread` VARCHAR
  ,`class` VARCHAR
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'xxxx',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.group.id' = 'xxxx',
  'connector.properties.zookeeper.connect' = 'ip:2181',
  'connector.properties.bootstrap.servers' = 'ip:9092',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

CREATE TABLE result_table (
  `time` VARCHAR
  ,`level` VARCHAR
  ,`thread` VARCHAR
  ,`class` VARCHAR
) WITH (
  'connector.type' = 'elasticsearch',
  'connector.version' = '6',
  'connector.hosts' = 'xxxx',
  'connector.index' = 'xxxx-yyyy.MM.dd',
  'connector.document-type' = 'doc',
  'update-mode' = 'append',
  'connector.bulk-flush.interval' = '1000',
  'connector.bulk-flush.backoff.type' = 'exponential',
  'connector.bulk-flush.backoff.max-retries' = '10',
  'connector.bulk-flush.backoff.delay' = '60000',
  'connector.failure-handler' = 'ignore',
  'format.type' = 'json'
);

INSERT INTO result_table
SELECT
  `time`, `level`, thread, class
FROM source_table
WHERE `method`='xxxx';
Hi,
What does the shard configuration of the pre-created index look like, and how are the cluster's reallocation and relocation settings configured? Those would be good angles to start troubleshooting from.
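For example, both can be inspected with requests along these lines (just a sketch; the index pattern is the anonymized one from your mail, and the effective values will differ per cluster):

# effective cluster-level routing/allocation settings, defaults included
GET _cluster/settings?include_defaults=true&filter_path=*.cluster.routing

# where the pre-created index's shards landed, and whether any are initializing/relocating
GET _cat/shards/xxxx-*?v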
Best,
Leonard Xu

> On 22 Apr 2020, at 16:10, Oliver <[hidden email]> wrote:
> [...]
hi,
Was the mapping for the ES index set up ahead of time? The exception

> failed to process cluster event (put-mapping) within 30s

looks like an automatic mapping creation (put-mapping) timing out.
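If that is what's happening, the master's pending task queue should show put-mapping tasks backing up right around midnight. A quick way to check (a sketch; the actual output depends on the cluster):

# lists cluster-state tasks waiting on the master; look for entries whose
# "source" contains "put-mapping" and whose time_in_queue keeps growing
GET _cluster/pending_tasks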
Leonard Xu <[hidden email]> wrote on Wed, 22 Apr 2020 at 16:41:
> [...]
The mapping uses a template, in the following format:
"xxx-2020.04.23": { "mappings": { "doc": { "dynamic_templates": [ { "string_fields": { "match": "*", "match_mapping_type": "string", "mapping": { "type": "keyword" } } } ], "properties": { "cost": { "type": "long" }, "result": { "type": "keyword" } } } } } setting如下: "xxx-2020.04.23": { "settings": { "index": { "routing": { "allocation": { "total_shards_per_node": "1" } }, "refresh_interval": "10s", "number_of_shards": "2", "provided_name": "xxx-2020.04.23", "creation_date": "1587509965602", -- 2020/4/22 6:59:25 "number_of_replicas": "0", "uuid": "f9OqpCmJQnyqlqTeYpt1Sg", "version": { "created": "6020499" } } } } ------------------ 原始邮件 ------------------ 发件人: "zhisheng"<[hidden email]>; 发送时间: 2020年4月22日(星期三) 下午4:47 收件人: "user-zh"<[hidden email]>; 主题: Re: 每天0点数据写入Elasticsearch异常且kafka数据堆积 hi, es index 的 mapping 是否提前设置好了? 我看到异常 : > failed to process cluster event (put-mapping) within 30s 像是自动建 mapping 超时了 Leonard Xu <[hidden email]> 于2020年4月22日周三 下午4:41写道: > Hi, > > 提前创建的索引的shard配置是怎么样的?集群的reallocation、relocation配置怎样的? > 可以从这方面找思路排查下看看 > > 祝好, > Leonard Xu > > > > > 在 2020年4月22日,16:10,Oliver <[hidden email]> 写道: > > > > hi, > > > > > > 我有一个任务是使用flink将kafka数据写入ES,纯ETL过程, > > > 现在遇到的问题是:每天0点之后数据写入ES异常,同时监控发现kafka消息开始堆积,重启任务后,kafka消息堆积现象逐渐恢复,如果不重启则堆积问题一直存在。 > > > > > > 想咨询下这种问题应该怎么样排查和处理? > > > > > > flink版本:1.10 > > ES版本:6.x&nbsp; > > > > > > 使用jar:flink-sql-connector-elasticsearch6_2.12 > > > > > > 补充:数据零点之后00:00-00:01这一分钟之间存在少量写入成功的数据,但大量数据写入失败。其中索引携带日期后缀 > > 所以零点涉及索引切换,不过第二天索引是提前创建,0点之后数据写入并不涉及新索引的创建 > > > > > > ES异常如下: > > > > > > 2020-04-18 00:01:31,722 ERROR ElasticsearchSinkBase: Failed > Elasticsearch item request: ElasticsearchException[Elasticsearch exception > [type=process_cluster_event_timeout_exception, reason=failed to process > cluster event (put-mapping) within 30s]]org.apache.flink. 
------------------ Original Message ------------------
From: "zhisheng" <[hidden email]>
Sent: Wednesday, April 22, 2020, 4:47 PM
To: "user-zh" <[hidden email]>
Subject: Re: Data written to Elasticsearch fails after midnight every day and Kafka messages pile up
> [...]
Many thanks to Leonard Xu and zhisheng for the replies.
> Was the mapping for the ES index set up ahead of time?

Yes, it was set up in advance. The pre-created index's mapping is:

{
  "xxx-2020.04.23": {
    "mappings": {
      "doc": {
        "dynamic_templates": [
          {
            "string_fields": {
              "match": "*",
              "match_mapping_type": "string",
              "mapping": {
                "type": "keyword"
              }
            }
          }
        ],
        "properties": {
          "cost": {
            "type": "long"
          },
          "result": {
            "type": "keyword"
          }
        }
      }
    }
  }
}

But the data being written has far more fields than just cost and result. The ES documentation on dynamic_templates says: "When putting new dynamic templates through the put mapping API (https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html), all existing templates are overwritten." [1]

My guess: because the predefined mapping did not cover all of the data's fields, writes to ES still called the put mapping API to modify the mapping, which caused the exception.

After adjusting the new index's mapping to cover all fields explicitly, the "failed to process cluster event (put-mapping) within 30s" exception disappeared.
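For reference, the adjusted mapping looks roughly like this (a sketch assuming only the four fields of the sink table; the real index carries more fields):

{
  "mappings": {
    "doc": {
      "dynamic_templates": [
        {
          "string_fields": {
            "match": "*",
            "match_mapping_type": "string",
            "mapping": { "type": "keyword" }
          }
        }
      ],
      "properties": {
        "time":   { "type": "keyword" },
        "level":  { "type": "keyword" },
        "thread": { "type": "keyword" },
        "class":  { "type": "keyword" },
        "cost":   { "type": "long" },
        "result": { "type": "keyword" }
      }
    }
  }
}

With every field declared up front, bulk writes no longer have to submit put-mapping tasks to the master at the midnight index switch.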
[1] https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html#dynamic-templates

Best,
Oliver yunchang

> On 22 Apr 2020, at 16:47, zhisheng <[hidden email]> wrote:
> [...]
👍👍👍
oliver yunchang <[hidden email]> wrote on Thu, 23 Apr 2020 at 00:32:
> [...]