吕清莹 发表于 2025-6-2 00:40:54

zk源码—6.Leader选举的实现原理

大纲
1.zk是如何实现数据一致性的
(1)数据一致性分析
(2)实现数据一致性的广播模式
(3)实现数据一致性的恢复模式
2.zk是如何进行Leader选举的
(1)服务器启动时的Leader选举
(2)服务器运行时的Leader选举
(3)Leader选举的算法设计
(4)Leader选举的实现细节
 
1.zk是如何实现数据一致性的
(1)数据一致性分析
(2)实现数据一致性的广播模式
(3)实现数据一致性的恢复模式
 
zk集群中的服务器分为Leader服务器、Follower服务器及Observer服务器。Leader选举是一个过程,在这个过程中主要做了两项工作:
工作一:选举出Leader服务器
工作二:进行数据同步
 
zk中实现的一致性不是强一致性,而是最终一致性。即集群中各个服务器上的数据并不是每时每刻都保持一致的,而是即经过一段时间后,集群服务器上的数据才最终保持一致。
 
Leader服务器主要负责处理事务请求,当Leader服务器接收到客户端的事务请求时,会先向集群中的各机器针对该请求的提议发起投票询问。
 
(1)数据一致性分析
zk在集群中采取的是多数原则的方式来保证数据一致性。即当一个事务请求导致服务器上的数据发生改变时,只要保证多数机器的数据都正确变更了,就可保证系统数据一致性。
 
因为每个Follower服务器都可以看作是Leader服务器的数据副本,所以只要保证集群中大多数机器数据是一致的,那么在集群中个别机器出现故障时,zk集群依然能保证稳定运行。
 
(2)实现数据一致性的广播模式
一.首先Leader启动时会创建网络连接管理器LearnerCnxAcceptor等待Learner的连接
LearnerCnxAcceptor监听到Learner发起的连接后,会新建一个LearnerHandler实例专门负责Leader和该Learner之间的连接。启动LearnerHandler时,又会开启一个线程专门负责发送消息给Learner。如果Learner发生故障,那么Leader中为该Learner维护的LearnerHandler的ping()方法会检测到然后关闭相关线程和实例。
public class Leader {
    ...
    private final ServerSocket ss;
   
    Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
      ...
      //创建BIO的ServerSocket,监听端口,等待客户端发起连接
      ss = new ServerSocket();
      ss.bind(self.getQuorumAddress());
      ...
    }
   
    void lead() throws IOException, InterruptedException {
      ...
      cnxAcceptor = new LearnerCnxAcceptor();//网络连接器
      cnxAcceptor.start();
      ...
      while (true) {
            ...
            for (LearnerHandler f : getLearners()) {
                //Leader向Learner发出心跳检测
                f.ping();
            }
            ...
      }
    }
   
    class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
      ...
      public void run() {
            ...
            while (!stop) {
                //监听到客户端发起的连接,新建一个线程LearnerHandler专门进行处理
                Socket s = ss.accept();
                s.setSoTimeout(self.tickTime * self.initLimit);
                s.setTcpNoDelay(nodelay);
                BufferedInputStream is = new BufferedInputStream(s.getInputStream());
                LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                fh.start();
            }
            ...
      }
      ...
    }
    ...
}

public class LearnerHandler extends ZooKeeperThread {
    ...
    public void run() {
      //先进行数据同步
      ...
      //开启线程发送信息给Learner
      startSendingPackets();
      ...
      //处理Learner发过来的消息,比如投票响应ACK消息、心跳响应PING消息等
      while (true) {
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            ...
      }
    }
   
