找回密码
 立即注册
首页 业界区 业界 Nacos源码—2.Nacos服务注册发现分析二

Nacos源码—2.Nacos服务注册发现分析二

诘琅 2025-6-2 22:44:59
大纲
5.服务发现—服务之间的调用请求链路分析
6.服务端如何维护不健康的微服务实例
7.服务下线时涉及的处理
8.服务注册发现总结
 
5.服务发现—服务之间的调用请求链路分析
(1)微服务通过Nacos完成服务调用的请求流程
(2)Nacos客户端进行服务发现的源码
(3)Nacos服务端进行服务查询的源码
(4)总结
 
(1)微服务通过Nacos完成服务调用的请求流程
按照Nacos使用简介里的案例:订单服务和库存服务完成Nacos注册后,会通过Feign来完成服务间的调用。如下图示:
1.webp
步骤一:首先每个客户端都会有一个微服务本地缓存列表,这个缓存列表会定时从注册中心获取最新的列表来更新本地缓存。
 
步骤二:然后当order-service需要调用stock-service时,order-service会先根据服务名称去本地缓存列表中找对应的微服务实例。但通过服务名称可能会找到多个,所以需要负载均衡选择其中一个。
 
步骤三:最后把服务名称更换为IP + Port,通过Feign发起HTTP调用获取返回结果。
 
(2)Nacos客户端进行服务发现的源码
一.nacos-discovery通过引入Ribbon实现服务调用时的负载均衡
二.nacos-discovery如何整合Ribbon实现服务调用时的负载均衡
三.nacos-client如何进行服务发现
 
一.nacos-discovery通过引入Ribbon实现服务调用时的负载均衡
Nacos客户端就是引入了nacos-discovery + nacos-client依赖的项目。由于nacos-discovery整合了Ribbon,所以Ribbon可以调用Nacos服务端的服务实例查询列表接口。于是Nacos客户端便借助Ribbon实现了服务调用时的负载均衡,也就是Ribbon会从服务实例列表中选择一个服务实例给客户端进行服务调用。
 
在nacos-discovery的pom.xml中,可以看到它引入了Ribbon依赖:
2.png
二.nacos-discovery如何整合Ribbon实现服务调用时的负载均衡
在Ribbon中会有一个ServerList接口,如下所示:这就是一个扩展接口,这个接口的作用就是获取Server列表。然后nacos-discovery会针对这个接口进行实现,从而整合Ribbon。从引入的包来看:loadbalancer是属于Ribbon源码包下的,而LoadBalancer则是Ribbon中的负载均衡器。负载均衡器会结合IRule负载均衡策略,从服务实例列表中选择一个实例。
  1. package com.netflix.loadbalancer;
  2. import java.util.List;
  3. //Interface that defines the methods sed to obtain the List of Servers
  4. public interface ServerList<T extends Server> {
  5.     public List<T> getInitialListOfServers();
  6.     //Return updated list of servers. This is called say every 30 secs
  7.     public List<T> getUpdatedListOfServers();
  8. }
复制代码
当Nacos客户端进行微服务调用时,会通过Ribbon来选出一个微服务实例。也就是Ribbon会通过调用NacosServerList的getUpdatedListOfServers()方法选出一个微服务实例。
 
nacos-discovery的NacosServerList类继承了AbstractServerList类,而且实现了Ribbon的ServerList接口的两个方法,如下所示:
  1. public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {
  2.     ...
  3.     ...
  4. }
  5. public class NacosServerList extends AbstractServerList<NacosServer> {
  6.     private NacosDiscoveryProperties discoveryProperties;
  7.     private String serviceId;
  8.     public NacosServerList(NacosDiscoveryProperties discoveryProperties) {
  9.         this.discoveryProperties = discoveryProperties;
  10.     }
  11.     @Override
  12.     public List<NacosServer> getInitialListOfServers() {
  13.         return getServers();
  14.     }
  15.     @Override
  16.     public List<NacosServer> getUpdatedListOfServers() {
  17.         return getServers();
  18.     }
  19.     private List<NacosServer> getServers() {
  20.         try {
  21.             //读取分组
  22.             String group = discoveryProperties.getGroup();
  23.             //通过服务名称、分组、true(表示只需要健康实例),
  24.             //调用NacosNamingService.selectInstances()方法来查询服务实例列表
  25.             List<Instance> instances = discoveryProperties.namingServiceInstance().selectInstances(serviceId, group, true);
  26.             //把Instance转换成NacosServer类型
  27.             return instancesToServerList(instances);
  28.         } catch (Exception e) {
  29.             throw new IllegalStateException("Can not get service instances from nacos, serviceId=" + serviceId, e);
  30.         }
  31.     }
  32.     private List<NacosServer> instancesToServerList(List<Instance> instances) {
  33.         List<NacosServer> result = new ArrayList<>();
  34.         if (CollectionUtils.isEmpty(instances)) {
  35.             return result;
  36.         }
  37.         for (Instance instance : instances) {
  38.             result.add(new NacosServer(instance));
  39.         }
  40.         return result;
  41.     }
  42.     public String getServiceId() {
  43.         return serviceId;
  44.     }
  45.     @Override
  46.     public void initWithNiwsConfig(IClientConfig iClientConfig) {
  47.         this.serviceId = iClientConfig.getClientName();
  48.     }
  49. }
