Re: Flink 1.9 SQL submission failed


Re: Flink 1.9 SQL submission failed

Jun Zhang-2
This kind of problem is usually caused by a missing required parameter for your Kafka factory. Check whether a parameter is missing, for example the Kafka group.
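For reference, a minimal sketch of supplying the group through the Kafka descriptor; the group id below ("my-consumer-group") is only a placeholder, not something from the original post:

// Sketch only: the descriptor from the original post, with a consumer group added.
// "my-consumer-group" is a placeholder value.
tableEnv.connect(new Kafka()
        .version("0.10")
        .topic("installmentdb_t_user")
        .startFromEarliest()
        .property("group.id", "my-consumer-group")
        .property("zookeeper.connect", "risk-kafka.aku:2181")
        .property("bootstrap.servers", "risk-kafka.aku:9092"))
    .withFormat(new Json().deriveSchema())
    .withSchema(new Schema()
        .field("business", Types.STRING)
        .field("type", Types.STRING)
        .field("es", Types.LONG))
    .inAppendMode()
    .registerTableSource("installmentdb_t_user");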





------------------ Original message ------------------
From: 越张 <[hidden email]>
Sent: September 10, 2019, 12:31
To: user-zh <[hidden email]>
Subject: Re: Flink 1.9 SQL submission failed



Code:
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings);

tableEnv.connect(new Kafka()
        .version("0.10")
        .topic("installmentdb_t_user")
        .startFromEarliest()
        .property("zookeeper.connect", "risk-kafka.aku:2181")
        .property("bootstrap.servers", "risk-kafka.aku:9092"))
    .withFormat(new Json().deriveSchema())
    .withSchema(new Schema()
        .field("business", Types.STRING)
        .field("type", Types.STRING)
        .field("es", Types.LONG))
    .inAppendMode()
    .registerTableSource("installmentdb_t_user");




Starting execution of program

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
at feature.flinktask.sqltest.main(sqltest.java:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
... 12 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No context matches.

The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=risk-kafka.aku:2181
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=risk-kafka.aku:9092
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=installmentdb_t_user
connector.type=kafka
connector.version=0.10
format.derive-schema=true
format.property-version=1
format.type=json
schema.0.name=business
schema.0.type=VARCHAR
schema.1.name=type
schema.1.type=VARCHAR
schema.2.name=es
schema.2.type=BIGINT
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.formats.json.JsonRowFormatFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
... 20 more