找回密码
 立即注册
首页 业界区 业界 Nacos源码—8.Nacos升级gRPC分析三

Nacos源码—8.Nacos升级gRPC分析三

志灿隐 2025-6-2 23:33:15
大纲
7.服务端对服务实例进行健康检查
8.服务下线如何注销注册表和客户端等信息
9.事件驱动架构源码分析
 
7.服务端对服务实例进行健康检查
(1)服务端对服务实例进行健康检查的设计逻辑
(2)服务端对服务实例进行健康检查的源码
(3)服务端检查服务实例不健康后的注销处理
 
(1)服务端对服务实例进行健康检查的设计逻辑
一.首先会获取所有客户端的Connection连接对象
Connection连接对象里有个属性叫lastActiveTime,表示的是最后存活时间。
 
二.然后判断当前时间-最后存活时间是否大于20s
如果大于,则把该Connection连接对象的connectionId放入到一个集合里。这个集合是一个名为outDatedConnections的待移除集合Set,此时该Connection连接对象并不会马上删除。
 
三.当判断完全部的Connection连接对象后会遍历outDatedConnections集合
向遍历到的Connection连接对象发起一次请求,确认是否真的下线。如果响应成功,则往successConnections集合中添加connectionId,并且刷新Connection连接对象的lastActiveTime属性。这个机制有一个专业的名称叫做:探活机制。
 
四.遍历待移除集合进行注销并且在注销之前先判断一下是否探活成功
也就是connectionId存在于待移除集合outDatedConnections中,但是不存在于探活成功集合successConnections中,那么这个connectionId对应的客户端就会被注销掉。
 
(2)服务端对服务实例进行健康检查的源码
对服务实例进行健康检查的源码入口是ConnectionManager的start()方法。
  1. @Service
  2. public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {
  3.     Map<String, Connection> connections = new ConcurrentHashMap<>();
  4.     ...
  5.     //Start Task:Expel the connection which active Time expire.
  6.     @PostConstruct
  7.     public void start() {
  8.         //Start UnHealthy Connection Expel Task.
  9.         RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
  10.             @Override
  11.             public void run() {
  12.                 ...
  13.                 //一.首先获取所有的连接
  14.                 Set<Map.Entry<String, Connection>> entries = connections.entrySet();
  15.                 ...
  16.                 //二.然后判断客户端是否超过20s没有发来心跳信息了,如果是则会将clientId加入outDatedConnections集合中
  17.                 Set<String> outDatedConnections = new HashSet<>();
  18.                 long now = System.currentTimeMillis();
  19.                 for (Map.Entry<String, Connection> entry : entries) {
  20.                     Connection client = entry.getValue();
  21.                     String clientIp = client.getMetaInfo().getClientIp();
  22.                     AtomicInteger integer = expelForIp.get(clientIp);
  23.                     if (integer != null && integer.intValue() > 0) {
  24.                         integer.decrementAndGet();
  25.                         expelClient.add(client.getMetaInfo().getConnectionId());
  26.                         expelCount--;
  27.                     } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {//判断心跳时间
  28.                         //添加到待移除列表
  29.                         outDatedConnections.add(client.getMetaInfo().getConnectionId());
  30.                     }
  31.                 }
  32.                 ...
  33.                 //client active detection.
  34.                 //三.初次检测完超过20s的Connection连接对象后,并不会立马进行删除,而是进行探活,服务端主动请求客户端,来确认是否真的下线
  35.                 Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());
  36.                 if (CollectionUtils.isNotEmpty(outDatedConnections)) {
  37.                     Set<String> successConnections = new HashSet<>();
  38.                     final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
  39.                     //遍历超过20s没有心跳的客户端clientId
  40.                     for (String outDateConnectionId : outDatedConnections) {
  41.                         try {
  42.                             Connection connection = getConnection(outDateConnectionId);
  43.                             if (connection != null) {
  44.                                 ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
  45.                                 //调用GrpcConnection.asyncRequest()方法异步发送请求
  46.                                 connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
  47.                                     @Override
  48.                                     public Executor getExecutor() {
  49.                                         return null;
  50.                                     }
  51.                                     @Override
  52.                                     public long getTimeout() {
  53.                                         return 1000L;
  54.                                     }
  55.                                     @Override
  56.                                     public void onResponse(Response response) {
  57.                                         latch.countDown();
  58.                                         if (response != null && response.isSuccess()) {
  59.                                             //响应成功刷新心跳时间
  60.                                             connection.freshActiveTime();
  61.                                             //并且加入到探活成功的集合列表中
  62.                                             successConnections.add(outDateConnectionId);
  63.                                         }
  64.                                     }
  65.                                     @Override
  66.                                     public void onException(Throwable e) {
  67.                                         latch.countDown();
  68.                                     }
  69.                                 });
  70.                                 Loggers.REMOTE_DIGEST.info("[{}]send connection active request ", outDateConnectionId);
  71.                             } else {
  72.                                 latch.countDown();
  73.                             }                           
  74.                         } catch (ConnectionAlreadyClosedException e) {
  75.                             latch.countDown();
  76.                         } catch (Exception e) {
  77.                             Loggers.REMOTE_DIGEST.error("[{}]Error occurs when check client active detection ,error={}", outDateConnectionId, e);
  78.                             latch.countDown();
  79.                         }
  80.                     }
  81.                     latch.await(3000L, TimeUnit.MILLISECONDS);
  82.                     Loggers.REMOTE_DIGEST.info("Out dated connection check successCount={}", successConnections.size());
  83.                     //经过探活还是不成功的Connection连接对象,就准备进行移除了
  84.                     //遍历20s没有心跳的客户端,准备移除客户端信息
  85.                     for (String outDateConnectionId : outDatedConnections) {
  86.                         //判断探活是否成功,如果成功了则不需要移除
  87.                         if (!successConnections.contains(outDateConnectionId)) {
  88.                             Loggers.REMOTE_DIGEST.info("[{}]Unregister Out dated connection....", outDateConnectionId);
  89.                             //执行客户端注销逻辑
  90.                             unregister(outDateConnectionId);
  91.                         }
  92.                     }
  93.                 }
  94.                 ...
  95.             }
  96.         }, 1000L, 3000L, TimeUnit.MILLISECONDS);
  97.     }
  98.     ...
  99. }
复制代码
(3)服务端检查服务实例不健康后的注销处理
进行注销处理的方法是ConnectionManager的unregister()方法。该方法主要会移除Connection连接对象 + 清除一些数据,以及发布一个ClientDisconnectEvent客户端注销事件。
[code]@Servicepublic class ConnectionManager extends Subscriber {    private Map connectionForClientIp = new ConcurrentHashMap(16);    Map connections = new ConcurrentHashMap();    @Autowired    private ClientConnectionEventListenerRegistry clientConnectionEventListenerRegistry;    ...    //unregister a connection .    public synchronized void unregister(String connectionId) {        //移除客户端信息        Connection remove = this.connections.remove(connectionId);        if (remove != null) {            String clientIp = remove.getMetaInfo().clientIp;            AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);            if (atomicInteger != null) {                int count = atomicInteger.decrementAndGet();                if (count
您需要登录后才可以回帖 登录 | 立即注册