flink 1.9 消费kafka报错

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

flink 1.9 消费kafka报错

star
大家好,
升级到1.9后有几个问题:
1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011

val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties)
但是现在这个类已经找不到了

2.所以我使用了 FlinkKafkaConsumer
val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties)
不知道这个consumer背后对应的kafka版本是多少

3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version}

     不然会提示找不到类:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory

   

引入flink-table-api-java-bridge_${scala.binary.version}后还是报错:
Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
        at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
        at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
        at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
        at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336)
        at com.test.StreamingJob$.main(StreamingJob.scala:52)
        at com.test.StreamingJob.main(StreamingJob.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in
the classpath.

Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.

The following properties are requested:
batch-mode=false

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory


我的pom文件如下:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-scala_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
   <scope>provided</scope>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
   <scope>provided</scope>
</dependency>

<!-- Scala Library, provided by Flink as well. -->
<dependency>
   <groupId>org.scala-lang</groupId>
   <artifactId>scala-library</artifactId>
   <version>${scala.version}</version>
   <scope>provided</scope>
</dependency>

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka_2.11</artifactId>
   <version>${flink.version}</version>
   <scope>compile</scope>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-common</artifactId>
   <version>${flink.version}</version>
</dependency>


<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>


谢谢大家
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.9 消费kafka报错

Jark
Administrator
Hi,

你这个错误不是 kafka 的问题。 是 planner 使用的问题,如果要使用 blink planner,需要用 EnvironmentSetting 声明 blink planner。
详细请见: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment>


Best,
Jark

> 在 2019年8月26日,14:56,ddwcg <[hidden email]> 写道:
>
> 大家好,
> 升级到1.9后有几个问题:
> 1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011
>
> val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties)
> 但是现在这个类已经找不到了
>
> 2.所以我使用了 FlinkKafkaConsumer
> val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties)
> 不知道这个consumer背后对应的kafka版本是多少
>
> 3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version}
>
>     不然会提示找不到类:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory
>
>
>
> 引入flink-table-api-java-bridge_${scala.binary.version}后还是报错:
> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336)
> at com.test.StreamingJob$.main(StreamingJob.scala:52)
> at com.test.StreamingJob.main(StreamingJob.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in
> the classpath.
>
> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.
>
> The following properties are requested:
> batch-mode=false
>
> The following factories have been considered:
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>
>
> 我的pom文件如下:
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-scala_${scala.binary.version}</artifactId>
>   <version>${flink.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>   <version>${flink.version}</version>
>   <scope>provided</scope>
> </dependency>
>
> <!-- Scala Library, provided by Flink as well. -->
> <dependency>
>   <groupId>org.scala-lang</groupId>
>   <artifactId>scala-library</artifactId>
>   <version>${scala.version}</version>
>   <scope>provided</scope>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-kafka_2.11</artifactId>
>   <version>${flink.version}</version>
>   <scope>compile</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-table-common</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>
>
> 谢谢大家

Reply | Threaded
Open this post in threaded view
|

Re: flink 1.9 消费kafka报错

star

hi,我指定了使用blinkplanner,还是报一样的错
object StreamingJob {
  def main(args: Array[String]) {

    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
    //val bsTableEnv2 = TableEnvironment.create(bsSettings)
    bsEnv.execute("jobname")
  }


Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
        at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
        at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
        at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
        at com.test.StreamingJob$.main(StreamingJob.scala:13)
        at com.test.StreamingJob.main(StreamingJob.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in
the classpath.

Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.

The following properties are requested:
batch-mode=false
class-name=org.apache.flink.table.executor.BlinkExecutorFactory

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory



> 在 2019年8月26日,15:07,Jark Wu <[hidden email]> 写道:
>
> Hi,
>
> 你这个错误不是 kafka 的问题。 是 planner 使用的问题,如果要使用 blink planner,需要用 EnvironmentSetting 声明 blink planner。
> 详细请见: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment>
>
>
> Best,
> Jark
>
>> 在 2019年8月26日,14:56,ddwcg <[hidden email]> 写道:
>>
>> 大家好,
>> 升级到1.9后有几个问题:
>> 1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011
>>
>> val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties)
>> 但是现在这个类已经找不到了
>>
>> 2.所以我使用了 FlinkKafkaConsumer
>> val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties)
>> 不知道这个consumer背后对应的kafka版本是多少
>>
>> 3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version}
>>
>>    不然会提示找不到类:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory
>>
>>
>>
>> 引入flink-table-api-java-bridge_${scala.binary.version}后还是报错:
>> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336)
>> at com.test.StreamingJob$.main(StreamingJob.scala:52)
>> at com.test.StreamingJob.main(StreamingJob.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in
>> the classpath.
>>
>> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.
>>
>> The following properties are requested:
>> batch-mode=false
>>
>> The following factories have been considered:
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>>
>>
>> 我的pom文件如下:
>>
>> <dependency>
>>  <groupId>org.apache.flink</groupId>
>>  <artifactId>flink-scala_${scala.binary.version}</artifactId>
>>  <version>${flink.version}</version>
>>  <scope>provided</scope>
>> </dependency>
>> <dependency>
>>  <groupId>org.apache.flink</groupId>
>>  <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>  <version>${flink.version}</version>
>>  <scope>provided</scope>
>> </dependency>
>>
>> <!-- Scala Library, provided by Flink as well. -->
>> <dependency>
>>  <groupId>org.scala-lang</groupId>
>>  <artifactId>scala-library</artifactId>
>>  <version>${scala.version}</version>
>>  <scope>provided</scope>
>> </dependency>
>>
>> <dependency>
>>  <groupId>org.apache.flink</groupId>
>>  <artifactId>flink-connector-kafka_2.11</artifactId>
>>  <version>${flink.version}</version>
>>  <scope>compile</scope>
>> </dependency>
>> <dependency>
>>  <groupId>org.apache.flink</groupId>
>>  <artifactId>flink-table-common</artifactId>
>>  <version>${flink.version}</version>
>> </dependency>
>>
>>
>> <dependency>
>>  <groupId>org.apache.flink</groupId>
>>  <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>>  <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>  <groupId>org.apache.flink</groupId>
>>  <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>>  <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>  <groupId>org.apache.flink</groupId>
>>  <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>  <version>${flink.version}</version>
>> </dependency>
>>
>>
>> 谢谢大家
>

Reply | Threaded
Open this post in threaded view
|

Re: flink 1.9 消费kafka报错

Jark
Administrator
pom.xml依赖中有flink-table-planner-blink_2.11的依赖么? 确认下版本号。如果要再 IDE 中运行的话,确保没有加 <scope>provided</scope>.



> 在 2019年8月26日,17:25,ddwcg <[hidden email]> 写道:
>
>
> hi,我指定了使用blinkplanner,还是报一样的错
> object StreamingJob {
>  def main(args: Array[String]) {
>
>    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
>    //val bsTableEnv2 = TableEnvironment.create(bsSettings)
>    bsEnv.execute("jobname")
>  }
>
>
> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
> at com.test.StreamingJob$.main(StreamingJob.scala:13)
> at com.test.StreamingJob.main(StreamingJob.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in
> the classpath.
>
> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.
>
> The following properties are requested:
> batch-mode=false
> class-name=org.apache.flink.table.executor.BlinkExecutorFactory
>
> The following factories have been considered:
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>
>
>
>> 在 2019年8月26日,15:07,Jark Wu <[hidden email]> 写道:
>>
>> Hi,
>>
>> 你这个错误不是 kafka 的问题。 是 planner 使用的问题,如果要使用 blink planner,需要用 EnvironmentSetting 声明 blink planner。
>> 详细请见: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment>
>>
>>
>> Best,
>> Jark
>>
>>> 在 2019年8月26日,14:56,ddwcg <[hidden email]> 写道:
>>>
>>> 大家好,
>>> 升级到1.9后有几个问题:
>>> 1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011
>>>
>>> val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties)
>>> 但是现在这个类已经找不到了
>>>
>>> 2.所以我使用了 FlinkKafkaConsumer
>>> val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties)
>>> 不知道这个consumer背后对应的kafka版本是多少
>>>
>>> 3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version}
>>>
>>>   不然会提示找不到类:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory
>>>
>>>
>>>
>>> 引入flink-table-api-java-bridge_${scala.binary.version}后还是报错:
>>> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
>>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
>>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
>>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
>>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336)
>>> at com.test.StreamingJob$.main(StreamingJob.scala:52)
>>> at com.test.StreamingJob.main(StreamingJob.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in
>>> the classpath.
>>>
>>> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.
>>>
>>> The following properties are requested:
>>> batch-mode=false
>>>
>>> The following factories have been considered:
>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>>>
>>>
>>> 我的pom文件如下:
>>>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-scala_${scala.binary.version}</artifactId>
>>> <version>${flink.version}</version>
>>> <scope>provided</scope>
>>> </dependency>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>> <version>${flink.version}</version>
>>> <scope>provided</scope>
>>> </dependency>
>>>
>>> <!-- Scala Library, provided by Flink as well. -->
>>> <dependency>
>>> <groupId>org.scala-lang</groupId>
>>> <artifactId>scala-library</artifactId>
>>> <version>${scala.version}</version>
>>> <scope>provided</scope>
>>> </dependency>
>>>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-connector-kafka_2.11</artifactId>
>>> <version>${flink.version}</version>
>>> <scope>compile</scope>
>>> </dependency>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-table-common</artifactId>
>>> <version>${flink.version}</version>
>>> </dependency>
>>>
>>>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>>> <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>>> <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>> <version>${flink.version}</version>
>>> </dependency>
>>>
>>>
>>> 谢谢大家
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: flink 1.9 消费kafka报错

star
都加了,还是不行,下面是我的pom文件和 libraires的截图

<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.9.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
</properties>

<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Scala Library, provided by Flink as well. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>



<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>


在 2019年8月26日,17:39,Jark Wu <[hidden email]> 写道:

pom.xml依赖中有flink-table-planner-blink_2.11的依赖么? 确认下版本号。如果要再 IDE 中运行的话,确保没有加 <scope>provided</scope>.


在 2019年8月26日,17:25,ddwcg <[hidden email]> 写道:


hi,我指定了使用blinkplanner,还是报一样的错
object StreamingJob {
def main(args: Array[String]) {

  val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
  val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
  //val bsTableEnv2 = TableEnvironment.create(bsSettings)
  bsEnv.execute("jobname")
}


Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
at com.test.StreamingJob$.main(StreamingJob.scala:13)
at com.test.StreamingJob.main(StreamingJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in
the classpath.

Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.

The following properties are requested:
batch-mode=false
class-name=org.apache.flink.table.executor.BlinkExecutorFactory

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory



在 2019年8月26日,15:07,Jark Wu <[hidden email]> 写道:

Hi,

你这个错误不是 kafka 的问题。 是 planner 使用的问题,如果要使用 blink planner,需要用 EnvironmentSetting 声明 blink planner。
详细请见: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment>


Best,
Jark

在 2019年8月26日,14:56,ddwcg <[hidden email]> 写道:

大家好,
升级到1.9后有几个问题:
1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011

val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties)
但是现在这个类已经找不到了

2.所以我使用了 FlinkKafkaConsumer
val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties)
不知道这个consumer背后对应的kafka版本是多少

3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version}

 不然会提示找不到类:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory



