找回密码
 立即注册
首页 业界区 业界 zk源码—6.Leader选举的实现原理

zk源码—6.Leader选举的实现原理

吕清莹 2025-6-2 00:40:54
大纲
1.zk是如何实现数据一致性的
(1)数据一致性分析
(2)实现数据一致性的广播模式
(3)实现数据一致性的恢复模式
2.zk是如何进行Leader选举的
(1)服务器启动时的Leader选举
(2)服务器运行时的Leader选举
(3)Leader选举的算法设计
(4)Leader选举的实现细节
 
1.zk是如何实现数据一致性的
(1)数据一致性分析
(2)实现数据一致性的广播模式
(3)实现数据一致性的恢复模式
 
zk集群中的服务器分为Leader服务器、Follower服务器及Observer服务器。Leader选举是一个过程,在这个过程中主要做了两项工作:
工作一:选举出Leader服务器
工作二:进行数据同步
 
zk中实现的一致性不是强一致性,而是最终一致性。即集群中各个服务器上的数据并不是每时每刻都保持一致的,而是即经过一段时间后,集群服务器上的数据才最终保持一致。
 
Leader服务器主要负责处理事务请求,当Leader服务器接收到客户端的事务请求时,会先向集群中的各机器针对该请求的提议发起投票询问。
 
(1)数据一致性分析
zk在集群中采取的是多数原则的方式来保证数据一致性。即当一个事务请求导致服务器上的数据发生改变时,只要保证多数机器的数据都正确变更了,就可保证系统数据一致性。
 
因为每个Follower服务器都可以看作是Leader服务器的数据副本,所以只要保证集群中大多数机器数据是一致的,那么在集群中个别机器出现故障时,zk集群依然能保证稳定运行。
 
