Flink 1.11.2 SQL: error consuming from Kafka and writing to Hive


Jacob
Dear All,

I am getting an error when using Flink SQL to consume from Kafka and write to Hive; the code and error message are below.

The Flink version is 1.11.2, and the Kafka connector is flink-sql-connector-kafka.
I have searched for a long time without finding a solution. The same SQL statements run fine in the Flink SQL client, but when the job is submitted to the Hadoop cluster from Java code, it fails as follows:

Any advice is appreciated.

Java Code

                TableResult tableResult = tableEnvironment.executeSql(
                                "DROP TABLE IF EXISTS user_behavior_kafka_table");
                tableResult.print();

                TableResult tableResult2 = tableEnvironment.executeSql(
                                "CREATE TABLE user_behavior_kafka_table (\r\n" +
                                "  `user_id` STRING,\r\n" +
                                "  `item_id` STRING\r\n" +
                                ") WITH (\r\n" +
                                "  'connector' = 'kafka',\r\n" +
                                "  'topic' = 'TestTopic',\r\n" +
                                "  'properties.bootstrap.servers' = 'localhost:9092',\r\n" +
                                "  'properties.group.id' = 'consumerTest',\r\n" +
                                "  'scan.startup.mode' = 'earliest-offset',\r\n" +
                                "  'format' = 'json'\r\n" +
                                ")");
                tableResult2.print();

                // Write the Kafka data into the Hive table
                tableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE);
                tableEnvironment.executeSql(
                                "INSERT INTO user_behavior_hive_table " +
                                "SELECT user_id, item_id FROM user_behavior_kafka_table");
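
(For context: tableEnvironment is created against a Hive catalog, roughly as follows; the catalog name, database, and Hive conf dir below are placeholders for the actual settings.)

                import org.apache.flink.table.api.EnvironmentSettings;
                import org.apache.flink.table.api.TableEnvironment;
                import org.apache.flink.table.catalog.hive.HiveCatalog;

                EnvironmentSettings settings = EnvironmentSettings.newInstance()
                                .useBlinkPlanner()
                                .inStreamingMode()
                                .build();
                TableEnvironment tableEnvironment = TableEnvironment.create(settings);

                // Register the Hive catalog so user_behavior_hive_table can be resolved
                HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/etc/hive/conf");
                tableEnvironment.registerCatalog("myhive", hiveCatalog);
                tableEnvironment.useCatalog("myhive");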


POM File

                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-json</artifactId>
                        <version>${flink.version}</version>
                </dependency>
               
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-java_2.11</artifactId>
                        <version>${flink.version}</version>
                        <scope>provided</scope>
                </dependency>

                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-clients_${scala.binary.version}</artifactId>
                        <version>${flink.version}</version>
                        <scope>provided</scope>
                </dependency>

                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
                        <version>${flink.version}</version>
                        <scope>provided</scope>
                </dependency>

                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                        <version>${flink.version}</version>
                        <scope>provided</scope>
                </dependency>

                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
                        <version>${flink.version}</version>
                </dependency>


                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
                        <version>2.7.5-10.0</version>
                        <scope>provided</scope>
                </dependency>

                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-hive_2.11</artifactId>
                        <version>${flink.version}</version>
                        <scope>provided</scope>
                </dependency>

                <dependency>
                        <groupId>org.apache.hive</groupId>
                        <artifactId>hive-exec</artifactId>
                        <version>${hive.version}</version>
                        <scope>provided</scope>
                </dependency>
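
(Here ${flink.version} resolves to 1.11.2; ${hive.version} matches the Hive installation on the cluster.)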


Error Message

Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='kafka''.
        at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.connectors.hive.HiveDynamicTableFactory.createDynamicTableSource(HiveDynamicTableFactory.java:81) ~[flink-connector-hive_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:524) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
        at com.newegg.flink.sqlkafka.Main.main(Main.java:66) ~[flink-0.0.1-SNAPSHOT.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_191-ojdkbuild]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_191-ojdkbuild]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_191-ojdkbuild]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191-ojdkbuild]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        ... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.





-----
Thanks!
Jacob

Re: Flink 1.11.2 SQL: error consuming from Kafka and writing to Hive

LakeShen
Hi Jacob,

Maybe you are missing the Kafka connector dependency in your POM;
you could refer to this URL:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
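
For example, something along these lines (a sketch; the version should match your Flink build, and the page also lists flink-sql-connector-kafka_2.11 as the shaded jar for SQL client use):

                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kafka_2.11</artifactId>
                        <version>1.11.2</version>
                </dependency>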

Best,
LakeShen


Re: Flink 1.11.2 SQL: error consuming from Kafka and writing to Hive

Jacob
Thank you for your reply!

By "kafka connector", do you mean the *flink-connector-kafka_2.11* dependency? That one is used by the DataStream API, and I already have *flink-sql-connector-kafka_2.11* in my POM.
I also tried adding *flink-connector-kafka_2.11*, but the error still occurs.



-----
Thanks!
Jacob

Re: Flink 1.11.2 SQL: error consuming from Kafka and writing to Hive

Jacob
In reply to this post by LakeShen
The problem is solved.

The Kafka connector jar needs to be placed under lib in the Flink home directory.
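
Concretely, assuming the 1.11.2 / Scala 2.11 build used here, that means copying flink-sql-connector-kafka_2.11-1.11.2.jar into $FLINK_HOME/lib/ and restarting the cluster so that the jar is picked up.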



-----
Thanks!
Jacob