复制代码
NacosServerList的核心方法是NacosServerList的getServers()方法,因为nacos-discovery实现Ribbon的两个接口都调用到了该方法。
 
在nacos-discovery的NacosServerList的getServers()方法中,会调用nacos-client的NacosNamingService的selectInstances()方法,来获取服务实例列表。
 
三.nacos-client如何进行服务发现
在nacos-client的NacosNamingService的selectInstances()方法中:首先会调用HostReactor的getServiceInfo()方法获取服务实例列表,然后调用HostReactor的getServiceInfo0()方法尝试从本地缓存获取,接着调用HostReactor的updateServiceNow()方法查询并更新缓存,也就是调用HostReactor的updateService()方法查询并更新缓存。即先调用NamingProxy的queryList()方法来查询服务端的服务实例列表,再调用HostReactor的processServiceJson()方法更新本地缓存。最后调用HostReactor的scheduleUpdateIfAbsent()方法提交同步缓存任务。
 
所以nacos-client的HostReactor的getServiceInfo()方法是服务发现的核心,它会先到本地缓存中去查询对应的服务实例列表。如果本地缓存查不到对应的服务数据,则到服务端去查询服务实例列表。当获取完服务实例列表后,会向调度线程池提交一个延迟执行的任务,在延迟任务中会执行UpdateTask任务的run()方法。
 
