zk解析(2):zk集群Leader初始化leader.lead()
根据zk集群启动QuorumPeerMain的解析,可以知道当前节点为主节点时,节点状态变为LEADING。
代码:org.apache.zookeeper.server.quorum.QuorumPeer#run
/**
 * Excerpt of QuorumPeer#run(): the branch executed while this peer is in the
 * LEADING state. Parts of the method that are not relevant here are elided
 * and marked with "// ...".
 */
public void run() {
    // ...
    case LEADING:
        LOG.info("LEADING");
        try {
            // Build the Leader for this peer and register it on the peer.
            setLeader(makeLeader(logFactory));
            // Work performed after the election has been won.
            leader.lead();
            // lead() returned normally: clear the leader reference.
            setLeader(null);
        } catch (Exception e) {
            LOG.warn("Unexpected exception",e);
        } finally {
            // Still non-null only when lead() exited via an exception,
            // because the normal path above already cleared it.
            if (leader != null) {
                leader.shutdown("Forcing shutdown");
                setLeader(null);
            }
            // Whatever happened, go back to LOOKING.
            setPeerState(ServerState.LOOKING);
        }
        break;
    // ...
}
由代码可知,转为LEADING后执行leader.lead()。
Leader类
简化流程图
构造器
Leader类在构造器中创建并绑定了用于集群内通信的ServerSocket(真正接收连接的accept()在lead()启动的LearnerCnxAcceptor线程中执行)。
23Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
this.self = self;
this.proposalStats = new ProposalStats();
try {
if (self.getQuorumListenOnAllIPs()) {
ss = new ServerSocket(self.getQuorumAddress().getPort());
} else {
ss = new ServerSocket();
}
ss.setReuseAddress(true);
if (!self.getQuorumListenOnAllIPs()) {
ss.bind(self.getQuorumAddress());
}
} catch (BindException e) {
if (self.getQuorumListenOnAllIPs()) {
LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), e);
} else {
LOG.error("Couldn't bind to " + self.getQuorumAddress(), e);
}
throw e;
}
this.zk=zk;
}
propose() 发送提案
1 | public Proposal propose(Request request) throws XidRolloverException { |
processAck() 写请求“从服务器”响应
执行路径:
LearnerHandler.run() -> leader.processAck() -> commitProcessor.commit(request)
72 synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
if (LOG.isTraceEnabled()) {
LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
for (Proposal p : outstandingProposals.values()) {
long packetZxid = p.packet.getZxid();
LOG.trace("outstanding proposal: 0x{}",
Long.toHexString(packetZxid));
}
LOG.trace("outstanding proposals all");
}
if ((zxid & 0xffffffffL) == 0) {
/*
* We no longer process NEWLEADER ack by this method. However,
* the learner sends ack back to the leader after it gets UPTODATE
* so we just ignore the message.
*/
return;
}
if (outstandingProposals.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("outstanding is 0");
}
return;
}
if (lastCommitted >= zxid) {
if (LOG.isDebugEnabled()) {
LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
Long.toHexString(lastCommitted), Long.toHexString(zxid));
}
// The proposal has already been committed
return;
}
//根据zxid取出提案
Proposal p = outstandingProposals.get(zxid);
if (p == null) {
LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
Long.toHexString(zxid), followerAddr);
return;
}
// 这里加上sid代表该sid的Follower同意了该提案,根据sid的数量判断是否有过半服务器同意。
p.ackSet.add(sid);
if (LOG.isDebugEnabled()) {
LOG.debug("Count for zxid: 0x{} is {}",
Long.toHexString(zxid), p.ackSet.size());
}
// 过半的服务器同意后执行提案
if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
if (zxid != lastCommitted+1) {
LOG.warn("Commiting zxid 0x{} from {} not first!",
Long.toHexString(zxid), followerAddr);
LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
}
outstandingProposals.remove(zxid);
if (p.request != null) {
toBeApplied.add(p);
}
if (p.request == null) {
LOG.warn("Going to commmit null request for proposal: {}", p);
}
commit(zxid);
inform(p);
zk.commitProcessor.commit(p.request);
if(pendingSyncs.containsKey(zxid)){
for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
sendSync(r);
}
}
}
}
lead()方法
1 | void lead() throws IOException, InterruptedException { |
生成新时代epoch
每次选主之后都会生成新的epoch,代表产生新的主节点。
生成逻辑:
epoch = lastAcceptedEpoch+1;
connectingFollowers的值要结合LearnerCnxAcceptor类来理解:LearnerCnxAcceptor负责监听连接请求,连接建立之后交给LearnerHandler类处理,LearnerHandler在处理过程中会调用下面的getEpochToPropose()。
38public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
synchronized(connectingFollowers) {
if (!waitingForNewEpoch) { // 避免重复生成
return epoch;
}
// 接收的时代比当前服务器记录的时代大或相等,+1操作生成新的时代
if (lastAcceptedEpoch >= epoch) {
epoch = lastAcceptedEpoch+1;
}
// 验证当前节点是否在集群中
if (isParticipant(sid)) {
connectingFollowers.add(sid);
}
QuorumVerifier verifier = self.getQuorumVerifier();
// verifier.containsQuorum 验证集群中是否有过半服务器在线
if (connectingFollowers.contains(self.getId()) &&
verifier.containsQuorum(connectingFollowers)) {
// 设置标志位代表新的时代已经生成,避免重复生成,并唤起其他等待线程。
waitingForNewEpoch = false;
self.setAcceptedEpoch(epoch);
connectingFollowers.notifyAll();
} else {
long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.getInitLimit()*self.getTickTime();
// 等待集群中的服务器有过半上线,直到超时或其他线程生成新时代
while(waitingForNewEpoch && cur < end) {
connectingFollowers.wait(end - cur);
cur = Time.currentElapsedTime();
}
// 超时,没有过半的服务器在线
if (waitingForNewEpoch) {
throw new InterruptedException("Timeout while waiting for epoch from quorum");
}
}
return epoch;
}
}
生成zxid ZxidUtils
makeZxid方法是生成zxid的方法,在Leader.lead()中调用。
/**
 * Helpers for packing and unpacking a zxid.
 *
 * <p>A zxid is a 64-bit value: the high 32 bits hold the epoch and the low
 * 32 bits hold a counter that starts at 0.
 */
