用代码执行flink sql 报错 Multiple factories for identifier 'jdbc' that implement

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

用代码执行flink sql 报错 Multiple factories for identifier 'jdbc' that implement

xuzh-2
错误:


Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath


看意思是找到了两个一样的类:DynamicTableSinkFactory


代码如下:
package org.apache.flink.examples;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.factories.DynamicTableSinkFactory;


public class CDC2ss2 {
    public static void main(String[] args) throws Exception {


        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv;


        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        tEnv = StreamTableEnvironment.create(env, settings);
        String src_sql = "CREATE TABLE userss (\n" +
                "     user_id INT,\n" +
                "     user_nm STRING\n" +
                ") WITH (\n" +
                "      'connector' = 'mysql-cdc',\n" +
                "      'hostname' = '10.12.5.37',\n" +
                "      'port' = '3306',\n" +
                "      'username' = 'dps',\n" +
                "      'password' = 'dps1234',\n" +
                "      'database-name' = 'rpt',\n" +
                "      'table-name' = 'users'\n" +
                "      )";


        tEnv.executeSql(src_sql); // 创建表


        String sink="CREATE TABLE sink (\n" +
                "     user_id INT,\n" +
                "     user_nm STRING,\n" +
                "     primary key(user_id)  NOT ENFORCED \n" +
                ") WITH (\n" +
                "      'connector' = 'jdbc',\n" +
                "      'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
                "      'username' = 'dps',\n" +
                "      'password' = 'dps1234',\n" +
                "      'table-name' = 'sink'\n" +
                "      )";
        String to_print_sql="insert into sink select user_id  ,user_nm   from userss";
         tEnv.executeSql(sink);
        tEnv.executeSql(to_print_sql);
        env.execute();
    }


}





详细错误:


Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink'.


Table options are:


'connector'='jdbc'
'password'='dps1234'
'table-name'='sink'
'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
'username'='dps'
        at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
        at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
        at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='jdbc''.
        at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
        at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
        ... 18 more
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath.


Ambiguous factory classes are:


java.util.LinkedList
java.util.LinkedList
java.util.LinkedList
java.util.LinkedList
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)
        at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
        ... 19 more


Process finished with exit code 1
Reply | Threaded
Open this post in threaded view
|

Re:用代码执行flink sql 报错 Multiple factories for identifier 'jdbc' that implement

hailongwang
Hi,
 你是用哪个版本的呢?有没有自己继承了 DynamicTableSinkFactory 实现的 factoryIdentifier 方法返回 `JDCB` 的 Connector?


Best,
Hailong
在 2020-12-03 14:44:18,"xuzh" <[hidden email]> 写道:

