根据zk集群启动QuorumPeerMain的解析,可以知道当前节点为主节点时,节点状态变为LEADING。

代码:org.apache.zookeeper.server.quorum.QuorumPeer#run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Excerpt of the peer's main loop showing only the LEADING branch; the enclosing
 * switch and the other states are elided in this excerpt.
 */
@Override
public void run() {
// ...
case LEADING:
LOG.info("LEADING");
try {
// Build the Leader and register it before entering lead()
setLeader(makeLeader(logFactory));
// Work performed after this peer has been elected leader
leader.lead();
// lead() returned without throwing: clear the reference here
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
// A leader reference is still present (e.g. lead() threw before it was cleared):
// force a shutdown and clear it
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
// In every case the peer state is set back to LOOKING
setPeerState(ServerState.LOOKING);
}
break;
// ...
}

由代码可知,转为LEADING后执行leader.lead()。

Leader类

简化流程图

Leader.lead()简化流程图

构造器

Leader类在构造器期间启动了ServerSocket监听请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
this.self = self;
this.proposalStats = new ProposalStats();
try {
if (self.getQuorumListenOnAllIPs()) {
ss = new ServerSocket(self.getQuorumAddress().getPort());
} else {
ss = new ServerSocket();
}
ss.setReuseAddress(true);
if (!self.getQuorumListenOnAllIPs()) {
ss.bind(self.getQuorumAddress());
}
} catch (BindException e) {
if (self.getQuorumListenOnAllIPs()) {
LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), e);
} else {
LOG.error("Couldn't bind to " + self.getQuorumAddress(), e);
}
throw e;
}
this.zk=zk;
}

propose() 发送提案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Wraps a request in a PROPOSAL packet, records it as outstanding and sends it.
 *
 * @param request the request to propose; its zxid becomes the packet's zxid
 * @return the proposal that was stored in {@code outstandingProposals}
 * @throws XidRolloverException if the lower 32 bits of the request's zxid are all
 *         set; {@code shutdown} is called first to force a re-election
 *         (see ZOOKEEPER-1277)
 */
public Proposal propose(Request request) throws XidRolloverException {
    final long counterMask = 0xffffffffL;
    // All lower 32 bits set: address the rollover issue by forcing a re-election
    if ((request.zxid & counterMask) == counterMask) {
        final String reason =
                "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
        shutdown(reason);
        throw new XidRolloverException(reason);
    }

    final byte[] serialized = SerializeUtils.serializeRequest(request);
    proposalStats.setLastProposalSize(serialized.length);
    final QuorumPacket proposalPacket =
            new QuorumPacket(Leader.PROPOSAL, request.zxid, serialized, null);

    final Proposal proposal = new Proposal();
    proposal.packet = proposalPacket;
    proposal.request = request;

    // Bookkeeping and sending happen under the leader's monitor
    synchronized (this) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Proposing:: " + request);
        }
        lastProposed = proposal.packet.getZxid();
        outstandingProposals.put(lastProposed, proposal);
        sendPacket(proposalPacket);
    }
    return proposal;
}

processAck() 写请求“从服务器”响应

执行路径:

LearnerHandler.run() -> leader.processAck() -> commitProcessor.commit(request)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/**
 * Records an ACK for the proposal with the given zxid and, once the ack set forms a
 * quorum, commits the proposal.
 *
 * @param sid id of the server that sent the ACK
 * @param zxid zxid of the proposal being acknowledged
 * @param followerAddr address reported in the warning log messages
 */
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
if (LOG.isTraceEnabled()) {
LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
for (Proposal p : outstandingProposals.values()) {
long packetZxid = p.packet.getZxid();
LOG.trace("outstanding proposal: 0x{}",
Long.toHexString(packetZxid));
}
LOG.trace("outstanding proposals all");
}

// Lower 32 bits (the counter) are zero
if ((zxid & 0xffffffffL) == 0) {
/*
* We no longer process NEWLEADER ack by this method. However,
* the learner sends ack back to the leader after it gets UPTODATE
* so we just ignore the message.
*/
return;
}