UpdateTask任务的run()方法:会调用updateService()方法查询服务实例列表并更新本地缓存。当该任务执行完毕时,会继续向调度线程池提交一个延迟执行的任务,从而实现不断重复地更新本地缓存的服务实例列表。
  1. public class NacosNamingService implements NamingService {
  2.     private HostReactor hostReactor;
  3.     ...
  4.     @Override
  5.     public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
  6.         return selectInstances(serviceName, groupName, healthy, true);
  7.     }
  8.     @Override
  9.     public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException {
  10.         return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);
  11.     }
  12.     @Override
  13.     public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
  14.         ServiceInfo serviceInfo;
  15.         //这个参数传入默认就是true
  16.         if (subscribe) {
  17.             serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
  18.         } else {
  19.             serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
  20.         }
  21.         return selectInstances(serviceInfo, healthy);
  22.     }
  23.     ...
  24. }
  25. public class HostReactor implements Closeable {
  26.     //服务实例列表的本地缓存
  27.     private final Map<String, ServiceInfo> serviceInfoMap;
  28.     private final Map<String, Object> updatingMap;
  29.     private final NamingProxy serverProxy;
  30.     private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
  31.     private final ScheduledExecutorService executor;
  32.     ...
  33.     public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
  34.         NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
  35.         String key = ServiceInfo.getKey(serviceName, clusters);
  36.         if (failoverReactor.isFailoverSwitch()) {
  37.             return failoverReactor.getService(key);
  38.         }
  39.         //先查询本地缓存中的服务实例列表
  40.         ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
  41.         //如果本地缓存实例列表为空
  42.         if (null == serviceObj) {
  43.             serviceObj = new ServiceInfo(serviceName, clusters);
  44.             serviceInfoMap.put(serviceObj.getKey(), serviceObj);
  45.             updatingMap.put(serviceName, new Object());
  46.             //调用Nacos服务端的服务实例列表查询接口,立即更新Service数据
  47.             updateServiceNow(serviceName, clusters);
  48.             updatingMap.remove(serviceName);
  49.         } else if (updatingMap.containsKey(serviceName)) {
  50.             if (UPDATE_HOLD_INTERVAL > 0) {
  51.                 //hold a moment waiting for update finish
  52.                 synchronized (serviceObj) {
  53.                     try {
  54.                         serviceObj.wait(UPDATE_HOLD_INTERVAL);
  55.                     } catch (InterruptedException e) {
  56.                         NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
  57.                     }
  58.                 }
  59.             }
  60.         }
  61.         //开启定时任务,维护本地缓存
  62.         scheduleUpdateIfAbsent(serviceName, clusters);
  63.         //最后从本地缓存中,获取服务实例列表数据
  64.         return serviceInfoMap.get(serviceObj.getKey());
  65.     }
  66.     private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
  67.         String key = ServiceInfo.getKey(serviceName, clusters);
  68.         //从本地缓存中获取服务实例列表
  69.         return serviceInfoMap.get(key);
  70.     }
  71.     private void updateServiceNow(String serviceName, String clusters) {
  72.         try {
  73.             updateService(serviceName, clusters);
  74.         } catch (NacosException e) {
  75.             NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
  76.         }
  77.     }
  78.     //Update service now.
  79.     public void updateService(String serviceName, String clusters) throws NacosException {
  80.         ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
  81.         try {
  82.             //调用Nacos服务端的服务实例查询接口
  83.             String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
  84.             //如果结果不为空,则更新本地缓存
  85.             if (StringUtils.isNotEmpty(result)) {
  86.                 //更新本地缓存
  87.                 processServiceJson(result);
  88.             }
  89.         } finally {
  90.             if (oldService != null) {
  91.                 synchronized (oldService) {
  92.                     oldService.notifyAll();
  93.                 }
  94.             }
  95.         }
  96.     }
  97.     ...
  98.     public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
  99.         if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
  100.             return;
  101.         }
  102.         synchronized (futureMap) {
  103.             if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
  104.                 return;
  105.             }
  106.             //向调度线程池提交一个延迟执行的任务
  107.             ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
  108.             futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
  109.         }
  110.     }
  111.     public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
  112.         //向调度线程池提交一个延迟执行的任务
  113.         return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
  114.     }
  115.     public class UpdateTask implements Runnable {
  116.         long lastRefTime = Long.MAX_VALUE;
  117.         private final String clusters;   
  118.         private final String serviceName;
  119.         private int failCount = 0;
  120.         public UpdateTask(String serviceName, String clusters) {
  121.             this.serviceName = serviceName;
  122.             this.clusters = clusters;
  123.         }
  124.         private void incFailCount() {
  125.             int limit = 6;
  126.             if (failCount == limit) {
  127.                 return;
  128.             }
  129.             failCount++;
  130.         }
  131.         private void resetFailCount() {
  132.             failCount = 0;
  133.         }
  134.         @Override
  135.         public void run() {
  136.             long delayTime = DEFAULT_DELAY;
  137.             try {
  138.                 ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
  139.                 //如果本地缓存为空
  140.                 if (serviceObj == null) {
  141.                     updateService(serviceName, clusters);
  142.                     return;
  143.                 }
  144.                 //lastRefTime是最大的Long型
  145.                 if (serviceObj.getLastRefTime() <= lastRefTime) {
  146.                     updateService(serviceName, clusters);
  147.                     serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
  148.                 } else {
  149.                     refreshOnly(serviceName, clusters);
  150.                 }
  151.                 lastRefTime = serviceObj.getLastRefTime();
  152.                 if (!notifier.isSubscribed(serviceName, clusters) && !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
  153.                     NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
  154.                     return;
  155.                 }
  156.                 if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
  157.                     incFailCount();
  158.                     return;
  159.                 }
  160.                 delayTime = serviceObj.getCacheMillis();
  161.                 resetFailCount();
  162.             } catch (Throwable e) {
  163.                 incFailCount();
  164.                 NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
  165.             } finally {
  166.                 //向调度线程池继续提交一个延迟执行的任务继续同步本地缓存
  167.                 executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
  168.             }
  169.         }
  170.     }
  171. }
  172. public class NamingProxy implements Closeable {
  173.     ...
  174.     //向Nacos服务端发起HTTP形式的服务实例列表查询请求
  175.     public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
  176.         final Map<String, String> params = new HashMap<String, String>(8);
  177.         params.put(CommonParams.NAMESPACE_ID, namespaceId);
  178.         params.put(CommonParams.SERVICE_NAME, serviceName);
  179.         params.put("clusters", clusters);
  180.         params.put("udpPort", String.valueOf(udpPort));
  181.         params.put("clientIP", NetUtils.localIP());
  182.         params.put("healthyOnly", String.valueOf(healthyOnly));
  183.         //通过HTTP的方式,请求"/nacos/v1/ns/instance/list"接口
  184.         return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
  185.     }
  186.     ...
  187. }
