hi,all
我有一个场景是使用自定义的ScalaFunction实现所需要的look up功能(从数据库查询并将多行数据拼成一个数组)。 我正在试图尝试使用异步IO的方式以提高它的性能,但是似乎只有Stream API提供了该特性支持。 大家有什么建议吗?或者有其他优化思路吗? 谢谢! -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hello,kenyore.
在维表的DDL的WITH参数中添加async='true',Async 相关参数如下。 参数说明是否必填备注 async 是否开启异步请求 否 默认值为fasle。 asyncResultOrder 异步结果顺序 否 取值如下: - unordered(默认值):无序。 - ordered:有序。 asyncTimeoutMs 异步请求的超时时间 否 单位毫秒,默认值为3分钟。 asyncCapacity 异步请求的队列容量 否 默认值为100。 asyncCallbackThreads 回调处理线程数 否 回调类中的onComplete和onError默认会在线程池中处理该线程池的大小,默认值为50。 asyncConnectionQueueMaxsize 最大请求发送数 否 当等待某个服务器返回结果的请求数量达到 asyncConnectionQueueMaxsize值时,异步请求调用也会被阻塞,以防止客户端自身OOM(OutOfMemory),默认值为100。 asyncCallbackQueueMaxsize 最大回调处理队列 否 当等待回调处理的请求达到asyncCallbackQueueMaxsize 值时,异步请求调用也会被阻塞,以防止客户端自身OOM,默认值为500。 CREATE TABLE test( id VARCHAR, PRIMARY KEY(id) ) WITH( async='true', asyncResultOrder = 'unordered' ); kenyore <[hidden email]> 于2021年1月12日周二 下午2:38写道: > hi,all > 我有一个场景是使用自定义的ScalaFunction实现所需要的look up功能(从数据库查询并将多行数据拼成一个数组)。 > 我正在试图尝试使用异步IO的方式以提高它的性能,但是似乎只有Stream API提供了该特性支持。 > 大家有什么建议吗?或者有其他优化思路吗? > 谢谢! > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
感谢如此详尽的回复!
但是我的场景似乎无法直接使用维表join。 因为我需要把look up的结果(会是多行数据),拼成一个数组放入数据行。 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
hello,kenyore.
我大致了解了你的意思,你可以通过继承AsyncTableFunction的方式实现数据库异步IO。 公共抽象类AsyncTableFunction <T> 扩展了UserDefinedFunction <https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/UserDefinedFunction.html> AsyncTableFunction <https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/AsyncTableFunction.html>可以通过实现自定义评估方法来定义a的行为。评估方法必须公开声明,而不是静态声明,并命名为“ eval”。评估方法也可以通过实现多个名为“ eval”的方法来重载。 对于每个“ eval”,都可以触发一个异步io操作,一旦完成,就可以通过调用来收集结果CompletableFuture.complete(T) <http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CompletableFuture.html?is-external=true#complete-T->。对于每个异步操作,调用“ eval”后,其上下文将立即存储在运算符中,从而避免在内部缓冲区未满的情况下阻塞输入的每个流。 代码示例: public void eval(CompletableFuture<Collection<String>> result, String rowkey) { Get get = new Get(Bytes.toBytes(rowkey)); ListenableFuture<Result> future = hbase.asyncGet(get); Futures.addCallback(future, new FutureCallback<Result>() { public void onSuccess(Result result) { List<String> ret = process(result); result.complete(ret); } public void onFailure(Throwable thrown) { result.completeExceptionally(thrown); } }); } 参考链接: https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/AsyncTableFunction.html kenyore <[hidden email]> 于2021年1月12日周二 下午3:29写道: > 感谢如此详尽的回复! > 但是我的场景似乎无法直接使用维表join。 > 因为我需要把look up的结果(会是多行数据),拼成一个数组放入数据行。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by kenyore
hello,kenyore.
我大致了解了你的意思,你可以通过继承AsyncTableFunction的方式实现数据库异步IO。 公共抽象类AsyncTableFunction <T> 扩展了UserDefinedFunction <https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/UserDefinedFunction.html> AsyncTableFunction <https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/AsyncTableFunction.html>可以通过实现自定义评估方法来定义a的行为。评估方法必须公开声明,而不是静态声明,并命名为“ eval”。评估方法也可以通过实现多个名为“ eval”的方法来重载。 对于每个“ eval”,都可以触发一个异步io操作,一旦完成,就可以通过调用来收集结果CompletableFuture.complete(T) <http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CompletableFuture.html?is-external=true#complete-T->。对于每个异步操作,调用“ eval”后,其上下文将立即存储在运算符中,从而避免在内部缓冲区未满的情况下阻塞输入的每个流。 代码示例: public void eval(CompletableFuture<Collection<String>> result, String rowkey) { Get get = new Get(Bytes.toBytes(rowkey)); ListenableFuture<Result> future = hbase.asyncGet(get); Futures.addCallback(future, new FutureCallback<Result>() { public void onSuccess(Result result) { List<String> ret = process(result); result.complete(ret); } public void onFailure(Throwable thrown) { result.completeExceptionally(thrown); } }); } 参考链接: https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/AsyncTableFunction.html kenyore <[hidden email]> 于2021年1月12日周二 下午3:29写道: > 感谢如此详尽的回复! > 但是我的场景似乎无法直接使用维表join。 > 因为我需要把look up的结果(会是多行数据),拼成一个数组放入数据行。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |