Kafka Consumer deserialization error

Kafka Consumer deserialization error

Even
Hi!
I have a question about a Kafka consumer deserialization issue:
A Kafka consumer job runs stably when submitted to a Flink session cluster, but when submitted on its own to a Flink per-job cluster it fails with a Kafka deserialization error. The error message is below.
The Flink version is 1.10 and the Kafka version is kafka_2.12-2.1.0; the consumer is set up in code as val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)).
2020-05-27 17:05:22
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:705)
        ... 15 more
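
The job itself is roughly the following (a minimal sketch of the setup described above; the topic name, bootstrap servers, group id and class name are placeholders):

    // Minimal sketch of the job described above (Flink 1.10, Scala 2.12).
    // Topic name, bootstrap servers and group id are placeholders.
    import java.util.Properties
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    object KafkaConsumerJob {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "kafka-broker:9092")
        properties.setProperty("group.id", "test-consumer-group")

        val topic = "test-topic"
        val data = env.addSource(
          new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties))

        data.print()
        env.execute("kafka-consumer-job")
      }
    }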

Re: Kafka Consumer deserialization error

Shuai Xia
You could first check whether this is a jar conflict.


Re: Kafka Consumer deserialization error

zz zhang
This is probably a configuration problem on the maven-shade side.
After the original class org.apache.kafka.common.serialization.ByteArrayDeserializer is relocated to org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer, the Deserializer interface it implements is shaded as well, presumably to org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer, which is what triggers the exception.
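
One quick way to confirm that on the cluster is a check along these lines (just a diagnostic sketch, not part of any Flink or Kafka API; the two class names are taken from the stack trace above, and the object name is made up):

    // Prints the jars that provide the shaded deserializer class and the plain
    // Deserializer interface, then checks assignability between them; "false"
    // corresponds to the "is not an instance of" error in the stack trace.
    object ClassLoadingCheck {
      def main(args: Array[String]): Unit = {
        val shadedImpl = Class.forName(
          "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer")
        val plainIface = Class.forName(
          "org.apache.kafka.common.serialization.Deserializer")
        println(shadedImpl.getProtectionDomain.getCodeSource.getLocation)
        println(plainIface.getProtectionDomain.getCodeSource.getLocation)
        println(plainIface.isAssignableFrom(shadedImpl))
      }
    }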

--
Best,
zz zhang

Re: Kafka Consumer deserialization error

Even
Thanks! How should I deal with this to avoid the problem?

Re: Kafka Consumer deserialization error

tison
The cause should be the class loading order. Configure child-first class loading; for per-job on 1.10, if I remember correctly, there is a specific option you need to set.

See this documentation:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#user-jars--classpath
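
On the configuration side (to the best of my memory, please verify against the linked page): classloader.resolve-order in flink-conf.yaml controls the child-first/parent-first resolution order, and on YARN per-job in 1.10 the yarn.per-job-cluster.include-user-jar option decides whether the user jar is placed on the system classpath; only when it is kept off the system classpath (e.g. DISABLED) does child-first loading actually apply to its classes.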

Best,
tison.



Re: Kafka Consumer deserialization error

tison
Also, for general documentation on class loading, take a look at this:

https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html

Best,
tison.



Re: Kafka Consumer deserialization error

Even
Thank you all! Setting the relevant dependencies in the pom to provided and repackaging solved the problem, so it does look like a jar conflict. Many thanks for the detailed answers; I am also reading up on the class loading order.
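
For anyone hitting the same thing, the change roughly amounts to not bundling the Flink/Kafka classes that the per-job cluster already ships. A minimal sketch of that, assuming an sbt build (with Maven the analogous change is <scope>provided</scope> on the same artifacts); the exact artifacts and versions below are illustrative, and which ones need it depends on what is already in the cluster's lib/ directory:

    // Illustrative only: mark dependencies that are already on the per-job
    // cluster's classpath as "provided" so they are not packaged into the fat jar.
    libraryDependencies ++= Seq(
      "org.apache.flink" %% "flink-streaming-scala" % "1.10.0" % "provided",
      "org.apache.flink" %% "flink-connector-kafka" % "1.10.0" % "provided",
      "org.apache.kafka"  % "kafka-clients"         % "2.1.0"  % "provided"
    )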



