hi,

I am trying to run a SQL DDL statement on Flink 1.11.1 to read data from Kafka, and the CREATE statement fails when it is executed.
The jar is built as jar-with-dependencies.

Code snippet:

    public String ddlSql = String.format("CREATE TABLE %s (\n" +
            " number BIGINT,\n" +
            " msg STRING,\n" +
            " username STRING,\n" +
            " update_time TIMESTAMP(3)\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = '%s',\n" +
            " 'properties.bootstrap.servers' = '%s',\n" +
            " 'properties.group.id' = '%s',\n" +
            " 'format' = 'json',\n" +
            " 'json.fail-on-missing-field' = 'false',\n" +
            " 'json.ignore-parse-errors' = 'true'\n" +
            ")\n", tableName, topic, servers, group);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    tableEnv.executeSql(ddlSql);

Error message:

    Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

    Available factory identifiers are:

    datagen
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
        at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
        ... 33 more

I followed this thread: http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
and added flink-connector-kafka_2.12 and flink-sql-connector-kafka_2.12, but I still get the same error.

Here are the pom dependencies:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

Thanks, everyone~
This may be related to how you package the job. Does the program run if you start it directly in IDEA?

If it runs in IDEA but the packaged jar fails when it is submitted, the cause is very likely the SPI files.
If you are using the shade plugin, take a look at this transformer [1].

[1] https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer

RS <[hidden email]> wrote on Fri, Jul 24, 2020 at 5:02 PM:

> [...]

--
Best,
Benchao Li
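Why the SPI files matter for a fat jar can be sketched in plain Java. The example below is only an illustration under stated assumptions: it does not call Maven or Flink, the factory class names are hypothetical, and it assumes the packaging step keeps a single copy of a same-named `META-INF/services` file, which would be consistent with only `datagen` being listed in the error. It contrasts that behaviour with appending all copies, which is what a merging transformer is meant to achieve.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Simulates two ways a fat-jar build can combine same-named META-INF/services files. */
public class ServiceFileMerge {

    /** Keeps only the first copy of the file; entries from later jars are lost. */
    static List<String> firstCopyWins(List<List<String>> copies) {
        return copies.isEmpty() ? new ArrayList<>() : new ArrayList<>(copies.get(0));
    }

    /** Appends all copies, dropping duplicate entries while preserving order. */
    static List<String> appendAll(List<List<String>> copies) {
        Set<String> merged = new LinkedHashSet<>();
        for (List<String> copy : copies) {
            merged.addAll(copy);
        }
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        // Hypothetical factory class names, one service file per dependency jar.
        List<List<String>> copies = Arrays.asList(
                Arrays.asList("com.example.DataGenFactory"),
                Arrays.asList("com.example.KafkaFactory"),
                Arrays.asList("com.example.JsonFactory"));

        System.out.println("first copy wins: " + firstCopyWins(copies));
        System.out.println("appended:        " + appendAll(copies));
    }
}
```

In the first case only one factory remains discoverable, so a lookup for any other identifier would fail even though its classes are inside the jar.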
I build the jar and run it directly on the server (bin/flink run xxx); I have not run it in IDEA.
The Maven build does not configure the shade plugin. The Maven build settings are:

    <properties>
        <jdk.version>1.8</jdk.version>
        <flink.version>1.11.1</flink.version>
    </properties>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>

thx

On 2020-07-24 17:36:46, "Benchao Li" <[hidden email]> wrote:

> [...]
The formatting of my previous mail was broken, so I am replying again.

I build the jar and run it directly on the server; I have not run it in IDEA.

    flink run xxx

The shade plugin is not used.

Maven build settings:

    <properties>
        <jdk.version>1.8</jdk.version>
        <flink.version>1.11.1</flink.version>
    </properties>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
In reply to this post by hechuan
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

These two conflict with each other; remove the first one (flink-connector-kafka_2.12).

> On Jul 24, 2020, at 5:02 PM, RS <[hidden email]> wrote:
>
> [...]
In reply to this post by hechuan
hi,

Thanks for the replies. After several attempts, I found that the dependencies are probably not the problem.

I added a new directory to my project: resources/META-INF/services
and copied two files into it from the Flink source code:

    org.apache.flink.table.factories.TableFactory
    org.apache.flink.table.factories.Factory

After that the error no longer appears. I do not fully understand why, but it did solve the problem.

On 2020-07-24 20:16:18, "JasonLee" <[hidden email]> wrote:

> hi
> You only need the -sql and -json packages.
>
> On 07/24/2020 17:02, RS wrote:
> [...]
Hi,

Flink's TableFactory relies on Java's service discovery mechanism (SPI), which is why these two files are needed. You should check whether jar-with-dependencies actually packages these resource files into the jar.

Also, why do you need to bundle the Flink dependencies into the fat jar? Flink's own classpath already contains them, so when the fat jar is used as a Flink user jar there is no need to include the Flink dependencies as well.

RS <[hidden email]> wrote on Fri, Jul 24, 2020 at 8:30 PM:

> [...]
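One way to check which factories the service discovery mechanism can actually see is to read the provider-configuration files from the classpath. The sketch below is an assumption-laden diagnostic rather than anything from Flink itself: the class name `ServiceFileInspector` and its helpers are made up for illustration, and it only uses standard JDK calls. Run with the fat jar on the classpath, it prints every copy of the `Factory` service file it finds and the class names listed in each.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

/** Lists the entries of a META-INF/services file as seen on the current classpath. */
public class ServiceFileInspector {

    // Service file named after the Factory interface mentioned in this thread.
    static final String SERVICE_FILE =
            "META-INF/services/org.apache.flink.table.factories.Factory";

    /** Strips '#' comments and blank lines, as the ServiceLoader file format allows. */
    static List<String> parseEntries(List<String> lines) {
        List<String> entries = new ArrayList<>();
        for (String line : lines) {
            int hash = line.indexOf('#');
            String name = (hash >= 0 ? line.substring(0, hash) : line).trim();
            if (!name.isEmpty()) {
                entries.add(name);
            }
        }
        return entries;
    }

    /** Reads all lines of one service file copy. */
    static List<String> readLines(URL url) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        ClassLoader loader = ServiceFileInspector.class.getClassLoader();
        Enumeration<URL> copies = loader.getResources(SERVICE_FILE);
        if (!copies.hasMoreElements()) {
            System.out.println("No copy of " + SERVICE_FILE + " found on the classpath");
        }
        while (copies.hasMoreElements()) {
            URL copy = copies.nextElement();
            System.out.println(copy);
            for (String entry : parseEntries(readLines(copy))) {
                System.out.println("  " + entry);
            }
        }
    }
}
```

If the Kafka factory class does not show up in the output for the assembled jar, the service file was most likely lost or overwritten during packaging.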
Hi,

1. OK, thanks, that is good to know.
2. Indeed. After changing some of the Flink dependencies to provided scope, the packaged jar also runs fine. However, packages such as flink-walkthrough-common_2.11 are not present in Flink's lib directory, so I still bundle those.

On 2020-07-27 11:42:50, "Caizhi Weng" <[hidden email]> wrote:

> [...]
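Deciding which dependencies can be marked provided comes down to whether their classes are already on the runtime classpath. A rough check can be sketched as follows; the class `ClasspathCheck` is hypothetical, it only uses `Class.forName` from the JDK, and it reports on whatever classpath the JVM was started with, which is an assumption standing in for the real Flink runtime classpath.

```java
/** Reports whether given class names can be loaded from the current classpath. */
public class ClasspathCheck {

    /** Returns true if the class can be located; the class is not initialized. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass fully qualified class names as arguments, for example
        // org.apache.flink.table.factories.Factory
        for (String name : args) {
            System.out.println(name + " -> " + (isOnClasspath(name) ? "found" : "missing"));
        }
    }
}
```

Started with the jars from Flink's lib directory on the classpath, a class reported as "found" suggests its dependency could be provided, while "missing" suggests it still has to be bundled in the user jar.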