找回密码
 立即注册
首页 业界区 业界 Nacos源码—1.Nacos服务注册发现分析一

Nacos源码—1.Nacos服务注册发现分析一

黎娅茜 2025-6-2 22:43:06
大纲
1.客户端如何发起服务注册 + 发送服务心跳
2.服务端如何处理客户端的服务注册请求
3.注册服务—如何实现高并发支撑上百万服务注册
4.内存注册表—如何处理注册表的高并发读写冲突
 
1.客户端如何发起服务注册 + 发送服务心跳
(1)Nacos客户端项目启动时为什么会自动注册服务
(2)Nacos客户端通过什么方式注册服务
(3)Nacos客户端如何发送服务心跳
 
(1)Nacos客户端项目启动时为什么会自动注册服务
Nacos客户端就是引入了nacos-discovery + nacos-client依赖的项目。引入spring-cloud-starter-alibaba-nacos-discovery后,才自动注册服务。查看这个依赖包中的spring.factories文件,发现有一些Configuration类。
 
Spring Boot启动时会扫描spring.factories文件,然后创建里面的配置类。
 
在spring.pactories文件中,与注册相关的类就是:NacosServiceRegistryAutoConfiguration这个Nacos服务注册自动配置类。
1.webp
Nacos服务注册自动配置类NacosServiceRegistryAutoConfiguration如下,该配置类创建了三个Bean。
 
第一个Bean:NacosServiceRegistry
这个Bean在创建时,会传入加载了yml配置文件内容的类NacosDiscoveryProperties。
 
第二个Bean:NacosRegistration
这个Bean在创建时,会传入加载了yml配置文件内容的类NacosDiscoveryProperties。
 
第三个Bean:NacosAutoServiceRegistration
这个Bean在创建时,会传入NacosServiceRegistry和NacosRegistration两个Bean。然后该Bean继承了AbstractAutoServiceRegistration抽象类。该抽象类实现了ApplicationListener接口,所以项目启动时便是利用了Spring的监听事件来实现自动注册服务的。因为在Spring容器启动的最后会执行finishRefresh()方法,然后会发布一个事件,该事件会触发调用onApplicationEvent()方法。
 
调用AbstractAutoServiceRegistration的onApplicationEvent()方法时,首先会调用AbstractAutoServiceRegistration的bind()方法,然后调用AbstractAutoServiceRegistration的start()方法,接着调用AbstractAutoServiceRegistration的register()方法发起注册,也就是调用this.serviceRegistry的register()方法完成服务注册的具体工作。
 
其中,AbstractAutoServiceRegistration的serviceRegistry属性,是在服务注册自动配置类NacosServiceRegistryAutoConfiguration,创建第三个Bean—NacosAutoServiceRegistration时,通过传入其创建的第一个Bean—NacosServiceRegistry进行赋值的。
  1. @Configuration(proxyBeanMethods = false)
  2. @EnableConfigurationProperties
  3. @ConditionalOnNacosDiscoveryEnabled
  4. @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
  5. @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class })
  6. public class NacosServiceRegistryAutoConfiguration {
  7.     @Bean
  8.     public NacosServiceRegistry nacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
  9.         //传入NacosDiscoveryProperties作为参数
  10.         return new NacosServiceRegistry(nacosDiscoveryProperties);
  11.     }
  12.     @Bean
  13.     @ConditionalOnBean(AutoServiceRegistrationProperties.class)
  14.     public NacosRegistration nacosRegistration(ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {
  15.         //传入NacosDiscoveryProperties作为参数
  16.         return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context);
  17.     }
  18.     @Bean
  19.     @ConditionalOnBean(AutoServiceRegistrationProperties.class)
  20.     public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {
  21.         //传入NacosServiceRegistry和NacosRegistration作为参数
  22.         return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
  23.     }
  24. }
  25. @ConfigurationProperties("spring.cloud.nacos.discovery")
  26. public class NacosDiscoveryProperties {
  27.     //nacos discovery server address.
  28.     private String serverAddr;
  29.     //the nacos authentication username.
  30.     private String username;
  31.     //the nacos authentication password.
  32.     private String password;
  33.     //namespace, separation registry of different environments.
  34.     private String namespace;
  35.     //service name to registry.
  36.     @Value("${spring.cloud.nacos.discovery.service:${spring.application.name:}}")
  37.     private String service;
  38.     //cluster name for nacos.
  39.     private String clusterName = "DEFAULT";
  40.     //group name for nacos.
  41.     private String group = "DEFAULT_GROUP";
  42.     //The ip address your want to register for your service instance, needn't to set it if the auto detect ip works well.
  43.     private String ip;
  44.     //The port your want to register for your service instance, needn't to set it if the auto detect port works well.
  45.     private int port = -1;
  46.     //Heart beat interval. Time unit: millisecond.
  47.     private Integer heartBeatInterval;
  48.     //Heart beat timeout. Time unit: millisecond.
  49.     private Integer heartBeatTimeout;
  50.     //If instance is ephemeral.The default value is true.
  51.     private boolean ephemeral = true;
  52.     ...
  53. }
  54. public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> {
  55.     ...
  56.     private NacosRegistration registration;
  57.     public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,
  58.             AutoServiceRegistrationProperties autoServiceRegistrationProperties,
  59.             NacosRegistration registration) {
  60.         super(serviceRegistry, autoServiceRegistrationProperties);
  61.         this.registration = registration;
  62.     }
  63.     ...
  64. }
  65. public abstract class AbstractAutoServiceRegistration<R extends Registration>
  66.         implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
  67.     ...
  68.     private final ServiceRegistry<R> serviceRegistry;
  69.     private AutoServiceRegistrationProperties properties;
  70.     protected AbstractAutoServiceRegistration(ServiceRegistry<R> serviceRegistry, AutoServiceRegistrationProperties properties) {
  71.         this.serviceRegistry = serviceRegistry;
  72.         this.properties = properties;
  73.     }
  74.     ...
  75.     @Override
  76.     @SuppressWarnings("deprecation")
  77.     public void onApplicationEvent(WebServerInitializedEvent event) {
  78.         bind(event);
  79.     }
  80.     public void bind(WebServerInitializedEvent event) {
  81.         ApplicationContext context = event.getApplicationContext();
  82.         if (context instanceof ConfigurableWebServerApplicationContext) {
  83.             if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
  84.                 return;
  85.             }
  86.         }
  87.         this.port.compareAndSet(0, event.getWebServer().getPort());
  88.         this.start();
  89.     }
  90.     public void start() {
  91.         if (!isEnabled()) {
  92.             if (logger.isDebugEnabled()) {
  93.                 logger.debug("Discovery Lifecycle disabled. Not starting");
  94.             }
  95.             return;
  96.         }
  97.         //only initialize if nonSecurePort is greater than 0 and it isn't already running
  98.         //because of containerPortInitializer below
  99.         if (!this.running.get()) {
  100.             this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
  101.             //发起注册
  102.             register();
  103.             if (shouldRegisterManagement()) {
  104.                 registerManagement();
  105.             }
  106.             this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
  107.             this.running.compareAndSet(false, true);
  108.         }
  109.     }
  110.     protected void register() {
  111.         //调用创建NacosAutoServiceRegistration时传入的NacosServiceRegistry实例的register()方法
  112.         this.serviceRegistry.register(getRegistration());
  113.     }
  114.     ...
  115. }
  116. public class NacosServiceRegistry implements ServiceRegistry<Registration> {
  117.     private final NacosDiscoveryProperties nacosDiscoveryProperties;
  118.     public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
  119.         this.nacosDiscoveryProperties = nacosDiscoveryProperties;
  120.     }
  121.     @Override
  122.     public void register(Registration registration) {
  123.         if (StringUtils.isEmpty(registration.getServiceId())) {
  124.             log.warn("No service to register for nacos client...");
  125.             return;
  126.         }
  127.         NamingService namingService = namingService();
  128.         String serviceId = registration.getServiceId();
  129.         String group = nacosDiscoveryProperties.getGroup();
  130.         Instance instance = getNacosInstanceFromRegistration(registration);
  131.         try {
  132.             //把当前的服务实例注册到Nacos中
  133.             namingService.registerInstance(serviceId, group, instance);
  134.             log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());
  135.         } catch (Exception e) {
  136.             log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);
  137.             //rethrow a RuntimeException if the registration is failed.
  138.             //issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
  139.             rethrowRuntimeException(e);
  140.         }
  141.     }
  142.     private NamingService namingService() {
  143.         return nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());
  144.     }
  145.     ...
  146. }
复制代码
Nacos客户端项目启动时自动触发服务实例注册的流程总结:Spring监听器调用onApplicationEvent()方法 -> bind()方法 -> start()方法 -> register()方法,最后register()方法会调用serviceRegistry属性的register()方法进行注册。
 
整个流程具体来说就是:首先通过spring.factories文件,找到一个注册相关的Configuration配置类,这个配置类里面定义了三个Bean对象。创建第三个Bean对象时,需要第一个、第二个Bean对象作为参数传进去。第一个Bean对象里面就有真正进行服务注册的register()方法,并且第一个Bean对象会赋值给第三个Bean对象中的serviceRegistry属性。在第三个Bean对象的父类会实现Spring的监听器方法。所以在Spring容器启动时会发布监听事件,从而触发执行Nacos注册逻辑。
2.png
(2)Nacos客户端通过什么方式注册服务
项目启动时是通过NacosServiceRegistry的register()方法发起服务注册的,然后会调用NacosNamingService的registerInstance()方法注册服务实例,接着调用NamingProxy的registerService()方法组装参数发起服务注册请求,接着调用NamingProxy的reqApi()方法向Nacos服务端发起服务注册请求,也就是调用NamingProxy的callServer()方法向Nacos服务端发送注册请求。
 
在NamingProxy的callServer()方法中,首先会调用NacosRestTemplate的exchangeForm()方法发起HTTP请求,然后会调用this.requestClient()的execute()方法执行HTTP请求的发送,接着会调用DefaultHttpClientRequest的execute()方法处理请求的发送,也就是通过Apache的CloseableHttpClient组件来处理发送HTTP请求。
 
注意:NacosServiceRegistry是属于nacos-discovery包中的类,NacosNamingService是属于nacos-client包中的类。
  1. public class NacosServiceRegistry implements ServiceRegistry<Registration> {
  2.     private final NacosDiscoveryProperties nacosDiscoveryProperties;
  3.     public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
  4.         this.nacosDiscoveryProperties = nacosDiscoveryProperties;
  5.     }
  6.     @Override
  7.     public void register(Registration registration) {
  8.         if (StringUtils.isEmpty(registration.getServiceId())) {
  9.             log.warn("No service to register for nacos client...");
  10.             return;
  11.         }
  12.         NamingService namingService = namingService();
  13.         //服务名称
  14.         String serviceId = registration.getServiceId();
  15.         //服务分组
  16.         String group = nacosDiscoveryProperties.getGroup();
  17.         //服务实例,包含了IP、Port等信息
  18.         Instance instance = getNacosInstanceFromRegistration(registration);
  19.         try {
  20.             //调用NacosNamingService.registerInstance()方法把当前的服务实例注册到Nacos中
  21.             namingService.registerInstance(serviceId, group, instance);
  22.             log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());
  23.         } catch (Exception e) {
  24.             log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);
  25.             rethrowRuntimeException(e);
  26.         }
  27.     }
  28.     private NamingService namingService() {
  29.         return nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());
  30.     }
  31.     private Instance getNacosInstanceFromRegistration(Registration registration) {
  32.         Instance instance = new Instance();
  33.         instance.setIp(registration.getHost());
  34.         instance.setPort(registration.getPort());
  35.         instance.setWeight(nacosDiscoveryProperties.getWeight());
  36.         instance.setClusterName(nacosDiscoveryProperties.getClusterName());
  37.         instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
  38.         instance.setMetadata(registration.getMetadata());
  39.         instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
  40.         return instance;
  41.     }
  42.     ...
  43. }
  44. public class NacosNamingService implements NamingService {
  45.     private BeatReactor beatReactor;
  46.     private NamingProxy serverProxy;
  47.     ...
  48.     @Override
  49.     public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
  50.         NamingUtils.checkInstanceIsLegal(instance);
  51.         //获取分组服务名字
  52.         String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
  53.         //判断要注册的服务实例是否是临时实例
  54.         if (instance.isEphemeral()) {
  55.             //如果是临时实例,则构建心跳信息
  56.             BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
  57.             //添加心跳信息
  58.             beatReactor.addBeatInfo(groupedServiceName, beatInfo);
  59.         }
  60.         //接下来调用NamingProxy的注册方法registerService()来注册服务实例
  61.         serverProxy.registerService(groupedServiceName, groupName, instance);
  62.     }
  63.     ...
  64. }
  65. public class NamingProxy implements Closeable {
  66.     private final NacosRestTemplate nacosRestTemplate = NamingHttpClientManager.getInstance().getNacosRestTemplate();
  67.     ...
  68.     //register a instance to service with specified instance properties.
  69.     //@param serviceName name of service
  70.     //@param groupName   group of service
  71.     //@param instance    instance to register
  72.     public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
  73.         NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance);
  74.         //创建一个Map组装注册请求参数
  75.         final Map<String, String> params = new HashMap<String, String>(16);
  76.         params.put(CommonParams.NAMESPACE_ID, namespaceId);
  77.         params.put(CommonParams.SERVICE_NAME, serviceName);
  78.         params.put(CommonParams.GROUP_NAME, groupName);
  79.         params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
  80.         params.put("ip", instance.getIp());
  81.         params.put("port", String.valueOf(instance.getPort()));
  82.         params.put("weight", String.valueOf(instance.getWeight()));
  83.         params.put("enable", String.valueOf(instance.isEnabled()));
  84.         params.put("healthy", String.valueOf(instance.isHealthy()));
  85.         params.put("ephemeral", String.valueOf(instance.isEphemeral()));
  86.         params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
  87.         //下面UtilAndComs常量类拼装的请求url是: /Nacos/v1/ns/instance
  88.         reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
  89.     }
  90.     public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
  91.         return reqApi(api, params, Collections.EMPTY_MAP, method);
  92.     }
  93.     public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException {
  94.         return reqApi(api, params, body, getServerList(), method);
  95.     }
  96.     //Request api.
  97.     public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException {
  98.         params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
  99.         if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
  100.             throw new NacosException(NacosException.INVALID_PARAM, "no server available");
  101.         }
  102.         NacosException exception = new NacosException();
  103.         if (StringUtils.isNotBlank(nacosDomain)) {
  104.             for (int i = 0; i < maxRetry; i++) {
  105.                 try {
  106.                     return callServer(api, params, body, nacosDomain, method);
  107.                 } catch (NacosException e) {
  108.                     exception = e;
  109.                     if (NAMING_LOGGER.isDebugEnabled()) {
  110.                         NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
  111.                     }
  112.                 }
  113.             }
  114.         } else {
  115.             Random random = new Random(System.currentTimeMillis());
  116.             int index = random.nextInt(servers.size());
  117.             for (int i = 0; i < servers.size(); i++) {
  118.                 String server = servers.get(index);
  119.                 try {
  120.                     return callServer(api, params, body, server, method);
  121.                 } catch (NacosException e) {
  122.                     exception = e;
  123.                     if (NAMING_LOGGER.isDebugEnabled()) {
  124.                         NAMING_LOGGER.debug("request {} failed.", server, e);
  125.                     }
  126.                 }
  127.                 index = (index + 1) % servers.size();
  128.             }
  129.         }
  130.         NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(), exception.getErrMsg());
  131.         throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
  132.     }
  133.     //Call server.
  134.     public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException {
  135.         long start = System.currentTimeMillis();
  136.         long end = 0;
  137.         injectSecurityInfo(params);
  138.         Header header = builderHeader();
  139.         String url;
  140.         if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
  141.             url = curServer + api;
  142.         } else {
  143.             if (!IPUtil.containsPort(curServer)) {
  144.                 curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;
  145.             }
  146.             url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
  147.         }
  148.         try {
  149.             //调用NacosRestTemplate.exchangeForm()方法发起HTTP请求
  150.             HttpRestResult<String> restResult = nacosRestTemplate.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
  151.             end = System.currentTimeMillis();
  152.             MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe(end - start);
  153.             if (restResult.ok()) {
  154.                 return restResult.getData();
  155.             }
  156.             if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
  157.                 return StringUtils.EMPTY;
  158.             }
  159.             throw new NacosException(restResult.getCode(), restResult.getMessage());
  160.         } catch (Exception e) {
  161.             NAMING_LOGGER.error("[NA] failed to request", e);
  162.             throw new NacosException(NacosException.SERVER_ERROR, e);
  163.         }
  164.     }
  165.     ...
  166. }
  167. public class NacosRestTemplate extends AbstractNacosRestTemplate {
  168.     private final HttpClientRequest requestClient;
  169.     ...
  170.     //Execute the HTTP method to the given URI template, writing the given request entity to the request, and returns the response as {@link HttpRestResult}.
  171.     public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues, String httpMethod, Type responseType) throws Exception {
  172.         RequestHttpEntity requestHttpEntity = new RequestHttpEntity(header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);
  173.         return execute(url, httpMethod, requestHttpEntity, responseType);
  174.     }
  175.     private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity, Type responseType) throws Exception {
  176.         URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
  177.         if (logger.isDebugEnabled()) {
  178.             logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, uri, requestEntity.getBody());
  179.         }
  180.         ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
  181.         HttpClientResponse response = null;
  182.         try {
  183.             response = this.requestClient().execute(uri, httpMethod, requestEntity);
  184.             return responseHandler.handle(response);
  185.         } finally {
  186.             if (response != null) {
  187.                 response.close();
  188.             }
  189.         }
  190.     }
  191.     private HttpClientRequest requestClient() {
  192.         if (CollectionUtils.isNotEmpty(interceptors)) {
  193.             if (logger.isDebugEnabled()) {
  194.                 logger.debug("Execute via interceptors :{}", interceptors);
  195.             }
  196.             return new InterceptingHttpClientRequest(requestClient, interceptors.iterator());
  197.         }
  198.         return requestClient;
  199.     }
  200.     ...
  201. }
  202. public class DefaultHttpClientRequest implements HttpClientRequest {
  203.     private final CloseableHttpClient client;
  204.     public DefaultHttpClientRequest(CloseableHttpClient client) {
  205.         this.client = client;
  206.     }
  207.     @Override
  208.     public HttpClientResponse execute(URI uri, String httpMethod, RequestHttpEntity requestHttpEntity) throws Exception {
  209.         HttpRequestBase request = build(uri, httpMethod, requestHttpEntity);
  210.         //通过Apache的CloseableHttpClient组件执行HTTP请求
  211.         CloseableHttpResponse response = client.execute(request);
  212.         return new DefaultClientHttpResponse(response);
  213.     }
  214.     ...
  215. }
