DataTree

设置Watcher

设置watcher请求最终的处理方法。

参数中的Watcher可以查看NIOServerCnxn的解析,NIOServerCnxn负责了与客户端的链接,所以可以通过NIOServerCnxn发送响应。

watcher是设置在WatchManager中,由WatchManager管理Watcher。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void setWatches(long relativeZxid, List<String> dataWatches,
List<String> existWatches, List<String> childWatches,
Watcher watcher) {
for (String path : dataWatches) {
DataNode node = getNode(path);
if (node == null) {
watcher.process(new WatchedEvent(EventType.NodeDeleted,
KeeperState.SyncConnected, path));
} else if (node.stat.getMzxid() > relativeZxid) {
watcher.process(new WatchedEvent(EventType.NodeDataChanged,
KeeperState.SyncConnected, path));
} else {
this.dataWatches.addWatch(path, watcher);
}
}
for (String path : existWatches) {
DataNode node = getNode(path);
if (node != null) {
watcher.process(new WatchedEvent(EventType.NodeCreated,
KeeperState.SyncConnected, path));
} else {
this.dataWatches.addWatch(path, watcher);
}
}
for (String path : childWatches) {
DataNode node = getNode(path);
if (node == null) {
watcher.process(new WatchedEvent(EventType.NodeDeleted,
KeeperState.SyncConnected, path));
} else if (node.stat.getPzxid() > relativeZxid) {
watcher.process(new WatchedEvent(EventType.NodeChildrenChanged,
KeeperState.SyncConnected, path));
} else {
this.childWatches.addWatch(path, watcher);
}
}
}

NIOServerCnxn.process

1
2
3
4
5
6
7
8
9
10
11
12
13
14
synchronized public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}

// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();

sendResponse(h, e, "notification");
}

创建节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public String createNode(String path, byte data[], List<ACL> acl,
long ephemeralOwner, int parentCVersion, long zxid, long time)
throws KeeperException.NoNodeException,
KeeperException.NodeExistsException {
int lastSlash = path.lastIndexOf('/');
String parentName = path.substring(0, lastSlash);
String childName = path.substring(lastSlash + 1);
StatPersisted stat = new StatPersisted();
stat.setCtime(time);
stat.setMtime(time);
stat.setCzxid(zxid);
stat.setMzxid(zxid);
stat.setPzxid(zxid);
stat.setVersion(0);
stat.setAversion(0);
stat.setEphemeralOwner(ephemeralOwner);
DataNode parent = nodes.get(parentName);
if (parent == null) {
throw new KeeperException.NoNodeException();
}
synchronized (parent) {
Set<String> children = parent.getChildren();
if (children.contains(childName)) {
throw new KeeperException.NodeExistsException();
}

if (parentCVersion == -1) {
parentCVersion = parent.stat.getCversion();
parentCVersion++;
}
parent.stat.setCversion(parentCVersion);
parent.stat.setPzxid(zxid);
Long longval = aclCache.convertAcls(acl);
DataNode child = new DataNode(parent, data, longval, stat);
parent.addChild(childName);
nodes.put(path, child);
if (ephemeralOwner != 0) {
HashSet<String> list = ephemerals.get(ephemeralOwner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(ephemeralOwner, list);
}
synchronized (list) {
list.add(path);
}
}
}
// now check if its one of the zookeeper node child
if (parentName.startsWith(quotaZookeeper)) {
// now check if its the limit node
if (Quotas.limitNode.equals(childName)) {
// this is the limit node
// get the parent and add it to the trie
pTrie.addPath(parentName.substring(quotaZookeeper.length()));
}
if (Quotas.statNode.equals(childName)) {
updateQuotaForPath(parentName
.substring(quotaZookeeper.length()));
}
}
// also check to update the quotas for this node
String lastPrefix;
if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
// ok we have some match and need to update
updateCount(lastPrefix, 1);
updateBytes(lastPrefix, data == null ? 0 : data.length);
}
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
Event.EventType.NodeChildrenChanged);
return path;
}

WatchManager

继承关系

1
public class WatchManager {}

成员变量

1
2
3
4
5
6
7
private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);

private final HashMap<String, HashSet<Watcher>> watchTable =
new HashMap<String, HashSet<Watcher>>();

private final HashMap<Watcher, HashSet<String>> watch2Paths =
new HashMap<Watcher, HashSet<String>>();

设置watcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public synchronized void addWatch(String path, Watcher watcher) {
HashSet<Watcher> list = watchTable.get(path);
if (list == null) {
// don't waste memory if there are few watches on a node
// rehash when the 4th entry is added, doubling size thereafter
// seems like a good compromise
list = new HashSet<Watcher>(4);
watchTable.put(path, list);
}
list.add(watcher);

HashSet<String> paths = watch2Paths.get(watcher);
if (paths == null) {
// cnxns typically have many watches, so use default cap here
paths = new HashSet<String>();
watch2Paths.put(watcher, paths);
}
paths.add(path);
}

triggerWatch()节点产生事件通知客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public Set<Watcher> triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
}

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
}
return watchers;
}

removeWatcher 移除Warcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public synchronized void removeWatcher(Watcher watcher) {
HashSet<String> paths = watch2Paths.remove(watcher);
if (paths == null) {
return;
}
for (String p : paths) {
HashSet<Watcher> list = watchTable.get(p);
if (list != null) {
list.remove(watcher);
if (list.size() == 0) {
watchTable.remove(p);
}
}
}
}