hadoop rpc源码阅读
董老师书里表示rpc最好的学习方法是自己实现一个c/s然后读一下源码。我这边没有去实现,不过我找了一个test,package org.apache.hadoop.ipc.TestRPC
的testCalls这个测试实际上就是一个完整的rpc过程。
server建立
server的建立是调用RPC.Builder().build()。通过setProtocol和setInstance把TestProtocol和TestImpl传进去。之后server.start()。
调用server.start之后,可以看到启动了好几个线程,比如server的listener之类的,具体如下:
|
|
其中在build的过程中,WritablePrcEngine的getServer函数中,会调用Server(),他的super函数会创建IPC Server idle connection scanner。在builder过程中同样会创建listener对象,listener对象的创建过程中会启动数个reader线程。而Server.start()时会启动responder,listener线程以及数个handler线程。
proxy(client)建立
proxy建立调用的是RPC.getproxy. proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
这里将TestProtocol作为参数传进去, TestProtocol是一个接口,定义了各种函数,比如下面会用到的ping,echo等待。当然这些函数是要实现的,这个test里在TestImpl类中实现了这个接口。
调用getProxy以后,一路调用后来到了org.apache.hadoop.ipc.RPC.getProtocolProxy()。(hadoop-2.7.1第564行)这里 return getProtocolEngine(protocol, conf).getProxy(...);
。首先是getProtocolEngine他是通过一个全局map搜索符合这个protocol的engine,如果没有的话通过反射创建一个,默认engien是WritableRpcEngine。这样就把test中的proxy和某一个具体的engine关联起来(这个engine会直接影响后面proxy的具体执行,等下一小节再说)。至于后面那个getProxy是在WritableRpcEngine的具体实现。
一次RPC调用
client
以proxy.echo为例吧。调用proxy.echo以后,首先回进入WritableRpcEngine,调用invoke函数,invoke会调用Client.call,并把方法和参数打包成一个Invocation对象作为call的一个参数。
client.call会生成一个call对象,然后通过remoteID等信息拿到一个connection,调用connection.sendRpcRequest(call)发送一个remote请求。之后Client.call函数会不停的轮询检查call.done,然后返回call.getRpcResponse()。这里比较绕的地方在于那个Connection类继承自Thread,应该是一个新的线程,但是sendRpcRequest函数的注释说“Note: this is not called from the Connection thread, but by other
threads.”。同时看debug这个函数是被client那个线程调用的。同时sendRPCRequest内部又用了future。这里的线程关系很乱。不过我觉得都是为了在远程调用的时候用多安县城加速吧。
server
看完client端再看看更复杂的server端。在server启动以后,listener,reader,handler,responser都是server对应内部类的run函数或者runloop函数中轮询的。
首先是listener,当有一个key处于Acceptable状态时,就调用doAccept函数。doAccept选择一个Reader(现在就是轮询),将一个connection加入这个reader的pendingConnections中,并把这个reader唤醒。这其中用了noi的selector,这个还需要研究一下。
reader在轮询的过程中主要是调用了doRead,尝试读取一个完整的rpc调用,并将其通过processRpcReuqest函数放入server的callqueue。
之后handler会在轮询时从callQueue拿到一个call,并且调用一个RPC.call()函数。这个函数会调用WritableRpcEngine.server.WritableRpcInvoker.call(),并通过最早RPC Server构建过程中设置的impl找到TestImpl,然后通过反射调用echo的具体实现。最后将结果还是包成一个call对象送给responder,让其调用responder.doRespond(call)。
最后就是responder轮询时不断调用doAsyncWrite,从responseQueue拿到结果并返回。
至于书上说的低于阈值由handler自己返回,高于阈值才由responder返回,貌似是这样的。handler的doResponder会在responsequeue.size()==1时自己调用processResponse(),在这一次没返回完的情况下,剩余的由responder异步返回。
本文采用创作共用保留署名-非商业-禁止演绎4.0国际许可证,欢迎转载,但转载请注明来自http://thousandhu.github.io,并保持转载后文章内容的完整。本人保留所有版权相关权利。
本文链接:http://thousandhu.github.io/2016/06/26/深入解析yarn架构设计与技术实现-hadoop-rpc实例分析/