前言

节点未成为主节点且未设置为observer,此时该节点就处于Follower状态。

代码参考zk解析(1)-zk集群启动及选主:

org.apache.zookeeper.server.quorum.QuorumPeer#run
org.apache.zookeeper.server.quorum.Follower#followLeader
QuorumPeer#run处于FOLLOWING状态时执行follower.followLeader();

1
2
3
4
5
6
7
8
9
10
11
12
13
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
setPeerState(ServerState.LOOKING);
}
break;

Follower类

继承关系

Follower继承了Learner类,先查阅zk解析(4)-Learner章节

1
public class Follower extends Learner{}

构造器

1
2
3
4
5
Follower(QuorumPeer self,FollowerZooKeeperServer zk) {
this.self = self;
this.zk=zk;
this.fzk = zk;
}

followLeader()

  1. 计算选主耗时;
  2. 连接主节点;
  3. 向主节点注册并获取新的zxid;
  4. zxid校验;
  5. 同步主节点数据;
  6. 循环接收数据
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    void followLeader() throws InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    LOG.info("FOLLOWING - LEADER ELECTION TOOK - {}", electionTimeTaken);
    self.start_fle = 0;
    self.end_fle = 0;
    fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
    try {
    QuorumServer leaderServer = findLeader();
    try {
    // 连接主节点
    connectToLeader(leaderServer.addr, leaderServer.hostname);
    // 请求主节点获取新的zxid
    long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

    //check to see if the leader zxid is lower than ours
    //this should never happen but is just a safety check
    // zxid检查
    long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
    if (newEpoch < self.getAcceptedEpoch()) {
    LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
    + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
    throw new IOException("Error: Epoch of leader is lower");
    }
    // 同步主节点的历史数据
    syncWithLeader(newEpochZxid);
    QuorumPacket qp = new QuorumPacket();
    while (this.isRunning()) {
    readPacket(qp);
    processPacket(qp);
    }
    } catch (Exception e) {
    LOG.warn("Exception when following the leader", e);
    try {
    sock.close();
    } catch (IOException e1) {
    e1.printStackTrace();
    }

    // clear pending revalidations
    pendingRevalidations.clear();
    }
    } finally {
    zk.unregisterJMX((Learner)this);
    }
    }

processPacket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
protected void processPacket(QuorumPacket qp) throws IOException{
switch (qp.getType()) {
case Leader.PING: // 发送ping请求
ping(qp);
break;
case Leader.PROPOSAL: // 收到提案
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
if (hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = hdr.getZxid();
fzk.logRequest(hdr, txn); // 使用SyncRequestProcessor记录日志
break;
case Leader.COMMIT: // 提交提案(使用CommitProcessor)
fzk.commit(qp.getZxid());
break;
case Leader.UPTODATE: //
LOG.error("Received an UPTODATE message after Follower started");
break;
case Leader.REVALIDATE: //重新校验
revalidate(qp);
break;
case Leader.SYNC: //同步
fzk.sync(); // 将等待队列头的提案commit(使用CommitProcessor)
break;
default:
LOG.error("Invalid packet type: {} received by Observer", qp.getType());
}
}