// Nothing is waiting for acknowledgement
if (outstandingProposals.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("outstanding is 0");
}
return;
}
if (lastCommitted >= zxid) {
if (LOG.isDebugEnabled()) {
LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
Long.toHexString(lastCommitted), Long.toHexString(zxid));
}
// The proposal has already been committed
return;
}
// Look up the proposal by its zxid
Proposal p = outstandingProposals.get(zxid);
if (p == null) {
LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
Long.toHexString(zxid), followerAddr);
return;
}
// Adding the sid records that this server agreed to the proposal; the ack set
// is what the quorum check below is run against.
p.ackSet.add(sid);
if (LOG.isDebugEnabled()) {
LOG.debug("Count for zxid: 0x{} is {}",
Long.toHexString(zxid), p.ackSet.size());
}
// Once a quorum of servers has agreed, carry out the proposal
if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
// Only warns when the commit is out of order; processing continues
if (zxid != lastCommitted+1) {
LOG.warn("Commiting zxid 0x{} from {} not first!",
Long.toHexString(zxid), followerAddr);
LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
}
outstandingProposals.remove(zxid);
if (p.request != null) {
toBeApplied.add(p);
}

if (p.request == null) {
LOG.warn("Going to commmit null request for proposal: {}", p);
}
commit(zxid);
inform(p);
zk.commitProcessor.commit(p.request);
// Release any sync requests that were parked on this zxid
if(pendingSyncs.containsKey(zxid)){
for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
sendSync(r);
}
}
}
}

lead()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
/**
 * Main routine of an elected leader: loads data, starts accepting learner
 * connections, negotiates the new epoch, waits for a quorum to acknowledge
 * NEWLEADER, starts the ZooKeeper server and then pings the learners in a loop
 * until the leader stops running or loses its synced quorum.
 *
 * @throws IOException propagated from the calls made during startup
 * @throws InterruptedException propagated from the waits and sleeps below
 */
void lead() throws IOException, InterruptedException {
// Compute how long the leader election took
self.end_fle = Time.currentElapsedTime();
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
LOG.info("LEADING - LEADER ELECTION TOOK - {}", electionTimeTaken);
self.start_fle = 0;
self.end_fle = 0;

zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

try {
self.tick.set(0);
// Load the data stored on disk
zk.loadData();
// Snapshot of this peer's current epoch and its last processed zxid
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

// Start thread that waits for connection requests from
// new followers.
cnxAcceptor = new LearnerCnxAcceptor();
// The Leader constructor already opened the ServerSocket; this starts the thread
// that accepts connections on it (see the LearnerCnxAcceptor section)
cnxAcceptor.start();

readyToStart = true;
// Obtain the new epoch (a counter that grows with each leader election);
// see the section on generating the new epoch
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
// Build the zxid: (epoch << 32L) | (counter & 0xffffffffL);
// the upper 32 bits hold the epoch, the lower 32 bits a counter that starts at 0
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

synchronized(this){
lastProposed = zk.getZxid();
}

newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
null, null);


if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
LOG.info("NEWLEADER proposal has Zxid of "
+ Long.toHexString(newLeaderProposal.packet.getZxid()));
}
// Wait for a quorum of servers to acknowledge the epoch
waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);

// We have to get at least a majority of servers in sync with
// us. We do this by waiting for the NEWLEADER packet to get
// acknowledged
try {
// Wait until a quorum of servers has acknowledged NEWLEADER, i.e. finished syncing
waitForNewLeaderAck(self.getId(), zk.getZxid());
} catch (InterruptedException e) {
shutdown("Waiting for a quorum of followers, only synced with sids: [ "
+ getSidSetString(newLeaderProposal.ackSet) + " ]");
HashSet<Long> followerSet = new HashSet<Long>();
for (LearnerHandler f : learners)
followerSet.add(f.getSid());

if (self.getQuorumVerifier().containsQuorum(followerSet)) {
LOG.warn("Enough followers present. "
+ "Perhaps the initTicks need to be increased.");
}
Thread.sleep(self.tickTime);
self.tick.incrementAndGet();
return;
}
// startZkServer() sets lastCommitted, calls zk.startup() (see the ZooKeeperServer
// article), updates the election vote and sets the ZKDatabase's lastProcessedZxid
startZkServer();

