Flink hits kafka.consumer:type=app-info,id=1 when reading from Kafka


宁吉浩
After I modified the code that reads the Kafka topics by regex, the problem went away. Here is the reading code for everyone to look at:
code:
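import java.util.regex.Pattern;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
        Pattern.compile(readTopic), // subscribe to every topic matching the regex
        new SimpleStringSchema(),   // deserialize each record value as a String
        properties);                // Kafka consumer properties
kafkaConsumer.setStartFromLatest(); // start reading from the latest offsets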
I really can't figure out what could have been wrong here.
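
One thing that may be worth checking is the `properties` object passed to the consumer. The `id=1` in the MBean name `kafka.consumer:type=app-info,id=1` appears to be the consumer's `client.id`, so a fixed `client.id` shared by every consumer in the same JVM could explain the name clash. This is a guess, not a confirmed diagnosis. A minimal sketch, assuming that is the case; the helper class `ConsumerProps` and its method are hypothetical names, and only `java.util.Properties` is used:

```java
import java.util.Properties;

// Hypothetical helper, not part of Flink or Kafka.
class ConsumerProps {
    // Returns a copy of the given properties with any fixed "client.id" removed,
    // so that each Kafka consumer instance can fall back to a generated id.
    static Properties withoutFixedClientId(Properties original) {
        Properties copy = new Properties();
        copy.putAll(original);    // keep group.id, bootstrap.servers, etc.
        copy.remove("client.id"); // drop the shared, hard-coded id
        return copy;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("group.id", "demo-group");
        props.setProperty("client.id", "1"); // assumed to be how id=1 came about
        Properties cleaned = withoutFixedClientId(props);
        System.out.println(cleaned.containsKey("client.id"));
    }
}
```

The cleaned copy would then be passed to the `FlinkKafkaConsumer` constructor in place of `properties`.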



------------------ Original question below -------------------------------------------

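I'm using Flink 1.9.3 and Kafka 1.0.0.
Flink reads multiple Kafka topics via a regex. Each topic has 3 partitions, and the Flink parallelism is 3 (I also tried 1); the problem below occurred in both cases.
Has anyone run into this? How did you solve it?
The error is as follows: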
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=1
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:477)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:167)
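
For readers unfamiliar with this exception: the trace shows `AppInfoParser.registerAppInfo` registering a JMX MBean, and the JVM's MBean server refuses a second registration under a name that is already taken. The sketch below reproduces only that JMX behaviour with the JDK, without Kafka or Flink. The classes `DuplicateMBeanDemo`, `AppInfo` and `AppInfoMBean` are made up for illustration. As far as I can tell, the Kafka client catches this exception and logs it as a warning, so it may not be what actually stops the job.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Illustrative only: mimics two consumers registering the same app-info MBean name.
class DuplicateMBeanDemo {
    // A standard MBean needs a public interface named <ClassName>MBean.
    public interface AppInfoMBean { String getVersion(); }

    public static class AppInfo implements AppInfoMBean {
        public String getVersion() { return "demo"; }
    }

    // Registers two MBeans under one name; returns true if the second attempt
    // is rejected with InstanceAlreadyExistsException.
    static boolean secondRegistrationFails(String clientId) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            ObjectName name = new ObjectName("kafka.consumer:type=app-info,id=" + clientId);
            server.registerMBean(new AppInfo(), name);     // first "consumer"
            try {
                server.registerMBean(new AppInfo(), name); // second one, same id
                return false;
            } catch (InstanceAlreadyExistsException e) {
                return true;
            } finally {
                server.unregisterMBean(name);              // clean up the first registration
            }
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(secondRegistrationFails("1"));
    }
}
```

If this is what happens here, any two consumers created in the same JVM with the same `client.id` would collide in the same way, regardless of the parallelism setting.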

There is also this error, and I don't know what causes it:

java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:282)