Hi!
I have a question about a Kafka consumer deserialization error: a Kafka consumer job runs stably when submitted to a Flink session cluster, but when submitted on its own to a Flink per-job cluster it fails with a Kafka deserialization error. The error is below. The Flink version is 1.10 and the Kafka version is kafka_2.12-2.1.0; the consumer is configured in code as:

val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties))

2020-05-27 17:05:22
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:705)
	... 15 more
You could first rule out a jar conflict.
------------------------------------------------------------------
From: Even <[hidden email]>
Sent: Friday, May 29, 2020, 16:17
To: user-zh <[hidden email]>
Subject: Kafka Consumer deserialization error
This should be a maven-shade configuration issue. After the original class org.apache.kafka.common.serialization.ByteArrayDeserializer is relocated to org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer, the Deserializer interface it implements is relocated by the shading as well, probably to org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer. The relocated class therefore no longer implements the unshaded org.apache.kafka.common.serialization.Deserializer that the Kafka client checks against, which causes the exception.

夏帅 <[hidden email]> wrote on Fri, May 29, 2020 at 16:33:
> You could first rule out a jar conflict.
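The failure mode described above can be illustrated with a self-contained sketch (plain Java, no Flink or Kafka involved; the `Unshaded`/`Shaded` names below are hypothetical stand-ins for the unshaded and relocated Kafka packages): a class that implements a relocated copy of an interface is not an instance of the original interface, even though both interfaces share the simple name Deserializer.

```java
// Stand-ins for the real types (illustrative names only):
//   Unshaded.Deserializer ~ org.apache.kafka.common.serialization.Deserializer
//   Shaded.Deserializer   ~ org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer
class Unshaded {
    interface Deserializer {}
}

class Shaded {
    interface Deserializer {}

    // The relocated ByteArrayDeserializer implements the *relocated* interface.
    static class ByteArrayDeserializer implements Deserializer {}
}

public class ShadeMismatchDemo {
    public static void main(String[] args) {
        Object d = new Shaded.ByteArrayDeserializer();
        // This is essentially the instance check that fails inside
        // AbstractConfig.getConfiguredInstance in the stack trace above:
        System.out.println(d instanceof Unshaded.Deserializer); // false -> "is not an instance of"
        System.out.println(d instanceof Shaded.Deserializer);   // true
    }
}
```

The JVM identifies a type by its fully qualified name (and defining classloader), so the two Deserializer interfaces are unrelated types despite identical bodies.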
--
Best,
zz zhang
Thanks! What should I do to avoid this problem?
------------------ Original message ------------------
From: "zz zhang" <[hidden email]>
Sent: Friday, May 29, 2020, 17:16
To: "user-zh" <[hidden email]>; "夏帅" <[hidden email]>
Subject: Re: Kafka Consumer deserialization error
This should be a class-loading order problem. Configure child-first classloading; for a per-job cluster on 1.10, as I recall, there is also a specific option you need to set.
See this doc:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#user-jars--classpath

Best,
tison.
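For reference, a minimal sketch of the configuration being suggested, with option names as documented for Flink 1.10 (verify them against your version and the linked doc):

```yaml
# flink-conf.yaml (Flink 1.10): resolve user classes child-first, so classes
# bundled in the user jar shadow copies on the Flink framework classpath.
classloader.resolve-order: child-first

# On YARN, controls where the user jar is placed relative to the Flink
# classpath (see the user-jars--classpath section of the linked doc).
yarn.per-job-cluster.include-user-jar: ORDER
```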
Also, for general documentation on class loading, see:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html

Best,
tison.
Thanks, everyone! Setting the relevant dependencies in the pom file to provided scope and repackaging solved the problem; preliminarily it does look like a jar conflict. Many thanks for the detailed answers — I am also reading up on class-loading order now.
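For anyone hitting the same issue, a sketch of the kind of dependency change that fixed it (the coordinates below are illustrative; adjust the artifact names and versions to your build): marking the Flink runtime dependencies as provided keeps them out of the fat jar, so only one copy of those classes is on the classpath at runtime.

```xml
<!-- pom.xml fragment: illustrative coordinates, adjust to your project -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.10.0</version>
    <!-- provided: supplied by the Flink runtime, excluded from the fat jar -->
    <scope>provided</scope>
</dependency>
```

Which dependencies to mark provided depends on what the cluster ships: the Kafka connector, for example, is normally bundled in the job jar unless a copy already sits in the cluster's lib/ directory, in which case bundling a second copy is exactly what causes the conflict.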
------------------ Original message ------------------
From: "tison" <[hidden email]>
Sent: Friday, May 29, 2020, 19:47
To: "user-zh" <[hidden email]>
Subject: Re: Kafka Consumer deserialization error