LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
int sessionTimeout;
protectedfinal ZooKeeperServer zkServer;
/** * The number of requests that have been submitted but not yet responded to. */ int outstandingRequests;
/** * This is the id that uniquely identifies the session of a client. Once * this session is no longer active, the ephemeral nodes will go away. */ long sessionId;
staticlong nextSessionId = 1; int outstandingLimit = 1;
voiddoIO(SelectionKey k)throws InterruptedException { try { if (isSocketOpen() == false) { LOG.warn("trying to do i/o on a null socket for session:0x" + Long.toHexString(sessionId));
return; } if (k.isReadable()) { /* 处理读操作的流程 1.最开始incomingBuffer就是lenBuffer,容量为4.第一次读取4个字节,即此次请求报文的长度 2.根据请求报文的长度分配incomingBuffer的大小 3.将读到的字节存放在incomingBuffer中,直至读满 (由于第2步中为incomingBuffer分配的长度刚好是报文的长度,此时incomingBuffer中刚好时一个报文) 4.处理报文 */ int rc = sock.read(incomingBuffer); if (rc < 0) { thrownew EndOfStreamException( "Unable to read additional data from client sessionid 0x" + Long.toHexString(sessionId) + ", likely client has closed socket"); } /* 只有incomingBuffer.remaining() == 0,才会进行下一步的处理,否则一直读取数据直到incomingBuffer读满,此时有两种可能: 1.incomingBuffer就是lenBuffer,此时incomingBuffer的内容是此次请求报文的长度. 根据lenBuffer为incomingBuffer分配空间后调用readPayload(). 在readPayload()中会立马进行一次数据读取,(1)若可以将incomingBuffer读满,则incomingBuffer中就是一个完整的请求,处理该请求; (2)若不能将incomingBuffer读满,说明出现了拆包问题,此时不能构造一个完整的请求,只能等待客户端继续发送数据,等到下次socketChannel可读时,继续将数据读取到incomingBuffer中 2.incomingBuffer不是lenBuffer,说明上次读取时出现了拆包问题,incomingBuffer中只有一个请求的部分数据. 而这次读取的数据加上上次读取的数据凑成了一个完整的请求,调用readPayload() */ if (incomingBuffer.remaining() == 0) { // 完整报文读取,remaining还能存多少数据 boolean isPayload; if (incomingBuffer == lenBuffer) { // start of next request incomingBuffer.flip(); isPayload = readLength(k); incomingBuffer.clear(); } else { // continuation isPayload = true; } if (isPayload) { // not the case for 4letterword readPayload(); // 这里处理请求 } else { // four letter words take care // need not do anything else return; } } } if (k.isWritable()) { //发送请求 // ZooLog.logTraceMessage(LOG, // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK // "outgoingBuffers.size() = " + // outgoingBuffers.size()); if (outgoingBuffers.size() > 0) { //队列中有要发送的数据 // ZooLog.logTraceMessage(LOG, // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, // "sk " + k + " is valid: " + // k.isValid());
/* * This is going to reset the buffer position to 0 and the * limit to the size of the buffer, so that we can fill it * with data from the non-direct buffers that we need to * send. */ ByteBuffer directBuffer = factory.directBuffer; directBuffer.clear();
for (ByteBuffer b : outgoingBuffers) { if (directBuffer.remaining() < b.remaining()) { /* * When we call put later, if the directBuffer is to * small to hold everything, nothing will be copied, * so we've got to slice the buffer if it's too big. */ b = (ByteBuffer) b.slice().limit( directBuffer.remaining()); } /* * put() is going to modify the positions of both * buffers, put we don't want to change the position of * the source buffers (we'll do that after the send, if * needed), so we save and reset the position after the * copy */ int p = b.position(); directBuffer.put(b); b.position(p); if (directBuffer.remaining() == 0) { break; } } /* * Do the flip: limit becomes position, position gets set to * 0. This sets us up for the write. */ directBuffer.flip(); // 位置置0便于读取
int sent = sock.write(directBuffer); ByteBuffer bb;
// Remove the buffers that we have sent while (outgoingBuffers.size() > 0) { bb = outgoingBuffers.peek(); if (bb == ServerCnxnFactory.closeConn) { thrownew CloseRequestException("close requested"); } int left = bb.remaining() - sent; if (left > 0) { /* * We only partially sent this buffer, so we update * the position and exit the loop. */ bb.position(bb.position() + sent); break; } packetSent(); /* We've sent the whole buffer, so drop the buffer */ sent -= bb.remaining(); outgoingBuffers.remove(); } // ZooLog.logTraceMessage(LOG, // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send, // outgoingBuffers.size() = " + outgoingBuffers.size()); }
synchronized(this.factory){ if (outgoingBuffers.size() == 0) { if (!initialized && (sk.interestOps() & SelectionKey.OP_READ) == 0) { thrownew CloseRequestException("responded to info probe"); } sk.interestOps(sk.interestOps() & (~SelectionKey.OP_WRITE)); } else { sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); } } } } catch (CancelledKeyException e) { LOG.warn("CancelledKeyException causing close of session 0x" + Long.toHexString(sessionId)); if (LOG.isDebugEnabled()) { LOG.debug("CancelledKeyException stack trace", e); } close(); } catch (CloseRequestException e) { // expecting close to log session closure close(); } catch (EndOfStreamException e) { LOG.warn(e.getMessage()); if (LOG.isDebugEnabled()) { LOG.debug("EndOfStreamException stack trace", e); } // expecting close to log session closure close(); } catch (IOException e) { LOG.warn("Exception causing close of session 0x" + Long.toHexString(sessionId) + ": " + e.getMessage()); if (LOG.isDebugEnabled()) { LOG.debug("IOException stack trace", e); } close(); } }
privatevoidreadPayload()throws IOException, InterruptedException { if (incomingBuffer.remaining() != 0) { // have we read length bytes? int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok if (rc < 0) { thrownew EndOfStreamException( "Unable to read additional data from client sessionid 0x" + Long.toHexString(sessionId) + ", likely client has closed socket"); } }
if (incomingBuffer.remaining() == 0) { // have we read length bytes? packetReceived(); incomingBuffer.flip();// 位置设为0,即可以从头开始读取 if (!initialized) { readConnectRequest(); } else { readRequest(); } lenBuffer.clear(); incomingBuffer = lenBuffer; } }
readConnectRequest()
1 2 3 4 5 6 7
privatevoidreadConnectRequest()throws IOException, InterruptedException { if (!isZKServerRunning()) { thrownew IOException("ZooKeeperServer not running"); } zkServer.processConnectRequest(this, incomingBuffer); initialized = true; }
protectedvoidinternalSendBuffer(ByteBuffer bb){ if (bb != ServerCnxnFactory.closeConn) { // We check if write interest here because if it is NOT set, // nothing is queued, so we can try to send the buffer right // away without waking up the selector if(sk.isValid() && ((sk.interestOps() & SelectionKey.OP_WRITE) == 0)) { try { sock.write(bb); } catch (IOException e) { // we are just doing best effort right now } } // if there is nothing left to send, we are done if (bb.remaining() == 0) { packetSent(); return; } }
synchronized(this.factory){ sk.selector().wakeup(); if (LOG.isTraceEnabled()) { LOG.trace("Add a buffer to outgoingBuffers, sk " + sk + " is valid: " + sk.isValid()); } outgoingBuffers.add(bb); if (sk.isValid()) { sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); } } }