Production environment throws: org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Serializer; does not happen locally


Jeff
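Processing the same topic works fine when I test it locally in IDEA, but in the production environment I get this exception:
org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Serializer. Replacing StringSerializer with ByteArraySerializer produces a similar error, and I don't know how to fix it. Are there any other approaches I could try?
The business logic is very simple: filter data from the source table into the sink table.
Flink version: 1.11.1; Kafka version: 2.1.0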


The Kafka configuration in the SQL is as follows:
source:
create table *******
with (
'connector' = 'kafka',
'topic'='**********',
'scan.startup.mode'='latest-offset',
'format'='json',
'properties.group.id' = '***********',
'properties.bootstrap.servers'='***********:9092',
'properties.enable.auto.commit'='true',
'properties.auto.commit.interval.ms'='1000',
'properties.key.deserializer'='org.apache.kafka.common.serialization.StringDeserializer',
'properties.value.deserializer'='org.apache.kafka.common.serialization.StringDeserializer',
'properties.security.protocol'='SASL_PLAINTEXT',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule required username=\"****\" password=\"****\";');


sink:
create table *******
with (
'connector' = 'kafka',
'topic'='*******',
'scan.startup.mode'='latest-offset',
'format'='json',
'properties.bootstrap.servers'='*****:9092',
'properties.max.poll.records'='50',
'properties.enable.auto.commit'='true',
'properties.auto.commit.interval.ms'='1000',
'properties.key.serializer'='org.apache.kafka.common.serialization.StringSerializer',
'properties.value.serializer'='org.apache.kafka.common.serialization.StringSerializer',
'properties.security.protocol'='SASL_PLAINTEXT',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule required username=\"*****\" password=\"******\";');
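
For comparison, here is a trimmed version of the sink definition, offered as a sketch rather than a confirmed fix. It assumes the sink only needs producer-side settings, so it leaves out the options that are meaningful only to a consumer ('scan.startup.mode', 'properties.max.poll.records', and the auto-commit properties), and it also leaves out the explicit serializer properties on the assumption that the Flink Kafka connector supplies its own byte-array serializers. The table name, columns, topic, broker address, and credentials are all placeholders. Kafka reports "is not an instance of" when the configured class and the Serializer interface were loaded by different classloaders, which typically happens when more than one copy of kafka-clients is visible at runtime, so trimming the options may not be enough on its own.

```sql
-- Sketch only: sink_table, its columns, the topic, the broker address,
-- and the credentials are hypothetical placeholders.
CREATE TABLE sink_table (
  id STRING,       -- hypothetical column
  payload STRING   -- hypothetical column
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink-topic',
  'format' = 'json',
  'properties.bootstrap.servers' = 'broker:9092',
  -- Authentication settings kept from the original sink definition.
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="secret";'
  -- No key/value serializer properties: assumed to be set by the connector.
);
```

If the exception persists with a definition like this, it is worth checking whether the job jar bundles kafka-clients while another copy already sits in the cluster's Flink lib directory.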