Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
16 posts
|
flink版本1.10.2,问题重现如下,请问各位大佬是什么原因:
./sql-client.sh embedded Flink SQL> show tables ; [INFO] Result was empty. Flink SQL> CREATE TABLE tx ( > account_id BIGINT, > amount BIGINT, > transaction_time TIMESTAMP(3), > WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'heli01', > 'properties.bootstrap.servers' = '10.100.51.56:9092', > 'format' = 'csv' > ); Flink SQL> select * from tx ; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Reason: Required context properties mismatch. The following properties are requested: connector=kafka format=csv properties.bootstrap.servers=10.100.51.56:9092 schema.0.data-type=BIGINT schema.0.name=account_id schema.1.data-type=BIGINT schema.1.name=amount schema.2.data-type=TIMESTAMP(3) schema.2.name=transaction_time schema.watermark.0.rowtime=transaction_time schema.watermark.0.strategy.data-type=TIMESTAMP(3) schema.watermark.0.strategy.expr=`transaction_time` - INTERVAL '5' SECOND topic=heli01 The following factories have been considered: org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory [hidden email] |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
57 posts
|
你貌似使用的是flink-1.11的语法。
可以修改成flink-1.10的语法试试,参考文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector | | xinghalo | | [hidden email] | 签名由网易邮箱大师定制 在2020年09月28日 09:16,[hidden email]<[hidden email]> 写道: flink版本1.10.2,问题重现如下,请问各位大佬是什么原因: ./sql-client.sh embedded Flink SQL> show tables ; [INFO] Result was empty. Flink SQL> CREATE TABLE tx ( account_id BIGINT, amount BIGINT, transaction_time TIMESTAMP(3), WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'heli01', 'properties.bootstrap.servers' = '10.100.51.56:9092', 'format' = 'csv' ); [INFO] Table has been created. Flink SQL> select * from tx ; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Reason: Required context properties mismatch. The following properties are requested: connector=kafka format=csv properties.bootstrap.servers=10.100.51.56:9092 schema.0.data-type=BIGINT schema.0.name=account_id schema.1.data-type=BIGINT schema.1.name=amount schema.2.data-type=TIMESTAMP(3) schema.2.name=transaction_time schema.watermark.0.rowtime=transaction_time schema.watermark.0.strategy.data-type=TIMESTAMP(3) schema.watermark.0.strategy.expr=`transaction_time` - INTERVAL '5' SECOND topic=heli01 The following factories have been considered: org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory [hidden email] |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
16 posts
|
确实语法不对。我用了1.10的语法后,执行sql又报了另外一个错误:
Flink SQL> select * from tx ; [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer 相关的lib依赖包如下: [root@rcx51101 lib]# pwd /opt/flink-1.10.2/lib [root@rcx51101 lib]# ll | grep kafka -rw-rw-r-- 1 test test 26169 Sep 28 10:21 flink-connector-kafka-0.10_2.11-1.10.2.jar -rw-rw-r-- 1 test test 54969 Sep 28 10:21 flink-connector-kafka-0.11_2.11-1.10.2.jar -rw-rw-r-- 1 test test 37642 Sep 28 10:21 flink-connector-kafka-0.9_2.11-1.10.2.jar -rw-rw-r-- 1 test test 81912 Aug 17 16:41 flink-connector-kafka_2.12-1.10.2.jar -rw-rw-r-- 1 test test 106632 Sep 28 10:22 flink-connector-kafka-base_2.11-1.10.2.jar -rw-rw-r-- 1 test test 106632 Aug 17 16:36 flink-connector-kafka-base_2.12-1.10.2.jar -rw-rw-r-- 1 test test 1893564 Jul 24 2018 kafka-clients-2.0.0.jar [hidden email] 发件人: 111 发送时间: 2020-09-28 09:23 收件人: [hidden email] 主题: 回复:sql-cli执行sql报错 你貌似使用的是flink-1.11的语法。 可以修改成flink-1.10的语法试试,参考文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector | | xinghalo | | [hidden email] | 签名由网易邮箱大师定制 在2020年09月28日 09:16,[hidden email]<[hidden email]> 写道: flink版本1.10.2,问题重现如下,请问各位大佬是什么原因: ./sql-client.sh embedded Flink SQL> show tables ; [INFO] Result was empty. Flink SQL> CREATE TABLE tx ( account_id BIGINT, amount BIGINT, transaction_time TIMESTAMP(3), WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'heli01', 'properties.bootstrap.servers' = '10.100.51.56:9092', 'format' = 'csv' ); [INFO] Table has been created. Flink SQL> select * from tx ; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Reason: Required context properties mismatch. The following properties are requested: connector=kafka format=csv properties.bootstrap.servers=10.100.51.56:9092 schema.0.data-type=BIGINT schema.0.name=account_id schema.1.data-type=BIGINT schema.1.name=amount schema.2.data-type=TIMESTAMP(3) schema.2.name=transaction_time schema.watermark.0.rowtime=transaction_time schema.watermark.0.strategy.data-type=TIMESTAMP(3) schema.watermark.0.strategy.expr=`transaction_time` - INTERVAL '5' SECOND topic=heli01 The following factories have been considered: org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory [hidden email] |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
57 posts
|
HI,
大致看了下,建议可以这么排查: 1 flink-connector-kafka* 包太多了,仅保留你需要的版本即可 2 kafka-clients*包版本太高了,你看看你需要的版本是哪个 Best, xingoo |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
183 posts
|
In reply to this post by hl9902@126.com
kafka的依赖应该是依赖shaded之后的版本,也就是flink-*sql*-connector-kafka***.jar
[hidden email] <[hidden email]> 于2020年9月28日周一 上午10:29写道: > 确实语法不对。我用了1.10的语法后,执行sql又报了另外一个错误: > Flink SQL> select * from tx ; > [ERROR] Could not execute SQL statement. Reason: > java.lang.ClassNotFoundException: > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer > > 相关的lib依赖包如下: > [root@rcx51101 lib]# pwd > /opt/flink-1.10.2/lib > [root@rcx51101 lib]# ll | grep kafka > -rw-rw-r-- 1 test test 26169 Sep 28 10:21 > flink-connector-kafka-0.10_2.11-1.10.2.jar > -rw-rw-r-- 1 test test 54969 Sep 28 10:21 > flink-connector-kafka-0.11_2.11-1.10.2.jar > -rw-rw-r-- 1 test test 37642 Sep 28 10:21 > flink-connector-kafka-0.9_2.11-1.10.2.jar > -rw-rw-r-- 1 test test 81912 Aug 17 16:41 > flink-connector-kafka_2.12-1.10.2.jar > -rw-rw-r-- 1 test test 106632 Sep 28 10:22 > flink-connector-kafka-base_2.11-1.10.2.jar > -rw-rw-r-- 1 test test 106632 Aug 17 16:36 > flink-connector-kafka-base_2.12-1.10.2.jar > -rw-rw-r-- 1 test test 1893564 Jul 24 2018 kafka-clients-2.0.0.jar > > > > [hidden email] > > 发件人: 111 > 发送时间: 2020-09-28 09:23 > 收件人: [hidden email] > 主题: 回复:sql-cli执行sql报错 > 你貌似使用的是flink-1.11的语法。 > 可以修改成flink-1.10的语法试试,参考文档: > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector > > > | | > xinghalo > | > | > [hidden email] > | > 签名由网易邮箱大师定制 > > > 在2020年09月28日 09:16,[hidden email]<[hidden email]> 写道: > flink版本1.10.2,问题重现如下,请问各位大佬是什么原因: > > ./sql-client.sh embedded > Flink SQL> show tables ; > [INFO] Result was empty. > > Flink SQL> CREATE TABLE tx ( > account_id BIGINT, > amount BIGINT, > transaction_time TIMESTAMP(3), > WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'heli01', > 'properties.bootstrap.servers' = '10.100.51.56:9092', > 'format' = 'csv' > ); > [INFO] Table has been created. > > Flink SQL> select * from tx ; > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find > a suitable table factory for > 'org.apache.flink.table.factories.TableSourceFactory' in > the classpath. > > Reason: Required context properties mismatch. > > The following properties are requested: > connector=kafka > format=csv > properties.bootstrap.servers=10.100.51.56:9092 > schema.0.data-type=BIGINT > schema.0.name=account_id > schema.1.data-type=BIGINT > schema.1.name=amount > schema.2.data-type=TIMESTAMP(3) > schema.2.name=transaction_time > schema.watermark.0.rowtime=transaction_time > schema.watermark.0.strategy.data-type=TIMESTAMP(3) > schema.watermark.0.strategy.expr=`transaction_time` - INTERVAL '5' SECOND > topic=heli01 > > The following factories have been considered: > org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory > org.apache.flink.table.sources.CsvBatchTableSourceFactory > org.apache.flink.table.sources.CsvAppendTableSourceFactory > > > > [hidden email] > ... [show rest of quote] -- Best, Benchao Li |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
16 posts
|
In reply to this post by 111
我按照下面的步骤尝试了下,依然报同样的错误:
错误信息:java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer 相关lib包: flink-connector-kafka_2.12-1.10.2.jar kafka-clients-0.11.0.3.jar (之前是kafka-clients-2.0.0.jar) [hidden email] 发件人: 111 发送时间: 2020-09-28 10:41 收件人: [hidden email] 抄送: user-zh 主题: 回复: sql-cli执行sql报错 HI, 大致看了下,建议可以这么排查: 1 flink-connector-kafka* 包太多了,仅保留你需要的版本即可 2 kafka-clients*包版本太高了,你看看你需要的版本是哪个 Best, xingoo |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
339 posts
|
Hi
benchao的回复是的对的, 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包 flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。 > 相关lib包: > flink-connector-kafka_2.12-1.10.2.jar > kafka-clients-0.11.0.3.jar 祝好 Leonard |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
388 posts
|
这个不是很懂,(1)flink-connector-kafka_2.11-1.11.2.jar+flink-connector-kafka-base_2.11-1.11.2.jar+kafka-clients-0.11.0.3.jar
和(2)flink-sql-connector-kafka**.jar是啥区别呢? 使用(1)可以不?因为我的kafka-clients部分是调整了源码的。 Leonard Xu <[hidden email]> 于2020年9月28日周一 下午4:36写道: > Hi > benchao的回复是的对的, > 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包 > flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。 > > > > 相关lib包: > > flink-connector-kafka_2.12-1.10.2.jar > > kafka-clients-0.11.0.3.jar > > 祝好 > Leonard |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
388 posts
|
看了下pom,在flink-sql-connector-kafka中依赖了flink-connector-kafka-**,该包又依赖了flink-connector-kafka-base-**以及kafka-client。
然后flink-sql-connector-kafka做了shade。 所以看下来,我的那个(1)和(2)理论上效果是一样的。 ———————————————————————————————————————————————————————— 顺便讲下,我kafka-clients更换了ssl证书读取方式,用于支持hdfs等分布式协议(直接复用了flink-core中的filesystem实现)。 赵一旦 <[hidden email]> 于2020年9月28日周一 下午5:42写道: > 这个不是很懂,(1)flink-connector-kafka_2.11-1.11.2.jar+flink-connector-kafka-base_2.11-1.11.2.jar+kafka-clients-0.11.0.3.jar > 和(2)flink-sql-connector-kafka**.jar是啥区别呢? > > 使用(1)可以不?因为我的kafka-clients部分是调整了源码的。 > > Leonard Xu <[hidden email]> 于2020年9月28日周一 下午4:36写道: > >> Hi >> benchao的回复是的对的, >> 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包 >> flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。 >> >> >> > 相关lib包: >> > flink-connector-kafka_2.12-1.10.2.jar >> > kafka-clients-0.11.0.3.jar >> >> 祝好 >> Leonard > > ... [show rest of quote]
|
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
183 posts
|
(1) 的方式相当于一个shade之后的包,会把所有compile的依赖都打进去。
(2) 的方式的话,你需要自己手工添加所有这个connector的依赖,比如你提到的kafka-clients。 而且,kafka-clients本身的依赖如果你没有打到kafka-clients这个包里面的话,那你也需要把 那些compile依赖也都放进来。所以相当于手工做了一遍maven的依赖处理,而且要想全部都 放进来,应该会有很多。 如果你对kafka-clients有修改,建议自己重新依赖自己修改后的kafka-clients打包一个kafka-sql-connector-kafka 赵一旦 <[hidden email]> 于2020年9月28日周一 下午5:51写道: > > 看了下pom,在flink-sql-connector-kafka中依赖了flink-connector-kafka-**,该包又依赖了flink-connector-kafka-base-**以及kafka-client。 > 然后flink-sql-connector-kafka做了shade。 > > 所以看下来,我的那个(1)和(2)理论上效果是一样的。 > ———————————————————————————————————————————————————————— > > 顺便讲下,我kafka-clients更换了ssl证书读取方式,用于支持hdfs等分布式协议(直接复用了flink-core中的filesystem实现)。 > > 赵一旦 <[hidden email]> 于2020年9月28日周一 下午5:42写道: > > > > 这个不是很懂,(1)flink-connector-kafka_2.11-1.11.2.jar+flink-connector-kafka-base_2.11-1.11.2.jar+kafka-clients-0.11.0.3.jar > > 和(2)flink-sql-connector-kafka**.jar是啥区别呢? > > > > 使用(1)可以不?因为我的kafka-clients部分是调整了源码的。 > > > > Leonard Xu <[hidden email]> 于2020年9月28日周一 下午4:36写道: > > > >> Hi > >> benchao的回复是的对的, > >> 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包 > >> flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。 > >> > >> > >> > 相关lib包: > >> > flink-connector-kafka_2.12-1.10.2.jar > >> > kafka-clients-0.11.0.3.jar > >> > >> 祝好 > >> Leonard > > > > > ... [show rest of quote] -- Best, Benchao Li |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
16 posts
|
In reply to this post by Leonard Xu
按照您的方法重试了下,又报了另一个错误:
Flink SQL> CREATE TABLE tx ( > account_id BIGINT, > amount BIGINT, > transaction_time TIMESTAMP(3), > WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'heli01', > 'connector.properties.group.id' = 'heli-test', > 'connector.properties.bootstrap.servers' = '10.100.51.56:9092', > 'connector.startup-mode' = 'earliest-offset', > 'format.type' = 'csv' > ); Flink SQL> show tables ; tx Flink SQL> select * from tx ; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer 附:lib包清单 [test@rcx51101 lib]$ pwd /opt/flink-1.10.2/lib flink-csv-1.10.2.jar flink-dist_2.12-1.10.2.jar flink-jdbc_2.12-1.10.2.jar flink-json-1.10.2.jar flink-shaded-hadoop-2-uber-2.6.5-10.0.jar flink-sql-connector-kafka_2.11-1.10.2.jar flink-table_2.12-1.10.2.jar flink-table-blink_2.12-1.10.2.jar log4j-1.2.17.jar mysql-connector-java-5.1.48.jar slf4j-log4j12-1.7.15.jar [hidden email] 发件人: Leonard Xu 发送时间: 2020-09-28 16:36 收件人: user-zh 主题: Re: sql-cli执行sql报错 Hi benchao的回复是的对的, 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包 flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。 > 相关lib包: > flink-connector-kafka_2.12-1.10.2.jar > kafka-clients-0.11.0.3.jar 祝好 Leonard |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
183 posts
|
这个错误看起来比较奇怪。正常来讲flink-sql-connector-kafka_2.11-1.10.2.jar里面应该都是shaded之后的class了,
但是却报了一个非shaded的ByteArrayDeserializer。 我感觉这个应该是你自己添加了一下比较特殊的逻辑导致的。可以介绍下你对kafka connector做了哪些改造么? [hidden email] <[hidden email]> 于2020年9月28日周一 下午6:06写道: > 按照您的方法重试了下,又报了另一个错误: > Flink SQL> CREATE TABLE tx ( > > account_id BIGINT, > > amount BIGINT, > > transaction_time TIMESTAMP(3), > > WATERMARK FOR transaction_time AS transaction_time - > INTERVAL '5' SECOND > > ) WITH ( > > 'connector.type' = 'kafka', > > 'connector.version' = 'universal', > > 'connector.topic' = 'heli01', > > 'connector.properties.group.id' = 'heli-test', > > 'connector.properties.bootstrap.servers' = ' > 10.100.51.56:9092', > > 'connector.startup-mode' = 'earliest-offset', > > 'format.type' = 'csv' > > ); > [INFO] Table has been created. > > Flink SQL> show tables ; > tx > > Flink SQL> select * from tx ; > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: > org.apache.kafka.common.serialization.ByteArrayDeserializer is not an > instance of > org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer > > 附:lib包清单 > [test@rcx51101 lib]$ pwd > /opt/flink-1.10.2/lib > > flink-csv-1.10.2.jar > flink-dist_2.12-1.10.2.jar > flink-jdbc_2.12-1.10.2.jar > flink-json-1.10.2.jar > flink-shaded-hadoop-2-uber-2.6.5-10.0.jar > flink-sql-connector-kafka_2.11-1.10.2.jar > flink-table_2.12-1.10.2.jar > flink-table-blink_2.12-1.10.2.jar > log4j-1.2.17.jar > mysql-connector-java-5.1.48.jar > slf4j-log4j12-1.7.15.jar > > > > > [hidden email] > > 发件人: Leonard Xu > 发送时间: 2020-09-28 16:36 > 收件人: user-zh > 主题: Re: sql-cli执行sql报错 > Hi > benchao的回复是的对的, > 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包 > flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。 > > > > 相关lib包: > > flink-connector-kafka_2.12-1.10.2.jar > > kafka-clients-0.11.0.3.jar > > 祝好 > Leonard > ... [show rest of quote] -- Best, Benchao Li |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
16 posts
|
没有修改kafka,就用官方的jar。后来我用1.11.2版本重新尝试了下,成功了,没有任何错误。
这个问题就不纠结了 [hidden email] 发件人: Benchao Li 发送时间: 2020-09-29 18:17 收件人: user-zh 主题: Re: Re: sql-cli执行sql报错 这个错误看起来比较奇怪。正常来讲flink-sql-connector-kafka_2.11-1.10.2.jar里面应该都是shaded之后的class了, 但是却报了一个非shaded的ByteArrayDeserializer。 我感觉这个应该是你自己添加了一下比较特殊的逻辑导致的。可以介绍下你对kafka connector做了哪些改造么? [hidden email] <[hidden email]> 于2020年9月28日周一 下午6:06写道: > 按照您的方法重试了下,又报了另一个错误: > Flink SQL> CREATE TABLE tx ( > > account_id BIGINT, > > amount BIGINT, > > transaction_time TIMESTAMP(3), > > WATERMARK FOR transaction_time AS transaction_time - > INTERVAL '5' SECOND > > ) WITH ( > > 'connector.type' = 'kafka', > > 'connector.version' = 'universal', > > 'connector.topic' = 'heli01', > > 'connector.properties.group.id' = 'heli-test', > > 'connector.properties.bootstrap.servers' = ' > 10.100.51.56:9092', > > 'connector.startup-mode' = 'earliest-offset', > > 'format.type' = 'csv' > > ); > [INFO] Table has been created. > > Flink SQL> show tables ; > tx > > Flink SQL> select * from tx ; > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: > org.apache.kafka.common.serialization.ByteArrayDeserializer is not an > instance of > org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer > > 附:lib包清单 > [test@rcx51101 lib]$ pwd > /opt/flink-1.10.2/lib > > flink-csv-1.10.2.jar > flink-dist_2.12-1.10.2.jar > flink-jdbc_2.12-1.10.2.jar > flink-json-1.10.2.jar > flink-shaded-hadoop-2-uber-2.6.5-10.0.jar > flink-sql-connector-kafka_2.11-1.10.2.jar > flink-table_2.12-1.10.2.jar > flink-table-blink_2.12-1.10.2.jar > log4j-1.2.17.jar > mysql-connector-java-5.1.48.jar > slf4j-log4j12-1.7.15.jar > > > > > [hidden email] > > 发件人: Leonard Xu > 发送时间: 2020-09-28 16:36 > 收件人: user-zh > 主题: Re: sql-cli执行sql报错 > Hi > benchao的回复是的对的, > 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包 > flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。 > > > > 相关lib包: > > flink-connector-kafka_2.12-1.10.2.jar > > kafka-clients-0.11.0.3.jar > > 祝好 > Leonard > ... [show rest of quote]
-- Best, Benchao Li |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
138 posts
|
In reply to this post by hl9902@126.com
这个问题同样在最新的 master 分支也有这个问题,我建了一个 Issue 描述了下整个流程
https://issues.apache.org/jira/browse/FLINK-19995 [hidden email] <[hidden email]> 于2020年9月28日周一 下午6:06写道: > 按照您的方法重试了下,又报了另一个错误: > Flink SQL> CREATE TABLE tx ( > > account_id BIGINT, > > amount BIGINT, > > transaction_time TIMESTAMP(3), > > WATERMARK FOR transaction_time AS transaction_time - > INTERVAL '5' SECOND > > ) WITH ( > > 'connector.type' = 'kafka', > > 'connector.version' = 'universal', > > 'connector.topic' = 'heli01', > > 'connector.properties.group.id' = 'heli-test', > > 'connector.properties.bootstrap.servers' = ' > 10.100.51.56:9092', > > 'connector.startup-mode' = 'earliest-offset', > > 'format.type' = 'csv' > > ); > [INFO] Table has been created. > > Flink SQL> show tables ; > tx > > Flink SQL> select * from tx ; > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: > org.apache.kafka.common.serialization.ByteArrayDeserializer is not an > instance of > org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer > > 附:lib包清单 > [test@rcx51101 lib]$ pwd > /opt/flink-1.10.2/lib > > flink-csv-1.10.2.jar > flink-dist_2.12-1.10.2.jar > flink-jdbc_2.12-1.10.2.jar > flink-json-1.10.2.jar > flink-shaded-hadoop-2-uber-2.6.5-10.0.jar > flink-sql-connector-kafka_2.11-1.10.2.jar > flink-table_2.12-1.10.2.jar > flink-table-blink_2.12-1.10.2.jar > log4j-1.2.17.jar > mysql-connector-java-5.1.48.jar > slf4j-log4j12-1.7.15.jar > > > > > [hidden email] > > 发件人: Leonard Xu > 发送时间: 2020-09-28 16:36 > 收件人: user-zh > 主题: Re: sql-cli执行sql报错 > Hi > benchao的回复是的对的, > 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包 > flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。 > > > > 相关lib包: > > flink-connector-kafka_2.12-1.10.2.jar > > kafka-clients-0.11.0.3.jar > > 祝好 > Leonard > ... [show rest of quote]
|
Free forum by Nabble | Edit this page |