1.10任务执行过程--源码的一些疑问

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

1.10任务执行过程--源码的一些疑问

admin
Hi,all
在阅读1.10有关job执行过程相关源码时遇到一些疑问,我在看到Task#doRun()方法
invokable.invoke();具体执行过程应该在这个方法里吧?
进一步看了StreamTask#invoke()->runMailboxLoop();继续往下深入也没发现最终调用udf的入口
问题1:MailboxProcessor、Mailbox、Mail这些概念什么意思,什么作用?

然而在另一处实例化AbstractInvokable时,比如StreamTask构造函数里会调用processInput方法,这个就类似1.9之前的实现方式了
this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
问题2:这里面是真正的数据处理过程吗?为什么不像1.9之前那样在invokable.invoke()里面做业务处理?
感谢您的答复!


                                                                                                                                                                               
Best,
Sun.Zhu



Reply | Threaded
Open this post in threaded view
|

Re: 1.10任务执行过程--源码的一些疑问

tison
invokable 一般是 StreamTask 或者它的子类 StreamSourceTask,具体的 UDF 在 StreamTask
里,有几层包装。

MailBox 那些其实是一个简单的 EventLoop 实现,或者你理解为 Actor Model 的实现也行,可以参考这些名词的解释文章一一对应。

Best,
tison.


祝尚 <[hidden email]> 于2020年4月19日周日 下午5:43写道:

> Hi,all
> 在阅读1.10有关job执行过程相关源码时遇到一些疑问,我在看到Task#doRun()方法
> invokable.invoke();具体执行过程应该在这个方法里吧?
> 进一步看了StreamTask#invoke()->runMailboxLoop();继续往下深入也没发现最终调用udf的入口
> 问题1:MailboxProcessor、Mailbox、Mail这些概念什么意思,什么作用?
>
>
> 然而在另一处实例化AbstractInvokable时,比如StreamTask构造函数里会调用processInput方法,这个就类似1.9之前的实现方式了
> this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox,
> actionExecutor);
> 问题2:这里面是真正的数据处理过程吗?为什么不像1.9之前那样在invokable.invoke()里面做业务处理?
> 感谢您的答复!
>
>
>
> Best,
> Sun.Zhu
>
>
>
>
Reply | Threaded
Open this post in threaded view
|

回复:1.10任务执行过程--源码的一些疑问

蒋佳成(Jiacheng Jiang)
In reply to this post by admin
在构建MailboxProcessor的时候将streamtask的processInput方法作为MailboxDefaultAction传给了MailboxProcessor。其中的InputStatus status = inputProcessor.processInput();就是处理数据的地方,比如inputProcessor为StreamOneInputProcessor中InputStatus status = input.emitNext(output);input为StreamTaskNetworkInput,里面有processElement方法。StreamTask就是AbstractInvokable,StreamTask的invoke()方法调用了runMailboxLoop(),不就是在StreamTask的invoke()中处理的数据吗?



------------------&nbsp;原始邮件&nbsp;------------------
发件人: "祝尚"<[hidden email]&gt;;
发送时间: 2020年4月19日(星期天) 下午5:37
收件人: "user-zh"<[hidden email]&gt;;
主题: 1.10任务执行过程--源码的一些疑问



Hi,all
在阅读1.10有关job执行过程相关源码时遇到一些疑问,我在看到Task#doRun()方法
invokable.invoke();具体执行过程应该在这个方法里吧?
进一步看了StreamTask#invoke()-&gt;runMailboxLoop();继续往下深入也没发现最终调用udf的入口
问题1:MailboxProcessor、Mailbox、Mail这些概念什么意思,什么作用?

然而在另一处实例化AbstractInvokable时,比如StreamTask构造函数里会调用processInput方法,这个就类似1.9之前的实现方式了
this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
问题2:这里面是真正的数据处理过程吗?为什么不像1.9之前那样在invokable.invoke()里面做业务处理?
感谢您的答复!


                                                                                                                                                                               
Best,
Sun.Zhu
Reply | Threaded
Open this post in threaded view
|

Re: 1.10任务执行过程--源码的一些疑问

admin
hi,tison,jiacheng感谢解答,按照你说的又仔细看了一遍,确实如此,在实例化MailboxProcessor有把processInput当做参数传进去,在MailboxProcessor#runMailboxLoop中会去执行defaultAction方法
while (processMail(localMailbox)) {
   mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed
}
再次感谢
Best,
Sun.Zhu




> 2020年4月19日 下午8:26,蒋佳成(Jiacheng Jiang) <[hidden email]> 写道:
>
> 在构建MailboxProcessor的时候将streamtask的processInput方法作为MailboxDefaultAction传给了MailboxProcessor。其中的InputStatus status = inputProcessor.processInput();就是处理数据的地方,比如inputProcessor为StreamOneInputProcessor中InputStatus status = input.emitNext(output);input为StreamTaskNetworkInput,里面有processElement方法。StreamTask就是AbstractInvokable,StreamTask的invoke()方法调用了runMailboxLoop(),不就是在StreamTask的invoke()中处理的数据吗?
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人: "祝尚"<[hidden email]&gt;;
> 发送时间: 2020年4月19日(星期天) 下午5:37
> 收件人: "user-zh"<[hidden email]&gt;;
> 主题: 1.10任务执行过程--源码的一些疑问
>
>
>
> Hi,all
> 在阅读1.10有关job执行过程相关源码时遇到一些疑问,我在看到Task#doRun()方法
> invokable.invoke();具体执行过程应该在这个方法里吧?
> 进一步看了StreamTask#invoke()-&gt;runMailboxLoop();继续往下深入也没发现最终调用udf的入口
> 问题1:MailboxProcessor、Mailbox、Mail这些概念什么意思,什么作用?
>
> 然而在另一处实例化AbstractInvokable时,比如StreamTask构造函数里会调用processInput方法,这个就类似1.9之前的实现方式了
> this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
> 问题2:这里面是真正的数据处理过程吗?为什么不像1.9之前那样在invokable.invoke()里面做业务处理?
> 感谢您的答复!
>
>
>
> Best,
> Sun.Zhu