/**
* WARNING: do not use this for anything other than QA testing
* on a real cluster. Specifically to enable verification that quorum
* can handle the lower 32bit roll-over issue identified in
* ZOOKEEPER-1277. Without this option it would take a very long
* time (on order of a month say) to see the 4 billion writes
* necessary to cause the roll-over to occur.
*
* This field allows you to override the zxid of the server. Typically
* you'll want to set it to something like 0xfffffff0 and then
* start the quorum, run some operations and see the re-election.
*/
String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
if (initialZxid != null) {
long zxid = Long.parseLong(initialZxid);
// Keep the epoch bits and replace the counter bits with the override
zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
}

if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
self.cnxnFactory.setZooKeeperServer(zk);
}
// Everything is a go, simply start counting the ticks
// WARNING: I couldn't find any wait statement on a synchronized
// block that would be notified by this notifyAll() call, so
// I commented it out
//synchronized (this) {
// notifyAll();
//}
// We ping twice a tick, so we only update the tick every other
// iteration
boolean tickSkip = true; // flag marking which half of a tickTime interval we are in
// Learners are pinged twice per self.tickTime.
// On every second iteration, if the synced set no longer forms a quorum,
// shutdown() is called and the leader stops.
// NOTE(review): per the original author, how many pings a learner may miss is governed by syncLimit
while (true) {
Thread.sleep(self.tickTime / 2);
if (!tickSkip) { // the tick counter advances once per two iterations
self.tick.incrementAndGet();
}
HashSet<Long> syncedSet = new HashSet<Long>();

// lock on the followers when we use it.
syncedSet.add(self.getId());

for (LearnerHandler f : getLearners()) {
// Synced set is used to check we have a supporting quorum, so only
// PARTICIPANT, not OBSERVER, learners should be used
if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
syncedSet.add(f.getSid());
}
f.ping();
}

// check leader running status
if (!this.isRunning()) {
shutdown("Unexpected internal error");
return;
}

if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
//if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
// Lost quorum, shutdown
shutdown("Not sufficient followers synced, only synced with sids: [ "
+ getSidSetString(syncedSet) + " ]");
// make sure the order is the same!
// the leader goes to looking
return;
}
tickSkip = !tickSkip; // flip the flag
}
} finally {
zk.unregisterJMX(this);
}
}

生成新时代epoch

每次选主之后都会生成新的epoch,代表产生新的主节点。

生成逻辑:

epoch = lastAcceptedEpoch+1;

connectingFollowers的值要结合LearnerCnxAcceptor类来理解:LearnerCnxAcceptor负责监听连接请求,连接建立之后交给LearnerHandler类处理,LearnerHandler.run()会调用getEpochToPropose(),把对应节点的sid加入connectingFollowers。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * Registers a connecting server and returns the epoch to propose, blocking until a
 * quorum that includes this peer has connected or the init limit has elapsed.
 *
 * @param sid id of the server calling in
 * @param lastAcceptedEpoch the last epoch that server accepted
 * @return the epoch to propose
 * @throws InterruptedException if the wait is interrupted, or on timeout before a
 *         quorum connected
 * @throws IOException declared by the signature
 */
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
    synchronized (connectingFollowers) {
        // The epoch has already been settled: hand it back without recomputing
        if (!waitingForNewEpoch) {
            return epoch;
        }

        // A caller whose accepted epoch is not below ours pushes the epoch one past it
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1;
        }
        // Only participants count towards the connecting set
        if (isParticipant(sid)) {
            connectingFollowers.add(sid);
        }

        final QuorumVerifier verifier = self.getQuorumVerifier();
        final boolean quorumConnected = connectingFollowers.contains(self.getId())
                && verifier.containsQuorum(connectingFollowers);
        if (quorumConnected) {
            // Mark the epoch as settled and wake every thread waiting below
            waitingForNewEpoch = false;
            self.setAcceptedEpoch(epoch);
            connectingFollowers.notifyAll();
            return epoch;
        }

        // Wait for a quorum to connect, until another thread settles the epoch
        // or the deadline passes
        long now = Time.currentElapsedTime();
        final long deadline = now + self.getInitLimit()*self.getTickTime();
        for (; waitingForNewEpoch && now < deadline; now = Time.currentElapsedTime()) {
            connectingFollowers.wait(deadline - now);
        }
        // Still waiting after the deadline: no quorum came online in time
        if (waitingForNewEpoch) {
            throw new InterruptedException("Timeout while waiting for epoch from quorum");
        }
        return epoch;
    }
}

