这种问题,一般都是你kafka工厂所需要的必填参数缺失造成的,你看看是不是缺了参数,比如kafka的group
------------------ 原始邮件 ------------------ 发件人: 越张 <[hidden email]> 发送时间: 2019年9月10日 12:31 收件人: user-zh <[hidden email]> 主题: 回复:Flink1.9 sql 提交失败 代码: EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings); tableEnv.connect( new Kafka() .version("0.10") .topic("installmentdb_t_user") .startFromEarliest() .property("zookeeper.connect", "risk-kafka.aku:2181") .property("bootstrap.servers", "risk-kafka.aku:9092")) .withFormat(new Json().deriveSchema()) .withSchema(new Schema() .field("business", Types.STRING) .field("type", Types.STRING) .field("es", Types.LONG) ) .inAppendMode().registerTableSource("installmentdb_t_user"); Starting execution of program ------------------------------------------------------------ The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed. at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67) at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54) at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69) at feature.flinktask.sqltest.main(sqltest.java:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) ... 12 more Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Reason: No context matches. The following properties are requested: connector.properties.0.key=zookeeper.connect connector.properties.0.value=risk-kafka.aku:2181 connector.properties.1.key=bootstrap.servers connector.properties.1.value=risk-kafka.aku:9092 connector.property-version=1 connector.startup-mode=earliest-offset connector.topic=installmentdb_t_user connector.type=kafka connector.version=0.10 format.derive-schema=true format.property-version=1 format.type=json schema.0.name=business schema.0.type=VARCHAR schema.1.name=type schema.1.type=VARCHAR schema.2.name=es schema.2.type=BIGINT update-mode=append The following factories have been considered: org.apache.flink.table.catalog.GenericInMemoryCatalogFactory org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory org.apache.flink.table.sinks.CsvBatchTableSinkFactory org.apache.flink.table.sinks.CsvAppendTableSinkFactory org.apache.flink.table.planner.StreamPlannerFactory org.apache.flink.table.executor.StreamExecutorFactory org.apache.flink.table.planner.delegation.BlinkPlannerFactory org.apache.flink.table.planner.delegation.BlinkExecutorFactory org.apache.flink.formats.json.JsonRowFormatFactory at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283) at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191) at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144) at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97) at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64) ... 20 more |
Free forum by Nabble | Edit this page |