使用异步IO时,数据写入到capacity数后,卡住不再消费source端数据了。

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

使用异步IO时,数据写入到capacity数后,卡住不再消费source端数据了。

王敏超
 AsyncDataStream
      //顺序异步IO
      .orderedWait(input, new AsyncDatabaseRequest(), 5000,
TimeUnit.MILLISECONDS, 1000)

  当我没重写timeout方法的时候,会执行这个报错信息
resultFuture.completeExceptionally(new TimeoutException("Async function call
has timed out."))


  当我重写了timeout方法,如下,程序就卡住了,求大佬解答。
  override def timeout(input: String, resultFuture: ResultFuture[Int]): Unit
= {
    println("time out ... ")
  }




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 使用异步IO时,数据写入到capacity数后,卡住不再消费source端数据了。

Benchao Li-2
你的timeout方法应该要正确的处理ResultFuture,
比如ResultFuture.complete或者completeExceptionally,如果你什么都没做,那么这个异步请求就还没有真的结束。

王敏超 <[hidden email]> 于2020年9月29日周二 下午5:43写道:

>  AsyncDataStream
>       //顺序异步IO
>       .orderedWait(input, new AsyncDatabaseRequest(), 5000,
> TimeUnit.MILLISECONDS, 1000)
>
>   当我没重写timeout方法的时候,会执行这个报错信息
> resultFuture.completeExceptionally(new TimeoutException("Async function
> call
> has timed out."))
>
>
>   当我重写了timeout方法,如下,程序就卡住了,求大佬解答。
>   override def timeout(input: String, resultFuture: ResultFuture[Int]):
> Unit
> = {
>     println("time out ... ")
>   }
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: 使用异步IO时,数据写入到capacity数后,卡住不再消费source端数据了。

王敏超
嗯嗯,是的。安装大佬的方法,的确成功了。再次感谢大佬。。。。



--
Sent from: http://apache-flink.147419.n8.nabble.com/