生成zxid ZxidUtils

makeZxid方法是生成Zxid的方法,在Leader.lead()中调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Helpers for packing and unpacking a zxid: the epoch occupies the upper 32 bits
 * and a counter occupies the lower 32 bits.
 */
public class ZxidUtils {
    /** Number of bits the epoch is shifted by. */
    private static final int EPOCH_SHIFT = 32;

    /** Mask selecting the lower 32 bits, i.e. the counter part. */
    private static final long COUNTER_MASK = 0xffffffffL;

    /**
     * Extracts the epoch part of a zxid.
     *
     * @param zxid the zxid to split
     * @return the upper 32 bits, obtained with a sign-preserving shift
     */
    public static long getEpochFromZxid(long zxid) {
        return zxid >> EPOCH_SHIFT;
    }

    /**
     * Extracts the counter part of a zxid.
     *
     * @param zxid the zxid to split
     * @return the lower 32 bits
     */
    public static long getCounterFromZxid(long zxid) {
        return zxid & COUNTER_MASK;
    }

    /**
     * Combines an epoch and a counter into a zxid.
     *
     * @param epoch value placed in the upper 32 bits
     * @param counter value whose lower 32 bits are placed in the lower 32 bits
     * @return the combined zxid
     */
    public static long makeZxid(long epoch, long counter) {
        final long high = epoch << EPOCH_SHIFT;
        final long low = counter & COUNTER_MASK;
        return high | low;
    }

    /**
     * Renders a zxid as a hexadecimal string.
     *
     * @param zxid the zxid to format
     * @return the result of {@link Long#toHexString(long)}
     */
    public static String zxidToString(long zxid) {
        return Long.toHexString(zxid);
    }
}

等待过半节点接收新时代信息后响应waitForEpochAck

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Registers a server's epoch acknowledgement and blocks until a quorum that includes
 * this peer has acknowledged, or the init limit has elapsed.
 *
 * @param id id of the acknowledging server
 * @param ss that server's state summary; a current epoch of -1 means the server is
 *        not added to the electing set
 * @throws IOException if the summary is more recent than the leader's
 * @throws InterruptedException if the wait is interrupted, or on timeout before a
 *         quorum acknowledged
 */
public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
    synchronized (electingFollowers) {
        // Already settled: nothing left to wait for
        if (electionFinished) {
            return;
        }

        if (ss.getCurrentEpoch() != -1) {
            // A server that is ahead of the leader invalidates this leadership attempt
            if (ss.isMoreRecentThan(leaderStateSummary)) {
                throw new IOException("Follower is ahead of the leader, leader summary: "
                        + leaderStateSummary.getCurrentEpoch()
                        + " (current epoch), "
                        + leaderStateSummary.getLastZxid()
                        + " (last zxid)");
            }
            // Only participants count towards the electing set
            if (isParticipant(id)) {
                electingFollowers.add(id);
            }
        }

        final QuorumVerifier verifier = self.getQuorumVerifier();
        final boolean quorumAcked = electingFollowers.contains(self.getId())
                && verifier.containsQuorum(electingFollowers);
        if (quorumAcked) {
            // Mark the phase finished and wake every thread waiting below
            electionFinished = true;
            electingFollowers.notifyAll();
            return;
        }

        long now = Time.currentElapsedTime();
        final long deadline = now + self.getInitLimit()*self.getTickTime();
        for (; !electionFinished && now < deadline; now = Time.currentElapsedTime()) {
            electingFollowers.wait(deadline - now);
        }
        if (!electionFinished) {
            throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
        }
    }
}

启动主节点事务处理startZkServer()

zk.startup(); 启动主节点对写事务的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Starts the leader's ZooKeeper server once a quorum supports the new leader:
 * sets {@code lastCommitted} to the server's zxid, starts the server, updates the
 * election vote and stores the zxid as the database's last processed zxid.
 */