(2)实现数据一致性的广播模式
一.首先Leader启动时会创建网络连接管理器LearnerCnxAcceptor等待Learner的连接
LearnerCnxAcceptor监听到Learner发起的连接后,会新建一个LearnerHandler实例专门负责Leader和该Learner之间的连接。启动LearnerHandler时,又会开启一个线程专门负责发送消息给Learner。如果Learner发生故障,那么Leader中为该Learner维护的LearnerHandler的ping()方法会检测到然后关闭相关线程和实例。
  1. public class Leader {
  2.     ...
  3.     private final ServerSocket ss;
  4.    
  5.     Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
  6.         ...
  7.         //创建BIO的ServerSocket,监听端口,等待客户端发起连接
  8.         ss = new ServerSocket();
  9.         ss.bind(self.getQuorumAddress());
  10.         ...
  11.     }
  12.    
  13.     void lead() throws IOException, InterruptedException {
  14.         ...
  15.         cnxAcceptor = new LearnerCnxAcceptor();//网络连接器
  16.         cnxAcceptor.start();
  17.         ...
  18.         while (true) {
  19.             ...
  20.             for (LearnerHandler f : getLearners()) {
  21.                 //Leader向Learner发出心跳检测
  22.                 f.ping();
  23.             }
  24.             ...
  25.         }
  26.     }
  27.    
  28.     class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
  29.         ...
  30.         public void run() {
  31.             ...
  32.             while (!stop) {
  33.                 //监听到客户端发起的连接,新建一个线程LearnerHandler专门进行处理
  34.                 Socket s = ss.accept();
  35.                 s.setSoTimeout(self.tickTime * self.initLimit);
  36.                 s.setTcpNoDelay(nodelay);
  37.                 BufferedInputStream is = new BufferedInputStream(s.getInputStream());
  38.                 LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
  39.                 fh.start();
  40.             }
  41.             ...
  42.         }
  43.         ...
  44.     }
  45.     ...
  46. }
  47. public class LearnerHandler extends ZooKeeperThread {
  48.     ...
  49.     public void run() {
  50.         //先进行数据同步
  51.         ...
  52.         //开启线程发送信息给Learner
  53.         startSendingPackets();
  54.         ...
  55.         //处理Learner发过来的消息,比如投票响应ACK消息、心跳响应PING消息等
  56.         while (true) {
  57.             qp = new QuorumPacket();
  58.             ia.readRecord(qp, "packet");
  59.             ...
  60.         }
  61.     }
  62.    
  63.     protected void startSendingPackets() {
  64.         if (!sendingThreadStarted) {
  65.             new Thread() {
  66.                 public void run() {
  67.                     Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());
  68.                     try {
  69.                         sendPackets();
  70.                     } catch (InterruptedException e) {
  71.                         LOG.warn("Unexpected interruption " + e.getMessage());
  72.                     }
  73.                 }
  74.             }.start();
  75.             sendingThreadStarted = true;
  76.         } else {
  77.             LOG.error("Attempting to start sending thread after it already started");
  78.         }
  79.     }
  80.    
  81.     private void sendPackets() throws InterruptedException {
  82.         long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
  83.         while (true) {
  84.             try {
  85.                 //从queuedPackets队列中提取消息出来发送给Learner
  86.                 QuorumPacket p = queuedPackets.poll();
  87.                 if (p == proposalOfDeath) {
  88.                     // Packet of death!
  89.                     break;
  90.                 }
  91.                 oa.writeRecord(p, "packet");
  92.             } catch (IOException e) {
  93.                 //假如Leader在这里向Learner发送消息时,Learner故障了,那么就会在这里报错
  94.                 //此时,这里的报错并不影响对应的LearnerHandler实例和Leader实例
  95.                 if (!sock.isClosed()) {
  96.                     LOG.warn("Unexpected exception at " + this, e);
  97.                     try {
  98.                         sock.close();
  99.                     } catch(IOException ie) {
  100.                         LOG.warn("Error closing socket for handler " + this, ie);
  101.                     }
  102.                 }
  103.                 break;
  104.             }
  105.         }
  106.     }
  107.    
  108.     public void ping() {
  109.         if (!sendingThreadStarted) {
  110.             return;
  111.         }
  112.         long id;
  113.         if (syncLimitCheck.check(System.nanoTime())) {
  114.             synchronized(leader) {
  115.                 id = leader.lastProposed;
  116.             }
  117.             QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
  118.             queuePacket(ping);
  119.         } else {
  120.             LOG.warn("Closing connection to peer due to transaction timeout.");
  121.             //Learner故障,那么就关闭当前Learner实例
  122.             shutdown();
  123.         }
  124.     }
  125.    
  126.     public void shutdown() {
  127.         // Send the packet of death
  128.         try {
  129.             queuedPackets.put(proposalOfDeath);
  130.         } catch (InterruptedException e) {
  131.             LOG.warn("Ignoring unexpected exception", e);
  132.         }
  133.         try {
  134.             if (sock != null && !sock.isClosed()) {
  135.                 sock.close();
  136.             }
  137.         } catch (IOException e) {
  138.             LOG.warn("Ignoring unexpected exception during socket close", e);
  139.         }
  140.         this.interrupt();
  141.         leader.removeLearnerHandler(this);
  142.     }
  143.     ...
  144. }
复制代码
二.然后Leader处理Learner的事务投票响应后进行事务提交
Leader有一个HashSet为forwardingFollowers,用来管理Follower服务器。当Leader对一个事务请求发起Proposal提议的投票并发现投票通过后,也就是调用如下方法时:
  1. Leader的processAck()方法 ->
  2. Leader的tryToCommit()方法 ->
  3. Leader的commit()方法 ->
  4. Leader的sendPacket()方法
