Flink CDC questions, round two

Flink CDC questions, round two

tonychen
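Hi everyone,

Today I tried out the CDC feature on Flink 1.11.1 and ran into a few problems that I would like to ask about.

The environment is the docker-compose.yml from the tutorial at https://github.com/ververica/flink-cdc-connectors/wiki/中文教程 <https://github.com/ververica/flink-cdc-connectors/wiki/%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B>

Question 1: I submitted the Flink SQL through sql-cli. Afterwards, when I modified data in my local MySQL, the job failed with the error below. Is there a JDK version requirement?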
Caused by: java.lang.NoSuchMethodError: sun.misc.Unsafe.monitorEnter(Ljava/lang/Object;)V
    at com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:101)
    at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:81)
    at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:810)
    at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:170)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
My JDK version is as follows:
openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)

Question 2: Submitting the Flink SQL from code (through a project of my own, similar to https://github.com/ververica/flink-sql-gateway) always fails. The test SQL is the three-stream join from the demo in https://github.com/ververica/flink-cdc-connectors/wiki/中文教程 <https://github.com/ververica/flink-cdc-connectors/wiki/%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B>

The error output is as follows:

2020-08-12 19:18:51,034 WARN  org.apache.flink.runtime.taskmanager.Task                     - Source: TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[order_id, order_date, customer_name, price, product_id, order_status]) (1/1) (f2a72e33dc272266408e47d60724b078) switched from RUNNING to FAILED.
java.lang.AbstractMethodError: Receiver class org.apache.kafka.connect.json.JsonSerializer does not define or inherit an implementation of the resolved method abstract configure(Ljava/util/Map;Z)V of interface org.apache.kafka.common.serialization.Serializer.
        at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
        at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
        at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:582)
        at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:79)
        at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:300)
        at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:216)
        at io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
        at com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)



Best,
TonyChen

Re: Flink CDC questions, round two

Jark
Administrator
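1. Yes. JDK 11 is not supported yet, because the connector uses some Unsafe methods. This will be fixed in the future.
2. The second problem looks like a class conflict. flink-cdc-connectors depends on Kafka Connect 2.5.0 and does not shade it. If your project also depends on Kafka Connect, keep the versions consistent.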
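
Both points can be checked from a JVM started with the same JDK and classpath as the failing job. The following is only a diagnostic sketch, not part of flink-cdc-connectors: the class name `CdcClasspathCheck` and its helper methods are hypothetical, and it relies on standard Java reflection alone. It reports whether `sun.misc.Unsafe` still exposes `monitorEnter` (to my knowledge that method was removed in JDK 9, which would match the NoSuchMethodError on JDK 11), whether `Serializer.configure` is abstract in the kafka-clients version that actually got loaded, and which jar each Kafka class came from.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.security.CodeSource;

// Hypothetical diagnostic helper; not part of Flink, Debezium, or flink-cdc-connectors.
class CdcClasspathCheck {

    // Loads a class by name without running its static initializers.
    private static Class<?> load(String className) throws ClassNotFoundException {
        return Class.forName(className, false, CdcClasspathCheck.class.getClassLoader());
    }

    // Reports "class not found", "absent", "abstract", or "implemented" for the
    // first public method with the given name (overloads are not distinguished).
    public static String methodKind(String className, String methodName) {
        try {
            for (Method m : load(className).getMethods()) {
                if (m.getName().equals(methodName)) {
                    return Modifier.isAbstract(m.getModifiers()) ? "abstract" : "implemented";
                }
            }
            return "absent";
        } catch (ClassNotFoundException | LinkageError e) {
            return "class not found";
        }
    }

    // Returns the jar or directory a class was loaded from.
    public static String locate(String className) {
        try {
            CodeSource source = load(className).getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "(no code source, e.g. a JDK class)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        System.out.println("java.version = " + System.getProperty("java.version"));
        System.out.println("Unsafe.monitorEnter: " + methodKind("sun.misc.Unsafe", "monitorEnter"));

        String serializer = "org.apache.kafka.common.serialization.Serializer";
        String jsonSerializer = "org.apache.kafka.connect.json.JsonSerializer";
        System.out.println("Serializer.configure: " + methodKind(serializer, "configure"));
        System.out.println("Serializer loaded from: " + locate(serializer));
        System.out.println("JsonSerializer loaded from: " + locate(jsonSerializer));
    }
}
```

If `Serializer.configure` is reported as `abstract` while `JsonSerializer` is loaded from a Kafka Connect 2.5.0 jar, that would be consistent with mixed Kafka versions on the classpath, which is one plausible reading of the AbstractMethodError above.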

Best,
Jark

On Wed, 12 Aug 2020 at 20:07, 陈韬 <[hidden email]> wrote:

Re: Flink CDC questions, round two

tonychen
Thanks, I'll try again in a bit.


Best,
TonyChen

> On Aug 12, 2020, at 10:27 PM, Jark Wu <[hidden email]> wrote: