ZKDatabase: zk内存数据存储的核心类,维护了DataTree。

DataTree:zk存储的数据结构,以树的形式保存,代表了内存中一份完整的数据

DataNode: 叶子节点,存储节点数据。

DataTree 数据结构

DataTree中维护了一个DataNode的map,节点名称作为key

1
2
private final ConcurrentHashMap<String, DataNode> nodes =
new ConcurrentHashMap<String, DataNode>();

在看看DataNode的数据结构

1
2
3
4
5
6
7
8
public class DataNode implements Record {
DataNode parent;
byte data[];
Long acl;
public StatPersisted stat;
private Set<String> children = null;
private static final Set<String> EMPTY_SET = Collections.emptySet();
}

可以看出DataNode存储的子节点是节点名称,父节点存储的是节点信息。

DataTree构造器:

1
2
3
4
5
6
7
8
9
10
11
12
public DataTree() {
/* Rather than fight it, let root have an alias */
nodes.put("", root);
nodes.put(rootZookeeper, root);

/** add the proc node and quota node */
root.addChild(procChildZookeeper);
nodes.put(procZookeeper, procDataNode);

procDataNode.addChild(quotaChildZookeeper);
nodes.put(quotaZookeeper, quotaDataNode);
}

可以明确的知道DataTree中的map存储了所有节点的信息,查找子节点可以根据子节点的节点名称从map中获取。

ZKDatabase数据持久化

zk会在一定时间将内存中的数据存储到硬盘快照,而正常操作会一直记录到log中,通过快照和log的结合保证数据可恢复。

zk存储数据依赖的是FileTxnSnapLog类。

日志写入

FileTxnLog负责维护事务日志对外的接口,包括事务日志的写入和读取等。Zookeeper的事务日志写入过程大体可以分为如下6个步骤。

(1) 确定是否有事务日志可写。当Zookeeper服务器启动完成需要进行第一次事务日志的写入,或是上一次事务日志写满时,都会处于与事务日志文件断开的状态,即Zookeeper服务器没有和任意一个日志文件相关联。因此在进行事务日志写入前,Zookeeper首先会判断FileTxnLog组件是否已经关联上一个可写的事务日志文件。若没有,则会使用该事务操作关联的ZXID作为后缀创建一个事务日志文件,同时构建事务日志的文件头信息,并立即写入这个事务日志文件中去,同时将该文件的文件流放入streamToFlush集合,该集合用来记录当前需要强制进行数据落盘的文件流。

  (2) 确定事务日志文件是否需要扩容(预分配)。Zookeeper会采用磁盘空间预分配策略。当检测到当前事务日志文件剩余空间不足4096字节时,就会开始进行文件空间扩容,即在现有文件大小上,将文件增加65536KB(64MB),然后使用”0”填充被扩容的文件空间。

  (3) 事务序列化。对事务头和事务体的序列化,其中事务体又可分为会话创建事务、节点创建事务、节点删除事务、节点数据更新事务等。

  (4) 生成Checksum。为保证日志文件的完整性和数据的准确性,Zookeeper在将事务日志写入文件前,会计算生成Checksum。

  (5) 写入事务日志文件流。将序列化后的事务头、事务体和Checksum写入文件流中,此时并为写入到磁盘上。

  (6) 事务日志刷入磁盘。由于步骤5中的缓存原因,无法实时地写入磁盘文件中,因此需要将缓存数据强制刷入磁盘。

数据快照

  FileSnap负责维护快照数据对外的接口,包括快照数据的写入和读取等,将内存数据库写入快照数据文件其实是一个序列化过程。针对客户端的每一次事务操作,Zookeeper都会将他们记录到事务日志中,同时也会将数据变更应用到内存数据库中,Zookeeper在进行若干次事务日志记录后,将内存数据库的全量数据Dump到本地文件中,这就是数据快照。其步骤如下

  (1) 确定是否需要进行数据快照。每进行一次事务日志记录之后,Zookeeper都会检测当前是否需要进行数据快照,考虑到数据快照对于Zookeeper机器的影响,需要尽量避免Zookeeper集群中的所有机器在同一时刻进行数据快照。采用过半随机策略进行数据快照操作。

  (2) 切换事务日志文件。表示当前的事务日志已经写满,需要重新创建一个新的事务日志。

  (3) 创建数据快照异步线程。创建单独的异步线程来进行数据快照以避免影响Zookeeper主流程。

  (4) 获取全量数据和会话信息。从ZKDatabase中获取到DataTree和会话信息。

  (5) 生成快照数据文件名。Zookeeper根据当前已经提交的最大ZXID来生成数据快照文件名。

  (6) 数据序列化。首先序列化文件头信息,然后再对会话信息和DataTree分别进行序列化,同时生成一个Checksum,一并写入快照数据文件中去。

  2.4 初始化

  在Zookeeper服务器启动期间,首先会进行数据初始化工作,用于将存储在磁盘上的数据文件加载到Zookeeper服务器内存中。

保存流程代码解析

负责log记录和快照保存操作的类是SyncRequestProcessor类,这个处理器负责每次请求的log记录和周期性快照的持久化。

核心方法为线程运行的run()方法,其它内容详见zk解析(3)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
// .. 省略其它,其它内容详见zk解析(3)
public void run() {
try {
int logCount = 0;

// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
// 为了防止所有服务器都在同时保存快照,这里使用了一个随机数
setRandRoll(r.nextInt(snapCount/2));
while (true) {
Request si = null;
// 如果toFlush集合中没有已经保存的请求,就执行队列的take
// LinkedBlockingQueue的take方法会在队列为空时堵塞线程
if (toFlush.isEmpty()) {
si = queuedRequests.take();
} else { // toFlush集合中有已经保存的请求,就要处理toFlush集合里的数据不能堵塞线程,就执行poll方法
// poll方法不堵塞线程,集合为空时返回null
si = queuedRequests.poll();
if (si == null) { //没有请求,代表线程空闲,就将log刷新到硬盘中
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) { // 有新的请求
// track the number of records written to the log
// 将日志写入stream流中,此时写入流中还在内存中未刷新到硬盘
if (zks.getZKDatabase().append(si)) {
logCount++;
// log计数值达到上限,此时触发快照保存流程
if (logCount > (snapCount / 2 + randRoll)) {
// 计算下一个随机数用于下次生成快照的时间
setRandRoll(r.nextInt(snapCount/2));
// roll the log
// 滚动日志,每次保存快照时,log日志会另起一个文件保存
zks.getZKDatabase().rollLog();
// take a snapshot
// 正在保存快照略过
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else { // 开线程保存快照
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
// 保存到集合中
toFlush.add(si);
// 集合中的请求log达到一定数量后,将内存中的log保存到硬盘中
if (toFlush.size() > 1000) {
flush(toFlush);
}
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}

}

参考

Zookeeper数据与存储
zookeeper原理解析-数据存储