复制代码
由此可知:Nacos客户端是通过HTTP的方式往Nacos服务端发起服务注册的,Nacos服务端会提供服务注册的API接口给Nacos客户端进行HTTP调用,Nacos官方Open API文档中注册服务实例的接口说明如下:
3.png
(3)Nacos客户端如何发送服务心跳
调用NacosNamingService的registerInstance()方法注册服务实例时,在调用NamingProxy的registerService()方法来注册服务实例之前,会根据注册的服务实例是临时实例来构建和添加心跳信息到beatReactor,也就是调用BeatReactor的buildBeatInfo()方法和addBeatInfo()方法。
 
在BeatReactor的buildBeatInfo()方法中,会通过beatInfo的setPeriod()方法设置心跳间隔时间,默认是5秒。
 
在BeatReactor的addBeatInfo()方法中,倒数第二行会开启一个延时执行的任务,执行的任务是根据心跳信息BeatInfo封装的BeatTask。该BeatTask任务会交给BeatReactor的ScheduledExecutorService来执行,并通过beatInfo的getPeriod()方法获取延时执行的时间为5秒。
 
在BeatTask的run()方法中,就会调用NamingProxy的sendBeat()方法发送心跳请求给Nacos服务端,也就是调用NamingProxy的reqApi()方法向Nacos服务端发起心跳请求。如果返回的心跳响应表明服务实例不存在则重新发起服务实例注册请求。无论心跳响应如何,继续根据心跳信息BeatInfo封装一个BeatTask任务,然后将该任务交给线程池ScheduledExecutorService来延时5秒执行。
  1. public class NacosServiceRegistry implements ServiceRegistry<Registration> {
  2.     private final NacosDiscoveryProperties nacosDiscoveryProperties;
  3.     public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
  4.         this.nacosDiscoveryProperties = nacosDiscoveryProperties;
  5.     }
  6.     @Override
  7.     public void register(Registration registration) {
  8.         if (StringUtils.isEmpty(registration.getServiceId())) {
  9.             log.warn("No service to register for nacos client...");
  10.             return;
  11.         }
  12.         NamingService namingService = namingService();
  13.         //服务名称
  14.         String serviceId = registration.getServiceId();
  15.         //服务分组
  16.         String group = nacosDiscoveryProperties.getGroup();
  17.         //服务实例,包含了IP、Port等信息
  18.         Instance instance = getNacosInstanceFromRegistration(registration);
  19.         try {
  20.             //调用NacosNamingService.registerInstance()方法把当前的服务实例注册到Nacos中
  21.             namingService.registerInstance(serviceId, group, instance);
  22.             log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());
  23.         } catch (Exception e) {
  24.             log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);
  25.             rethrowRuntimeException(e);
  26.         }
  27.     }
  28.     private NamingService namingService() {
  29.         return nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());
  30.     }
  31.     private Instance getNacosInstanceFromRegistration(Registration registration) {
  32.         Instance instance = new Instance();
  33.         instance.setIp(registration.getHost());
  34.         instance.setPort(registration.getPort());
  35.         instance.setWeight(nacosDiscoveryProperties.getWeight());
  36.         instance.setClusterName(nacosDiscoveryProperties.getClusterName());
  37.         instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
  38.         instance.setMetadata(registration.getMetadata());
  39.         instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
  40.         return instance;
  41.     }
  42.     ...
  43. }
  44. public class NacosNamingService implements NamingService {
  45.     private BeatReactor beatReactor;
  46.     private NamingProxy serverProxy;
  47.     ...
  48.     @Override
  49.     public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
  50.         NamingUtils.checkInstanceIsLegal(instance);
  51.         //获取分组服务名字
  52.         String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
  53.         //判定要注册的服务实例是否是临时实例
  54.         if (instance.isEphemeral()) {
  55.             //如果是临时实例,则构建心跳信息
  56.             BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
  57.             //添加心跳信息
  58.             beatReactor.addBeatInfo(groupedServiceName, beatInfo);
  59.         }
  60.         //接下来调用NamingProxy的注册方法registerService()来注册服务实例
  61.         serverProxy.registerService(groupedServiceName, groupName, instance);
  62.     }
  63.     ...
  64. }
  65. public class BeatReactor implements Closeable {
  66.     ...
  67.     //Build new beat information.
  68.     public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
  69.         BeatInfo beatInfo = new BeatInfo();
  70.         beatInfo.setServiceName(groupedServiceName);
  71.         beatInfo.setIp(instance.getIp());
  72.         beatInfo.setPort(instance.getPort());
  73.         beatInfo.setCluster(instance.getClusterName());
  74.         beatInfo.setWeight(instance.getWeight());
  75.         beatInfo.setMetadata(instance.getMetadata());
  76.         beatInfo.setScheduled(false);
  77.         //getInstanceHeartBeatInterval()的返回值是5000
  78.         beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
  79.         return beatInfo;
  80.     }
  81.     ...
  82. }
  83. @JsonInclude(Include.NON_NULL)
  84. public class Instance implements Serializable {
  85.     ...
  86.     public long getInstanceHeartBeatInterval() {
  87.         //Constants.DEFAULT_HEART_BEAT_INTERVAL,默认是5000
  88.         return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL, Constants.DEFAULT_HEART_BEAT_INTERVAL);
  89.     }
  90.     ...
  91. }
  92. public class BeatReactor implements Closeable {
  93.     private final ScheduledExecutorService executorService;
  94.     private final NamingProxy serverProxy;
  95.     public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();
  96.     public BeatReactor(NamingProxy serverProxy) {
  97.         this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
  98.     }
  99.     public BeatReactor(NamingProxy serverProxy, int threadCount) {
  100.         this.serverProxy = serverProxy;
  101.         this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
  102.             @Override
  103.             public Thread newThread(Runnable r) {
  104.                 Thread thread = new Thread(r);
  105.                 thread.setDaemon(true);
  106.                 thread.setName("com.alibaba.nacos.naming.beat.sender");
  107.                 return thread;
  108.             }
  109.         });
  110.     }
  111.     ...
  112.     //Add beat information.
  113.     public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
  114.         NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
  115.         String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
  116.         BeatInfo existBeat = null;
  117.         if ((existBeat = dom2Beat.remove(key)) != null) {
  118.             existBeat.setStopped(true);
  119.         }
  120.         dom2Beat.put(key, beatInfo);
  121.         //开启一个延时执行的任务,执行的任务是BeatTask
  122.         executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
  123.         MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
  124.     }
  125.     ...
  126.     class BeatTask implements Runnable {
  127.         BeatInfo beatInfo;
  128.       
  129.         public BeatTask(BeatInfo beatInfo) {
  130.             this.beatInfo = beatInfo;
  131.         }
  132.       
  133.         @Override
  134.         public void run() {
  135.             //判断是否需要停止
  136.             if (beatInfo.isStopped()) {
  137.                 return;
  138.             }
  139.             //获取下一次执行的时间,同样还是5s
  140.             long nextTime = beatInfo.getPeriod();
  141.             try {
  142.                 //调用NamingProxy.sendBeat()方法发送心跳请求给Nacos服务端
  143.                 JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
  144.                 long interval = result.get("clientBeatInterval").asLong();
  145.                 boolean lightBeatEnabled = false;
  146.                 if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
  147.                     lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
  148.                 }
  149.                 BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
  150.                 if (interval > 0) {
  151.                     nextTime = interval;
  152.                 }
  153.                 //获取Nacos服务端返回的code状态码
  154.                 int code = NamingResponseCode.OK;
  155.                 if (result.has(CommonParams.CODE)) {
  156.                     code = result.get(CommonParams.CODE).asInt();
  157.                 }
  158.                 //如果code = RESOURCE_NOT_FOUND,没有找到资源,那么表示之前注册的信息,已经被Nacos服务端移除了
  159.                 if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
  160.                     //然后重新组装参数,重新发起注册请求
  161.                     Instance instance = new Instance();
  162.                     instance.setPort(beatInfo.getPort());
  163.                     instance.setIp(beatInfo.getIp());
  164.                     instance.setWeight(beatInfo.getWeight());
  165.                     instance.setMetadata(beatInfo.getMetadata());
  166.                     instance.setClusterName(beatInfo.getCluster());
  167.                     instance.setServiceName(beatInfo.getServiceName());
  168.                     instance.setInstanceId(instance.getInstanceId());
  169.                     instance.setEphemeral(true);
  170.                     try {
  171.                         //调用NamingProxy.registerService()方法发送服务实例注册请求到Nacos服务端
  172.                         serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
  173.                     } catch (Exception ignore) {
  174.                     }
  175.                 }
  176.             } catch (NacosException ex) {
  177.                 NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
  178.             }
  179.             //把beatInfo又重新放入延迟任务当中,并且还是5秒,所以一直是个循环的状态
  180.             executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
  181.         }
  182.     }
  183. }
  184. public class NamingProxy implements Closeable {
  185.     ...
  186.     //Send beat.
  187.     public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {   
  188.         if (NAMING_LOGGER.isDebugEnabled()) {
  189.             NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
  190.         }
  191.         Map<String, String> params = new HashMap<String, String>(8);
  192.         Map<String, String> bodyMap = new HashMap<String, String>(2);
  193.         if (!lightBeatEnabled) {
  194.             bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
  195.         }
  196.         params.put(CommonParams.NAMESPACE_ID, namespaceId);
  197.         params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
  198.         params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
  199.         params.put("ip", beatInfo.getIp());
  200.         params.put("port", String.valueOf(beatInfo.getPort()));
  201.         String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
  202.         return JacksonUtils.toObj(result);
  203.     }
  204.     ...
  205. }