private synchronized void startZkServer() {
    // Update lastCommitted and Db's zxid to a value representing the new epoch
    lastCommitted = zk.getZxid();

    final String message = "Have quorum of supporters, sids: [ "
            + getSidSetString(newLeaderProposal.ackSet)
            + " ]; starting up and setting last processed zxid: 0x{}";
    LOG.info(message, Long.toHexString(zk.getZxid()));

    zk.startup();

    /*
     * The election vote is updated here so that all members of the ensemble
     * report the same vote to new servers that start up and send leader
     * election notifications to the ensemble.
     *
     * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
     */
    self.updateElectionVote(getEpoch());

    zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
}

leader状态通信 LearnerCnxAcceptor

启动

LearnerCnxAcceptor的启动是在Leader类的lead()方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Abridged excerpt of lead() showing only where the LearnerCnxAcceptor thread is
 * created and started; the rest of the method is omitted.
 */
void lead() throws IOException, InterruptedException {
// ... omitted
try {
self.tick.set(0);
// Load the data stored on disk
zk.loadData();

leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

// Start thread that waits for connection requests from
// new followers.
cnxAcceptor = new LearnerCnxAcceptor();
// The Leader constructor already opened the ServerSocket; this starts the thread
// that accepts connections on it (see the LearnerCnxAcceptor section)
cnxAcceptor.start();
// ... omitted
}
}

LearnerCnxAcceptor解析

LearnerCnxAcceptor类的内容非常少,就是通过ss.accept()接收连接,并将连接(socket)交给LearnerHandler在单独的线程中处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
 * Thread that accepts learner connections on the leader's server socket and hands
 * each accepted socket to its own {@link LearnerHandler} thread.
 */
class LearnerCnxAcceptor extends ZooKeeperThread{
    /** Set by {@link #halt()}; checked before every accept and when accept fails. */
    private volatile boolean stop = false;

    public LearnerCnxAcceptor() {
        super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress());
    }

    /** Accepts connections until stopped or until an unexpected exception occurs. */
    @Override
    public void run() {
        try {
            while (!stop) {
                try {
                    // ss is a field of the enclosing Leader, reachable from this inner class
                    final Socket learnerSocket = ss.accept();
                    // start with the initLimit, once the ack is processed
                    // in LearnerHandler switch to the syncLimit
                    learnerSocket.setSoTimeout(self.tickTime * self.initLimit);
                    learnerSocket.setTcpNoDelay(nodelay);

                    final BufferedInputStream input =
                            new BufferedInputStream(learnerSocket.getInputStream());
                    // Reading and writing on the socket happens on the handler's own thread
                    final LearnerHandler handler =
                            new LearnerHandler(learnerSocket, input, Leader.this);
                    handler.start();
                } catch (SocketException e) {
                    // Without a stop request this is a real failure: let the outer catch log it
                    if (!stop) {
                        throw e;
                    }
                    LOG.info("exception while shutting down acceptor: "
                            + e);

                    // When Leader.shutdown() calls ss.close(),
                    // the call to accept throws an exception.
                    // We catch and set stop to true.
                    stop = true;
                } catch (SaslException e) {
                    // Logged only; the loop goes on to the next connection
                    LOG.error("Exception while connecting to quorum learner", e);
                }
            }
        } catch (Exception e) {
            LOG.warn("Exception while accepting follower", e);
        }
    }

    /** Asks the accept loop to stop. */
    public void halt() {
        stop = true;
    }
}

socket处理 LearnerHandler

LearnerHandler

1
/** Thread that serves a single learner connection for the leader; members are omitted in this excerpt. */
public class LearnerHandler extends ZooKeeperThread {}

构造器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Creates the handler for one learner connection and authenticates the peer.
 *
 * <p>If authentication fails the socket is closed and a {@link SaslException} is
 * thrown with the original failure attached as its cause.
 *
 * @param sock the accepted learner socket
 * @param bufferedInput buffered view of the socket's input stream
 * @param leader the leader this handler belongs to
 * @throws IOException as a {@link SaslException} when authentication fails
 */
