DeserializationSchemaFactory cannot be found in the SQL CLI

jy l
Hi,
Flink version: 1.12.0

I want to define a table in sql-client-defaults.yaml, configured as follows:

tables:
  - name: t_users
    type: source-table
    connector:
        property-version: 1
        type: kafka
        version: universal
        topic: ods.userAnalysis.user_profile
        startup-mode: latest-offset
        properties:
            bootstrap.servers: hostname:9092
            group.id: flink-analysis
    format:
        type: debezium-avro-confluent
        property-version: 1
        debezium-avro-confluent.schema-registry.url: http://hostname:8081
        #schema-registry.url: http://hostname:8081
    schema:
        - name: userId
          data-type: STRING
        - name: province
          data-type: STRING
        - name: city
          data-type: STRING
        - name: age
          data-type: INT
        - name: education
          data-type: STRING
        - name: jobType
          data-type: STRING
        - name: marriage
          data-type: STRING
        - name: sex
          data-type: STRING
        - name: interest
          data-type: STRING

I have already placed all the relevant jars in the lib directory, but starting the SQL CLI fails with the following error:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
    at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector.properties.bootstrap.servers=henghe66:9092
connector.properties.group.id=flink-analysis
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=ods.userAnalysis.user_profile
connector.type=kafka
connector.version=universal
format.debezium-avro-confluent.schema-registry.url=http://192.168.101.43:8081
format.property-version=1
format.type=debezium-avro-confluent
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=userId
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=province
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=city
schema.3.data-type=INT
schema.3.name=age
schema.4.data-type=VARCHAR(2147483647)
schema.4.name=education
schema.5.data-type=VARCHAR(2147483647)
schema.5.name=jobType
schema.6.data-type=VARCHAR(2147483647)
schema.6.name=marriage
schema.7.data-type=VARCHAR(2147483647)
schema.7.name=sex
schema.8.data-type=VARCHAR(2147483647)
schema.8.name=interest

The following factories have been considered:
org.apache.flink.formats.avro.AvroRowFormatFactory
org.apache.flink.formats.csv.CsvRowFormatFactory
org.apache.flink.formats.json.JsonRowFormatFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)
    at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)
    at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)
    at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:646)
    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:644)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:185)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:138)
    at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)
    ... 3 more

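If I leave the table out of sql-client-defaults.yaml and instead create it with DDL after starting the SQL CLI, everything starts and works normally.

So is my configuration in sql-client-defaults.yaml wrong?

I would appreciate any pointers from someone who knows.


Best regards!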
Re: DeserializationSchemaFactory cannot be found in the SQL CLI

Jark
Administrator
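Sources and sinks defined in the YAML file are looked up through the old factory interface. The debezium format only implements the new interface, so it cannot be found.
There is currently no plan to support the new interface in YAML either, because the YAML approach has been deprecated.
See this issue: https://issues.apache.org/jira/browse/FLINK-20260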

Best,
Jark

On Tue, 24 Nov 2020 at 18:52, jy l <[hidden email]> wrote:

> [...]
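
For readers landing on this thread: the DDL route that the original poster reports as working might look roughly like the statement below. It is only a sketch and not taken from the thread. It assumes the Flink 1.12 option names for the Kafka SQL connector and the debezium-avro-confluent format, and `hostname` is the same placeholder used in the YAML above. Table and column names are copied from the YAML.

```sql
-- Sketch of a DDL equivalent of the YAML table definition (Flink 1.12 option names assumed).
-- DDL tables are resolved through the new factory interface, which the debezium format implements.
CREATE TABLE t_users (
    `userId`    STRING,
    `province`  STRING,
    `city`      STRING,
    `age`       INT,
    `education` STRING,
    `jobType`   STRING,
    `marriage`  STRING,
    `sex`       STRING,
    `interest`  STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'ods.userAnalysis.user_profile',
    'properties.bootstrap.servers' = 'hostname:9092',  -- placeholder host
    'properties.group.id' = 'flink-analysis',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'debezium-avro-confluent',
    'debezium-avro-confluent.schema-registry.url' = 'http://hostname:8081'  -- placeholder host
);
```

The statement can be entered in the SQL CLI after startup, with the `tables:` entry removed from sql-client-defaults.yaml so that the client no longer goes through the old factory lookup at launch.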