深入解析yarn架构设计与技术实现-hadoop rpc实例分析

hadoop rpc源码阅读

董老师书里表示rpc最好的学习方法是自己实现一个c/s然后读一下源码。我这边没有去实现,不过我找了一个test,package org.apache.hadoop.ipc.TestRPC的testCalls这个测试实际上就是一个完整的rpc过程。

server建立

server的建立是调用RPC.Builder().build()。通过setProtocol和setInstance把TestProtocol和TestImpl传进去。之后server.start()。

调用server.start之后,可以看到启动了好几个线程,比如server的listener之类的,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
"main@1" prio=5 tid=0x1 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at org.apache.hadoop.ipc.TestRPC.testCallsInternal(TestRPC.java:457)
"Socket Reader #1 for port 60511@1685" prio=5 tid=0xe nid=NA runnable
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(KQueueArrayWrapper.java:-1)
"IPC Server listener on 60511@1731" daemon prio=5 tid=0xd nid=NA runnable
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(KQueueArrayWrapper.java:-1)
"IPC Server Responder@1730" daemon prio=5 tid=0x10 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(KQueueArrayWrapper.java:-1)
"IPC Server handler 0 on 60511@1735" daemon prio=5 tid=0x11 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2017)
"IPC Server idle connection scanner for port 60511@1698" daemon prio=5 tid=0xf nid=NA waiting
java.lang.Thread.State: WAITING
at java.lang.Object.wait(Object.java:-1)
"Finalizer@1557" daemon prio=8 tid=0x3 nid=NA waiting
java.lang.Thread.State: WAITING
at java.lang.Object.wait(Object.java:-1)
"Reference Handler@1558" daemon prio=10 tid=0x2 nid=NA waiting
java.lang.Thread.State: WAITING
at java.lang.Object.wait(Object.java:-1)
"Signal Dispatcher@1556" daemon prio=9 tid=0x4 nid=NA runnable
java.lang.Thread.State: RUNNABLE

其中在build的过程中,WritablePrcEngine的getServer函数中,会调用Server(),他的super函数会创建IPC Server idle connection scanner。在builder过程中同样会创建listener对象,listener对象的创建过程中会启动数个reader线程。而Server.start()时会启动responder,listener线程以及数个handler线程。

proxy(client)建立

proxy建立调用的是RPC.getproxy. proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf); 这里将TestProtocol作为参数传进去, TestProtocol是一个接口,定义了各种函数,比如下面会用到的ping,echo等待。当然这些函数是要实现的,这个test里在TestImpl类中实现了这个接口。

调用getProxy以后,一路调用后来到了org.apache.hadoop.ipc.RPC.getProtocolProxy()。(hadoop-2.7.1第564行)这里 return getProtocolEngine(protocol, conf).getProxy(...); 。首先是getProtocolEngine他是通过一个全局map搜索符合这个protocol的engine,如果没有的话通过反射创建一个,默认engien是WritableRpcEngine。这样就把test中的proxy和某一个具体的engine关联起来(这个engine会直接影响后面proxy的具体执行,等下一小节再说)。至于后面那个getProxy是在WritableRpcEngine的具体实现。

一次RPC调用

client

以proxy.echo为例吧。调用proxy.echo以后,首先回进入WritableRpcEngine,调用invoke函数,invoke会调用Client.call,并把方法和参数打包成一个Invocation对象作为call的一个参数。

client.call会生成一个call对象,然后通过remoteID等信息拿到一个connection,调用connection.sendRpcRequest(call)发送一个remote请求。之后Client.call函数会不停的轮询检查call.done,然后返回call.getRpcResponse()。这里比较绕的地方在于那个Connection类继承自Thread,应该是一个新的线程,但是sendRpcRequest函数的注释说“Note: this is not called from the Connection thread, but by other
threads.”。同时看debug这个函数是被client那个线程调用的。同时sendRPCRequest内部又用了future。这里的线程关系很乱。不过我觉得都是为了在远程调用的时候用多安县城加速吧。

server

看完client端再看看更复杂的server端。在server启动以后,listener,reader,handler,responser都是server对应内部类的run函数或者runloop函数中轮询的。

首先是listener,当有一个key处于Acceptable状态时,就调用doAccept函数。doAccept选择一个Reader(现在就是轮询),将一个connection加入这个reader的pendingConnections中,并把这个reader唤醒。这其中用了noi的selector,这个还需要研究一下。

reader在轮询的过程中主要是调用了doRead,尝试读取一个完整的rpc调用,并将其通过processRpcReuqest函数放入server的callqueue。

之后handler会在轮询时从callQueue拿到一个call,并且调用一个RPC.call()函数。这个函数会调用WritableRpcEngine.server.WritableRpcInvoker.call(),并通过最早RPC Server构建过程中设置的impl找到TestImpl,然后通过反射调用echo的具体实现。最后将结果还是包成一个call对象送给responder,让其调用responder.doRespond(call)。

最后就是responder轮询时不断调用doAsyncWrite,从responseQueue拿到结果并返回。

至于书上说的低于阈值由handler自己返回,高于阈值才由responder返回,貌似是这样的。handler的doResponder会在responsequeue.size()==1时自己调用processResponse(),在这一次没返回完的情况下,剩余的由responder异步返回。


本文采用创作共用保留署名-非商业-禁止演绎4.0国际许可证,欢迎转载,但转载请注明来自http://thousandhu.github.io,并保持转载后文章内容的完整。本人保留所有版权相关权利。

本文链接:http://thousandhu.github.io/2016/06/26/深入解析yarn架构设计与技术实现-hadoop-rpc实例分析/