找回密码
 立即注册
首页 业界区 业界 zk源码—3.单机和集群通信原理

zk源码—3.单机和集群通信原理

能杜孱 2025-6-1 23:14:59
大纲
1.单机版的zk服务端的启动过程
(1)预启动阶段
(2)初始化阶段
2.集群版的zk服务端的启动过程
(1)预启动阶段
(2)初始化阶段
(3)Leader选举阶段
(4)Leader和Follower启动阶段
 
1.单机版的zk服务端的启动过程
(1)预启动阶段
(2)初始化阶段
 
单机版zk服务端的启动,主要分为两个阶段:预启动阶段和初始化阶段,其启动流程图如下:
1.webp
接下来介绍zk服务端的预启动阶段(启动管理)与初始化阶段的具体流程,也就是单机版的zk服务端是如何从初始化到对外提供服务的。
 
(1)预启动阶段
在zk服务端进行初始化之前,首先要对配置文件等信息进行解析和载入,而zk服务端的预启动阶段的主要工作流程如下:
 
一.启动QuorumPeerMain入口程序
二.解析zoo.cfg配置文件
三.创建和启动历史文件清理器
四.根据配置判断是集群模式还是单机模式
 
  1. public class QuorumPeerMain {
  2.     protected QuorumPeer quorumPeer;
  3.     ...
  4.     //1.启动程序入口
  5.     public static void main(String[] args) {
  6.         QuorumPeerMain main = new QuorumPeerMain();
  7.         try {
  8.             //启动程序
  9.             main.initializeAndRun(args);
  10.         } catch (IllegalArgumentException e) {
  11.             ...
  12.         }
  13.         LOG.info("Exiting normally");
  14.         System.exit(0);
  15.     }
  16.    
  17.     protected void initializeAndRun(String[] args) {
  18.         QuorumPeerConfig config = new QuorumPeerConfig();
  19.         if (args.length == 1) {
  20.             //2.解析配置文件
  21.             config.parse(args[0]);
  22.         }
  23.         //3.创建和启动历史文件清理器
  24.         DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
  25.         purgeMgr.start();
  26.         //4.根据配置判断是集群模式还是单机模式
  27.         if (args.length == 1 && config.isDistributed()) {
  28.             //集群模式
  29.             runFromConfig(config);
  30.         } else {
  31.             //单机模式
  32.             ZooKeeperServerMain.main(args);
  33.         }
  34.     }
  35.     ...
  36. }
  37. public class ZooKeeperServerMain {
  38.     private ServerCnxnFactory cnxnFactory;
  39.     ...
  40.     public static void main(String[] args) {
  41.         ZooKeeperServerMain main = new ZooKeeperServerMain();
  42.         try {
  43.             //启动程序
  44.             main.initializeAndRun(args);
  45.         } catch (IllegalArgumentException e) {
  46.             ...
  47.         }
  48.         LOG.info("Exiting normally");
  49.         System.exit(0);
  50.     }
  51.    
  52.     protected void initializeAndRun(String[] args) {
  53.         ...
  54.         ServerConfig config = new ServerConfig();
  55.         //2.解析配置文件
  56.         if (args.length == 1) {
  57.             config.parse(args[0]);
  58.         } else {
  59.             config.parse(args);
  60.         }
  61.         runFromConfig(config);
  62.     }
  63.    
  64.     //以下是初始化阶段的内容
  65.     public void runFromConfig(ServerConfig config) {
  66.         ...
  67.         txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
  68.         final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);
  69.         ...   
  70.     }
  71.     ...
  72. }
  73. public class DatadirCleanupManager {
  74.     //配置zoo.cfg文件中的autopurge.snapRetainCount和autopurge.purgeInterval实现数据快照文件的定时清理;
  75.     private final File snapDir;//数据快照地址
  76.     private final File dataLogDir;//事务日志地址
  77.     //配置zoo.cfg文件中的autopurge.snapRetainCount,可指定需要保留的文件数目,默认是保留3个;
  78.     private final int snapRetainCount;//需要保留的文件数目
  79.     //配置zoo.cfg文件中的autopurge.purgeInterval可指定清理频率,以小时为单位,默认0表示不开启自动清理功能;
  80.     private final int purgeInterval;//清理频率
  81.     private Timer timer;
  82.     ...
  83. }
复制代码
一.启动程序
QuorumPeerMain类是zk服务的启动入口,可理解为Java中的main函数。通常我们执行zkServer.sh脚本启动zk服务时,就会运行这个类。QuorumPeerMain的main()方法会调用它的initializeAndRun()方法来启动程序。
 
二.解析zoo.cfg配置文件
在QuorumPeerMain的main()方法中,会执行它的initializeAndRun()方法。
 
在QuorumPeerMain的initializeAndRun()方法中,便会解析zoo.cfg配置文件。
 