复制代码
由此可见,在客户端在发起服务注册期间,会开启一个心跳健康检查的延时任务,这个任务每间隔5s执行一次。任务内容就是通过HTTP请求调用发送Nacos提供的服务实例心跳接口。Nacos官方Open API文档中服务实例心跳接口说明如下:
4.png
如下是客户端发起服务注册 + 发送服务心跳的整个流程图:
5.png
 
 
2.服务端如何处理客户端的服务注册请求
(1)客户端自动发送服务注册请求梳理
(2)Nacos服务端处理服务请求的代码入口
(3)Nacos服务端处理服务注册请求的源码分析
(4)服务端接收到服务实例注册请求后的处理总结
 
(1)客户端自动发送服务注册请求梳理
首先,从spring-cloud-starter-alibaba-nacos-discovery中,发现在spring.factories文件定义了很多Configuration配置类,其中就包括了NacosServiceRegistryAutoConfiguration配置类。这个配置类会创建三个Bean对象,其中有个Bean对象便实现了一个监听事件方法。
 
然后,Spring容器启动时,会发布一个事件。这个事件会被名为NacosAutoServiceRegistration的Bean对象监听到,从而自动发起Nacos服务注册。在注册时会开启心跳健康延时任务,每隔5s执行一次。不管是服务注册还是心跳检查,都是通过HTTP方式调用Nacos服务端。
 
