flink sql报错 Could not find any factory for identifier 'kafka'

classic Classic list List threaded Threaded
12 messages Options
Reply | Threaded
Open this post in threaded view
|

flink sql报错 Could not find any factory for identifier 'kafka'

wangsong2
各位好,写了个demo,代码如下,在本地跑没有问题,提交到yarn session上报错:
Caused by: org.apache.flink.table.api.ValidationException: Could not find
any factory for identifier 'kafka' that implements
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
classpath.
请问是什么原因导致的呢?

代码如下:

-----------------------------------------------------------------------------------------------------------------------------
        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env,
settings);

        tenv.executeSql("CREATE TABLE test_table (\n" +
                " id INT,\n" +
                " name STRING,\n" +
                " age INT,\n" +
                " create_at TIMESTAMP(3)\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'test_json',\n" +
                " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
                " 'properties.group.id' = 'testGroup',\n" +
                " 'format' = 'json',\n" +
                " 'scan.startup.mode' = 'latest-offset'\n" +
                ")");
        Table table = tenv.sqlQuery("select * from test_table");
        tenv.toRetractStream(table, Row.class).print();
        env.execute("flink 1.11.0 demo");
-----------------------------------------------------------------------------------------------------------------------------

pom 文件如下:
=============================================
<properties>
        <scala.binary.version>2.11</scala.binary.version>
        <flink.version>1.11.0</flink.version>
</properties>
<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>

<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>

<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>

<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>

<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>

<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
=============================================
Reply | Threaded
Open this post in threaded view
|

Re: flink sql报错 Could not find any factory for identifier 'kafka'

tison
那就要看下你是什么 Flink 版本,怎么提交到 YARN 上的,以及 YARN 的日志上的 classpath 是啥了

Best,
tison.


王松 <[hidden email]> 于2020年7月13日周一 下午12:54写道:

> 各位好,写了个demo,代码如下,在本地跑没有问题,提交到yarn session上报错:
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
> classpath.
> 请问是什么原因导致的呢?
>
> 代码如下:
>
>
> -----------------------------------------------------------------------------------------------------------------------------
>         StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings settings =
>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         StreamTableEnvironment tenv = StreamTableEnvironment.create(env,
> settings);
>
>         tenv.executeSql("CREATE TABLE test_table (\n" +
>                 " id INT,\n" +
>                 " name STRING,\n" +
>                 " age INT,\n" +
>                 " create_at TIMESTAMP(3)\n" +
>                 ") WITH (\n" +
>                 " 'connector' = 'kafka',\n" +
>                 " 'topic' = 'test_json',\n" +
>                 " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
>                 " 'properties.group.id' = 'testGroup',\n" +
>                 " 'format' = 'json',\n" +
>                 " 'scan.startup.mode' = 'latest-offset'\n" +
>                 ")");
>         Table table = tenv.sqlQuery("select * from test_table");
>         tenv.toRetractStream(table, Row.class).print();
>         env.execute("flink 1.11.0 demo");
>
> -----------------------------------------------------------------------------------------------------------------------------
>
> pom 文件如下:
> =============================================
> <properties>
>         <scala.binary.version>2.11</scala.binary.version>
>         <flink.version>1.11.0</flink.version>
> </properties>
> <dependencies>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-json</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-core</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-clients_${scala.binary.version}</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-common</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>     </dependencies>
> =============================================
>
Reply | Threaded
Open this post in threaded view
|

Re: flink sql报错 Could not find any factory for identifier 'kafka'

Leonard Xu
In reply to this post by wangsong2
Hi, 王松

这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream  connector 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka datastream  connector 同时引用是会冲突的,请根据你的需要使用。