>错误:
>
>
>Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath
>
>
>看意思是找到了两个一样的类:DynamicTableSinkFactory
>
>
>代码如下:
>package org.apache.flink.examples;
>
>
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>import org.apache.flink.table.factories.DynamicTableSinkFactory;
>
>
>public class CDC2ss2 {
>&nbsp; &nbsp; public static void main(String[] args) throws Exception {
>
>
>&nbsp; &nbsp; &nbsp; &nbsp; // set up execution environment
>&nbsp; &nbsp; &nbsp; &nbsp; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>&nbsp; &nbsp; &nbsp; &nbsp; StreamTableEnvironment tEnv;
>
>
>&nbsp; &nbsp; &nbsp; &nbsp; EnvironmentSettings settings = EnvironmentSettings.newInstance()
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .useBlinkPlanner()
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .inStreamingMode()
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .build();
>&nbsp; &nbsp; &nbsp; &nbsp; tEnv = StreamTableEnvironment.create(env, settings);
>&nbsp; &nbsp; &nbsp; &nbsp; String src_sql = "CREATE TABLE userss (\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp;user_id INT,\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp;user_nm STRING\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ") WITH (\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'connector' = 'mysql-cdc',\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'hostname' = '10.12.5.37',\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'port' = '3306',\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'username' = 'dps',\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'password' = 'dps1234',\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'database-name' = 'rpt',\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'table-name' = 'users'\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; )";
>
>
>&nbsp; &nbsp; &nbsp; &nbsp; tEnv.executeSql(src_sql); // 创建表
>
>
>&nbsp; &nbsp; &nbsp; &nbsp; String sink="CREATE TABLE sink (\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp;user_id INT,\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp;user_nm STRING,\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp;primary key(user_id)&nbsp; NOT ENFORCED \n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ") WITH (\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'connector' = 'jdbc',\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'username' = 'dps',\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'password' = 'dps1234',\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'table-name' = 'sink'\n" +
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; )";
>&nbsp; &nbsp; &nbsp; &nbsp; String to_print_sql="insert into sink select user_id&nbsp; ,user_nm&nbsp; &nbsp;from userss";
>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;tEnv.executeSql(sink);
>&nbsp; &nbsp; &nbsp; &nbsp; tEnv.executeSql(to_print_sql);
>&nbsp; &nbsp; &nbsp; &nbsp; env.execute();
>&nbsp; &nbsp; }
>
>
>}
>
>
>
>
>
>详细错误:
>
>
>Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink'.
>
>
>Table options are:
>
>
>'connector'='jdbc'
>'password'='dps1234'
>'table-name'='sink'
>'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
>'username'='dps'
> at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
> at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
> at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
> at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
>Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='jdbc''.
> at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
> at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
> ... 18 more
>Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath.
>
>
>Ambiguous factory classes are:
>
>
>java.util.LinkedList
>java.util.LinkedList
>java.util.LinkedList
>java.util.LinkedList
> at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)
> at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
> ... 19 more
>
>
>Process finished with exit code 1
Reply | Threaded
Open this post in threaded view
|

Re: 用代码执行flink sql 报错 Multiple factories for identifier 'jdbc' that implement

Wei Zhong
Hi,

现在的查找TableFactory的代码在错误信息显示上似乎存在问题,看不到真实的类名,可以先手动执行一下以下代码查看到底是哪些类被判定为JDBC的DynamicTableSinkFactory了:

List<Factory> result = new LinkedList<>();
ServiceLoader
   .load(Factory.class, Thread.currentThread().getContextClassLoader())
   .iterator()
   .forEachRemaining(result::add);
List<Factory> jdbcResult = result.stream().filter(f ->
   DynamicTableSinkFactory.class.isAssignableFrom(f.getClass())).filter(
   f -> f.factoryIdentifier().equals("jdbc")).collect(Collectors.toList());
System.out.println(jdbcResult);


