大家好,
升级到1.9后有几个问题: 1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011 val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties) 但是现在这个类已经找不到了 2.所以我使用了 FlinkKafkaConsumer val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties) 不知道这个consumer背后对应的kafka版本是多少 3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version} 不然会提示找不到类:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory 引入flink-table-api-java-bridge_${scala.binary.version}后还是报错: Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286) at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366) at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336) at com.test.StreamingJob$.main(StreamingJob.scala:52) at com.test.StreamingJob.main(StreamingJob.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath. Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'. The following properties are requested: batch-mode=false The following factories have been considered: org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory org.apache.flink.table.catalog.GenericInMemoryCatalogFactory 我的pom文件如下: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!-- Scala Library, provided by Flink as well. --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> 谢谢大家 |
Administrator
|
Hi,
你这个错误不是 kafka 的问题。 是 planner 使用的问题,如果要使用 blink planner,需要用 EnvironmentSetting 声明 blink planner。 详细请见: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment> Best, Jark > 在 2019年8月26日,14:56,ddwcg <[hidden email]> 写道: > > 大家好, > 升级到1.9后有几个问题: > 1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011 > > val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties) > 但是现在这个类已经找不到了 > > 2.所以我使用了 FlinkKafkaConsumer > val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties) > 不知道这个consumer背后对应的kafka版本是多少 > > 3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version} > > 不然会提示找不到类:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory > > > > 引入flink-table-api-java-bridge_${scala.binary.version}后还是报错: > Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286) > at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366) > at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336) > at com.test.StreamingJob$.main(StreamingJob.scala:52) > at com.test.StreamingJob.main(StreamingJob.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in > the classpath. > > Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'. > > The following properties are requested: > batch-mode=false > > The following factories have been considered: > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory > org.apache.flink.table.catalog.GenericInMemoryCatalogFactory > > > 我的pom文件如下: > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-scala_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > <scope>provided</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > <scope>provided</scope> > </dependency> > > <!-- Scala Library, provided by Flink as well. --> > <dependency> > <groupId>org.scala-lang</groupId> > <artifactId>scala-library</artifactId> > <version>${scala.version}</version> > <scope>provided</scope> > </dependency> > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-kafka_2.11</artifactId> > <version>${flink.version}</version> > <scope>compile</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-common</artifactId> > <version>${flink.version}</version> > </dependency> > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > > > 谢谢大家 |
hi,我指定了使用blinkplanner,还是报一样的错 object StreamingJob { def main(args: Array[String]) { val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) //val bsTableEnv2 = TableEnvironment.create(bsSettings) bsEnv.execute("jobname") } Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286) at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366) at com.test.StreamingJob$.main(StreamingJob.scala:13) at com.test.StreamingJob.main(StreamingJob.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath. Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'. The following properties are requested: batch-mode=false class-name=org.apache.flink.table.executor.BlinkExecutorFactory The following factories have been considered: org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory org.apache.flink.table.catalog.GenericInMemoryCatalogFactory > 在 2019年8月26日,15:07,Jark Wu <[hidden email]> 写道: > > Hi, > > 你这个错误不是 kafka 的问题。 是 planner 使用的问题,如果要使用 blink planner,需要用 EnvironmentSetting 声明 blink planner。 > 详细请见: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment> > > > Best, > Jark > >> 在 2019年8月26日,14:56,ddwcg <[hidden email]> 写道: >> >> 大家好, >> 升级到1.9后有几个问题: >> 1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011 >> >> val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties) >> 但是现在这个类已经找不到了 >> >> 2.所以我使用了 FlinkKafkaConsumer >> val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties) >> 不知道这个consumer背后对应的kafka版本是多少 >> >> 3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version} >> >> 不然会提示找不到类:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory >> >> >> >> 引入flink-table-api-java-bridge_${scala.binary.version}后还是报错: >> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath >> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329) >> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286) >> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366) >> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336) >> at com.test.StreamingJob$.main(StreamingJob.scala:52) >> at com.test.StreamingJob.main(StreamingJob.scala) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) >> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in >> the classpath. >> >> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'. >> >> The following properties are requested: >> batch-mode=false >> >> The following factories have been considered: >> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory >> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory >> >> >> 我的pom文件如下: >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-scala_${scala.binary.version}</artifactId> >> <version>${flink.version}</version> >> <scope>provided</scope> >> </dependency> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> >> <version>${flink.version}</version> >> <scope>provided</scope> >> </dependency> >> >> <!-- Scala Library, provided by Flink as well. --> >> <dependency> >> <groupId>org.scala-lang</groupId> >> <artifactId>scala-library</artifactId> >> <version>${scala.version}</version> >> <scope>provided</scope> >> </dependency> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-connector-kafka_2.11</artifactId> >> <version>${flink.version}</version> >> <scope>compile</scope> >> </dependency> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table-common</artifactId> >> <version>${flink.version}</version> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> >> <version>${flink.version}</version> >> </dependency> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> >> <version>${flink.version}</version> >> </dependency> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> >> <version>${flink.version}</version> >> </dependency> >> >> >> 谢谢大家 > |
Administrator
|
pom.xml依赖中有flink-table-planner-blink_2.11的依赖么? 确认下版本号。如果要再 IDE 中运行的话,确保没有加 <scope>provided</scope>.
> 在 2019年8月26日,17:25,ddwcg <[hidden email]> 写道: > > > hi,我指定了使用blinkplanner,还是报一样的错 > object StreamingJob { > def main(args: Array[String]) { > > val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment > val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) > //val bsTableEnv2 = TableEnvironment.create(bsSettings) > bsEnv.execute("jobname") > } > > > Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286) > at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366) > at com.test.StreamingJob$.main(StreamingJob.scala:13) > at com.test.StreamingJob.main(StreamingJob.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in > the classpath. > > Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'. > > The following properties are requested: > batch-mode=false > class-name=org.apache.flink.table.executor.BlinkExecutorFactory > > The following factories have been considered: > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory > org.apache.flink.table.catalog.GenericInMemoryCatalogFactory > > > >> 在 2019年8月26日,15:07,Jark Wu <[hidden email]> 写道: >> >> Hi, >> >> 你这个错误不是 kafka 的问题。 是 planner 使用的问题,如果要使用 blink planner,需要用 EnvironmentSetting 声明 blink planner。 >> 详细请见: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment> >> >> >> Best, >> Jark >> >>> 在 2019年8月26日,14:56,ddwcg <[hidden email]> 写道: >>> >>> 大家好, >>> 升级到1.9后有几个问题: >>> 1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011 >>> >>> val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties) >>> 但是现在这个类已经找不到了 >>> >>> 2.所以我使用了 FlinkKafkaConsumer >>> val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties) >>> 不知道这个consumer背后对应的kafka版本是多少 >>> >>> 3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version} >>> >>> 不然会提示找不到类:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory >>> >>> >>> >>> 引入flink-table-api-java-bridge_${scala.binary.version}后还是报错: >>> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath >>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329) >>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286) >>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366) >>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336) >>> at com.test.StreamingJob$.main(StreamingJob.scala:52) >>> at com.test.StreamingJob.main(StreamingJob.scala) >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> at java.lang.reflect.Method.invoke(Method.java:498) >>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) >>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in >>> the classpath. >>> >>> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'. >>> >>> The following properties are requested: >>> batch-mode=false >>> >>> The following factories have been considered: >>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory >>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory >>> >>> >>> 我的pom文件如下: >>> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-scala_${scala.binary.version}</artifactId> >>> <version>${flink.version}</version> >>> <scope>provided</scope> >>> </dependency> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> >>> <version>${flink.version}</version> >>> <scope>provided</scope> >>> </dependency> >>> >>> <!-- Scala Library, provided by Flink as well. --> >>> <dependency> >>> <groupId>org.scala-lang</groupId> >>> <artifactId>scala-library</artifactId> >>> <version>${scala.version}</version> >>> <scope>provided</scope> >>> </dependency> >>> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-connector-kafka_2.11</artifactId> >>> <version>${flink.version}</version> >>> <scope>compile</scope> >>> </dependency> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-table-common</artifactId> >>> <version>${flink.version}</version> >>> </dependency> >>> >>> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> >>> <version>${flink.version}</version> >>> </dependency> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> >>> <version>${flink.version}</version> >>> </dependency> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> >>> <version>${flink.version}</version> >>> </dependency> >>> >>> >>> 谢谢大家 >> > |
都加了,还是不行,下面是我的pom文件和 libraires的截图 <repositories>
|
Administrator
|
看起来是你依赖了一个老版本的 EnvironmentSettings,可能是本地 mvn cache 导致的。
可以尝试清空下 “~/.m2/repository/org/apache/flink/flink-table-api-java” 目录。 Best, Jark > 在 2019年8月26日,17:56,ddwcg <[hidden email]> 写道: > > 都加了,还是不行,下面是我的pom文件和 libraires的截图 > > <repositories> > <repository> > <id>apache.snapshots</id> > <name>Apache Development Snapshot Repository</name> > <url>https://repository.apache.org/content/repositories/snapshots/ <https://repository.apache.org/content/repositories/snapshots/></url> > <releases> > <enabled>false</enabled> > </releases> > <snapshots> > <enabled>true</enabled> > </snapshots> > </repository> > </repositories> > > <properties> > <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> > <flink.version>1.9.0</flink.version> > <scala.binary.version>2.11</scala.binary.version> > <scala.version>2.11.8</scala.version> > </properties> > > <dependencies> > <!-- Apache Flink dependencies --> > <!-- These dependencies are provided, because they should not be packaged into the JAR file. --> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-scala_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > > <!-- Scala Library, provided by Flink as well. --> > <dependency> > <groupId>org.scala-lang</groupId> > <artifactId>scala-library</artifactId> > <version>${scala.version}</version> > </dependency> > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > <scope>compile</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-common</artifactId> > <version>${flink.version}</version> > </dependency> > > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > > <dependency> > <groupId>org.slf4j</groupId> > <artifactId>slf4j-log4j12</artifactId> > <version>1.7.7</version> > <scope>runtime</scope> > </dependency> > <dependency> > <groupId>log4j</groupId> > <artifactId>log4j</artifactId> > <version>1.2.17</version> > <scope>runtime</scope> > </dependency> > </dependencies> > <屏幕快照 2019-08-26 17.54.22.png> > > >> 在 2019年8月26日,17:39,Jark Wu <[hidden email] <mailto:[hidden email]>> 写道: >> >> pom.xml依赖中有flink-table-planner-blink_2.11的依赖么? 确认下版本号。如果要再 IDE 中运行的话,确保没有加 <scope>provided</scope>. >> >> >>> 在 2019年8月26日,17:25,ddwcg <[hidden email] <mailto:[hidden email]>> 写道: >>> >>> >>> hi,我指定了使用blinkplanner,还是报一样的错 >>> object StreamingJob { >>> def main(args: Array[String]) { >>> >>> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment >>> val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>> val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) >>> //val bsTableEnv2 = TableEnvironment.create(bsSettings) >>> bsEnv.execute("jobname") >>> } >>> >>> >>> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath >>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329) >>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286) >>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366) >>> at com.test.StreamingJob$.main(StreamingJob.scala:13) >>> at com.test.StreamingJob.main(StreamingJob.scala) >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> at java.lang.reflect.Method.invoke(Method.java:498) >>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) >>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in >>> the classpath. >>> >>> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'. >>> >>> The following properties are requested: >>> batch-mode=false >>> class-name=org.apache.flink.table.executor.BlinkExecutorFactory >>> >>> The following factories have been considered: >>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory >>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory >>> >>> >>> >>>> 在 2019年8月26日,15:07,Jark Wu <[hidden email] <mailto:[hidden email]>> 写道: >>>> >>>> Hi, >>>> >>>> 你这个错误不是 kafka 的问题。 是 planner 使用的问题,如果要使用 blink planner,需要用 EnvironmentSetting 声明 blink planner。 >>>> 详细请见: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment> <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment>> >>>> >>>> >>>> Best, >>>> Jark >>>> >>>>> 在 2019年8月26日,14:56,ddwcg <[hidden email] <mailto:[hidden email]>> 写道: >>>>> >>>>> 大家好, >>>>> 升级到1.9后有几个问题: >>>>> 1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011 >>>>> >>>>> val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties) >>>>> 但是现在这个类已经找不到了 >>>>> >>>>> 2.所以我使用了 FlinkKafkaConsumer >>>>> val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties) >>>>> 不知道这个consumer背后对应的kafka版本是多少 >>>>> >>>>> 3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version} >>>>> >>>>> 不然会提示找不到类:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory >>>>> >>>>> >>>>> >>>>> 引入flink-table-api-java-bridge_${scala.binary.version}后还是报错: >>>>> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath >>>>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329) >>>>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286) >>>>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366) >>>>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336) >>>>> at com.test.StreamingJob$.main(StreamingJob.scala:52) >>>>> at com.test.StreamingJob.main(StreamingJob.scala) >>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) >>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in >>>>> the classpath. >>>>> >>>>> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'. >>>>> >>>>> The following properties are requested: >>>>> batch-mode=false >>>>> >>>>> The following factories have been considered: >>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory >>>>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory >>>>> >>>>> >>>>> 我的pom文件如下: >>>>> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> <artifactId>flink-scala_${scala.binary.version}</artifactId> >>>>> <version>${flink.version}</version> >>>>> <scope>provided</scope> >>>>> </dependency> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> >>>>> <version>${flink.version}</version> >>>>> <scope>provided</scope> >>>>> </dependency> >>>>> >>>>> <!-- Scala Library, provided by Flink as well. --> >>>>> <dependency> >>>>> <groupId>org.scala-lang</groupId> >>>>> <artifactId>scala-library</artifactId> >>>>> <version>${scala.version}</version> >>>>> <scope>provided</scope> >>>>> </dependency> >>>>> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> <artifactId>flink-connector-kafka_2.11</artifactId> >>>>> <version>${flink.version}</version> >>>>> <scope>compile</scope> >>>>> </dependency> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> <artifactId>flink-table-common</artifactId> >>>>> <version>${flink.version}</version> >>>>> </dependency> >>>>> >>>>> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> >>>>> <version>${flink.version}</version> >>>>> </dependency> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> >>>>> <version>${flink.version}</version> >>>>> </dependency> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> >>>>> <version>${flink.version}</version> >>>>> </dependency> >>>>> >>>>> >>>>> 谢谢大家 >>>> >>> >> >> > |
In reply to this post by star
谢谢您的耐心解答,是本地cache的问题,已经解决
> 在 2019年8月26日,17:56,ddwcg <[hidden email]> 写道: > > 都加了,还是不行,下面是我的pom文件和 libraires的截图 > > <repositories> > <repository> > <id>apache.snapshots</id> > <name>Apache Development Snapshot Repository</name> > <url>https://repository.apache.org/content/repositories/snapshots/ <https://repository.apache.org/content/repositories/snapshots/></url> > <releases> > <enabled>false</enabled> > </releases> > <snapshots> > <enabled>true</enabled> > </snapshots> > </repository> > </repositories> > > <properties> > <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> > <flink.version>1.9.0</flink.version> > <scala.binary.version>2.11</scala.binary.version> > <scala.version>2.11.8</scala.version> > </properties> > > <dependencies> > <!-- Apache Flink dependencies --> > <!-- These dependencies are provided, because they should not be packaged into the JAR file. --> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-scala_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > > <!-- Scala Library, provided by Flink as well. --> > <dependency> > <groupId>org.scala-lang</groupId> > <artifactId>scala-library</artifactId> > <version>${scala.version}</version> > </dependency> > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > <scope>compile</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-common</artifactId> > <version>${flink.version}</version> > </dependency> > > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > > <dependency> > <groupId>org.slf4j</groupId> > <artifactId>slf4j-log4j12</artifactId> > <version>1.7.7</version> > <scope>runtime</scope> > </dependency> > <dependency> > <groupId>log4j</groupId> > <artifactId>log4j</artifactId> > <version>1.2.17</version> > <scope>runtime</scope> > </dependency> > </dependencies> > <屏幕快照 2019-08-26 17.54.22.png> > > >> 在 2019年8月26日,17:39,Jark Wu <[hidden email] <mailto:[hidden email]>> 写道: >> >> pom.xml依赖中有flink-table-planner-blink_2.11的依赖么? 确认下版本号。如果要再 IDE 中运行的话,确保没有加 <scope>provided</scope>. >> >> >>> 在 2019年8月26日,17:25,ddwcg <[hidden email] <mailto:[hidden email]>> 写道: >>> >>> >>> hi,我指定了使用blinkplanner,还是报一样的错 >>> object StreamingJob { >>> def main(args: Array[String]) { >>> >>> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment >>> val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>> val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) >>> //val bsTableEnv2 = TableEnvironment.create(bsSettings) >>> bsEnv.execute("jobname") >>> } >>> >>> >>> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath >>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329) >>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286) >>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366) >>> at com.test.StreamingJob$.main(StreamingJob.scala:13) >>> at com.test.StreamingJob.main(StreamingJob.scala) >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> at java.lang.reflect.Method.invoke(Method.java:498) >>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) >>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in >>> the classpath. >>> >>> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'. >>> >>> The following properties are requested: >>> batch-mode=false >>> class-name=org.apache.flink.table.executor.BlinkExecutorFactory >>> >>> The following factories have been considered: >>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory >>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory >>> >>> >>> >>>> 在 2019年8月26日,15:07,Jark Wu <[hidden email] <mailto:[hidden email]>> 写道: >>>> >>>> Hi, >>>> >>>> 你这个错误不是 kafka 的问题。 是 planner 使用的问题,如果要使用 blink planner,需要用 EnvironmentSetting 声明 blink planner。 >>>> 详细请见: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment> <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment>> >>>> >>>> >>>> Best, >>>> Jark >>>> >>>>> 在 2019年8月26日,14:56,ddwcg <[hidden email] <mailto:[hidden email]>> 写道: >>>>> >>>>> 大家好, >>>>> 升级到1.9后有几个问题: >>>>> 1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011 >>>>> >>>>> val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties) >>>>> 但是现在这个类已经找不到了 >>>>> >>>>> 2.所以我使用了 FlinkKafkaConsumer >>>>> val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties) >>>>> 不知道这个consumer背后对应的kafka版本是多少 >>>>> >>>>> 3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version} >>>>> >>>>> 不然会提示找不到类:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory >>>>> >>>>> >>>>> >>>>> 引入flink-table-api-java-bridge_${scala.binary.version}后还是报错: >>>>> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath >>>>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329) >>>>> at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286) >>>>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366) >>>>> at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336) >>>>> at com.test.StreamingJob$.main(StreamingJob.scala:52) >>>>> at com.test.StreamingJob.main(StreamingJob.scala) >>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) >>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in >>>>> the classpath. >>>>> >>>>> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'. >>>>> >>>>> The following properties are requested: >>>>> batch-mode=false >>>>> >>>>> The following factories have been considered: >>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory >>>>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory >>>>> >>>>> >>>>> 我的pom文件如下: >>>>> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> <artifactId>flink-scala_${scala.binary.version}</artifactId> >>>>> <version>${flink.version}</version> >>>>> <scope>provided</scope> >>>>> </dependency> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> >>>>> <version>${flink.version}</version> >>>>> <scope>provided</scope> >>>>> </dependency> >>>>> >>>>> <!-- Scala Library, provided by Flink as well. --> >>>>> <dependency> >>>>> <groupId>org.scala-lang</groupId> >>>>> <artifactId>scala-library</artifactId> >>>>> <version>${scala.version}</version> >>>>> <scope>provided</scope> >>>>> </dependency> >>>>> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> <artifactId>flink-connector-kafka_2.11</artifactId> >>>>> <version>${flink.version}</version> >>>>> <scope>compile</scope> >>>>> </dependency> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> <artifactId>flink-table-common</artifactId> >>>>> <version>${flink.version}</version> >>>>> </dependency> >>>>> >>>>> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> >>>>> <version>${flink.version}</version> >>>>> </dependency> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> >>>>> <version>${flink.version}</version> >>>>> </dependency> >>>>> <dependency> >>>>> <groupId>org.apache.flink</groupId> >>>>> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> >>>>> <version>${flink.version}</version> >>>>> </dependency> >>>>> >>>>> >>>>> 谢谢大家 >>>> >>> >> >> > |
Free forum by Nabble | Edit this page |