LearnerHandler(Socket sock, BufferedInputStream bufferedInput,
        Leader leader) throws IOException {
    super("LearnerHandler-" + sock.getRemoteSocketAddress());
    this.sock = sock;
    this.leader = leader;
    this.bufferedInput = bufferedInput;
    try {
        leader.self.authServer.authenticate(sock,
                new DataInputStream(bufferedInput));
    } catch (IOException e) {
        LOG.error("Server failed to authenticate quorum learner, addr: {}, closing connection",
                sock.getRemoteSocketAddress(), e);
        try {
            sock.close();
        } catch (IOException ie) {
            // Best effort: the authentication failure below is what gets reported
            LOG.error("Exception while closing socket", ie);
        }
        // Chain the original exception so its stack trace is not lost to callers
        throw new SaslException("Authentication failure: " + e.getMessage(), e);
    }
}
发送包sendPackets()

从队列中取出QuorumPacket,然后发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Drains {@code queuedPackets} and writes each packet to the learner until the
 * packet of death is dequeued or a write fails.
 *
 * @throws InterruptedException if interrupted while blocked on the queue
 */
private void sendPackets() throws InterruptedException {
    long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
    for (;;) {
        try {
            QuorumPacket next = queuedPackets.poll();
            if (next == null) {
                // Queue is empty: flush what has been written, then block for more
                bufferedOutput.flush();
                next = queuedPackets.take();
            }

            // Packet of death!
            if (next == proposalOfDeath) {
                break;
            }

            // Once a PING has been seen the ping trace mask stays in effect
            if (next.getType() == Leader.PING) {
                traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
            }
            // Record the send time of each proposal for the sync-limit check
            if (next.getType() == Leader.PROPOSAL) {
                syncLimitCheck.updateProposal(next.getZxid(), System.nanoTime());
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logQuorumPacket(LOG, traceMask, 'o', next);
            }
            oa.writeRecord(next, "packet");
        } catch (IOException writeFailure) {
            if (!sock.isClosed()) {
                LOG.warn("Unexpected exception at " + this, writeFailure);
                try {
                    // this will cause everything to shutdown on
                    // this learner handler and will help notify
                    // the learner/observer instantaneously
                    sock.close();
                } catch (IOException closeFailure) {
                    LOG.warn("Error closing socket for handler " + this, closeFailure);
                }
            }
            break;
        }
    }
}

run()方法

run()方法负责处理请求
简化流程图
Leader主从通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
 /**
  * Serves one learner connection: reads the learner's FOLLOWERINFO/OBSERVERINFO
  * packet, negotiates the new epoch, chooses between DIFF, TRUNC and SNAP to sync
  * the learner, sends NEWLEADER, starts the sender thread, waits for the learner's
  * ACK and then loops over incoming ACK, PING, REVALIDATE and REQUEST packets.
  * shutdown() is always called when the method exits.
  */
 @Override
public void run() {
try {
leader.addLearnerHandler(this);
tickOfNextAckDeadline = leader.self.tick.get()
+ leader.self.initLimit + leader.self.syncLimit;
// Set up the input and output archives over the socket streams
ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);

QuorumPacket qp = new QuorumPacket();
// Read the first packet
ia.readRecord(qp, "packet");
// Unexpected packet type: give up on this connection
if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
LOG.error("First packet " + qp.toString()
+ " is not FOLLOWERINFO or OBSERVERINFO!");
return;
}
byte learnerInfoData[] = qp.getData();
if (learnerInfoData != null) {
// An 8-byte payload carries only the sid; anything else is decoded as a LearnerInfo
if (learnerInfoData.length == 8) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
this.sid = bbsid.getLong();
} else {
LearnerInfo li = new LearnerInfo();
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
this.sid = li.getServerid();
this.version = li.getProtocolVersion();
}
} else {
// No payload: take a sid from the leader's followerCounter
this.sid = leader.followerCounter.getAndDecrement();
}

LOG.info("Follower sid: " + sid + " : info : "
+ leader.self.quorumPeers.get(sid));

if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
}

long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
// Obtain the new epoch; it is only produced once a quorum of servers has connected
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);

if (this.getVersion() < 0x10000) {
// we are going to have to extrapolate the epoch information
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
// Wait until a quorum of servers has acknowledged the new epoch
leader.waitForEpochAck(this.getSid(), ss);
} else {
byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
// Reply with the leader info
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
// Read the learner's reply
ia.readRecord(ackEpochPacket, "packet");
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error(ackEpochPacket.toString()
+ " is not ACKEPOCH");
return;
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
// Wait until a quorum of servers has acknowledged the new epoch
leader.waitForEpochAck(this.getSid(), ss);
}
peerLastZxid = ss.getLastZxid();

