ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer

ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer

kingdomad
Flink 1.11.1 (Scala 2.12) throws an error when consuming from Kafka 0.10.1.1.
There is no error when debugging in IDEA, but the job fails once submitted to the YARN cluster. Any help would be appreciated.


The consumer is created as follows:
val logConsumer: FlinkKafkaConsumer010[String] = new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), properties)
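
For reference, a minimal self-contained sketch of such a job is shown below; the broker address, group id and topic name are placeholders, not the original values. Note that the connector sets the key/value deserializer properties to Kafka's ByteArrayDeserializer internally, which is why that class appears in the stack trace further down.

    import java.util.Properties
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

    object LogJob {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "kafka-host:9092") // placeholder
        properties.setProperty("group.id", "log-consumer")             // placeholder
        val topic = "log-topic"                                        // placeholder
        val logConsumer = new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), properties)
        env.addSource(logConsumer).print()
        env.execute("kafka-0.10-consumer")
      }
    }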


The dependency declared in the pom file:
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>



The error:

2020-11-17 16:39:37
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:718)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010PartitionDiscoverer.initializeConnections(Kafka010PartitionDiscoverer.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:637)
... 15 more







--

kingdomad


Re:ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer

hailongwang
Judging from the error, your cluster classpath must also contain the shaded Kafka 0.10 connector, so the shaded kafka client classes get loaded first.
The artifactId of the shaded Kafka 0.10 connector is:
<artifactId>flink-sql-connector-kafka-0.10_${scala.binary.version}</artifactId>
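
One way to confirm this on the cluster (a debugging sketch, not part of the job) is to print which jar the connector class is actually loaded from:

    // If this prints .../lib/flink-sql-connector-kafka-0.10_2.12-1.11.1.jar,
    // the shaded connector from the cluster's lib/ directory is being picked
    // up instead of the flink-connector-kafka-0.10 bundled in the user jar.
    val location = classOf[org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[_]]
      .getProtectionDomain.getCodeSource.getLocation
    println(location)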


Re:Re:ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer

kingdomad
Great catch! I checked, and the cluster does indeed contain a flink-sql-connector-kafka-0.10_2.12-1.11.1.jar.
How should this be fixed?







--

kingdomad








Re:Re:Re:ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer

hailongwang
1. If you only use a single Kafka version, you can simply drop the flink-sql-connector-kafka-0.10_2.12-1.11.1.jar; there is no need for a shaded kafka client in that case. As of Flink 1.12 there is only one Kafka connector version anyway.
2. If you really do need the shaded jar, you can configure classes under `org.apache.kafka` to be loaded from the user classloader first, as sketched below.
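
A minimal sketch of option 2, assuming the settings go into flink-conf.yaml. In Flink 1.11, `child-first` is already the default resolve order, so the key point is that no parent-first pattern matches `org.apache.kafka`:

    # flink-conf.yaml (sketch)
    # With child-first resolution (the default), classes bundled in the user
    # jar -- including org.apache.kafka.* from kafka-clients -- take
    # precedence over the shaded connector jar in the cluster's lib/ directory.
    classloader.resolve-order: child-first
    # Do NOT list org.apache.kafka under
    # classloader.parent-first-patterns.additional, or those classes will be
    # resolved from the framework classpath first.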