祝好,
Leonard Xu
[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html>

>        <dependency>
>            <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>            <version>${flink.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
>            <version>${flink.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>            <version>${flink.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.flink</groupId>
>            <artifactId>flink-core</artifactId>
>            <version>${flink.version}</version>
>        </dependency>
> =============================================

Reply | Threaded
Open this post in threaded view
|

Re: flink sql报错 Could not find any factory for identifier 'kafka'

wangsong2
@Leonard Xu,
非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
=============================
          <dependency>
            <groupId>org.apache.flink</groupId>

<artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

         <!--<dependency>-->
            <!--<groupId>org.apache.flink</groupId>-->

<!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>-->
            <!--<version>${flink.version}</version>-->
        <!--</dependency>-->
        <!--<dependency>-->
            <!--<groupId>org.apache.flink</groupId>-->
            <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>-->
            <!--<version>${flink.version}</version>-->
            <!--&lt;!&ndash;<scope>compile</scope>&ndash;&gt;-->
        <!--</dependency>-->

        <!--<dependency>-->
            <!--<groupId>org.apache.flink</groupId>-->
            <!--<artifactId>flink-connector-kafka_2.11</artifactId>-->
            <!--<version>${flink.version}</version>-->
        <!--</dependency>-->
=============================

Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道:

> Hi, 王松
>
> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream  connector
> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
> datastream  connector 同时引用是会冲突的,请根据你的需要使用。
>
>
> 祝好,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >
> >        <dependency>
> >            <groupId>org.apache.flink</groupId>
> >
> > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
> >            <version>${flink.version}</version>
> >        </dependency>
> >        <dependency>
> >            <groupId>org.apache.flink</groupId>
> >
> >
> <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
> >            <version>${flink.version}</version>
> >        </dependency>
> >        <dependency>
> >            <groupId>org.apache.flink</groupId>
> >
> > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
> >            <version>${flink.version}</version>
> >        </dependency>
> >        <dependency>
> >            <groupId>org.apache.flink</groupId>
> >            <artifactId>flink-core</artifactId>
> >            <version>${flink.version}</version>
> >        </dependency>
> > =============================================
>
>
Reply | Threaded
Open this post in threaded view
|

Re: flink sql报错 Could not find any factory for identifier 'kafka'

Leonard Xu
Hi,
flink-connector-kafka_${scala.binary.version 和 flink-sql-connector-kafka_${scala.binary.version 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
后者的话主要是对前者做了shade处理,方便用户在 SQL Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。

[1] 中的话是有SQL Client JAR 的下载链接,就是 flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。

祝好
Leonard Xu

> 在 2020年7月13日,14:42,王松 <[hidden email]> 写道:
>
> @Leonard Xu,
> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> =============================
>          <dependency>
>            <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
>            <version>${flink.version}</version>
>        </dependency>
>
>         <!--<dependency>-->
>            <!--<groupId>org.apache.flink</groupId>-->
>
> <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>-->
>            <!--<version>${flink.version}</version>-->
>        <!--</dependency>-->
>        <!--<dependency>-->
>            <!--<groupId>org.apache.flink</groupId>-->
>            <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>-->
>            <!--<version>${flink.version}</version>-->
>            <!--&lt;!&ndash;<scope>compile</scope>&ndash;&gt;-->
>        <!--</dependency>-->
>
>        <!--<dependency>-->
>            <!--<groupId>org.apache.flink</groupId>-->
>            <!--<artifactId>flink-connector-kafka_2.11</artifactId>-->
>            <!--<version>${flink.version}</version>-->
>        <!--</dependency>-->
> =============================
>
> Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道:
>
>> Hi, 王松
>>
>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream  connector
>> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
>> datastream  connector 同时引用是会冲突的,请根据你的需要使用。
>>
>>
>> 祝好,
>> Leonard Xu
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>> <
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>>>
>>>       <dependency>
>>>           <groupId>org.apache.flink</groupId>
>>>
>>> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>>>           <version>${flink.version}</version>
>>>       </dependency>
>>>       <dependency>
>>>           <groupId>org.apache.flink</groupId>
>>>
>>>
>> <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
>>>           <version>${flink.version}</version>
>>>       </dependency>
>>>       <dependency>
>>>           <groupId>org.apache.flink</groupId>
>>>
>>> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>>>           <version>${flink.version}</version>
>>>       </dependency>
>>>       <dependency>
>>>           <groupId>org.apache.flink</groupId>
>>>           <artifactId>flink-core</artifactId>
>>>           <version>${flink.version}</version>
>>>       </dependency>
>>> =============================================
>>
>>

Reply | Threaded
Open this post in threaded view
|

Re: flink sql报错 Could not find any factory for identifier 'kafka'

wangsong2
您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。

我机器上flink/lib下jar包如下:
-rw-rw-r-- 1 hadoop hadoop    117719 6月  30 12:41 flink-avro-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop     90782 7月   8 10:09 flink-csv-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09 flink-dist_2.11-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop     94863 7月   8 10:09 flink-json-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
flink-table_2.11-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
flink-table-blink_2.11-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop     67114 7月   8 10:09 log4j-1.2-api-2.12.1.jar
-rw-r--r-- 1 hadoop hadoop    276771 7月   8 10:09 log4j-api-2.12.1.jar
-rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09 log4j-core-2.12.1.jar
-rw-r--r-- 1 hadoop hadoop     23518 7月   8 10:09
log4j-slf4j-impl-2.12.1.jar

Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:05写道:

> Hi,
> flink-connector-kafka_${scala.binary.version 和
> flink-sql-connector-kafka_${scala.binary.version
> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
>
> [1] 中的话是有SQL Client JAR 的下载链接,就是
> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
>
> 祝好
> Leonard Xu
>
> > 在 2020年7月13日,14:42,王松 <[hidden email]> 写道:
> >
> > @Leonard Xu,
> > 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> > 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > =============================
> >          <dependency>
> >            <groupId>org.apache.flink</groupId>
> >
> >
> <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
> >            <version>${flink.version}</version>
> >        </dependency>
> >
> >         <!--<dependency>-->
> >            <!--<groupId>org.apache.flink</groupId>-->
> >
> >
> <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>-->
> >            <!--<version>${flink.version}</version>-->
> >        <!--</dependency>-->
> >        <!--<dependency>-->
> >            <!--<groupId>org.apache.flink</groupId>-->
> >
> <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>-->
> >            <!--<version>${flink.version}</version>-->
> >            <!--&lt;!&ndash;<scope>compile</scope>&ndash;&gt;-->
> >        <!--</dependency>-->
> >
> >        <!--<dependency>-->
> >            <!--<groupId>org.apache.flink</groupId>-->
> >            <!--<artifactId>flink-connector-kafka_2.11</artifactId>-->
> >            <!--<version>${flink.version}</version>-->
> >        <!--</dependency>-->
> > =============================
> >
> > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道:
> >
> >> Hi, 王松
> >>
> >> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream  connector
> >> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
> >> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
> >> datastream  connector 同时引用是会冲突的,请根据你的需要使用。
> >>
> >>
> >> 祝好,
> >> Leonard Xu
> >> [1]
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >> <
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >>>
> >>>       <dependency>
> >>>           <groupId>org.apache.flink</groupId>
> >>>
> >>> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
> >>>           <version>${flink.version}</version>
> >>>       </dependency>
> >>>       <dependency>
> >>>           <groupId>org.apache.flink</groupId>
> >>>
> >>>
> >>
> <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
> >>>           <version>${flink.version}</version>
> >>>       </dependency>
> >>>       <dependency>
> >>>           <groupId>org.apache.flink</groupId>
> >>>
> >>> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
> >>>           <version>${flink.version}</version>
> >>>       </dependency>
> >>>       <dependency>
> >>>           <groupId>org.apache.flink</groupId>
> >>>           <artifactId>flink-core</artifactId>
> >>>           <version>${flink.version}</version>
> >>>       </dependency>
> >>> =============================================
> >>
> >>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: flink sql报错 Could not find any factory for identifier 'kafka'

Leonard Xu
Hi
你可以试下把 flink-connector-kafka_2.11-1.11.0.jar 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。

祝好

> 在 2020年7月13日,15:28,王松 <[hidden email]> 写道:
>
> 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
>
> 我机器上flink/lib下jar包如下:
> -rw-rw-r-- 1 hadoop hadoop    117719 6月  30 12:41 flink-avro-1.11.0.jar
> -rw-r--r-- 1 hadoop hadoop     90782 7月   8 10:09 flink-csv-1.11.0.jar
> -rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09 flink-dist_2.11-1.11.0.jar
> -rw-r--r-- 1 hadoop hadoop     94863 7月   8 10:09 flink-json-1.11.0.jar
> -rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
> flink-shaded-zookeeper-3.4.14.jar
> -rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
> flink-table_2.11-1.11.0.jar
> -rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
> flink-table-blink_2.11-1.11.0.jar
> -rw-r--r-- 1 hadoop hadoop     67114 7月   8 10:09 log4j-1.2-api-2.12.1.jar
> -rw-r--r-- 1 hadoop hadoop    276771 7月   8 10:09 log4j-api-2.12.1.jar
> -rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09 log4j-core-2.12.1.jar
> -rw-r--r-- 1 hadoop hadoop     23518 7月   8 10:09
> log4j-slf4j-impl-2.12.1.jar
>
> Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:05写道:
>
>> Hi,
>> flink-connector-kafka_${scala.binary.version 和
>> flink-sql-connector-kafka_${scala.binary.version
>> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
>> 后者的话主要是对前者做了shade处理,方便用户在 SQL
>> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
>> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
>>
>> [1] 中的话是有SQL Client JAR 的下载链接,就是
>> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
>>
>> 祝好
>> Leonard Xu
>>
>>> 在 2020年7月13日,14:42,王松 <[hidden email]> 写道:
>>>
>>> @Leonard Xu,
>>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
>>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
>>>
>>> [1]
>>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>>> =============================
>>>         <dependency>
>>>           <groupId>org.apache.flink</groupId>
>>>
>>>
>> <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
>>>           <version>${flink.version}</version>
>>>       </dependency>
>>>
>>>        <!--<dependency>-->
>>>           <!--<groupId>org.apache.flink</groupId>-->
>>>
>>>
>> <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>-->
>>>           <!--<version>${flink.version}</version>-->
>>>       <!--</dependency>-->
>>>       <!--<dependency>-->
>>>           <!--<groupId>org.apache.flink</groupId>-->
>>>
>> <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>-->
>>>           <!--<version>${flink.version}</version>-->
>>>           <!--&lt;!&ndash;<scope>compile</scope>&ndash;&gt;-->
>>>       <!--</dependency>-->
>>>
>>>       <!--<dependency>-->
>>>           <!--<groupId>org.apache.flink</groupId>-->
>>>           <!--<artifactId>flink-connector-kafka_2.11</artifactId>-->
>>>           <!--<version>${flink.version}</version>-->
>>>       <!--</dependency>-->
>>> =============================
>>>
>>> Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道:
>>>
>>>> Hi, 王松
>>>>
>>>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream  connector
>>>> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
>>>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
>>>> datastream  connector 同时引用是会冲突的,请根据你的需要使用。
>>>>
>>>>
>>>> 祝好,
>>>> Leonard Xu
>>>> [1]
>>>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>>>> <
>>>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>>>>>
>>>>>      <dependency>
>>>>>          <groupId>org.apache.flink</groupId>
>>>>>
>>>>> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>>>>>          <version>${flink.version}</version>
>>>>>      </dependency>
>>>>>      <dependency>
>>>>>          <groupId>org.apache.flink</groupId>
>>>>>
>>>>>
>>>>
>> <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
>>>>>          <version>${flink.version}</version>
>>>>>      </dependency>
>>>>>      <dependency>
>>>>>          <groupId>org.apache.flink</groupId>
>>>>>
>>>>> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>>>>>          <version>${flink.version}</version>
>>>>>      </dependency>
>>>>>      <dependency>
>>>>>          <groupId>org.apache.flink</groupId>
>>>>>          <artifactId>flink-core</artifactId>
>>>>>          <version>${flink.version}</version>
>>>>>      </dependency>
>>>>> =============================================
>>>>
>>>>
>>
>>

Reply | Threaded
Open this post in threaded view
|

Re: flink sql报错 Could not find any factory for identifier 'kafka'

Benchao Li-2
你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去;
或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。
当然,直接粗暴的放到lib下,也是可以的。

Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:38写道:

> Hi
> 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar
> 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。
>
> 祝好
>
> > 在 2020年7月13日,15:28,王松 <[hidden email]> 写道:
> >
> > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
> >
> > 我机器上flink/lib下jar包如下:
> > -rw-rw-r-- 1 hadoop hadoop    117719 6月  30 12:41 flink-avro-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop     90782 7月   8 10:09 flink-csv-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09
> flink-dist_2.11-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop     94863 7月   8 10:09 flink-json-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
> > flink-shaded-zookeeper-3.4.14.jar
> > -rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
> > flink-table_2.11-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
> > flink-table-blink_2.11-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop     67114 7月   8 10:09
> log4j-1.2-api-2.12.1.jar
> > -rw-r--r-- 1 hadoop hadoop    276771 7月   8 10:09 log4j-api-2.12.1.jar
> > -rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09 log4j-core-2.12.1.jar
> > -rw-r--r-- 1 hadoop hadoop     23518 7月   8 10:09
> > log4j-slf4j-impl-2.12.1.jar
> >
> > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:05写道:
> >
> >> Hi,
> >> flink-connector-kafka_${scala.binary.version 和
> >> flink-sql-connector-kafka_${scala.binary.version
> >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> >> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
> >>
> >> [1] 中的话是有SQL Client JAR 的下载链接,就是
> >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
> >>
> >> 祝好
> >> Leonard Xu
> >>
> >>> 在 2020年7月13日,14:42,王松 <[hidden email]> 写道:
> >>>
> >>> @Leonard Xu,
> >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> >>>
> >>> [1]
> >>>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >>> =============================
> >>>         <dependency>
> >>>           <groupId>org.apache.flink</groupId>
> >>>
> >>>
> >>
> <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
> >>>           <version>${flink.version}</version>
> >>>       </dependency>
> >>>
> >>>        <!--<dependency>-->
> >>>           <!--<groupId>org.apache.flink</groupId>-->
> >>>
> >>>
> >>
> <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>-->
> >>>           <!--<version>${flink.version}</version>-->
> >>>       <!--</dependency>-->
> >>>       <!--<dependency>-->
> >>>           <!--<groupId>org.apache.flink</groupId>-->
> >>>
> >> <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>-->
> >>>           <!--<version>${flink.version}</version>-->
> >>>           <!--&lt;!&ndash;<scope>compile</scope>&ndash;&gt;-->
> >>>       <!--</dependency>-->
> >>>
> >>>       <!--<dependency>-->
> >>>           <!--<groupId>org.apache.flink</groupId>-->
> >>>           <!--<artifactId>flink-connector-kafka_2.11</artifactId>-->
> >>>           <!--<version>${flink.version}</version>-->
> >>>       <!--</dependency>-->
> >>> =============================
> >>>
> >>> Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道:
> >>>
> >>>> Hi, 王松
> >>>>
> >>>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream
> connector
> >>>>
> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
> >>>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
> >>>> datastream  connector 同时引用是会冲突的,请根据你的需要使用。
> >>>>
> >>>>
> >>>> 祝好,
> >>>> Leonard Xu
> >>>> [1]
> >>>>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >>>> <
> >>>>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >>>>>
> >>>>>      <dependency>
> >>>>>          <groupId>org.apache.flink</groupId>
> >>>>>
> >>>>>
> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
> >>>>>          <version>${flink.version}</version>
> >>>>>      </dependency>
> >>>>>      <dependency>
> >>>>>          <groupId>org.apache.flink</groupId>
> >>>>>
> >>>>>
> >>>>
> >>
> <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
> >>>>>          <version>${flink.version}</version>
> >>>>>      </dependency>
> >>>>>      <dependency>
> >>>>>          <groupId>org.apache.flink</groupId>
> >>>>>
> >>>>>
> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
> >>>>>          <version>${flink.version}</version>
> >>>>>      </dependency>
> >>>>>      <dependency>
> >>>>>          <groupId>org.apache.flink</groupId>
> >>>>>          <artifactId>flink-core</artifactId>
> >>>>>          <version>${flink.version}</version>
> >>>>>      </dependency>
> >>>>> =============================================
> >>>>
> >>>>
> >>
> >>
>
>

--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: flink sql报错 Could not find any factory for identifier 'kafka'

wangsong2
In reply to this post by Leonard Xu
这样还是不行,我尝试flink-connector-kafka-0.11_2.11-1.11.0.jar放到lib下,报了另外一个问题:
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase


另外,我是用 bin/flink run -yid xxx xxx.jar 的方式提交任务的,报错是直接在终端报错,没有提交到flink
jobmanager上。

Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:38写道:

> Hi
> 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar
> 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。
>
> 祝好
>
> > 在 2020年7月13日,15:28,王松 <[hidden email]> 写道:
> >
> > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
> >
> > 我机器上flink/lib下jar包如下:
> > -rw-rw-r-- 1 hadoop hadoop    117719 6月  30 12:41 flink-avro-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop     90782 7月   8 10:09 flink-csv-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09
> flink-dist_2.11-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop     94863 7月   8 10:09 flink-json-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
> > flink-shaded-zookeeper-3.4.14.jar
> > -rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
> > flink-table_2.11-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
> > flink-table-blink_2.11-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop     67114 7月   8 10:09
> log4j-1.2-api-2.12.1.jar
> > -rw-r--r-- 1 hadoop hadoop    276771 7月   8 10:09 log4j-api-2.12.1.jar
> > -rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09 log4j-core-2.12.1.jar
> > -rw-r--r-- 1 hadoop hadoop     23518 7月   8 10:09
> > log4j-slf4j-impl-2.12.1.jar
> >
> > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:05写道:
> >
> >> Hi,
> >> flink-connector-kafka_${scala.binary.version 和
> >> flink-sql-connector-kafka_${scala.binary.version
> >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> >> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
> >>
> >> [1] 中的话是有SQL Client JAR 的下载链接,就是
> >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
> >>
> >> 祝好
> >> Leonard Xu
> >>
> >>> 在 2020年7月13日,14:42,王松 <[hidden email]> 写道:
> >>>
> >>> @Leonard Xu,
> >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> >>>
> >>> [1]
> >>>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >>> =============================
> >>>         <dependency>
> >>>           <groupId>org.apache.flink</groupId>
> >>>
> >>>
> >>
> <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
> >>>           <version>${flink.version}</version>
> >>>       </dependency>
> >>>
> >>>        <!--<dependency>-->
> >>>           <!--<groupId>org.apache.flink</groupId>-->
> >>>
> >>>
> >>
> <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>-->
> >>>           <!--<version>${flink.version}</version>-->
> >>>       <!--</dependency>-->
> >>>       <!--<dependency>-->
> >>>           <!--<groupId>org.apache.flink</groupId>-->
> >>>
> >> <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>-->
> >>>           <!--<version>${flink.version}</version>-->
> >>>           <!--&lt;!&ndash;<scope>compile</scope>&ndash;&gt;-->
> >>>       <!--</dependency>-->
> >>>
> >>>       <!--<dependency>-->
> >>>           <!--<groupId>org.apache.flink</groupId>-->
> >>>           <!--<artifactId>flink-connector-kafka_2.11</artifactId>-->
> >>>           <!--<version>${flink.version}</version>-->
> >>>       <!--</dependency>-->
> >>> =============================
> >>>
> >>> Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道:
> >>>
> >>>> Hi, 王松
> >>>>
> >>>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream
> connector
> >>>>
> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
> >>>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
> >>>> datastream  connector 同时引用是会冲突的,请根据你的需要使用。
> >>>>
> >>>>
> >>>> 祝好,
> >>>> Leonard Xu
> >>>> [1]
> >>>>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >>>> <
> >>>>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >>>>>
> >>>>>      <dependency>
> >>>>>          <groupId>org.apache.flink</groupId>
> >>>>>
> >>>>>
> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
> >>>>>          <version>${flink.version}</version>
> >>>>>      </dependency>
> >>>>>      <dependency>
> >>>>>          <groupId>org.apache.flink</groupId>
> >>>>>
> >>>>>
> >>>>
> >>
> <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
> >>>>>          <version>${flink.version}</version>
> >>>>>      </dependency>
> >>>>>      <dependency>
> >>>>>          <groupId>org.apache.flink</groupId>
> >>>>>
> >>>>>
> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
> >>>>>          <version>${flink.version}</version>
> >>>>>      </dependency>
> >>>>>      <dependency>
> >>>>>          <groupId>org.apache.flink</groupId>
> >>>>>          <artifactId>flink-core</artifactId>
> >>>>>          <version>${flink.version}</version>
> >>>>>      </dependency>
> >>>>> =============================================
> >>>>
> >>>>
> >>
> >>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: flink sql报错 Could not find any factory for identifier 'kafka'

wangsong2
In reply to this post by Benchao Li-2
你好本超,
是的,我尝试解压打包好的jar包,里边是包含我pom中写的依赖的

Benchao Li <[hidden email]> 于2020年7月13日周一 下午3:42写道:

> 你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去;
> 或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。
> 当然,直接粗暴的放到lib下,也是可以的。
>
> Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:38写道:
>
> > Hi
> > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar
> > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。
> >
> > 祝好
> >
> > > 在 2020年7月13日,15:28,王松 <[hidden email]> 写道:
> > >
> > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
> > >
> > > 我机器上flink/lib下jar包如下:
> > > -rw-rw-r-- 1 hadoop hadoop    117719 6月  30 12:41 flink-avro-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop     90782 7月   8 10:09 flink-csv-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09
> > flink-dist_2.11-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop     94863 7月   8 10:09 flink-json-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
> > > flink-shaded-zookeeper-3.4.14.jar
> > > -rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
> > > flink-table_2.11-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
> > > flink-table-blink_2.11-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop     67114 7月   8 10:09
> > log4j-1.2-api-2.12.1.jar
> > > -rw-r--r-- 1 hadoop hadoop    276771 7月   8 10:09 log4j-api-2.12.1.jar
> > > -rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09 log4j-core-2.12.1.jar
> > > -rw-r--r-- 1 hadoop hadoop     23518 7月   8 10:09
> > > log4j-slf4j-impl-2.12.1.jar
> > >
> > > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:05写道:
> > >
> > >> Hi,
> > >> flink-connector-kafka_${scala.binary.version 和
> > >> flink-sql-connector-kafka_${scala.binary.version
> > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
> > >>
> > >> [1] 中的话是有SQL Client JAR 的下载链接,就是
> > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
> > >>
> > >> 祝好
> > >> Leonard Xu
> > >>
> > >>> 在 2020年7月13日,14:42,王松 <[hidden email]> 写道:
> > >>>
> > >>> @Leonard Xu,
> > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> > >>>
> > >>> [1]
> > >>>
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > >>> =============================
> > >>>         <dependency>
> > >>>           <groupId>org.apache.flink</groupId>
> > >>>
> > >>>
> > >>
> >
> <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
> > >>>           <version>${flink.version}</version>
> > >>>       </dependency>
> > >>>
> > >>>        <!--<dependency>-->
> > >>>           <!--<groupId>org.apache.flink</groupId>-->
> > >>>
> > >>>
> > >>
> >
> <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>-->
> > >>>           <!--<version>${flink.version}</version>-->
> > >>>       <!--</dependency>-->
> > >>>       <!--<dependency>-->
> > >>>           <!--<groupId>org.apache.flink</groupId>-->
> > >>>
> > >> <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>-->
> > >>>           <!--<version>${flink.version}</version>-->
> > >>>           <!--&lt;!&ndash;<scope>compile</scope>&ndash;&gt;-->
> > >>>       <!--</dependency>-->
> > >>>
> > >>>       <!--<dependency>-->
> > >>>           <!--<groupId>org.apache.flink</groupId>-->
> > >>>           <!--<artifactId>flink-connector-kafka_2.11</artifactId>-->
> > >>>           <!--<version>${flink.version}</version>-->
> > >>>       <!--</dependency>-->
> > >>> =============================
> > >>>
> > >>> Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道:
> > >>>
> > >>>> Hi, 王松
> > >>>>
> > >>>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream
> > connector
> > >>>>
> > 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
> > >>>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
> > >>>> datastream  connector 同时引用是会冲突的,请根据你的需要使用。
> > >>>>
> > >>>>
> > >>>> 祝好,
> > >>>> Leonard Xu
> > >>>> [1]
> > >>>>
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > >>>> <
> > >>>>
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > >>>>>
> > >>>>>      <dependency>
> > >>>>>          <groupId>org.apache.flink</groupId>
> > >>>>>
> > >>>>>
> > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
> > >>>>>          <version>${flink.version}</version>
> > >>>>>      </dependency>
> > >>>>>      <dependency>
> > >>>>>          <groupId>org.apache.flink</groupId>
> > >>>>>
> > >>>>>
> > >>>>
> > >>
> >
> <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
> > >>>>>          <version>${flink.version}</version>
> > >>>>>      </dependency>
> > >>>>>      <dependency>
> > >>>>>          <groupId>org.apache.flink</groupId>
> > >>>>>
> > >>>>>
> > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
> > >>>>>          <version>${flink.version}</version>
> > >>>>>      </dependency>
> > >>>>>      <dependency>
> > >>>>>          <groupId>org.apache.flink</groupId>
> > >>>>>          <artifactId>flink-core</artifactId>
> > >>>>>          <version>${flink.version}</version>
> > >>>>>      </dependency>
> > >>>>> =============================================
> > >>>>
> > >>>>
> > >>
> > >>
> >
> >
>
> --
>
> Best,
> Benchao Li
>
Reply | Threaded
Open this post in threaded view
|

Re: flink sql报错 Could not find any factory for identifier 'kafka'

Jingsong Li
Hi,

1.推荐方式:把flink-sql-connector-kafka-0.11_2.11-1.11.0.jar放入lib下。下载链接:[1]
2.次推荐方式:你的java工程打包时,需要用shade插件把kafka相关类shade到最终的jar中。(不能用jar-with-deps,因为它会覆盖掉java
spi)

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

Best,
Jingsong

On Mon, Jul 13, 2020 at 4:04 PM 王松 <[hidden email]> wrote:

> 你好本超,
> 是的,我尝试解压打包好的jar包,里边是包含我pom中写的依赖的
>
> Benchao Li <[hidden email]> 于2020年7月13日周一 下午3:42写道:
>
> > 你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去;
> > 或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。
> > 当然,直接粗暴的放到lib下,也是可以的。
> >
> > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:38写道:
> >
> > > Hi
> > > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar
> > > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。
> > >
> > > 祝好
> > >
> > > > 在 2020年7月13日,15:28,王松 <[hidden email]> 写道:
> > > >
> > > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
> > > >
> > > > 我机器上flink/lib下jar包如下:
> > > > -rw-rw-r-- 1 hadoop hadoop    117719 6月  30 12:41
> flink-avro-1.11.0.jar
> > > > -rw-r--r-- 1 hadoop hadoop     90782 7月   8 10:09
> flink-csv-1.11.0.jar
> > > > -rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09
> > > flink-dist_2.11-1.11.0.jar
> > > > -rw-r--r-- 1 hadoop hadoop     94863 7月   8 10:09
> flink-json-1.11.0.jar
> > > > -rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
> > > > flink-shaded-zookeeper-3.4.14.jar
> > > > -rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
> > > > flink-table_2.11-1.11.0.jar
> > > > -rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
> > > > flink-table-blink_2.11-1.11.0.jar
> > > > -rw-r--r-- 1 hadoop hadoop     67114 7月   8 10:09
> > > log4j-1.2-api-2.12.1.jar
> > > > -rw-r--r-- 1 hadoop hadoop    276771 7月   8 10:09
> log4j-api-2.12.1.jar
> > > > -rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09
> log4j-core-2.12.1.jar
> > > > -rw-r--r-- 1 hadoop hadoop     23518 7月   8 10:09
> > > > log4j-slf4j-impl-2.12.1.jar
> > > >
> > > > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:05写道:
> > > >
> > > >> Hi,
> > > >> flink-connector-kafka_${scala.binary.version 和
> > > >> flink-sql-connector-kafka_${scala.binary.version
> > > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> > > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> > > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> > > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
> > > >>
> > > >> [1] 中的话是有SQL Client JAR 的下载链接,就是
> > > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
> > > >>
> > > >> 祝好
> > > >> Leonard Xu
> > > >>
> > > >>> 在 2020年7月13日,14:42,王松 <[hidden email]> 写道:
> > > >>>
> > > >>> @Leonard Xu,
> > > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> > > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> > > >>>
> > > >>> [1]
> > > >>>
> > > >>
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > > >>> =============================
> > > >>>         <dependency>
> > > >>>           <groupId>org.apache.flink</groupId>
> > > >>>
> > > >>>
> > > >>
> > >
> >
> <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
> > > >>>           <version>${flink.version}</version>
> > > >>>       </dependency>
> > > >>>
> > > >>>        <!--<dependency>-->
> > > >>>           <!--<groupId>org.apache.flink</groupId>-->
> > > >>>
> > > >>>
> > > >>
> > >
> >
> <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>-->
> > > >>>           <!--<version>${flink.version}</version>-->
> > > >>>       <!--</dependency>-->
> > > >>>       <!--<dependency>-->
> > > >>>           <!--<groupId>org.apache.flink</groupId>-->
> > > >>>
> > > >> <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>-->
> > > >>>           <!--<version>${flink.version}</version>-->
> > > >>>           <!--&lt;!&ndash;<scope>compile</scope>&ndash;&gt;-->
> > > >>>       <!--</dependency>-->
> > > >>>
> > > >>>       <!--<dependency>-->
> > > >>>           <!--<groupId>org.apache.flink</groupId>-->
> > > >>>
>  <!--<artifactId>flink-connector-kafka_2.11</artifactId>-->
> > > >>>           <!--<version>${flink.version}</version>-->
> > > >>>       <!--</dependency>-->
> > > >>> =============================
> > > >>>
> > > >>> Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道:
> > > >>>
> > > >>>> Hi, 王松
> > > >>>>
> > > >>>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream
> > > connector
> > > >>>>
> > > 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
> > > >>>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
> > > >>>> datastream  connector 同时引用是会冲突的,请根据你的需要使用。
> > > >>>>
> > > >>>>
> > > >>>> 祝好,
> > > >>>> Leonard Xu
> > > >>>> [1]
> > > >>>>
> > > >>
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > > >>>> <
> > > >>>>
> > > >>
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > > >>>>>
> > > >>>>>      <dependency>
> > > >>>>>          <groupId>org.apache.flink</groupId>
> > > >>>>>
> > > >>>>>
> > > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
> > > >>>>>          <version>${flink.version}</version>
> > > >>>>>      </dependency>
> > > >>>>>      <dependency>
> > > >>>>>          <groupId>org.apache.flink</groupId>
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>
> > >
> >
> <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
> > > >>>>>          <version>${flink.version}</version>
> > > >>>>>      </dependency>
> > > >>>>>      <dependency>
> > > >>>>>          <groupId>org.apache.flink</groupId>
> > > >>>>>
> > > >>>>>
> > > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
> > > >>>>>          <version>${flink.version}</version>
> > > >>>>>      </dependency>
> > > >>>>>      <dependency>
> > > >>>>>          <groupId>org.apache.flink</groupId>
> > > >>>>>          <artifactId>flink-core</artifactId>
> > > >>>>>          <version>${flink.version}</version>
> > > >>>>>      </dependency>
> > > >>>>> =============================================
> > > >>>>
> > > >>>>
> > > >>
> > > >>
> > >
> > >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: flink sql报错 Could not find any factory for identifier 'kafka'

wangsong2
感谢大家的热情解答,最后问题解决了。原因正是 Leonard Xu所说的,我应该引入的是
flink-sql-connector-kafka-${version}_${scala.binary.version},然后当时改成
flink-sql-connector-kafka
后继续报错的原因是:我还在pom文件中引入了flink-table-planner-blink,如下:
        <dependency>
            <groupId>org.apache.flink</groupId>

<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
添加<scope>provided</scope>后就没有问题了。

最后附上正确的pom文件 (如 Jingsong
所说,也可以把flink-sql-connector-kafka、flink-json这些都在pom文件中去掉,直接将jar报放入lib中):

============================================================
<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>

<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>

<artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
============================================================

Jingsong Li <[hidden email]> 于2020年7月13日周一 下午4:35写道:

> Hi,
>
> 1.推荐方式:把flink-sql-connector-kafka-0.11_2.11-1.11.0.jar放入lib下。下载链接:[1]
>
> 2.次推荐方式:你的java工程打包时,需要用shade插件把kafka相关类shade到最终的jar中。(不能用jar-with-deps,因为它会覆盖掉java
> spi)
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>
> Best,
> Jingsong
>
> On Mon, Jul 13, 2020 at 4:04 PM 王松 <[hidden email]> wrote:
>
> > 你好本超,
> > 是的,我尝试解压打包好的jar包,里边是包含我pom中写的依赖的
> >
> > Benchao Li <[hidden email]> 于2020年7月13日周一 下午3:42写道:
> >
> > > 你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去;
> > > 或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。
> > > 当然,直接粗暴的放到lib下,也是可以的。
> > >
> > > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:38写道:
> > >
> > > > Hi
> > > > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar
> > > > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。
> > > >
> > > > 祝好
> > > >
> > > > > 在 2020年7月13日,15:28,王松 <[hidden email]> 写道:
> > > > >
> > > > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
> > > > >
> > > > > 我机器上flink/lib下jar包如下:
> > > > > -rw-rw-r-- 1 hadoop hadoop    117719 6月  30 12:41
> > flink-avro-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop     90782 7月   8 10:09
> > flink-csv-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09
> > > > flink-dist_2.11-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop     94863 7月   8 10:09
> > flink-json-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
> > > > > flink-shaded-zookeeper-3.4.14.jar
> > > > > -rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
> > > > > flink-table_2.11-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
> > > > > flink-table-blink_2.11-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop     67114 7月   8 10:09
> > > > log4j-1.2-api-2.12.1.jar
> > > > > -rw-r--r-- 1 hadoop hadoop    276771 7月   8 10:09
> > log4j-api-2.12.1.jar
> > > > > -rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09
> > log4j-core-2.12.1.jar
> > > > > -rw-r--r-- 1 hadoop hadoop     23518 7月   8 10:09
> > > > > log4j-slf4j-impl-2.12.1.jar
> > > > >
> > > > > Leonard Xu <[hidden email]> 于2020年7月13日周一 下午3:05写道:
> > > > >
> > > > >> Hi,
> > > > >> flink-connector-kafka_${scala.binary.version 和
> > > > >> flink-sql-connector-kafka_${scala.binary.version
> > > > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> > > > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> > > > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> > > > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
> > > > >>
> > > > >> [1] 中的话是有SQL Client JAR 的下载链接,就是
> > > > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
> > > > >>
> > > > >> 祝好
> > > > >> Leonard Xu
> > > > >>
> > > > >>> 在 2020年7月13日,14:42,王松 <[hidden email]> 写道:
> > > > >>>
> > > > >>> @Leonard Xu,
> > > > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> > > > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> > > > >>>
> > > > >>> [1]
> > > > >>>
> > > > >>
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > > > >>> =============================
> > > > >>>         <dependency>
> > > > >>>           <groupId>org.apache.flink</groupId>
> > > > >>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
> > > > >>>           <version>${flink.version}</version>
> > > > >>>       </dependency>
> > > > >>>
> > > > >>>        <!--<dependency>-->
> > > > >>>           <!--<groupId>org.apache.flink</groupId>-->
> > > > >>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>-->
> > > > >>>           <!--<version>${flink.version}</version>-->
> > > > >>>       <!--</dependency>-->
> > > > >>>       <!--<dependency>-->
> > > > >>>           <!--<groupId>org.apache.flink</groupId>-->
> > > > >>>
> > > > >> <!--<artifactId>flink-connector-kafka-0.11_2.11</artifactId>-->
> > > > >>>           <!--<version>${flink.version}</version>-->
> > > > >>>           <!--&lt;!&ndash;<scope>compile</scope>&ndash;&gt;-->
> > > > >>>       <!--</dependency>-->
> > > > >>>
> > > > >>>       <!--<dependency>-->
> > > > >>>           <!--<groupId>org.apache.flink</groupId>-->
> > > > >>>
> >  <!--<artifactId>flink-connector-kafka_2.11</artifactId>-->
> > > > >>>           <!--<version>${flink.version}</version>-->
> > > > >>>       <!--</dependency>-->
> > > > >>> =============================
> > > > >>>
> > > > >>> Leonard Xu <[hidden email]> 于2020年7月13日周一 下午1:39写道:
> > > > >>>
> > > > >>>> Hi, 王松
> > > > >>>>
> > > > >>>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream
> > > > connector
> > > > >>>>
> > > >
> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
> > > > >>>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和
> Kafka
> > > > >>>> datastream  connector 同时引用是会冲突的,请根据你的需要使用。
> > > > >>>>
> > > > >>>>
> > > > >>>> 祝好,
> > > > >>>> Leonard Xu
> > > > >>>> [1]
> > > > >>>>
> > > > >>
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > > > >>>> <
> > > > >>>>
> > > > >>
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > > > >>>>>
> > > > >>>>>      <dependency>
> > > > >>>>>          <groupId>org.apache.flink</groupId>
> > > > >>>>>
> > > > >>>>>
> > > >
> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
> > > > >>>>>          <version>${flink.version}</version>
> > > > >>>>>      </dependency>
> > > > >>>>>      <dependency>
> > > > >>>>>          <groupId>org.apache.flink</groupId>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>
> > > >
> > >
> >
> <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
> > > > >>>>>          <version>${flink.version}</version>
> > > > >>>>>      </dependency>
> > > > >>>>>      <dependency>
> > > > >>>>>          <groupId>org.apache.flink</groupId>
> > > > >>>>>
> > > > >>>>>
> > > >
> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
> > > > >>>>>          <version>${flink.version}</version>
> > > > >>>>>      </dependency>
> > > > >>>>>      <dependency>
> > > > >>>>>          <groupId>org.apache.flink</groupId>
> > > > >>>>>          <artifactId>flink-core</artifactId>
> > > > >>>>>          <version>${flink.version}</version>
> > > > >>>>>      </dependency>
> > > > >>>>> =============================================
> > > > >>>>
> > > > >>>>
> > > > >>
> > > > >>
> > > >
> > > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>
>
> --
> Best, Jingsong Lee
>