引入flink-table-api-java-bridge_${scala.binary.version}后还是报错:
Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336)
at com.test.StreamingJob$.main(StreamingJob.scala:52)
at com.test.StreamingJob.main(StreamingJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in
the classpath.

Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.

The following properties are requested:
batch-mode=false

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory


我的pom文件如下:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Scala Library, provided by Flink as well. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>


谢谢大家





Reply | Threaded
Open this post in threaded view
|

Re: flink 1.9 消费kafka报错

Jark
Administrator
看起来是你依赖了一个老版本的 EnvironmentSettings,可能是本地 mvn cache 导致的。

可以尝试清空下 “~/.m2/repository/org/apache/flink/flink-table-api-java” 目录。


Best,
Jark

> 在 2019年8月26日,17:56,ddwcg <[hidden email]> 写道:
>
> 都加了,还是不行,下面是我的pom文件和 libraires的截图
>
> <repositories>
>    <repository>
>       <id>apache.snapshots</id>
>       <name>Apache Development Snapshot Repository</name>
>       <url>https://repository.apache.org/content/repositories/snapshots/ <https://repository.apache.org/content/repositories/snapshots/></url>
>       <releases>
>          <enabled>false</enabled>
>       </releases>
>       <snapshots>
>          <enabled>true</enabled>
>       </snapshots>
>    </repository>
> </repositories>
>
> <properties>
>    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>    <flink.version>1.9.0</flink.version>
>    <scala.binary.version>2.11</scala.binary.version>
>    <scala.version>2.11.8</scala.version>
> </properties>
>
> <dependencies>
>    <!-- Apache Flink dependencies -->
>    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-scala_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
>    <!-- Scala Library, provided by Flink as well. -->
>    <dependency>
>       <groupId>org.scala-lang</groupId>
>       <artifactId>scala-library</artifactId>
>       <version>${scala.version}</version>
>    </dependency>
>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>       <scope>compile</scope>
>    </dependency>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-common</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
>
>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
>    <dependency>
>       <groupId>org.slf4j</groupId>
>       <artifactId>slf4j-log4j12</artifactId>
>       <version>1.7.7</version>
>       <scope>runtime</scope>
>    </dependency>
>    <dependency>
>       <groupId>log4j</groupId>
>       <artifactId>log4j</artifactId>
>       <version>1.2.17</version>
>       <scope>runtime</scope>
>    </dependency>
> </dependencies>
> <屏幕快照 2019-08-26 17.54.22.png>
>
>
>> 在 2019年8月26日,17:39,Jark Wu <[hidden email] <mailto:[hidden email]>> 写道:
>>
>> pom.xml依赖中有flink-table-planner-blink_2.11的依赖么? 确认下版本号。如果要再 IDE 中运行的话,确保没有加 <scope>provided</scope>.
>>
>>
>>> 在 2019年8月26日,17:25,ddwcg <[hidden email] <mailto:[hidden email]>> 写道:
>>>
>>>
>>> hi,我指定了使用blinkplanner,还是报一样的错
>>> object StreamingJob {
>>> def main(args: Array[String]) {
>>>
>>>   val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>   val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>   val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
>>>   //val bsTableEnv2 = TableEnvironment.create(bsSettings)
>>>   bsEnv.execute("jobname")
>>> }
>>>
>>>
>>> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
>>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
>>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
>>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
>>> at com.test.StreamingJob$.main(StreamingJob.scala:13)
>>> at com.test.StreamingJob.main(StreamingJob.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in
>>> the classpath.
>>>
>>> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.
>>>
>>> The following properties are requested:
>>> batch-mode=false
>>> class-name=org.apache.flink.table.executor.BlinkExecutorFactory
>>>
>>> The following factories have been considered:
>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>>>
>>>
>>>
>>>> 在 2019年8月26日,15:07,Jark Wu <[hidden email] <mailto:[hidden email]>> 写道:
>>>>
>>>> Hi,
>>>>
>>>> 你这个错误不是 kafka 的问题。 是 planner 使用的问题,如果要使用 blink planner,需要用 EnvironmentSetting 声明 blink planner。
>>>> 详细请见: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment> <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment>>
>>>>
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>>> 在 2019年8月26日,14:56,ddwcg <[hidden email] <mailto:[hidden email]>> 写道:
>>>>>
>>>>> 大家好,
>>>>> 升级到1.9后有几个问题:
>>>>> 1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011
>>>>>
>>>>> val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties)
>>>>> 但是现在这个类已经找不到了
>>>>>
>>>>> 2.所以我使用了 FlinkKafkaConsumer
>>>>> val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties)
>>>>> 不知道这个consumer背后对应的kafka版本是多少
>>>>>
>>>>> 3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version}
>>>>>
>>>>>  不然会提示找不到类:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory
>>>>>
>>>>>
>>>>>
>>>>> 引入flink-table-api-java-bridge_${scala.binary.version}后还是报错:
>>>>> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
>>>>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
>>>>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
>>>>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
>>>>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336)
>>>>> at com.test.StreamingJob$.main(StreamingJob.scala:52)
>>>>> at com.test.StreamingJob.main(StreamingJob.scala)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in
>>>>> the classpath.
>>>>>
>>>>> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.
>>>>>
>>>>> The following properties are requested:
>>>>> batch-mode=false
>>>>>
>>>>> The following factories have been considered:
>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>>>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>>>>>
>>>>>
>>>>> 我的pom文件如下:
>>>>>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-scala_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> <scope>provided</scope>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> <scope>provided</scope>
>>>>> </dependency>
>>>>>
>>>>> <!-- Scala Library, provided by Flink as well. -->
>>>>> <dependency>
>>>>> <groupId>org.scala-lang</groupId>
>>>>> <artifactId>scala-library</artifactId>
>>>>> <version>${scala.version}</version>
>>>>> <scope>provided</scope>
>>>>> </dependency>
>>>>>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-connector-kafka_2.11</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> <scope>compile</scope>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-common</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>>
>>>>>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>>
>>>>>
>>>>> 谢谢大家
>>>>
>>>
>>
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: flink 1.9 消费kafka报错