public class ZxidUtils {
    /** Number of bits occupied by the counter; the epoch sits above them. */
    private static final int EPOCH_SHIFT = 32;

    /** Mask selecting the low 32 bits (the counter). */
    private static final long COUNTER_MASK = 0xffffffffL;

    /**
     * Extracts the epoch (high 32 bits) from a zxid.
     *
     * <p>The shift is arithmetic, so a negative zxid yields a negative epoch.
     *
     * @param zxid the zxid to split
     * @return the epoch part
     */
    public static long getEpochFromZxid(long zxid) {
        return zxid >> EPOCH_SHIFT;
    }

    /**
     * Extracts the counter (low 32 bits) from a zxid.
     *
     * @param zxid the zxid to split
     * @return the counter part, always in the range [0, 2^32 - 1]
     */
    public static long getCounterFromZxid(long zxid) {
        return zxid & COUNTER_MASK;
    }

    /**
     * Builds a zxid from an epoch and a counter.
     *
     * @param epoch   value placed in the high 32 bits
     * @param counter value placed in the low 32 bits; higher bits are discarded
     * @return the combined zxid
     */
    public static long makeZxid(long epoch, long counter) {
        return (epoch << EPOCH_SHIFT) | (counter & COUNTER_MASK);
    }

    /**
     * Formats a zxid as lower-case hexadecimal without a prefix.
     *
     * @param zxid the zxid to format
     * @return the hexadecimal representation
     */
    public static String zxidToString(long zxid) {
        return Long.toHexString(zxid);
    }
}
等待过半节点接收新时代信息后响应waitForEpochAck
1 | public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException { |
启动主节点事务处理startZkServer()
startZkServer()中通过zk.startup()启动主节点对写事务的处理。
19private synchronized void startZkServer() {
// Update lastCommitted and Db's zxid to a value representing the new epoch
lastCommitted = zk.getZxid();
LOG.info("Have quorum of supporters, sids: [ "
+ getSidSetString(newLeaderProposal.ackSet)
+ " ]; starting up and setting last processed zxid: 0x{}",
Long.toHexString(zk.getZxid()));
zk.startup();
/*
* Update the election vote here to ensure that all members of the
* ensemble report the same vote to new servers that start up and
* send leader election notifications to the ensemble.
*
* @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
*/
self.updateElectionVote(getEpoch());
zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
}
leader状态通信 LearnerCnxAcceptor
启动
LearnerCnxAcceptor的启动是在Leader类的lead()方法中。
17void lead() throws IOException, InterruptedException {
// ...省略
try {
self.tick.set(0);
// 加载硬盘中的数据
zk.loadData();
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
// Start thread that waits for connection requests from
// new followers.
cnxAcceptor = new LearnerCnxAcceptor();
// leader在构造器中启动了ServerSocket,这里开启一个线程负责处理连接,详见LearnerCnxAcceptor解析小节
cnxAcceptor.start();
// ...省略
}
}
LearnerCnxAcceptor解析
LearnerCnxAcceptor类的内容非常少,就是通过ss.accept()接收连接,并将连接(socket)交给LearnerHandler在单独的线程中处理。
50class LearnerCnxAcceptor extends ZooKeeperThread{
private volatile boolean stop = false;
public LearnerCnxAcceptor() {
super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress());
}
public void run() {
try {
while (!stop) {
try{
// 获取连接的socket
// ss是Leader类的属性,LearnerCnxAcceptor是Leader的内部类,所以可以直接使用
Socket s = ss.accept();
// start with the initLimit, once the ack is processed
// in LearnerHandler switch to the syncLimit
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);
BufferedInputStream is = new BufferedInputStream(
s.getInputStream());
// 这里将socket的读写交给LearnerHandler在单独的线程处理
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
fh.start();
} catch (SocketException e) {
if (stop) {
LOG.info("exception while shutting down acceptor: "
+ e);
// When Leader.shutdown() calls ss.close(),
// the call to accept throws an exception.
// We catch and set stop to true.
stop = true;
} else {
throw e;
}
} catch (SaslException e){
LOG.error("Exception while connecting to quorum learner", e);
}
}
} catch (Exception e) {
LOG.warn("Exception while accepting follower", e);
}
}
public void halt() {
stop = true;
}
}
socket处理 LearnerHandler
LearnerHandler
public class LearnerHandler extends ZooKeeperThread {}
构造器
1 | LearnerHandler(Socket sock, BufferedInputStream bufferedInput, |
发送包sendPackets()
从队列中取出QuorumPacket,然后发送。
41private void sendPackets() throws InterruptedException {
long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
while (true) {
try {
QuorumPacket p;
p = queuedPackets.poll();
if (p == null) {
bufferedOutput.flush();
p = queuedPackets.take();
}
if (p == proposalOfDeath) {
// Packet of death!
break;
}
if (p.getType() == Leader.PING) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (p.getType() == Leader.PROPOSAL) {
syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());
}
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
}
oa.writeRecord(p, "packet");
} catch (IOException e) {
if (!sock.isClosed()) {
LOG.warn("Unexpected exception at " + this, e);
try {
// this will cause everything to shutdown on
// this learner handler and will help notify
// the learner/observer instantaneously
sock.close();
} catch(IOException ie) {
LOG.warn("Error closing socket for handler " + this, ie);
}
}
break;
}
}
}
run()方法
run()方法负责处理请求
简化流程图
1 |
|