深入解析yarn架构设计与技术实现-事件库与状态机

事件库主要是通过dispatch分发,以AysncDispatcher为例。AsyncDispatcher有一个blockingQueue来缓存event。AsyncDispatcher服务启动时会启动一个线程,不断的去eventQueue拿任务并且分发。分发调用dispatcher线程,而这个dispatcher会根据任务的不同类型,将任务分发到不同的eventHandler。eventhandler是需要调用dispatcher的register函数进行注册的。分发任务去不同handler的过程就是调用handler的handle函数。这个函数有可能是做一些具体的处理,也可能是接受这个event,将其放入自己的eventQueue。如果是后者,那这个过程实际上是一个事件转发,从上游调度器转发到下游调度器。看代码时迷惑可能会迷惑在同一个dispatcher的run/createThread函数启动一个线程后不停的take event,而handle函数却在不停的put event,感觉像是一个死循环。真实情况是自己的handle函数一般是上游在调用,相当于把任务转发给你,而你自己在调用自己的dispatcher时,会先选择一个event的handler,然后调用handler.handle,这个handler通常不再是自己,也就是说把它转发给下游。这种基于事件驱动的方式有着异步,高并发的特点,相比于直接的函数调用更适合大型分布式系统。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected void dispatch(Event event) {
//all events go thru this loop
if (LOG.isDebugEnabled()) {
LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+ event.toString());
}
Class<? extends Enum> type = event.getType().getDeclaringClass();
try{
EventHandler handler = eventDispatchers.get(type);
if(handler != null) {
handler.handle(event);
} else {
throw new Exception("No handler for registered for " + type);
}
} catch (Throwable t) {
//TODO Maybe log the state of the queue
LOG.fatal("Error in dispatcher thread", t);
// If serviceStop is called, we should exit this thread gracefully.
if (exitOnDispatchException
&& (ShutdownHookManager.get().isShutdownInProgress()) == false
&& stopped == false) {
Thread shutDownThread = new Thread(createShutDownThread());
shutDownThread.setName("AsyncDispatcher ShutDown handler");
shutDownThread.start();
}
}
}

状态机的构建主要是状态注册addTransition和installToplogy。而执行时主要是调用查看现在的状态getCurrentState()和doTranstion()两个函数


本文采用创作共用保留署名-非商业-禁止演绎4.0国际许可证,欢迎转载,但转载请注明来自http://thousandhu.github.io,并保持转载后文章内容的完整。本人保留所有版权相关权利。

本文链接:http://thousandhu.github.io/2016/06/24/事件库-深入解析yarn架构设计与技术实现-md/