> 在 2020年12月3日,19:50,hailongwang <[hidden email]> 写道:
>
> Hi,
> 你是用哪个版本的呢?有没有自己继承了 DynamicTableSinkFactory 实现的 factoryIdentifier 方法返回 `JDCB` 的 Connector?
>
>
> Best,
> Hailong
> 在 2020-12-03 14:44:18,"xuzh" <[hidden email]> 写道:
>> 错误:
>>
>>
>> Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath
>>
>>
>> 看意思是找到了两个一样的类:DynamicTableSinkFactory
>>
>>
>> 代码如下:
>> package org.apache.flink.examples;
>>
>>
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>> import org.apache.flink.table.factories.DynamicTableSinkFactory;
>>
>>
>> public class CDC2ss2 {
>> &nbsp; &nbsp; public static void main(String[] args) throws Exception {
>>
>>
>> &nbsp; &nbsp; &nbsp; &nbsp; // set up execution environment
>> &nbsp; &nbsp; &nbsp; &nbsp; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> &nbsp; &nbsp; &nbsp; &nbsp; StreamTableEnvironment tEnv;
>>
>>
>> &nbsp; &nbsp; &nbsp; &nbsp; EnvironmentSettings settings = EnvironmentSettings.newInstance()
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .useBlinkPlanner()
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .inStreamingMode()
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .build();
>> &nbsp; &nbsp; &nbsp; &nbsp; tEnv = StreamTableEnvironment.create(env, settings);
>> &nbsp; &nbsp; &nbsp; &nbsp; String src_sql = "CREATE TABLE userss (\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp;user_id INT,\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp;user_nm STRING\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ") WITH (\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'connector' = 'mysql-cdc',\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'hostname' = '10.12.5.37',\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'port' = '3306',\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'username' = 'dps',\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'password' = 'dps1234',\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'database-name' = 'rpt',\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'table-name' = 'users'\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; )";
>>
>>
>> &nbsp; &nbsp; &nbsp; &nbsp; tEnv.executeSql(src_sql); // 创建表
>>
>>
>> &nbsp; &nbsp; &nbsp; &nbsp; String sink="CREATE TABLE sink (\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp;user_id INT,\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp;user_nm STRING,\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp;primary key(user_id)&nbsp; NOT ENFORCED \n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ") WITH (\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'connector' = 'jdbc',\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'username' = 'dps',\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'password' = 'dps1234',\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'table-name' = 'sink'\n" +
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; )";
>> &nbsp; &nbsp; &nbsp; &nbsp; String to_print_sql="insert into sink select user_id&nbsp; ,user_nm&nbsp; &nbsp;from userss";
>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;tEnv.executeSql(sink);
>> &nbsp; &nbsp; &nbsp; &nbsp; tEnv.executeSql(to_print_sql);
>> &nbsp; &nbsp; &nbsp; &nbsp; env.execute();
>> &nbsp; &nbsp; }
>>
>>
>> }
>>
>>
>>
>>
>>
>> 详细错误:
>>
>>
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink'.
>>
>>
>> Table options are:
>>
>>
>> 'connector'='jdbc'
>> 'password'='dps1234'
>> 'table-name'='sink'
>> 'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
>> 'username'='dps'
>> at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>> at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>> at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
>> at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
>> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='jdbc''.
>> at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
>> at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
>> ... 18 more
>> Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath.
>>
>>
>> Ambiguous factory classes are:
>>
>>
>> java.util.LinkedList
>> java.util.LinkedList
>> java.util.LinkedList
>> java.util.LinkedList
>> at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)
>> at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>> ... 19 more
>>
>>
>> Process finished with exit code 1

Reply | Threaded
Open this post in threaded view
|

Re: 用代码执行flink sql 报错 Multiple factories for identifier 'jdbc' that implement

Wei Zhong
这个错误信息显示问题在后续版本已经修复,新版本发布后升级版本就能直接从错误信息中看到是哪些TableFactory冲突了:

https://issues.apache.org/jira/browse/FLINK-20186 <https://issues.apache.org/jira/browse/FLINK-20186>