在ZooKeeperServerMain的initializeAndRun()方法中,也会解析zoo.cfg配置文件。
 
zoo.cfg配置文件配置了zk运行时的基本参数,包括tickTime、dataDir等。
 
三.创建和启动历史文件清理器
文件清理器在日常的使用中非常重要。面对大流量的网络访问,zk会产生海量的数据。如果磁盘数据过多或者磁盘空间不足,可能会导致zk服务端不能正常运行,所以zk采用DatadirCleanupManager类去清理历史文件。
 
其中DatadirCleanupManager类有5个属性,如上代码所示。DatadirCleanupManager会对事务日志和数据快照文件进行定时清理,这种自动清理历史数据文件的机制可以尽量避免zk磁盘空间的浪费。
 
四.判断集群模式还是单机模式
根据从zoo.cfg文件解析出来的集群服务器地址列表来判断是否是单机模式。如果是单机模式,则会调用ZooKeeperServerMain的main()方法来进行启动。如果是集群模式,则会调用QuorumPeerMain的runFromConfig()方法来进行启动。
 
(2)初始化阶段
初始化阶段会根据预启动解析出的配置信息,初始化服务器实例。该阶段的主要工作流程如下:
 
一.创建数据持久化工具实例FileTxnSnapLog
二.创建服务端统计工具实例ServerStats
三.根据两个工具实例创建单机版服务器实例
 
四.创建网络连接工厂实例
五.初始化网络连接工厂实例
六.启动网络连接工厂实例的线程
 
七.恢复单机版服务器实例的本地数据
八.创建并启动服务器实例的会话管理器
九.初始化单机版服务器实例的请求处理链
十.注册单机版服务器实例到网络连接工厂实例
 
单机版服务器实例:ZooKeeperServer
网络连接工厂实例:ServerCnxnFactory
会话管理器:SessionTracker
 
  1. public class ZooKeeperServerMain {
  2.     private ServerCnxnFactory cnxnFactory;
  3.     ...
  4.     //以下是初始化阶段的内容
  5.     public void runFromConfig(ServerConfig config) {
  6.         ...
  7.         //1.创建zk数据管理器——持久化工具类FileTxnSnapLog的实例
  8.         txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
  9.         //2.创建zk服务运行统计器——统计工具类ServerStats的实例
  10.         //3.创建服务器实例ZooKeeperServer
  11.         final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);
  12.         txnLog.setServerStats(zkServer.serverStats());  
  13.         ...
  14.         //4.创建服务端网络连接工厂实例ServerCnxnFactory
  15.         cnxnFactory = ServerCnxnFactory.createFactory();
  16.         cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
  17.         cnxnFactory.startup(zkServer);
  18.         ...
  19.     }
  20.     ...
  21. }
  22. public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
  23.     private final ServerStats serverStats;
  24.     private FileTxnSnapLog txnLogFactory = null;
  25.     private ZKDatabase zkDb;
  26.     protected int tickTime = DEFAULT_TICK_TIME;
  27.     private final ZooKeeperServerListener listener;
  28.     ...
  29.     public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, ZKDatabase zkDb) {
  30.         //2.创建服务端统计工具ServerStats实例
  31.         serverStats = new ServerStats(this);
  32.         this.txnLogFactory = txnLogFactory;
  33.         this.txnLogFactory.setServerStats(this.serverStats);
  34.         this.zkDb = zkDb;
  35.         this.tickTime = tickTime;
  36.         setMinSessionTimeout(minSessionTimeout);
  37.         setMaxSessionTimeout(maxSessionTimeout);
  38.         listener = new ZooKeeperServerListenerImpl(this);
  39.     }
  40.     ...
  41. }
复制代码
一.创建数据持久化工具实例FileTxnSnapLog
可以通过FileTxnSnapLog对zk服务器的内存数据进行持久化,具体会将内存数据持久化到配置文件里的事务日志文件 + 快照数据文件中。
 
所以在执行ZooKeeperServerMain的runFromConfig()方法启动zk服务端时,首先会根据zoo.cfg配置文件中的dataDir数据快照目录和dataLogDir事务日志目录,通过"new FileTxnSnapLog()"来创建持久化工具类FileTxnSnapLog的实例。
 
二.创建服务端统计工具实例ServerStats
ServerStats用于统计zk服务端运行时的状态信息,主要统计的数据包括:服务端向客户端发送的响应包次数、接收客户端发送的请求包次数、服务端处理请求的延迟情况、处理客户端的请求次数。
 
在执行ZooKeeperServerMain.runFromConfig()方法时,执行到ZooKeeperServer的构造方法就会首先创建ServerStats实例。
 
三.根据两个工具实例创建单机版服务器实例
ZooKeeperServer是单机版服务端的核心实体类。在执行ZooKeeperServerMain.runFromConfig()方法时,创建完zk数据管理器——持久化工具类FileTxnSnapLog的实例后,就会通过"new ZooKeeperServer()"来创建单机版服务器实例ZooKeeperServer。
 