客户端向服务端发起服务注册请求是通过HTTP接口"/nacos/v1/ns/instance"来实现的,客户端向服务端发起心跳请求是通过HTTP接口"/nacos/v1/ns/instance/beat"来实现的。
 
(2)Nacos服务端处理服务注册请求的代码入口
Nacos服务端有一个叫nacos-naming的模块,这个nacos-naming模块其实就是一个Spring Boot项目,模块中的controllers包则是用来处理服务相关的HTTP请求。
6.png
由于服务端处理服务注册请求的地址是"/nacos/v1/ns/instance",所以对服务实例进行处理的入口是controllers包下的InstanceController。InstanceController的代码很好地遵守了Restful风格,其中的regsiter()方法注册新服务实例对应@PostMapping、deregister()方法注销服务实例对应@DeleteMapping、update()方法修改服务实例对应@PutMapping。虽然都可以使用@PostMapping,但Nacos就严格按照了Restful标准。
  1. @RestController
  2. @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
  3. public class InstanceController {
  4.     ...
  5.     //Register new instance.
  6.     @CanDistro
  7.     @PostMapping
  8.     @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
  9.     public String register(HttpServletRequest request) throws Exception {
  10.         ...
  11.     }
  12.     //Deregister instances.
  13.     @CanDistro
  14.     @DeleteMapping
  15.     @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
  16.     public String deregister(HttpServletRequest request) throws Exception {
  17.         ...
  18.     }
  19.     //Update instance.
  20.     @CanDistro
  21.     @PutMapping
  22.     @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
  23.     public String update(HttpServletRequest request) throws Exception {
  24.         ...
  25.     }
  26.     ...
  27. }
  28. public class UtilsAndCommons {
  29.     // ********************** Nacos HTTP Context ************************ \\
  30.     public static final String NACOS_SERVER_CONTEXT = "/nacos";
  31.     public static final String NACOS_SERVER_VERSION = "/v1";
  32.     public static final String DEFAULT_NACOS_NAMING_CONTEXT = NACOS_SERVER_VERSION + "/ns";
  33.     public static final String NACOS_NAMING_CONTEXT = DEFAULT_NACOS_NAMING_CONTEXT;
  34.     ...
  35. }
