各位好,写了个demo,代码如下,在本地跑没有问题,提交到yarn session上报错:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. 请问是什么原因导致的呢? 代码如下: ----------------------------------------------------------------------------------------------------------------------------- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings); tenv.executeSql("CREATE TABLE test_table (\n" + " id INT,\n" + " name STRING,\n" + " age INT,\n" + " create_at TIMESTAMP(3)\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'test_json',\n" + " 'properties.bootstrap.servers' = 'localhost:9092',\n" + " 'properties.group.id' = 'testGroup',\n" + " 'format' = 'json',\n" + " 'scan.startup.mode' = 'latest-offset'\n" + ")"); Table table = tenv.sqlQuery("select * from test_table"); tenv.toRetractStream(table, Row.class).print(); env.execute("flink 1.11.0 demo"); ----------------------------------------------------------------------------------------------------------------------------- pom 文件如下: ============================================= <properties> <scala.binary.version>2.11</scala.binary.version> <flink.version>1.11.0</flink.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> </dependencies> ============================================= |
那就要看下你是什么 Flink 版本,怎么提交到 YARN 上的,以及 YARN 的日志上的 classpath 是啥了
Best, tison. 王松 <[hidden email]> 于2020年7月13日周一 下午12:54写道: > 各位好,写了个demo,代码如下,在本地跑没有问题,提交到yarn session上报错: > Caused by: org.apache.flink.table.api.ValidationException: Could not find > any factory for identifier 'kafka' that implements > 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the > classpath. > 请问是什么原因导致的呢? > > 代码如下: > > > ----------------------------------------------------------------------------------------------------------------------------- > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings settings = > > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment tenv = StreamTableEnvironment.create(env, > settings); > > tenv.executeSql("CREATE TABLE test_table (\n" + > " id INT,\n" + > " name STRING,\n" + > " age INT,\n" + > " create_at TIMESTAMP(3)\n" + > ") WITH (\n" + > " 'connector' = 'kafka',\n" + > " 'topic' = 'test_json',\n" + > " 'properties.bootstrap.servers' = 'localhost:9092',\n" + > " 'properties.group.id' = 'testGroup',\n" + > " 'format' = 'json',\n" + > " 'scan.startup.mode' = 'latest-offset'\n" + > ")"); > Table table = tenv.sqlQuery("select * from test_table"); > tenv.toRetractStream(table, Row.class).print(); > env.execute("flink 1.11.0 demo"); > > ----------------------------------------------------------------------------------------------------------------------------- > > pom 文件如下: > ============================================= > <properties> > <scala.binary.version>2.11</scala.binary.version> > <flink.version>1.11.0</flink.version> > </properties> > <dependencies> > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-json</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-core</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-clients_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-common</artifactId> > <version>${flink.version}</version> > </dependency> > </dependencies> > ============================================= > |
In reply to this post by wangsong2
Hi, 王松
这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream connector 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka datastream connector 同时引用是会冲突的,请根据你的需要使用。 祝好, Leonard Xu [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html> > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-core</artifactId> > <version>${flink.version}</version> > </dependency> > ============================================= |
@Leonard Xu,
非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1] 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下: [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html ============================= <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!--<dependency>--> <!--<groupId>org.apache.flink</groupId>--> <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>--> <!--<version>${flink.version}</version>--> <!--</dependency>--> <!--<dependency>--> <!--<groupId>org.apache.flink</groupId>--> <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>--> <!--<version>${flink.version}</version>--> <!--<!–<scope>compile</scope>–>--> <!--</dependency>--> <!--<dependency>--> <!--<groupId>org.apache.flink</groupId>--> <!--<artifactId>flink-connector-kafka_2.11</artifactId>--> <!--<version>${flink.version}</version>--> <!--</dependency>--> ============================= Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道: > Hi, 王松 > > 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream connector > 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} > 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka > datastream connector 同时引用是会冲突的,请根据你的需要使用。 > > > 祝好, > Leonard Xu > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > < > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > > > <dependency> > > <groupId>org.apache.flink</groupId> > > > > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > > <version>${flink.version}</version> > > </dependency> > > <dependency> > > <groupId>org.apache.flink</groupId> > > > > > <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId> > > <version>${flink.version}</version> > > </dependency> > > <dependency> > > <groupId>org.apache.flink</groupId> > > > > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > > <version>${flink.version}</version> > > </dependency> > > <dependency> > > <groupId>org.apache.flink</groupId> > > <artifactId>flink-core</artifactId> > > <version>${flink.version}</version> > > </dependency> > > ============================================= > > |
Hi,
flink-connector-kafka_${scala.binary.version 和 flink-sql-connector-kafka_${scala.binary.version 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用, 后者的话主要是对前者做了shade处理,方便用户在 SQL Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的, 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。 [1] 中的话是有SQL Client JAR 的下载链接,就是 flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。 祝好 Leonard Xu > 在 2020年7月13日,14:42,王松 <[hidden email]> 写道: > > @Leonard Xu, > 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1] > 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下: > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > ============================= > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > > <!--<dependency>--> > <!--<groupId>org.apache.flink</groupId>--> > > <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>--> > <!--<version>${flink.version}</version>--> > <!--</dependency>--> > <!--<dependency>--> > <!--<groupId>org.apache.flink</groupId>--> > <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>--> > <!--<version>${flink.version}</version>--> > <!--<!–<scope>compile</scope>–>--> > <!--</dependency>--> > > <!--<dependency>--> > <!--<groupId>org.apache.flink</groupId>--> > <!--<artifactId>flink-connector-kafka_2.11</artifactId>--> > <!--<version>${flink.version}</version>--> > <!--</dependency>--> > ============================= > > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道: > >> Hi, 王松 >> >> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream connector >> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} >> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka >> datastream connector 同时引用是会冲突的,请根据你的需要使用。 >> >> >> 祝好, >> Leonard Xu >> [1] >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html >> < >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html >>> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> >>> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> >>> <version>${flink.version}</version> >>> </dependency> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> >>> >> <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId> >>> <version>${flink.version}</version> >>> </dependency> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> >>> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> >>> <version>${flink.version}</version> >>> </dependency> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-core</artifactId> >>> <version>${flink.version}</version> >>> </dependency> >>> ============================================= >> >> |
您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
我机器上flink/lib下jar包如下: -rw-rw-r-- 1 hadoop hadoop 117719 6月 30 12:41 flink-avro-1.11.0.jar -rw-r--r-- 1 hadoop hadoop 90782 7月 8 10:09 flink-csv-1.11.0.jar -rw-r--r-- 1 hadoop hadoop 108349203 7月 8 10:09 flink-dist_2.11-1.11.0.jar -rw-r--r-- 1 hadoop hadoop 94863 7月 8 10:09 flink-json-1.11.0.jar -rw-r--r-- 1 hadoop hadoop 7712156 7月 8 10:09 flink-shaded-zookeeper-3.4.14.jar -rw-r--r-- 1 hadoop hadoop 33325754 7月 8 10:09 flink-table_2.11-1.11.0.jar -rw-r--r-- 1 hadoop hadoop 37330521 7月 8 10:09 flink-table-blink_2.11-1.11.0.jar -rw-r--r-- 1 hadoop hadoop 67114 7月 8 10:09 log4j-1.2-api-2.12.1.jar -rw-r--r-- 1 hadoop hadoop 276771 7月 8 10:09 log4j-api-2.12.1.jar -rw-r--r-- 1 hadoop hadoop 1674433 7月 8 10:09 log4j-core-2.12.1.jar -rw-r--r-- 1 hadoop hadoop 23518 7月 8 10:09 log4j-slf4j-impl-2.12.1.jar Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:05写道: > Hi, > flink-connector-kafka_${scala.binary.version 和 > flink-sql-connector-kafka_${scala.binary.version > 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用, > 后者的话主要是对前者做了shade处理,方便用户在 SQL > Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的, > 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。 > > [1] 中的话是有SQL Client JAR 的下载链接,就是 > flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。 > > 祝好 > Leonard Xu > > > 在 2020年7月13日,14:42,王松 <[hidden email]> 写道: > > > > @Leonard Xu, > > 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1] > > 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下: > > > > [1] > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > ============================= > > <dependency> > > <groupId>org.apache.flink</groupId> > > > > > <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId> > > <version>${flink.version}</version> > > </dependency> > > > > <!--<dependency>--> > > <!--<groupId>org.apache.flink</groupId>--> > > > > > <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>--> > > <!--<version>${flink.version}</version>--> > > <!--</dependency>--> > > <!--<dependency>--> > > <!--<groupId>org.apache.flink</groupId>--> > > > <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>--> > > <!--<version>${flink.version}</version>--> > > <!--<!–<scope>compile</scope>–>--> > > <!--</dependency>--> > > > > <!--<dependency>--> > > <!--<groupId>org.apache.flink</groupId>--> > > <!--<artifactId>flink-connector-kafka_2.11</artifactId>--> > > <!--<version>${flink.version}</version>--> > > <!--</dependency>--> > > ============================= > > > > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道: > > > >> Hi, 王松 > >> > >> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream connector > >> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} > >> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka > >> datastream connector 同时引用是会冲突的,请根据你的需要使用。 > >> > >> > >> 祝好, > >> Leonard Xu > >> [1] > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > >> < > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > >>> > >>> <dependency> > >>> <groupId>org.apache.flink</groupId> > >>> > >>> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > >>> <version>${flink.version}</version> > >>> </dependency> > >>> <dependency> > >>> <groupId>org.apache.flink</groupId> > >>> > >>> > >> > <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId> > >>> <version>${flink.version}</version> > >>> </dependency> > >>> <dependency> > >>> <groupId>org.apache.flink</groupId> > >>> > >>> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > >>> <version>${flink.version}</version> > >>> </dependency> > >>> <dependency> > >>> <groupId>org.apache.flink</groupId> > >>> <artifactId>flink-core</artifactId> > >>> <version>${flink.version}</version> > >>> </dependency> > >>> ============================================= > >> > >> > > |
Hi
你可以试下把 flink-connector-kafka_2.11-1.11.0.jar 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。 祝好 > 在 2020年7月13日,15:28,王松 <[hidden email]> 写道: > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。 > > 我机器上flink/lib下jar包如下: > -rw-rw-r-- 1 hadoop hadoop 117719 6月 30 12:41 flink-avro-1.11.0.jar > -rw-r--r-- 1 hadoop hadoop 90782 7月 8 10:09 flink-csv-1.11.0.jar > -rw-r--r-- 1 hadoop hadoop 108349203 7月 8 10:09 flink-dist_2.11-1.11.0.jar > -rw-r--r-- 1 hadoop hadoop 94863 7月 8 10:09 flink-json-1.11.0.jar > -rw-r--r-- 1 hadoop hadoop 7712156 7月 8 10:09 > flink-shaded-zookeeper-3.4.14.jar > -rw-r--r-- 1 hadoop hadoop 33325754 7月 8 10:09 > flink-table_2.11-1.11.0.jar > -rw-r--r-- 1 hadoop hadoop 37330521 7月 8 10:09 > flink-table-blink_2.11-1.11.0.jar > -rw-r--r-- 1 hadoop hadoop 67114 7月 8 10:09 log4j-1.2-api-2.12.1.jar > -rw-r--r-- 1 hadoop hadoop 276771 7月 8 10:09 log4j-api-2.12.1.jar > -rw-r--r-- 1 hadoop hadoop 1674433 7月 8 10:09 log4j-core-2.12.1.jar > -rw-r--r-- 1 hadoop hadoop 23518 7月 8 10:09 > log4j-slf4j-impl-2.12.1.jar > > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:05写道: > >> Hi, >> flink-connector-kafka_${scala.binary.version 和 >> flink-sql-connector-kafka_${scala.binary.version >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用, >> 后者的话主要是对前者做了shade处理,方便用户在 SQL >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的, >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。 >> >> [1] 中的话是有SQL Client JAR 的下载链接,就是 >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。 >> >> 祝好 >> Leonard Xu >> >>> 在 2020年7月13日,14:42,王松 <[hidden email]> 写道: >>> >>> @Leonard Xu, >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1] >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下: >>> >>> [1] >>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html >>> ============================= >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> >>> >> <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId> >>> <version>${flink.version}</version> >>> </dependency> >>> >>> <!--<dependency>--> >>> <!--<groupId>org.apache.flink</groupId>--> >>> >>> >> <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>--> >>> <!--<version>${flink.version}</version>--> >>> <!--</dependency>--> >>> <!--<dependency>--> >>> <!--<groupId>org.apache.flink</groupId>--> >>> >> <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>--> >>> <!--<version>${flink.version}</version>--> >>> <!--<!–<scope>compile</scope>–>--> >>> <!--</dependency>--> >>> >>> <!--<dependency>--> >>> <!--<groupId>org.apache.flink</groupId>--> >>> <!--<artifactId>flink-connector-kafka_2.11</artifactId>--> >>> <!--<version>${flink.version}</version>--> >>> <!--</dependency>--> >>> ============================= >>> >>> Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道: >>> >>>> Hi, 王松 >>>> >>>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream connector >>>> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} >>>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka >>>> datastream connector 同时引用是会冲突的,请根据你的需要使用。 >>>> >>>> >>>> 祝好, >>>> Leonard Xu >>>> [1] >>>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html >>>> < >>>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html >>>>> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> >>>>> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> >>>>> <version>${flink.version}</version> >>>>> </dependency> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> >>>>> >>>> >> <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId> >>>>> <version>${flink.version}</version> >>>>> </dependency> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> >>>>> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> >>>>> <version>${flink.version}</version> >>>>> </dependency> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> <artifactId>flink-core</artifactId> >>>>> <version>${flink.version}</version> >>>>> </dependency> >>>>> ============================================= >>>> >>>> >> >> |
你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去;
或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。 当然,直接粗暴的放到lib下,也是可以的。 Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:38写道: > Hi > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。 > > 祝好 > > > 在 2020年7月13日,15:28,王松 <[hidden email]> 写道: > > > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。 > > > > 我机器上flink/lib下jar包如下: > > -rw-rw-r-- 1 hadoop hadoop 117719 6月 30 12:41 flink-avro-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 90782 7月 8 10:09 flink-csv-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 108349203 7月 8 10:09 > flink-dist_2.11-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 94863 7月 8 10:09 flink-json-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 7712156 7月 8 10:09 > > flink-shaded-zookeeper-3.4.14.jar > > -rw-r--r-- 1 hadoop hadoop 33325754 7月 8 10:09 > > flink-table_2.11-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 37330521 7月 8 10:09 > > flink-table-blink_2.11-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 67114 7月 8 10:09 > log4j-1.2-api-2.12.1.jar > > -rw-r--r-- 1 hadoop hadoop 276771 7月 8 10:09 log4j-api-2.12.1.jar > > -rw-r--r-- 1 hadoop hadoop 1674433 7月 8 10:09 log4j-core-2.12.1.jar > > -rw-r--r-- 1 hadoop hadoop 23518 7月 8 10:09 > > log4j-slf4j-impl-2.12.1.jar > > > > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:05写道: > > > >> Hi, > >> flink-connector-kafka_${scala.binary.version 和 > >> flink-sql-connector-kafka_${scala.binary.version > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用, > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的, > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。 > >> > >> [1] 中的话是有SQL Client JAR 的下载链接,就是 > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。 > >> > >> 祝好 > >> Leonard Xu > >> > >>> 在 2020年7月13日,14:42,王松 <[hidden email]> 写道: > >>> > >>> @Leonard Xu, > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1] > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下: > >>> > >>> [1] > >>> > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > >>> ============================= > >>> <dependency> > >>> <groupId>org.apache.flink</groupId> > >>> > >>> > >> > <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId> > >>> <version>${flink.version}</version> > >>> </dependency> > >>> > >>> <!--<dependency>--> > >>> <!--<groupId>org.apache.flink</groupId>--> > >>> > >>> > >> > <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>--> > >>> <!--<version>${flink.version}</version>--> > >>> <!--</dependency>--> > >>> <!--<dependency>--> > >>> <!--<groupId>org.apache.flink</groupId>--> > >>> > >> <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>--> > >>> <!--<version>${flink.version}</version>--> > >>> <!--<!–<scope>compile</scope>–>--> > >>> <!--</dependency>--> > >>> > >>> <!--<dependency>--> > >>> <!--<groupId>org.apache.flink</groupId>--> > >>> <!--<artifactId>flink-connector-kafka_2.11</artifactId>--> > >>> <!--<version>${flink.version}</version>--> > >>> <!--</dependency>--> > >>> ============================= > >>> > >>> Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道: > >>> > >>>> Hi, 王松 > >>>> > >>>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream > connector > >>>> > 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} > >>>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka > >>>> datastream connector 同时引用是会冲突的,请根据你的需要使用。 > >>>> > >>>> > >>>> 祝好, > >>>> Leonard Xu > >>>> [1] > >>>> > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > >>>> < > >>>> > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > >>>>> > >>>>> <dependency> > >>>>> <groupId>org.apache.flink</groupId> > >>>>> > >>>>> > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > >>>>> <version>${flink.version}</version> > >>>>> </dependency> > >>>>> <dependency> > >>>>> <groupId>org.apache.flink</groupId> > >>>>> > >>>>> > >>>> > >> > <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId> > >>>>> <version>${flink.version}</version> > >>>>> </dependency> > >>>>> <dependency> > >>>>> <groupId>org.apache.flink</groupId> > >>>>> > >>>>> > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > >>>>> <version>${flink.version}</version> > >>>>> </dependency> > >>>>> <dependency> > >>>>> <groupId>org.apache.flink</groupId> > >>>>> <artifactId>flink-core</artifactId> > >>>>> <version>${flink.version}</version> > >>>>> </dependency> > >>>>> ============================================= > >>>> > >>>> > >> > >> > > -- Best, Benchao Li |
In reply to this post by Leonard Xu
这样还是不行,我尝试flink-connector-kafka-0.11_2.11-1.11.0.jar放到lib下,报了另外一个问题:
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase 另外,我是用 bin/flink run -yid xxx xxx.jar 的方式提交任务的,报错是直接在终端报错,没有提交到flink jobmanager上。 Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:38写道: > Hi > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。 > > 祝好 > > > 在 2020年7月13日,15:28,王松 <[hidden email]> 写道: > > > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。 > > > > 我机器上flink/lib下jar包如下: > > -rw-rw-r-- 1 hadoop hadoop 117719 6月 30 12:41 flink-avro-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 90782 7月 8 10:09 flink-csv-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 108349203 7月 8 10:09 > flink-dist_2.11-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 94863 7月 8 10:09 flink-json-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 7712156 7月 8 10:09 > > flink-shaded-zookeeper-3.4.14.jar > > -rw-r--r-- 1 hadoop hadoop 33325754 7月 8 10:09 > > flink-table_2.11-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 37330521 7月 8 10:09 > > flink-table-blink_2.11-1.11.0.jar > > -rw-r--r-- 1 hadoop hadoop 67114 7月 8 10:09 > log4j-1.2-api-2.12.1.jar > > -rw-r--r-- 1 hadoop hadoop 276771 7月 8 10:09 log4j-api-2.12.1.jar > > -rw-r--r-- 1 hadoop hadoop 1674433 7月 8 10:09 log4j-core-2.12.1.jar > > -rw-r--r-- 1 hadoop hadoop 23518 7月 8 10:09 > > log4j-slf4j-impl-2.12.1.jar > > > > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:05写道: > > > >> Hi, > >> flink-connector-kafka_${scala.binary.version 和 > >> flink-sql-connector-kafka_${scala.binary.version > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用, > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的, > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。 > >> > >> [1] 中的话是有SQL Client JAR 的下载链接,就是 > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。 > >> > >> 祝好 > >> Leonard Xu > >> > >>> 在 2020年7月13日,14:42,王松 <[hidden email]> 写道: > >>> > >>> @Leonard Xu, > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1] > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下: > >>> > >>> [1] > >>> > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > >>> ============================= > >>> <dependency> > >>> <groupId>org.apache.flink</groupId> > >>> > >>> > >> > <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId> > >>> <version>${flink.version}</version> > >>> </dependency> > >>> > >>> <!--<dependency>--> > >>> <!--<groupId>org.apache.flink</groupId>--> > >>> > >>> > >> > <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>--> > >>> <!--<version>${flink.version}</version>--> > >>> <!--</dependency>--> > >>> <!--<dependency>--> > >>> <!--<groupId>org.apache.flink</groupId>--> > >>> > >> <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>--> > >>> <!--<version>${flink.version}</version>--> > >>> <!--<!–<scope>compile</scope>–>--> > >>> <!--</dependency>--> > >>> > >>> <!--<dependency>--> > >>> <!--<groupId>org.apache.flink</groupId>--> > >>> <!--<artifactId>flink-connector-kafka_2.11</artifactId>--> > >>> <!--<version>${flink.version}</version>--> > >>> <!--</dependency>--> > >>> ============================= > >>> > >>> Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道: > >>> > >>>> Hi, 王松 > >>>> > >>>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream > connector > >>>> > 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} > >>>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka > >>>> datastream connector 同时引用是会冲突的,请根据你的需要使用。 > >>>> > >>>> > >>>> 祝好, > >>>> Leonard Xu > >>>> [1] > >>>> > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > >>>> < > >>>> > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > >>>>> > >>>>> <dependency> > >>>>> <groupId>org.apache.flink</groupId> > >>>>> > >>>>> > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > >>>>> <version>${flink.version}</version> > >>>>> </dependency> > >>>>> <dependency> > >>>>> <groupId>org.apache.flink</groupId> > >>>>> > >>>>> > >>>> > >> > <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId> > >>>>> <version>${flink.version}</version> > >>>>> </dependency> > >>>>> <dependency> > >>>>> <groupId>org.apache.flink</groupId> > >>>>> > >>>>> > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > >>>>> <version>${flink.version}</version> > >>>>> </dependency> > >>>>> <dependency> > >>>>> <groupId>org.apache.flink</groupId> > >>>>> <artifactId>flink-core</artifactId> > >>>>> <version>${flink.version}</version> > >>>>> </dependency> > >>>>> ============================================= > >>>> > >>>> > >> > >> > > |
In reply to this post by Benchao Li-2
你好本超,
是的,我尝试解压打包好的jar包,里边是包含我pom中写的依赖的 Benchao Li <[hidden email]> 于2020年7月13日周一 下午3:42写道: > 你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去; > 或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。 > 当然,直接粗暴的放到lib下,也是可以的。 > > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:38写道: > > > Hi > > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar > > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。 > > > > 祝好 > > > > > 在 2020年7月13日,15:28,王松 <[hidden email]> 写道: > > > > > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。 > > > > > > 我机器上flink/lib下jar包如下: > > > -rw-rw-r-- 1 hadoop hadoop 117719 6月 30 12:41 flink-avro-1.11.0.jar > > > -rw-r--r-- 1 hadoop hadoop 90782 7月 8 10:09 flink-csv-1.11.0.jar > > > -rw-r--r-- 1 hadoop hadoop 108349203 7月 8 10:09 > > flink-dist_2.11-1.11.0.jar > > > -rw-r--r-- 1 hadoop hadoop 94863 7月 8 10:09 flink-json-1.11.0.jar > > > -rw-r--r-- 1 hadoop hadoop 7712156 7月 8 10:09 > > > flink-shaded-zookeeper-3.4.14.jar > > > -rw-r--r-- 1 hadoop hadoop 33325754 7月 8 10:09 > > > flink-table_2.11-1.11.0.jar > > > -rw-r--r-- 1 hadoop hadoop 37330521 7月 8 10:09 > > > flink-table-blink_2.11-1.11.0.jar > > > -rw-r--r-- 1 hadoop hadoop 67114 7月 8 10:09 > > log4j-1.2-api-2.12.1.jar > > > -rw-r--r-- 1 hadoop hadoop 276771 7月 8 10:09 log4j-api-2.12.1.jar > > > -rw-r--r-- 1 hadoop hadoop 1674433 7月 8 10:09 log4j-core-2.12.1.jar > > > -rw-r--r-- 1 hadoop hadoop 23518 7月 8 10:09 > > > log4j-slf4j-impl-2.12.1.jar > > > > > > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:05写道: > > > > > >> Hi, > > >> flink-connector-kafka_${scala.binary.version 和 > > >> flink-sql-connector-kafka_${scala.binary.version > > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用, > > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL > > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的, > > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。 > > >> > > >> [1] 中的话是有SQL Client JAR 的下载链接,就是 > > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。 > > >> > > >> 祝好 > > >> Leonard Xu > > >> > > >>> 在 2020年7月13日,14:42,王松 <[hidden email]> 写道: > > >>> > > >>> @Leonard Xu, > > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1] > > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下: > > >>> > > >>> [1] > > >>> > > >> > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > >>> ============================= > > >>> <dependency> > > >>> <groupId>org.apache.flink</groupId> > > >>> > > >>> > > >> > > > <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId> > > >>> <version>${flink.version}</version> > > >>> </dependency> > > >>> > > >>> <!--<dependency>--> > > >>> <!--<groupId>org.apache.flink</groupId>--> > > >>> > > >>> > > >> > > > <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>--> > > >>> <!--<version>${flink.version}</version>--> > > >>> <!--</dependency>--> > > >>> <!--<dependency>--> > > >>> <!--<groupId>org.apache.flink</groupId>--> > > >>> > > >> <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>--> > > >>> <!--<version>${flink.version}</version>--> > > >>> <!--<!–<scope>compile</scope>–>--> > > >>> <!--</dependency>--> > > >>> > > >>> <!--<dependency>--> > > >>> <!--<groupId>org.apache.flink</groupId>--> > > >>> <!--<artifactId>flink-connector-kafka_2.11</artifactId>--> > > >>> <!--<version>${flink.version}</version>--> > > >>> <!--</dependency>--> > > >>> ============================= > > >>> > > >>> Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道: > > >>> > > >>>> Hi, 王松 > > >>>> > > >>>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream > > connector > > >>>> > > 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} > > >>>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka > > >>>> datastream connector 同时引用是会冲突的,请根据你的需要使用。 > > >>>> > > >>>> > > >>>> 祝好, > > >>>> Leonard Xu > > >>>> [1] > > >>>> > > >> > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > >>>> < > > >>>> > > >> > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > >>>>> > > >>>>> <dependency> > > >>>>> <groupId>org.apache.flink</groupId> > > >>>>> > > >>>>> > > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > > >>>>> <version>${flink.version}</version> > > >>>>> </dependency> > > >>>>> <dependency> > > >>>>> <groupId>org.apache.flink</groupId> > > >>>>> > > >>>>> > > >>>> > > >> > > > <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId> > > >>>>> <version>${flink.version}</version> > > >>>>> </dependency> > > >>>>> <dependency> > > >>>>> <groupId>org.apache.flink</groupId> > > >>>>> > > >>>>> > > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > > >>>>> <version>${flink.version}</version> > > >>>>> </dependency> > > >>>>> <dependency> > > >>>>> <groupId>org.apache.flink</groupId> > > >>>>> <artifactId>flink-core</artifactId> > > >>>>> <version>${flink.version}</version> > > >>>>> </dependency> > > >>>>> ============================================= > > >>>> > > >>>> > > >> > > >> > > > > > > -- > > Best, > Benchao Li > |
Hi,
1.推荐方式:把flink-sql-connector-kafka-0.11_2.11-1.11.0.jar放入lib下。下载链接:[1] 2.次推荐方式:你的java工程打包时,需要用shade插件把kafka相关类shade到最终的jar中。(不能用jar-with-deps,因为它会覆盖掉java spi) [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html Best, Jingsong On Mon, Jul 13, 2020 at 4:04 PM 王松 <[hidden email]> wrote: > 你好本超, > 是的,我尝试解压打包好的jar包,里边是包含我pom中写的依赖的 > > Benchao Li <[hidden email]> 于2020年7月13日周一 下午3:42写道: > > > 你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去; > > 或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。 > > 当然,直接粗暴的放到lib下,也是可以的。 > > > > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:38写道: > > > > > Hi > > > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar > > > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。 > > > > > > 祝好 > > > > > > > 在 2020年7月13日,15:28,王松 <[hidden email]> 写道: > > > > > > > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。 > > > > > > > > 我机器上flink/lib下jar包如下: > > > > -rw-rw-r-- 1 hadoop hadoop 117719 6月 30 12:41 > flink-avro-1.11.0.jar > > > > -rw-r--r-- 1 hadoop hadoop 90782 7月 8 10:09 > flink-csv-1.11.0.jar > > > > -rw-r--r-- 1 hadoop hadoop 108349203 7月 8 10:09 > > > flink-dist_2.11-1.11.0.jar > > > > -rw-r--r-- 1 hadoop hadoop 94863 7月 8 10:09 > flink-json-1.11.0.jar > > > > -rw-r--r-- 1 hadoop hadoop 7712156 7月 8 10:09 > > > > flink-shaded-zookeeper-3.4.14.jar > > > > -rw-r--r-- 1 hadoop hadoop 33325754 7月 8 10:09 > > > > flink-table_2.11-1.11.0.jar > > > > -rw-r--r-- 1 hadoop hadoop 37330521 7月 8 10:09 > > > > flink-table-blink_2.11-1.11.0.jar > > > > -rw-r--r-- 1 hadoop hadoop 67114 7月 8 10:09 > > > log4j-1.2-api-2.12.1.jar > > > > -rw-r--r-- 1 hadoop hadoop 276771 7月 8 10:09 > log4j-api-2.12.1.jar > > > > -rw-r--r-- 1 hadoop hadoop 1674433 7月 8 10:09 > log4j-core-2.12.1.jar > > > > -rw-r--r-- 1 hadoop hadoop 23518 7月 8 10:09 > > > > log4j-slf4j-impl-2.12.1.jar > > > > > > > > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:05写道: > > > > > > > >> Hi, > > > >> flink-connector-kafka_${scala.binary.version 和 > > > >> flink-sql-connector-kafka_${scala.binary.version > > > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用, > > > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL > > > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的, > > > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。 > > > >> > > > >> [1] 中的话是有SQL Client JAR 的下载链接,就是 > > > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。 > > > >> > > > >> 祝好 > > > >> Leonard Xu > > > >> > > > >>> 在 2020年7月13日,14:42,王松 <[hidden email]> 写道: > > > >>> > > > >>> @Leonard Xu, > > > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1] > > > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下: > > > >>> > > > >>> [1] > > > >>> > > > >> > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > > >>> ============================= > > > >>> <dependency> > > > >>> <groupId>org.apache.flink</groupId> > > > >>> > > > >>> > > > >> > > > > > > <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId> > > > >>> <version>${flink.version}</version> > > > >>> </dependency> > > > >>> > > > >>> <!--<dependency>--> > > > >>> <!--<groupId>org.apache.flink</groupId>--> > > > >>> > > > >>> > > > >> > > > > > > <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>--> > > > >>> <!--<version>${flink.version}</version>--> > > > >>> <!--</dependency>--> > > > >>> <!--<dependency>--> > > > >>> <!--<groupId>org.apache.flink</groupId>--> > > > >>> > > > >> <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>--> > > > >>> <!--<version>${flink.version}</version>--> > > > >>> <!--<!–<scope>compile</scope>–>--> > > > >>> <!--</dependency>--> > > > >>> > > > >>> <!--<dependency>--> > > > >>> <!--<groupId>org.apache.flink</groupId>--> > > > >>> > <!--<artifactId>flink-connector-kafka_2.11</artifactId>--> > > > >>> <!--<version>${flink.version}</version>--> > > > >>> <!--</dependency>--> > > > >>> ============================= > > > >>> > > > >>> Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道: > > > >>> > > > >>>> Hi, 王松 > > > >>>> > > > >>>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream > > > connector > > > >>>> > > > 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} > > > >>>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka > > > >>>> datastream connector 同时引用是会冲突的,请根据你的需要使用。 > > > >>>> > > > >>>> > > > >>>> 祝好, > > > >>>> Leonard Xu > > > >>>> [1] > > > >>>> > > > >> > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > > >>>> < > > > >>>> > > > >> > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > > >>>>> > > > >>>>> <dependency> > > > >>>>> <groupId>org.apache.flink</groupId> > > > >>>>> > > > >>>>> > > > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > > > >>>>> <version>${flink.version}</version> > > > >>>>> </dependency> > > > >>>>> <dependency> > > > >>>>> <groupId>org.apache.flink</groupId> > > > >>>>> > > > >>>>> > > > >>>> > > > >> > > > > > > <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId> > > > >>>>> <version>${flink.version}</version> > > > >>>>> </dependency> > > > >>>>> <dependency> > > > >>>>> <groupId>org.apache.flink</groupId> > > > >>>>> > > > >>>>> > > > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > > > >>>>> <version>${flink.version}</version> > > > >>>>> </dependency> > > > >>>>> <dependency> > > > >>>>> <groupId>org.apache.flink</groupId> > > > >>>>> <artifactId>flink-core</artifactId> > > > >>>>> <version>${flink.version}</version> > > > >>>>> </dependency> > > > >>>>> ============================================= > > > >>>> > > > >>>> > > > >> > > > >> > > > > > > > > > > -- > > > > Best, > > Benchao Li > > > -- Best, Jingsong Lee |
感谢大家的热情解答,最后问题解决了。原因正是 Leonard Xu所说的,我应该引入的是
flink-sql-connector-kafka-${version}_${scala.binary.version},然后当时改成 flink-sql-connector-kafka 后继续报错的原因是:我还在pom文件中引入了flink-table-planner-blink,如下: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> 添加<scope>provided</scope>后就没有问题了。 最后附上正确的pom文件 (如 Jingsong 所说,也可以把flink-sql-connector-kafka、flink-json这些都在pom文件中去掉,直接将jar报放入lib中): ============================================================ <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies> ============================================================ Jingsong Li <[hidden email]> 于2020年7月13日周一 下午4:35写道: > Hi, > > 1.推荐方式:把flink-sql-connector-kafka-0.11_2.11-1.11.0.jar放入lib下。下载链接:[1] > > 2.次推荐方式:你的java工程打包时,需要用shade插件把kafka相关类shade到最终的jar中。(不能用jar-with-deps,因为它会覆盖掉java > spi) > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > Best, > Jingsong > > On Mon, Jul 13, 2020 at 4:04 PM 王松 <[hidden email]> wrote: > > > 你好本超, > > 是的,我尝试解压打包好的jar包,里边是包含我pom中写的依赖的 > > > > Benchao Li <[hidden email]> 于2020年7月13日周一 下午3:42写道: > > > > > 你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去; > > > 或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。 > > > 当然,直接粗暴的放到lib下,也是可以的。 > > > > > > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:38写道: > > > > > > > Hi > > > > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar > > > > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。 > > > > > > > > 祝好 > > > > > > > > > 在 2020年7月13日,15:28,王松 <[hidden email]> 写道: > > > > > > > > > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。 > > > > > > > > > > 我机器上flink/lib下jar包如下: > > > > > -rw-rw-r-- 1 hadoop hadoop 117719 6月 30 12:41 > > flink-avro-1.11.0.jar > > > > > -rw-r--r-- 1 hadoop hadoop 90782 7月 8 10:09 > > flink-csv-1.11.0.jar > > > > > -rw-r--r-- 1 hadoop hadoop 108349203 7月 8 10:09 > > > > flink-dist_2.11-1.11.0.jar > > > > > -rw-r--r-- 1 hadoop hadoop 94863 7月 8 10:09 > > flink-json-1.11.0.jar > > > > > -rw-r--r-- 1 hadoop hadoop 7712156 7月 8 10:09 > > > > > flink-shaded-zookeeper-3.4.14.jar > > > > > -rw-r--r-- 1 hadoop hadoop 33325754 7月 8 10:09 > > > > > flink-table_2.11-1.11.0.jar > > > > > -rw-r--r-- 1 hadoop hadoop 37330521 7月 8 10:09 > > > > > flink-table-blink_2.11-1.11.0.jar > > > > > -rw-r--r-- 1 hadoop hadoop 67114 7月 8 10:09 > > > > log4j-1.2-api-2.12.1.jar > > > > > -rw-r--r-- 1 hadoop hadoop 276771 7月 8 10:09 > > log4j-api-2.12.1.jar > > > > > -rw-r--r-- 1 hadoop hadoop 1674433 7月 8 10:09 > > log4j-core-2.12.1.jar > > > > > -rw-r--r-- 1 hadoop hadoop 23518 7月 8 10:09 > > > > > log4j-slf4j-impl-2.12.1.jar > > > > > > > > > > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:05写道: > > > > > > > > > >> Hi, > > > > >> flink-connector-kafka_${scala.binary.version 和 > > > > >> flink-sql-connector-kafka_${scala.binary.version > > > > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用, > > > > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL > > > > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的, > > > > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。 > > > > >> > > > > >> [1] 中的话是有SQL Client JAR 的下载链接,就是 > > > > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。 > > > > >> > > > > >> 祝好 > > > > >> Leonard Xu > > > > >> > > > > >>> 在 2020年7月13日,14:42,王松 <[hidden email]> 写道: > > > > >>> > > > > >>> @Leonard Xu, > > > > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1] > > > > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下: > > > > >>> > > > > >>> [1] > > > > >>> > > > > >> > > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > > > >>> ============================= > > > > >>> <dependency> > > > > >>> <groupId>org.apache.flink</groupId> > > > > >>> > > > > >>> > > > > >> > > > > > > > > > > <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId> > > > > >>> <version>${flink.version}</version> > > > > >>> </dependency> > > > > >>> > > > > >>> <!--<dependency>--> > > > > >>> <!--<groupId>org.apache.flink</groupId>--> > > > > >>> > > > > >>> > > > > >> > > > > > > > > > > <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>--> > > > > >>> <!--<version>${flink.version}</version>--> > > > > >>> <!--</dependency>--> > > > > >>> <!--<dependency>--> > > > > >>> <!--<groupId>org.apache.flink</groupId>--> > > > > >>> > > > > >> <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>--> > > > > >>> <!--<version>${flink.version}</version>--> > > > > >>> <!--<!–<scope>compile</scope>–>--> > > > > >>> <!--</dependency>--> > > > > >>> > > > > >>> <!--<dependency>--> > > > > >>> <!--<groupId>org.apache.flink</groupId>--> > > > > >>> > > <!--<artifactId>flink-connector-kafka_2.11</artifactId>--> > > > > >>> <!--<version>${flink.version}</version>--> > > > > >>> <!--</dependency>--> > > > > >>> ============================= > > > > >>> > > > > >>> Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道: > > > > >>> > > > > >>>> Hi, 王松 > > > > >>>> > > > > >>>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream > > > > connector > > > > >>>> > > > > > 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} > > > > >>>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 > Kafka > > > > >>>> datastream connector 同时引用是会冲突的,请根据你的需要使用。 > > > > >>>> > > > > >>>> > > > > >>>> 祝好, > > > > >>>> Leonard Xu > > > > >>>> [1] > > > > >>>> > > > > >> > > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > > > >>>> < > > > > >>>> > > > > >> > > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > > > >>>>> > > > > >>>>> <dependency> > > > > >>>>> <groupId>org.apache.flink</groupId> > > > > >>>>> > > > > >>>>> > > > > > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > > > > >>>>> <version>${flink.version}</version> > > > > >>>>> </dependency> > > > > >>>>> <dependency> > > > > >>>>> <groupId>org.apache.flink</groupId> > > > > >>>>> > > > > >>>>> > > > > >>>> > > > > >> > > > > > > > > > > <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId> > > > > >>>>> <version>${flink.version}</version> > > > > >>>>> </dependency> > > > > >>>>> <dependency> > > > > >>>>> <groupId>org.apache.flink</groupId> > > > > >>>>> > > > > >>>>> > > > > > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > > > > >>>>> <version>${flink.version}</version> > > > > >>>>> </dependency> > > > > >>>>> <dependency> > > > > >>>>> <groupId>org.apache.flink</groupId> > > > > >>>>> <artifactId>flink-core</artifactId> > > > > >>>>> <version>${flink.version}</version> > > > > >>>>> </dependency> > > > > >>>>> ============================================= > > > > >>>> > > > > >>>> > > > > >> > > > > >> > > > > > > > > > > > > > > -- > > > > > > Best, > > > Benchao Li > > > > > > > > -- > Best, Jingsong Lee > |
Free forum by Nabble | Edit this page |