Code:

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings);

tableEnv.connect(new Kafka()
        .version("0.10")
        .topic("installmentdb_t_user")
        .startFromEarliest()
        .property("zookeeper.connect", "risk-kafka.aku:2181")
        .property("bootstrap.servers", "risk-kafka.aku:9092"))
    .withFormat(new Json().deriveSchema())
    .withSchema(new Schema()
        .field("business", Types.STRING)
        .field("type", Types.STRING)
        .field("es", Types.LONG))
    .inAppendMode()
    .registerTableSource("installmentdb_t_user");

Starting execution of program

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
    at feature.flinktask.sqltest.main(sqltest.java:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
    ... 12 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No context matches.

The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=risk-kafka.aku:2181
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=risk-kafka.aku:9092
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=installmentdb_t_user
connector.type=kafka
connector.version=0.10
format.derive-schema=true
format.property-version=1
format.type=json
schema.0.name=business
schema.0.type=VARCHAR
schema.1.name=type
schema.1.type=VARCHAR
schema.2.name=es
schema.2.type=BIGINT
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.formats.json.JsonRowFormatFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
    ... 20 more
Judging from your error, Kafka010TableSourceSinkFactory is not on the classpath. You need to add the Kafka connector jar to your dependencies (for Kafka 0.10, that is flink-connector-kafka-0.10_2.11 or flink-connector-kafka-0.10_2.12).
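A quick way to test this diagnosis is to check whether the factory class can be loaded at all from inside the job. The sketch below uses only the JDK. The class name `FactoryClasspathCheck` and the helper `isOnClasspath` are hypothetical, and the fully qualified factory name is an assumption based on how the 0.10 connector is usually packaged; the thread itself only gives the simple class name.

```java
public class FactoryClasspathCheck {
    // Assumed fully qualified name of the Kafka 0.10 table factory
    // (the thread only mentions the simple name Kafka010TableSourceSinkFactory).
    static final String KAFKA_010_FACTORY =
            "org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory";

    /** Returns true if the named class is visible to the current context class loader. */
    static boolean isOnClasspath(String className) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = FactoryClasspathCheck.class.getClassLoader();
        }
        try {
            // initialize=false: only check visibility, do not run static initializers
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(KAFKA_010_FACTORY + " visible: " + isOnClasspath(KAFKA_010_FACTORY));
    }
}
```

Note that a visible class is necessary but not sufficient: Flink discovers table factories through Java's service-loading mechanism, so the factory must also be declared in a service file.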
> On September 10, 2019, at 12:31 PM, 越张 <[hidden email]> wrote:
>
> [...]
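Yes, that was the problem. It turned out the connector was packaged into the fat jar but still could not be found there; once I put the jar in the Flink lib directory it worked. Very strange.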
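One possible explanation, which is an assumption and not something confirmed in this thread, is that the fat jar build kept only one copy of `META-INF/services/org.apache.flink.table.factories.TableFactory` instead of merging the copies from each dependency, so the Kafka factory class was in the jar but never declared. That would be consistent with JsonRowFormatFactory appearing in the considered list while the Kafka factory does not. A minimal JDK-only sketch for inspecting what a jar declares follows; `ServiceFileCheck` and `declaredProviders` are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class ServiceFileCheck {
    // Service interface that Flink's table factory lookup resolves via java.util.ServiceLoader.
    static final String TABLE_FACTORY_SERVICE = "org.apache.flink.table.factories.TableFactory";

    /** Lists the implementation classes a jar declares for the given service interface. */
    static List<String> declaredProviders(Path jar, String serviceInterface) throws IOException {
        List<String> providers = new ArrayList<>();
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            JarEntry entry = jarFile.getJarEntry("META-INF/services/" + serviceInterface);
            if (entry == null) {
                return providers; // the jar declares no providers for this service
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(jarFile.getInputStream(entry), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // Service files allow '#' comments and blank lines; skip both.
                    int hash = line.indexOf('#');
                    if (hash >= 0) {
                        line = line.substring(0, hash);
                    }
                    line = line.trim();
                    if (!line.isEmpty()) {
                        providers.add(line);
                    }
                }
            }
        }
        return providers;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to the fat jar, supplied by the caller
        for (String provider : declaredProviders(Paths.get(args[0]), TABLE_FACTORY_SERVICE)) {
            System.out.println(provider);
        }
    }
}
```

If the Kafka factory is missing from the output, merging service files at build time is a commonly used remedy, for example with the Maven Shade plugin's ServicesResourceTransformer.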
> On September 11, 2019, at 9:35 AM, Dian Fu <[hidden email]> wrote:
>
> [...]