深入解析yarn架构设计与技术实现-yarn rpc框架

序列化

Yarn中rpc模块采用protocol buffers做序列化;使用avro作为日志序列化库

RPC模型

rpc跨越了OSI 7层模型中的传输层和引用曾,主要是用于让一台机器调用另一台机器的子程序时屏蔽网络通信细节。主要包含一下4个部分

  • 通信模块:两个相互写作的通信模块实现了请求应答协议,一般不对数据进行处理,常见同步异步两种。
  • Stub程序。客户端均包含Stub程序,使得函数调用表现的跟本地调用一样。
    • 在客户端,他表现的向本地程序,但是不直接执行本地调用,而是将请求发送给服务端,等服务端应答后解码。
    • 在服务端,接收应答后解码,调用相应的过程,将结果编码返回
  • 调度程序:接收通信模块的请求并选择一个stub程序进行处理

rpc的一般过程:

  1. 客户端以本地方式调用系统产生的stub程序
  2. stub程序将函数调用信息封装并交给通信模块
  3. 通信模块发给远程服务器
  4. 远程服务器接收消息后将此消息发送给相应的stub程序
  5. stub程序拆封消息,并调用相应函数
  6. 被调用函数执行并将结果返回给stub
  7. stub程序封装结果并通过通信模块传送给客户程序

总体架构:

  • 序列化层
  • 函数调用层
  • 网络传输层:采用了基于tcp/ip的socket机制
  • 服务器端处理框架:基于reactor设计模式的事件驱动i/o模型

Hadoop rpc主要是三个类:RPC,client和Server,分别对应对外编程接口,客户端实现和服务器实现

RPC类

RPC类的构建方法为getProxy()和waitForProxy(),销毁方法为stopProxy
.通过RPC.Builder.build()完成服务器对象构建并且调用Server.start()启动。

hadoop rpc使用了java动态代理完成对远程方法的调用。动态代理主要是利用java的反射,实现Java反射包中的InvocationHandler接口来执行代理功能,好处在于灵活性比较强。对于hadoop的invoke方法,因为函数不在本地,所以客户端是讲函数调用信息打包成可序列化的WritableRpcEngine.Invocation对象,并通过忘了发给服务端,服务端进行解析。invocation对象关于函数的打包代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package org.apache.hadoop.ipc;
//class WritableRpcEngine
private static class Invocation implements Writable, Configurable {
private String methodName;
private Class<?>[] parameterClasses;
private Object[] parameters;
private Configuration conf;
private long clientVersion;
private int clientMethodsHash;
private String declaringClassProtocolName;
public Invocation(Method method, Object[] parameters) {
this.methodName = method.getName();
this.parameterClasses = method.getParameterTypes();
this.parameters = parameters;
rpcVersion = writableRpcVersion;
if (method.getDeclaringClass().equals(VersionedProtocol.class)) {
//VersionedProtocol is exempted from version check.
clientVersion = 0;
clientMethodsHash = 0;
} else {
this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
this.clientMethodsHash = ProtocolSignature.getFingerprint(method
.getDeclaringClass().getMethods());
}
this.declaringClassProtocolName =
RPC.getProtocolName(method.getDeclaringClass());
}
...
}

可以通过代码清晰的看到他们如何表示一个函数。而序列化主要是通过实现Writable接口,也就是write和readFields两个方法来实现的

client

主要有两个内部类call和connection

  • call: 封装一个rpc请求,包含五个变量:标识id,函数调用信息param(就是之前invocation被序列化的那个),函数返回值value,异常信息error和执行完成表示done。其中id和param由client填写,后三个由服务端填写
  • connection类。client与每个server维护一个通信连接,连接相关的信息被封装在一个connection中。connection中有一个hashmap存着与一个server相连的所有call,其他的包括连接标识id,server端同学socker等。client中也有一个hashmap存着所有的connection,也就是一个client对应对个connection(后面代表多个server),一个connection对应多个call(就是可以对一个server发送多个call)

当调用call函数执行某个远程方法时,Client端需要进行(如图3-7所示)以下4个步骤。

  1. 创建一个Connection对象,并将远程方法调用信息封装成Call对象,放到Connection对象中的哈希表中;
    2。 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;
  2. Server端处理完RPC请求后,将结果通过网络返回给Client端,Client端通过receiveRpcResponse()函数获取结果;
  3. Client检查结果处理状态(成功还是失败),并将对应Call对象从哈希表中删除

client 调用过程

server

rpc的server主要用了reactor模式.
reactor模式主要是事件驱动的,有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers;这个Service Handler会同步的将输入的请求(Event)多路复用的分发给相应的Request Handler。这里复用指的是一个request handler会处理多个事件,从而提高了并行性。

server的那张图

server中有一个Listener,多个reader,一个callQueue(是个BlockingQueue),多个handler还有一个responder。接收阶段Listener统一监听来自client的连接,有新的连接时将他自动分给一个reader线程。分配的方式很简单,就是轮询

1
2
3
4
Reader getReader() {
currentReader = (currentReader + 1) % readers.length;
return readers[currentReader];
}

之后被分配的reader继续监听他负责的client是否有新的rpc,并将传过来的数据解析成一个call并且放在callQueue中。Listener和reader的reactor模式主要体现在监听上,这里用到了NIO的Selector,可以看一下这个教程。NIO中Selector的典型写法就是创建selector,注册channel,轮询遍历来访问就绪通道:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//创建
Selector selector = Selector.open();
//注册
channel.configureBlocking(false);
SelectionKey key = channel.register(selector,
Selectionkey.OP_READ);
//访问。在Listener和reader中都是再套一个while(running)一直执行
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}

而另一边,Handler不断从callQueue拿到call对象执行,执行结果如果小的话handler直接返回,大的话加入一个call.connection.responseQueue,让responder去返回。responder也用了selector,异步的返回结果。并且responder会做清理(doPurge函数),对于超过15分钟的call就会删除

YARN RPC实例的简单分析

书上这一段讲的很细,我这里只是记一下我结合这个例子对rpc的理解。对于ResourceTracker这个协议,在nodemanager(client端)中是NodeStatusUpdaterImpl实现的,他的reosourceTracker是一个RMporxy,nodeManager直接调用这个proxy的registerNodeManager。而在resourceManager那边,是有一个真实的resourceTrackerService,他内部有一个org.apache.hadoop.ipc.Server,这个就对应rpc里的server。但是具体在哪里序列化我真的没找到。书中给出了ResourceTrackerPBClientImpl和ServerImpl,这个的确是用了pb做了序列化,但是整个工程没找到调用。。。

另外nodemanager的代码要看package org.apache.hadoop.yarn.server.nodemanager里面的,我之前没注意,看的resourceManage包里的,那个的nm估计是个单机模式用的,坑的一笔。。。而且hadoop到现在的确代码很多了,看起来很吃力。我看大工程的能力也的确需要提高

这里有几篇不错的参考文档。


本文采用创作共用保留署名-非商业-禁止演绎4.0国际许可证,欢迎转载,但转载请注明来自http://thousandhu.github.io,并保持转载后文章内容的完整。本人保留所有版权相关权利。

本文链接:http://thousandhu.github.io/2016/06/22/深入解析yarn架构设计与技术实现-yarn-rpc框架/