此时会传入从zoo.cfg配置文件中解析出的tickTime和会话超时时间来创建服务器实例。创建完服务器实例ZooKeeperServer后,接下来才会对该ZooKeeperServer服务器实例进行更多的初始化工作,包括网络连接器、内存数据库和请求处理器等组件的初始化。
 
四.创建网络连接工厂实例
zk中客户端和服务端的网络通信,本质是通过Java的IO数据流进行通信的。zk一开始就是使用自己实现的NIO进行网络通信的,但之后引入了Netty框架来满足不同使用情况下的需求。
 
在执行ZooKeeperServerMain的runFromConfig()方法时,创建完服务器实例ZooKeeperServer后,就会通过ServerCnxnFactory的createFactory()方法来创建服务端网络连接工厂实例ServerCnxnFactory。
 
ServerCnxnFactory的createFactory()方法首先会获取配置值,判断是使用NIO还是使用Netty,然后再通过反射去实例化服务端网络连接工厂。
 
可以通过配置zookeeper.serverCnxnFactory来指定使用:zk自己实现的NIO还是Netty框架,来构建服务端网络连接工厂ServerCnxnFactory。
  1. public abstract class ServerCnxnFactory {
  2.     ...
  3.     static public ServerCnxnFactory createFactory() throws IOException {
  4.         String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
  5.         if (serverCnxnFactoryName == null) {
  6.             serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
  7.         }
  8.         ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName).getDeclaredConstructor().newInstance();
  9.         return serverCnxnFactory;
  10.     }
  11.     ...
  12. }
复制代码
五.初始化网络连接工厂实例
在执行ZooKeeperServerMain的runFromConfig()方法时,创建完服务端网络连接工厂ServerCnxnFactory实例后,就会调用网络连接工厂ServerCnxnFactory的configure()方法来初始化网络连接工厂ServerCnxnFactory实例。
 
这里以NIOServerCnxnFactory的configure()方法为例,该方法主要会启动一个NIO服务器,以及创建三类线程:
 
一.处理客户端连接的AcceptThread线程
二.处理客户端请求的一批SelectorThread线程
三.处理过期连接的ConnectionExpirerThread线程
 
初始化完ServerCnxnFactory实例后,虽然此时NIO服务器已对外开放端口,客户端也能访问到2181端口,但此时zk服务端还不能正常处理客户端请求。
  1. public class NIOServerCnxnFactory extends ServerCnxnFactory {
  2.     //最大客户端连接数
  3.     protected int maxClientCnxns = 60;
  4.     //处理过期连接的线程
  5.     private ConnectionExpirerThread expirerThread;
  6.     //处理客户端建立连接的线程
  7.     private AcceptThread acceptThread;
  8.     //处理客户端请求的线程
  9.     private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();
  10.     //会话过期相关
  11.     int sessionlessCnxnTimeout;
  12.     //连接过期队列
  13.     private ExpiryQueue<NIOServerCnxn> cnxnExpiryQueue;
  14.     //selector线程数,CPU核数的一半
  15.     private int numSelectorThreads;
  16.     //工作线程数
  17.     private int numWorkerThreads;
  18.     ...
  19.     public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
  20.         ...
  21.         maxClientCnxns = maxcc;
  22.         sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
  23.         //连接过期队列
  24.         cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
  25.         //创建一个自动处理过期连接的ConnectionExpirerThread线程
  26.         expirerThread = new ConnectionExpirerThread();
  27.         int numCores = Runtime.getRuntime().availableProcessors();
  28.         numSelectorThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_SELECTOR_THREADS, Math.max((int) Math.sqrt((float) numCores/2), 1));
  29.         numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
  30.         ...
  31.         //创建一批SelectorThread线程
  32.         for (int i=0; i<numSelectorThreads; ++i) {
  33.             selectorThreads.add(new SelectorThread(i));
  34.         }
  35.         //打开ServerSocketChannel
  36.         this.ss = ServerSocketChannel.open();
  37.         ss.socket().setReuseAddress(true);
  38.         //绑定端口,启动NIO服务器
  39.         ss.socket().bind(addr);
  40.         ss.configureBlocking(false);
  41.         //创建一个AcceptThread线程
  42.         acceptThread = new AcceptThread(ss, addr, selectorThreads);
  43.     }
  44.     ...
  45. }
