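Outline
5. Service discovery: the request path of calls between services
6. How the server maintains unhealthy microservice instances
7. What happens when a service goes offline
8. Summary of service registration and discovery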
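5. Service discovery: the request path of calls between services
(1) The request flow when microservices call each other through Nacos
(2) Source code of service discovery on the Nacos client
(3) Source code of service queries on the Nacos server
(4) Summary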
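(1) The request flow when microservices call each other through Nacos
Take the example from the Nacos introduction: after the order service and the stock service have registered with Nacos, they call each other through Feign, as shown in the figure below.
Step 1: Every client keeps a local cache of microservice instances, and this cache is periodically refreshed with the latest list fetched from the registry.
Step 2: When order-service needs to call stock-service, it first looks up the instances for that service name in its local cache. A service name may map to several instances, so load balancing picks one of them.
Step 3: Finally, the service name is replaced with the chosen IP + Port, and Feign sends the HTTP request and returns the result.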
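The three steps can be sketched in plain Java to make the data flow concrete. This is not Nacos, Ribbon or Feign code: LocalRegistryCache, refresh() and resolve() are hypothetical names, a map of "ip:port" strings stands in for the client's local instance cache, and the load-balancing choice is passed in as a function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of the three client-side steps (not Nacos or Feign code).
class LocalRegistryCache {
    private static final String PREFIX = "http://";

    // Step 1: service name -> "ip:port" list, overwritten by a periodic refresh
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    // Step 2: pluggable load-balancing choice over the cached list
    private final Function<List<String>, String> chooser;

    LocalRegistryCache(Function<List<String>, String> chooser) {
        this.chooser = chooser;
    }

    // Would be called by a scheduled task with the latest list from the registry
    void refresh(String serviceName, List<String> addresses) {
        cache.put(serviceName, new ArrayList<>(addresses));
    }

    // Step 3: turn http://service-name/path into http://ip:port/path
    String resolve(String url) {
        if (!url.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Expected an http:// URL: " + url);
        }
        int slash = url.indexOf('/', PREFIX.length());
        String serviceName = slash < 0 ? url.substring(PREFIX.length()) : url.substring(PREFIX.length(), slash);
        String path = slash < 0 ? "" : url.substring(slash);
        List<String> addresses = cache.get(serviceName);
        if (addresses == null || addresses.isEmpty()) {
            throw new IllegalStateException("No cached instance for " + serviceName);
        }
        return PREFIX + chooser.apply(addresses) + path;
    }
}
```

The point of the design is that each call consults only the local cache, never the registry; in the real stack the URL rewriting happens inside the Ribbon/Feign integration.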
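(2) Source code of service discovery on the Nacos client
I. nacos-discovery brings in Ribbon for load balancing of service calls
II. How nacos-discovery integrates with Ribbon to load-balance service calls
III. How nacos-client performs service discovery
I. nacos-discovery brings in Ribbon for load balancing of service calls
A Nacos client is simply a project that depends on nacos-discovery + nacos-client. Because nacos-discovery integrates Ribbon, Ribbon can obtain the instance list from the Nacos server's instance-list query API. The Nacos client therefore relies on Ribbon for load balancing when calling other services: Ribbon selects one instance from the instance list, and the client calls that instance.
The pom.xml of nacos-discovery shows that it declares a dependency on Ribbon:
II. How nacos-discovery integrates with Ribbon to load-balance service calls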
Ribbon defines a ServerList interface, shown below. It is an extension point whose only job is to supply the list of servers, and nacos-discovery integrates with Ribbon by implementing it. As the package name shows, com.netflix.loadbalancer belongs to Ribbon, and its load balancer (ILoadBalancer) combines the server list with an IRule load-balancing strategy to pick one instance.

```java
package com.netflix.loadbalancer;

import java.util.List;

//Interface that defines the methods used to obtain the List of Servers
public interface ServerList<T extends Server> {
    public List<T> getInitialListOfServers();

    //Return updated list of servers. This is called say every 30 secs
    public List<T> getUpdatedListOfServers();
}
```

When the Nacos client calls another microservice, Ribbon picks the target instance: it obtains the candidate instances by calling NacosServerList.getUpdatedListOfServers(), and its IRule then selects one of them.
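Why implementing ServerList is enough to plug a registry into Ribbon can be seen in a stripped-down model of the extension point. SimpleServerList, StaticServerList and RoundRobinChooser are hypothetical stand-ins, not Ribbon classes: the chooser plays the role of the load balancer plus a round-robin rule, and refresh() is called explicitly instead of on a timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical mirror of Ribbon's ServerList extension point (not the real interface).
interface SimpleServerList {
    List<String> getUpdatedListOfServers();
}

// A registry-backed implementation would query the registry here; this one is static.
class StaticServerList implements SimpleServerList {
    private final List<String> servers;

    StaticServerList(List<String> servers) {
        this.servers = new ArrayList<>(servers);
    }

    @Override
    public List<String> getUpdatedListOfServers() {
        return new ArrayList<>(servers);
    }
}

// Plays the role of a load balancer combined with a round-robin rule.
class RoundRobinChooser {
    private final SimpleServerList serverList;
    private final AtomicInteger next = new AtomicInteger();
    private volatile List<String> cached = new ArrayList<>();

    RoundRobinChooser(SimpleServerList serverList) {
        this.serverList = serverList;
        refresh();
    }

    // Ribbon refreshes periodically; here the caller triggers it explicitly
    void refresh() {
        cached = serverList.getUpdatedListOfServers();
    }

    // Returns null when no server is available
    String choose() {
        List<String> servers = cached;
        if (servers.isEmpty()) {
            return null;
        }
        return servers.get(Math.floorMod(next.getAndIncrement(), servers.size()));
    }
}
```

Swapping StaticServerList for a class that asks a registry for its instances changes where the list comes from without touching the selection logic, which is the role NacosServerList plays.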
NacosServerList in nacos-discovery extends AbstractServerList (which implements Ribbon's ServerList) and implements the interface's two methods:

```java
public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {
    ...
    ...
}

public class NacosServerList extends AbstractServerList<NacosServer> {
    private NacosDiscoveryProperties discoveryProperties;
    private String serviceId;

    public NacosServerList(NacosDiscoveryProperties discoveryProperties) {
        this.discoveryProperties = discoveryProperties;
    }

    @Override
    public List<NacosServer> getInitialListOfServers() {
        return getServers();
    }

    @Override
    public List<NacosServer> getUpdatedListOfServers() {
        return getServers();
    }

    private List<NacosServer> getServers() {
        try {
            //Read the group
            String group = discoveryProperties.getGroup();
            //Query the instance list via NacosNamingService.selectInstances(),
            //passing the service name, the group and true (healthy instances only)
            List<Instance> instances = discoveryProperties.namingServiceInstance().selectInstances(serviceId, group, true);
            //Convert each Instance into a NacosServer
            return instancesToServerList(instances);
        } catch (Exception e) {
            throw new IllegalStateException("Can not get service instances from nacos, serviceId=" + serviceId, e);
        }
    }

    private List<NacosServer> instancesToServerList(List<Instance> instances) {
        List<NacosServer> result = new ArrayList<>();
        if (CollectionUtils.isEmpty(instances)) {
            return result;
        }
        for (Instance instance : instances) {
            result.add(new NacosServer(instance));
        }
        return result;
    }

    public String getServiceId() {
        return serviceId;
    }

    @Override
    public void initWithNiwsConfig(IClientConfig iClientConfig) {
        this.serviceId = iClientConfig.getClientName();
    }
}
```

The core of NacosServerList is its getServers() method, because both Ribbon interface methods that nacos-discovery implements delegate to it.
Inside getServers(), nacos-discovery calls selectInstances() on nacos-client's NacosNamingService to fetch the service instance list.
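The third argument `true` in getServers() asks nacos-client for healthy instances only. Assuming the final filtering step inside NacosNamingService keeps an instance only when its health flag matches the request, it is enabled, and its weight is positive, the rule can be sketched as below. FilterInstance and HealthyFilter are hypothetical stand-ins for Nacos's Instance class and its filtering helper.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for com.alibaba.nacos.api.naming.pojo.Instance.
class FilterInstance {
    final String ip;
    final int port;
    final boolean healthy;
    final boolean enabled;
    final double weight;

    FilterInstance(String ip, int port, boolean healthy, boolean enabled, double weight) {
        this.ip = ip;
        this.port = port;
        this.healthy = healthy;
        this.enabled = enabled;
        this.weight = weight;
    }
}

class HealthyFilter {
    // Keep instances whose health flag equals the requested one, that are enabled,
    // and whose weight is positive (assumed filtering rule)
    static List<FilterInstance> select(List<FilterInstance> hosts, boolean healthy) {
        List<FilterInstance> result = new ArrayList<>();
        for (FilterInstance instance : hosts) {
            if (instance.healthy == healthy && instance.enabled && instance.weight > 0) {
                result.add(instance);
            }
        }
        return result;
    }
}
```

Under this assumption an instance that the server still lists but has marked unhealthy never reaches Ribbon, so it cannot be chosen for a call.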
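III. How nacos-client performs service discovery
Inside nacos-client, NacosNamingService.selectInstances() delegates to HostReactor.getServiceInfo(), which proceeds as follows:
Step 1: Call HostReactor.getServiceInfo0() to try the local cache.
Step 2: On a cache miss, call HostReactor.updateServiceNow(), which simply calls updateService() to query the server and refresh the cache: NamingProxy.queryList() fetches the instance list from the Nacos server, and HostReactor.processServiceJson() writes it into the local cache.
Step 3: Call HostReactor.scheduleUpdateIfAbsent() to submit the task that keeps the cache in sync.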
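So HostReactor.getServiceInfo() in nacos-client is the heart of service discovery. It first looks up the instance list in the local cache and queries the server only if the cache has no entry for the service. After obtaining the list, it submits a delayed task to a scheduled thread pool; that task is an UpdateTask, and its run() method performs the refresh.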
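Without failover mode and the wait/notify coordination, getServiceInfo() is a cache-aside lookup plus a once-per-key refresh registration. The sketch below assumes a Function that performs the remote query (standing in for NamingProxy.queryList() plus processServiceJson()) and records scheduled keys in a set instead of submitting real UpdateTask objects; CacheAsideLookup and its members are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of the cache-first lookup in getServiceInfo().
class CacheAsideLookup {
    private final Map<String, List<String>> serviceInfoMap = new ConcurrentHashMap<>();
    private final Set<String> scheduledKeys = ConcurrentHashMap.newKeySet();
    private final Function<String, List<String>> remoteQuery;

    CacheAsideLookup(Function<String, List<String>> remoteQuery) {
        this.remoteQuery = remoteQuery;
    }

    List<String> getServiceInfo(String serviceName) {
        // 1. Try the local cache first
        List<String> hosts = serviceInfoMap.get(serviceName);
        if (hosts == null) {
            // 2. Cache miss: query the server now and fill the cache
            hosts = remoteQuery.apply(serviceName);
            serviceInfoMap.put(serviceName, hosts == null ? Collections.<String>emptyList() : hosts);
        }
        // 3. Register the periodic refresh at most once per key (like scheduleUpdateIfAbsent)
        scheduledKeys.add(serviceName);
        // 4. Always answer from the cache
        return serviceInfoMap.get(serviceName);
    }

    boolean isRefreshScheduled(String serviceName) {
        return scheduledKeys.contains(serviceName);
    }
}
```

Only the first lookup for a service pays for a network round trip; later lookups are served from memory while the background task keeps the entry fresh.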
UpdateTask.run() calls updateService() to query the instance list and update the local cache. When the task finishes, it submits itself again as a delayed task to the scheduled thread pool, so the cached instance list keeps being refreshed.

```java
public class NacosNamingService implements NamingService {
    private HostReactor hostReactor;
    ...
    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
        return selectInstances(serviceName, groupName, healthy, true);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException {
        return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
        ServiceInfo serviceInfo;
        //subscribe is true when called through the overloads above
        if (subscribe) {
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        return selectInstances(serviceInfo, healthy);
    }
    ...
}

public class HostReactor implements Closeable {
    //Local cache of service instance lists
    private final Map<String, ServiceInfo> serviceInfoMap;
    private final Map<String, Object> updatingMap;
    private final NamingProxy serverProxy;
    private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
    private final ScheduledExecutorService executor;
    ...
    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        //Look up the instance list in the local cache first
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        //If the local cache has no entry for this service
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            updatingMap.put(serviceName, new Object());
            //Call the Nacos server's instance-list query API and update the Service data immediately
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
        } else if (updatingMap.containsKey(serviceName)) {
            if (UPDATE_HOLD_INTERVAL > 0) {
                //hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        //Start the scheduled task that maintains the local cache
        scheduleUpdateIfAbsent(serviceName, clusters);
        //Finally, return the instance list from the local cache
        return serviceInfoMap.get(serviceObj.getKey());
    }

    private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        //Get the instance list from the local cache
        return serviceInfoMap.get(key);
    }

    private void updateServiceNow(String serviceName, String clusters) {
        try {
            updateService(serviceName, clusters);
        } catch (NacosException e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        }
    }

    //Update service now.
    public void updateService(String serviceName, String clusters) throws NacosException {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            //Call the Nacos server's instance query API
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
            //If the result is not empty, update the local cache
            if (StringUtils.isNotEmpty(result)) {
                processServiceJson(result);
            }
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }
    ...
    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        synchronized (futureMap) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
            //Submit a delayed task to the scheduled thread pool
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }

    public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
        //Submit a delayed task to the scheduled thread pool
        return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
    }

    public class UpdateTask implements Runnable {
        long lastRefTime = Long.MAX_VALUE;
        private final String clusters;
        private final String serviceName;
        private int failCount = 0;

        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }

        private void incFailCount() {
            int limit = 6;
            if (failCount == limit) {
                return;
            }
            failCount++;
        }

        private void resetFailCount() {
            failCount = 0;
        }

        @Override
        public void run() {
            long delayTime = DEFAULT_DELAY;
            try {
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                //If the local cache has no entry for this service
                if (serviceObj == null) {
                    updateService(serviceName, clusters);
                    return;
                }
                //lastRefTime starts at Long.MAX_VALUE, so the first run always calls updateService()
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    updateService(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    refreshOnly(serviceName, clusters);
                }
                lastRefTime = serviceObj.getLastRefTime();
                if (!notifier.isSubscribed(serviceName, clusters) && !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                    NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                    return;
                }
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    incFailCount();
                    return;
                }
                delayTime = serviceObj.getCacheMillis();
                resetFailCount();
            } catch (Throwable e) {
                incFailCount();
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            } finally {
                //Submit another delayed task to keep syncing the local cache
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
            }
        }
    }
}

public class NamingProxy implements Closeable {
    ...
    //Send an HTTP request to the Nacos server to query the instance list
    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
        //Request the "/nacos/v1/ns/instance/list" endpoint over HTTP
        return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
    }
    ...
}
```
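The finally block of UpdateTask.run() reschedules with min(delayTime << failCount, DEFAULT_DELAY * 60), and incFailCount() stops counting at 6. The sketch reproduces that arithmetic in isolation. It assumes DEFAULT_DELAY is 1000 ms (the constant's value does not appear in the listing) and that every attempt fails, so delayTime stays at DEFAULT_DELAY; UpdateBackoff is a hypothetical helper name.

```java
// Hypothetical helper reproducing UpdateTask's retry-delay arithmetic.
class UpdateBackoff {
    // Assumed value of HostReactor.DEFAULT_DELAY, in milliseconds
    static final long DEFAULT_DELAY = 1000L;
    static final int FAIL_LIMIT = 6;

    // Same formula as the finally block: exponential growth capped at 60 * DEFAULT_DELAY
    static long nextDelay(long delayTime, int failCount) {
        return Math.min(delayTime << failCount, DEFAULT_DELAY * 60);
    }

    // Same rule as incFailCount(): stop counting once the limit is reached
    static int incFailCount(int failCount) {
        return failCount == FAIL_LIMIT ? failCount : failCount + 1;
    }

    public static void main(String[] args) {
        int failCount = 0;
        for (int attempt = 1; attempt <= 8; attempt++) {
            failCount = incFailCount(failCount); // this attempt failed
            System.out.println("after failure " + attempt + ": next run in "
                    + nextDelay(DEFAULT_DELAY, failCount) + " ms");
        }
    }
}
```

Under that assumption, consecutive failures back off as 2 s, 4 s, 8 s, 16 s, 32 s and then stay at 60 s (1000 << 6 = 64000 is capped to 60000). A successful refresh calls resetFailCount() and sets delayTime to the cacheMillis returned by the server, so polling returns to its normal pace.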
II. Server-side source code for handling heartbeat requests
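On the server, InstanceController.beat() handles the heartbeat requests sent by clients. It first tries to fetch the matching Instance from ServiceManager's registry. If the Instance is not in the in-memory registry, it calls ServiceManager.registerInstance() directly to register the instance again (provided the request carries a beat body; otherwise it returns RESOURCE_NOT_FOUND).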
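The decision at the top of beat() can be modeled as a registry lookup that falls back to re-registration when the beat carries enough data. BeatRegistry, BeatInfo and handleBeat() are hypothetical; the set of "ip:port" keys stands in for ServiceManager's namespace/service/cluster registry, and the result codes are simplified.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the lookup-or-re-register decision at the top of beat().
class BeatRegistry {
    enum Result { OK, RESOURCE_NOT_FOUND }

    // Hypothetical beat body; null models a request without the "beat" parameter
    static final class BeatInfo {
        final String ip;
        final int port;

        BeatInfo(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }
    }

    // Registered instances as "ip:port", standing in for ServiceManager's registry
    private final Set<String> instances = ConcurrentHashMap.newKeySet();
    private final AtomicInteger reRegistrations = new AtomicInteger();

    void register(String ip, int port) {
        instances.add(ip + ":" + port);
    }

    Result handleBeat(String ip, int port, BeatInfo beat) {
        // As in beat(), ip and port from the beat body override the request parameters
        String key = beat != null ? beat.ip + ":" + beat.port : ip + ":" + port;
        if (!instances.contains(key)) {
            if (beat == null) {
                // No beat body, so there is nothing to rebuild the instance from
                return Result.RESOURCE_NOT_FOUND;
            }
            // The instance was removed (e.g. by health checking): register it again
            instances.add(key);
            reRegistrations.incrementAndGet();
        }
        // The real handler continues with Service.processClientBeat() here
        return Result.OK;
    }

    int reRegistrations() {
        return reRegistrations.get();
    }
}
```

A lightweight beat for an unknown instance gets RESOURCE_NOT_FOUND, which gives the client a signal to send its full registration data again.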
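Next, whether the Instance was found or just re-registered, beat() takes the corresponding Service object from ServiceManager's registry, so later changes to the Instances in that Service's Clusters modify the registry data directly. It then calls Service.processClientBeat(), which submits an asynchronous ClientBeatProcessor task to a thread pool whose size is half the number of available processors.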
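Submitting the processor with zero delay to a pool sized at half the processor count can be approximated with the JDK's ScheduledExecutorService. This is not Nacos's GlobalExecutor: HealthExecutorSketch and its methods are hypothetical, the Math.max(1, ...) floor is an assumption so that single-core machines still get one thread, and the threads are daemons only so the sketch never keeps a JVM alive.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical approximation of a "run immediately on the health pool" helper.
class HealthExecutorSketch {
    // Half the available processors, but never fewer than one thread (assumed floor)
    static int healthThreadCount(int processors) {
        return Math.max(1, processors / 2);
    }

    private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(
            healthThreadCount(Runtime.getRuntime().availableProcessors()),
            runnable -> {
                Thread thread = new Thread(runnable, "health-sketch");
                thread.setDaemon(true);
                return thread;
            });

    // Like scheduleNow(): submit to a scheduled pool with zero delay
    static ScheduledFuture<?> scheduleNow(Runnable task) {
        return EXECUTOR.schedule(task, 0, TimeUnit.MILLISECONDS);
    }

    // Run a task via scheduleNow() and report whether it finished within the timeout
    static boolean runNowAndWait(Runnable task, long timeoutMillis) {
        try {
            scheduleNow(task).get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }
}
```

Running the processor asynchronously lets the HTTP handler return as soon as the beat is queued, instead of holding the request thread while the instance list is scanned.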
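In ClientBeatProcessor.run(), the processor first gets all ephemeral instances of the cluster by cluster name. It then loops over them, comparing IP + Port to find the matching Instance. Once found, it sets the Instance's lastBeat to the current time and, if the Instance is currently marked unhealthy, marks it healthy again.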
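The core of ClientBeatProcessor.run() is a linear scan over the cluster's ephemeral instances, and that part alone can be sketched as follows. BeatInstance and ClientBeatSketch are hypothetical; the timestamp is passed in so the logic is deterministic, and the REVIVED outcome only marks the point where a status change would have to be propagated to subscribers.

```java
import java.util.List;

// Hypothetical stand-in for an ephemeral Instance.
class BeatInstance {
    final String ip;
    final int port;
    volatile long lastBeat;
    volatile boolean healthy;

    BeatInstance(String ip, int port, long lastBeat, boolean healthy) {
        this.ip = ip;
        this.port = port;
        this.lastBeat = lastBeat;
        this.healthy = healthy;
    }
}

class ClientBeatSketch {
    enum Outcome { NOT_FOUND, REFRESHED, REVIVED }

    // Scan the cluster's ephemeral instances, match on IP + port, refresh lastBeat,
    // and flip an unhealthy instance back to healthy
    static Outcome processBeat(List<BeatInstance> ephemeralInstances, String ip, int port, long now) {
        for (BeatInstance instance : ephemeralInstances) {
            if (instance.ip.equals(ip) && instance.port == port) {
                instance.lastBeat = now;
                if (!instance.healthy) {
                    instance.healthy = true;
                    // Status changed: subscribers would need to learn about it
                    return Outcome.REVIVED;
                }
                return Outcome.REFRESHED;
            }
        }
        return Outcome.NOT_FOUND;
    }
}
```

Because only lastBeat and the health flag are touched, a separate checker can later compare lastBeat against the current time to decide which instances have gone silent.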
[code]
//Instance operation controller.
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    ...
    //Create a beat for instance.
    @CanDistro
    @PutMapping("/beat")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public ObjectNode beat(HttpServletRequest request) throws Exception {
        ObjectNode result = JacksonUtils.createEmptyJsonNode();
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
        String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
        //Read the request parameters, namespaceId and serviceName
        RsInfo clientBeat = null;
        if (StringUtils.isNotBlank(beat)) {
            clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
        }
        String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
        String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
        int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
        if (clientBeat != null) {
            if (StringUtils.isNotBlank(clientBeat.getCluster())) {
                clusterName = clientBeat.getCluster();
            } else {
                clientBeat.setCluster(clusterName);
            }
            ip = clientBeat.getIp();
            port = clientBeat.getPort();
        }
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
        //Look up the Instance in ServiceManager's in-memory registry by namespace, service name, cluster, IP and port
        Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
        //If no Instance is found, register it again via ServiceManager.registerInstance()
        if (instance == null) {
            if (clientBeat == null) {
                result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
                return result;
            }
            Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                    + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
            instance = new Instance();
            instance.setPort(clientBeat.getPort());
            instance.setIp(clientBeat.getIp());
            instance.setWeight(clientBeat.getWeight());
            instance.setMetadata(clientBeat.getMetadata());
            instance.setClusterName(clusterName);
            instance.setServiceName(serviceName);
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(clientBeat.isEphemeral());
            //Register the instance again
            serviceManager.registerInstance(namespaceId, serviceName, instance);
        }
        //Get the Service from ServiceManager's in-memory registry; later changes to the Instances in its Clusters modify the registry itself
        Service service = serviceManager.getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.SERVER_ERROR, "service not found: " + serviceName + "@" + namespaceId);
        }
        if (clientBeat == null) {
            clientBeat = new RsInfo();
            clientBeat.setIp(ip);
            clientBeat.setPort(port);
            clientBeat.setCluster(clusterName);
        }
        //Submit the heartbeat processing task for this client instance, which updates lastBeat
        service.processClientBeat(clientBeat);
        result.put(CommonParams.CODE, NamingResponseCode.OK);
        if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
            result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
        }
        result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
        return result;
    }
    ...
}

@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener {
    ...
    public void processClientBeat(final RsInfo rsInfo) {
        ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
        clientBeatProcessor.setService(this);
        clientBeatProcessor.setRsInfo(rsInfo);
        //Run immediately
        HealthCheckReactor.scheduleNow(clientBeatProcessor);
    }
    ...
}

//Health check reactor.
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class HealthCheckReactor {
    ...
    //Schedule client beat check task without a delay.
    public static ScheduledFuture scheduleNow(Runnable task) {
        //Submit the task to the thread pool to run immediately
        return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);
    }
    ...
}

public class GlobalExecutor {
    public static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors() |