/* the default to send to the follower */
int packetToSend = Leader.SNAP; // default is SNAP, i.e. send a snapshot
long zxidToSend = 0;
long leaderLastZxid = 0;
/** the packets that the follower needs to get updates from **/
long updates = peerLastZxid;

/* we are sending the diff check if we have proposals in memory to be able to
* send a diff to the
*/
ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
ReadLock rl = lock.readLock();
try {
rl.lock();
final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
LOG.info("Synchronizing with Follower sid: " + sid
+" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
+" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
+" peerLastZxid=0x"+Long.toHexString(peerLastZxid));
// Fetch the committed proposals
LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
// The peer's data already matches the leader's
if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
// Follower is already sync with us, send empty diff
LOG.info("leader and follower are in sync, zxid=0x{}",
Long.toHexString(peerLastZxid));
packetToSend = Leader.DIFF;
zxidToSend = peerLastZxid;
} else if (proposals.size() != 0) {
LOG.debug("proposal size is {}", proposals.size());
// The leader's committed log reaches at least as far as the peer, so the peer
// syncs from it; the peer must also be at or above minCommittedLog, otherwise
// only a snapshot sync is possible
if ((maxCommittedLog >= peerLastZxid)
&& (minCommittedLog <= peerLastZxid)) {
LOG.debug("Sending proposals to follower");

// as we look through proposals, this variable keeps track of previous
// proposal Id.
long prevProposalZxid = minCommittedLog;

// Keep track of whether we are about to send the first packet.
// Before sending the first packet, we have to tell the learner
// whether to expect a trunc or a diff
boolean firstPacket=true;

// If we are here, we can use committedLog to sync with
// follower. Then we only need to decide whether to
// send trunc or not
packetToSend = Leader.DIFF;
zxidToSend = maxCommittedLog;

for (Proposal propose: proposals) {
// skip the proposals the peer already has
if (propose.packet.getZxid() <= peerLastZxid) {
prevProposalZxid = propose.packet.getZxid();
continue;
} else {
// If we are sending the first packet, figure out whether to trunc
// in case the follower has some proposals that the leader doesn't
if (firstPacket) {
firstPacket = false;
// Does the peer have some proposals that the leader hasn't seen yet
if (prevProposalZxid < peerLastZxid) {
// send a trunc message before sending the diff
packetToSend = Leader.TRUNC;
zxidToSend = prevProposalZxid;
updates = zxidToSend;
}
}
queuePacket(propose.packet);
QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
null, null);
queuePacket(qcommit);
}
}
} else if (peerLastZxid > maxCommittedLog) { // the peer is ahead of the leader's committed log
LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
Long.toHexString(maxCommittedLog),
Long.toHexString(updates));

packetToSend = Leader.TRUNC;
zxidToSend = maxCommittedLog;
updates = zxidToSend;
} else {
LOG.warn("Unhandled proposal scenario");
}
} else {
// just let the state transfer happen
LOG.debug("proposals is empty");
}

