flink1.9 blink planner table ddl usage question


flink1.9 blink planner table ddl usage question

hb
With the Flink 1.9 Blink planner, creating a table with a DDL statement fails. I am not sure whether the table definition is missing some properties, whether I need to implement a TableSourceFactory, or something else.


Error message:
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.




Code:
```
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.{EnvironmentSettings, Types}
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.types.Row


object KafkaInDDL extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)


  val sourceDDL =
    """create table sourceTable(
                            id int,
                            name varchar
                          ) with (
                            'connector.type' = 'kafka',
                            'connector.property-version' = '1',
                            'update-mode' = 'append',
                            'bootstrap.servers' = '192.168.1.160:19092',
                            'connector.topic' = 'hbtest1',
                            'connector.startup-mode' = 'earliest-offset'
                          )
    """
  tEnv.sqlUpdate(sourceDDL)
  tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
  tEnv.execute("")
}
```

Re: flink1.9 blink planner table ddl usage question

Jark
Administrator
Hi,

At first glance, a few parts of your DDL are defined incorrectly:

1. The format properties are missing.
2. 'connector.version' is missing.
3. The bootstrap.servers setting is configured the wrong way...


You can use the following as an example:


CREATE TABLE kafka_json_source (
    rowtime TIMESTAMP,
    user_name VARCHAR,
    event ROW<message_type VARCHAR, message VARCHAR>
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'test-json',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.0.key' = 'zookeeper.connect',
    'connector.properties.0.value' = 'localhost:2181',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = 'localhost:9092',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);


The data in Kafka looks like this:

{"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { "message_type": "WARNING", "message": "This is a warning."}}


Best,
Jark




Re: Re: flink1.9 blink planner table ddl usage question

hb
I used your DDL statement and still get the same error.
I am running it in IDEA, with the dependencies configured in Maven.


Re: flink1.9 blink planner table ddl usage question

Jark
Administrator
Your Maven build needs to depend on both flink-json and flink-connector-kafka_2.11.
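
A minimal sketch of the two POM entries, assuming a `flink.version` property set to 1.9.0 (the artifact IDs match the ones that appear later in this thread):

```
<!-- Kafka table connector; the _2.11 suffix is the Scala binary version -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- JSON format factory, needed for 'format.type' = 'json' -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
```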

Best,
Jark



Re: Re: flink1.9 blink planner table ddl usage question

hb
The flink-connector-kafka_2.11 dependency was missing before; after adding it, the error changed to:
Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V



POM dependencies:
```
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>1.9.0</version>
            <type>pom</type>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
```


Re: Re: flink1.9 blink planner table ddl usage question

pengchenglin@bonc.com.cn
Check the Kafka version your code depends on; the error is probably related to that.




Re: Re: Re: flink1.9 blink planner table ddl usage question

hb
The Kafka version is kafka_2.11-1.1.0.
Which Kafka versions are supported?

Re: Re: Re: Re: flink1.9 blink planner table ddl usage question

hb
Thanks, that solved it: specifying 'connector.version' = '0.11' works.
There is very little official material and documentation on this side of Blink SQL, so it is easy to run into problems during development.
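
For reference, a minimal sketch of the original program with the working settings folded in (untested here; the format properties follow Jark's example above, the topic and broker address come from the first post, and 'connector.version' = '0.11' matches the flink-connector-kafka-0.11_2.11 dependency in the POM):

```
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.types.Row

object KafkaInDDL extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val settings: EnvironmentSettings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

  // '0.11' selects the factory from flink-connector-kafka-0.11_2.11;
  // 'format.type' = 'json' requires flink-json on the classpath.
  val sourceDDL =
    """create table sourceTable(
      |  id int,
      |  name varchar
      |) with (
      |  'connector.type' = 'kafka',
      |  'connector.version' = '0.11',
      |  'connector.topic' = 'hbtest1',
      |  'connector.startup-mode' = 'earliest-offset',
      |  'connector.properties.0.key' = 'bootstrap.servers',
      |  'connector.properties.0.value' = '192.168.1.160:19092',
      |  'update-mode' = 'append',
      |  'format.type' = 'json',
      |  'format.derive-schema' = 'true'
      |)""".stripMargin

  tEnv.sqlUpdate(sourceDDL)
  tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
  tEnv.execute("KafkaInDDL")
}
```

Note that the POM above pulls in both flink-connector-kafka-0.11_2.11 and the universal flink-connector-kafka_2.11, which ship internal classes (such as KafkaConsumerThread) in the same package; mixing the two on one classpath is a plausible cause of the earlier NoSuchMethodError, so pinning the DDL to a version that matches a single connector jar (or dropping one of the two jars) avoids it.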


Re: Re: Re: Re: flink1.9 blink planner table ddl usage question

Xiao Xu
Is there documentation for this part? I have looked through it several times and could not find any.


Re: flink1.9 blink planner table ddl usage question

Jark
Administrator
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector

> 在 2019年8月27日,17:59,徐骁 <[hidden email]> 写道:
>
> 这部分有文档吗,看了好几圈没看到
>
> hb <[hidden email]> 于2019年8月26日周一 下午3:34写道:
>
>> 感谢,解决了, 指定 'connector.version' = '0.11' 就可以了.
>> Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题.
>>
>> 在 2019-08-26 14:26:15,"hb" <[hidden email]> 写道:
>>> kafka版本是 kafka_2.11-1.1.0,
>>> 支持的kafka版本有哪些
>>> 在 2019-08-26 14:23:19,"[hidden email]" <
>> [hidden email]> 写道:
>>>> 检查一下代码的kafka版本,可能是这方面的错误
>>>>
>>>>
>>>>
>>>> [hidden email]
>>>>
>>>> 发件人: hb
>>>> 发送时间: 2019-08-26 15:14
>>>> 收件人: user-zh
>>>> 主题: Re:Re: flink1.9 blink planner table ddl 使用问题
>>>> 之前少了 flink-connector-kafka_2.11 依赖,
>>>> 现在错误变成  Caused by: java.lang.NoSuchMethodError:
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
>>>> 了
>>>>
>>>>
>>>> pom依赖:
>>>> ```
>>>>   <dependencies>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-core</artifactId>
>>>>           <version>${flink.version}</version>
>>>>       </dependency>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-clients_2.11</artifactId>
>>>>           <version>${flink.version}</version>
>>>>       </dependency>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-scala_2.11</artifactId>
>>>>           <version>${flink.version}</version>
>>>>       </dependency>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-streaming-scala_2.11</artifactId>
>>>>           <version>${flink.version}</version>
>>>>       </dependency>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-table</artifactId>
>>>>           <version>1.9.0</version>
>>>>           <type>pom</type>
>>>>           <scope>provided</scope>
>>>>       </dependency>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-table-common</artifactId>
>>>>           <version>${flink.version}</version>
>>>>       </dependency>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-cep-scala_2.11</artifactId>
>>>>           <version>${flink.version}</version>
>>>>       </dependency>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-connector-filesystem_2.11</artifactId>
>>>>           <version>${flink.version}</version>
>>>>       </dependency>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
>>>>           <version>${flink.version}</version>
>>>>           <!-- <scope>provided</scope> -->
>>>>       </dependency>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>>>>           <version>${flink.version}</version>
>>>>           <!-- <scope>provided</scope> -->
>>>>       </dependency>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-table-planner_2.11</artifactId>
>>>>           <version>${flink.version}</version>
>>>>           <!-- <scope>provided</scope> -->
>>>>       </dependency>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-table-runtime-blink_2.11</artifactId>
>>>>           <version>${flink.version}</version>
>>>>       </dependency>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-table-planner-blink_2.11</artifactId>
>>>>           <version>${flink.version}</version>
>>>>           <!-- <scope>provided</scope> -->
>>>>       </dependency>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
>>>>           <version>${flink.version}</version>
>>>>       </dependency>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
>>>>           <version>${flink.version}</version>
>>>>       </dependency>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-connector-kafka_2.11</artifactId>
>>>>           <version>${flink.version}</version>
>>>>       </dependency>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-json</artifactId>
>>>>           <version>${flink.version}</version>
>>>>       </dependency>
>>>>       <dependency>
>>>>           <groupId>org.apache.flink</groupId>
>>>>           <artifactId>flink-runtime-web_2.11</artifactId>
>>>>           <version>${flink.version}</version>
>>>>       </dependency>
>>>>   </dependencies>
>>>>
>>>>
>>>> ```
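A likely cause of the NoSuchMethodError above (an inference, not something stated in the thread): the pom declares both flink-connector-kafka-0.11_2.11 and the universal flink-connector-kafka_2.11, and the two jars contain internal classes with the same fully-qualified names (including KafkaConsumerThread) but different constructor signatures, so whichever jar wins on the classpath breaks the other connector. A minimal sketch of a fix is to keep exactly one Kafka connector:

```xml
<!-- Keep exactly ONE Kafka connector dependency; pick the one matching
     the 'connector.version' you declare in the DDL. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Do NOT also declare flink-connector-kafka_2.11 (the universal connector):
     both jars ship org.apache.flink.streaming.connectors.kafka.internal.*
     classes with incompatible signatures. -->
```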
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On 2019-08-26 13:37:51, "Jark Wu" <[hidden email]> wrote:
>>>>> Maven needs to depend on both flink-json and flink-connector-kafka_2.11.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>>> On Aug 26, 2019, at 13:57, hb <[hidden email]> wrote:
>>>>>>
>>>>>> I used your DDL statement and still get the same error.
>>>>>> I am running it inside IDEA, with the dependencies configured through Maven.
>>>>>>
>>>>>> On 2019-08-26 11:22:20, "Jark Wu" <[hidden email]> wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> At first glance, these parts of your DDL are defined incorrectly:
>>>>>>>
>>>>>>> 1. The format properties are missing.
>>>>>>> 2. 'connector.version' is missing.
>>>>>>> 3. The bootstrap.servers setting is written the wrong way...
>>>>>>>
>>>>>>>
>>>>>>> You can use the following as an example:
>>>>>>>
>>>>>>>
>>>>>>> CREATE TABLE kafka_json_source (
>>>>>>>  rowtime TIMESTAMP,
>>>>>>>  user_name VARCHAR,
>>>>>>>  event ROW<message_type VARCHAR, message VARCHAR>
>>>>>>> ) WITH (
>>>>>>>  'connector.type' = 'kafka',
>>>>>>>  'connector.version' = 'universal',
>>>>>>>  'connector.topic' = 'test-json',
>>>>>>>  'connector.startup-mode' = 'earliest-offset',
>>>>>>>  'connector.properties.0.key' = 'zookeeper.connect',
>>>>>>>  'connector.properties.0.value' = 'localhost:2181',
>>>>>>>  'connector.properties.1.key' = 'bootstrap.servers',
>>>>>>>  'connector.properties.1.value' = 'localhost:9092',
>>>>>>>  'update-mode' = 'append',
>>>>>>>  'format.type' = 'json',
>>>>>>>  'format.derive-schema' = 'true'
>>>>>>> );
>>>>>>>
>>>>>>>
>>>>>>> The data in Kafka looks like this:
>>>>>>>
>>>>>>> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": {
>> "message_type": "WARNING", "message": "This is a warning."}}
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
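Applying those three fixes to the sourceTable definition from the original post gives something like the following sketch (not taken verbatim from the thread; the version string must match whichever Kafka connector jar is actually on the classpath, and the JSON format assumes the topic carries JSON records):

```sql
create table sourceTable(
  id int,
  name varchar
) with (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',                      -- fix 2: version was missing
  'connector.topic' = 'hbtest1',
  'connector.startup-mode' = 'earliest-offset',
  'connector.properties.0.key' = 'bootstrap.servers',     -- fix 3: indexed key/value form
  'connector.properties.0.value' = '192.168.1.160:19092',
  'update-mode' = 'append',
  'format.type' = 'json',                                 -- fix 1: format was missing
  'format.derive-schema' = 'true'
)
```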
>>>>>>>
>>>>>>>
>>>>>>>> On Aug 26, 2019, at 09:52, hb <[hidden email]> wrote:
>>>>>>>>
>>>>>>>> With the Flink 1.9 blink planner, creating a table via a DDL statement fails; I don't know whether some property definitions are missing, whether a TableSourceFactory needs to be implemented, or something else.
>>>>>>>>
>>>>>>>>
>>>>>>>> Error:
>>>>>>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.
>>>>>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Code:
>>>>>>>> ```
>>>>>>>> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
>>>>>>>> import org.apache.flink.table.api.{EnvironmentSettings, Types}
>>>>>>>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
>>>>>>>> import org.apache.flink.types.Row
>>>>>>>>
>>>>>>>> object KafkaInDDL extends App {
>>>>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>   val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>>>>>>   val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
>>>>>>>>
>>>>>>>>   val sourceDDL =
>>>>>>>>     """create table sourceTable(
>>>>>>>>                             id int,
>>>>>>>>                             name varchar
>>>>>>>>                           ) with (
>>>>>>>>                             'connector.type' = 'kafka',
>>>>>>>>                             'connector.property-version' = '1',
>>>>>>>>                             'update-mode' = 'append',
>>>>>>>>                             'bootstrap.servers' = '192.168.1.160:19092',
>>>>>>>>                             'connector.topic' = 'hbtest1',
>>>>>>>>                             'connector.startup-mode' = 'earliest-offset'
>>>>>>>>                           )
>>>>>>>>     """
>>>>>>>>   tEnv.sqlUpdate(sourceDDL)
>>>>>>>>   tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
>>>>>>>>   tEnv.execute("")
>>>>>>>> }
>>>>>>>> ```
>>>>>>>
>>

Reply | Threaded
Open this post in threaded view
|

Re: flink1.9 blink planner table ddl usage question

Xiao Xu
🤗 Thanks!

Jark Wu <[hidden email]> wrote on Tue, Aug 27, 2019, at 6:49 PM:

>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector
>
> > On Aug 27, 2019, at 17:59, 徐骁 <[hidden email]> wrote:
> >
> > Is this part documented anywhere? I have been through the docs several times and could not find it.
> >
> > hb <[hidden email]> wrote on Mon, Aug 26, 2019, at 3:34 PM:
> >
> >> Thanks, that solved it: specifying 'connector.version' = '0.11' made it work.
> >> There is really very little official material and documentation on this side of Blink SQL; it is easy to run into problems during development.
> >>
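Putting the whole thread together, here is a minimal end-to-end sketch of the resolved program (untested; the broker address and topic are the ones from the original post, and flink-connector-kafka-0.11_2.11 plus flink-json are assumed to be the Kafka-related jars on the classpath):

```scala
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.types.Row

object KafkaInDDL extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val settings: EnvironmentSettings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

  // Differences from the original DDL: 'connector.version' matches the
  // 0.11 connector jar, Kafka client settings use the indexed
  // 'connector.properties.N.key/value' form, and a JSON format is declared.
  val sourceDDL =
    """create table sourceTable(
      |  id int,
      |  name varchar
      |) with (
      |  'connector.type' = 'kafka',
      |  'connector.version' = '0.11',
      |  'connector.topic' = 'hbtest1',
      |  'connector.startup-mode' = 'earliest-offset',
      |  'connector.properties.0.key' = 'bootstrap.servers',
      |  'connector.properties.0.value' = '192.168.1.160:19092',
      |  'update-mode' = 'append',
      |  'format.type' = 'json',
      |  'format.derive-schema' = 'true'
      |)""".stripMargin

  tEnv.sqlUpdate(sourceDDL)
  tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
  tEnv.execute("KafkaInDDL")
}
```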