比如:
CREATE TABLE my_table ( id BIGINT, first_name STRING, last_name STRING, email STRING ) WITH ( 'connector'='kafka', 'topic'='user_topic', 'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='earliest-offset', 'format'='debezium-json' ); 最终解析 debezium-json 应该是 flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium 下面的代码 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢? 谢谢, 王磊 [hidden email] |
通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors Best, Godfrey [hidden email] <[hidden email]> 于2020年7月16日周四 下午4:02写道: > 比如: > > CREATE TABLE my_table ( > id BIGINT, > first_name STRING, > last_name STRING, > email STRING > ) WITH ( > 'connector'='kafka', > 'topic'='user_topic', > 'properties.bootstrap.servers'='localhost:9092', > 'scan.startup.mode'='earliest-offset', > 'format'='debezium-json' > ); > > 最终解析 debezium-json 应该是 > flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium > 下面的代码 > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢? > > 谢谢, > 王磊 > > > [hidden email] > > |
我在 flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory 找到了 SPI 的配置: org.apache.flink.formats.json.JsonFileSystemFormatFactory org.apache.flink.formats.json.JsonFormatFactory org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory org.apache.flink.formats.json.canal.CanalJsonFormatFactory 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep 代码没找到类似的关系映射配置。 谢谢, 王磊 [hidden email] Sender: godfrey he Send Time: 2020-07-16 16:38 Receiver: user-zh Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢? 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors Best, Godfrey [hidden email] <[hidden email]> 于2020年7月16日周四 下午4:02写道: > 比如: > > CREATE TABLE my_table ( > id BIGINT, > first_name STRING, > last_name STRING, > email STRING > ) WITH ( > 'connector'='kafka', > 'topic'='user_topic', > 'properties.bootstrap.servers'='localhost:9092', > 'scan.startup.mode'='earliest-offset', > 'format'='debezium-json' > ); > > 最终解析 debezium-json 应该是 > flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium > 下面的代码 > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢? > > 谢谢, > 王磊 > > > [hidden email] > > |
我的理解 : 大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class 类似下面的代码
private static <T extends TableFactory> T findSingleInternal( Class<T> factoryClass, Map<String, String> properties, Optional<ClassLoader> classLoader) { List<TableFactory> tableFactories = discoverFactories(classLoader); List<T> filtered = filter(tableFactories, factoryClass, properties); if (filtered.size() > 1) { throw new AmbiguousTableFactoryException( filtered, factoryClass, tableFactories, properties); } else { return filtered.get(0); } } private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) { try { List<TableFactory> result = new LinkedList<>(); ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader()); ServiceLoader .load(TableFactory.class, cl) .iterator() .forEachRemaining(result::add); return result; } catch (ServiceConfigurationError e) { LOG.error("Could not load service provider for table factories.", e); throw new TableException("Could not load service provider for table factories.", e); } } [hidden email] <[hidden email]> 于2020年7月16日周四 下午7:04写道: > > 我在 > flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory > 找到了 SPI 的配置: > > org.apache.flink.formats.json.JsonFileSystemFormatFactory > org.apache.flink.formats.json.JsonFormatFactory > org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory > org.apache.flink.formats.json.canal.CanalJsonFormatFactory > > 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory > 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep > 代码没找到类似的关系映射配置。 > > > 谢谢, > 王磊 > > > > [hidden email] > > > Sender: godfrey he > Send Time: 2020-07-16 16:38 > Receiver: user-zh > Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢? > 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1] > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors > > Best, > Godfrey > > [hidden email] <[hidden email]> 于2020年7月16日周四 > 下午4:02写道: > > > 比如: > > > > CREATE TABLE my_table ( > > id BIGINT, > > first_name STRING, > > last_name STRING, > > email STRING > > ) WITH ( > > 'connector'='kafka', > > 'topic'='user_topic', > > 'properties.bootstrap.servers'='localhost:9092', > > 'scan.startup.mode'='earliest-offset', > > 'format'='debezium-json' > > ); > > > > 最终解析 debezium-json 应该是 > > > flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium > > 下面的代码 > > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢? > > > > 谢谢, > > 王磊 > > > > > > [hidden email] > > > > > -- Best Regards, Harold Miao |
In reply to this post by wanglei2@geekplus.com.cn
Hi,
> 在 2020年7月16日,19:04,[hidden email] 写道: > > 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep 代码没找到类似的关系映射配置。 你DDL中不是写了 ‘format’ = ‘debzium-json’ 吗?就是这里指明的。 |
In reply to this post by Harold.Miao
谢谢,我理解了。 [hidden email] Sender: Harold.Miao Send Time: 2020-07-16 19:33 Receiver: user-zh Subject: Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢? 我的理解 : 大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class 类似下面的代码 private static <T extends TableFactory> T findSingleInternal( Class<T> factoryClass, Map<String, String> properties, Optional<ClassLoader> classLoader) { List<TableFactory> tableFactories = discoverFactories(classLoader); List<T> filtered = filter(tableFactories, factoryClass, properties); if (filtered.size() > 1) { throw new AmbiguousTableFactoryException( filtered, factoryClass, tableFactories, properties); } else { return filtered.get(0); } } private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) { try { List<TableFactory> result = new LinkedList<>(); ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader()); ServiceLoader .load(TableFactory.class, cl) .iterator() .forEachRemaining(result::add); return result; } catch (ServiceConfigurationError e) { LOG.error("Could not load service provider for table factories.", e); throw new TableException("Could not load service provider for table factories.", e); } } [hidden email] <[hidden email]> 于2020年7月16日周四 下午7:04写道: > > 我在 > flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory > 找到了 SPI 的配置: > > org.apache.flink.formats.json.JsonFileSystemFormatFactory > org.apache.flink.formats.json.JsonFormatFactory > org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory > org.apache.flink.formats.json.canal.CanalJsonFormatFactory > > 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory > 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep > 代码没找到类似的关系映射配置。 > > > 谢谢, > 王磊 > > > > [hidden email] > > > Sender: godfrey he > Send Time: 2020-07-16 16:38 > Receiver: user-zh > Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢? > 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1] > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors > > Best, > Godfrey > > [hidden email] <[hidden email]> 于2020年7月16日周四 > 下午4:02写道: > > > 比如: > > > > CREATE TABLE my_table ( > > id BIGINT, > > first_name STRING, > > last_name STRING, > > email STRING > > ) WITH ( > > 'connector'='kafka', > > 'topic'='user_topic', > > 'properties.bootstrap.servers'='localhost:9092', > > 'scan.startup.mode'='earliest-offset', > > 'format'='debezium-json' > > ); > > > > 最终解析 debezium-json 应该是 > > > flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium > > 下面的代码 > > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢? > > > > 谢谢, > > 王磊 > > > > > > [hidden email] > > > > > -- Best Regards, Harold Miao |
Administrator
|
这个对应关系是通过 Factory#factoryIdentifier 来决定的。
比如 DebeziumJsonFormatFactory#factoryIdentifier() 就是返回了 'debezium-json' Best, Jark On Thu, 16 Jul 2020 at 22:29, [hidden email] < [hidden email]> wrote: > > 谢谢,我理解了。 > > > > [hidden email] > > Sender: Harold.Miao > Send Time: 2020-07-16 19:33 > Receiver: user-zh > Subject: Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢? > 我的理解 : 大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class 类似下面的代码 > > private static <T extends TableFactory> T findSingleInternal( > Class<T> factoryClass, > Map<String, String> properties, > Optional<ClassLoader> classLoader) { > > List<TableFactory> tableFactories = discoverFactories(classLoader); > List<T> filtered = filter(tableFactories, factoryClass, properties); > > if (filtered.size() > 1) { > throw new AmbiguousTableFactoryException( > filtered, > factoryClass, > tableFactories, > properties); > } else { > return filtered.get(0); > } > } > > private static List<TableFactory> > discoverFactories(Optional<ClassLoader> classLoader) { > try { > List<TableFactory> result = new LinkedList<>(); > ClassLoader cl = > classLoader.orElse(Thread.currentThread().getContextClassLoader()); > ServiceLoader > .load(TableFactory.class, cl) > .iterator() > .forEachRemaining(result::add); > return result; > } catch (ServiceConfigurationError e) { > LOG.error("Could not load service provider for table factories.", e); > throw new TableException("Could not load service provider for > table factories.", e); > } > > } > > > [hidden email] <[hidden email]> 于2020年7月16日周四 > 下午7:04写道: > > > > > 我在 > > > flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory > > 找到了 SPI 的配置: > > > > org.apache.flink.formats.json.JsonFileSystemFormatFactory > > org.apache.flink.formats.json.JsonFormatFactory > > org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory > > org.apache.flink.formats.json.canal.CanalJsonFormatFactory > > > > 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory > > 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep > > 代码没找到类似的关系映射配置。 > > > > > > 谢谢, > > 王磊 > > > > > > > > [hidden email] > > > > > > Sender: godfrey he > > Send Time: 2020-07-16 16:38 > > Receiver: user-zh > > Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢? > > 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1] > > > > [1] > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors > > > > Best, > > Godfrey > > > > [hidden email] <[hidden email]> 于2020年7月16日周四 > > 下午4:02写道: > > > > > 比如: > > > > > > CREATE TABLE my_table ( > > > id BIGINT, > > > first_name STRING, > > > last_name STRING, > > > email STRING > > > ) WITH ( > > > 'connector'='kafka', > > > 'topic'='user_topic', > > > 'properties.bootstrap.servers'='localhost:9092', > > > 'scan.startup.mode'='earliest-offset', > > > 'format'='debezium-json' > > > ); > > > > > > 最终解析 debezium-json 应该是 > > > > > > flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium > > > 下面的代码 > > > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢? > > > > > > 谢谢, > > > 王磊 > > > > > > > > > [hidden email] > > > > > > > > > > > -- > > Best Regards, > Harold Miao > |
Free forum by Nabble | Edit this page |