FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

wanglei2@geekplus.com.cn
比如:

CREATE TABLE my_table (
  id BIGINT,
 first_name STRING,
 last_name STRING,
 email STRING
) WITH (
 'connector'='kafka',
 'topic'='user_topic',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='earliest-offset',
 'format'='debezium-json'
);

最终解析 debezium-json 应该是  flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium 下面的代码
但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?

谢谢,
王磊


[hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

godfrey he
通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors

Best,
Godfrey

[hidden email] <[hidden email]> 于2020年7月16日周四 下午4:02写道:

> 比如:
>
> CREATE TABLE my_table (
>   id BIGINT,
>  first_name STRING,
>  last_name STRING,
>  email STRING
> ) WITH (
>  'connector'='kafka',
>  'topic'='user_topic',
>  'properties.bootstrap.servers'='localhost:9092',
>  'scan.startup.mode'='earliest-offset',
>  'format'='debezium-json'
> );
>
> 最终解析 debezium-json 应该是
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> 下面的代码
> 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
>
> 谢谢,
> 王磊
>
>
> [hidden email]
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

wanglei2@geekplus.com.cn

我在 flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory 找到了 SPI 的配置:

org.apache.flink.formats.json.JsonFileSystemFormatFactory
org.apache.flink.formats.json.JsonFormatFactory
org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
org.apache.flink.formats.json.canal.CanalJsonFormatFactory

还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory
我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep 代码没找到类似的关系映射配置。


谢谢,
王磊



[hidden email]

 
Sender: godfrey he
Send Time: 2020-07-16 16:38
Receiver: user-zh
Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
 
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
 
Best,
Godfrey
 
[hidden email] <[hidden email]> 于2020年7月16日周四 下午4:02写道:
 

> 比如:
>
> CREATE TABLE my_table (
>   id BIGINT,
>  first_name STRING,
>  last_name STRING,
>  email STRING
> ) WITH (
>  'connector'='kafka',
>  'topic'='user_topic',
>  'properties.bootstrap.servers'='localhost:9092',
>  'scan.startup.mode'='earliest-offset',
>  'format'='debezium-json'
> );
>
> 最终解析 debezium-json 应该是
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> 下面的代码
> 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
>
> 谢谢,
> 王磊
>
>
> [hidden email]
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

Harold.Miao
我的理解 :  大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class   类似下面的代码

private static <T extends TableFactory> T findSingleInternal(
      Class<T> factoryClass,
      Map<String, String> properties,
      Optional<ClassLoader> classLoader) {

   List<TableFactory> tableFactories = discoverFactories(classLoader);
   List<T> filtered = filter(tableFactories, factoryClass, properties);

   if (filtered.size() > 1) {
      throw new AmbiguousTableFactoryException(
         filtered,
         factoryClass,
         tableFactories,
         properties);
   } else {
      return filtered.get(0);
   }
}

private static List<TableFactory>
discoverFactories(Optional<ClassLoader> classLoader) {
   try {
      List<TableFactory> result = new LinkedList<>();
      ClassLoader cl =
classLoader.orElse(Thread.currentThread().getContextClassLoader());
      ServiceLoader
         .load(TableFactory.class, cl)
         .iterator()
         .forEachRemaining(result::add);
      return result;
   } catch (ServiceConfigurationError e) {
      LOG.error("Could not load service provider for table factories.", e);
      throw new TableException("Could not load service provider for
table factories.", e);
   }

}


[hidden email] <[hidden email]> 于2020年7月16日周四 下午7:04写道:

>
> 我在
> flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
> 找到了 SPI 的配置:
>
> org.apache.flink.formats.json.JsonFileSystemFormatFactory
> org.apache.flink.formats.json.JsonFormatFactory
> org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
> org.apache.flink.formats.json.canal.CanalJsonFormatFactory
>
> 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory
> 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep
> 代码没找到类似的关系映射配置。
>
>
> 谢谢,
> 王磊
>
>
>
> [hidden email]
>
>
> Sender: godfrey he
> Send Time: 2020-07-16 16:38
> Receiver: user-zh
> Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
> 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
>
> Best,
> Godfrey
>
> [hidden email] <[hidden email]> 于2020年7月16日周四
> 下午4:02写道:
>
> > 比如:
> >
> > CREATE TABLE my_table (
> >   id BIGINT,
> >  first_name STRING,
> >  last_name STRING,
> >  email STRING
> > ) WITH (
> >  'connector'='kafka',
> >  'topic'='user_topic',
> >  'properties.bootstrap.servers'='localhost:9092',
> >  'scan.startup.mode'='earliest-offset',
> >  'format'='debezium-json'
> > );
> >
> > 最终解析 debezium-json 应该是
> >
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> > 下面的代码
> > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
> >
> > 谢谢,
> > 王磊
> >
> >
> > [hidden email]
> >
> >
>


--

Best Regards,
Harold Miao
Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

Leonard Xu
In reply to this post by wanglei2@geekplus.com.cn
Hi,

> 在 2020年7月16日,19:04,[hidden email] 写道:
>
> 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep 代码没找到类似的关系映射配置。

你DDL中不是写了 ‘format’ = ‘debzium-json’ 吗?就是这里指明的。
Reply | Threaded
Open this post in threaded view
|

Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

wanglei2@geekplus.com.cn
In reply to this post by Harold.Miao

 谢谢,我理解了。



[hidden email]

Sender: Harold.Miao
Send Time: 2020-07-16 19:33
Receiver: user-zh
Subject: Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
我的理解 :  大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class   类似下面的代码
 
private static <T extends TableFactory> T findSingleInternal(
      Class<T> factoryClass,
      Map<String, String> properties,
      Optional<ClassLoader> classLoader) {
 
   List<TableFactory> tableFactories = discoverFactories(classLoader);
   List<T> filtered = filter(tableFactories, factoryClass, properties);
 
   if (filtered.size() > 1) {
      throw new AmbiguousTableFactoryException(
         filtered,
         factoryClass,
         tableFactories,
         properties);
   } else {
      return filtered.get(0);
   }
}
 
private static List<TableFactory>
discoverFactories(Optional<ClassLoader> classLoader) {
   try {
      List<TableFactory> result = new LinkedList<>();
      ClassLoader cl =
classLoader.orElse(Thread.currentThread().getContextClassLoader());
      ServiceLoader
         .load(TableFactory.class, cl)
         .iterator()
         .forEachRemaining(result::add);
      return result;
   } catch (ServiceConfigurationError e) {
      LOG.error("Could not load service provider for table factories.", e);
      throw new TableException("Could not load service provider for
table factories.", e);
   }
 
}
 
 
[hidden email] <[hidden email]> 于2020年7月16日周四 下午7:04写道:
 

>
> 我在
> flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
> 找到了 SPI 的配置:
>
> org.apache.flink.formats.json.JsonFileSystemFormatFactory
> org.apache.flink.formats.json.JsonFormatFactory
> org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
> org.apache.flink.formats.json.canal.CanalJsonFormatFactory
>
> 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory
> 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep
> 代码没找到类似的关系映射配置。
>
>
> 谢谢,
> 王磊
>
>
>
> [hidden email]
>
>
> Sender: godfrey he
> Send Time: 2020-07-16 16:38
> Receiver: user-zh
> Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
> 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
>
> Best,
> Godfrey
>
> [hidden email] <[hidden email]> 于2020年7月16日周四
> 下午4:02写道:
>
> > 比如:
> >
> > CREATE TABLE my_table (
> >   id BIGINT,
> >  first_name STRING,
> >  last_name STRING,
> >  email STRING
> > ) WITH (
> >  'connector'='kafka',
> >  'topic'='user_topic',
> >  'properties.bootstrap.servers'='localhost:9092',
> >  'scan.startup.mode'='earliest-offset',
> >  'format'='debezium-json'
> > );
> >
> > 最终解析 debezium-json 应该是
> >
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> > 下面的代码
> > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
> >
> > 谢谢,
> > 王磊
> >
> >
> > [hidden email]
> >
> >
>
 
 
--
 
Best Regards,
Harold Miao
Reply | Threaded
Open this post in threaded view
|

Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

Jark
Administrator
这个对应关系是通过 Factory#factoryIdentifier 来决定的。
比如 DebeziumJsonFormatFactory#factoryIdentifier() 就是返回了 'debezium-json'

Best,
Jark

On Thu, 16 Jul 2020 at 22:29, [hidden email] <
[hidden email]> wrote:

>
>  谢谢,我理解了。
>
>
>
> [hidden email]
>
> Sender: Harold.Miao
> Send Time: 2020-07-16 19:33
> Receiver: user-zh
> Subject: Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
> 我的理解 :  大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class   类似下面的代码
>
> private static <T extends TableFactory> T findSingleInternal(
>       Class<T> factoryClass,
>       Map<String, String> properties,
>       Optional<ClassLoader> classLoader) {
>
>    List<TableFactory> tableFactories = discoverFactories(classLoader);
>    List<T> filtered = filter(tableFactories, factoryClass, properties);
>
>    if (filtered.size() > 1) {
>       throw new AmbiguousTableFactoryException(
>          filtered,
>          factoryClass,
>          tableFactories,
>          properties);
>    } else {
>       return filtered.get(0);
>    }
> }
>
> private static List<TableFactory>
> discoverFactories(Optional<ClassLoader> classLoader) {
>    try {
>       List<TableFactory> result = new LinkedList<>();
>       ClassLoader cl =
> classLoader.orElse(Thread.currentThread().getContextClassLoader());
>       ServiceLoader
>          .load(TableFactory.class, cl)
>          .iterator()
>          .forEachRemaining(result::add);
>       return result;
>    } catch (ServiceConfigurationError e) {
>       LOG.error("Could not load service provider for table factories.", e);
>       throw new TableException("Could not load service provider for
> table factories.", e);
>    }
>
> }
>
>
> [hidden email] <[hidden email]> 于2020年7月16日周四
> 下午7:04写道:
>
> >
> > 我在
> >
> flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
> > 找到了 SPI 的配置:
> >
> > org.apache.flink.formats.json.JsonFileSystemFormatFactory
> > org.apache.flink.formats.json.JsonFormatFactory
> > org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
> > org.apache.flink.formats.json.canal.CanalJsonFormatFactory
> >
> > 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory
> > 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep
> > 代码没找到类似的关系映射配置。
> >
> >
> > 谢谢,
> > 王磊
> >
> >
> >
> > [hidden email]
> >
> >
> > Sender: godfrey he
> > Send Time: 2020-07-16 16:38
> > Receiver: user-zh
> > Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
> > 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
> >
> > Best,
> > Godfrey
> >
> > [hidden email] <[hidden email]> 于2020年7月16日周四
> > 下午4:02写道:
> >
> > > 比如:
> > >
> > > CREATE TABLE my_table (
> > >   id BIGINT,
> > >  first_name STRING,
> > >  last_name STRING,
> > >  email STRING
> > > ) WITH (
> > >  'connector'='kafka',
> > >  'topic'='user_topic',
> > >  'properties.bootstrap.servers'='localhost:9092',
> > >  'scan.startup.mode'='earliest-offset',
> > >  'format'='debezium-json'
> > > );
> > >
> > > 最终解析 debezium-json 应该是
> > >
> >
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> > > 下面的代码
> > > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
> > >
> > > 谢谢,
> > > 王磊
> > >
> > >
> > > [hidden email]
> > >
> > >
> >
>
>
> --
>
> Best Regards,
> Harold Miao
>