Flink 1.9 blink planner 下使用 DDL 语句创建表不成功,不知道是少了属性定义,还是需要实现 TableSourceFactory,还是其他什么原因。
提示: Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed. Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. 代码: ``` import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} import org.apache.flink.table.api.{EnvironmentSettings, Types} import org.apache.flink.table.api.scala.{StreamTableEnvironment, _} import org.apache.flink.types.Row object KafkaInDDL extends App { val env = StreamExecutionEnvironment.getExecutionEnvironment val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings) val sourceDDL = """create table sourceTable( id int, name varchar ) with ( 'connector.type' = 'kafka', 'connector.property-version' = '1', 'update-mode' = 'append', 'bootstrap.servers' = '192.168.1.160:19092', 'connector.topic' = 'hbtest1', 'connector.startup-mode' = 'earliest-offset' ) """ tEnv.sqlUpdate(sourceDDL) tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print() tEnv.execute("") } ``` |
Administrator
|
Hi,
初步看下来你的 DDL 中有这几部分定义有问题。 1. 缺少 format properties 2. 缺少 connector.version 3. bootstrap.servers 的配置方式写的不对... 你可以参考下面这个作为 example: CREATE TABLE kafka_json_source ( rowtime TIMESTAMP, user_name VARCHAR, event ROW<message_type VARCHAR, message VARCHAR> ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'test-json', 'connector.startup-mode' = 'earliest-offset', 'connector.properties.0.key' = 'zookeeper.connect', 'connector.properties.0.value' = 'localhost:2181', 'connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = 'localhost:9092', 'update-mode' = 'append', 'format.type' = 'json', 'format.derive-schema' = 'true' ); Kafka 中的数据长这个样子: {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { "message_type": "WARNING", "message": "This is a warning."}} Best, Jark > 在 2019年8月26日,09:52,hb <[hidden email]> 写道: > > flink1.9 blink planner table 使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 需要实现TableSourceFactory,还是其他什么. > > > 提示: > Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed. > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in > the classpath. 
> > > > > 代码: > ``` > import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} > import org.apache.flink.table.api.{EnvironmentSettings, Types} > import org.apache.flink.table.api.scala.{StreamTableEnvironment, _} > import org.apache.flink.types.Row > > > object KafkaInDDL extends App { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings) > > > val sourceDDL = > """create table sourceTable( > id int, > name varchar > ) with ( > 'connector.type' = 'kafka', > 'connector.property-version' = '1', > 'update-mode' = 'append', > 'bootstrap.servers' = '192.168.1.160:19092', > 'connector.topic' = 'hbtest1', > 'connector.startup-mode' = 'earliest-offset' > ) > """ > tEnv.sqlUpdate(sourceDDL) > tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print() > tEnv.execute("") > } > ``` |
使用了你的ddl语句,还是报一样的错误.
我是在idea里面执行的,maven 配置的依赖. 在 2019-08-26 11:22:20,"Jark Wu" <[hidden email]> 写道: >Hi, > >初步看下来你的 DDL 中有这几部分定义的有问题。 > >1. 缺少format properties >2. 缺少 connector.version >3. bootstrap.severs 的配置方式写的不对... > > >你可以参考下面这个作为example: > > >CREATE TABLE kafka_json_source ( > rowtime TIMESTAMP, > user_name VARCHAR, > event ROW<message_type VARCHAR, message VARCHAR> >) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'test-json', > 'connector.startup-mode' = 'earliest-offset', > 'connector.properties.0.key' = 'zookeeper.connect', > 'connector.properties.0.value' = 'localhost:2181', > 'connector.properties.1.key' = 'bootstrap.servers', > 'connector.properties.1.value' = 'localhost:9092', > 'update-mode' = 'append', > 'format.type' = 'json', > 'format.derive-schema' = 'true' >); > > >Kafka 中的数据长这个样子: > >{"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { "message_type": "WARNING", "message": "This is a warning."}} > > >Best, >Jark > > >> 在 2019年8月26日,09:52,hb <[hidden email]> 写道: >> >> flink1.9 blink planner table 使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 需要实现TableSourceFactory,还是其他什么. >> >> >> 提示: >> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed. >> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in >> the classpath. 
>> >> >> >> >> 代码: >> ``` >> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} >> import org.apache.flink.table.api.{EnvironmentSettings, Types} >> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _} >> import org.apache.flink.types.Row >> >> >> object KafkaInDDL extends App { >> val env = StreamExecutionEnvironment.getExecutionEnvironment >> val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings) >> >> >> val sourceDDL = >> """create table sourceTable( >> id int, >> name varchar >> ) with ( >> 'connector.type' = 'kafka', >> 'connector.property-version' = '1', >> 'update-mode' = 'append', >> 'bootstrap.servers' = '192.168.1.160:19092', >> 'connector.topic' = 'hbtest1', >> 'connector.startup-mode' = 'earliest-offset' >> ) >> """ >> tEnv.sqlUpdate(sourceDDL) >> tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print() >> tEnv.execute("") >> } >> ``` > |
Administrator
|
Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11
Best, Jark > 在 2019年8月26日,13:57,hb <[hidden email]> 写道: > > 使用了你的ddl语句,还是报一样的错误. > 我是在idea里面执行的,maven 配置的依赖. > > 在 2019-08-26 11:22:20,"Jark Wu" <[hidden email]> 写道: >> Hi, >> >> 初步看下来你的 DDL 中有这几部分定义的有问题。 >> >> 1. 缺少format properties >> 2. 缺少 connector.version >> 3. bootstrap.severs 的配置方式写的不对... >> >> >> 你可以参考下面这个作为example: >> >> >> CREATE TABLE kafka_json_source ( >> rowtime TIMESTAMP, >> user_name VARCHAR, >> event ROW<message_type VARCHAR, message VARCHAR> >> ) WITH ( >> 'connector.type' = 'kafka', >> 'connector.version' = 'universal', >> 'connector.topic' = 'test-json', >> 'connector.startup-mode' = 'earliest-offset', >> 'connector.properties.0.key' = 'zookeeper.connect', >> 'connector.properties.0.value' = 'localhost:2181', >> 'connector.properties.1.key' = 'bootstrap.servers', >> 'connector.properties.1.value' = 'localhost:9092', >> 'update-mode' = 'append', >> 'format.type' = 'json', >> 'format.derive-schema' = 'true' >> ); >> >> >> Kafka 中的数据长这个样子: >> >> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { "message_type": "WARNING", "message": "This is a warning."}} >> >> >> Best, >> Jark >> >> >>> 在 2019年8月26日,09:52,hb <[hidden email]> 写道: >>> >>> flink1.9 blink planner table 使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 需要实现TableSourceFactory,还是其他什么. >>> >>> >>> 提示: >>> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed. >>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in >>> the classpath. 
>>> >>> >>> >>> >>> 代码: >>> ``` >>> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} >>> import org.apache.flink.table.api.{EnvironmentSettings, Types} >>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _} >>> import org.apache.flink.types.Row >>> >>> >>> object KafkaInDDL extends App { >>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>> val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings) >>> >>> >>> val sourceDDL = >>> """create table sourceTable( >>> id int, >>> name varchar >>> ) with ( >>> 'connector.type' = 'kafka', >>> 'connector.property-version' = '1', >>> 'update-mode' = 'append', >>> 'bootstrap.servers' = '192.168.1.160:19092', >>> 'connector.topic' = 'hbtest1', >>> 'connector.startup-mode' = 'earliest-offset' >>> ) >>> """ >>> tEnv.sqlUpdate(sourceDDL) >>> tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print() >>> tEnv.execute("") >>> } >>> ``` >> |
之前少了 flink-connector-kafka_2.11 依赖,
现在错误变成 Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V 了 pom依赖: ``` <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table</artifactId> <version>1.9.0</version> <type>pom</type> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep-scala_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.11</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> 
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime-blink_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_2.11</artifactId> <version>${flink.version}</version> </dependency> </dependencies> ``` 在 2019-08-26 13:37:51,"Jark Wu" <[hidden email]> 写道: >Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11 > >Best, >Jark > >> 在 2019年8月26日,13:57,hb <[hidden email]> 写道: >> >> 使用了你的ddl语句,还是报一样的错误. >> 我是在idea里面执行的,maven 配置的依赖. >> >> 在 2019-08-26 11:22:20,"Jark Wu" <[hidden email]> 写道: >>> Hi, >>> >>> 初步看下来你的 DDL 中有这几部分定义的有问题。 >>> >>> 1. 缺少format properties >>> 2. 缺少 connector.version >>> 3. bootstrap.severs 的配置方式写的不对... 
>>> >>> >>> 你可以参考下面这个作为example: >>> >>> >>> CREATE TABLE kafka_json_source ( >>> rowtime TIMESTAMP, >>> user_name VARCHAR, >>> event ROW<message_type VARCHAR, message VARCHAR> >>> ) WITH ( >>> 'connector.type' = 'kafka', >>> 'connector.version' = 'universal', >>> 'connector.topic' = 'test-json', >>> 'connector.startup-mode' = 'earliest-offset', >>> 'connector.properties.0.key' = 'zookeeper.connect', >>> 'connector.properties.0.value' = 'localhost:2181', >>> 'connector.properties.1.key' = 'bootstrap.servers', >>> 'connector.properties.1.value' = 'localhost:9092', >>> 'update-mode' = 'append', >>> 'format.type' = 'json', >>> 'format.derive-schema' = 'true' >>> ); >>> >>> >>> Kafka 中的数据长这个样子: >>> >>> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { "message_type": "WARNING", "message": "This is a warning."}} >>> >>> >>> Best, >>> Jark >>> >>> >>>> 在 2019年8月26日,09:52,hb <[hidden email]> 写道: >>>> >>>> flink1.9 blink planner table 使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 需要实现TableSourceFactory,还是其他什么. >>>> >>>> >>>> 提示: >>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed. >>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in >>>> the classpath. 
>>>> >>>> >>>> >>>> >>>> 代码: >>>> ``` >>>> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} >>>> import org.apache.flink.table.api.{EnvironmentSettings, Types} >>>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _} >>>> import org.apache.flink.types.Row >>>> >>>> >>>> object KafkaInDDL extends App { >>>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>>> val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>>> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings) >>>> >>>> >>>> val sourceDDL = >>>> """create table sourceTable( >>>> id int, >>>> name varchar >>>> ) with ( >>>> 'connector.type' = 'kafka', >>>> 'connector.property-version' = '1', >>>> 'update-mode' = 'append', >>>> 'bootstrap.servers' = '192.168.1.160:19092', >>>> 'connector.topic' = 'hbtest1', >>>> 'connector.startup-mode' = 'earliest-offset' >>>> ) >>>> """ >>>> tEnv.sqlUpdate(sourceDDL) >>>> tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print() >>>> tEnv.execute("") >>>> } >>>> ``` >>> |
检查一下代码中使用的 Kafka 版本,可能是这方面的问题。
[hidden email] 发件人: hb 发送时间: 2019-08-26 15:14 收件人: user-zh 主题: Re:Re: flink1.9 blink planner table ddl 使用问题 之前少了 flink-connector-kafka_2.11 依赖, 现在错误变成 Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V 了 pom依赖: ``` <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table</artifactId> <version>1.9.0</version> <type>pom</type> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep-scala_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-table-api-java-bridge_2.11</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime-blink_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_2.11</artifactId> <version>${flink.version}</version> </dependency> </dependencies> ``` 在 2019-08-26 13:37:51,"Jark Wu" <[hidden email]> 写道: >Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11 > >Best, >Jark > >> 在 2019年8月26日,13:57,hb <[hidden email]> 写道: >> >> 使用了你的ddl语句,还是报一样的错误. >> 我是在idea里面执行的,maven 配置的依赖. >> >> 在 2019-08-26 11:22:20,"Jark Wu" <[hidden email]> 写道: >>> Hi, >>> >>> 初步看下来你的 DDL 中有这几部分定义的有问题。 >>> >>> 1. 缺少format properties >>> 2. 缺少 connector.version >>> 3. bootstrap.severs 的配置方式写的不对... 
>>> >>> >>> 你可以参考下面这个作为example: >>> >>> >>> CREATE TABLE kafka_json_source ( >>> rowtime TIMESTAMP, >>> user_name VARCHAR, >>> event ROW<message_type VARCHAR, message VARCHAR> >>> ) WITH ( >>> 'connector.type' = 'kafka', >>> 'connector.version' = 'universal', >>> 'connector.topic' = 'test-json', >>> 'connector.startup-mode' = 'earliest-offset', >>> 'connector.properties.0.key' = 'zookeeper.connect', >>> 'connector.properties.0.value' = 'localhost:2181', >>> 'connector.properties.1.key' = 'bootstrap.servers', >>> 'connector.properties.1.value' = 'localhost:9092', >>> 'update-mode' = 'append', >>> 'format.type' = 'json', >>> 'format.derive-schema' = 'true' >>> ); >>> >>> >>> Kafka 中的数据长这个样子: >>> >>> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { "message_type": "WARNING", "message": "This is a warning."}} >>> >>> >>> Best, >>> Jark >>> >>> >>>> 在 2019年8月26日,09:52,hb <[hidden email]> 写道: >>>> >>>> flink1.9 blink planner table 使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 需要实现TableSourceFactory,还是其他什么. >>>> >>>> >>>> 提示: >>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed. >>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in >>>> the classpath. 
>>>> >>>> >>>> >>>> >>>> 代码: >>>> ``` >>>> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} >>>> import org.apache.flink.table.api.{EnvironmentSettings, Types} >>>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _} >>>> import org.apache.flink.types.Row >>>> >>>> >>>> object KafkaInDDL extends App { >>>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>>> val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>>> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings) >>>> >>>> >>>> val sourceDDL = >>>> """create table sourceTable( >>>> id int, >>>> name varchar >>>> ) with ( >>>> 'connector.type' = 'kafka', >>>> 'connector.property-version' = '1', >>>> 'update-mode' = 'append', >>>> 'bootstrap.servers' = '192.168.1.160:19092', >>>> 'connector.topic' = 'hbtest1', >>>> 'connector.startup-mode' = 'earliest-offset' >>>> ) >>>> """ >>>> tEnv.sqlUpdate(sourceDDL) >>>> tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print() >>>> tEnv.execute("") >>>> } >>>> ``` >>> |
kafka版本是 kafka_2.11-1.1.0,
支持的kafka版本有哪些 在 2019-08-26 14:23:19,"[hidden email]" <[hidden email]> 写道: >检查一下代码的kafka版本,可能是这方面的错误 > > > >[hidden email] > >发件人: hb >发送时间: 2019-08-26 15:14 >收件人: user-zh >主题: Re:Re: flink1.9 blink planner table ddl 使用问题 >之前少了 flink-connector-kafka_2.11 依赖, >现在错误变成 Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V >了 > > >pom依赖: >``` > <dependencies> > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-core</artifactId> > <version>${flink.version}</version> > > > </dependency> > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-clients_2.11</artifactId> > <version>${flink.version}</version> > > > </dependency> > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-scala_2.11</artifactId> > <version>${flink.version}</version> > > > </dependency> > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-streaming-scala_2.11</artifactId> > <version>${flink.version}</version> > > > </dependency> > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table</artifactId> > <version>1.9.0</version> > <type>pom</type> > <scope>provided</scope> > </dependency> > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-common</artifactId> > <version>${flink.version}</version> > </dependency> > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-cep-scala_2.11</artifactId> > <version>${flink.version}</version> > </dependency> > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-filesystem_2.11</artifactId> > <version>${flink.version}</version> > 
</dependency> > > > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-api-scala-bridge_2.11</artifactId> > <version>${flink.version}</version> ><!-- <scope>provided</scope>--> > </dependency> > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-api-java-bridge_2.11</artifactId> > <version>${flink.version}</version> > <!-- <scope>provided</scope>--> > </dependency> > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-planner_2.11</artifactId> > <version>${flink.version}</version> > <!-- <scope>provided</scope>--> > </dependency> > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-runtime-blink_2.11</artifactId> > <version>${flink.version}</version> > </dependency> > > > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-planner-blink_2.11</artifactId> > <version>${flink.version}</version> > <!-- <scope>provided</scope>--> > </dependency> > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-elasticsearch6_2.11</artifactId> > <version>${flink.version}</version> > </dependency> > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-kafka-0.11_2.11</artifactId> > <version>${flink.version}</version> > </dependency> > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-kafka_2.11</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-json</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-runtime-web_2.11</artifactId> > <version>${flink.version}</version> > </dependency> > </dependencies> > > >``` > > > > > > > > >在 2019-08-26 13:37:51,"Jark Wu" <[hidden email]> 写道: >>Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11 >> >>Best, >>Jark >> >>> 在 
2019年8月26日,13:57,hb <[hidden email]> 写道: >>> >>> 使用了你的ddl语句,还是报一样的错误. >>> 我是在idea里面执行的,maven 配置的依赖. >>> >>> 在 2019-08-26 11:22:20,"Jark Wu" <[hidden email]> 写道: >>>> Hi, >>>> >>>> 初步看下来你的 DDL 中有这几部分定义的有问题。 >>>> >>>> 1. 缺少format properties >>>> 2. 缺少 connector.version >>>> 3. bootstrap.severs 的配置方式写的不对... >>>> >>>> >>>> 你可以参考下面这个作为example: >>>> >>>> >>>> CREATE TABLE kafka_json_source ( >>>> rowtime TIMESTAMP, >>>> user_name VARCHAR, >>>> event ROW<message_type VARCHAR, message VARCHAR> >>>> ) WITH ( >>>> 'connector.type' = 'kafka', >>>> 'connector.version' = 'universal', >>>> 'connector.topic' = 'test-json', >>>> 'connector.startup-mode' = 'earliest-offset', >>>> 'connector.properties.0.key' = 'zookeeper.connect', >>>> 'connector.properties.0.value' = 'localhost:2181', >>>> 'connector.properties.1.key' = 'bootstrap.servers', >>>> 'connector.properties.1.value' = 'localhost:9092', >>>> 'update-mode' = 'append', >>>> 'format.type' = 'json', >>>> 'format.derive-schema' = 'true' >>>> ); >>>> >>>> >>>> Kafka 中的数据长这个样子: >>>> >>>> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { "message_type": "WARNING", "message": "This is a warning."}} >>>> >>>> >>>> Best, >>>> Jark >>>> >>>> >>>>> 在 2019年8月26日,09:52,hb <[hidden email]> 写道: >>>>> >>>>> flink1.9 blink planner table 使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 需要实现TableSourceFactory,还是其他什么. >>>>> >>>>> >>>>> 提示: >>>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed. >>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in >>>>> the classpath. 
>>>>> >>>>> >>>>> >>>>> >>>>> 代码: >>>>> ``` >>>>> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} >>>>> import org.apache.flink.table.api.{EnvironmentSettings, Types} >>>>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _} >>>>> import org.apache.flink.types.Row >>>>> >>>>> >>>>> object KafkaInDDL extends App { >>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>>>> val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>>>> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings) >>>>> >>>>> >>>>> val sourceDDL = >>>>> """create table sourceTable( >>>>> id int, >>>>> name varchar >>>>> ) with ( >>>>> 'connector.type' = 'kafka', >>>>> 'connector.property-version' = '1', >>>>> 'update-mode' = 'append', >>>>> 'bootstrap.servers' = '192.168.1.160:19092', >>>>> 'connector.topic' = 'hbtest1', >>>>> 'connector.startup-mode' = 'earliest-offset' >>>>> ) >>>>> """ >>>>> tEnv.sqlUpdate(sourceDDL) >>>>> tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print() >>>>> tEnv.execute("") >>>>> } >>>>> ``` >>>> |
感谢,解决了, 指定 'connector.version' = '0.11' 就可以了.
Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题. 在 2019-08-26 14:26:15,"hb" <[hidden email]> 写道: >kafka版本是 kafka_2.11-1.1.0, >支持的kafka版本有哪些 >在 2019-08-26 14:23:19,"[hidden email]" <[hidden email]> 写道: >>检查一下代码的kafka版本,可能是这方面的错误 >> >> >> >>[hidden email] >> >>发件人: hb >>发送时间: 2019-08-26 15:14 >>收件人: user-zh >>主题: Re:Re: flink1.9 blink planner table ddl 使用问题 >>之前少了 flink-connector-kafka_2.11 依赖, >>现在错误变成 Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V >>了 >> >> >>pom依赖: >>``` >> <dependencies> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-core</artifactId> >> <version>${flink.version}</version> >> >> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-clients_2.11</artifactId> >> <version>${flink.version}</version> >> >> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-scala_2.11</artifactId> >> <version>${flink.version}</version> >> >> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-streaming-scala_2.11</artifactId> >> <version>${flink.version}</version> >> >> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table</artifactId> >> <version>1.9.0</version> >> <type>pom</type> >> <scope>provided</scope> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table-common</artifactId> >> <version>${flink.version}</version> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-cep-scala_2.11</artifactId> >> 
<version>${flink.version}</version> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-connector-filesystem_2.11</artifactId> >> <version>${flink.version}</version> >> </dependency> >> >> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> >> <version>${flink.version}</version> >><!-- <scope>provided</scope>--> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table-api-java-bridge_2.11</artifactId> >> <version>${flink.version}</version> >> <!-- <scope>provided</scope>--> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table-planner_2.11</artifactId> >> <version>${flink.version}</version> >> <!-- <scope>provided</scope>--> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table-runtime-blink_2.11</artifactId> >> <version>${flink.version}</version> >> </dependency> >> >> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table-planner-blink_2.11</artifactId> >> <version>${flink.version}</version> >> <!-- <scope>provided</scope>--> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> >> <version>${flink.version}</version> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> >> <version>${flink.version}</version> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-connector-kafka_2.11</artifactId> >> <version>${flink.version}</version> >> </dependency> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-json</artifactId> >> <version>${flink.version}</version> >> </dependency> >> <dependency> >> <groupId>org.apache.flink</groupId> >> 
<artifactId>flink-runtime-web_2.11</artifactId> >> <version>${flink.version}</version> >> </dependency> >> </dependencies> >> >> >>``` >> >> >> >> >> >> >> >> >>在 2019-08-26 13:37:51,"Jark Wu" <[hidden email]> 写道: >>>Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11 >>> >>>Best, >>>Jark >>> >>>> 在 2019年8月26日,13:57,hb <[hidden email]> 写道: >>>> >>>> 使用了你的ddl语句,还是报一样的错误. >>>> 我是在idea里面执行的,maven 配置的依赖. >>>> >>>> 在 2019-08-26 11:22:20,"Jark Wu" <[hidden email]> 写道: >>>>> Hi, >>>>> >>>>> 初步看下来你的 DDL 中有这几部分定义的有问题。 >>>>> >>>>> 1. 缺少format properties >>>>> 2. 缺少 connector.version >>>>> 3. bootstrap.severs 的配置方式写的不对... >>>>> >>>>> >>>>> 你可以参考下面这个作为example: >>>>> >>>>> >>>>> CREATE TABLE kafka_json_source ( >>>>> rowtime TIMESTAMP, >>>>> user_name VARCHAR, >>>>> event ROW<message_type VARCHAR, message VARCHAR> >>>>> ) WITH ( >>>>> 'connector.type' = 'kafka', >>>>> 'connector.version' = 'universal', >>>>> 'connector.topic' = 'test-json', >>>>> 'connector.startup-mode' = 'earliest-offset', >>>>> 'connector.properties.0.key' = 'zookeeper.connect', >>>>> 'connector.properties.0.value' = 'localhost:2181', >>>>> 'connector.properties.1.key' = 'bootstrap.servers', >>>>> 'connector.properties.1.value' = 'localhost:9092', >>>>> 'update-mode' = 'append', >>>>> 'format.type' = 'json', >>>>> 'format.derive-schema' = 'true' >>>>> ); >>>>> >>>>> >>>>> Kafka 中的数据长这个样子: >>>>> >>>>> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { "message_type": "WARNING", "message": "This is a warning."}} >>>>> >>>>> >>>>> Best, >>>>> Jark >>>>> >>>>> >>>>>> 在 2019年8月26日,09:52,hb <[hidden email]> 写道: >>>>>> >>>>>> flink1.9 blink planner table 使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 需要实现TableSourceFactory,还是其他什么. >>>>>> >>>>>> >>>>>> 提示: >>>>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed. 
>>>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in >>>>>> the classpath. >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> 代码: >>>>>> ``` >>>>>> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} >>>>>> import org.apache.flink.table.api.{EnvironmentSettings, Types} >>>>>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _} >>>>>> import org.apache.flink.types.Row >>>>>> >>>>>> >>>>>> object KafkaInDDL extends App { >>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>>>>> val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>>>>> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings) >>>>>> >>>>>> >>>>>> val sourceDDL = >>>>>> """create table sourceTable( >>>>>> id int, >>>>>> name varchar >>>>>> ) with ( >>>>>> 'connector.type' = 'kafka', >>>>>> 'connector.property-version' = '1', >>>>>> 'update-mode' = 'append', >>>>>> 'bootstrap.servers' = '192.168.1.160:19092', >>>>>> 'connector.topic' = 'hbtest1', >>>>>> 'connector.startup-mode' = 'earliest-offset' >>>>>> ) >>>>>> """ >>>>>> tEnv.sqlUpdate(sourceDDL) >>>>>> tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print() >>>>>> tEnv.execute("") >>>>>> } >>>>>> ``` >>>>> |
这部分有文档吗?看了好几圈没看到。
hb <[hidden email]> 于2019年8月26日周一 下午3:34写道: > 感谢,解决了, 指定 'connector.version' = '0.11' 就可以了. > Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题. > > 在 2019-08-26 14:26:15,"hb" <[hidden email]> 写道: > >kafka版本是 kafka_2.11-1.1.0, > >支持的kafka版本有哪些 > >在 2019-08-26 14:23:19,"[hidden email]" < > [hidden email]> 写道: > >>检查一下代码的kafka版本,可能是这方面的错误 > >> > >> > >> > >>[hidden email] > >> > >>发件人: hb > >>发送时间: 2019-08-26 15:14 > >>收件人: user-zh > >>主题: Re:Re: flink1.9 blink planner table ddl 使用问题 > >>之前少了 flink-connector-kafka_2.11 依赖, > >>现在错误变成 Caused by: java.lang.NoSuchMethodError: > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V > >>了 > >> > >> > >>pom依赖: > >>``` > >> <dependencies> > >> > >> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-core</artifactId> > >> <version>${flink.version}</version> > >> > >> > >> </dependency> > >> > >> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-clients_2.11</artifactId> > >> <version>${flink.version}</version> > >> > >> > >> </dependency> > >> > >> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-scala_2.11</artifactId> > >> <version>${flink.version}</version> > >> > >> > >> </dependency> > >> > >> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-streaming-scala_2.11</artifactId> > >> <version>${flink.version}</version> > >> > >> > >> </dependency> > >> > >> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-table</artifactId> > >> <version>1.9.0</version> > >> <type>pom</type> > >> <scope>provided</scope> > >> </dependency> > >> > >> > >> <dependency> > >> 
<groupId>org.apache.flink</groupId> > >> <artifactId>flink-table-common</artifactId> > >> <version>${flink.version}</version> > >> </dependency> > >> > >> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-cep-scala_2.11</artifactId> > >> <version>${flink.version}</version> > >> </dependency> > >> > >> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-connector-filesystem_2.11</artifactId> > >> <version>${flink.version}</version> > >> </dependency> > >> > >> > >> > >> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> > >> <version>${flink.version}</version> > >><!-- <scope>provided</scope>--> > >> </dependency> > >> > >> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-table-api-java-bridge_2.11</artifactId> > >> <version>${flink.version}</version> > >> <!-- <scope>provided</scope>--> > >> </dependency> > >> > >> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-table-planner_2.11</artifactId> > >> <version>${flink.version}</version> > >> <!-- <scope>provided</scope>--> > >> </dependency> > >> > >> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-table-runtime-blink_2.11</artifactId> > >> <version>${flink.version}</version> > >> </dependency> > >> > >> > >> > >> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-table-planner-blink_2.11</artifactId> > >> <version>${flink.version}</version> > >> <!-- <scope>provided</scope>--> > >> </dependency> > >> > >> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> > >> <version>${flink.version}</version> > >> </dependency> > >> > >> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> > >> <version>${flink.version}</version> 
> >> </dependency> > >> > >> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-connector-kafka_2.11</artifactId> > >> <version>${flink.version}</version> > >> </dependency> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-json</artifactId> > >> <version>${flink.version}</version> > >> </dependency> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-runtime-web_2.11</artifactId> > >> <version>${flink.version}</version> > >> </dependency> > >> </dependencies> > >> > >> > >>``` > >> > >> > >> > >> > >> > >> > >> > >> > >>在 2019-08-26 13:37:51,"Jark Wu" <[hidden email]> 写道: > >>>Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11 > >>> > >>>Best, > >>>Jark > >>> > >>>> 在 2019年8月26日,13:57,hb <[hidden email]> 写道: > >>>> > >>>> 使用了你的ddl语句,还是报一样的错误. > >>>> 我是在idea里面执行的,maven 配置的依赖. > >>>> > >>>> 在 2019-08-26 11:22:20,"Jark Wu" <[hidden email]> 写道: > >>>>> Hi, > >>>>> > >>>>> 初步看下来你的 DDL 中有这几部分定义的有问题。 > >>>>> > >>>>> 1. 缺少format properties > >>>>> 2. 缺少 connector.version > >>>>> 3. bootstrap.severs 的配置方式写的不对... 
> >>>>> > >>>>> > >>>>> 你可以参考下面这个作为example: > >>>>> > >>>>> > >>>>> CREATE TABLE kafka_json_source ( > >>>>> rowtime TIMESTAMP, > >>>>> user_name VARCHAR, > >>>>> event ROW<message_type VARCHAR, message VARCHAR> > >>>>> ) WITH ( > >>>>> 'connector.type' = 'kafka', > >>>>> 'connector.version' = 'universal', > >>>>> 'connector.topic' = 'test-json', > >>>>> 'connector.startup-mode' = 'earliest-offset', > >>>>> 'connector.properties.0.key' = 'zookeeper.connect', > >>>>> 'connector.properties.0.value' = 'localhost:2181', > >>>>> 'connector.properties.1.key' = 'bootstrap.servers', > >>>>> 'connector.properties.1.value' = 'localhost:9092', > >>>>> 'update-mode' = 'append', > >>>>> 'format.type' = 'json', > >>>>> 'format.derive-schema' = 'true' > >>>>> ); > >>>>> > >>>>> > >>>>> Kafka 中的数据长这个样子: > >>>>> > >>>>> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { > "message_type": "WARNING", "message": "This is a warning."}} > >>>>> > >>>>> > >>>>> Best, > >>>>> Jark > >>>>> > >>>>> > >>>>>> 在 2019年8月26日,09:52,hb <[hidden email]> 写道: > >>>>>> > >>>>>> flink1.9 blink planner table 使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 > 需要实现TableSourceFactory,还是其他什么. > >>>>>> > >>>>>> > >>>>>> 提示: > >>>>>> Exception in thread "main" > org.apache.flink.table.api.ValidationException: SQL validation failed. > findAndCreateTableSource failed. > >>>>>> Caused by: > org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find > a suitable table factory for > 'org.apache.flink.table.factories.TableSourceFactory' in > >>>>>> the classpath. 
> >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> 代码: > >>>>>> ``` > >>>>>> import > org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} > >>>>>> import org.apache.flink.table.api.{EnvironmentSettings, Types} > >>>>>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _} > >>>>>> import org.apache.flink.types.Row > >>>>>> > >>>>>> > >>>>>> object KafkaInDDL extends App { > >>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment > >>>>>> val settings: EnvironmentSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > >>>>>> val tEnv: StreamTableEnvironment = > StreamTableEnvironment.create(env, settings) > >>>>>> > >>>>>> > >>>>>> val sourceDDL = > >>>>>> """create table sourceTable( > >>>>>> id int, > >>>>>> name varchar > >>>>>> ) with ( > >>>>>> 'connector.type' = 'kafka', > >>>>>> 'connector.property-version' = '1', > >>>>>> 'update-mode' = 'append', > >>>>>> 'bootstrap.servers' = ' > 192.168.1.160:19092', > >>>>>> 'connector.topic' = 'hbtest1', > >>>>>> 'connector.startup-mode' = > 'earliest-offset' > >>>>>> ) > >>>>>> """ > >>>>>> tEnv.sqlUpdate(sourceDDL) > >>>>>> tEnv.sqlQuery("select * from > sourceTable").toAppendStream[Row].print() > >>>>>> tEnv.execute("") > >>>>>> } > >>>>>> ``` > >>>>> > |
Administrator
|
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector
> 在 2019年8月27日,17:59,徐骁 <[hidden email]> 写道: > > 这部分有文档吗,看了好几圈没看到 > > hb <[hidden email]> 于2019年8月26日周一 下午3:34写道: > >> 感谢,解决了, 指定 'connector.version' = '0.11' 就可以了. >> Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题. >> >> 在 2019-08-26 14:26:15,"hb" <[hidden email]> 写道: >>> kafka版本是 kafka_2.11-1.1.0, >>> 支持的kafka版本有哪些 >>> 在 2019-08-26 14:23:19,"[hidden email]" < >> [hidden email]> 写道: >>>> 检查一下代码的kafka版本,可能是这方面的错误 >>>> >>>> >>>> >>>> [hidden email] >>>> >>>> 发件人: hb >>>> 发送时间: 2019-08-26 15:14 >>>> 收件人: user-zh >>>> 主题: Re:Re: flink1.9 blink planner table ddl 使用问题 >>>> 之前少了 flink-connector-kafka_2.11 依赖, >>>> 现在错误变成 Caused by: java.lang.NoSuchMethodError: >> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V >>>> 了 >>>> >>>> >>>> pom依赖: >>>> ``` >>>> <dependencies> >>>> >>>> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-core</artifactId> >>>> <version>${flink.version}</version> >>>> >>>> >>>> </dependency> >>>> >>>> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-clients_2.11</artifactId> >>>> <version>${flink.version}</version> >>>> >>>> >>>> </dependency> >>>> >>>> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-scala_2.11</artifactId> >>>> <version>${flink.version}</version> >>>> >>>> >>>> </dependency> >>>> >>>> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-streaming-scala_2.11</artifactId> >>>> <version>${flink.version}</version> >>>> >>>> >>>> </dependency> >>>> >>>> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-table</artifactId> >>>> <version>1.9.0</version> >>>> <type>pom</type> >>>> 
<scope>provided</scope> >>>> </dependency> >>>> >>>> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-table-common</artifactId> >>>> <version>${flink.version}</version> >>>> </dependency> >>>> >>>> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-cep-scala_2.11</artifactId> >>>> <version>${flink.version}</version> >>>> </dependency> >>>> >>>> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-connector-filesystem_2.11</artifactId> >>>> <version>${flink.version}</version> >>>> </dependency> >>>> >>>> >>>> >>>> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> >>>> <version>${flink.version}</version> >>>> <!-- <scope>provided</scope>--> >>>> </dependency> >>>> >>>> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-table-api-java-bridge_2.11</artifactId> >>>> <version>${flink.version}</version> >>>> <!-- <scope>provided</scope>--> >>>> </dependency> >>>> >>>> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-table-planner_2.11</artifactId> >>>> <version>${flink.version}</version> >>>> <!-- <scope>provided</scope>--> >>>> </dependency> >>>> >>>> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-table-runtime-blink_2.11</artifactId> >>>> <version>${flink.version}</version> >>>> </dependency> >>>> >>>> >>>> >>>> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-table-planner-blink_2.11</artifactId> >>>> <version>${flink.version}</version> >>>> <!-- <scope>provided</scope>--> >>>> </dependency> >>>> >>>> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> >>>> <version>${flink.version}</version> >>>> </dependency> >>>> >>>> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> 
<artifactId>flink-connector-kafka-0.11_2.11</artifactId> >>>> <version>${flink.version}</version> >>>> </dependency> >>>> >>>> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-connector-kafka_2.11</artifactId> >>>> <version>${flink.version}</version> >>>> </dependency> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-json</artifactId> >>>> <version>${flink.version}</version> >>>> </dependency> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-runtime-web_2.11</artifactId> >>>> <version>${flink.version}</version> >>>> </dependency> >>>> </dependencies> >>>> >>>> >>>> ``` >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> 在 2019-08-26 13:37:51,"Jark Wu" <[hidden email]> 写道: >>>>> Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11 >>>>> >>>>> Best, >>>>> Jark >>>>> >>>>>> 在 2019年8月26日,13:57,hb <[hidden email]> 写道: >>>>>> >>>>>> 使用了你的ddl语句,还是报一样的错误. >>>>>> 我是在idea里面执行的,maven 配置的依赖. >>>>>> >>>>>> 在 2019-08-26 11:22:20,"Jark Wu" <[hidden email]> 写道: >>>>>>> Hi, >>>>>>> >>>>>>> 初步看下来你的 DDL 中有这几部分定义的有问题。 >>>>>>> >>>>>>> 1. 缺少format properties >>>>>>> 2. 缺少 connector.version >>>>>>> 3. bootstrap.severs 的配置方式写的不对... 
>>>>>>> >>>>>>> >>>>>>> 你可以参考下面这个作为example: >>>>>>> >>>>>>> >>>>>>> CREATE TABLE kafka_json_source ( >>>>>>> rowtime TIMESTAMP, >>>>>>> user_name VARCHAR, >>>>>>> event ROW<message_type VARCHAR, message VARCHAR> >>>>>>> ) WITH ( >>>>>>> 'connector.type' = 'kafka', >>>>>>> 'connector.version' = 'universal', >>>>>>> 'connector.topic' = 'test-json', >>>>>>> 'connector.startup-mode' = 'earliest-offset', >>>>>>> 'connector.properties.0.key' = 'zookeeper.connect', >>>>>>> 'connector.properties.0.value' = 'localhost:2181', >>>>>>> 'connector.properties.1.key' = 'bootstrap.servers', >>>>>>> 'connector.properties.1.value' = 'localhost:9092', >>>>>>> 'update-mode' = 'append', >>>>>>> 'format.type' = 'json', >>>>>>> 'format.derive-schema' = 'true' >>>>>>> ); >>>>>>> >>>>>>> >>>>>>> Kafka 中的数据长这个样子: >>>>>>> >>>>>>> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { >> "message_type": "WARNING", "message": "This is a warning."}} >>>>>>> >>>>>>> >>>>>>> Best, >>>>>>> Jark >>>>>>> >>>>>>> >>>>>>>> 在 2019年8月26日,09:52,hb <[hidden email]> 写道: >>>>>>>> >>>>>>>> flink1.9 blink planner table 使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 >> 需要实现TableSourceFactory,还是其他什么. >>>>>>>> >>>>>>>> >>>>>>>> 提示: >>>>>>>> Exception in thread "main" >> org.apache.flink.table.api.ValidationException: SQL validation failed. >> findAndCreateTableSource failed. >>>>>>>> Caused by: >> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find >> a suitable table factory for >> 'org.apache.flink.table.factories.TableSourceFactory' in >>>>>>>> the classpath. 
>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> 代码: >>>>>>>> ``` >>>>>>>> import >> org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} >>>>>>>> import org.apache.flink.table.api.{EnvironmentSettings, Types} >>>>>>>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _} >>>>>>>> import org.apache.flink.types.Row >>>>>>>> >>>>>>>> >>>>>>>> object KafkaInDDL extends App { >>>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>>>>>>> val settings: EnvironmentSettings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>>>>>>> val tEnv: StreamTableEnvironment = >> StreamTableEnvironment.create(env, settings) >>>>>>>> >>>>>>>> >>>>>>>> val sourceDDL = >>>>>>>> """create table sourceTable( >>>>>>>> id int, >>>>>>>> name varchar >>>>>>>> ) with ( >>>>>>>> 'connector.type' = 'kafka', >>>>>>>> 'connector.property-version' = '1', >>>>>>>> 'update-mode' = 'append', >>>>>>>> 'bootstrap.servers' = ' >> 192.168.1.160:19092', >>>>>>>> 'connector.topic' = 'hbtest1', >>>>>>>> 'connector.startup-mode' = >> 'earliest-offset' >>>>>>>> ) >>>>>>>> """ >>>>>>>> tEnv.sqlUpdate(sourceDDL) >>>>>>>> tEnv.sqlQuery("select * from >> sourceTable").toAppendStream[Row].print() >>>>>>>> tEnv.execute("") >>>>>>>> } >>>>>>>> ``` >>>>>>> >> |
🤗感谢
Jark Wu <[hidden email]> 于2019年8月27日周二 下午6:49写道: > > https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector > < > https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector > > > > > 在 2019年8月27日,17:59,徐骁 <[hidden email]> 写道: > > > > 这部分有文档吗,看了好几圈没看到 > > > > hb <[hidden email]> 于2019年8月26日周一 下午3:34写道: > > > >> 感谢,解决了, 指定 'connector.version' = '0.11' 就可以了. > >> Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题. > >> > >> 在 2019-08-26 14:26:15,"hb" <[hidden email]> 写道: > >>> kafka版本是 kafka_2.11-1.1.0, > >>> 支持的kafka版本有哪些 > >>> 在 2019-08-26 14:23:19,"[hidden email]" < > >> [hidden email]> 写道: > >>>> 检查一下代码的kafka版本,可能是这方面的错误 > >>>> > >>>> > >>>> > >>>> [hidden email] > >>>> > >>>> 发件人: hb > >>>> 发送时间: 2019-08-26 15:14 > >>>> 收件人: user-zh > >>>> 主题: Re:Re: flink1.9 blink planner table ddl 使用问题 > >>>> 之前少了 flink-connector-kafka_2.11 依赖, > >>>> 现在错误变成 Caused by: java.lang.NoSuchMethodError: > >> > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V > >>>> 了 > >>>> > >>>> > >>>> pom依赖: > >>>> ``` > >>>> <dependencies> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-core</artifactId> > >>>> <version>${flink.version}</version> > >>>> > >>>> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-clients_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> > >>>> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-scala_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> > >>>> > >>>> 
</dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-streaming-scala_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> > >>>> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-table</artifactId> > >>>> <version>1.9.0</version> > >>>> <type>pom</type> > >>>> <scope>provided</scope> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-table-common</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-cep-scala_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-connector-filesystem_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > >>>> > >>>> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> <!-- <scope>provided</scope>--> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-table-api-java-bridge_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> <!-- <scope>provided</scope>--> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-table-planner_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> <!-- <scope>provided</scope>--> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-table-runtime-blink_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > 
>>>> > >>>> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-table-planner-blink_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> <!-- <scope>provided</scope>--> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-connector-kafka_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-json</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-runtime-web_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > >>>> </dependencies> > >>>> > >>>> > >>>> ``` > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> 在 2019-08-26 13:37:51,"Jark Wu" <[hidden email]> 写道: > >>>>> Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11 > >>>>> > >>>>> Best, > >>>>> Jark > >>>>> > >>>>>> 在 2019年8月26日,13:57,hb <[hidden email]> 写道: > >>>>>> > >>>>>> 使用了你的ddl语句,还是报一样的错误. > >>>>>> 我是在idea里面执行的,maven 配置的依赖. > >>>>>> > >>>>>> 在 2019-08-26 11:22:20,"Jark Wu" <[hidden email]> 写道: > >>>>>>> Hi, > >>>>>>> > >>>>>>> 初步看下来你的 DDL 中有这几部分定义的有问题。 > >>>>>>> > >>>>>>> 1. 缺少format properties > >>>>>>> 2. 缺少 connector.version > >>>>>>> 3. bootstrap.severs 的配置方式写的不对... 
> >>>>>>> > >>>>>>> > >>>>>>> 你可以参考下面这个作为example: > >>>>>>> > >>>>>>> > >>>>>>> CREATE TABLE kafka_json_source ( > >>>>>>> rowtime TIMESTAMP, > >>>>>>> user_name VARCHAR, > >>>>>>> event ROW<message_type VARCHAR, message VARCHAR> > >>>>>>> ) WITH ( > >>>>>>> 'connector.type' = 'kafka', > >>>>>>> 'connector.version' = 'universal', > >>>>>>> 'connector.topic' = 'test-json', > >>>>>>> 'connector.startup-mode' = 'earliest-offset', > >>>>>>> 'connector.properties.0.key' = 'zookeeper.connect', > >>>>>>> 'connector.properties.0.value' = 'localhost:2181', > >>>>>>> 'connector.properties.1.key' = 'bootstrap.servers', > >>>>>>> 'connector.properties.1.value' = 'localhost:9092', > >>>>>>> 'update-mode' = 'append', > >>>>>>> 'format.type' = 'json', > >>>>>>> 'format.derive-schema' = 'true' > >>>>>>> ); > >>>>>>> > >>>>>>> > >>>>>>> Kafka 中的数据长这个样子: > >>>>>>> > >>>>>>> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": > { > >> "message_type": "WARNING", "message": "This is a warning."}} > >>>>>>> > >>>>>>> > >>>>>>> Best, > >>>>>>> Jark > >>>>>>> > >>>>>>> > >>>>>>>> 在 2019年8月26日,09:52,hb <[hidden email]> 写道: > >>>>>>>> > >>>>>>>> flink1.9 blink planner table 使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 > >> 需要实现TableSourceFactory,还是其他什么. > >>>>>>>> > >>>>>>>> > >>>>>>>> 提示: > >>>>>>>> Exception in thread "main" > >> org.apache.flink.table.api.ValidationException: SQL validation failed. > >> findAndCreateTableSource failed. > >>>>>>>> Caused by: > >> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not > find > >> a suitable table factory for > >> 'org.apache.flink.table.factories.TableSourceFactory' in > >>>>>>>> the classpath. 
> >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> 代码: > >>>>>>>> ``` > >>>>>>>> import > >> org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} > >>>>>>>> import org.apache.flink.table.api.{EnvironmentSettings, Types} > >>>>>>>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, > _} > >>>>>>>> import org.apache.flink.types.Row > >>>>>>>> > >>>>>>>> > >>>>>>>> object KafkaInDDL extends App { > >>>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment > >>>>>>>> val settings: EnvironmentSettings = > >> > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > >>>>>>>> val tEnv: StreamTableEnvironment = > >> StreamTableEnvironment.create(env, settings) > >>>>>>>> > >>>>>>>> > >>>>>>>> val sourceDDL = > >>>>>>>> """create table sourceTable( > >>>>>>>> id int, > >>>>>>>> name varchar > >>>>>>>> ) with ( > >>>>>>>> 'connector.type' = 'kafka', > >>>>>>>> 'connector.property-version' = '1', > >>>>>>>> 'update-mode' = 'append', > >>>>>>>> 'bootstrap.servers' = ' > >> 192.168.1.160:19092', > >>>>>>>> 'connector.topic' = 'hbtest1', > >>>>>>>> 'connector.startup-mode' = > >> 'earliest-offset' > >>>>>>>> ) > >>>>>>>> """ > >>>>>>>> tEnv.sqlUpdate(sourceDDL) > >>>>>>>> tEnv.sqlQuery("select * from > >> sourceTable").toAppendStream[Row].print() > >>>>>>>> tEnv.execute("") > >>>>>>>> } > >>>>>>>> ``` > >>>>>>> > >> > > |
Free forum by Nabble | Edit this page |