    protected void startSendingPackets() {
      if (!sendingThreadStarted) {
            new Thread() {
                public void run() {
                  Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());
                  try {
                        sendPackets();
                  } catch (InterruptedException e) {
                        LOG.warn("Unexpected interruption " + e.getMessage());
                  }
                }
            }.start();
            sendingThreadStarted = true;
      } else {
            LOG.error("Attempting to start sending thread after it already started");
      }
    }
   
    private void sendPackets() throws InterruptedException {
      long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
      while (true) {
            try {
                //从queuedPackets队列中提取消息出来发送给Learner
                QuorumPacket p = queuedPackets.poll();
                if (p == proposalOfDeath) {
                  // Packet of death!
                  break;
                }
                oa.writeRecord(p, "packet");
            } catch (IOException e) {
                //假如Leader在这里向Learner发送消息时,Learner故障了,那么就会在这里报错
                //此时,这里的报错并不影响对应的LearnerHandler实例和Leader实例
                if (!sock.isClosed()) {
                  LOG.warn("Unexpected exception at " + this, e);
                  try {
                        sock.close();
                  } catch(IOException ie) {
                        LOG.warn("Error closing socket for handler " + this, ie);
                  }
                }
                break;
            }
      }
    }
   
    public void ping() {
      if (!sendingThreadStarted) {
            return;
      }
      long id;
      if (syncLimitCheck.check(System.nanoTime())) {
            synchronized(leader) {
                id = leader.lastProposed;
            }
            QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
            queuePacket(ping);
      } else {
            LOG.warn("Closing connection to peer due to transaction timeout.");
            //Learner故障,那么就关闭当前Learner实例
            shutdown();
      }
    }
   
    public void shutdown() {
      // Send the packet of death
      try {
            queuedPackets.put(proposalOfDeath);
      } catch (InterruptedException e) {
            LOG.warn("Ignoring unexpected exception", e);
      }
      try {
            if (sock != null && !sock.isClosed()) {
                sock.close();
            }
      } catch (IOException e) {
            LOG.warn("Ignoring unexpected exception during socket close", e);
      }
      this.interrupt();
      leader.removeLearnerHandler(this);
    }
    ...
}二.然后Leader处理Learner的事务投票响应后进行事务提交
Leader有一个HashSet为forwardingFollowers,用来管理Follower服务器。当Leader对一个事务请求发起Proposal提议的投票并发现投票通过后,也就是调用如下方法时:
Leader的processAck()方法 ->
Leader的tryToCommit()方法 ->
Leader的commit()方法 ->
Leader的sendPacket()方法会在Leader的sendPacket()方法中遍历forwardingFollowers里的LearnerHandler实例,将Commit请求交给Learner和Leader建立连接时生成的LearnerHandler,最后由Leader的每个LearnerHandler实例广播给对应的Learner进行事务提交。
//1.Leader通过Leader.propose方法对事务请求生成Proposal提议并进行广播给所有Follower
public class Leader {
    private final HashSet<LearnerHandler> forwardingFollowers = new HashSet<LearnerHandler>();
    final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
    ...
    public Proposal propose(Request request) throws XidRolloverException {
      ...
      byte[] data = SerializeUtils.serializeRequest(request);
      proposalStats.setLastBufferSize(data.length);
      QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
      //生成Proposal提议
      Proposal p = new Proposal();
      p.packet = pp;
      p.request = request;
      
      synchronized(this) {
            p.addQuorumVerifier(self.getQuorumVerifier());
            if (request.getHdr().getType() == OpCode.reconfig) {
                self.setLastSeenQuorumVerifier(request.qv, true);                     
            }
            if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {
                p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
            }
            lastProposed = p.packet.getZxid();
            //将发送的Proposal提议放入outstandingProposals队列中
            outstandingProposals.put(lastProposed, p);
            //发送Proposal提议,其实就是把Proposal提议交给LearnerHandler处理
            sendPacket(pp);
      }
      return p;
    }
   
    void sendPacket(QuorumPacket qp) {
      synchronized (forwardingFollowers) {
            for (LearnerHandler f : forwardingFollowers) {
                //LearnerHandler会将提议放入其发送队列里
                f.queuePacket(qp);
            }
      }
    }
    ...
}

//2.Leader完成事务日志记录后,便会通过Leader.processAck方法记录Leader已对Proposal提议完成投票
//SyncRequestProcessor的nextProcessor就是AckRequestProcessor
class AckRequestProcessor implements RequestProcessor {
    ...
    public void processRequest(Request request) {
      ...
      //Leader也作为参与Proposal投票的一份子进行ACK响应
      //将Leader的SID添加到Proposal提议的投票收集器里 + 检查Proposal提议的投票收集器是否有过半ACK才提交
      leader.processAck(self.getId(), request.zxid, null);
      ...
    }
}

//3.Follower收到提议的投票请求后返回ACK响应给Leader
//Leader接收到Follower的ACK响应后,便会通过Leader.processAck方法记录该Follower已对提议完成投票
public class LearnerHandler extends ZooKeeperThread {
    //The packets to be sent to the learner
    final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();
    ...
    @Override
    public void run() {
      ...
      startSendingPackets();//开启一个线程发送queuedPackets里的Packet给Learner
      ...
      while (true) {
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");//读取Learner的响应
            ...
            switch (qp.getType()) {
                case Leader.ACK:
                  ...
                  //如果Leader收到Follower对某Proposal提议请求返回的ACK响应
                  //那么就将Follower的SID添加到该Proposal提议的投票收集器里
                  leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                  break;
                ...
            }
      }
    }
   
    protected void startSendingPackets() {
      ...
      new Thread() {
            public void run() {
                Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());
                sendPackets();
            }
      }.start();
      ...
    }
   
    private void sendPackets() throws InterruptedException {
      while (true) {
            ...
            QuorumPacket p = queuedPackets.poll();
            oa.writeRecord(p, "packet");
            ...
      }
    }
    ...
}

