flink 1.11.1-2.12消费kafka0.10.1.1的时候报错。
在idea调试没报错,提交到yarn集群就报错了。求助。 使用的consumer如下: val logConsumer: FlinkKafkaConsumer010[String] = new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), properties) pom文件导入的依赖如下: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.10_2.12</artifactId> <version>1.11.1</version> </dependency> 报错如下: 2020-11-17 16:39:37 org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:718) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579) at org.apache.flink.streaming.connectors.kafka.internal.Kafka010PartitionDiscoverer.initializeConnections(Kafka010PartitionDiscoverer.java:58) at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.kafka.common.KafkaException: org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:637) ... 15 more -- kingdomad |
从你的报错上看,你集群上应该是存在 shade 后的 kakfa 0.10 的版本,然后导致先加载到了这个下面的 kafka client 的类。
Shade 后的 kakfa 0.10的版本 的 artifactId 为: <artifactId>flink-sql-connector-kafka-0.10_${scala.binary.version}</artifactId> 在 2020-11-17 15:47:08,"kingdomad" <[hidden email]> 写道: >flink 1.11.1-2.12消费kafka0.10.1.1的时候报错。 >在idea调试没报错,提交到yarn集群就报错了。求助。 > > >使用的consumer如下: >val logConsumer: FlinkKafkaConsumer010[String] = new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), properties) > > >pom文件导入的依赖如下: > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-kafka-0.10_2.12</artifactId> > <version>1.11.1</version> > </dependency> > > > >报错如下: > >2020-11-17 16:39:37 > >org.apache.kafka.common.KafkaException: Failed to construct kafka consumer > >at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:718) > >at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597) > >at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579) > >at org.apache.flink.streaming.connectors.kafka.internal.Kafka010PartitionDiscoverer.initializeConnections(Kafka010PartitionDiscoverer.java:58) > >at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) > >at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550) > >at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > >at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > >at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) > >at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473) > >at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) > >at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469) > >at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) > >at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > >at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > >at java.lang.Thread.run(Thread.java:745) > >Caused by: org.apache.kafka.common.KafkaException: org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer > >at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205) > >at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:637) > >... 15 more > > > > > > > >-- > >kingdomad > |
大佬牛逼!
我看了一下,集群上确实是存在了一个flink-sql-connector-kafka-0.10_2.12-1.11.1.jar的包。 请问下这个问题要如何解决 -- kingdomad 在 2020-11-17 17:08:10,"hailongwang" <[hidden email]> 写道: >从你的报错上看,你集群上应该是存在 shade 后的 kakfa 0.10 的版本,然后导致先加载到了这个下面的 kafka client 的类。 >Shade 后的 kakfa 0.10的版本 的 artifactId 为: ><artifactId>flink-sql-connector-kafka-0.10_${scala.binary.version}</artifactId> > >在 2020-11-17 15:47:08,"kingdomad" <[hidden email]> 写道: >>flink 1.11.1-2.12消费kafka0.10.1.1的时候报错。 >>在idea调试没报错,提交到yarn集群就报错了。求助。 >> >> >>使用的consumer如下: >>val logConsumer: FlinkKafkaConsumer010[String] = new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), properties) >> >> >>pom文件导入的依赖如下: >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-connector-kafka-0.10_2.12</artifactId> >> <version>1.11.1</version> >> </dependency> >> >> >> >>报错如下: >> >>2020-11-17 16:39:37 >> >>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer >> >>at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:718) >> >>at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597) >> >>at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579) >> >>at org.apache.flink.streaming.connectors.kafka.internal.Kafka010PartitionDiscoverer.initializeConnections(Kafka010PartitionDiscoverer.java:58) >> >>at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) >> >>at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550) >> >>at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) >> >>at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) >> >>at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) >> >>at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473) >> >>at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) >> >>at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469) >> >>at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) >> >>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) >> >>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) >> >>at java.lang.Thread.run(Thread.java:745) >> >>Caused by: org.apache.kafka.common.KafkaException: org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer >> >>at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205) >> >>at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:637) >> >>... 15 more >> >> >> >> >> >> >> >>-- >> >>kingdomad >> |
1. 如果只用一个 kafka 版本的话,可以不要 flink-sql-connector-kafka-0.10_2.12-1.11.1.jar 这个 jar包把,就是没有必要 将 kafka client shade 。
目前 1.12 版本也只有一个 kafka connector 版本了。 2. 如果确实需要 shade 的包的话,可以指定 `org.apache.kafka` 开头的包优先从用户的 classload 加载。 在 2020-11-17 16:14:24,"kingdomad" <[hidden email]> 写道: >大佬牛逼! >我看了一下,集群上确实是存在了一个flink-sql-connector-kafka-0.10_2.12-1.11.1.jar的包。 >请问下这个问题要如何解决 > > > > > > > >-- > >kingdomad > > > > > > > >在 2020-11-17 17:08:10,"hailongwang" <[hidden email]> 写道: >>从你的报错上看,你集群上应该是存在 shade 后的 kakfa 0.10 的版本,然后导致先加载到了这个下面的 kafka client 的类。 >>Shade 后的 kakfa 0.10的版本 的 artifactId 为: >><artifactId>flink-sql-connector-kafka-0.10_${scala.binary.version}</artifactId> >> >>在 2020-11-17 15:47:08,"kingdomad" <[hidden email]> 写道: >>>flink 1.11.1-2.12消费kafka0.10.1.1的时候报错。 >>>在idea调试没报错,提交到yarn集群就报错了。求助。 >>> >>> >>>使用的consumer如下: >>>val logConsumer: FlinkKafkaConsumer010[String] = new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), properties) >>> >>> >>>pom文件导入的依赖如下: >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-connector-kafka-0.10_2.12</artifactId> >>> <version>1.11.1</version> >>> </dependency> >>> >>> >>> >>>报错如下: >>> >>>2020-11-17 16:39:37 >>> >>>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer >>> >>>at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:718) >>> >>>at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597) >>> >>>at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579) >>> >>>at org.apache.flink.streaming.connectors.kafka.internal.Kafka010PartitionDiscoverer.initializeConnections(Kafka010PartitionDiscoverer.java:58) >>> >>>at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) >>> >>>at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550) >>> >>>at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) >>> >>>at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) >>> >>>at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) >>> >>>at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473) >>> >>>at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) >>> >>>at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469) >>> >>>at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) >>> >>>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) >>> >>>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) >>> >>>at java.lang.Thread.run(Thread.java:745) >>> >>>Caused by: org.apache.kafka.common.KafkaException: org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer >>> >>>at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205) >>> >>>at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:637) >>> >>>... 15 more >>> >>> >>> >>> >>> >>> >>> >>>-- >>> >>>kingdomad >>> |
Free forum by Nabble | Edit this page |