LOG.info("Sending " + Leader.getPacketType(packetToSend));
leaderLastZxid = leader.startForwarding(this, updates);

} finally {
rl.unlock();
}
// The sync decision has been made; build the NEWLEADER packet
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
ZxidUtils.makeZxid(newEpoch, 0), null, null);
// Older protocol versions get NEWLEADER written directly; newer ones get it queued
if (getVersion() < 0x10000) {
oa.writeRecord(newLeaderQP, "packet");
} else {
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();
//Need to set the zxidToSend to the latest zxid
if (packetToSend == Leader.SNAP) {
zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
}
oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
bufferedOutput.flush();

/* if we are not truncating or sending a diff just send a snapshot */
// Send the snapshot
if (packetToSend == Leader.SNAP) {
LOG.info("Sending snapshot last zxid of peer is 0x"
+ Long.toHexString(peerLastZxid) + " "
+ " zxid of leader is 0x"
+ Long.toHexString(leaderLastZxid)
+ "sent zxid of db as 0x"
+ Long.toHexString(zxidToSend));
// Dump data to peer
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
}
bufferedOutput.flush();

// Start sending packets
// A separate thread runs sendPackets()
new Thread() {
public void run() {
Thread.currentThread().setName(
"Sender-" + sock.getRemoteSocketAddress());
try {
sendPackets();
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption",e);
}
}
}.start();

/*
* Have to wait for the first ACK, wait until
* the leader is ready, and only then we can
* start processing messages.
*/
qp = new QuorumPacket();
// Wait for the learner's reply
ia.readRecord(qp, "packet");
if(qp.getType() != Leader.ACK){
LOG.error("Next packet was supposed to be an ACK");
return;
}
LOG.info("Received NEWLEADER-ACK message from " + getSid());
// Wait for a quorum of servers to acknowledge NEWLEADER
leader.waitForNewLeaderAck(getSid(), qp.getZxid());

syncLimitCheck.start();

// now that the ack has been processed expect the syncLimit
sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);

/*
* Wait until leader starts up
*/
synchronized(leader.zk){
while(!leader.zk.isRunning() && !this.isInterrupted()){
leader.zk.wait(20);
}
}
// Mutation packets will be queued during the serialize,
// so we need to mark when the peer can actually start
// using the data
// UPTODATE marks the point from which the data can be used
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet");

long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
if (qp.getType() == Leader.PING) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
}
// tick: the leader's tick counter, advanced in Leader.lead()
// syncLimit: NOTE(review): per the original author, the number of pings a learner may miss, configured in zoo.cfg
// Their sum is the tick by which the next packet from this learner is due;
// NOTE(review): per the original author it is checked in org.apache.zookeeper.server.quorum.LearnerHandler#synced()
tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;

ByteBuffer bb;
long sessionId;
int cxid;
int type;

switch (qp.getType()) {
case Leader.ACK: // acknowledgement of a proposal
if (this.learnerType == LearnerType.OBSERVER) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received ACK from Observer " + this.sid);
}
}
syncLimitCheck.updateAck(qp.getZxid());
// Process the ACK; once a quorum has acknowledged the proposal it is committed
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
break;
case Leader.PING: // keeps the connection and the sessions alive
// Process the touches
ByteArrayInputStream bis = new ByteArrayInputStream(qp
.getData());
DataInputStream dis = new DataInputStream(bis);
while (dis.available() > 0) {
long sess = dis.readLong();
int to = dis.readInt();
// Keep the session alive
leader.zk.touch(sess, to);
}
break;
case Leader.REVALIDATE: // revalidates a session
bis = new ByteArrayInputStream(qp.getData());
dis = new DataInputStream(bis);
long id = dis.readLong();
int to = dis.readInt();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
dos.writeLong(id);
// Touch the session; the result says whether it is still valid
boolean valid = leader.zk.touch(id, to);
if (valid) {
try {
//set the session owner
// as the follower that
// owns the session
leader.zk.setOwner(id, this);
} catch (SessionExpiredException e) {
LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
}
}
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.SESSION_TRACE_MASK,
"Session 0x" + Long.toHexString(id)
+ " is valid: "+ valid);
}
// Send the validation result back in the same packet
dos.writeBoolean(valid);
qp.setData(bos.toByteArray());
queuedPackets.add(qp);
break;
case Leader.REQUEST: // a request forwarded by the learner
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
Request si;
if(type == OpCode.sync){
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
si.setOwner(this);
leader.zk.submitRequest(si); // submit the request; its processing is covered in part (3) of this series (ZooKeeperServer)
break;
default:
LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
break;
}
}
} catch (IOException e) {
if (sock != null && !sock.isClosed()) {
LOG.error("Unexpected exception causing shutdown while sock "
+ "still open", e);
//close the socket to make sure the
//other side can see it being close
try {
sock.close();
} catch(IOException ie) {
// do nothing
}
}
} catch (InterruptedException e) {
LOG.error("Unexpected exception causing shutdown", e);
} finally {
LOG.warn("******* GOODBYE "
+ (sock != null ? sock.getRemoteSocketAddress() : "<null>")
+ " ********");
shutdown();
}
}