hi,
我在使用flink 1.10.1 blink planner,通过扩展tablesourcesinkfactory和asynctablefunction扩展了一个维表,维表会初始化并调用rpc服务。 遇到一个比较诡异的问题,作业在执行如下join的sql时,没有任何输出,等一段时间后,抛出了异常:Caused by : java.lang.Exception: Could not complete the stream element: org.apache.flink.table.dataformat.BinaryRow.... caused by : java.util.concurrent.TimeoutException: Async function call has timed out. 我开了debug日志,debug时在我的lookupfunction.eval里,可以正常调用rpc接口服务并future.complete,但是并不输出任何结果。不确定可能是啥原因。望指导。谢谢。 |
Administrator
|
可以分享下你的 AsyncTableFunction 的实现吗?
Best, Jark > 2020年7月2日 15:56,sunfulin <[hidden email]> 写道: > > hi, > 我在使用flink 1.10.1 blink planner,通过扩展tablesourcesinkfactory和asynctablefunction扩展了一个维表,维表会初始化并调用rpc服务。 > 遇到一个比较诡异的问题,作业在执行如下join的sql时,没有任何输出,等一段时间后,抛出了异常:Caused by : java.lang.Exception: Could not complete the stream element: org.apache.flink.table.dataformat.BinaryRow.... caused by : java.util.concurrent.TimeoutException: Async function call has timed out. > > > 我开了debug日志,debug时在我的lookupfunction.eval里,可以正常调用rpc接口服务并future.complete,但是并不输出任何结果。不确定可能是啥原因。望指导。谢谢。 |
In reply to this post by sunfulin
Hi sunfulin:
我这么实现是可以的。 public void eval(CompletableFuture<Collection<Row>> result, String key) { executorService.submit(() -> { try { Row row = fetchdata(key); if (row != null) { result.complete(Collections.singletonList(row)); } else { result.complete(Collections.singletonList(new Row(this.fieldNames.length))); } } catch (Exception e) { result.complete(Collections.singletonList(new Row(this.fieldNames.length))); } }); } Best forideal. 在 2020-07-02 15:56:46,"sunfulin" <[hidden email]> 写道: >hi, >我在使用flink 1.10.1 blink planner,通过扩展tablesourcesinkfactory和asynctablefunction扩展了一个维表,维表会初始化并调用rpc服务。 >遇到一个比较诡异的问题,作业在执行如下join的sql时,没有任何输出,等一段时间后,抛出了异常:Caused by : java.lang.Exception: Could not complete the stream element: org.apache.flink.table.dataformat.BinaryRow.... caused by : java.util.concurrent.TimeoutException: Async function call has timed out. > > >我开了debug日志,debug时在我的lookupfunction.eval里,可以正常调用rpc接口服务并future.complete,但是并不输出任何结果。不确定可能是啥原因。望指导。谢谢。 |
hi 抱歉忘记回复了。经过进一步调试发现,是因为定义的schema的column类型,与实际获取到的字段类型不一致导致。主要是在调试的过程中,ComplettedFuture.complete会吃掉这种类型不一致的异常,也不下发数据。看源码发现只会在timeout的时候才调用future.completeException。记录下。 在 2020-07-03 17:01:19,"forideal" <[hidden email]> 写道: >Hi sunfulin: > > 我这么实现是可以的。 >public void eval(CompletableFuture<Collection<Row>> result, String key) { > executorService.submit(() -> { >try { >Row row = fetchdata(key); > if (row != null) { >result.complete(Collections.singletonList(row)); >} else { >result.complete(Collections.singletonList(new Row(this.fieldNames.length))); >} > } catch (Exception e) { >result.complete(Collections.singletonList(new Row(this.fieldNames.length))); >} > }); >} > > > > >Best forideal. > > > > > >在 2020-07-02 15:56:46,"sunfulin" <[hidden email]> 写道: >>hi, >>我在使用flink 1.10.1 blink planner,通过扩展tablesourcesinkfactory和asynctablefunction扩展了一个维表,维表会初始化并调用rpc服务。 >>遇到一个比较诡异的问题,作业在执行如下join的sql时,没有任何输出,等一段时间后,抛出了异常:Caused by : java.lang.Exception: Could not complete the stream element: org.apache.flink.table.dataformat.BinaryRow.... caused by : java.util.concurrent.TimeoutException: Async function call has timed out. >> >> >>我开了debug日志,debug时在我的lookupfunction.eval里,可以正常调用rpc接口服务并future.complete,但是并不输出任何结果。不确定可能是啥原因。望指导。谢谢。 |
Free forum by Nabble | Edit this page |