public class Leader {
    private final HashSet<LearnerHandler> forwardingFollowers = new HashSet<LearnerHandler>();
    final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
    ...
    synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {      
      ...
      //检查请求的ZXID,需要比上次已提交的请求的ZXID也就是lastCommitted要大
      if (lastCommitted >= zxid) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", Long.toHexString(lastCommitted), Long.toHexString(zxid));
            }
            // The proposal has already been committed
            return;
      }
      Proposal p = outstandingProposals.get(zxid);
      //将Leader的SID添加到Proposal提议的投票收集器里
      p.addAck(sid);
      //尝试提交,即检查Proposal提议的投票收集器中是否有过半ACK响应
      boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
      ...
    }
   
    synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {      
      //如果提议队列中存在该提议的前一个提议,说明该提议的前一个提议还没提交,那么就返回false
      if (outstandingProposals.containsKey(zxid - 1)) return false;
      //getting a quorum from all necessary configurations.
      //Proposal提议的投票收集器是否已过半
      if (!p.hasAllQuorums()) {
            return false;               
      }
      ...
      outstandingProposals.remove(zxid);
      if (p.request != null) {
            toBeApplied.add(p);
      }
      ...
      //一旦提议通过,马上就要在Leader中标记lastCommitted为最新的提交ZXID
      commit(zxid);//给Follower广播commit消息
      inform(p);//给Observer发送commit消息
      ...
      //调用CommitProcessor处理器的commit方法提交请求
      zk.commitProcessor.commit(p.request);//让Leader执行commit消息
      //下面处理的是Learner发起的同步请求
      if (pendingSyncs.containsKey(zxid)) {
            for (LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
                sendSync(r);
            }               
      }
      returntrue;   
    }
   
    //广播commit消息
    public void commit(long zxid) {
      synchronized(this) {
            //标记lastCommitted为最新的提交ZXID
            lastCommitted = zxid;
      }
      QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
      sendPacket(qp);
    }
   
    void sendPacket(QuorumPacket qp) {
      synchronized (forwardingFollowers) {
            for (LearnerHandler f : forwardingFollowers) {
                //调用LearnerHandler的queuePacket方法添加Packet到发送队列
                f.queuePacket(qp);
            }
      }
    }
   
    public void inform(Proposal proposal) {
      QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid, proposal.packet.getData(), null);
      sendObserverPacket(qp);
    }
    ...
    static public class Proposal extends SyncedLearnerTracker {
      public QuorumPacket packet;
      public Request request;
      ...
    }
}

public class SyncedLearnerTracker {
    protected ArrayList<QuorumVerifierAcksetPair> qvAcksetPairs = new ArrayList<QuorumVerifierAcksetPair>();
    ...
    //添加到投票收集器
    public boolean addAck(Long sid) {
      boolean change = false;
      for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
            if (qvAckset.getQuorumVerifier().getVotingMembers().containsKey(sid)) {
                qvAckset.getAckset().add(sid);
                change = true;
            }
      }
      return change;
    }
   
    //判断投票收集器是否过半
    public boolean hasAllQuorums() {
      for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
            if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))
                return false;
      }
      return true;
    }
    ...
}(3)实现数据一致性的恢复模式
当Leader故障时,Follower服务器会发生如下操作:首先Follower的followLeader()方法里的while循环会被中断运行,然后在QuorumPeer线程中就会触发执行Follower的shutdown()方法,接着执行QuorumPeer的updateServerState()方法更改节点的状态为LOOKING,之后Follower服务器在QuorumPeer线程中会重新进行Leader选举。
 
重新选举Leader需要经历一段时间,此时集群会短暂没有Leader服务器,而且重新选举Leader期间,Follower也会被关闭。
 
注意:Leader故障时,ZooKeeperServer的shutdown()方法会关闭firstProcessor线程。所以恢复模式下的选举过程中,发送到Learner的请求会进入firstProcessor,但是这些请求都会先被queuedRequests存起来,暂时不处理。
public class QuorumPeerMain {
    protected QuorumPeer quorumPeer;
   
    public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
      ...
      quorumPeer.start();
      quorumPeer.join();
      ...
    }
    ...
}