复制代码
七.恢复单机版服务器实例的本地数据
启动zk服务端需要从本地快照数据文件 + 事务日志文件中进行数据恢复。在执行ZooKeeperServerMain的runFromConfig()方法时,调用完ServerCnxnFactory的startup()方法启动ServerCnxnFactory的线程后,就会调用单机版服务器实例ZooKeeperServer的startdata()方法来恢复本地数据。
  1. public class NIOServerCnxnFactory extends ServerCnxnFactory {
  2.     private ExpiryQueue<NIOServerCnxn> cnxnExpiryQueue;//连接的过期队列
  3.     ...
  4.     public void startup(ZooKeeperServer zks, boolean startServer) {
  5.         //6.启动各种线程
  6.         start();
  7.         setZooKeeperServer(zks);
  8.         if (startServer) {
  9.             //7.恢复本地数据
  10.             zks.startdata();
  11.             //8.创建并启动会话管理器SessionTracker
  12.             //9.初始化zk的请求处理链
  13.             //10.注册zk服务器实例
  14.             zks.startup();
  15.         }
  16.     }
  17.    
  18.     public void start() {
  19.         stopped = false;
  20.         if (workerPool == null) {
  21.             workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
  22.         }
  23.         for (SelectorThread thread : selectorThreads) {
  24.             if (thread.getState() == Thread.State.NEW) {
  25.                 thread.start();
  26.             }
  27.         }
  28.         if (acceptThread.getState() == Thread.State.NEW) {
  29.             acceptThread.start();
  30.         }
  31.         if (expirerThread.getState() == Thread.State.NEW) {
  32.             expirerThread.start();
  33.         }
  34.     }
  35.     ...
  36.     //用来处理过期连接,会启动一个超时检查线程来检查连接是否过期
  37.     private class ConnectionExpirerThread extends ZooKeeperThread {
  38.         ConnectionExpirerThread() {
  39.             super("ConnnectionExpirer");
  40.         }
  41.         @Override
  42.         public void run() {
  43.             while (!stopped) {
  44.                 //使用了分桶管理策略
  45.                 long waitTime = cnxnExpiryQueue.getWaitTime();
  46.                 if (waitTime > 0) {
  47.                     Thread.sleep(waitTime);
  48.                     continue;
  49.                 }
  50.                 for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) {
  51.                     conn.close();
  52.                 }
  53.             }
  54.         }
  55.     }
  56.    
  57.     //用来处理要建立连接的客户端OP_ACCEPT请求
  58.     private class AcceptThread extends AbstractSelectThread {
  59.         private final ServerSocketChannel acceptSocket;
  60.         private final SelectionKey acceptKey;
  61.         private final Collection<SelectorThread> selectorThreads;
  62.         private Iterator<SelectorThread> selectorIterator;
  63.         ...
  64.         public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {
  65.             super("NIOServerCxnFactory.AcceptThread:" + addr);
  66.             this.acceptSocket = ss;
  67.             this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
  68.             this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads));
  69.             selectorIterator = this.selectorThreads.iterator();
  70.         }
  71.         
  72.         @Override
  73.         public void run() {
  74.             ...
  75.             while (!stopped && !acceptSocket.socket().isClosed()) {
  76.                 select();
  77.             }
  78.             ...
  79.         }
  80.          
  81.         private void select() {
  82.             selector.select();
  83.             Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
  84.             while (!stopped && selectedKeys.hasNext()) {
  85.                 SelectionKey key = selectedKeys.next();
  86.                 selectedKeys.remove();
  87.                 if (!key.isValid()) {
  88.                     continue;
  89.                 }
  90.                 if (key.isAcceptable()) {
  91.                     if (!doAccept()) {
  92.                         pauseAccept(10);
  93.                     }
  94.                 } else {
  95.                     LOG.warn("Unexpected ops in accept select " + key.readyOps());
  96.                 }
  97.             }
  98.         }
  99.         
  100.         private void pauseAccept(long millisecs) {
  101.             acceptKey.interestOps(0);
  102.             selector.select(millisecs);
  103.             acceptKey.interestOps(SelectionKey.OP_ACCEPT);
  104.         }
  105.         
  106.         private boolean doAccept() {
  107.             boolean accepted = false;
  108.             SocketChannel sc = null;
  109.             sc = acceptSocket.accept();
  110.             accepted = true;
  111.             InetAddress ia = sc.socket().getInetAddress();
  112.             int cnxncount = getClientCnxnCount(ia);
  113.             ...
  114.             sc.configureBlocking(false);
  115.             // Round-robin assign this connection to a selector thread
  116.             if (!selectorIterator.hasNext()) {
  117.                 selectorIterator = selectorThreads.iterator();
  118.             }
  119.             SelectorThread selectorThread = selectorIterator.next();
  120.             ...
  121.             acceptErrorLogger.flush();
  122.             return accepted;
  123.         }
  124.     }
  125.    
  126.     //用来处理AcceptThread线程建立好的客户端连接请求
  127.     class SelectorThread extends AbstractSelectThread {
  128.         private final int id;
  129.         private final Queue<SocketChannel> acceptedQueue;
  130.         private final Queue<SelectionKey> updateQueue;
  131.         
  132.         public SelectorThread(int id) throws IOException {
  133.             super("NIOServerCxnFactory.SelectorThread-" + id);
  134.             this.id = id;
  135.             acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
  136.             updateQueue = new LinkedBlockingQueue<SelectionKey>();
  137.         }
  138.         
  139.         public boolean addAcceptedConnection(SocketChannel accepted) {
  140.             if (stopped || !acceptedQueue.offer(accepted)) {
  141.                 return false;
  142.             }
  143.             wakeupSelector();
  144.             return true;
  145.         }
  146.         
  147.         ...
  148.         @Override
  149.         public void run() {
  150.             while (!stopped) {
  151.                 select();
  152.                 processAcceptedConnections();
  153.                 processInterestOpsUpdateRequests();
  154.             }
  155.             ...
  156.         }
  157.         
  158.         private void select() {
  159.             selector.select();
  160.             Set<SelectionKey> selected = selector.selectedKeys();
  161.             ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
  162.             Collections.shuffle(selectedList);
  163.             Iterator<SelectionKey> selectedKeys = selectedList.iterator();
  164.             
  165.             while(!stopped && selectedKeys.hasNext()) {
  166.                 SelectionKey key = selectedKeys.next();
  167.                 selected.remove(key);
  168.                 ...
  169.             }
  170.         }
  171.         
  172.         private void processAcceptedConnections() {
  173.             SocketChannel accepted;
  174.             while (!stopped && (accepted = acceptedQueue.poll()) != null) {
  175.                 SelectionKey key = null;
  176.                 key = accepted.register(selector, SelectionKey.OP_READ);
  177.                 NIOServerCnxn cnxn = createConnection(accepted, key, this);
  178.                 key.attach(cnxn);
  179.                 addCnxn(cnxn);
  180.             }
  181.         }
  182.         ...
  183.     }
  184.     ...
  185.     protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread) {
  186.         return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread);
  187.     }
  188.    
  189.     private void addCnxn(NIOServerCnxn cnxn) throws IOException {
  190.         ...
  191.         //激活连接
  192.         touchCnxn(cnxn);
  193.     }
  194.    
  195.     public void touchCnxn(NIOServerCnxn cnxn) {
  196.         //这个cnxnExpiryQueue与管理过期连接有关
  197.         cnxnExpiryQueue.update(cnxn, cnxn.getSessionTimeout());
  198.     }
  199. }
