Hi:

Flink version: 1.12.0

I want to configure a table in sql-client-defaults.yaml, as follows:

    tables:
      - name: t_users
        type: source-table
        connector:
          property-version: 1
          type: kafka
          version: universal
          topic: ods.userAnalysis.user_profile
          startup-mode: latest-offset
          properties:
            bootstrap.servers: hostname:9092
            group.id: flink-analysis
        format:
          type: debezium-avro-confluent
          property-version: 1
          debezium-avro-confluent.schema-registry.url: http://hostname:8081
          #schema-registry.url: http://hostname:8081
        schema:
          - name: userId
            data-type: STRING
          - name: province
            data-type: STRING
          - name: city
            data-type: STRING
          - name: age
            data-type: INT
          - name: education
            data-type: STRING
          - name: jobType
            data-type: STRING
          - name: marriage
            data-type: STRING
          - name: sex
            data-type: STRING
          - name: interest
            data-type: STRING

I have already put all the relevant jars in the lib directory. Starting the SQL CLI fails with the following error:

    Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
    Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
        at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
    Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath.

    Reason: Required context properties mismatch.

    The following properties are requested:
    connector.properties.bootstrap.servers=henghe66:9092
    connector.properties.group.id=flink-analysis
    connector.property-version=1
    connector.startup-mode=latest-offset
    connector.topic=ods.userAnalysis.user_profile
    connector.type=kafka
    connector.version=universal
    format.debezium-avro-confluent.schema-registry.url=http://192.168.101.43:8081
    format.property-version=1
    format.type=debezium-avro-confluent
    schema.0.data-type=VARCHAR(2147483647)
    schema.0.name=userId
    schema.1.data-type=VARCHAR(2147483647)
    schema.1.name=province
    schema.2.data-type=VARCHAR(2147483647)
    schema.2.name=city
    schema.3.data-type=INT
    schema.3.name=age
    schema.4.data-type=VARCHAR(2147483647)
    schema.4.name=education
    schema.5.data-type=VARCHAR(2147483647)
    schema.5.name=jobType
    schema.6.data-type=VARCHAR(2147483647)
    schema.6.name=marriage
    schema.7.data-type=VARCHAR(2147483647)
    schema.7.name=sex
    schema.8.data-type=VARCHAR(2147483647)
    schema.8.name=interest

    The following factories have been considered:
    org.apache.flink.formats.avro.AvroRowFormatFactory
    org.apache.flink.formats.csv.CsvRowFormatFactory
    org.apache.flink.formats.json.JsonRowFormatFactory
        at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
        at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
        at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
        at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
        at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)
        at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)
        at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)
        at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)
        at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:646)
        at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:644)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:185)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:138)
        at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)
        ... 3 more

If I leave the table out of sql-client-defaults.yaml and instead create it with DDL after starting the SQL CLI, everything starts normally.

So is my configuration in sql-client-defaults.yaml wrong? I would appreciate it if anyone who knows could tell me.

Best regards!
Sources and sinks defined in the YAML file are looked up through the old factory interface. The debezium format only implements the new interface, so it cannot be found.

There are currently no plans to support the new interface in YAML, because the YAML approach has been deprecated. See this issue: https://issues.apache.org/jira/browse/FLINK-20260

Best,
Jark

On Tue, 24 Nov 2020 at 18:52, jy l <[hidden email]> wrote:
> [...]
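
The mismatch described in the reply can be illustrated with a small, simplified model. This is only a sketch and not Flink's actual lookup code: the function names `find_legacy_factory` and `find_new_factory`, the dictionaries, and the set of new-style identifiers are all hypothetical. The only details taken from the thread are the three legacy factories named in the error message and the `debezium-avro-confluent` format name. The sketch assumes that each legacy factory matches on a required `format.type` value, while the new interface resolves a format by a single identifier.

```python
# Simplified model of the two lookup paths; NOT Flink's real implementation.

# The three legacy format factories listed in the error message, each with
# the 'format.type' value it is assumed to require.
LEGACY_FORMAT_FACTORIES = {
    "AvroRowFormatFactory": {"format.type": "avro"},
    "CsvRowFormatFactory": {"format.type": "csv"},
    "JsonRowFormatFactory": {"format.type": "json"},
}

# Hypothetical set of identifiers known to the new factory interface.
NEW_FORMAT_IDENTIFIERS = {"avro", "csv", "json", "debezium-avro-confluent"}


def find_legacy_factory(properties):
    """Return the legacy factories whose required context matches."""
    return [
        name
        for name, required in LEGACY_FORMAT_FACTORIES.items()
        if all(properties.get(key) == value for key, value in required.items())
    ]


def find_new_factory(options):
    """Return the format identifier if a new-style factory knows it."""
    fmt = options.get("format")
    return fmt if fmt in NEW_FORMAT_IDENTIFIERS else None


# Properties as derived from the YAML table definition (old-style keys).
yaml_properties = {
    "connector.type": "kafka",
    "format.type": "debezium-avro-confluent",
}

# Options as they would appear in a DDL WITH clause (new-style keys).
ddl_options = {
    "connector": "kafka",
    "format": "debezium-avro-confluent",
}

yaml_matches = find_legacy_factory(yaml_properties)
ddl_match = find_new_factory(ddl_options)

print("YAML path matches:", yaml_matches)
print("DDL path match:", ddl_match)
```

In this model the YAML path finds no factory because none of the legacy factories accepts `format.type=debezium-avro-confluent`, which mirrors the "Required context properties mismatch" message, while the DDL path resolves the format by its identifier.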