star
In reply to this post by star
谢谢您的耐心解答,是本地cache的问题,已经解决

> 在 2019年8月26日,17:56,ddwcg <[hidden email]> 写道:
>
> 都加了,还是不行,下面是我的pom文件和 libraires的截图
>
> <repositories>
>    <repository>
>       <id>apache.snapshots</id>
>       <name>Apache Development Snapshot Repository</name>
>       <url>https://repository.apache.org/content/repositories/snapshots/ <https://repository.apache.org/content/repositories/snapshots/></url>
>       <releases>
>          <enabled>false</enabled>
>       </releases>
>       <snapshots>
>          <enabled>true</enabled>
>       </snapshots>
>    </repository>
> </repositories>
>
> <properties>
>    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>    <flink.version>1.9.0</flink.version>
>    <scala.binary.version>2.11</scala.binary.version>
>    <scala.version>2.11.8</scala.version>
> </properties>
>
> <dependencies>
>    <!-- Apache Flink dependencies -->
>    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-scala_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
>    <!-- Scala Library, provided by Flink as well. -->
>    <dependency>
>       <groupId>org.scala-lang</groupId>
>       <artifactId>scala-library</artifactId>
>       <version>${scala.version}</version>
>    </dependency>
>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>       <scope>compile</scope>
>    </dependency>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-common</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
>
>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
>    <dependency>
>       <groupId>org.slf4j</groupId>
>       <artifactId>slf4j-log4j12</artifactId>
>       <version>1.7.7</version>
>       <scope>runtime</scope>
>    </dependency>
>    <dependency>
>       <groupId>log4j</groupId>
>       <artifactId>log4j</artifactId>
>       <version>1.2.17</version>
>       <scope>runtime</scope>
>    </dependency>
> </dependencies>
> <屏幕快照 2019-08-26 17.54.22.png>
>
>
>> 在 2019年8月26日,17:39,Jark Wu <[hidden email] <mailto:[hidden email]>> 写道:
>>
>> pom.xml依赖中有flink-table-planner-blink_2.11的依赖么? 确认下版本号。如果要再 IDE 中运行的话,确保没有加 <scope>provided</scope>.
>>
>>
>>> 在 2019年8月26日,17:25,ddwcg <[hidden email] <mailto:[hidden email]>> 写道:
>>>
>>>
>>> hi,我指定了使用blinkplanner,还是报一样的错
>>> object StreamingJob {
>>> def main(args: Array[String]) {
>>>
>>>   val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>   val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>   val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
>>>   //val bsTableEnv2 = TableEnvironment.create(bsSettings)
>>>   bsEnv.execute("jobname")
>>> }
>>>
>>>
>>> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
>>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
>>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
>>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
>>> at com.test.StreamingJob$.main(StreamingJob.scala:13)
>>> at com.test.StreamingJob.main(StreamingJob.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in
>>> the classpath.
>>>
>>> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.
>>>
>>> The following properties are requested:
>>> batch-mode=false
>>> class-name=org.apache.flink.table.executor.BlinkExecutorFactory
>>>
>>> The following factories have been considered:
>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>>>
>>>
>>>
>>>> 在 2019年8月26日,15:07,Jark Wu <[hidden email] <mailto:[hidden email]>> 写道:
>>>>
>>>> Hi,
>>>>
>>>> 你这个错误不是 kafka 的问题。 是 planner 使用的问题,如果要使用 blink planner,需要用 EnvironmentSetting 声明 blink planner。
>>>> 详细请见: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment> <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment>>
>>>>
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>>> 在 2019年8月26日,14:56,ddwcg <[hidden email] <mailto:[hidden email]>> 写道:
>>>>>
>>>>> 大家好,
>>>>> 升级到1.9后有几个问题:
>>>>> 1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011
>>>>>
>>>>> val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties)
>>>>> 但是现在这个类已经找不到了
>>>>>
>>>>> 2.所以我使用了 FlinkKafkaConsumer
>>>>> val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties)
>>>>> 不知道这个consumer背后对应的kafka版本是多少
>>>>>
>>>>> 3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version}
>>>>>
>>>>>  不然会提示找不到类:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory
>>>>>
>>>>>
>>>>>
>>>>> 引入flink-table-api-java-bridge_${scala.binary.version}后还是报错:
>>>>> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
>>>>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
>>>>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
>>>>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
>>>>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336)
>>>>> at com.test.StreamingJob$.main(StreamingJob.scala:52)
>>>>> at com.test.StreamingJob.main(StreamingJob.scala)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in
>>>>> the classpath.
>>>>>
>>>>> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.
>>>>>
>>>>> The following properties are requested:
>>>>> batch-mode=false
>>>>>
>>>>> The following factories have been considered:
>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>>>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>>>>>
>>>>>
>>>>> 我的pom文件如下:
>>>>>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-scala_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> <scope>provided</scope>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> <scope>provided</scope>
>>>>> </dependency>
>>>>>
>>>>> <!-- Scala Library, provided by Flink as well. -->
>>>>> <dependency>
>>>>> <groupId>org.scala-lang</groupId>
>>>>> <artifactId>scala-library</artifactId>
>>>>> <version>${scala.version}</version>
>>>>> <scope>provided</scope>
>>>>> </dependency>
>>>>>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-connector-kafka_2.11</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> <scope>compile</scope>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-common</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>>
>>>>>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>>
>>>>>
>>>>> 谢谢大家
>>>>
>>>
>>
>>
>