复制代码
八.创建并启动服务器实例的会话管理器
会话管理器SessionTracker主要负责zk服务端的会话管理。在执行ZooKeeperServerMain的runFromConfig()方法时,调用完单机版服务器实例ZooKeeperServer的startdata()方法完成本地数据恢复后,就会调用ZooKeeperServer的startup()方法来开始创建并启动会话管理器,也就是在startup()方法中会调用createSessionTracker()和startSessionTracker()方法。SessionTracker其实也是一个继承了ZooKeeperThread的线程。
  1. public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
  2.     private ZKDatabase zkDb;
  3.     private FileTxnSnapLog txnLogFactory = null;
  4.     ...
  5.     //7.恢复本地数据
  6.     public void startdata() {
  7.         if (zkDb == null) {
  8.             zkDb = new ZKDatabase(this.txnLogFactory);
  9.         }
  10.         if (!zkDb.isInitialized()) {
  11.             loadData();
  12.         }
  13.     }
  14.    
  15.     public void loadData() throws IOException, InterruptedException {
  16.         if (zkDb.isInitialized()) {
  17.             setZxid(zkDb.getDataTreeLastProcessedZxid());
  18.         } else {
  19.             setZxid(zkDb.loadDataBase());
  20.         }
  21.         // Clean up dead sessions
  22.         LinkedList<Long> deadSessions = new LinkedList<Long>();
  23.         for (Long session : zkDb.getSessions()) {
  24.             if (zkDb.getSessionWithTimeOuts().get(session) == null) {
  25.                 deadSessions.add(session);
  26.             }
  27.         }
  28.         for (long session : deadSessions) {
  29.             killSession(session, zkDb.getDataTreeLastProcessedZxid());
  30.         }
  31.         // Make a clean snapshot
  32.         takeSnapshot();
  33.     }
  34.    
  35.     public void takeSnapshot(){
  36.         try {
  37.             txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
  38.         } catch (IOException e) {
  39.             LOG.error("Severe unrecoverable error, exiting", e);
  40.             System.exit(10);
  41.         }
  42.     }
  43.     ...
  44. }
复制代码
九.初始化单机版服务器实例的请求处理链
在执行ZooKeeperServerMain的runFromConfig()方法时,在ZooKeeperServer的startup()方法中调用方法创建并启动好会话管理器后,就会继续在ZooKeeperServer的startup()方法中调用方法初始化请求处理链,也就是在startup()方法中会调用setupRequestProcessors()方法。
 
