Login  Register

Re: flink 1.9 消费kafka报错

Posted by star on Aug 26, 2019; 9:56am
URL: http://apache-flink.370.s1.nabble.com/flink-1-9-kafka-tp469p484.html

都加了,还是不行,下面是我的pom文件和 libraires的截图

<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.9.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
</properties>

<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Scala Library, provided by Flink as well. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>



<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>


在 2019年8月26日,17:39,Jark Wu <[hidden email]> 写道:

pom.xml依赖中有flink-table-planner-blink_2.11的依赖么? 确认下版本号。如果要再 IDE 中运行的话,确保没有加 <scope>provided</scope>.


在 2019年8月26日,17:25,ddwcg <[hidden email]> 写道:


hi,我指定了使用blinkplanner,还是报一样的错
object StreamingJob {
def main(args: Array[String]) {

  val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
  val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
  //val bsTableEnv2 = TableEnvironment.create(bsSettings)
  bsEnv.execute("jobname")
}


Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
at com.test.StreamingJob$.main(StreamingJob.scala:13)
at com.test.StreamingJob.main(StreamingJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in
the classpath.

Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.

The following properties are requested:
batch-mode=false
class-name=org.apache.flink.table.executor.BlinkExecutorFactory

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory



在 2019年8月26日,15:07,Jark Wu <[hidden email]> 写道:

Hi,

你这个错误不是 kafka 的问题。 是 planner 使用的问题,如果要使用 blink planner,需要用 EnvironmentSetting 声明 blink planner。
详细请见: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment>


Best,
Jark

在 2019年8月26日,14:56,ddwcg <[hidden email]> 写道:

大家好,
升级到1.9后有几个问题:
1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011

val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties)
但是现在这个类已经找不到了

2.所以我使用了 FlinkKafkaConsumer
val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties)
不知道这个consumer背后对应的kafka版本是多少

3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version}

 不然会提示找不到类:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory



引入flink-table-api-java-bridge_${scala.binary.version}后还是报错:
Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336)
at com.test.StreamingJob$.main(StreamingJob.scala:52)
at com.test.StreamingJob.main(StreamingJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in
the classpath.

Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.

The following properties are requested:
batch-mode=false

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory


我的pom文件如下:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Scala Library, provided by Flink as well. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>


谢谢大家