复制代码
二.服务端处理心跳请求的源码
服务端的InstanceController的beat()方法,会处理客户端发来的心跳请求。首先会尝试从ServiceManager的注册表中获取对应的Instance实例对象。如果在内存注册表中找不到对应的Instance实例对象,则直接调用ServiceManager的registerInstance()方法进行服务注册。
 
如果在内存注册表中可以找到对应的Instance实例对象,那么就从ServiceManager的注册表中取出对应的Service服务对象,这样后续对Service的Cluster的Instance进行修改时,就会修改到注册表数据。接着执行Service的processClientBeat()方法,该方法会提交一个异步任务ClientBeatProcessor给线程池,其中线程池的线程数是可用线程数的一半。
 
在ClientBeatProcessor的run()方法中:会先通过集群名找到所有的临时实例列表。然后通过for循环对这些临时实例进行IP + Port判断,找出对应的Instance实例对象。找出对应的Instance后,接着就会把Instance的lastBeat属性修改成当前时间,然后再判断当前Instance的状态是否不健康,若是则重新标记成健康状态。
[code]//Instance operation controller.@RestController@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")public class InstanceController {    ...    //Create a beat for instance.    @CanDistro    @PutMapping("/beat")    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)    public ObjectNode beat(HttpServletRequest request) throws Exception {        ObjectNode result = JacksonUtils.createEmptyJsonNode();        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());        String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);        //获取请求参数、namespaceId、serviceName        RsInfo clientBeat = null;        if (StringUtils.isNotBlank(beat)) {            clientBeat = JacksonUtils.toObj(beat, RsInfo.class);        }        String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);        String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);        int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));        if (clientBeat != null) {            if (StringUtils.isNotBlank(clientBeat.getCluster())) {                clusterName = clientBeat.getCluster();            } else {                clientBeat.setCluster(clusterName);            }            ip = clientBeat.getIp();            port = clientBeat.getPort();        }        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);        NamingUtils.checkServiceNameFormat(serviceName);        Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);        //通过命令空间、服务名等信息,从ServiceManager内存注册表中获取instance实例对象        Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);        //如果获取实例为空,则会重新调用服务注册的方法ServiceManager.registerInstance()        if (instance == null) {            if (clientBeat == null) {                result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);                return result;            }            Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, " + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);            instance = new Instance();            instance.setPort(clientBeat.getPort());            instance.setIp(clientBeat.getIp());            instance.setWeight(clientBeat.getWeight());            instance.setMetadata(clientBeat.getMetadata());            instance.setClusterName(clusterName);            instance.setServiceName(serviceName);            instance.setInstanceId(instance.getInstanceId());            instance.setEphemeral(clientBeat.isEphemeral());            //重新注册服务实例            serviceManager.registerInstance(namespaceId, serviceName, instance);        }        //从ServiceManager内存注册表中获取服务Service,后续对Service中的Cluster的Instance修改,便会修改到注册表        Service service = serviceManager.getService(namespaceId, serviceName);        if (service == null) {            throw new NacosException(NacosException.SERVER_ERROR, "service not found: " + serviceName + "@" + namespaceId);        }        if (clientBeat == null) {            clientBeat = new RsInfo();            clientBeat.setIp(ip);            clientBeat.setPort(port);            clientBeat.setCluster(clusterName);        }        //提交客户端服务实例的心跳健康检查任务,更改lastBeat属性        service.processClientBeat(clientBeat);        result.put(CommonParams.CODE, NamingResponseCode.OK);        if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {            result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());        }        result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());        return result;    }    ...}@JsonInclude(Include.NON_NULL)public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener {    ...    public void processClientBeat(final RsInfo rsInfo) {        ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();        clientBeatProcessor.setService(this);        clientBeatProcessor.setRsInfo(rsInfo);        //立即执行        HealthCheckReactor.scheduleNow(clientBeatProcessor);    }    ...}//Health check reactor.@SuppressWarnings("MD.ThreadPoolCreationRule")public class HealthCheckReactor {    ...    //Schedule client beat check task without a delay.    public static ScheduledFuture scheduleNow(Runnable task) {        //提交任务到线程池立即执行        return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);    }    ...}public class GlobalExecutor {    public static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors()

相关推荐

您需要登录后才可以回帖 登录 | 立即注册