zk处理请求的方式是典型的责任链模式,zk服务端会使用多个请求处理器来依次处理一个客户端请求。所以在服务端启动时,会将这些请求处理器串联起来形成一个请求处理链。
 
单机版服务器的请求处理链包括3个请求处理器:
第一个请求处理器是:PrepRequestProcessor
第二个请求处理器是:SyncRequestProcessor
第三个请求处理器是:FinalRequestProcessor
 
zk服务端会严格按照顺序分别调用这3个请求处理器处理客户端的请求,其中PrepRequestProcessor和SyncRequestProcessor其实也是一个线程。服务端收到的客户端请求会不断被添加到请求处理器的请求队列中,然后请求处理器线程启动后就会不断从请求队列中提取请求出来进行处理。
2.png
  1. public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
  2.     protected SessionTracker sessionTracker;
  3.     private FileTxnSnapLog txnLogFactory = null;
  4.     private ZKDatabase zkDb;
  5.     ...
  6.     public synchronized void startup() {
  7.         startupWithServerState(State.RUNNING);
  8.     }
  9.    
  10.     private void startupWithServerState(State state) {
  11.         //8.创建并启动会话管理器SessionTracker
  12.         if (sessionTracker == null) {
  13.             createSessionTracker();
  14.         }
  15.         startSessionTracker();
  16.         //9.初始化服务器实例ZooKeeperServer的请求处理链
  17.         setupRequestProcessors();
  18.         //注册JMX服务
  19.         registerJMX();
  20.         //开启监控JVM停顿的线程
  21.         startJvmPauseMonitor();
  22.         setState(state);
  23.         notifyAll();
  24.     }
  25.    
  26.     protected void createSessionTracker() {
  27.         sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
  28.             tickTime, createSessionTrackerServerId, getZooKeeperServerListener());
  29.     }
  30.    
  31.     protected void startSessionTracker() {
  32.         ((SessionTrackerImpl)sessionTracker).start();
  33.     }
  34.     ...
  35. }
  36. public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
  37.     private final ExpiryQueue<SessionImpl> sessionExpiryQueue;
  38.     private final SessionExpirer expirer;
  39.     ...
  40.     @Override
  41.     public void run() {
  42.         while (running) {
  43.             //使用了分桶管理策略
  44.             long waitTime = sessionExpiryQueue.getWaitTime();
  45.             if (waitTime > 0) {
  46.                 Thread.sleep(waitTime);
  47.                 continue;
  48.             }
  49.             for (SessionImpl s : sessionExpiryQueue.poll()) {
  50.                 setSessionClosing(s.sessionId);
  51.                 expirer.expire(s);
  52.             }
  53.         }
  54.     }
  55.     ...
  56. }
复制代码
十.注册单机版服务器实例到网络连接工厂实例
就是调用ServerCnxnFactory的startup()方法中的setZooKeeperServer()方法,将初始化好的单机版服务器实例ZooKeeperServer注册到网络连接工厂实例ServerCnxnFactory。同时,也会将网络连接工厂实例ServerCnxnFactory注册到单机版服务器实例ZooKeeperServer。此时,zk服务端就可以对外提供正常的服务了。
  1. public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
  2.     protected RequestProcessor firstProcessor;
  3.     ...
  4.     public synchronized void startup() {
  5.         startupWithServerState(State.RUNNING);
  6.     }
  7.    
  8.     private void startupWithServerState(State state) {
  9.         //8.创建并启动会话管理器SessionTracker
  10.         if (sessionTracker == null) {
  11.             createSessionTracker();
  12.         }
  13.         startSessionTracker();
  14.         //9.初始化服务器实例ZooKeeperServer的请求处理链
  15.         setupRequestProcessors();
  16.         //注册JMX服务
  17.         registerJMX();
  18.         //开启监控JVM停顿的线程
  19.         startJvmPauseMonitor();
  20.         setState(state);
  21.         notifyAll();
  22.     }
  23.    
  24.     protected void setupRequestProcessors() {
  25.         RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  26.         RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
  27.         ((SyncRequestProcessor)syncProcessor).start();
  28.         firstProcessor = new PrepRequestProcessor(this, syncProcessor);
  29.         ((PrepRequestProcessor)firstProcessor).start();
  30.     }
  31.     ...
  32. }
  33. public interface RequestProcessor {
  34.     void processRequest(Request request) throws RequestProcessorException;
  35.     void shutdown();
  36. }
  37. public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
  38.     LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
  39.     private final RequestProcessor nextProcessor;
  40.     ZooKeeperServer zks;
  41.     ...
  42.     public PrepRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
  43.         super("ProcessThread(sid:" + zks.getServerId() + " cport:" + zks.getClientPort() + "):", zks.getZooKeeperServerListener());
  44.         this.nextProcessor = nextProcessor;
  45.         this.zks = zks;
  46.     }
  47.    
  48.     public void processRequest(Request request) {
  49.         submittedRequests.add(request);
  50.     }
  51.    
  52.     @Override
  53.     public void run() {
  54.         ...
  55.         while (true) {
  56.             Request request = submittedRequests.take();
  57.             pRequest(request);
  58.         }
  59.         ...
  60.     }
  61.    
  62.     protected void pRequest(Request request) throws RequestProcessorException {
  63.         ...
  64.         //事务请求处理
  65.         pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
  66.         ...
  67.         //交给下一个处理器处理
  68.         nextProcessor.processRequest(request);
  69.     }
  70.     ...
  71. }