public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
    ...
    public synchronized void start() {
      loadDataBase();
      startServerCnxnFactory();
      adminServer.start();
      //初始化Leader选举(初始化当前投票+监听选举端口+启动选举守护线程)
      startLeaderElection();
      startJvmPauseMonitor();
      super.start();
    }
   
    @Override
    public void run() {
      ...
      while (running) {
            switch (getPeerState()) {
                case LOOKING:
                  ...
                  if (shuttingDownLE) {
                        shuttingDownLE = false;
                        startLeaderElection();
                  }
                  //调用QuorumPeer.electionAlg的lookForLeader(),也就是FastLeaderElection.lookForLeader()开启一轮选举
                  setCurrentVote(makeLEStrategy().lookForLeader());
                  ...
                case FOLLOWING:
                  try {
                        LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        //Leader故障,那么就会中断follower.followLeader()里的的while循环
                        follower.followLeader();
                  } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                  } finally {
                        //Leader故障,就会执行这里的方法
                        follower.shutdown();
                        setFollower(null);
                        updateServerState();
                  }
                  break;
                case LEADING:
                  LOG.info("LEADING");
                  try {
                        setLeader(makeLeader(logFactory));
                        leader.lead();
                        setLeader(null);
                  } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                  } finally {
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        updateServerState();
                  }
                  break;
            }
            ...
      }
      ...
    }
   
    private synchronized void updateServerState() {
      //reconfigFlag初始化就为false
      if (!reconfigFlag) {
            setPeerState(ServerState.LOOKING);
            LOG.warn("PeerState set to LOOKING");
            return;
      }
      ...
      //updateServerStatez方法被执行后又会重置为false
      reconfigFlag = false;
    }
   
    //开始Leader选举
    //创建选举服务端QuorumCnxManager并启动监听 + 创建选举算法FastLeaderElection并启动
    //将FastLeaderElection实例赋值给QuorumPeer.electionAlg
    synchronized public void startLeaderElection() {
      if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
      }
      ...
      this.electionAlg = createElectionAlgorithm(electionType);
    }
    ...
}

public class Follower extends Learner{
    ...
    public void shutdown() {   
      super.shutdown();
    }
   
    void followLeader() throws InterruptedException {
      ...
      QuorumServer leaderServer = findLeader();
      try {
            //Follower启动时向Leader发起连接
            connectToLeader(leaderServer.addr, leaderServer.hostname);
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
            ...
            syncWithLeader(newEpochZxid);
            ...
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning()) {
                //读取BIO输入流
                readPacket(qp);
                processPacket(qp);
            }
      } catch (Exception e) {
            //Leader故障,那么这里就会报异常,从而中断上面的while循环
            LOG.warn("Exception when following the leader", e);
            closeSocket();
            pendingRevalidations.clear();
      }
      ...
    }
    ...
}

public class Learner {      
    ...
    public void shutdown() {
      self.setZooKeeperServer(null);
      self.closeAllConnections();
      self.adminServer.setZooKeeperServer(null);
      closeSocket();
      //关闭比如FollowerZooKeeperServer
      if (zk != null) {
            //关闭sessionTracker的超时检查线程 + 设置firstProcessor为null + 清空zkDb
            zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP));
      }
    }
   
    protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {
      //创建Learner当前BIO的客户端Socket
      this.sock = createSocket();
      int initLimitTime = self.tickTime * self.initLimit;
      int remainingInitLimitTime = initLimitTime;
      long startNanoTime = nanoTime();
      //尝试重连最多5次
      for (int tries = 0; tries < 5; tries++) {
            try {
                remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
                if (remainingInitLimitTime <= 0) {
                  LOG.error("initLimit exceeded on retries.");
                  throw new IOException("initLimit exceeded on retries.");
                }
                //向Leader发起连接请求
                sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));
                if (self.isSslQuorum()){
                  ((SSLSocket) sock).startHandshake();
                }
                sock.setTcpNoDelay(nodelay);
                break;
            } catch (IOException e) {
                ...
            }
            Thread.sleep(1000);
      }
      self.authLearner.authenticate(sock, hostname);
      //初始化输入输出流
      leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
      bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
      leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
    }
   
    private Socket createSocket() throws X509Exception, IOException {
      //创建客户端Socket
      Socket sock;
      if (self.isSslQuorum()) {
            sock = self.getX509Util().createSSLSocket();
      } else {
            sock = new Socket();
      }
      sock.setSoTimeout(self.tickTime * self.initLimit);
      return sock;
    }
   
    protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) throws IOException {
      //向BIO的服务端发起连接请求
      sock.connect(addr, timeout);
    }
   
    void readPacket(QuorumPacket pp) throws IOException {
      synchronized (leaderIs) {
            leaderIs.readRecord(pp, "packet");
      }
    }
    ...
}

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
    ...
    public synchronized void shutdown(boolean fullyShutDown) {
      ...
      //关闭会话的超时检查线程
      if (sessionTracker != null) {
            sessionTracker.shutdown();
      }
      //关闭firstProcessor线程
      //所以Leader故障时,发送到Learner的请求会在firstProcessor的queuedRequests中存起来,暂时不处理
      if (firstProcessor != null) {
            firstProcessor.shutdown();
      }
      if (zkDb != null) {
            if (fullyShutDown) {
                zkDb.clear();
            } else {
                ...
            }
      }
      unregisterJMX();
    }
    ...


来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: zk源码—6.Leader选举的实现原理