> 在 2020年12月3日,20:08,Wei Zhong <[hidden email]> 写道:
>
> Hi,
>
> 现在的查找TableFactory的代码在错误信息显示上似乎存在问题,看不到真实的类名,可以先手动执行一下以下代码查看到底是哪些类被判定为JDBC的DynamicTableSinkFactory了:
>
> List<Factory> result = new LinkedList<>();
> ServiceLoader
>    .load(Factory.class, Thread.currentThread().getContextClassLoader())
>    .iterator()
>    .forEachRemaining(result::add);
> List<Factory> jdbcResult = result.stream().filter(f ->
>    DynamicTableSinkFactory.class.isAssignableFrom(f.getClass())).filter(
>    f -> f.factoryIdentifier().equals("jdbc")).collect(Collectors.toList());
> System.out.println(jdbcResult);
>
>
>> 在 2020年12月3日,19:50,hailongwang <[hidden email] <mailto:[hidden email]>> 写道:
>>
>> Hi,
>> 你是用哪个版本的呢?有没有自己继承了 DynamicTableSinkFactory 实现的 factoryIdentifier 方法返回 `JDCB` 的 Connector?
>>
>>
>> Best,
>> Hailong
>> 在 2020-12-03 14:44:18,"xuzh" <[hidden email] <mailto:[hidden email]>> 写道:
>>> 错误:
>>>
>>>
>>> Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath
>>>
>>>
>>> 看意思是找到了两个一样的类:DynamicTableSinkFactory
>>>
>>>
>>> 代码如下:
>>> package org.apache.flink.examples;
>>>
>>>
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>> import org.apache.flink.table.factories.DynamicTableSinkFactory;
>>>
>>>
>>> public class CDC2ss2 {
>>> &nbsp; &nbsp; public static void main(String[] args) throws Exception {
>>>
>>>
>>> &nbsp; &nbsp; &nbsp; &nbsp; // set up execution environment
>>> &nbsp; &nbsp; &nbsp; &nbsp; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> &nbsp; &nbsp; &nbsp; &nbsp; StreamTableEnvironment tEnv;
>>>
>>>
>>> &nbsp; &nbsp; &nbsp; &nbsp; EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .useBlinkPlanner()
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .inStreamingMode()
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .build();
>>> &nbsp; &nbsp; &nbsp; &nbsp; tEnv = StreamTableEnvironment.create(env, settings);
>>> &nbsp; &nbsp; &nbsp; &nbsp; String src_sql = "CREATE TABLE userss (\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp;user_id INT,\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp;user_nm STRING\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ") WITH (\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'connector' = 'mysql-cdc',\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'hostname' = '10.12.5.37',\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'port' = '3306',\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'username' = 'dps',\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'password' = 'dps1234',\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'database-name' = 'rpt',\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'table-name' = 'users'\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; )";
>>>
>>>
>>> &nbsp; &nbsp; &nbsp; &nbsp; tEnv.executeSql(src_sql); // 创建表
>>>
>>>
>>> &nbsp; &nbsp; &nbsp; &nbsp; String sink="CREATE TABLE sink (\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp;user_id INT,\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp;user_nm STRING,\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp;primary key(user_id)&nbsp; NOT ENFORCED \n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ") WITH (\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'connector' = 'jdbc',\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n <mysql://10.0.171.171:3306/dps?useSSL=false',\n>" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'username' = 'dps',\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'password' = 'dps1234',\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; 'table-name' = 'sink'\n" +
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "&nbsp; &nbsp; &nbsp; )";
>>> &nbsp; &nbsp; &nbsp; &nbsp; String to_print_sql="insert into sink select user_id&nbsp; ,user_nm&nbsp; &nbsp;from userss";
>>> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;tEnv.executeSql(sink);
>>> &nbsp; &nbsp; &nbsp; &nbsp; tEnv.executeSql(to_print_sql);
>>> &nbsp; &nbsp; &nbsp; &nbsp; env.execute();
>>> &nbsp; &nbsp; }
>>>
>>>
>>> }
>>>
>>>
>>>
>>>
>>>
>>> 详细错误:
>>>
>>>
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink'.
>>>
>>>
>>> Table options are:
>>>
>>>
>>> 'connector'='jdbc'
>>> 'password'='dps1234'
>>> 'table-name'='sink'
>>> 'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false' <mysql://10.0.171.171:3306/dps?useSSL=false'>
>>> 'username'='dps'
>>> at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>>> at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>>> at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>>> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
>>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
>>> at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
>>> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='jdbc''.
>>> at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
>>> at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
>>> ... 18 more
>>> Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath.
>>>
>>>
>>> Ambiguous factory classes are:
>>>
>>>
>>> java.util.LinkedList
>>> java.util.LinkedList
>>> java.util.LinkedList
>>> java.util.LinkedList
>>> at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)
>>> at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>>> ... 19 more
>>>
>>>
>>> Process finished with exit code 1
>