复制代码
 
2.集群版的zk服务端的启动过程
(1)预启动阶段
(2)初始化阶段
(3)Leader选举阶段
(4)Leader和Follower启动阶段
 
什么是集群:
集群是由网络中不同的机器组成的一个系统,集群中的工作是通过集群中调度者服务器来协同完成的。
 
集群中的调度者服务器:
调度者的工作就是在集群收到客户端请求后,根据集群中机器的使用情况,决定将此次客户端请求交给集群中哪一台服务器或网络节点进行处理。
 
zk中的集群模式:
zk集群会将服务器分成Leader、Follower、Observer三种角色的服务器;在集群运行期间这三种角色的服务器所负责的工作各不相同。
 
一.Leader角色服务器(处理事务性请求 + 管理其他服务器)
负责处理事务性请求,以及管理集群中的其他服务器。Leader服务器是集群中工作的分配和调度者。
 
二.Follower服务器(处理非事务性请求 + 选举Leader服务器)
负责处理非事务性请求,以及选举出Leader服务器。发生Leader选举时,系统会从Follow服务器中,根据过半投票原则选举出一个Follower作为Leader服务器。
 
三.Observer服务器(处理非事务性请求 + 不参与选举和被选举)
负责处理非事务性请求,不参与Leader服务器的选举,也不会作为候选者被选举为Leader服务器。
 
zk服务端整体架构图如下:
3.webp
集群版zk服务端的启动分为四个阶段:
预启动阶段、初始化阶段、Leader选举阶段、Leader和Follower启动阶段
 
集群版zk服务端的启动流程图如下:
4.png
接下来介绍集群版的zk服务端是如何从初始化到对外提供服务的。
 
(1)预启动阶段
在zk服务端进行初始化之前,首先要对配置文件等信息进行解析和载入,而zk服务端的预启动阶段的主要工作流程如下:
 
一.QuorumPeerMain启动程序
二.解析zoo.cfg配置文件
三.创建和启动历史文件清理器
四.根据配置判断是集群模式还是单机模式
 
首先zk服务端会调用QuorumPeerMain类中的main()方法,然后在QuorumPeerMain的initializeAndRun()方法里解析zoo.cfg配置文件。接着继续在initializeAndRun()方法中创建和启动历史文件清理器,以及根据配置文件和启动参数,即args参数和config.isDistributed()方法,来判断zk服务端的启动方式是集群模式还是单机模式。如果配置参数中配置了相关的配置项,并且已经指定了集群模式运行,那么在服务启动时就会调用runFromConfig()方法完成集群模式的初始化。
  1. public class NIOServerCnxnFactory extends ServerCnxnFactory {
  2.     ...
  3.     public void startup(ZooKeeperServer zks, boolean startServer) {
  4.         //6.启动各种线程
  5.         start();
  6.         //10.注册zk服务器实例
  7.         setZooKeeperServer(zks);
  8.         if (startServer) {
  9.             //7.恢复本地数据
  10.             zks.startdata();
  11.             //8.创建并启动会话管理器SessionTracker
  12.             //9.初始化zk的请求处理链
  13.             zks.startup();
  14.         }
  15.     }
  16.     ...
  17. }
  18. public abstract class ServerCnxnFactory {
  19.     ...
  20.     protected ZooKeeperServer zkServer;
  21.    
  22.     final public void setZooKeeperServer(ZooKeeperServer zks) {
  23.         this.zkServer = zks;
  24.         if (zks != null) {
  25.             if (secure) {
  26.                 zks.setSecureServerCnxnFactory(this);
  27.             } else {
  28.                 zks.setServerCnxnFactory(this);
  29.             }
  30.         }
  31.     }
  32.     ...
  33. }
  34. public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
  35.     ...
  36.     protected ServerCnxnFactory serverCnxnFactory;
  37.     protected ServerCnxnFactory secureServerCnxnFactory;
  38.    
  39.     public void setServerCnxnFactory(ServerCnxnFactory factory) {
  40.         serverCnxnFactory = factory;
  41.     }
  42.    
  43.     public void setSecureServerCnxnFactory(ServerCnxnFactory factory) {
  44.         secureServerCnxnFactory = factory;
  45.     }
  46.     ...
  47. }