复制代码
会在Leader的sendPacket()方法中遍历forwardingFollowers里的LearnerHandler实例,将Commit请求交给Learner和Leader建立连接时生成的LearnerHandler,最后由Leader的每个LearnerHandler实例广播给对应的Learner进行事务提交。
  1. //1.Leader通过Leader.propose方法对事务请求生成Proposal提议并进行广播给所有Follower
  2. public class Leader {
  3.     private final HashSet<LearnerHandler> forwardingFollowers = new HashSet<LearnerHandler>();
  4.     final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
  5.     ...
  6.     public Proposal propose(Request request) throws XidRolloverException {
  7.         ...
  8.         byte[] data = SerializeUtils.serializeRequest(request);
  9.         proposalStats.setLastBufferSize(data.length);
  10.         QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
  11.         //生成Proposal提议
  12.         Proposal p = new Proposal();
  13.         p.packet = pp;
  14.         p.request = request;
  15.       
  16.         synchronized(this) {
  17.             p.addQuorumVerifier(self.getQuorumVerifier());
  18.             if (request.getHdr().getType() == OpCode.reconfig) {
  19.                 self.setLastSeenQuorumVerifier(request.qv, true);                       
  20.             }
  21.             if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {
  22.                 p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
  23.             }
  24.             lastProposed = p.packet.getZxid();
  25.             //将发送的Proposal提议放入outstandingProposals队列中
  26.             outstandingProposals.put(lastProposed, p);
  27.             //发送Proposal提议,其实就是把Proposal提议交给LearnerHandler处理
  28.             sendPacket(pp);
  29.         }
  30.         return p;
  31.     }
  32.    
  33.     void sendPacket(QuorumPacket qp) {
  34.         synchronized (forwardingFollowers) {
  35.             for (LearnerHandler f : forwardingFollowers) {
  36.                 //LearnerHandler会将提议放入其发送队列里
  37.                 f.queuePacket(qp);
  38.             }
  39.         }
  40.     }
  41.     ...
  42. }
  43. //2.Leader完成事务日志记录后,便会通过Leader.processAck方法记录Leader已对Proposal提议完成投票
  44. //SyncRequestProcessor的nextProcessor就是AckRequestProcessor
  45. class AckRequestProcessor implements RequestProcessor {
  46.     ...
  47.     public void processRequest(Request request) {
  48.         ...
  49.         //Leader也作为参与Proposal投票的一份子进行ACK响应
  50.         //将Leader的SID添加到Proposal提议的投票收集器里 + 检查Proposal提议的投票收集器是否有过半ACK才提交
  51.         leader.processAck(self.getId(), request.zxid, null);
  52.         ...
  53.     }
  54. }
  55. //3.Follower收到提议的投票请求后返回ACK响应给Leader
  56. //Leader接收到Follower的ACK响应后,便会通过Leader.processAck方法记录该Follower已对提议完成投票
  57. public class LearnerHandler extends ZooKeeperThread {
  58.     //The packets to be sent to the learner
  59.     final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();
  60.     ...
  61.     @Override
  62.     public void run() {
  63.         ...
  64.         startSendingPackets();//开启一个线程发送queuedPackets里的Packet给Learner
  65.         ...
  66.         while (true) {
  67.             qp = new QuorumPacket();
  68.             ia.readRecord(qp, "packet");//读取Learner的响应
  69.             ...
  70.             switch (qp.getType()) {
  71.                 case Leader.ACK:
  72.                     ...
  73.                     //如果Leader收到Follower对某Proposal提议请求返回的ACK响应
  74.                     //那么就将Follower的SID添加到该Proposal提议的投票收集器里
  75.                     leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
  76.                     break;
  77.                 ...
  78.             }
  79.         }
  80.     }
  81.    
  82.     protected void startSendingPackets() {
  83.         ...
  84.         new Thread() {
  85.             public void run() {
  86.                 Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());
  87.                 sendPackets();
  88.             }
  89.         }.start();
  90.         ...
  91.     }
  92.    
  93.     private void sendPackets() throws InterruptedException {
  94.         while (true) {
  95.             ...
  96.             QuorumPacket p = queuedPackets.poll();
  97.             oa.writeRecord(p, "packet");
  98.             ...
  99.         }
  100.     }
  101.     ...
  102. }
  103. public class Leader {
  104.     private final HashSet<LearnerHandler> forwardingFollowers = new HashSet<LearnerHandler>();
  105.     final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
  106.     ...
  107.     synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {        
  108.         ...
  109.         //检查请求的ZXID,需要比上次已提交的请求的ZXID也就是lastCommitted要大
  110.         if (lastCommitted >= zxid) {
  111.             if (LOG.isDebugEnabled()) {
  112.                 LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", Long.toHexString(lastCommitted), Long.toHexString(zxid));
  113.             }
  114.             // The proposal has already been committed
  115.             return;
  116.         }
  117.         Proposal p = outstandingProposals.get(zxid);
  118.         //将Leader的SID添加到Proposal提议的投票收集器里
  119.         p.addAck(sid);
  120.         //尝试提交,即检查Proposal提议的投票收集器中是否有过半ACK响应
  121.         boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
  122.         ...
  123.     }
  124.    
  125.     synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {      
  126.         //如果提议队列中存在该提议的前一个提议,说明该提议的前一个提议还没提交,那么就返回false
  127.         if (outstandingProposals.containsKey(zxid - 1)) return false;
  128.         //getting a quorum from all necessary configurations.
  129.         //Proposal提议的投票收集器是否已过半
  130.         if (!p.hasAllQuorums()) {
  131.             return false;                 
  132.         }
  133.         ...
  134.         outstandingProposals.remove(zxid);
  135.         if (p.request != null) {
  136.             toBeApplied.add(p);
  137.         }
  138.         ...
  139.         //一旦提议通过,马上就要在Leader中标记lastCommitted为最新的提交ZXID
  140.         commit(zxid);//给Follower广播commit消息
  141.         inform(p);//给Observer发送commit消息
  142.         ...
  143.         //调用CommitProcessor处理器的commit方法提交请求
  144.         zk.commitProcessor.commit(p.request);//让Leader执行commit消息
  145.         //下面处理的是Learner发起的同步请求
  146.         if (pendingSyncs.containsKey(zxid)) {
  147.             for (LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
  148.                 sendSync(r);
  149.             }               
  150.         }
  151.         return  true;   
  152.     }
  153.    
  154.     //广播commit消息
  155.     public void commit(long zxid) {
  156.         synchronized(this) {
  157.             //标记lastCommitted为最新的提交ZXID
  158.             lastCommitted = zxid;
  159.         }
  160.         QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
  161.         sendPacket(qp);
  162.     }
  163.    
  164.     void sendPacket(QuorumPacket qp) {
  165.         synchronized (forwardingFollowers) {
  166.             for (LearnerHandler f : forwardingFollowers) {
  167.                 //调用LearnerHandler的queuePacket方法添加Packet到发送队列
  168.                 f.queuePacket(qp);
  169.             }
  170.         }
  171.     }
  172.    
  173.     public void inform(Proposal proposal) {
  174.         QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid, proposal.packet.getData(), null);
  175.         sendObserverPacket(qp);
  176.     }
  177.     ...
  178.     static public class Proposal extends SyncedLearnerTracker {
  179.         public QuorumPacket packet;
  180.         public Request request;
  181.         ...
  182.     }
  183. }
  184. public class SyncedLearnerTracker {
  185.     protected ArrayList<QuorumVerifierAcksetPair> qvAcksetPairs = new ArrayList<QuorumVerifierAcksetPair>();
  186.     ...
  187.     //添加到投票收集器
  188.     public boolean addAck(Long sid) {
  189.         boolean change = false;
  190.         for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
  191.             if (qvAckset.getQuorumVerifier().getVotingMembers().containsKey(sid)) {
  192.                 qvAckset.getAckset().add(sid);
  193.                 change = true;
  194.             }
  195.         }
  196.         return change;
  197.     }
  198.    
  199.     //判断投票收集器是否过半
  200.     public boolean hasAllQuorums() {
  201.         for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
  202.             if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))
  203.                 return false;
  204.         }
  205.         return true;
  206.     }
  207.     ...
  208. }
