I am using Flink version 1.9.1.
It works fine when debugging locally, but when the job is deployed to the cluster and started, it fails with the following error:

2020-01-15 11:57:44,255 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
    at com.doumob.flink.BuoyDataJob.main(BuoyDataJob.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
    ... 9 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath.

Reason: No factory implements 'org.apache.flink.table.factories.DeserializationSchemaFactory'.
The following properties are requested:
connector.properties.0.key=group.id
connector.properties.0.value=consumer_flink_etl_test
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=172.16.0.148:9092,172.16.0.149:9092,172.16.0.150:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_etl_pro
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=cTime
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=2000
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=event
schema.1.type=VARCHAR
schema.2.name=adSpaceKey
schema.2.type=VARCHAR
schema.3.name=appkey
schema.3.type=VARCHAR
schema.4.name=build
schema.4.type=VARCHAR
schema.5.name=buoyId
schema.5.type=BIGINT
schema.6.name=gameHtmlId
schema.6.type=BIGINT
schema.7.name=uid
schema.7.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:243)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:186)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:144)
    at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:65)
    ... 17 more

Below is the pom content:

<name>Flink Quickstart Job</name>
<url>http://www.myorganization.org</url>
<profiles>
  <profile>
    <id>dev</id>
    <activation><activeByDefault>true</activeByDefault></activation>
    <properties><project.scope>compile</project.scope></properties>
  </profile>
  <profile>
    <id>pro</id>
    <properties><project.scope>provided</project.scope></properties>
  </profile>
</profiles>
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <flink.version>1.9.1</flink.version>
  <java.version>1.8</java.version>
  <scala.binary.version>2.11</scala.binary.version>
  <maven.compiler.source>${java.version}</maven.compiler.source>
  <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<repositories>
  <repository>
    <id>apache.snapshots</id>
    <name>Apache Development Snapshot Repository</name>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <releases><enabled>false</enabled></releases>
    <snapshots><enabled>true</enabled></snapshots>
  </repository>
</repositories>
<dependencies>
  <!-- Apache Flink dependencies -->
  <!-- These dependencies are provided, because they should not be packaged into the JAR file.
  -->
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>${project.scope}</scope></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>${project.scope}</scope></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>${flink.version}</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>${flink.version}</version></dependency>
  <!-- mysql -->
  <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.48</version></dependency>
  <!-- Gson -->
  <dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.5</version></dependency>
  <!-- Add logging framework, to produce console output when running in the IDE. -->
  <!-- These dependencies are excluded from the application JAR by default.
  -->
  <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency>
  <dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency>
</dependencies>
<build>
  <plugins>
    <!-- Java Compiler -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.1</version>
      <configuration>
        <source>${java.version}</source>
        <target>${java.version}</target>
      </configuration>
    </plugin>
    <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
    <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.0.0</version>
      <executions>
        <!-- Run shade goal on package phase -->
        <execution>
          <phase>package</phase>
          <goals><goal>shade</goal></goals>
          <configuration>
            <artifactSet>
              <excludes>
                <exclude>org.apache.flink:force-shading</exclude>
                <exclude>com.google.code.findbugs:jsr305</exclude>
                <exclude>org.slf4j:*</exclude>
                <exclude>log4j:*</exclude>
              </excludes>
            </artifactSet>
            <filters>
              <filter>
                <!-- Do not copy the signatures in the META-INF folder.
                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>com.doumob.flink.BuoyDataJob</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

How should this be solved? Thank you.
Hi,
Is it possible that you did not put the JSON jar into the lib directory? It also looks like your user jar was not built with jar-with-dependencies, so it would not contain the JSON jar either.

Best,
Jingsong Lee

------------------------------------------------------------------
From: Others <[hidden email]>
Send Time: Wednesday, January 15, 2020, 15:03
To: user-zh <[hidden email]>
Subject: Help: Flink job with a Kafka source fails when deployed to the cluster
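For concreteness, "putting the jar into lib" means copying it into the lib/ directory of the Flink distribution on every node of the cluster and then restarting. A sketch only — the jar paths and the $FLINK_HOME variable are illustrative assumptions, not taken from the thread:

```shell
# Illustrative sketch: copy the format/connector jars into Flink's lib/.
# The jar names follow the Maven coordinates in the pom above; fetch them
# from Maven Central or from the local ~/.m2 repository.
cp flink-json-1.9.1.jar "$FLINK_HOME/lib/"
cp flink-connector-kafka_2.11-1.9.1.jar "$FLINK_HOME/lib/"
# The connector jar does not bundle the Kafka client itself, so its
# kafka-clients dependency likely needs to be on the classpath as well.
cp kafka-clients-2.2.0.jar "$FLINK_HOME/lib/"
# Restart so the JobManager and TaskManagers pick up the new jars.
"$FLINK_HOME/bin/stop-cluster.sh" && "$FLINK_HOME/bin/start-cluster.sh"
```

In a standalone cluster this has to be done on every machine; jars in lib/ are loaded by Flink's parent classloader at startup, which is why a restart is needed.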
Hi,
I suspect that packaging it this way causes the META-INF/services files to overwrite one another. Try putting the flink-json and flink-kafka jars directly into flink/lib.

Best,
Jingsong Lee

------------------------------------------------------------------
From: Others <[hidden email]>
Send Time: Wednesday, January 15, 2020, 15:27
To: [hidden email]; JingsongLee <[hidden email]>
Subject: Re: Re: Help: Flink job with a Kafka source fails when deployed to the cluster

In a cluster environment, which lib directory should they go into?

Below is the log from the packaging step:

[INFO] --- maven-shade-plugin:3.0.0:shade (default) @ flinkjob ---
[INFO] Including org.apache.flink:flink-core:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-annotations:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-metrics-core:jar:1.9.1 in the shaded jar.
[INFO] Including com.esotericsoftware.kryo:kryo:jar:2.24.0 in the shaded jar.
[INFO] Including com.esotericsoftware.minlog:minlog:jar:1.2 in the shaded jar.
[INFO] Including org.objenesis:objenesis:jar:2.1 in the shaded jar.
[INFO] Including commons-collections:commons-collections:jar:3.2.2 in the shaded jar.
[INFO] Including org.apache.commons:commons-compress:jar:1.18 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-asm-6:jar:6.2.1-7.0 in the shaded jar.
[INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
[INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded jar.
[INFO] Excluding org.apache.flink:force-shading:jar:1.9.1 from the shaded jar.
[INFO] Including org.javassist:javassist:jar:3.19.0-GA in the shaded jar.
[INFO] Including org.scala-lang:scala-library:jar:2.11.12 in the shaded jar.
[INFO] Including org.xerial.snappy:snappy-java:jar:1.1.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-guava:jar:18.0-7.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-java-bridge_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-java:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-planner-blink_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.scala-lang:scala-reflect:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang:scala-compiler:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-xml_2.11:jar:1.0.5 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala-bridge_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-runtime-blink_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.codehaus.janino:janino:jar:3.0.9 in the shaded jar.
[INFO] Including org.codehaus.janino:commons-compiler:jar:3.0.9 in the shaded jar.
[INFO] Including org.apache.calcite.avatica:avatica-core:jar:1.15.0 in the shaded jar.
[INFO] Including org.reflections:reflections:jar:0.9.10 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka-base_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.kafka:kafka-clients:jar:2.2.0 in the shaded jar.
[INFO] Including com.github.luben:zstd-jni:jar:1.3.8-1 in the shaded jar.
[INFO] Including org.lz4:lz4-java:jar:1.5.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-common:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-jdbc_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including mysql:mysql-connector-java:jar:5.1.48 in the shaded jar.
[INFO] Including com.google.code.gson:gson:jar:2.8.5 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
[INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
[WARNING] janino-3.0.9.jar, flink-table-planner-blink_2.11-1.9.1.jar define 440 overlapping classes:
[WARNING]   - org.codehaus.janino.util.resource.ResourceCreator
[WARNING]   - org.codehaus.janino.ReflectionIClass$ReflectionIField
[WARNING]   - org.codehaus.janino.IClass$1
[WARNING]   - org.codehaus.janino.UnitCompiler$35
[WARNING]   - org.codehaus.janino.Java$CompilationUnit$SingleStaticImportDeclaration
[WARNING]   - org.codehaus.janino.Java$PackageMemberEnumDeclaration
[WARNING]   - org.codehaus.janino.UnitCompiler$13$1
[WARNING]   - org.codehaus.janino.Unparser
[WARNING]   - org.codehaus.janino.CodeContext$Branch
[WARNING]   - org.codehaus.janino.UnitCompiler$33$2
[WARNING]   - 430 more...
[WARNING] avatica-core-1.15.0.jar, flink-table-planner-blink_2.11-1.9.1.jar define 605 overlapping classes:
[WARNING]   - org.apache.calcite.avatica.AvaticaParameter
[WARNING]   - org.apache.calcite.avatica.Meta$ExecuteResult
[WARNING]   - org.apache.calcite.avatica.ConnectStringParser
[WARNING]   - org.apache.calcite.avatica.ConnectionConfigImpl$3
[WARNING]   - org.apache.calcite.avatica.AvaticaDatabaseMetaData$2
[WARNING]   - org.apache.calcite.avatica.remote.RemoteMeta$11
[WARNING]   - org.apache.calcite.avatica.proto.Common$1
[WARNING]   - org.apache.calcite.avatica.remote.JsonService
[WARNING]   - org.apache.calcite.avatica.util.Spaces$SpaceString
[WARNING]   - org.apache.calcite.avatica.proto.Responses$DatabasePropertyResponseOrBuilder
[WARNING]   - 595 more...
[WARNING] flink-table-planner-blink_2.11-1.9.1.jar, commons-compiler-3.0.9.jar define 28 overlapping classes:
[WARNING]   - org.codehaus.commons.compiler.package-info
[WARNING]   - org.codehaus.commons.compiler.ICookable
[WARNING]   - org.codehaus.commons.compiler.samples.ScriptDemo
[WARNING]   - org.codehaus.commons.compiler.Sandbox
[WARNING]   - org.codehaus.commons.compiler.CompileException
[WARNING]   - org.codehaus.commons.compiler.Sandbox$1
[WARNING]   - org.codehaus.commons.compiler.WarningHandler
[WARNING]   - org.codehaus.commons.compiler.CompilerFactoryFactory
[WARNING]   - org.codehaus.commons.compiler.AbstractCompilerFactory
[WARNING]   - org.codehaus.commons.compiler.Cookable
[WARNING]   - 18 more...
[WARNING] maven-shade-plugin has detected that some class files are
[WARNING] present in two or more JARs. When this happens, only one
[WARNING] single version of the class is copied to the uber jar.
[WARNING] Usually this is not harmful and you can skip these warnings,
[WARNING] otherwise try to manually exclude artifacts based on
[WARNING] mvn dependency:tree -Ddetail=true and the above output.
[WARNING] See http://maven.apache.org/plugins/maven-shade-plugin/
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /Users/lidonghao/Documents/doumob/flinkjob/target/flinkjob-1.0-SNAPSHOT.jar with /Users/lidonghao/Documents/doumob/flinkjob/target/flinkjob-1.0-SNAPSHOT-shaded.jar
[INFO] Dependency-reduced POM written at: /Users/lidonghao/Documents/doumob/flinkjob/dependency-reduced-pom.xml
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.447 s
[INFO] Finished at: 2020-01-15T15:24:56+08:00
[INFO] Final Memory: 69M/781M
[INFO] ------------------------------------------------------------------------

Process finished with exit code 0

Note that the log does show: [INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.
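This is exactly why "flink-json is in the shaded jar" does not settle the question: flink-json and flink-connector-kafka each ship a file with the identical path META-INF/services/org.apache.flink.table.factories.TableFactory, and a repackaging step that keeps only one copy silently drops the other jar's factories. A minimal, self-contained illustration of the mechanism (the directory and output file names are made up for the demo; the Kafka factory class name comes from the "factories considered" list in the error above, and org.apache.flink.formats.json.JsonRowFormatFactory is, to my knowledge, the factory flink-json 1.9 registers):

```shell
# Two "jars" (plain directories here) each ship a service file under the
# same path, listing different TableFactory implementations.
mkdir -p json-jar/META-INF/services kafka-jar/META-INF/services
echo "org.apache.flink.formats.json.JsonRowFormatFactory" \
  > json-jar/META-INF/services/org.apache.flink.table.factories.TableFactory
echo "org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory" \
  > kafka-jar/META-INF/services/org.apache.flink.table.factories.TableFactory

# "Last writer wins" -- what an unmerged shaded jar effectively does:
# the Kafka copy replaces the JSON copy, and the JSON factory vanishes.
cp kafka-jar/META-INF/services/org.apache.flink.table.factories.TableFactory clobbered

# Merge by concatenation -- what a service-file-aware merge step does:
cat json-jar/META-INF/services/org.apache.flink.table.factories.TableFactory \
    kafka-jar/META-INF/services/org.apache.flink.table.factories.TableFactory > merged

grep -c '' clobbered   # -> 1 (only the Kafka factory survives)
grep -c '' merged      # -> 2 (both factories survive)
```

Java's ServiceLoader, which TableFactoryService uses under the hood, can only see the providers listed in the one service file that ends up in the jar, which matches the symptom here: KafkaTableSourceSinkFactory is found but no DeserializationSchemaFactory is.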
------------------ Original Message ------------------
From: "JingsongLee" <[hidden email]>
Sent: Wednesday, January 15, 2020, 15:19
To: "user-zh" <[hidden email]>
Subject: Re: Help: Flink job with a Kafka source fails when deployed to the cluster
+user-zh
------------------------------------------------------------------
From: JingsongLee <[hidden email]>
Send Time: 2020-01-15 (Wed) 16:05
To: Others <[hidden email]>
Subject: Re: Re: Re: Help: flink Kafka source job fails when deployed to the cluster

Yes. Another option is to use the classpath mechanism described in [1] to add multiple jars.
BTW, please keep user-zh on CC when you reply.

Best,
Jingsong Lee

[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#usage

------------------------------------------------------------------
From: Others <[hidden email]>
Send Time: 2020-01-15 (Wed) 15:54
To: [hidden email] JingsongLee <[hidden email]>
Subject: Re: Re: Re: Help: flink Kafka source job fails when deployed to the cluster

Hi, my cluster is deployed in standalone mode. Should the jars be added only on the Flink master machine, or on every machine? And do I need to restart the cluster afterwards?

------------------ Original message ------------------
From: "JingsongLee" <[hidden email]>
Sent: 2020-01-15 (Wed) 15:46
To: "Others" <[hidden email]>; "user-zh" <[hidden email]>
Subject: Re: Re: Help: flink Kafka source job fails when deployed to the cluster

Hi, I suspect that packaging it this way makes the META-INF/services files overwrite each other. Try putting the flink-json and flink-kafka jars directly into flink/lib.

Best,
Jingsong Lee

------------------------------------------------------------------
From: Others <[hidden email]>
Send Time: 2020-01-15 (Wed) 15:27
To: [hidden email] JingsongLee <[hidden email]>
Subject: Re: Re: Help: flink Kafka source job fails when deployed to the cluster

In a cluster deployment, which lib directory should they go into? Here is the log from the packaging step:

[INFO] --- maven-shade-plugin:3.0.0:shade (default) @ flinkjob ---
[INFO] Including org.apache.flink:flink-core:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-annotations:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-metrics-core:jar:1.9.1 in the shaded jar.
[INFO] Including com.esotericsoftware.kryo:kryo:jar:2.24.0 in the shaded jar.
[INFO] Including com.esotericsoftware.minlog:minlog:jar:1.2 in the shaded jar.
[INFO] Including org.objenesis:objenesis:jar:2.1 in the shaded jar.
[INFO] Including commons-collections:commons-collections:jar:3.2.2 in the shaded jar.
[INFO] Including org.apache.commons:commons-compress:jar:1.18 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-asm-6:jar:6.2.1-7.0 in the shaded jar.
[INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
[INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded jar.
[INFO] Excluding org.apache.flink:force-shading:jar:1.9.1 from the shaded jar.
[INFO] Including org.javassist:javassist:jar:3.19.0-GA in the shaded jar.
[INFO] Including org.scala-lang:scala-library:jar:2.11.12 in the shaded jar.
[INFO] Including org.xerial.snappy:snappy-java:jar:1.1.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-guava:jar:18.0-7.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-java-bridge_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-java:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-planner-blink_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.scala-lang:scala-reflect:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang:scala-compiler:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-xml_2.11:jar:1.0.5 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala-bridge_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-runtime-blink_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.codehaus.janino:janino:jar:3.0.9 in the shaded jar.
[INFO] Including org.codehaus.janino:commons-compiler:jar:3.0.9 in the shaded jar.
[INFO] Including org.apache.calcite.avatica:avatica-core:jar:1.15.0 in the shaded jar.
[INFO] Including org.reflections:reflections:jar:0.9.10 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka-base_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.kafka:kafka-clients:jar:2.2.0 in the shaded jar.
[INFO] Including com.github.luben:zstd-jni:jar:1.3.8-1 in the shaded jar.
[INFO] Including org.lz4:lz4-java:jar:1.5.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-common:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-jdbc_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including mysql:mysql-connector-java:jar:5.1.48 in the shaded jar.
[INFO] Including com.google.code.gson:gson:jar:2.8.5 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
[INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
[WARNING] janino-3.0.9.jar, flink-table-planner-blink_2.11-1.9.1.jar define 440 overlapping classes:
[WARNING] - org.codehaus.janino.util.resource.ResourceCreator
[WARNING] - org.codehaus.janino.ReflectionIClass$ReflectionIField
[WARNING] - org.codehaus.janino.IClass$1
[WARNING] - org.codehaus.janino.UnitCompiler$35
[WARNING] - org.codehaus.janino.Java$CompilationUnit$SingleStaticImportDeclaration
[WARNING] - org.codehaus.janino.Java$PackageMemberEnumDeclaration
[WARNING] - org.codehaus.janino.UnitCompiler$13$1
[WARNING] - org.codehaus.janino.Unparser
[WARNING] - org.codehaus.janino.CodeContext$Branch
[WARNING] - org.codehaus.janino.UnitCompiler$33$2
[WARNING] - 430 more...
[WARNING] avatica-core-1.15.0.jar, flink-table-planner-blink_2.11-1.9.1.jar define 605 overlapping classes:
[WARNING] - org.apache.calcite.avatica.AvaticaParameter
[WARNING] - org.apache.calcite.avatica.Meta$ExecuteResult
[WARNING] - org.apache.calcite.avatica.ConnectStringParser
[WARNING] - org.apache.calcite.avatica.ConnectionConfigImpl$3
[WARNING] - org.apache.calcite.avatica.AvaticaDatabaseMetaData$2
[WARNING] - org.apache.calcite.avatica.remote.RemoteMeta$11
[WARNING] - org.apache.calcite.avatica.proto.Common$1
[WARNING] - org.apache.calcite.avatica.remote.JsonService
[WARNING] - org.apache.calcite.avatica.util.Spaces$SpaceString
[WARNING] - org.apache.calcite.avatica.proto.Responses$DatabasePropertyResponseOrBuilder
[WARNING] - 595 more...
[WARNING] flink-table-planner-blink_2.11-1.9.1.jar, commons-compiler-3.0.9.jar define 28 overlapping classes:
[WARNING] - org.codehaus.commons.compiler.package-info
[WARNING] - org.codehaus.commons.compiler.ICookable
[WARNING] - org.codehaus.commons.compiler.samples.ScriptDemo
[WARNING] - org.codehaus.commons.compiler.Sandbox
[WARNING] - org.codehaus.commons.compiler.CompileException
[WARNING] - org.codehaus.commons.compiler.Sandbox$1
[WARNING] - org.codehaus.commons.compiler.WarningHandler
[WARNING] - org.codehaus.commons.compiler.CompilerFactoryFactory
[WARNING] - org.codehaus.commons.compiler.AbstractCompilerFactory
[WARNING] - org.codehaus.commons.compiler.Cookable
[WARNING] - 18 more...
[WARNING] maven-shade-plugin has detected that some class files are
[WARNING] present in two or more JARs. When this happens, only one
[WARNING] single version of the class is copied to the uber jar.
[WARNING] Usually this is not harmful and you can skip these warnings,
[WARNING] otherwise try to manually exclude artifacts based on
[WARNING] mvn dependency:tree -Ddetail=true and the above output.
[WARNING] See http://maven.apache.org/plugins/maven-shade-plugin/
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /Users/lidonghao/Documents/doumob/flinkjob/target/flinkjob-1.0-SNAPSHOT.jar with /Users/lidonghao/Documents/doumob/flinkjob/target/flinkjob-1.0-SNAPSHOT-shaded.jar
[INFO] Dependency-reduced POM written at: /Users/lidonghao/Documents/doumob/flinkjob/dependency-reduced-pom.xml
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.447 s
[INFO] Finished at: 2020-01-15T15:24:56+08:00
[INFO] Final Memory: 69M/781M
[INFO] ------------------------------------------------------------------------
Process finished with exit code 0

Note that the output does show: [INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.

------------------ Original message ------------------
From: "JingsongLee" <[hidden email]>
Sent: 2020-01-15 (Wed) 15:19
To: "user-zh" <[hidden email]>
Subject: Re: Help: flink Kafka source job fails when deployed to the cluster

Hi, did you put the JSON jar into lib? It also looks like your user jar was not built with jar-with-dependencies, so it would not contain the json jar either.

Best,
Jingsong Lee

------------------------------------------------------------------
From: Others <[hidden email]>
Send Time: 2020-01-15 (Wed) 15:03
To: user-zh <[hidden email]>
Subject: Help: flink Kafka source job fails when deployed to the cluster

[quoted original message trimmed; the full error and pom are at the top of the thread]
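Jingsong Lee's suspicion about META-INF/services files overwriting each other is the likely cause here, given that flink-json is demonstrably inside the shaded jar: flink-json, flink-connector-kafka, and flink-table-planner-blink each ship their own `META-INF/services/org.apache.flink.table.factories.TableFactory` file, and without a service-file merger the shade plugin keeps only one of them, so the JSON format factory drops out of the service registry (note that the "factories have been considered" list in the error contains planner, CSV, and Kafka factories but no JSON one). If you want to keep the fat-jar approach instead of copying jars into flink/lib, a sketch of the fix, assuming the same shade-plugin configuration as in the pom above, is to add the shade plugin's `ServicesResourceTransformer`:

```xml
<transformers>
  <!-- Concatenate META-INF/services files from all jars instead of
       letting the last jar win, so every TableFactory implementation
       stays discoverable via ServiceLoader. -->
  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    <mainClass>com.doumob.flink.BuoyDataJob</mainClass>
  </transformer>
</transformers>
```

After rebuilding, you can verify the merge with `unzip -p flinkjob-1.0-SNAPSHOT.jar META-INF/services/org.apache.flink.table.factories.TableFactory`, which should now list the JSON factory alongside the Kafka and planner ones.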
In reply to this post by Others
Hi:
I have run into something similar before; in my case it was the Kafka factory that could not be found. Put the jars listed under https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#dependencies into Flink's lib directory (you may need to restart the cluster afterwards) and it should work. Give it a try.

On 2020-01-15 14:59, Others <[hidden email]> wrote:
[quoted original message trimmed; the full error and pom are at the top of the thread]
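The two cluster-side fixes suggested in the thread can be sketched as shell steps. This is illustrative only: the installation path /opt/flink and the jar file names are assumptions, and for lib placement the self-contained flink-sql-connector-kafka jar from the dependencies page linked above is generally preferable to the thin connector jar:

```sh
# Option 1: put the format/connector jars into Flink's lib directory.
# In a standalone cluster this must be done on EVERY node (JobManager and
# all TaskManagers), followed by a restart so the new jars are picked up.
cp flink-json-1.9.1.jar flink-sql-connector-kafka_2.11-1.9.1.jar /opt/flink/lib/
/opt/flink/bin/stop-cluster.sh && /opt/flink/bin/start-cluster.sh

# Option 2: leave lib untouched and add the jars to the classpath at
# submission time with -C/--classpath (repeatable; the URL must be
# reachable from all nodes of the cluster).
/opt/flink/bin/flink run \
  -C file:///opt/flink-extra/flink-json-1.9.1.jar \
  -C file:///opt/flink-extra/flink-sql-connector-kafka_2.11-1.9.1.jar \
  flinkjob-1.0-SNAPSHOT.jar
```

With option 2, jobs submitted through the web UI's jar upload do not get the `-C` entries, so for the JarRunHandler path shown in the error the lib-directory approach is the one that applies.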
After putting the jars under lib, the job now starts successfully. Thanks for the help.
------------------ Original Message ------------------ From: "AS"<[hidden email]>; Sent: Wednesday, January 15, 2020, 4:19 PM; To: "[hidden email]"<[hidden email]>; Subject: Re: Help: Flink job with Kafka source fails on cluster deployment

Hi: I have run into a similar situation before; on my side it was the Kafka factory that could not be found. Put the jars listed under https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#dependencies into Flink's lib directory (you may need to restart the cluster afterwards) and it should work. Give it a try.

On 2020-01-15 14:59, Others<[hidden email]> wrote: [quoted text trimmed; the original post appears in full above]
In reply to this post by JingsongLee
After putting the jars under lib, the job starts up successfully. Thanks for the help!
------------------ Original Message ------------------ From: "JingsongLee"<[hidden email]>; Sent: Wednesday, January 15, 2020, 4:06 PM; To: "JingsongLee"<[hidden email]>; "Others"<[hidden email]>; Cc: "user-zh"<[hidden email]>; Subject: Re: Re: Re: Help: Flink job with Kafka source fails on cluster deployment

+user-zh

------------------------------------------------------------------ From: JingsongLee <[hidden email]> Send Time: Wednesday, January 15, 2020, 4:05 PM To: Others <[hidden email]> Subject: Re: Re: Re: Help: Flink job with Kafka source fails on cluster deployment

Yes. Another option is to use the classpath option described in [1] to add multiple jars. BTW, please keep user-zh CC'd when you reply. Best, Jingsong Lee [1] https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#usage

------------------------------------------------------------------ From: Others <[hidden email]> Send Time: Wednesday, January 15, 2020, 3:54 PM To: [hidden email] JingsongLee <[hidden email]> Subject: Re: Re: Re: Help: Flink job with Kafka source fails on cluster deployment

Hi, my cluster is deployed in standalone mode. Should the jars be added on the Flink master machine only, or on every machine? And does the cluster need to be restarted afterwards?

------------------ Original Message ------------------ From: "JingsongLee"<[hidden email]>; Sent: Wednesday, January 15, 2020, 3:46 PM; To: "Others"<[hidden email]>; "user-zh"<[hidden email]>; Subject: Re: Re: Help: Flink job with Kafka source fails on cluster deployment

Hi, I suspect that packaging it this way causes the META-INF/services files to overwrite each other. Try putting the flink-json and flink-kafka jars directly into flink/lib. Best, Jingsong Lee

------------------------------------------------------------------ From: Others <[hidden email]> Send Time: Wednesday, January 15, 2020, 3:27 PM To: [hidden email] JingsongLee <[hidden email]> Subject: Re: Help: Flink job with Kafka source fails on cluster deployment

In a cluster environment, which lib directory should they go into? Below is the log from the packaging process:

[INFO] --- maven-shade-plugin:3.0.0:shade (default) @ flinkjob ---
[INFO] Including org.apache.flink:flink-core:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-annotations:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-metrics-core:jar:1.9.1 in the shaded jar.
[INFO] Including com.esotericsoftware.kryo:kryo:jar:2.24.0 in the shaded jar.
[INFO] Including com.esotericsoftware.minlog:minlog:jar:1.2 in the shaded jar.
[INFO] Including org.objenesis:objenesis:jar:2.1 in the shaded jar.
[INFO] Including commons-collections:commons-collections:jar:3.2.2 in the shaded jar.
[INFO] Including org.apache.commons:commons-compress:jar:1.18 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-asm-6:jar:6.2.1-7.0 in the shaded jar.
[INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
[INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded jar.
[INFO] Excluding org.apache.flink:force-shading:jar:1.9.1 from the shaded jar.
[INFO] Including org.javassist:javassist:jar:3.19.0-GA in the shaded jar.
[INFO] Including org.scala-lang:scala-library:jar:2.11.12 in the shaded jar.
[INFO] Including org.xerial.snappy:snappy-java:jar:1.1.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-guava:jar:18.0-7.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-java-bridge_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-java:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-planner-blink_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.scala-lang:scala-reflect:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang:scala-compiler:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-xml_2.11:jar:1.0.5 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala-bridge_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-runtime-blink_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.codehaus.janino:janino:jar:3.0.9 in the shaded jar.
[INFO] Including org.codehaus.janino:commons-compiler:jar:3.0.9 in the shaded jar.
[INFO] Including org.apache.calcite.avatica:avatica-core:jar:1.15.0 in the shaded jar.
[INFO] Including org.reflections:reflections:jar:0.9.10 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka-base_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.kafka:kafka-clients:jar:2.2.0 in the shaded jar.
[INFO] Including com.github.luben:zstd-jni:jar:1.3.8-1 in the shaded jar.
[INFO] Including org.lz4:lz4-java:jar:1.5.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-common:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-jdbc_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including mysql:mysql-connector-java:jar:5.1.48 in the shaded jar.
[INFO] Including com.google.code.gson:gson:jar:2.8.5 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
[INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
[WARNING] janino-3.0.9.jar, flink-table-planner-blink_2.11-1.9.1.jar define 440 overlapping classes:
[WARNING] - org.codehaus.janino.util.resource.ResourceCreator
[WARNING] - org.codehaus.janino.ReflectionIClass$ReflectionIField
[WARNING] - org.codehaus.janino.IClass$1
[WARNING] - org.codehaus.janino.UnitCompiler$35
[WARNING] - org.codehaus.janino.Java$CompilationUnit$SingleStaticImportDeclaration
[WARNING] - org.codehaus.janino.Java$PackageMemberEnumDeclaration
[WARNING] - org.codehaus.janino.UnitCompiler$13$1
[WARNING] - org.codehaus.janino.Unparser
[WARNING] - org.codehaus.janino.CodeContext$Branch
[WARNING] - org.codehaus.janino.UnitCompiler$33$2
[WARNING] - 430 more...
[WARNING] avatica-core-1.15.0.jar, flink-table-planner-blink_2.11-1.9.1.jar define 605 overlapping classes:
[WARNING] - org.apache.calcite.avatica.AvaticaParameter
[WARNING] - org.apache.calcite.avatica.Meta$ExecuteResult
[WARNING] - org.apache.calcite.avatica.ConnectStringParser
[WARNING] - org.apache.calcite.avatica.ConnectionConfigImpl$3
[WARNING] - org.apache.calcite.avatica.AvaticaDatabaseMetaData$2
[WARNING] - org.apache.calcite.avatica.remote.RemoteMeta$11
[WARNING] - org.apache.calcite.avatica.proto.Common$1
[WARNING] - org.apache.calcite.avatica.remote.JsonService
[WARNING] - org.apache.calcite.avatica.util.Spaces$SpaceString
[WARNING] - org.apache.calcite.avatica.proto.Responses$DatabasePropertyResponseOrBuilder
[WARNING] - 595 more...
[WARNING] flink-table-planner-blink_2.11-1.9.1.jar, commons-compiler-3.0.9.jar define 28 overlapping classes:
[WARNING] - org.codehaus.commons.compiler.package-info
[WARNING] - org.codehaus.commons.compiler.ICookable
[WARNING] - org.codehaus.commons.compiler.samples.ScriptDemo
[WARNING] - org.codehaus.commons.compiler.Sandbox
[WARNING] - org.codehaus.commons.compiler.CompileException
[WARNING] - org.codehaus.commons.compiler.Sandbox$1
[WARNING] - org.codehaus.commons.compiler.WarningHandler
[WARNING] - org.codehaus.commons.compiler.CompilerFactoryFactory
[WARNING] - org.codehaus.commons.compiler.AbstractCompilerFactory
[WARNING] - org.codehaus.commons.compiler.Cookable
[WARNING] - 18 more...
[WARNING] maven-shade-plugin has detected that some class files are
[WARNING] present in two or more JARs. When this happens, only one
[WARNING] single version of the class is copied to the uber jar.
[WARNING] Usually this is not harmful and you can skip these warnings,
[WARNING] otherwise try to manually exclude artifacts based on
[WARNING] mvn dependency:tree -Ddetail=true and the above output.
[WARNING] See http://maven.apache.org/plugins/maven-shade-plugin/
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /Users/lidonghao/Documents/doumob/flinkjob/target/flinkjob-1.0-SNAPSHOT.jar with /Users/lidonghao/Documents/doumob/flinkjob/target/flinkjob-1.0-SNAPSHOT-shaded.jar
[INFO] Dependency-reduced POM written at: /Users/lidonghao/Documents/doumob/flinkjob/dependency-reduced-pom.xml
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.447 s
[INFO] Finished at: 2020-01-15T15:24:56+08:00
[INFO] Final Memory: 69M/781M
[INFO] ------------------------------------------------------------------------
Process finished with exit code 0

As the log shows, flink-json is included: [INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.
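JingsongLee's diagnosis (the META-INF/services files overwriting each other) explains why the log above can truthfully report that flink-json is included while the factory still cannot be found: each jar that provides table factories ships a text file under META-INF/services named after the SPI interface, and factory discovery only sees whatever single file survives shading. A toy Python sketch of the two merge strategies (factory class names are taken from the error message; the file handling is simulated, not real jar processing):

```python
# Each "jar" contributes lines to the same service file,
# META-INF/services/org.apache.flink.table.factories.TableFactory.
jar_json = ["org.apache.flink.formats.json.JsonRowFormatFactory"]
jar_kafka = ["org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory"]

def shade_naive(jars):
    """No ServicesResourceTransformer: each jar's service file replaces the previous one."""
    services = {}
    for jar in jars:
        services["org.apache.flink.table.factories.TableFactory"] = list(jar)
    return services["org.apache.flink.table.factories.TableFactory"]

def shade_merged(jars):
    """ServicesResourceTransformer-style merge: concatenate entries from every jar."""
    merged = []
    for jar in jars:
        merged.extend(jar)
    return merged

# Naive shading loses the JSON format factory, so factory discovery fails at runtime
# even though the JSON classes themselves are inside the fat jar.
assert "org.apache.flink.formats.json.JsonRowFormatFactory" not in shade_naive([jar_json, jar_kafka])

# Merged service files keep both factories discoverable.
assert "org.apache.flink.formats.json.JsonRowFormatFactory" in shade_merged([jar_json, jar_kafka])
```

This is also why dropping the connector and format jars into flink/lib works: they arrive as separate jars, each with its own intact service file, so nothing gets overwritten.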
------------------ Original Message ------------------ From: "JingsongLee"<[hidden email]>; Sent: Wednesday, January 15, 2020, 3:19 PM; To: "user-zh"<[hidden email]>; Subject: Re: Help: Flink job with Kafka source fails on cluster deployment

Hi, did you perhaps not put the JSON jar into lib? It also looks like your user jar was not built with jar-with-dependencies, so it will not contain the JSON jar either. Best, Jingsong Lee

------------------------------------------------------------------ From: Others <[hidden email]> Send Time: Wednesday, January 15, 2020, 3:03 PM To: user-zh <[hidden email]> Subject: Help: Flink job with Kafka source fails on cluster deployment [quoted text trimmed; the original post appears in full above]