复制代码
(2)初始化阶段
一.创建网络连接工厂实例ServerCnxnFactory
二.初始化网络连接工厂实例ServerCnxnFactory
三.创建集群版服务器实例QuorumPeer
四.创建数据持久化工具FileTxnSnapLog并设置到QuorumPeer实例中
五.创建内存数据库ZKDatabase并设置到QuorumPeer实例中
六.初始化集群版服务器实例QuorumPeer
七.恢复集群版服务器实例QuorumPeer本地数据
八.启动网络连接工厂ServerCnxnFactory主线程
[code]public class QuorumPeerMain {    protected QuorumPeer quorumPeer;    ...    public void runFromConfig(QuorumPeerConfig config) {        ...        ServerCnxnFactory cnxnFactory = null;        if (config.getClientPortAddress() != null) {            //1.创建网络连接工厂实例ServerCnxnFactory            cnxnFactory = ServerCnxnFactory.createFactory();            //2.初始化网络连接工厂实例ServerCnxnFactory            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);        }        //接下来就是初始化集群版服务器实例QuorumPeer        //3.创建集群版服务器实例QuorumPeer        quorumPeer = getQuorumPeer();        //4.创建zk数据管理器FileTxnSnapLog        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());        quorumPeer.setElectionType(config.getElectionAlg());        quorumPeer.setMyid(config.getServerId());        quorumPeer.setTickTime(config.getTickTime());        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());        quorumPeer.setInitLimit(config.getInitLimit());        quorumPeer.setSyncLimit(config.getSyncLimit());        quorumPeer.setConfigFileName(config.getConfigFilename());        //5.创建并初始化内存数据库ZKDatabase        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);        if (config.getLastSeenQuorumVerifier() != null) {            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);        }        quorumPeer.initConfigInZKDatabase();        quorumPeer.setCnxnFactory(cnxnFactory);        quorumPeer.setSslQuorum(config.isSslQuorum());        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());        quorumPeer.setLearnerType(config.getPeerType());        quorumPeer.setSyncEnabled(config.getSyncEnabled());        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());        if (config.sslQuorumReloadCertFiles) {            quorumPeer.getX509Util().enableCertFileReloading();        }        // sets quorum sasl authentication configurations        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);        if (quorumPeer.isQuorumSaslAuthEnabled()) {            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);        }        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);        quorumPeer.initialize();        //6.初始化集群版服务器实例QuorumPeer        quorumPeer.start();        //join方法会将当前线程挂起,等待QuorumPeer线程结束后再执行当前线程        quorumPeer.join();    }        protected QuorumPeer getQuorumPeer() throws SaslException {        return new QuorumPeer();    }}public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {    ServerCnxnFactory cnxnFactory;    ServerCnxnFactory secureCnxnFactory;    ...    public synchronized void start() {        //7.恢复集群版服务器实例QuorumPeer本地数据        loadDataBase();        //8.启动网络连接工厂ServerCnxnFactory主线程        startServerCnxnFactory();        adminServer.start();        //9.开始Leader选举        startLeaderElection();        startJvmPauseMonitor();//开启监控JVM停顿的线程        //10.启动集群版服务器实例QuorumPeer        super.start();    }        private void startServerCnxnFactory() {        if (cnxnFactory != null) {            cnxnFactory.start();        }        if (secureCnxnFactory != null) {            secureCnxnFactory.start();        }    }    ...}public abstract class ServerCnxnFactory {
    ...
    static public ServerCnxnFactory createFactory() throws IOException {
        String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
        if (serverCnxnFactoryName == null) {
            serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
        }
        ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName).getDeclaredConstructor().newInstance();
        return serverCnxnFactory;
    }
    ...
}public class NIOServerCnxnFactory extends ServerCnxnFactory {    //最大客户端连接数    protected int maxClientCnxns = 60;    //处理连接过期的线程    private ConnectionExpirerThread expirerThread;    //处理客户端建立连接的线程    private AcceptThread acceptThread;    //处理客户端请求的线程    private final Set selectorThreads = new HashSet();    //会话过期相关    int sessionlessCnxnTimeout;    private ExpiryQueue cnxnExpiryQueue;    //selector线程数,CPU核数的一半    private int numSelectorThreads;    //工作线程数    private int numWorkerThreads;    ...    public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {        ...        maxClientCnxns = maxcc;        sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);        cnxnExpiryQueue = new ExpiryQueue(sessionlessCnxnTimeout);        //创建一个自动处理过期会话的ConnectionExpirerThread线程        expirerThread = new ConnectionExpirerThread();        int numCores = Runtime.getRuntime().availableProcessors();        numSelectorThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_SELECTOR_THREADS, Math.max((int) Math.sqrt((float) numCores/2), 1));        numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);        ...        //创建一批SelectorThread线程        for (int i=0; i

相关推荐

您需要登录后才可以回帖 登录 | 立即注册