Upgrading Kafka caused a Flink job to fail

freeza1982@outlook.com
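Hi,
    We performed an upgrade in production in the early morning. There are 13 jobs running online; one of them, named gift, failed, while all the others kept running normally, which is strange. Checking the logs, I found the messages below. How can this kind of problem be avoided? Does Flink need any special settings?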
JM log:
[2021-02-08 00:49:23.714][org.apache.flink.runtime.checkpoint.CheckpointCoordinator][]Completed checkpoint 1435986 for job d31fe16e0525e20a19dc79c88ab958a2 (10872 bytes in 41 ms).
[2021-02-08 00:49:23.859][org.apache.flink.runtime.executiongraph.ExecutionGraph][]Source: Custom Source -> Filter -> Flat Map (1/1) (cc7dc71b5a40ece6fd1383ae30c316a4) switched from RUNNING to FAILED.
org.apache.kafka.common.KafkaException: Error registering mbean kafka.consumer:type=consumer-node-metrics,client-id=consumer-2,node-id=node-2147483544
at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:163)
at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:81)
at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:504)
at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:255)
at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:240)
at org.apache.kafka.common.network.Selector$SelectorMetrics.maybeRegisterConnectionMetrics(Selector.java:817)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:353)
at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
Caused by: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=consumer-node-metrics,client-id=consumer-2,node-id=node-2147483544
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:161)
... 13 common frames omitted
[2021-02-08 00:49:23.859][org.apache.flink.runtime.executiongraph.ExecutionGraph][]Job ty-bi-flink:gift (36ecb2327a19f39c3a48e5f9467255b9) switched from state RUNNING to FAILING.
org.apache.kafka.common.KafkaException: Error registering mbean kafka.consumer:type=consumer-node-metrics,client-id=consumer-2,node-id=node-2147483544
at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:163)
at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:81)
at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:504)
at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:255)
at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:240)
at org.apache.kafka.common.network.Selector$SelectorMetrics.maybeRegisterConnectionMetrics(Selector.java:817)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:353)
at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
Caused by: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=consumer-node-metrics,client-id=consumer-2,node-id=node-2147483544
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:161)
... 13 common frames omitted
[2021-02-08 00:49:23.862][org.apache.flink.runtime.executiongraph.ExecutionGraph][]TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{name=window-contents, defaultValue=null, serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@503e2d76}, ProcessingTimeTrigger(), AllWindowedStream.addSinkByTopic(Application.java:254)) -> Sink: gift (1/1) (0bf38f64eef92b7f900e1a1510b5ac2a) switched from RUNNING to CANCELING.
[2021-02-08 00:49:23.963][org.apache.flink.runtime.executiongraph.ExecutionGraph][]TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{name=window-contents, defaultValue=null, serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@503e2d76}, ProcessingTimeTrigger(), AllWindowedStream.addSinkByTopic(Application.java:254)) -> Sink: gift (1/1) (0bf38f64eef92b7f900e1a1510b5ac2a) switched from CANCELING to CANCELED.
[2021-02-08 00:49:23.964][org.apache.flink.runtime.executiongraph.ExecutionGraph][]Try to restart or fail the job ty-bi-flink:gift (36ecb2327a19f39c3a48e5f9467255b9) if no longer possible.
[2021-02-08 00:49:23.964][org.apache.flink.runtime.executiongraph.ExecutionGraph][]Job ty-bi-flink:gift (36ecb2327a19f39c3a48e5f9467255b9) switched from state FAILING to FAILED.
org.apache.kafka.common.KafkaException: Error registering mbean kafka.consumer:type=consumer-node-metrics,client-id=consumer-2,node-id=node-2147483544
at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:163)
at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:81)
at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:504)
at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:255)
at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:240)
at org.apache.kafka.common.network.Selector$SelectorMetrics.maybeRegisterConnectionMetrics(Selector.java:817)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:353)
at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
Caused by: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=consumer-node-metrics,client-id=consumer-2,node-id=node-2147483544
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:161)
... 13 common frames omitted
---
[2021-02-08 00:49:23.966][org.apache.flink.runtime.checkpoint.CheckpointCoordinator][]Stopping checkpoint coordinator for job 36ecb2327a19f39c3a48e5f9467255b9.
[2021-02-08 00:49:23.966][org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore][]Shutting down
[2021-02-08 00:49:23.972][org.apache.flink.runtime.dispatcher.StandaloneDispatcher][]Job 36ecb2327a19f39c3a48e5f9467255b9 reached globally terminal state FAILED.
[2021-02-08 00:49:23.989][org.apache.flink.runtime.jobmaster.JobMaster][]Stopping the JobMaster for job ty-bi-flink:gift(36ecb2327a19f39c3a48e5f9467255b9).
[2021-02-08 00:49:23.991][org.apache.flink.runtime.checkpoint.CheckpointCoordinator][]Triggering checkpoint 1435959 @ 1612716563991 for job 4bece886fb7bd17c2e67bbfca7c49ddb.
[2021-02-08 00:49:23.997][org.apache.flink.runtime.jobmaster.JobMaster][]Close ResourceManager connection a5c2e0aad7cfa0a7051cc0f673a82e94: JobManager is shutting down..
[2021-02-08 00:49:23.997][org.apache.flink.runtime.jobmaster.slotpool.SlotPool][]Suspending SlotPool.
[2021-02-08 00:49:23.997][org.apache.flink.runtime.jobmaster.slotpool.SlotPool][]Stopping SlotPool.
[2021-02-08 00:49:23.997][org.apache.flink.runtime.jobmaster.JobManagerRunner][]JobManagerRunner already shutdown.
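Both the JM and TM traces bottom out in javax.management.InstanceAlreadyExistsException. The JVM's MBean server refuses a second MBean registered under an ObjectName that is already in use. Kafka's JmxReporter wraps that refusal in a KafkaException, which then propagates out of KafkaConsumerThread.run and fails the source task.

Below is a minimal JDK-only sketch of that JMX behavior. Only the ObjectName is copied from the log. The `Demo` MBean and the `DuplicateMBeanDemo` class are hypothetical stand-ins, and the sketch does not reproduce the Kafka client's own code path:

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class DuplicateMBeanDemo {
    // ObjectName copied from the log; the MBean registered under it here is a hypothetical stand-in.
    static final String NAME =
        "kafka.consumer:type=consumer-node-metrics,client-id=consumer-2,node-id=node-2147483544";

    // Standard MBean convention: the management interface is named <ClassName>MBean.
    public interface DemoMBean {
        int getValue();
    }

    public static class Demo implements DemoMBean {
        @Override
        public int getValue() {
            return 0;
        }
    }

    // Registers the same ObjectName twice and reports whether the second attempt was rejected.
    public static boolean secondRegistrationRejected() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName(NAME);
        server.registerMBean(new Demo(), name);
        try {
            server.registerMBean(new Demo(), name); // same name a second time
            return false;
        } catch (InstanceAlreadyExistsException e) {
            // Same exception type as the "Caused by" line in the JM/TM logs.
            System.out.println("Rejected: " + e.getMessage());
            return true;
        } finally {
            // Unregister so the method can be called again in the same JVM.
            server.unregisterMBean(name);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("second registration rejected: " + secondRegistrationRejected());
    }
}
```

In the real failure, the second registration comes from the Kafka client's own metrics code while it reconnects. That means the duplicate name is most likely produced inside the consumer rather than by user code.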


TM log:
[2021-02-08 00:49:23.675][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][]Marking the coordinator 172.20.23.243:9092 (id: 2147483546 rack: null) dead for group ty_bi_flink
[2021-02-08 00:49:23.721][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][]Discovered coordinator 172.20.23.245:9092 (id: 2147483544 rack: null) for group ty_bi_flink.
[2021-02-08 00:49:23.721][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][]Marking the coordinator 172.20.23.243:9092 (id: 2147483546 rack: null) dead for group ty_bi_flink
[2021-02-08 00:49:23.721][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][]Discovered coordinator 172.20.23.245:9092 (id: 2147483544 rack: null) for group ty_bi_flink.
[2021-02-08 00:49:23.751][org.apache.flink.runtime.taskmanager.Task][]Source: Custom Source -> Filter -> Flat Map (1/1) (cc7dc71b5a40ece6fd1383ae30c316a4) switched from RUNNING to FAILED.
org.apache.kafka.common.KafkaException: Error registering mbean kafka.consumer:type=consumer-node-metrics,client-id=consumer-2,node-id=node-2147483544
at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:163)
at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:81)
at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:504)
at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:255)
at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:240)
at org.apache.kafka.common.network.Selector$SelectorMetrics.maybeRegisterConnectionMetrics(Selector.java:817)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:353)
at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
Caused by: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=consumer-node-metrics,client-id=consumer-2,node-id=node-2147483544
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:161)
... 13 common frames omitted
[2021-02-08 00:49:23.751][org.apache.flink.runtime.taskmanager.Task][]Freeing task resources for Source: Custom Source -> Filter -> Flat Map (1/1) (cc7dc71b5a40ece6fd1383ae30c316a4).
[2021-02-08 00:49:23.751][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][]Marking the coordinator 172.20.23.245:9092 (id: 2147483544 rack: null) dead for group ty_bi_flink
[2021-02-08 00:49:23.752][org.apache.flink.runtime.taskmanager.Task][]Ensuring all FileSystem streams are closed for task Source: Custom Source -> Filter -> Flat Map (1/1) (cc7dc71b5a40ece6fd1383ae30c316a4) [FAILED]
[2021-02-08 00:49:23.752][org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher][]Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: This is not the correct coordinator.
[2021-02-08 00:49:23.752][org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase][]Async Kafka commit failed.
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: This is not the correct coordinator.
[2021-02-08 00:49:23.753][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][]Marking the coordinator 172.20.23.245:9092 (id: 2147483544 rack: null) dead for group ty_bi_flink
[2021-02-08 00:49:23.776][org.apache.flink.runtime.taskexecutor.TaskExecutor][]Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Filter -> Flat Map cc7dc71b5a40ece6fd1383ae30c316a4.
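Just before the failure, the TM log shows the group coordinator for ty_bi_flink moving from 172.20.23.243 (id: 2147483546) to 172.20.23.245 (id: 2147483544). node-2147483544 is exactly the node in the MBean name that failed to register.

As far as I understand the Kafka consumer, it addresses the group coordinator with a synthetic node id of Integer.MAX_VALUE minus the broker id, so coordinator traffic gets its own connection. That is an assumption, not checked against the exact client version used here. A small sketch that decodes the two ids under that assumption (`CoordinatorNodeIdDemo` and `brokerIdOf` are hypothetical names):

```java
public class CoordinatorNodeIdDemo {
    // Assumption: the consumer's coordinator logic builds the coordinator Node with
    // id = Integer.MAX_VALUE - brokerId, so the broker id is recovered the same way.
    public static int brokerIdOf(int coordinatorNodeId) {
        return Integer.MAX_VALUE - coordinatorNodeId;
    }

    public static void main(String[] args) {
        // Coordinator ids taken from the "Marking the coordinator ... dead" / "Discovered coordinator" lines.
        System.out.println("172.20.23.243 (id 2147483546) -> broker " + brokerIdOf(2147483546)); // broker 101
        System.out.println("172.20.23.245 (id 2147483544) -> broker " + brokerIdOf(2147483544)); // broker 103
    }
}
```

Under that assumption, the coordinator moved from broker 101 to broker 103 during the upgrade. The exception then fired while the consumer was re-registering per-node connection metrics for the new coordinator connection. That would explain why only a job whose consumer happened to hit this reconnect path failed.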

