Dear All,
I get an error when using Flink SQL to consume from Kafka and write to Hive. The code and error message are below.

The Flink version is 1.11.2, and the Kafka connector is flink-sql-connector-kafka. I have searched for a long time without finding a solution. The same SQL statements run fine in the Flink SQL client, but when they are run from Java code and the job is submitted to the Hadoop cluster, they fail with the error below.

Any advice would be appreciated.

Java Code

    TableResult tableResult = tableEnvironment.executeSql("DROP TABLE IF EXISTS user_behavior_kafka_table");
    tableResult.print();
    TableResult tableResult2 = tableEnvironment.executeSql(
            "CREATE TABLE user_behavior_kafka_table (\r\n" +
            "   `user_id` STRING,\r\n" +
            "   `item_id` STRING\r\n" +
            " ) WITH (\r\n" +
            "   'connector' = 'kafka',\r\n" +
            "   'topic' = 'TestTopic',\r\n" +
            "   'properties.bootstrap.servers' = 'localhost:9092',\r\n" +
            "   'properties.group.id' = 'consumerTest',\r\n" +
            "   'scan.startup.mode' = 'earliest-offset',\r\n" +
            "   'format' = 'json'\r\n" +
            ")");
    tableResult2.print();

    // Write the data
    tableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE);
    tableEnvironment.executeSql(
            "INSERT INTO user_behavior_hive_table SELECT user_id, item_id FROM user_behavior_kafka_table");

POM File

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.7.5-10.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <scope>provided</scope>
    </dependency>

Error Message

    Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='kafka''.
        at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.connectors.hive.HiveDynamicTableFactory.createDynamicTableSource(HiveDynamicTableFactory.java:81) ~[flink-connector-hive_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:524) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at com.newegg.flink.sqlkafka.Main.main(Main.java:66) ~[flink-0.0.1-SNAPSHOT.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_191-ojdkbuild]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_191-ojdkbuild]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_191-ojdkbuild]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191-ojdkbuild]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        ... 10 more
    Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

-----
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi Jacob,
Maybe you are missing the Kafka connector dependency in your pom. You could refer to this URL: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

Best,
LakeShen

Jacob <[hidden email]> wrote on Tue, Jun 1, 2021 at 9:54 AM:
> Dear All,
>
> I get an error when using Flink SQL to consume from Kafka and write to Hive.
> [...]
Thank you for your reply!
Is the Kafka connector you mean the *flink-connector-kafka_2.11* dependency? That one is the dependency used by the DataStream API, and I have already included the *flink-sql-connector-kafka_2.11* dependency in my pom. I also tried adding *flink-connector-kafka_2.11*, but the error still occurs.

-----
Thanks!
Jacob
In reply to this post by LakeShen
Problem solved.
The Kafka connector jar needs to be added to the lib directory under the Flink home.

-----
Thanks!
Jacob
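For anyone hitting the same "Could not find any factory for identifier 'kafka'" error, a quick way to check whether the fix above has taken effect is to look at which table factories the jars in the lib directory actually declare. As far as I know, Flink discovers table factories through Java's ServiceLoader, so each connector jar lists its factories in a `META-INF/services/org.apache.flink.table.factories.Factory` file. The sketch below uses only the JDK and is an illustration, not part of Flink: the class name `FactoryServiceScanner` and its methods are made up for this example, and the default directory `lib` is an assumption about where it is run from.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarFile;
import java.util.zip.ZipEntry;

// Hypothetical helper (not a Flink class): lists the table factories that
// each jar in a directory declares through the Java SPI service file.
final class FactoryServiceScanner {

    // Service file assumed to be the one Flink's factory discovery reads.
    static final String SERVICE_FILE =
            "META-INF/services/org.apache.flink.table.factories.Factory";

    // Parses service-file content: one class name per line, '#' starts a comment.
    static List<String> parseServiceEntries(String content) {
        List<String> names = new ArrayList<>();
        for (String line : content.split("\\R")) {
            int hash = line.indexOf('#');
            String name = (hash >= 0 ? line.substring(0, hash) : line).trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    // Returns the factory class names declared by one jar (empty if none).
    static List<String> factoriesIn(Path jar) throws IOException {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            ZipEntry entry = jarFile.getEntry(SERVICE_FILE);
            if (entry == null) {
                return new ArrayList<>();
            }
            StringBuilder content = new StringBuilder();
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                    jarFile.getInputStream(entry), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    content.append(line).append('\n');
                }
            }
            return parseServiceEntries(content.toString());
        }
    }

    public static void main(String[] args) throws IOException {
        // Directory to scan; "lib" is only a default for illustration.
        Path dir = Paths.get(args.length > 0 ? args[0] : "lib");
        if (!Files.isDirectory(dir)) {
            System.out.println("Not a directory: " + dir);
            return;
        }
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(dir, "*.jar")) {
            for (Path jar : jars) {
                for (String factory : factoriesIn(jar)) {
                    System.out.println(jar.getFileName() + " -> " + factory);
                }
            }
        }
    }
}
```

Run it with the path to the Flink lib directory as the only argument. If no line mentions a Kafka factory, the connector jar is probably not on the cluster classpath, which would match the error in the original post. The same check can be pointed at the directory containing the job jar to see whether the service file survived packaging.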