public String createNode(String path, byte data[], List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time) throws KeeperException.NoNodeException, KeeperException.NodeExistsException { int lastSlash = path.lastIndexOf('/'); String parentName = path.substring(0, lastSlash); String childName = path.substring(lastSlash + 1); StatPersisted stat = new StatPersisted(); stat.setCtime(time); stat.setMtime(time); stat.setCzxid(zxid); stat.setMzxid(zxid); stat.setPzxid(zxid); stat.setVersion(0); stat.setAversion(0); stat.setEphemeralOwner(ephemeralOwner); DataNode parent = nodes.get(parentName); if (parent == null) { thrownew KeeperException.NoNodeException(); } synchronized (parent) { Set<String> children = parent.getChildren(); if (children.contains(childName)) { thrownew KeeperException.NodeExistsException(); } if (parentCVersion == -1) { parentCVersion = parent.stat.getCversion(); parentCVersion++; } parent.stat.setCversion(parentCVersion); parent.stat.setPzxid(zxid); Long longval = aclCache.convertAcls(acl); DataNode child = new DataNode(parent, data, longval, stat); parent.addChild(childName); nodes.put(path, child); if (ephemeralOwner != 0) { HashSet<String> list = ephemerals.get(ephemeralOwner); if (list == null) { list = new HashSet<String>(); ephemerals.put(ephemeralOwner, list); } synchronized (list) { list.add(path); } } } // now check if its one of the zookeeper node child if (parentName.startsWith(quotaZookeeper)) { // now check if its the limit node if (Quotas.limitNode.equals(childName)) { // this is the limit node // get the parent and add it to the trie pTrie.addPath(parentName.substring(quotaZookeeper.length())); } if (Quotas.statNode.equals(childName)) { updateQuotaForPath(parentName .substring(quotaZookeeper.length())); } } // also check to update the quotas for this node String lastPrefix; if((lastPrefix = getMaxPrefixWithQuota(path)) != null) { // ok we have some match and need to update updateCount(lastPrefix, 1); 
updateBytes(lastPrefix, data == null ? 0 : data.length); } dataWatches.triggerWatch(path, Event.EventType.NodeCreated); childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged); return path; }
privatefinal HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>();
privatefinal HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap<Watcher, HashSet<String>>();
// Setting a watcher
publicsynchronizedvoidaddWatch(String path, Watcher watcher){ HashSet<Watcher> list = watchTable.get(path); if (list == null) { // don't waste memory if there are few watches on a node // rehash when the 4th entry is added, doubling size thereafter // seems like a good compromise list = new HashSet<Watcher>(4); watchTable.put(path, list); } list.add(watcher);
HashSet<String> paths = watch2Paths.get(watcher); if (paths == null) { // cnxns typically have many watches, so use default cap here paths = new HashSet<String>(); watch2Paths.put(watcher, paths); } paths.add(path); }