CDC code error


hl9902@126.com
Hi all,
I ran the demo code from the ververica/flink-cdc-connectors Git repository and got the following error:
2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out (1/1)] WARN  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> Sink: Print to Std. Out (1/1) (7c3ccf7686ccfb33254e8cb785cd339d) switched from RUNNING to FAILED.
java.lang.AbstractMethodError: org.apache.kafka.connect.json.JsonSerializer.configure(Ljava/util/Map;Z)V
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:583)
at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:80)
at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:301)
at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:217)
at io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
at com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)

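Source code:
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CdcTest {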
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("sohay") // monitor all tables under the sohay database
                .username("root")
                .password("123456")
                .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(sourceFunction).print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute();
    }
}




Re: CDC code error

hl9902@126.com
The Flink version is 1.11.2. Has anyone run into this problem?




Re: CDC code error

Jark
Administrator
There is probably another version of the kafka-connect jar in your environment, which is causing a dependency conflict.
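
An AbstractMethodError like this usually means a class compiled against one version of a library is running against a different one. One way to confirm it, sketched here with plain JDK calls, is to print the jar each suspect class is actually loaded from. The class name `WhichJar` and the helper `locationOf` are made up for this illustration, and the two class names in `main` are only examples taken from the stack trace and the Kafka client API.

```java
import java.security.CodeSource;

public class WhichJar {

    /** Returns where the named class was loaded from, or a short note if that is unknown. */
    static String locationOf(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            // Classes loaded by the bootstrap class loader report no code source.
            return source == null ? "(bootstrap / unknown)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Example class names only: the serializer from the stack trace and the
        // interface it implements. Replace them with whatever you suspect.
        String[] names = {
            "org.apache.kafka.connect.json.JsonSerializer",
            "org.apache.kafka.common.serialization.Serializer"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locationOf(name));
        }
    }
}
```

Run it with the same classpath as the job. If the two classes come from jars with different Kafka versions in their file names, that mismatch is the likely cause.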



Re: Re: CDC code error

hl9902@126.com
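You were right: my pom contained a Kafka dependency that conflicted. After I removed it, the problem was solved. Thank you.
One more question: if I want to use Kafka, I have to include the Kafka dependency, and it will conflict again. Is there a way around this?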




Re: Re: CDC code error

Jark
Administrator
You can check which kafka-connect version flink-cdc-connectors uses and use that same version.
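
After aligning the versions, it may help to verify that only one copy of each Kafka class is left on the classpath. The sketch below uses only the JDK's `ClassLoader.getResources`. The class name `DuplicateClassCheck` and its helpers are hypothetical names for this illustration, and the class checked in `main` is just the one from the stack trace.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

public class DuplicateClassCheck {

    /** Converts a class name such as a.b.C into the resource path a/b/C.class. */
    static String resourceName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Lists every location on the classpath that provides the named class. */
    static List<URL> copiesOf(String className) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        try {
            return Collections.list(loader.getResources(resourceName(className)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Example only: the class from the stack trace in this thread.
        String name = "org.apache.kafka.connect.json.JsonSerializer";
        List<URL> copies = copiesOf(name);
        System.out.println(copies.size() + " copy/copies of " + name);
        for (URL url : copies) {
            // The jar file name in each URL normally includes the version.
            System.out.println("  " + url);
        }
    }
}
```

More than one entry means two jars still ship the class. The version numbers in the jar names show which dependency to exclude or realign in the pom.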