复制代码
(3)Nacos服务端处理服务注册请求的源码分析
对于Nacos客户端的服务实例注册请求,会由InstanceController的register()方法进行处理。该方法首先会从请求参数中获取Instance服务实例,然后调用ServiceManager的registerInstance()方法来进行服务实例注册。ServiceManager是Nacos的服务管理者,拥有所有的服务列表,可以通过它来管理所有服务的注册、销毁、修改等。
 
在ServiceManager的registerInstance()方法中:首先会通过调用ServiceManager的createEmptyService()方法创建一个空服务,然后通过ServiceManager的addInstance()方法添加注册请求中的服务实例。
 
在ServiceManager的addInstance()方法中:首先构建出要注册的服务实例对应的服务的key,然后使用synchronized锁住要注册的服务实例对应的服务,接着获取要注册的服务实例对应的服务的最新服务实例列表,最后执行DelegateConsistencyServiceImpl的put()方法更新服务实例列表。
[code]@RestController@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")public class InstanceController {    @Autowired    private ServiceManager serviceManager;    ...    //Register new instance.    @CanDistro    @PostMapping    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)    public String register(HttpServletRequest request) throws Exception {        //从request中获取命名空间、服务名称        final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);        NamingUtils.checkServiceNameFormat(serviceName);        //从request中获取Instance服务实例        final Instance instance = parseInstance(request);        //调用ServiceManager的注册实例方法        serviceManager.registerInstance(namespaceId, serviceName, instance);        return "ok";    }    ...}//服务管理者,拥有所有的服务列表,用于管理所有服务的注册、销毁、修改等@Componentpublic class ServiceManager implements RecordListener {    //注册表,Map(namespace, Map(group::serviceName, Service)).    private final Map serviceMap = new ConcurrentHashMap();    @Resource(name = "consistencyDelegate")    private ConsistencyService consistencyService;    private final Object putServiceLock = new Object();    ...    //Register an instance to a service in AP mode.    //This method creates service or cluster silently if they don't exist.    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {        //1.创建一个空的服务        createEmptyService(namespaceId, serviceName, instance.isEphemeral());        //2.根据命名空间ID、服务名获取一个服务,如果获取结果为null则抛异常        Service service = getService(namespaceId, serviceName);        if (service == null) {            throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName);        }        //3.添加服务实例        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);    }    ...    //1.创建一个空服务    public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {        createServiceIfAbsent(namespaceId, serviceName, local, null);    }    //Create service if not exist.    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {        Service service = getService(namespaceId, serviceName);        if (service == null) {            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);            service = new Service();            service.setName(serviceName);            service.setNamespaceId(namespaceId);            service.setGroupName(NamingUtils.getGroupName(serviceName));            //now validate the service. if failed, exception will be thrown            service.setLastModifiedMillis(System.currentTimeMillis());            service.recalculateChecksum();            if (cluster != null) {                cluster.setService(service);                service.getClusterMap().put(cluster.getName(), cluster);            }            service.validate();            putServiceAndInit(service);            if (!local) {                addOrReplaceService(service);            }        }    }    private void putServiceAndInit(Service service) throws NacosException {        //把Service放入注册表serviceMap中        putService(service);        service.init();        //把Service作为监听器添加到consistencyService的listeners中        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());    }    //Put service into manager.    public void putService(Service service) {        if (!serviceMap.containsKey(service.getNamespaceId())) {            synchronized (putServiceLock) {                if (!serviceMap.containsKey(service.getNamespaceId())) {                    serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap());                }            }        }        serviceMap.get(service.getNamespaceId()).put(service.getName(), service);    }    public void addOrReplaceService(Service service) throws NacosException {        consistencyService.put(KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName()), service);    }    ...    //2.根据命名空间ID、服务名获取一个服务    public Service getService(String namespaceId, String serviceName) {        if (serviceMap.get(namespaceId) == null) {            return null;        }        return chooseServiceMap(namespaceId).get(serviceName);    }    public Map chooseServiceMap(String namespaceId) {        return serviceMap.get(namespaceId);    }    ...    //3.添加服务实例    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {        //构建要注册的服务实例对应的服务的key        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);        //根据命名空间以及服务名获取要注册的服务实例对应的服务        Service service = getService(namespaceId, serviceName);        //使用synchronized锁住要注册的服务实例对应的服务        synchronized (service) {            //由于一个服务可能存在多个服务实例,所以需要根据当前注册请求的服务实例ips,获取对应服务的最新服务实例列表            List instanceList = addIpAddresses(service, ephemeral, ips);            //Instances实现了用于在Nacos集群进行网络传输的Record接口            Instances instances = new Instances();            instances.setInstanceList(instanceList);            //执行DelegateConsistencyServiceImpl的put()方法            consistencyService.put(key, instances);        }    }    private List addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {        //更新对应服务的服务实例列表        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);    }    //Compare and get new instance list.    public List updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {        //先获取已经注册到Nacos的、当前要注册的服务实例对应的服务的、所有服务实例        Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));        List currentIPs = service.allIPs(ephemeral);        Map currentInstances = new HashMap(currentIPs.size());        Set currentInstanceIds = Sets.newHashSet();        for (Instance instance : currentIPs) {            //把instance实例的IP当作key,instance实例当作value,放入currentInstances            currentInstances.put(instance.toIpAddr(), instance);            //把实例唯一编码添加到currentInstanceIds中            currentInstanceIds.add(instance.getInstanceId());        }        //用来存放当前要注册的服务实例对应的服务的、所有服务实例        Map instanceMap;        if (datum != null && null != datum.value) {            instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);        } else {            instanceMap = new HashMap(ips.length);        }        for (Instance instance : ips) {            if (!service.getClusterMap().containsKey(instance.getClusterName())) {                Cluster cluster = new Cluster(instance.getClusterName(), service);                cluster.init();                service.getClusterMap().put(instance.getClusterName(), cluster);                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson());            }            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {                instanceMap.remove(instance.getDatumKey());            } else {                Instance oldInstance = instanceMap.get(instance.getDatumKey());                if (oldInstance != null) {                    instance.setInstanceId(oldInstance.getInstanceId());                } else {                    instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));                }                //instanceMap的key与IP和端口有关                instanceMap.put(instance.getDatumKey(), instance);            }        }        if (instanceMap.size()

相关推荐

您需要登录后才可以回帖 登录 | 立即注册