各位大佬:
由于现在的 KafkaTableSink 不支持 sql 中有group ,参照 KafkaTableSink 和 HbaseUpsertTableSink 的一套逻辑实现了一套 KafkaUpsertTableSink: KafkaUpsertTableSink KafkaUpsertTableSinkBase KafkaUpsertTableSourceSinkFactory KafkaUpsertTableSourceSinkFactoryBase MyKafkaValidator 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的 KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册 呢? /** * Searches for factories using Java service providers. * * @return all factories in the classpath */ private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) { try { List<TableFactory> result = new LinkedList<>(); ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader()); ServiceLoader .load(TableFactory.class, cl) .iterator() .forEachRemaining(result::add); //todo add result.add(new KafkaUpsertTableSourceSinkFactory()); return result; } catch (ServiceConfigurationError e) { LOG.error("Could not load service provider for table factories.", e); throw new TableException("Could not load service provider for table factories.", e); } } 直接在返回的 result 里面添加对应的 KafkaUpsertTableSourceSinkFactory 是可 以成功运行的。 非常感谢 ------------------ Thanks venn |
Hi,
你需要把你新增的Factory添加到 resources下的 META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢? <[hidden email]> 于2020年3月28日周六 下午5:38写道: > 各位大佬: > > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照 > KafkaTableSink 和 HbaseUpsertTableSink 的一套逻辑实现了一套 > KafkaUpsertTableSink: > > KafkaUpsertTableSink > > KafkaUpsertTableSinkBase > > KafkaUpsertTableSourceSinkFactory > > KafkaUpsertTableSourceSinkFactoryBase > > MyKafkaValidator > > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的 > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册 > 呢? > > > > > /** > * Searches for factories using Java service providers. > * > * @return all factories in the classpath > */ > private static List<TableFactory> discoverFactories(Optional<ClassLoader> > classLoader) { > try { > List<TableFactory> result = new LinkedList<>(); > ClassLoader cl = > classLoader.orElse(Thread.currentThread().getContextClassLoader()); > ServiceLoader > .load(TableFactory.class, cl) > .iterator() > .forEachRemaining(result::add); > //todo add > result.add(new KafkaUpsertTableSourceSinkFactory()); > return result; > } catch (ServiceConfigurationError e) { > LOG.error("Could not load service provider for table factories.", e); > throw new TableException("Could not load service provider for table > factories.", e); > } > > } > > > > > > 直接在返回的 result 里面添加对应的 KafkaUpsertTableSourceSinkFactory 是可 > 以成功运行的。 > > 非常感谢 > > > > > > ------------------ > > Thanks > > venn > > > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。
-----Original Message----- From: user-zh-return-2640-wxchunjhyy=[hidden email] <user-zh-return-2640-wxchunjhyy=[hidden email]> On Behalf Of Benchao Li Sent: Saturday, March 28, 2020 6:28 PM To: user-zh <[hidden email]> Subject: Re: 实现 KafkaUpsertTableSink Hi, 你需要把你新增的Factory添加到 resources下的 META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢? <[hidden email]> 于2020年3月28日周六 下午5:38写道: > 各位大佬: > > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照 > KafkaTableSink 和 HbaseUpsertTableSink 的一套逻辑实现了一套 > KafkaUpsertTableSink: > > KafkaUpsertTableSink > > KafkaUpsertTableSinkBase > > KafkaUpsertTableSourceSinkFactory > > KafkaUpsertTableSourceSinkFactoryBase > > MyKafkaValidator > > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的 > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册 > 呢? > > > > > /** > * Searches for factories using Java service providers. > * > * @return all factories in the classpath */ private static > List<TableFactory> discoverFactories(Optional<ClassLoader> > classLoader) { > try { > List<TableFactory> result = new LinkedList<>(); > ClassLoader cl = > classLoader.orElse(Thread.currentThread().getContextClassLoader()); > ServiceLoader > .load(TableFactory.class, cl) > .iterator() > .forEachRemaining(result::add); > //todo add > result.add(new KafkaUpsertTableSourceSinkFactory()); > return result; > } catch (ServiceConfigurationError e) { > LOG.error("Could not load service provider for table factories.", e); > throw new TableException("Could not load service provider for > table factories.", e); > } > > } > > > > > > 直接在返回的 result 里面添加对应的 KafkaUpsertTableSourceSinkFactory 是可 > 以成功运行的。 > > 非常感谢 > > > > > > ------------------ > > Thanks > > venn > > > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
我以相同的方式 实现了一个 KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client: org.apache.flink.table.planner.delegation.BlinkExecutorFactory at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146) at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113) at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377) at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559) at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494) at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:159) at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:118) at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742) ... 3 more 这个改怎样解决呢? 谢谢, 王磊 [hidden email] Sender: [hidden email] Send Time: 2020-03-29 10:32 Receiver: [hidden email] Subject: RE: 实现 KafkaUpsertTableSink Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。 -----Original Message----- From: user-zh-return-2640-wxchunjhyy=[hidden email] <user-zh-return-2640-wxchunjhyy=[hidden email]> On Behalf Of Benchao Li Sent: Saturday, March 28, 2020 6:28 PM To: user-zh <[hidden email]> Subject: Re: 实现 KafkaUpsertTableSink Hi, 你需要把你新增的Factory添加到 resources下的 META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢? <[hidden email]> 于2020年3月28日周六 下午5:38写道: > 各位大佬: > > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照 > KafkaTableSink 和 HbaseUpsertTableSink 的一套逻辑实现了一套 > KafkaUpsertTableSink: > > KafkaUpsertTableSink > > KafkaUpsertTableSinkBase > > KafkaUpsertTableSourceSinkFactory > > KafkaUpsertTableSourceSinkFactoryBase > > MyKafkaValidator > > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的 > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册 > 呢? > > > > > /** > * Searches for factories using Java service providers. > * > * @return all factories in the classpath */ private static > List<TableFactory> discoverFactories(Optional<ClassLoader> > classLoader) { > try { > List<TableFactory> result = new LinkedList<>(); > ClassLoader cl = > classLoader.orElse(Thread.currentThread().getContextClassLoader()); > ServiceLoader > .load(TableFactory.class, cl) > .iterator() > .forEachRemaining(result::add); > //todo add > result.add(new KafkaUpsertTableSourceSinkFactory()); > return result; > } catch (ServiceConfigurationError e) { > LOG.error("Could not load service provider for table factories.", e); > throw new TableException("Could not load service provider for > table factories.", e); > } > > } > > > > > > 直接在返回的 result 里面添加对应的 KafkaUpsertTableSourceSinkFactory 是可 > 以成功运行的。 > > 非常感谢 > > > > > > ------------------ > > Thanks > > venn > > > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成 KafkaRetractTableSourceSinkFactory 写了一遍
但这个应该怎样改才合适呢? 137 private static <T extends TableFactory> T findSingleInternal( 138 Class<T> factoryClass, 139 Map<String, String> properties, 140 Optional<ClassLoader> classLoader) { 141 142 List<TableFactory> tableFactories = discoverFactories(classLoader); 143 List<T> filtered = filter(tableFactories, factoryClass, properties); 144 145 if (filtered.size() > 1) { 146 throw new AmbiguousTableFactoryException( 147 filtered, 148 factoryClass, 149 tableFactories, 150 properties); 151 } else { 152 return filtered.get(0); 153 } 154 } 谢谢, 王磊 [hidden email] Sender: [hidden email] Send Time: 2020-03-31 10:50 Receiver: user-zh Subject: Re: RE: 实现 KafkaUpsertTableSink 我以相同的方式 实现了一个 KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client: org.apache.flink.table.planner.delegation.BlinkExecutorFactory at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146) at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113) at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377) at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559) at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494) at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:159) at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:118) at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742) ... 3 more 这个改怎样解决呢? 谢谢, 王磊 [hidden email] Sender: [hidden email] Send Time: 2020-03-29 10:32 Receiver: [hidden email] Subject: RE: 实现 KafkaUpsertTableSink Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。 -----Original Message----- From: user-zh-return-2640-wxchunjhyy=[hidden email] <user-zh-return-2640-wxchunjhyy=[hidden email]> On Behalf Of Benchao Li Sent: Saturday, March 28, 2020 6:28 PM To: user-zh <[hidden email]> Subject: Re: 实现 KafkaUpsertTableSink Hi, 你需要把你新增的Factory添加到 resources下的 META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢? <[hidden email]> 于2020年3月28日周六 下午5:38写道: > 各位大佬: > > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照 > KafkaTableSink 和 HbaseUpsertTableSink 的一套逻辑实现了一套 > KafkaUpsertTableSink: > > KafkaUpsertTableSink > > KafkaUpsertTableSinkBase > > KafkaUpsertTableSourceSinkFactory > > KafkaUpsertTableSourceSinkFactoryBase > > MyKafkaValidator > > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的 > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册 > 呢? > > > > > /** > * Searches for factories using Java service providers. > * > * @return all factories in the classpath */ private static > List<TableFactory> discoverFactories(Optional<ClassLoader> > classLoader) { > try { > List<TableFactory> result = new LinkedList<>(); > ClassLoader cl = > classLoader.orElse(Thread.currentThread().getContextClassLoader()); > ServiceLoader > .load(TableFactory.class, cl) > .iterator() > .forEachRemaining(result::add); > //todo add > result.add(new KafkaUpsertTableSourceSinkFactory()); > return result; > } catch (ServiceConfigurationError e) { > LOG.error("Could not load service provider for table factories.", e); > throw new TableException("Could not load service provider for > table factories.", e); > } > > } > > > > > > 直接在返回的 result 里面添加对应的 KafkaUpsertTableSourceSinkFactory 是可 > 以成功运行的。 > > 非常感谢 > > > > > > ------------------ > > Thanks > > venn > > > > Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
我理解你可以让KafkaRetractTableSourceSinkFactory的参数跟KafkaTableSourceSinkFactory的参数有不同的地方,
然后通过这个参数来区分两个不同的factory。比如加一个参数,表示这个sink是retract还是append类型之类的? [hidden email] <[hidden email]> 于2020年3月31日周二 上午11:17写道: > 这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成 > KafkaRetractTableSourceSinkFactory 写了一遍 > 但这个应该怎样改才合适呢? > > 137 private static <T extends TableFactory> T > findSingleInternal( > 138 Class<T> factoryClass, > 139 Map<String, String> properties, > 140 Optional<ClassLoader> classLoader) { > 141 > 142 List<TableFactory> tableFactories = > discoverFactories(classLoader); > 143 List<T> filtered = filter(tableFactories, > factoryClass, properties); > 144 > 145 if (filtered.size() > 1) { > 146 throw new AmbiguousTableFactoryException( > 147 filtered, > 148 factoryClass, > 149 tableFactories, > 150 properties); > 151 } else { > 152 return filtered.get(0); > 153 } > 154 } > > > 谢谢, > 王磊 > > > [hidden email] > > > Sender: [hidden email] > Send Time: 2020-03-31 10:50 > Receiver: user-zh > Subject: Re: RE: 实现 KafkaUpsertTableSink > > 我以相同的方式 实现了一个 KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client: > > org.apache.flink.table.planner.delegation.BlinkExecutorFactory > at > org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146) > at > org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559) > at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:159) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:118) > at > org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742) > ... 3 more > > 这个改怎样解决呢? > > 谢谢, > 王磊 > > > > [hidden email] > > Sender: [hidden email] > Send Time: 2020-03-29 10:32 > Receiver: [hidden email] > Subject: RE: 实现 KafkaUpsertTableSink > Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。 > -----Original Message----- > From: user-zh-return-2640-wxchunjhyy=[hidden email] > <user-zh-return-2640-wxchunjhyy=[hidden email]> On Behalf Of > Benchao Li > Sent: Saturday, March 28, 2020 6:28 PM > To: user-zh <[hidden email]> > Subject: Re: 实现 KafkaUpsertTableSink > Hi, > 你需要把你新增的Factory添加到 resources下的 > > META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢? > <[hidden email]> 于2020年3月28日周六 下午5:38写道: > > 各位大佬: > > > > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照 > > KafkaTableSink 和 HbaseUpsertTableSink 的一套逻辑实现了一套 > > KafkaUpsertTableSink: > > > > KafkaUpsertTableSink > > > > KafkaUpsertTableSinkBase > > > > KafkaUpsertTableSourceSinkFactory > > > > KafkaUpsertTableSourceSinkFactoryBase > > > > MyKafkaValidator > > > > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的 > > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册 > > 呢? > > > > > > > > > > /** > > * Searches for factories using Java service providers. > > * > > * @return all factories in the classpath */ private static > > List<TableFactory> discoverFactories(Optional<ClassLoader> > > classLoader) { > > try { > > List<TableFactory> result = new LinkedList<>(); > > ClassLoader cl = > > classLoader.orElse(Thread.currentThread().getContextClassLoader()); > > ServiceLoader > > .load(TableFactory.class, cl) > > .iterator() > > .forEachRemaining(result::add); > > //todo add > > result.add(new KafkaUpsertTableSourceSinkFactory()); > > return result; > > } catch (ServiceConfigurationError e) { > > LOG.error("Could not load service provider for table factories.", > e); > > throw new TableException("Could not load service provider for > > table factories.", e); > > } > > > > } > > > > > > > > > > > > 直接在返回的 result 里面添加对应的 KafkaUpsertTableSourceSinkFactory 是可 > > 以成功运行的。 > > > > 非常感谢 > > > > > > > > > > > > ------------------ > > > > Thanks > > > > venn > > > > > > > > > -- > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
我只保留 KafkaRetractTableSourceSinkFactory 一个, KafkaRetractTableSinkBase 实现 RetractStreamTableSink 接口,在 consumeDataStream 实现只有 True 才发送,最终 work 了。 @Override public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) { DataStream dtNeed = dataStream.filter(x -> x.f0 == Boolean.TRUE).map(x -> x.f1); INSERT INTO table1 SELCET field, count(*) from table2 group by field 这是 一个 RetractStream,结果里面会有 True/False, 通过这个过滤是可以的。 INSERT INTO table1 SELECT feild, 1 from table2 我理解这不是一个 RetractStream, 上面 dataStream.filter(x -> x.f0 == Boolean.TRUE) 的代码应该会出错,但实际上没有出错 还不是完全能理解,我再看一下吧。 谢谢, 王磊 [hidden email] Sender: Benchao Li Send Time: 2020-03-31 12:02 Receiver: user-zh Subject: Re: Re: 实现 KafkaUpsertTableSink 我理解你可以让KafkaRetractTableSourceSinkFactory的参数跟KafkaTableSourceSinkFactory的参数有不同的地方, 然后通过这个参数来区分两个不同的factory。比如加一个参数,表示这个sink是retract还是append类型之类的? [hidden email] <[hidden email]> 于2020年3月31日周二 上午11:17写道: > 这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成 > KafkaRetractTableSourceSinkFactory 写了一遍 > 但这个应该怎样改才合适呢? > > 137 private static <T extends TableFactory> T > findSingleInternal( > 138 Class<T> factoryClass, > 139 Map<String, String> properties, > 140 Optional<ClassLoader> classLoader) { > 141 > 142 List<TableFactory> tableFactories = > discoverFactories(classLoader); > 143 List<T> filtered = filter(tableFactories, > factoryClass, properties); > 144 > 145 if (filtered.size() > 1) { > 146 throw new AmbiguousTableFactoryException( > 147 filtered, > 148 factoryClass, > 149 tableFactories, > 150 properties); > 151 } else { > 152 return filtered.get(0); > 153 } > 154 } > > > 谢谢, > 王磊 > > > [hidden email] > > > Sender: [hidden email] > Send Time: 2020-03-31 10:50 > Receiver: user-zh > Subject: Re: RE: 实现 KafkaUpsertTableSink > > 我以相同的方式 实现了一个 KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client: > > org.apache.flink.table.planner.delegation.BlinkExecutorFactory > at > org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146) > at > org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559) > at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:159) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:118) > at > org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742) > ... 3 more > > 这个改怎样解决呢? > > 谢谢, > 王磊 > > > > [hidden email] > > Sender: [hidden email] > Send Time: 2020-03-29 10:32 > Receiver: [hidden email] > Subject: RE: 实现 KafkaUpsertTableSink > Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。 > -----Original Message----- > From: user-zh-return-2640-wxchunjhyy=[hidden email] > <user-zh-return-2640-wxchunjhyy=[hidden email]> On Behalf Of > Benchao Li > Sent: Saturday, March 28, 2020 6:28 PM > To: user-zh <[hidden email]> > Subject: Re: 实现 KafkaUpsertTableSink > Hi, > 你需要把你新增的Factory添加到 resources下的 > > META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢? > <[hidden email]> 于2020年3月28日周六 下午5:38写道: > > 各位大佬: > > > > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照 > > KafkaTableSink 和 HbaseUpsertTableSink 的一套逻辑实现了一套 > > KafkaUpsertTableSink: > > > > KafkaUpsertTableSink > > > > KafkaUpsertTableSinkBase > > > > KafkaUpsertTableSourceSinkFactory > > > > KafkaUpsertTableSourceSinkFactoryBase > > > > MyKafkaValidator > > > > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的 > > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册 > > 呢? > > > > > > > > > > /** > > * Searches for factories using Java service providers. > > * > > * @return all factories in the classpath */ private static > > List<TableFactory> discoverFactories(Optional<ClassLoader> > > classLoader) { > > try { > > List<TableFactory> result = new LinkedList<>(); > > ClassLoader cl = > > classLoader.orElse(Thread.currentThread().getContextClassLoader()); > > ServiceLoader > > .load(TableFactory.class, cl) > > .iterator() > > .forEachRemaining(result::add); > > //todo add > > result.add(new KafkaUpsertTableSourceSinkFactory()); > > return result; > > } catch (ServiceConfigurationError e) { > > LOG.error("Could not load service provider for table factories.", > e); > > throw new TableException("Could not load service provider for > > table factories.", e); > > } > > > > } > > > > > > > > > > > > 直接在返回的 result 里面添加对应的 KafkaUpsertTableSourceSinkFactory 是可 > > 以成功运行的。 > > > > 非常感谢 > > > > > > > > > > > > ------------------ > > > > Thanks > > > > venn > > > > > > > > > -- > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
Free forum by Nabble | Edit this page |