publicvoidcommitRequest(Request request){ if (syncRequestProcessorEnabled) { // Write to txnlog and take periodic snapshot syncProcessor.processRequest(request); } commitProcessor.commit(request); }
@Override protectedvoidsetupRequestProcessors(){ // We might consider changing the processor behaviour of // Observers to, for example, remove the disk sync requirements. // Currently, they behave almost exactly the same as followers. RequestProcessor finalProcessor = new FinalRequestProcessor(this); commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); commitProcessor.start(); firstProcessor = new ObserverRequestProcessor(this, commitProcessor); ((ObserverRequestProcessor) firstProcessor).start();
/* * Observer should write to disk, so that the it won't request * too old txn from the leader which may lead to getting an entire * snapshot. * * However, this may degrade performance as it has to write to disk * and do periodic snapshot which may double the memory requirements */ if (syncRequestProcessorEnabled) { syncProcessor = new SyncRequestProcessor(this, null); syncProcessor.start(); } }
// sync()
synchronizedpublicvoidsync(){ if(pendingSyncs.size() ==0){ LOG.warn("Not expecting a sync."); return; } Request r = pendingSyncs.remove(); commitProcessor.commit(r); }
// We keep a queue of requests. As requests get submitted they are // stored here. The queue is drained in the run() method. LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
@Override publicvoidrun(){ try { while (!finished) { Request request = queuedRequests.take(); if (LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK, 'F', request, ""); } if (request == Request.requestOfDeath) { break; } // We want to queue the request to be processed before we submit // the request to the leader so that we are ready to receive // the response // 下一个处理器CommitProcessor,写请求添加进队列等待提交请求 nextProcessor.processRequest(request); // We now ship the request to the leader. As with all // other quorum operations, sync also follows this code // path, but different from others, we need to keep track // of the sync operations this Observer has pending, so we // add it to pendingSyncs. // 写请求转发给Leader服务器 switch (request.type) { case OpCode.sync: zks.pendingSyncs.add(request); zks.getObserver().request(request); break; case OpCode.create: case OpCode.delete: case OpCode.setData: case OpCode.setACL: case OpCode.createSession: case OpCode.closeSession: case OpCode.multi: zks.getObserver().request(request); break; } } } catch (Exception e) { handleException(this.getName(), e); } LOG.info("ObserverRequestProcessor exited loop!"); }