复制代码
(3)实现数据一致性的恢复模式
当Leader故障时,Follower服务器会发生如下操作:首先Follower的followLeader()方法里的while循环会被中断运行,然后在QuorumPeer线程中就会触发执行Follower的shutdown()方法,接着执行QuorumPeer的updateServerState()方法更改节点的状态为LOOKING,之后Follower服务器在QuorumPeer线程中会重新进行Leader选举。
 
重新选举Leader需要经历一段时间,此时集群会短暂没有Leader服务器,而且重新选举Leader期间,Follower也会被关闭。
 
注意:Leader故障时,ZooKeeperServer的shutdown()方法会关闭firstProcessor线程。所以恢复模式下的选举过程中,发送到Learner的请求会进入firstProcessor,但是这些请求都会先被queuedRequests存起来,暂时不处理。
  1. public class QuorumPeerMain {
  2.     protected QuorumPeer quorumPeer;
  3.    
  4.     public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
  5.         ...
  6.         quorumPeer.start();
  7.         quorumPeer.join();
  8.         ...
  9.     }
  10.     ...
  11. }
  12. public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
  13.     ...
  14.     public synchronized void start() {
  15.         loadDataBase();
  16.         startServerCnxnFactory();
  17.         adminServer.start();
  18.         //初始化Leader选举(初始化当前投票+监听选举端口+启动选举守护线程)
  19.         startLeaderElection();
  20.         startJvmPauseMonitor();
  21.         super.start();
  22.     }
  23.    
  24.     @Override
  25.     public void run() {
  26.         ...
  27.         while (running) {
  28.             switch (getPeerState()) {
  29.                 case LOOKING:
  30.                     ...
  31.                     if (shuttingDownLE) {
  32.                         shuttingDownLE = false;
  33.                         startLeaderElection();
  34.                     }
  35.                     //调用QuorumPeer.electionAlg的lookForLeader(),也就是FastLeaderElection.lookForLeader()开启一轮选举
  36.                     setCurrentVote(makeLEStrategy().lookForLeader());
  37.                     ...
  38.                 case FOLLOWING:
  39.                     try {
  40.                         LOG.info("FOLLOWING");
  41.                         setFollower(makeFollower(logFactory));
  42.                         //Leader故障,那么就会中断follower.followLeader()里的的while循环
  43.                         follower.followLeader();
  44.                     } catch (Exception e) {
  45.                         LOG.warn("Unexpected exception",e);
  46.                     } finally {
  47.                         //Leader故障,就会执行这里的方法
  48.                         follower.shutdown();
  49.                         setFollower(null);
  50.                         updateServerState();
  51.                     }
  52.                     break;
  53.                 case LEADING:
  54.                     LOG.info("LEADING");
  55.                     try {
  56.                         setLeader(makeLeader(logFactory));
  57.                         leader.lead();
  58.                         setLeader(null);
  59.                     } catch (Exception e) {
  60.                         LOG.warn("Unexpected exception",e);
  61.                     } finally {
  62.                         if (leader != null) {
  63.                             leader.shutdown("Forcing shutdown");
  64.                             setLeader(null);
  65.                         }
  66.                         updateServerState();
  67.                     }
  68.                     break;
  69.             }
  70.             ...
  71.         }
  72.         ...
  73.     }
  74.    
  75.     private synchronized void updateServerState() {
  76.         //reconfigFlag初始化就为false
  77.         if (!reconfigFlag) {
  78.             setPeerState(ServerState.LOOKING);
  79.             LOG.warn("PeerState set to LOOKING");
  80.             return;
  81.         }
  82.         ...
  83.         //updateServerStatez方法被执行后又会重置为false
  84.         reconfigFlag = false;
  85.     }
  86.    
  87.     //开始Leader选举
  88.     //创建选举服务端QuorumCnxManager并启动监听 + 创建选举算法FastLeaderElection并启动
  89.     //将FastLeaderElection实例赋值给QuorumPeer.electionAlg
  90.     synchronized public void startLeaderElection() {
  91.         if (getPeerState() == ServerState.LOOKING) {
  92.             currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
  93.         }
  94.         ...
  95.         this.electionAlg = createElectionAlgorithm(electionType);
  96.     }
  97.     ...
  98. }
  99. public class Follower extends Learner{
  100.     ...
  101.     public void shutdown() {   
  102.         super.shutdown();
  103.     }
  104.    
  105.     void followLeader() throws InterruptedException {
  106.         ...
  107.         QuorumServer leaderServer = findLeader();
  108.         try {
  109.             //Follower启动时向Leader发起连接
  110.             connectToLeader(leaderServer.addr, leaderServer.hostname);
  111.             long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
  112.             ...
  113.             syncWithLeader(newEpochZxid);
  114.             ...
  115.             QuorumPacket qp = new QuorumPacket();
  116.             while (this.isRunning()) {
  117.                 //读取BIO输入流
  118.                 readPacket(qp);
  119.                 processPacket(qp);
  120.             }  
  121.         } catch (Exception e) {
  122.             //Leader故障,那么这里就会报异常,从而中断上面的while循环
  123.             LOG.warn("Exception when following the leader", e);
  124.             closeSocket();
  125.             pendingRevalidations.clear();
  126.         }
  127.         ...
  128.     }
  129.     ...
  130. }
  131. public class Learner {      
  132.     ...
  133.     public void shutdown() {
  134.         self.setZooKeeperServer(null);
  135.         self.closeAllConnections();
  136.         self.adminServer.setZooKeeperServer(null);
  137.         closeSocket();
  138.         //关闭比如FollowerZooKeeperServer
  139.         if (zk != null) {
  140.             //关闭sessionTracker的超时检查线程 + 设置firstProcessor为null + 清空zkDb
  141.             zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP));
  142.         }
  143.     }
  144.    
  145.     protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {
  146.         //创建Learner当前BIO的客户端Socket
  147.         this.sock = createSocket();
  148.         int initLimitTime = self.tickTime * self.initLimit;
  149.         int remainingInitLimitTime = initLimitTime;
  150.         long startNanoTime = nanoTime();
  151.         //尝试重连最多5次
  152.         for (int tries = 0; tries < 5; tries++) {
  153.             try {
  154.                 remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
  155.                 if (remainingInitLimitTime <= 0) {
  156.                     LOG.error("initLimit exceeded on retries.");
  157.                     throw new IOException("initLimit exceeded on retries.");
  158.                 }
  159.                 //向Leader发起连接请求
  160.                 sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));
  161.                 if (self.isSslQuorum())  {
  162.                     ((SSLSocket) sock).startHandshake();
  163.                 }
  164.                 sock.setTcpNoDelay(nodelay);
  165.                 break;
  166.             } catch (IOException e) {
  167.                 ...
  168.             }
  169.             Thread.sleep(1000);
  170.         }
  171.         self.authLearner.authenticate(sock, hostname);
  172.         //初始化输入输出流
  173.         leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
  174.         bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
  175.         leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
  176.     }
  177.    
  178.     private Socket createSocket() throws X509Exception, IOException {
  179.         //创建客户端Socket
  180.         Socket sock;
  181.         if (self.isSslQuorum()) {
  182.             sock = self.getX509Util().createSSLSocket();
  183.         } else {
  184.             sock = new Socket();
  185.         }
  186.         sock.setSoTimeout(self.tickTime * self.initLimit);
  187.         return sock;
  188.     }
  189.    
  190.     protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) throws IOException {
  191.         //向BIO的服务端发起连接请求
  192.         sock.connect(addr, timeout);
  193.     }
  194.    
  195.     void readPacket(QuorumPacket pp) throws IOException {
  196.         synchronized (leaderIs) {
  197.             leaderIs.readRecord(pp, "packet");
  198.         }
  199.     }
  200.     ...
  201. }
  202. public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
  203.     ...
  204.     public synchronized void shutdown(boolean fullyShutDown) {
  205.         ...
  206.         //关闭会话的超时检查线程
  207.         if (sessionTracker != null) {
  208.             sessionTracker.shutdown();
  209.         }
  210.         //关闭firstProcessor线程
  211.         //所以Leader故障时,发送到Learner的请求会在firstProcessor的queuedRequests中存起来,暂时不处理
  212.         if (firstProcessor != null) {
  213.             firstProcessor.shutdown();
  214.         }
  215.         if (zkDb != null) {
  216.             if (fullyShutDown) {
  217.                 zkDb.clear();
  218.             } else {
  219.                 ...
  220.             }
  221.         }
  222.         unregisterJMX();
  223.     }
  224.     ...
  225. }
复制代码
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册