利用zookeeperWatcher监控zookeeper node变化

我们知识图谱(knowledge graph,简称kg)的数据是存在es和hbase里面的。如果有时候数据,尤其是hbase rowkey计算逻辑发生修改,就需要重做整个kg的数据,一般是建一个新的es index和hbase的表。为了能够让线上服务自动加载新的kg地址,我们是把配置写在zookeeper里面然后在kgclient启动一个zkwatcher来实时获得新的kg地址并重启kgclient。

zookeeperwatcher的基本知识

zookeeper监听比较重要的两个类是ZookeeperWatcher和ZookeeperListener。Watcher是监听的接口,每个需要watcher的地方都需要实例化一个Watcher,比如hbase的HMaster,RegionServer等

所有需要监听的事件通过listener操作并需要通过ZookeeperWatcher.registerListener注册自己。 (注意,wather在调用listeners里的方法时将会被阻塞,所以listener里的方法不要long-running)

Listener比较重要的函数是:

  • void nodeChildrenChanged(String path):Called when an existing node has a child node added or removed.
  • void nodeCreated(String path):Called when a new node has been created.
  • void nodeDataChanged(String path):Called when an existing node has changed data.
  • void nodeDeleted(String path):Called when a node has been deleted

在zookeeper任意节点有对应变化时,对于函数会被调用,而path参数就是变化节点的路径。

我们的使用方式

先看ZookeeperListener的子类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class ConfigWatcher extends ZooKeeperListener {
private String basePath;
public ConfigWatcher(ZooKeeperWatcher watcher, String base) {
super(watcher);
basePath = base;
}
public void start() throws KeeperException {
watcher.registerListener(this);
if (ZKUtil.watchAndCheckExists(watcher, basePath)) {
byte[] data = ZKUtil.getDataAndWatch(zkw, basePath);
if (data != null) {
parseConfigFromZk(data);
}
} else {
logger.warn("Cannot find config from zk at " + basePath +", waiting ...");
}
}
private void handle(String path) {
if (!basePath.equals(path))
return;
try {
byte[] data;
data = ZKUtil.getDataAndWatch(zkw, basePath);
if (data != null) {
parseConfigAndRestart(data);
}
} catch (KeeperException e) {
String errMsg = "Error when fetching data from zk on path " + path + ". Aborting ...";
logger.error(errMsg);
watcher.abort(errMsg, e);
}
}
@Override
public void nodeCreated(String path) {
handle(path);
}
@Override
public void nodeDataChanged(String path) {
handle(path);
}
public void close() {
if (this.watcher != null) {
watcher.unregisterListener(this);
}
}
}

configWatcher启动时在watcher中注册自己,然后第一次拿到配置。他重载了Listener的nodeCreated和nodeDataChanged,当节点变化时就调用handle函数。而handle函数会通过getDataAndWatch来获取新的配置并且继续watch。如果不需要继续watch就调用getdata即可。

再看一下kgclient的config:

1
2
3
4
5
6
7
8
9
10
11
12
synchronized public void config(String hosts, int port, String basePath) throws IOException {
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", hosts);
conf.setInt("hbase.zookeeper.property.clientPort", port);
zkw = new ZooKeeperWatcher(conf, ConfigWatcher.class.getName(), null);
watcher = new ConfigWatcher(zkw, basePath);
try {
watcher.start();
} catch (KeeperException e) {
throw new IOException(e);
}
}

这个函数new了一个zkWatcher和一个configWatcher,并启动了这个configWatch。而这个函数是在创建kgclient单例时调用的。通过这个方法,我们就能动态的得到kgclient的配置,保证他从最新的hbase表读取数据

参考文献

HBase源码解析(二) HMaster主要类成员解析


本文采用创作共用保留署名-非商业-禁止演绎4.0国际许可证,欢迎转载,但转载请注明来自http://thousandhu.github.io,并保持转载后文章内容的完整。本人保留所有版权相关权利。

本文链接:http://thousandhu.github.io/2016/07/23/利用zookeeperWatcher监控zookeeper-node变化/