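Outline
1. How zk implements data consistency
(1) Data consistency analysis
(2) Broadcast mode for data consistency
(3) Recovery mode for data consistency
2. How zk performs Leader election
(1) Leader election at server startup
(2) Leader election while the servers are running
(3) Algorithm design of Leader election
(4) Implementation details of Leader election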
1. How zk implements data consistency
(1) Data consistency analysis
(2) Broadcast mode for data consistency
(3) Recovery mode for data consistency
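Servers in a zk cluster are divided into Leader, Follower, and Observer servers. Leader election is a process that mainly accomplishes two tasks:
Task 1: elect the Leader server
Task 2: synchronize data
The consistency that zk implements is not strong consistency but eventual consistency. That is, the data on the servers in the cluster is not identical at every moment; only after a period of time does the data on all servers become consistent.
The Leader server is mainly responsible for handling transaction requests. When the Leader receives a transaction request from a client, it first sends a Proposal for that request to the machines in the cluster and asks them to vote on it.
(1) Data consistency analysis
zk relies on the majority principle to guarantee data consistency in the cluster. That is, when a transaction request changes the data on the servers, the system's data is considered consistent as long as a majority of the machines have applied the change correctly.
Because every Follower server can be regarded as a replica of the Leader's data, as long as the data on a majority of machines is consistent, the zk cluster keeps running stably even when individual machines fail.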
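The majority rule can be illustrated with a minimal sketch. The class name `MajorityQuorum` and the method `tolerableFailures()` are hypothetical and not part of ZooKeeper; the sketch assumes every voting member carries equal weight and only shows the "strictly more than half" arithmetic described above.

```java
import java.util.Set;

// Hypothetical helper, for illustration only: equal-weight majority quorum.
final class MajorityQuorum {
    private final int votingMembers;

    MajorityQuorum(int votingMembers) {
        if (votingMembers <= 0) {
            throw new IllegalArgumentException("votingMembers must be positive");
        }
        this.votingMembers = votingMembers;
    }

    // True when strictly more than half of the voting members have acknowledged.
    boolean containsQuorum(Set<Long> ackSet) {
        return ackSet.size() > votingMembers / 2;
    }

    // Largest number of simultaneous failures that still leaves a majority alive.
    int tolerableFailures() {
        return (votingMembers - 1) / 2;
    }
}
```

With 5 voting servers, 3 acknowledgements are enough and 2 failures can be tolerated. With 4 servers only 1 failure can be tolerated, which is one reason odd-sized clusters are the usual choice.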
(2) Broadcast mode for data consistency
I. First, when the Leader starts, it creates the network connection manager LearnerCnxAcceptor and waits for Learners to connect
Once LearnerCnxAcceptor detects a connection from a Learner, it creates a LearnerHandler instance dedicated to the connection between the Leader and that Learner. When the LearnerHandler starts, it also starts a thread dedicated to sending messages to the Learner. If the Learner fails, the ping() method of the LearnerHandler that the Leader maintains for it detects the failure and shuts down the related thread and instance.

```java
public class Leader {
    ...
    private final ServerSocket ss;

    Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
        ...
        // Create a blocking (BIO) ServerSocket, bind it to the quorum address, and wait for Learners to connect
        ss = new ServerSocket();
        ss.bind(self.getQuorumAddress());
        ...
    }

    void lead() throws IOException, InterruptedException {
        ...
        cnxAcceptor = new LearnerCnxAcceptor(); // network connection acceptor
        cnxAcceptor.start();
        ...
        while (true) {
            ...
            for (LearnerHandler f : getLearners()) {
                // The Leader sends a heartbeat to each Learner
                f.ping();
            }
            ...
        }
    }

    class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
        ...
        public void run() {
            ...
            while (!stop) {
                // A Learner has connected: create a dedicated LearnerHandler thread to serve it
                Socket s = ss.accept();
                s.setSoTimeout(self.tickTime * self.initLimit);
                s.setTcpNoDelay(nodelay);
                BufferedInputStream is = new BufferedInputStream(s.getInputStream());
                LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                fh.start();
            }
            ...
        }
        ...
    }
    ...
}
```
```java
public class LearnerHandler extends ZooKeeperThread {
    ...
    public void run() {
        // Synchronize data with the Learner first
        ...
        // Start the thread that sends messages to the Learner
        startSendingPackets();
        ...
        // Handle messages from the Learner, such as ACK vote responses and PING heartbeat responses
        while (true) {
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            ...
        }
    }

    protected void startSendingPackets() {
        if (!sendingThreadStarted) {
            new Thread() {
                public void run() {
                    Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());
                    try {
                        sendPackets();
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected interruption " + e.getMessage());
                    }
                }
            }.start();
            sendingThreadStarted = true;
        } else {
            LOG.error("Attempting to start sending thread after it already started");
        }
    }

    private void sendPackets() throws InterruptedException {
        long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
        while (true) {
            try {
                // Take the next message from the queuedPackets queue and send it to the Learner
                // (poll() returns null when the queue is empty; the handling of that case is elided in this excerpt)
                QuorumPacket p = queuedPackets.poll();
                if (p == proposalOfDeath) {
                    // Packet of death!
                    break;
                }
                oa.writeRecord(p, "packet");
            } catch (IOException e) {
                // If the Learner has failed while the Leader is sending to it, the error surfaces here.
                // The error does not affect the LearnerHandler instance or the Leader instance.
                if (!sock.isClosed()) {
                    LOG.warn("Unexpected exception at " + this, e);
                    try {
                        sock.close();
                    } catch (IOException ie) {
                        LOG.warn("Error closing socket for handler " + this, ie);
                    }
                }
                break;
            }
        }
    }

    public void ping() {
        if (!sendingThreadStarted) {
            return;
        }
        long id;
        if (syncLimitCheck.check(System.nanoTime())) {
            synchronized (leader) {
                id = leader.lastProposed;
            }
            QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
            queuePacket(ping);
        } else {
            LOG.warn("Closing connection to peer due to transaction timeout.");
            // The Learner has failed, so shut down this LearnerHandler instance
            shutdown();
        }
    }

    public void shutdown() {
        // Send the packet of death
        try {
            queuedPackets.put(proposalOfDeath);
        } catch (InterruptedException e) {
            LOG.warn("Ignoring unexpected exception", e);
        }
        try {
            if (sock != null && !sock.isClosed()) {
                sock.close();
            }
        } catch (IOException e) {
            LOG.warn("Ignoring unexpected exception during socket close", e);
        }
        this.interrupt();
        leader.removeLearnerHandler(this);
    }
    ...
}
```
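The way shutdown() stops the sender thread by queueing proposalOfDeath is an instance of the poison-pill pattern. The following is a simplified sketch of that pattern only: the class `PoisonPillSender`, its `delivered` list, and the use of String packets are assumptions made for illustration and do not come from the ZooKeeper source. Unlike the excerpt above, this sketch waits for the sender thread to finish inside shutdown() so that the result is deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of a queue-draining sender stopped by a sentinel packet.
final class PoisonPillSender {
    // Sentinel compared by identity, playing the role of proposalOfDeath.
    private static final String POISON = new String("poison");

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> delivered = Collections.synchronizedList(new ArrayList<>());
    private final Thread sender = new Thread(this::drain, "Sender-sketch");

    void start() {
        sender.start();
    }

    // Producers only enqueue; the sender thread performs the actual "write".
    void queuePacket(String packet) {
        queue.add(packet);
    }

    // Enqueue the sentinel behind all pending packets, then wait for the sender to exit.
    void shutdown() {
        queue.add(POISON);
        try {
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    private void drain() {
        try {
            while (true) {
                String p = queue.take(); // blocks until a packet is available
                if (p == POISON) {
                    break;               // packet of death: stop the loop
                }
                delivered.add(p);        // stands in for writing to the socket
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the sentinel travels through the same FIFO queue as normal packets, everything queued before shutdown() is still delivered in order before the thread exits.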
II. Then the Leader processes the Learners' transaction vote responses and commits the transaction
The Leader keeps a HashSet named forwardingFollowers to manage the Follower servers. When the Leader has issued a Proposal for a transaction request and finds that the vote has passed, the following call chain runs:

```
Leader.processAck() ->
Leader.tryToCommit() ->
Leader.commit() ->
Leader.sendPacket()
```

Inside Leader.sendPacket(), the Leader iterates over the LearnerHandler instances in forwardingFollowers and hands the Commit request to the LearnerHandler that was created when each Learner connected to the Leader. Each LearnerHandler then sends the request to its own Learner, which commits the transaction.

```java
// 1. The Leader uses Leader.propose() to generate a Proposal for the transaction request and broadcasts it to all Followers
public class Leader {
    private final HashSet<LearnerHandler> forwardingFollowers = new HashSet<LearnerHandler>();
    final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
    ...
    public Proposal propose(Request request) throws XidRolloverException {
        ...
        byte[] data = SerializeUtils.serializeRequest(request);
        proposalStats.setLastBufferSize(data.length);
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
        // Build the Proposal
        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;

        synchronized (this) {
            p.addQuorumVerifier(self.getQuorumVerifier());
            if (request.getHdr().getType() == OpCode.reconfig) {
                self.setLastSeenQuorumVerifier(request.qv, true);
            }
            if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
                p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
            }
            lastProposed = p.packet.getZxid();
            // Record the Proposal in the outstandingProposals map
            outstandingProposals.put(lastProposed, p);
            // Send the Proposal, which really means handing it to the LearnerHandlers
            sendPacket(pp);
        }
        return p;
    }

    void sendPacket(QuorumPacket qp) {
        synchronized (forwardingFollowers) {
            for (LearnerHandler f : forwardingFollowers) {
                // Each LearnerHandler puts the packet into its own send queue
                f.queuePacket(qp);
            }
        }
    }
    ...
}

// 2. After the Leader has written the transaction log, it calls Leader.processAck() to record its own vote on the Proposal
// The nextProcessor of SyncRequestProcessor is AckRequestProcessor
class AckRequestProcessor implements RequestProcessor {
    ...
    public void processRequest(Request request) {
        ...
        // The Leader also takes part in the vote and ACKs the Proposal:
        // add the Leader's SID to the Proposal's ack set, then check whether more than half have ACKed before committing
        leader.processAck(self.getId(), request.zxid, null);
        ...
    }
}

// 3. A Follower that receives the Proposal returns an ACK response to the Leader
// When the Leader receives the Follower's ACK, it calls Leader.processAck() to record that Follower's vote
public class LearnerHandler extends ZooKeeperThread {
    //The packets to be sent to the learner
    final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();
    ...
    @Override
    public void run() {
        ...
        startSendingPackets(); // start a thread that sends the packets in queuedPackets to the Learner
        ...
        while (true) {
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet"); // read the Learner's response
            ...
            switch (qp.getType()) {
            case Leader.ACK:
                ...
                // The Leader has received a Follower's ACK for some Proposal,
                // so add the Follower's SID to that Proposal's ack set
                leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                break;
            ...
            }
        }
    }

    protected void startSendingPackets() {
        ...
        new Thread() {
            public void run() {
                Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());
                sendPackets();
            }
        }.start();
        ...
    }

    private void sendPackets() throws InterruptedException {
        while (true) {
            ...
            QuorumPacket p = queuedPackets.poll();
            oa.writeRecord(p, "packet");
            ...
        }
    }
    ...
}
```
```java
public class Leader {
    private final HashSet<LearnerHandler> forwardingFollowers = new HashSet<LearnerHandler>();
    final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
    ...
    synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
        ...
        // The request's ZXID must be greater than lastCommitted, the ZXID of the last committed request
        if (lastCommitted >= zxid) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", Long.toHexString(lastCommitted), Long.toHexString(zxid));
            }
            // The proposal has already been committed
            return;
        }
        Proposal p = outstandingProposals.get(zxid);
        // Add the acknowledging server's SID (the Leader's own or a Follower's) to the Proposal's ack set
        p.addAck(sid);
        // Try to commit, i.e. check whether more than half of the voters have ACKed the Proposal
        boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
        ...
    }

    synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
        // If the previous proposal (zxid - 1) is still outstanding, it has not been committed yet, so return false
        if (outstandingProposals.containsKey(zxid - 1)) return false;
        //getting a quorum from all necessary configurations.
        // Has the Proposal's ack set reached a majority?
        if (!p.hasAllQuorums()) {
            return false;
        }
        ...
        outstandingProposals.remove(zxid);
        if (p.request != null) {
            toBeApplied.add(p);
        }
        ...
        // Once the Proposal passes, commit() immediately marks lastCommitted on the Leader with the newly committed ZXID
        commit(zxid); // broadcast the COMMIT message to the Followers
        inform(p);    // send the INFORM message to the Observers
        ...
        // Call CommitProcessor.commit() so that the Leader itself applies the commit
        zk.commitProcessor.commit(p.request);
        // The code below handles sync requests issued by Learners
        if (pendingSyncs.containsKey(zxid)) {
            for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
                sendSync(r);
            }
        }
        return true;
    }

    // Broadcast the COMMIT message
    public void commit(long zxid) {
        synchronized (this) {
            // Mark lastCommitted with the newly committed ZXID
            lastCommitted = zxid;
        }
        QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
        sendPacket(qp);
    }

    void sendPacket(QuorumPacket qp) {
        synchronized (forwardingFollowers) {
            for (LearnerHandler f : forwardingFollowers) {
                // Call LearnerHandler.queuePacket() to add the packet to the send queue
                f.queuePacket(qp);
            }
        }
    }

    public void inform(Proposal proposal) {
        QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid, proposal.packet.getData(), null);
        sendObserverPacket(qp);
    }
    ...
    static public class Proposal extends SyncedLearnerTracker {
        public QuorumPacket packet;
        public Request request;
        ...
    }
}

public class SyncedLearnerTracker {
    protected ArrayList<QuorumVerifierAcksetPair> qvAcksetPairs = new ArrayList<QuorumVerifierAcksetPair>();
    ...
    // Add a SID to the ack set
    public boolean addAck(Long sid) {
        boolean change = false;
        for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
            if (qvAckset.getQuorumVerifier().getVotingMembers().containsKey(sid)) {
                qvAckset.getAckset().add(sid);
                change = true;
            }
        }
        return change;
    }

    // Check whether the ack set has reached a majority under every quorum verifier
    public boolean hasAllQuorums() {
        for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
            if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))
                return false;
        }
        return true;
    }
    ...
}
```
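The interplay between processAck() and tryToCommit() can be condensed into a small, self-contained sketch. The class `AckTracker` and its methods are hypothetical simplifications: it assumes a single equal-weight majority quorum, omits networking and Observers, and only models the two checks shown above (the previous ZXID must no longer be outstanding, and more than half of the voters must have ACKed).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of ack collection with in-order commit.
final class AckTracker {
    private final int votingMembers;
    private final Map<Long, Set<Long>> outstanding = new HashMap<>(); // zxid -> SIDs that ACKed
    private final List<Long> committed = new ArrayList<>();
    private long lastCommitted = 0;

    AckTracker(int votingMembers) {
        this.votingMembers = votingMembers;
    }

    synchronized void propose(long zxid) {
        outstanding.put(zxid, new HashSet<>());
    }

    // Records one ACK and returns true if this ACK caused the proposal to commit.
    synchronized boolean processAck(long sid, long zxid) {
        if (lastCommitted >= zxid) {
            return false; // already committed
        }
        Set<Long> acks = outstanding.get(zxid);
        if (acks == null) {
            return false; // unknown proposal (a guard added for this sketch)
        }
        acks.add(sid);
        return tryToCommit(zxid, acks);
    }

    private boolean tryToCommit(long zxid, Set<Long> acks) {
        if (outstanding.containsKey(zxid - 1)) {
            return false; // the previous proposal has not committed yet
        }
        if (acks.size() <= votingMembers / 2) {
            return false; // no majority yet
        }
        outstanding.remove(zxid);
        lastCommitted = zxid;
        committed.add(zxid);
        return true;
    }

    synchronized List<Long> committed() {
        return new ArrayList<>(committed);
    }
}
```

In this sketch, a proposal that already has a majority while its predecessor is still outstanding stays uncommitted until a later ACK for it arrives after the predecessor commits, which is how commits are kept in ZXID order.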
(3) Recovery mode for data consistency
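When the Leader fails, a Follower server goes through the following steps: first, the while loop in Follower.followLeader() is interrupted; then the QuorumPeer thread runs Follower.shutdown(); next it calls QuorumPeer.updateServerState() to change the node state to LOOKING; finally, the Follower server starts a new Leader election in the QuorumPeer thread.
Re-electing a Leader takes some time. During that window the cluster briefly has no Leader server, and the Followers are also shut down while the election is in progress.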
Note: when the Leader fails, ZooKeeperServer.shutdown() shuts down the firstProcessor thread. So during the election in recovery mode, requests sent to a Learner enter firstProcessor, but they are first held in queuedRequests and are not processed for the time being.

```java
public class QuorumPeerMain {
    protected QuorumPeer quorumPeer;

    public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
        ...
        quorumPeer.start();
        quorumPeer.join();
        ...
    }
    ...
}

public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
    ...
    public synchronized void start() {
        loadDataBase();
        startServerCnxnFactory();
        adminServer.start();
        // Initialize Leader election (initialize the current vote + listen on the election port + start the election daemon threads)
        startLeaderElection();
        startJvmPauseMonitor();
        super.start();
    }

    @Override
    public void run() {
        ...
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                ...
                if (shuttingDownLE) {
                    shuttingDownLE = false;
                    startLeaderElection();
                }
                // Call lookForLeader() on QuorumPeer.electionAlg, i.e. FastLeaderElection.lookForLeader(), to start a round of election
                setCurrentVote(makeLEStrategy().lookForLeader());
                ...
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    // If the Leader fails, the while loop inside follower.followLeader() is interrupted
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    // If the Leader fails, execution reaches this block
                    follower.shutdown();
                    setFollower(null);
                    updateServerState();
                }
                break;
            case LEADING:
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    updateServerState();
                }
                break;
            }
            ...
        }
        ...
    }

    private synchronized void updateServerState() {
        // reconfigFlag is initialized to false
        if (!reconfigFlag) {
            setPeerState(ServerState.LOOKING);
            LOG.warn("PeerState set to LOOKING");
            return;
        }
        ...
        // reconfigFlag is reset to false once updateServerState() has run
        reconfigFlag = false;
    }

    // Start Leader election:
    // create the election server QuorumCnxManager and start listening + create the election algorithm FastLeaderElection and start it,
    // then assign the FastLeaderElection instance to QuorumPeer.electionAlg
    synchronized public void startLeaderElection() {
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
        ...
        this.electionAlg = createElectionAlgorithm(electionType);
    }
    ...
}
```
```java
public class Follower extends Learner {
    ...
    public void shutdown() {
        super.shutdown();
    }

    void followLeader() throws InterruptedException {
        ...
        QuorumServer leaderServer = findLeader();
        try {
            // On startup the Follower connects to the Leader
            connectToLeader(leaderServer.addr, leaderServer.hostname);
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
            ...
            syncWithLeader(newEpochZxid);
            ...
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning()) {
                // Read from the blocking (BIO) input stream
                readPacket(qp);
                processPacket(qp);
            }
        } catch (Exception e) {
            // If the Leader fails, an exception is thrown here, which interrupts the while loop above
            LOG.warn("Exception when following the leader", e);
            closeSocket();
            pendingRevalidations.clear();
        }
        ...
    }
    ...
}

public class Learner {
    ...
    public void shutdown() {
        self.setZooKeeperServer(null);
        self.closeAllConnections();
        self.adminServer.setZooKeeperServer(null);
        closeSocket();
        // Shut down the server instance, for example FollowerZooKeeperServer
        if (zk != null) {
            // Stop the sessionTracker's timeout-check thread + shut down firstProcessor + clear zkDb
            zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP));
        }
    }

    protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {
        // Create the Learner's blocking (BIO) client Socket
        this.sock = createSocket();
        int initLimitTime = self.tickTime * self.initLimit;
        int remainingInitLimitTime = initLimitTime;
        long startNanoTime = nanoTime();
        // Try to connect at most 5 times
        for (int tries = 0; tries < 5; tries++) {
            try {
                remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
                if (remainingInitLimitTime <= 0) {
                    LOG.error("initLimit exceeded on retries.");
                    throw new IOException("initLimit exceeded on retries.");
                }
                // Send the connection request to the Leader
                sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));
                if (self.isSslQuorum()) {
                    ((SSLSocket) sock).startHandshake();
                }
                sock.setTcpNoDelay(nodelay);
                break;
            } catch (IOException e) {
                ...
            }
            Thread.sleep(1000);
        }
        self.authLearner.authenticate(sock, hostname);
        // Initialize the input and output streams
        leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
    }

    private Socket createSocket() throws X509Exception, IOException {
        // Create the client Socket
        Socket sock;
        if (self.isSslQuorum()) {
            sock = self.getX509Util().createSSLSocket();
        } else {
            sock = new Socket();
        }
        sock.setSoTimeout(self.tickTime * self.initLimit);
        return sock;
    }

    protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) throws IOException {
        // Connect to the blocking (BIO) server socket
        sock.connect(addr, timeout);
    }

    void readPacket(QuorumPacket pp) throws IOException {
        synchronized (leaderIs) {
            leaderIs.readRecord(pp, "packet");
        }
    }
    ...
}

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
    ...
    public synchronized void shutdown(boolean fullyShutDown) {
        ...
        // Stop the session timeout-check thread
        if (sessionTracker != null) {
            sessionTracker.shutdown();
        }
        // Shut down the firstProcessor thread.
        // So when the Leader fails, requests sent to the Learner are held in firstProcessor's queuedRequests and are not processed for now
        if (firstProcessor != null) {
            firstProcessor.shutdown();
        }
        if (zkDb != null) {
            if (fullyShutDown) {
                zkDb.clear();
            } else {
                ...
            }
        }
        unregisterJMX();
    }
    ...
}
```
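The recovery path described above boils down to a state loop in which the FOLLOWING branch always falls back to LOOKING in its finally block. The sketch below models only that control flow. `PeerLoopSketch`, `FollowTask`, and the `history` list are hypothetical names introduced for illustration, the election itself is replaced by an assumption that the peer always ends up following, and the I/O failure is thrown directly to the loop, whereas in the excerpt above followLeader() catches it internally.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the LOOKING -> FOLLOWING -> LOOKING cycle after a Leader failure.
final class PeerLoopSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING }

    // Stand-in for followLeader(): runs until the link to the Leader breaks.
    interface FollowTask {
        void followLeader() throws IOException;
    }

    private ServerState state = ServerState.LOOKING;
    private final List<ServerState> history = new ArrayList<>();

    // Runs a bounded number of loop iterations and returns the states that were visited.
    List<ServerState> run(FollowTask task, int maxIterations) {
        for (int i = 0; i < maxIterations; i++) {
            history.add(state);
            switch (state) {
                case LOOKING:
                    // Election elided; assumption: this peer always becomes a follower.
                    state = ServerState.FOLLOWING;
                    break;
                case FOLLOWING:
                    try {
                        task.followLeader();
                    } catch (IOException e) {
                        // A Leader failure surfaces as an I/O error on the connection.
                    } finally {
                        // Mirrors shutdown() + updateServerState(): go back to LOOKING.
                        state = ServerState.LOOKING;
                    }
                    break;
                case LEADING:
                    state = ServerState.LOOKING;
                    break;
            }
        }
        return history;
    }
}
```

Passing a task that always throws IOException makes the sketch alternate between LOOKING and FOLLOWING, which corresponds to a peer that re-enters election every time its Leader disappears.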