找回密码
 立即注册
首页 业界区 业界 Nacos源码—9.Nacos升级gRPC分析四

Nacos源码—9.Nacos升级gRPC分析四

闵雇 2025-6-2 23:36:32
大纲
10.gRPC客户端初始化分析
11.gRPC客户端的心跳机制(健康检查)
12.gRPC服务端如何处理客户端的建立连接请求
13.gRPC服务端如何映射各种请求与对应的Handler处理类
14.gRPC简单介绍
 
10.gRPC客户端初始化分析
(1)gRPC客户端代理初始化的源码
(2)gRPC客户端启动的源码
(3)gRPC客户端发起与服务端建立连接请求的源码
 
(1)gRPC客户端代理初始化的源码
Nacos客户端注册服务实例时会调用NacosNamingService的registerInstance()方法,接着会调用NamingClientProxyDelegate的registerService()方法,然后判断注册的服务实例是不是临时的。如果注册的服务实例是临时的,那么就使用gRPC客户端代理去进行注册。如果注册的服务实例不是临时的,那么就使用HTTP客户端代理去进行注册。
 
NacosNamingService的init()方法在创建客户端代理,也就是执行NamingClientProxyDelegate的构造方法时,便会创建和初始化gRPC客户端代理NamingGrpcClientProxy。
 
创建和初始化gRPC客户端代理NamingGrpcClientProxy时,首先会由RpcClientFactory的createClient()方法创建一个RpcClient对象,并将GrpcClient对象赋值给NamingGrpcClientProxy的rpcClient属性,然后调用NamingGrpcClientProxy的start()方法启动RPC客户端连接。
 
在NamingGrpcClientProxy的start()方法中,会先注册一个用于处理服务端推送请求的NamingPushRequestHandler,然后调用RpcClient的start()方法启动RPC客户端即RpcClient对象,最后将NamingGrpcClientProxy自己作为订阅者向通知中心进行注册。
  1. public class NacosNamingService implements NamingService {
  2.     ...
  3.     private NamingClientProxy clientProxy;
  4.    
  5.     private void init(Properties properties) throws NacosException {
  6.         ...
  7.         this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
  8.     }
  9.     ...
  10.     @Override
  11.     public void registerInstance(String serviceName, Instance instance) throws NacosException {
  12.         registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
  13.     }
  14.    
  15.     @Override
  16.     public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
  17.         NamingUtils.checkInstanceIsLegal(instance);
  18.         //调用NamingClientProxy的注册方法registerService(),其实就是NamingClientProxyDelegate.registerService()方法
  19.         clientProxy.registerService(serviceName, groupName, instance);
  20.     }
  21.     ...
  22. }
  23. //客户端代理
  24. public class NamingClientProxyDelegate implements NamingClientProxy {
  25.     private final NamingHttpClientProxy httpClientProxy;
  26.     private final NamingGrpcClientProxy grpcClientProxy;
  27.    
  28.     public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties, InstancesChangeNotifier changeNotifier) throws NacosException {
  29.         ...
  30.         //初始化HTTP客户端代理
  31.         this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);
  32.         //初始化gRPC客户端代理
  33.         this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);
  34.     }
  35.     ...
  36.    
  37.     @Override
  38.     public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
  39.         getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
  40.     }
  41.    
  42.     private NamingClientProxy getExecuteClientProxy(Instance instance) {
  43.         return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
  44.     }
  45.     ...
  46. }
  47. //gRPC客户端代理
  48. public class NamingGrpcClientProxy extends AbstractNamingClientProxy {
  49.     private final String namespaceId;
  50.     private final String uuid;   
  51.     private final Long requestTimeout;   
  52.     private final RpcClient rpcClient;
  53.     private final NamingGrpcRedoService redoService;
  54.    
  55.     //初始化gRPC客户端代理
  56.     public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory, Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
  57.         super(securityProxy);
  58.         this.namespaceId = namespaceId;
  59.         this.uuid = UUID.randomUUID().toString();
  60.         this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
  61.         Map<String, String> labels = new HashMap<String, String>();
  62.         labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
  63.         labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
  64.         //1.通过RpcClientFactory.createClient()方法创建一个GrpcSdkClient对象实例,然后赋值给rpcClient属性
  65.         this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
  66.         this.redoService = new NamingGrpcRedoService(this);
  67.         //2.启动gRPC客户端代理NamingGrpcClientProxy
  68.         start(serverListFactory, serviceInfoHolder);
  69.     }
  70.    
  71.     private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
  72.         rpcClient.serverListFactory(serverListFactory);
  73.         //注册连接监听器
  74.         rpcClient.registerConnectionListener(redoService);
  75.         //1.注册一个用于处理服务端推送请求的NamingPushRequestHandler
  76.         rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
  77.         //2.启动RPC客户端RpcClient
  78.         rpcClient.start();
  79.         //3.将NamingGrpcClientProxy自己作为订阅者向通知中心进行注册
  80.         NotifyCenter.registerSubscriber(this);
  81.     }
  82.     ...
  83.    
  84.     @Override
  85.     public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
  86.         NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName, instance);
  87.         redoService.cacheInstanceForRedo(serviceName, groupName, instance);
  88.         //执行服务实例的注册
  89.         doRegisterService(serviceName, groupName, instance);
  90.     }
  91.    
  92.     //Execute register operation.
  93.     public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
  94.         //创建请求参数对象
  95.         InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.REGISTER_INSTANCE, instance);
  96.         //向服务端发起请求
  97.         requestToServer(request, Response.class);
  98.         redoService.instanceRegistered(serviceName, groupName);
  99.     }
  100.    
  101.     private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass) throws NacosException {
  102.         try {
  103.             request.putAllHeader(getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
  104.             //实际会调用RpcClient.request()方法发起gRPC请求
  105.             Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
  106.             if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
  107.                 throw new NacosException(response.getErrorCode(), response.getMessage());
  108.             }
  109.             if (responseClass.isAssignableFrom(response.getClass())) {
  110.                 return (T) response;
  111.             }
  112.             NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'", response.getClass().getName(), responseClass.getName());
  113.         } catch (Exception e) {
  114.             throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
  115.         }
  116.         throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
  117.     }
  118.     ...
  119. }
  120. public class RpcClientFactory {
  121.     private static final Map<String, RpcClient> CLIENT_MAP = new ConcurrentHashMap<>();
  122.     ...
  123.    
  124.     //create a rpc client.
  125.     public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels) {
  126.         if (!ConnectionType.GRPC.equals(connectionType)) {
  127.             throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
  128.         }
  129.         return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
  130.             LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);
  131.             try {
  132.                 //创建GrpcClient对象
  133.                 GrpcClient client = new GrpcSdkClient(clientNameInner);
  134.                 //设置线程核心数和最大数
  135.                 client.setThreadPoolCoreSize(threadPoolCoreSize);
  136.                 client.setThreadPoolMaxSize(threadPoolMaxSize);
  137.                 client.labels(labels);
  138.                 return client;
  139.             } catch (Throwable throwable) {
  140.                 LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable);
  141.                 throw throwable;
  142.             }
  143.         });
  144.     }
  145.     ...
  146. }
复制代码
(2)gRPC客户端启动的源码
NamingGrpcClientProxy的start()方法会通过调用RpcClient的start()方法,来启动RPC客户端即RpcClient对象。
 
在RpcClient的start()方法中,首先会利用CAS来修改RPC客户端(RpcClient)的状态,也就是将RpcClient.rpcClientStatus属性从INITIALIZED更新为STARTING。
 
然后会创建一个核心线程数为2的线程池,并提交两个任务。任务一是处理连接成功或连接断开时的线程,任务二是处理重连或健康检查的线程。
 
接着会创建Connection连接对象,也就是在while循环中调用GrpcClient的connectToServer()方法,尝试与服务端建立连接。如果连接失败,则会抛出异常并且进行重试,由于是同步连接,所以最大重试次数是3。
 
最后当客户端与服务端成功建立连接后,会把对应的Connection连接对象赋值给RpcClient.currentConnection属性,并且修改RpcClient.rpcClientStatus属性即RPC客户端状态为RUNNING。
 
如果客户端与服务端连接失败,则会通过异步尝试进行连接,也就是调用RpcClient的switchServerAsync()方法,往RpcClient的reconnectionSignal队列中放入一个ReconnectContext对象,reconnectionSignal队列中的元素会交给任务2来处理。
  1. public abstract class RpcClient implements Closeable {
  2.     protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<>(RpcClientStatus.WAIT_INIT);
  3.     protected ScheduledExecutorService clientEventExecutor;
  4.     protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();
  5.     //在NamingGrpcClientProxy初始化 -> 调用RpcClient.start()方法时,会将GrpcClient.connectToServer()方法的返回值赋值给currentConnection属性
  6.     protected volatile Connection currentConnection;
  7.     private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1);
  8.     ...
  9.    
  10.     public final void start() throws NacosException {
  11.         //利用CAS来修改RPC客户端(RpcClient)的状态,从INITIALIZED更新为STARTING
  12.         boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
  13.         if (!success) {
  14.             return;
  15.         }
  16.         //接下来创建调度线程池执行器,并提交两个任务
  17.         clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
  18.             Thread t = new Thread(r);
  19.             t.setName("com.alibaba.nacos.client.remote.worker");
  20.             t.setDaemon(true);
  21.             return t;
  22.         });
  23.    
  24.         //任务1:处理连接成功或连接断开时的线程
  25.         clientEventExecutor.submit(() -> {
  26.             ...     
  27.         });
  28.         //任务2:处理重连或健康检查的线程
  29.         clientEventExecutor.submit(() -> {
  30.             ...
  31.         });
  32.         //创建连接对象
  33.         Connection connectToServer = null;
  34.         rpcClientStatus.set(RpcClientStatus.STARTING);
  35.         //重试次数为3次
  36.         int startUpRetryTimes = RETRY_TIMES;
  37.         //在while循环中尝试与服务端建立连接,最多循环3次
  38.         while (startUpRetryTimes > 0 && connectToServer == null) {
  39.             try {
  40.                 startUpRetryTimes--;
  41.                 //获取服务端信息
  42.                 ServerInfo serverInfo = nextRpcServer();
  43.                 LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", name, serverInfo);
  44.                 //调用GrpcClient.connectToServer()方法建立和服务端的长连接
  45.                 connectToServer = connectToServer(serverInfo);
  46.             } catch (Throwable e) {
  47.                 LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}", name, e.getMessage(), startUpRetryTimes);
  48.             }
  49.         }
  50.         //如果连接成功,connectToServer对象就不为空
  51.         if (connectToServer != null) {
  52.             LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}", name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
  53.             //连接对象赋值,currentConnection其实就是一个在客户端使用的GrpcConnection对象实例
  54.             this.currentConnection = connectToServer;
  55.             //更改RPC客户端RpcClient的状态
  56.             rpcClientStatus.set(RpcClientStatus.RUNNING);
  57.             //往eventLinkedBlockingQueue队列放入ConnectionEvent事件
  58.             eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
  59.         } else {
  60.             //尝试进行异步连接
  61.             switchServerAsync();
  62.         }
  63.         registerServerRequestHandler(new ConnectResetRequestHandler());   
  64.         //register client detection request.
  65.         registerServerRequestHandler(request -> {
  66.             if (request instanceof ClientDetectionRequest) {
  67.                 return new ClientDetectionResponse();
  68.             }
  69.             return null;
  70.         });
  71.     }
  72.    
  73.     protected ServerInfo nextRpcServer() {
  74.         String serverAddress = getServerListFactory().genNextServer();
  75.         //获取服务端信息
  76.         return resolveServerInfo(serverAddress);
  77.     }
  78.    
  79.     private ServerInfo resolveServerInfo(String serverAddress) {
  80.         Matcher matcher = EXCLUDE_PROTOCOL_PATTERN.matcher(serverAddress);
  81.         if (matcher.find()) {
  82.             serverAddress = matcher.group(1);
  83.         }
  84.         String[] ipPortTuple = serverAddress.split(Constants.COLON, 2);
  85.         int defaultPort = Integer.parseInt(System.getProperty("nacos.server.port", "8848"));
  86.         String serverPort = CollectionUtils.getOrDefault(ipPortTuple, 1, Integer.toString(defaultPort));
  87.         return new ServerInfo(ipPortTuple[0], NumberUtils.toInt(serverPort, defaultPort));
  88.     }
  89.    
  90.     public void switchServerAsync() {
  91.         //异步注册逻辑
  92.         switchServerAsync(null, false);
  93.     }
  94.    
  95.     protected void switchServerAsync(final ServerInfo recommendServerInfo, boolean onRequestFail) {
  96.         //往reconnectionSignal队列里放入一个对象
  97.         reconnectionSignal.offer(new ReconnectContext(recommendServerInfo, onRequestFail));
  98.     }
  99.     ...
  100. }
复制代码
(3)gRPC客户端发起与服务端建立连接请求的源码
gRPC客户端与服务端建立连接的方法是GrpcClient的connectToServer()方法。该方法首先会获取进行网络通信的端口号,因为gRPC服务需要额外占用一个端口的,所以这个端口号是在Nacos的8848基础上 + 偏移量1000,变成9848。
 
在建立连接之前,会先检查一下服务端,如果没问题才发起连接请求,接着就会调用GrpcConnection的sendRequest()方法发起连接请求,最后返回GrpcConnection连接对象。
  1. public abstract class GrpcClient extends RpcClient {
  2.     ...
  3.     @Override
  4.     public Connection connectToServer(ServerInfo serverInfo) {
  5.         try {
  6.             if (grpcExecutor == null) {
  7.                 this.grpcExecutor = createGrpcExecutor(serverInfo.getServerIp());
  8.             }
  9.             //获取端口号:gRPC服务需要额外占用一个端口的,这个端口是在Nacos 8848的基础上,+ 偏移量1000,所以是9848
  10.             int port = serverInfo.getServerPort() + rpcPortOffset();
  11.             RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port);
  12.             if (newChannelStubTemp != null) {
  13.                 //检查一下服务端,没问题才会发起RPC连接请求
  14.                 Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
  15.                 if (response == null || !(response instanceof ServerCheckResponse)) {
  16.                     shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
  17.                     return null;
  18.                 }
  19.             
  20.                 BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(newChannelStubTemp.getChannel());
  21.                 //创建连接对象
  22.                 GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
  23.                 grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
  24.             
  25.                 //create stream request and bind connection event to this connection.
  26.                 //创建流请求并将连接事件绑定到此连接
  27.                 StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
  28.             
  29.                 //stream observer to send response to server
  30.                 grpcConn.setPayloadStreamObserver(payloadStreamObserver);
  31.                 grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
  32.                 grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
  33.                 //send a  setup request.
  34.                 ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
  35.                 conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
  36.                 conSetupRequest.setLabels(super.getLabels());
  37.                 conSetupRequest.setAbilities(super.clientAbilities);
  38.                 conSetupRequest.setTenant(super.getTenant());
  39.                 //发起连接请求
  40.                 grpcConn.sendRequest(conSetupRequest);
  41.                 //wait to register connection setup
  42.                 Thread.sleep(100L);
  43.                 return grpcConn;
  44.             }
  45.             return null;
  46.         } catch (Exception e) {
  47.             LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e);
  48.         }
  49.         return null;
  50.     }
  51.    
  52.     private Response serverCheck(String ip, int port, RequestGrpc.RequestFutureStub requestBlockingStub) {
  53.         try {
  54.             if (requestBlockingStub == null) {
  55.                 return null;
  56.             }
  57.             ServerCheckRequest serverCheckRequest = new ServerCheckRequest();
  58.             Payload grpcRequest = GrpcUtils.convert(serverCheckRequest);
  59.             //向服务端发送一个检查请求
  60.             ListenableFuture<Payload> responseFuture = requestBlockingStub.request(grpcRequest);
  61.             Payload response = responseFuture.get(3000L, TimeUnit.MILLISECONDS);
  62.             //receive connection unregister response here,not check response is success.
  63.             return (Response) GrpcUtils.parse(response);
  64.         } catch (Exception e) {
  65.             LoggerUtils.printIfErrorEnabled(LOGGER, "Server check fail, please check server {} ,port {} is available , error ={}", ip, port, e);
  66.             return null;
  67.         }
  68.     }
  69.    
  70.     private StreamObserver<Payload> bindRequestStream(final BiRequestStreamGrpc.BiRequestStreamStub streamStub, final GrpcConnection grpcConn) {
  71.         //调用BiRequestStreamStub.requestBiStream()方法连接服务端
  72.         return streamStub.requestBiStream(new StreamObserver<Payload>() {
  73.             @Override
  74.             public void onNext(Payload payload) {
  75.                 LoggerUtils.printIfDebugEnabled(LOGGER, "[{}]Stream server request receive, original info: {}", grpcConn.getConnectionId(), payload.toString());
  76.                 try {
  77.                     Object parseBody = GrpcUtils.parse(payload);
  78.                     final Request request = (Request) parseBody;
  79.                     if (request != null) {
  80.                         try {
  81.                             Response response = handleServerRequest(request);
  82.                             if (response != null) {
  83.                                 response.setRequestId(request.getRequestId());
  84.                                 sendResponse(response);
  85.                             } else {
  86.                                 LOGGER.warn("[{}]Fail to process server request, ackId->{}", grpcConn.getConnectionId(), request.getRequestId());
  87.                             }
  88.                         } catch (Exception e) {
  89.                             LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Handle server request exception: {}", grpcConn.getConnectionId(), payload.toString(), e.getMessage());
  90.                             Response errResponse = ErrorResponse.build(NacosException.CLIENT_ERROR, "Handle server request error");
  91.                             errResponse.setRequestId(request.getRequestId());
  92.                             sendResponse(errResponse);
  93.                         }
  94.                     }
  95.                 } catch (Exception e) {
  96.                     LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error to process server push response: {}", grpcConn.getConnectionId(), payload.getBody().getValue().toStringUtf8());
  97.                 }
  98.             }
  99.             
  100.             @Override
  101.             public void onError(Throwable throwable) {
  102.                 boolean isRunning = isRunning();
  103.                 boolean isAbandon = grpcConn.isAbandon();
  104.                 if (isRunning && !isAbandon) {
  105.                     LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream error, switch server,error={}", grpcConn.getConnectionId(), throwable);
  106.                     if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
  107.                         switchServerAsync();
  108.                     }
  109.                 } else {
  110.                     LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]Ignore error event,isRunning:{},isAbandon={}", grpcConn.getConnectionId(), isRunning, isAbandon);
  111.                 }
  112.             }
  113.             
  114.             @Override
  115.             public void onCompleted() {
  116.                 boolean isRunning = isRunning();
  117.                 boolean isAbandon = grpcConn.isAbandon();
  118.                 if (isRunning && !isAbandon) {
  119.                     LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream onCompleted, switch server", grpcConn.getConnectionId());
  120.                     if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
  121.                         switchServerAsync();
  122.                     }
  123.                 } else {
  124.                     LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Ignore complete event,isRunning:{},isAbandon={}", grpcConn.getConnectionId(), isRunning, isAbandon);
  125.                 }
  126.             }
  127.         });
  128.     }
  129.     ...
  130. }
复制代码
(4)总结
1.webp
 
11.gRPC客户端的心跳机制(健康检查)
(1)线程任务一:处理连接成功或连接断开时的通知
(2)线程任务二:处理重连或健康检查
 
RpcClient的start()方法会调用GrpcClient的connectToServer()方法连接服务端,不管连接是否成功,最后都会往不同的阻塞队列中添加事件。
 
如果连接成功,那么就往RpcClient的eventLinkedBlockingQueue添加连接事件。如果连接失败,那么就往RpcClient的reconnectionSignal队列添加重连对象。而这两个阻塞队列中的数据处理,便是由执行RpcClient的start()方法时启动的两个线程任务进行处理的。
 
(1)线程任务一:处理连接成功或连接断开时的通知
这个任务主要在连接成功或者连接断开时,修改一些属性状态。通过eventLinkedBlockingQueue的take()方法从队列取到连接事件后,会判断连接事件是否建立连接还是断开连接。
 
如果是建立连接,那么就调用RpcClient的notifyConnected()方法,把执行NamingGrpcClientProxy的start()方法时所注册的NamingGrpcRedoService对象的connected属性设置为true。
 
如果是断开连接,那么就调用RpcClient的notifyDisConnected()方法,把执行NamingGrpcClientProxy的start()方法时所注册的NamingGrpcRedoService对象的connected属性设置为false。
  1. public abstract class RpcClient implements Closeable {
  2.     protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<>(RpcClientStatus.WAIT_INIT);
  3.     protected ScheduledExecutorService clientEventExecutor;
  4.     protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();
  5.     private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1);
  6.     //listener called where connection's status changed. 连接状态改变的监听器
  7.     protected List<ConnectionEventListener> connectionEventListeners = new ArrayList<>();
  8.     ...
  9.    
  10.     public final void start() throws NacosException {
  11.         //利用CAS来修改RPC客户端(RpcClient)的状态,从INITIALIZED更新为STARTING
  12.         boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
  13.         if (!success) {
  14.             return;
  15.         }
  16.         //接下来创建调度线程池执行器,并提交两个任务
  17.         clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
  18.             Thread t = new Thread(r);
  19.             t.setName("com.alibaba.nacos.client.remote.worker");
  20.             t.setDaemon(true);
  21.             return t;
  22.         });
  23.    
  24.         //任务1:处理连接成功或连接断开时的线程
  25.         clientEventExecutor.submit(() -> {
  26.             while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
  27.                 ConnectionEvent take;
  28.                 try {
  29.                     take = eventLinkedBlockingQueue.take();
  30.                     if (take.isConnected()) {
  31.                         notifyConnected();
  32.                     } else if (take.isDisConnected()) {
  33.                         notifyDisConnected();
  34.                     }
  35.                 } catch (Throwable e) {
  36.                     // Do nothing
  37.                 }
  38.             }   
  39.         });
  40.         //任务2:向服务端上报心跳或重连的线程
  41.         clientEventExecutor.submit(() -> {
  42.             ...
  43.         });
  44.     }
  45.     ...
  46.                                    
  47.     //Notify when client new connected.
  48.     protected void notifyConnected() {
  49.         if (connectionEventListeners.isEmpty()) {
  50.             return;
  51.         }
  52.         LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify connected event to listeners.", name);
  53.         for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
  54.             try {
  55.                 connectionEventListener.onConnected();
  56.             } catch (Throwable throwable) {
  57.                 LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify connect listener error, listener = {}", name, connectionEventListener.getClass().getName());
  58.             }
  59.         }
  60.     }
  61.    
  62.     //Notify when client disconnected.
  63.     protected void notifyDisConnected() {
  64.         if (connectionEventListeners.isEmpty()) {
  65.             return;
  66.         }
  67.         LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify disconnected event to listeners", name);
  68.         for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
  69.             try {
  70.                 connectionEventListener.onDisConnect();
  71.             } catch (Throwable throwable) {
  72.                 LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify disconnect listener error, listener = {}", name, connectionEventListener.getClass().getName());
  73.             }
  74.         }
  75.     }
  76.     ...
  77.    
  78.     //Register connection handler. Will be notified when inner connection's state changed.
  79.     //在执行NamingGrpcClientProxy.start()方法时会将NamingGrpcRedoService对象注册到connectionEventListeners中
  80.     public synchronized void registerConnectionListener(ConnectionEventListener connectionEventListener) {
  81.         LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Registry connection listener to current client:{}", name, connectionEventListener.getClass().getName());
  82.         this.connectionEventListeners.add(connectionEventListener);
  83.     }
  84.     ...
  85. }
  86. public class NamingGrpcRedoService implements ConnectionEventListener {
  87.     private volatile boolean connected = false;
  88.     ...
  89.    
  90.     @Override
  91.     public void onConnected() {
  92.         connected = true;
  93.         LogUtils.NAMING_LOGGER.info("Grpc connection connect");
  94.     }
  95.    
  96.     @Override
  97.     public void onDisConnect() {
  98.         connected = false;
  99.         LogUtils.NAMING_LOGGER.warn("Grpc connection disconnect, mark to redo");
  100.         synchronized (registeredInstances) {
  101.             registeredInstances.values().forEach(instanceRedoData -> instanceRedoData.setRegistered(false));
  102.         }
  103.         synchronized (subscribes) {
  104.             subscribes.values().forEach(subscriberRedoData -> subscriberRedoData.setRegistered(false));
  105.         }
  106.         LogUtils.NAMING_LOGGER.warn("mark to redo completed");
  107.     }
  108.     ...
  109. }
复制代码
(2)线程任务二:处理重连或健康检查
如果RpcClient的start()方法在调用GrpcClient的connectToServer()方法连接服务端时失败了,那么会往RpcClient.reconnectionSignal队列添加重连对象的,而这个任务就会获取reconnectionSignal队列中的重连对象进行重连。
 
因为reconnectionSignal中的数据是当连接失败时放入的,所以如果从reconnectionSignal中获取不到重连对象,等同于连接成功。
 
注意:这个任务从reconnectionSignal阻塞队列中获取重连对象时,调用的是阻塞队列的take()方法,而不是阻塞队列的poll()方法。BlockingQueue的take()方法,如果读取不到数据,会一直处于阻塞状态。BlockingQueue的poll()方法,在指定的时间内读取不到数据,会返回null。
 
情况一:如果从reconnectionSignal队列中获取到的重连对象为null
首先判断存活时间是否大于 5s,如果大于则调用RpcClient.healthCheck()方法发起健康检查的RPC请求。健康检查的触发方法是currentConnection.request()方法,健康检查的请求类型是HealthCheckRequest。
 
如果健康检查成功,只需刷新存活时间即可。如果健康检查失败,则需要尝试与服务端重新建立连接。
 
情况二:如果从reconnectionSignal队列中获取到的重连对象不为null
那么就调用RpcClient的reconnect()方法进行重新连接,该方法会通过GrpcClient的connectToServer()方法尝试与服务端建立连接。
  1. public abstract class RpcClient implements Closeable {
  2.     protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<>(RpcClientStatus.WAIT_INIT);
  3.     protected ScheduledExecutorService clientEventExecutor;
  4.     protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();
  5.     private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1);
  6.     ...
  7.    
  8.     public final void start() throws NacosException {
  9.         //利用CAS来修改RPC客户端(RpcClient)的状态,从INITIALIZED更新为STARTING
  10.         boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
  11.         if (!success) {
  12.             return;
  13.         }
  14.         //接下来创建调度线程池执行器,并提交两个任务
  15.         clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
  16.             Thread t = new Thread(r);
  17.             t.setName("com.alibaba.nacos.client.remote.worker");
  18.             t.setDaemon(true);
  19.             return t;
  20.         });
  21.    
  22.         //任务1:处理连接成功或连接断开时的线程
  23.         clientEventExecutor.submit(() -> {
  24.             ...     
  25.         });
  26.         //任务2:向服务端上报心跳或重连的线程
  27.         clientEventExecutor.submit(() -> {
  28.             while (true) {
  29.                 try {
  30.                     if (isShutdown()) {
  31.                         break;
  32.                     }
  33.                     //这里从reconnectionSignal阻塞队列中获取任务不是调用take()方法,而是调用poll()方法,并且指定了5s的最大读取时间
  34.                     //BlockingQueue的take()方法,如果读取不到数据,会一直处于阻塞状态
  35.                     //BlockingQueue的poll()方法,在指定的时间内读取不到数据,会返回null
  36.                     ReconnectContext reconnectContext = reconnectionSignal.poll(keepAliveTime, TimeUnit.MILLISECONDS);
  37.                     //reconnectContext为null,说明从reconnectionSignal中获取不到数据
  38.                     //由于reconnectionSignal中的数据是当连接失败时放入的
  39.                     //所以从reconnectionSignal中获取不到数据,等同于连接成功
  40.                     if (reconnectContext == null) {
  41.                         //check alive time.
  42.                         //检查存活时间,默认存活时间为5s,超过5s就需要做健康检查
  43.                         if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
  44.                             //调用RpcClient.healthCheck()方法,发起健康检查请求
  45.                             boolean isHealthy = healthCheck();
  46.                             //如果向服务端发起健康检查请求失败,则需要尝试重新建立连接
  47.                             if (!isHealthy) {
  48.                                 if (currentConnection == null) {
  49.                                     continue;
  50.                                 }
  51.                                 LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server healthy check fail, currentConnection = {}", name, currentConnection.getConnectionId());
  52.                                 //判断连接状态是否关闭,如果是则结束异步任务
  53.                                 RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
  54.                                 if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
  55.                                     break;
  56.                                 }
  57.                                 //修改RpcClient的连接状态为不健康
  58.                                 boolean statusFLowSuccess = RpcClient.this.rpcClientStatus.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
  59.                                 //给reconnectContext属性赋值,准备尝试重连
  60.                                 if (statusFLowSuccess) {
  61.                                     //重新赋值,注意这里没有continue,所以逻辑会接着往下执行
  62.                                     reconnectContext = new ReconnectContext(null, false);
  63.                                 } else {
  64.                                     continue;
  65.                                 }
  66.                             } else {
  67.                                 //如果向服务端发起健康检查请求成功,则刷新RpcClient的存活时间
  68.                                 lastActiveTimeStamp = System.currentTimeMillis();
  69.                                 continue;
  70.                             }
  71.                         } else {
  72.                             continue;
  73.                         }
  74.                     }
  75.                     
  76.                     if (reconnectContext.serverInfo != null) {
  77.                         //clear recommend server if server is not in server list.
  78.                         //如果服务器不在服务器列表中,则清除推荐服务器,即设置reconnectContext.serverInfo为null
  79.                         boolean serverExist = false;
  80.                         //遍历服务端列表
  81.                         for (String server : getServerListFactory().getServerList()) {
  82.                             ServerInfo serverInfo = resolveServerInfo(server);
  83.                             if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
  84.                                 serverExist = true;
  85.                                 reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
  86.                                 break;
  87.                             }
  88.                         }
  89.                         //reconnectContext.serverInfo不存在服务端列表中,就清除服务器信息,设置reconnectContext.serverInfo为null
  90.                         if (!serverExist) {
  91.                             LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Recommend server is not in server list, ignore recommend server {}", name, reconnectContext.serverInfo.getAddress());
  92.                             reconnectContext.serverInfo = null;
  93.                         }
  94.                     }
  95.                     //进行重新连接,RpcClient.reconnect()方法中会调用GrpcClient.connectToServer()方法尝试与服务端建立连接
  96.                     reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
  97.                 } catch (Throwable throwable) {
  98.                     //Do nothing
  99.                 }
  100.             }
  101.         });
  102.     }
  103.    
  104.     private boolean healthCheck() {
  105.         HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
  106.         if (this.currentConnection == null) {
  107.             return false;
  108.         }
  109.         try {
  110.             //利用currentConnection连接对象,发起RPC请求,请求类型是HealthCheckRequest
  111.             Response response = this.currentConnection.request(healthCheckRequest, 3000L);
  112.             //not only check server is ok, also check connection is register.
  113.             return response != null && response.isSuccess();
  114.         } catch (NacosException e) {
  115.             //ignore
  116.         }
  117.         return false;
  118.     }
  119.     ...
  120. }
复制代码
(3)总结
2.webp
 
12.gRPC服务端如何处理客户端的建立连接请求
(1)gRPC服务端是如何启动的
(2)connectionId如何绑定Client对象的
 
(1)gRPC服务端是如何启动的
BaseRpcServer类有一个被@PostConstruct修饰的start()方法,该方法会调用BaseGrpcServer的startServer()方法来启动gRPC服务端。
 
在BaseGrpcServer的startServer()方法中,首先会调用BaseGrpcServer的addServices()方法添加服务,然后会使用建造者模式通过ServerBuilder创建gRPC框架的Server对象,最后启动gRPC框架的Server服务端,即启动一个NettyServer服务端。
  1. //abstract rpc server.
  2. public abstract class BaseRpcServer {
  3.     ...
  4.     //Start sever. 启动gRPC服务端
  5.     @PostConstruct
  6.     public void start() throws Exception {
  7.         String serverName = getClass().getSimpleName();
  8.         Loggers.REMOTE.info("Nacos {} Rpc server starting at port {}", serverName, getServicePort());
  9.         //调用BaseGrpcServer.startServer()方法启动gRPC服务端
  10.         startServer();
  11.         Loggers.REMOTE.info("Nacos {} Rpc server started at port {}", serverName, getServicePort());
  12.         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
  13.             Loggers.REMOTE.info("Nacos {} Rpc server stopping", serverName);
  14.             try {
  15.                 BaseRpcServer.this.stopServer();
  16.                 Loggers.REMOTE.info("Nacos {} Rpc server stopped successfully...", serverName);
  17.             } catch (Exception e) {
  18.                 Loggers.REMOTE.error("Nacos {} Rpc server stopped fail...", serverName, e);
  19.             }
  20.         }));
  21.     }
  22.    
  23.     //get service port.
  24.     public int getServicePort() {
  25.         return EnvUtil.getPort() + rpcPortOffset();
  26.     }
  27.     ...
  28. }
  29. //Grpc implementation as a rpc server.
  30. public abstract class BaseGrpcServer extends BaseRpcServer {
  31.     ...
  32.     @Override
  33.     public void startServer() throws Exception {
  34.         final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
  35.    
  36.         //server interceptor to set connection id. 定义请求拦截器
  37.         ServerInterceptor serverInterceptor = new ServerInterceptor() {
  38.             @Override
  39.             public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers, ServerCallHandler<T, S> next) {
  40.                 Context ctx = Context.current()
  41.                     .withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
  42.                     .withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
  43.                     .withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
  44.                     .withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
  45.                 if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
  46.                     Channel internalChannel = getInternalChannel(call);
  47.                     ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
  48.                 }
  49.                 return Contexts.interceptCall(ctx, call, headers, next);
  50.             }
  51.         };
  52.         //1.调用BaseGrpcServer.addServices()方法添加服务
  53.         addServices(handlerRegistry, serverInterceptor);
  54.         //2.创建一个gRPC框架的Server对象,使用了建造者模式
  55.         server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
  56.         .maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
  57.         .compressorRegistry(CompressorRegistry.getDefaultInstance())
  58.         .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
  59.         .addTransportFilter(new ServerTransportFilter() {
  60.             @Override
  61.             public Attributes transportReady(Attributes transportAttrs) {
  62.                 InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
  63.                 InetSocketAddress localAddress = (InetSocketAddress) transportAttrs.get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
  64.                 int remotePort = remoteAddress.getPort();
  65.                 int localPort = localAddress.getPort();
  66.                 String remoteIp = remoteAddress.getAddress().getHostAddress();
  67.                 Attributes attrWrapper = transportAttrs.toBuilder()
  68.                     .set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
  69.                     .set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
  70.                     .set(TRANS_KEY_LOCAL_PORT, localPort).build();
  71.                 String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID);
  72.                 Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId);
  73.                 return attrWrapper;
  74.             }
  75.             
  76.             @Override
  77.             public void transportTerminated(Attributes transportAttrs) {
  78.                 String connectionId = null;
  79.                 try {
  80.                     connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
  81.                 } catch (Exception e) {
  82.                         //Ignore
  83.                 }
  84.                 if (StringUtils.isNotBlank(connectionId)) {
  85.                     Loggers.REMOTE_DIGEST.info("Connection transportTerminated,connectionId = {} ", connectionId);
  86.                     connectionManager.unregister(connectionId);
  87.                 }
  88.             }
  89.         }).build();
  90.         
  91.         //3.启动gRPC框架的Server
  92.         server.start();
  93.     }
  94.     ...
  95. }
复制代码
(2)connectionId如何绑定Client对象的
BaseGrpcServer的startServer()方法在执行addServices()方法添加服务时,就会对connectionId与Client对象进行绑定。
 
绑定会由GrpcBiStreamRequestAcceptor的requestBiStream()方法触发。具体就是会调用ConnectionManager.register()方法来实现绑定,即先通过执行"connections.put(connectionId, connection)"代码,将connectionId和connection连接对象,放入到ConnectionManager的connections这个Map属性中。再执行ClientConnectionEventListenerRegistry的notifyClientConnected()方法,把Connection连接对象包装成Client对象。
 
将Connection连接对象包装成Client对象时,又会继续调用ConnectionBasedClientManager的clientConnected()方法,该方法便会根据connectionId创建出一个Client对象,然后将其放入到ConnectionBasedClientManager的clients这个Map中,从而实现connectionId与Client对象的关联。
  1. //Grpc implementation as a rpc server.
  2. public abstract class BaseGrpcServer extends BaseRpcServer {
  3.     ...
  4.     private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
  5.         //unary common call register.
  6.         final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
  7.             .setType(MethodDescriptor.MethodType.UNARY)
  8.             .setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME))
  9.             .setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
  10.             .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
  11.         //对gRPC客户端请求的服务进行映射处理
  12.         final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));
  13.       
  14.         //构建ServerServiceDefinition服务
  15.         final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME).addMethod(unaryPayloadMethod, payloadHandler).build();
  16.         //添加服务到gRPC的请求流程中
  17.         handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
  18.    
  19.         //bi stream register.
  20.         //处理客户端连接对象的关联
  21.         //也就是调用GrpcBiStreamRequestAcceptor.requestBiStream()方法对ConnectionId与Client对象进行绑定
  22.         final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall((responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));
  23.    
  24.         final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
  25.             .setType(MethodDescriptor.MethodType.BIDI_STREAMING)
  26.             .setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME))
  27.             .setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
  28.             .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
  29.    
  30.         final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition.builder(REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
  31.         handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));
  32.     }
  33.     ...
  34. }
  35. @Service
  36. public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestStreamImplBase {
  37.     ...
  38.     @Override
  39.     public StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) {
  40.         StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() {
  41.             ...
  42.             @Override
  43.             public void onNext(Payload payload) {
  44.                 ...
  45.                 //创建连接信息对象,把一些元信息放入到这个对象中
  46.                 ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
  47.                     remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
  48.                     setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
  49.                 metaInfo.setTenant(setUpRequest.getTenant());
  50.                 //把连接信息包装到连接对象中
  51.                 Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
  52.                 connection.setAbilities(setUpRequest.getAbilities());
  53.                 boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
  54.                 //ConnectionManager.register()方法,会将connectionId和连接对象进行绑定
  55.                 if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
  56.                     ...
  57.                 }
  58.                 ...
  59.             }
  60.             ...
  61.         };
  62.         return streamObserver;
  63.     }
  64.     ...
  65. }
  66. @Service
  67. public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {
  68.     //存储connectionId对应的Connection对象
  69.     Map<String, Connection> connections = new ConcurrentHashMap<>();
  70.     ...
  71.     //register a new connect.
  72.     public synchronized boolean register(String connectionId, Connection connection) {
  73.         if (connection.isConnected()) {
  74.             if (connections.containsKey(connectionId)) {
  75.                 return true;
  76.             }
  77.             if (!checkLimit(connection)) {
  78.                 return false;
  79.             }
  80.             if (traced(connection.getMetaInfo().clientIp)) {
  81.                 connection.setTraced(true);
  82.             }
  83.             //将connectionId与Connection连接对象进行绑定
  84.             connections.put(connectionId, connection);
  85.             connectionForClientIp.get(connection.getMetaInfo().clientIp).getAndIncrement();
  86.             //把Connection连接对象包装成Client对象
  87.             clientConnectionEventListenerRegistry.notifyClientConnected(connection);
  88.             Loggers.REMOTE_DIGEST.info("new connection registered successfully, connectionId = {},connection={} ", connectionId, connection);
  89.             return true;
  90.         }
  91.         return false;
  92.     }
  93.     ...
  94. }
  95. @Service
  96. public class ClientConnectionEventListenerRegistry {
  97.     final List<ClientConnectionEventListener> clientConnectionEventListeners = new ArrayList<ClientConnectionEventListener>();
  98.     //notify where a new client connected
  99.     public void notifyClientConnected(final Connection connection) {
  100.         for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
  101.             try {
  102.                 //调用ConnectionBasedClientManager.clientConnected()方法
  103.                 clientConnectionEventListener.clientConnected(connection);
  104.             } catch (Throwable throwable) {
  105.                 Loggers.REMOTE.info("[NotifyClientConnected] failed for listener {}", clientConnectionEventListener.getName(), throwable);
  106.             }
  107.         }
  108.     }
  109.     ...
  110. }
  111. @Component("connectionBasedClientManager")
  112. public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
  113.     private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>();
  114.    
  115.     @Override
  116.     public void clientConnected(Connection connect) {
  117.         if (!RemoteConstants.LABEL_MODULE_NAMING.equals(connect.getMetaInfo().getLabel(RemoteConstants.LABEL_MODULE))) {
  118.             return;
  119.         }
  120.         //把Connection对象中的信息取出来,放到ClientAttributes对象中
  121.         ClientAttributes attributes = new ClientAttributes();
  122.         attributes.addClientAttribute(ClientConstants.CONNECTION_TYPE, connect.getMetaInfo().getConnectType());
  123.         attributes.addClientAttribute(ClientConstants.CONNECTION_METADATA, connect.getMetaInfo());
  124.         //传入connectionId和连接信息
  125.         clientConnected(connect.getMetaInfo().getConnectionId(), attributes);
  126.     }
  127.    
  128.     @Override
  129.     public boolean clientConnected(String clientId, ClientAttributes attributes) {
  130.         String type = attributes.getClientAttribute(ClientConstants.CONNECTION_TYPE);
  131.         ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);
  132.         //这里的clientId就是connectionId,根据connectionId创建出Client对象
  133.         return clientConnected(clientFactory.newClient(clientId, attributes));
  134.     }
  135.    
  136.     @Override
  137.     public boolean clientConnected(final Client client) {
  138.         //最后将connectionId与Client对象进行绑定,放入到ConnectionBasedClientManager的clients这个Map中
  139.         clients.computeIfAbsent(client.getClientId(), s -> {
  140.             Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
  141.             return (ConnectionBasedClient) client;
  142.         });
  143.         return true;
  144.     }
  145.     ...
  146. }
复制代码
(3)总结
3.webp
 
13.gRPC服务端如何映射各种请求与对应的Handler处理类
gRPC服务端会如何处理客户端请求,如何找到对应的Handler处理类。
 
在gRPC服务端启动时,会调用BaseGrpcServer的startServer()方法,其中就会执行到BaseGrpcServer的addServices()方法。在BaseGrpcServer的addServices()方法中,就会进行请求与Handler映射,也就是调用GrpcRequestAcceptor的request()方法进行请求与Handler映射。
 
在GrpcRequestAcceptor的request()方法中,首先会从请求对象中获取请求type,然后会通过请求type获取一个Handler对象,最后调用RequestHandler的模版方法handleRequest(),从而调用具体Handler对象的handle()方法。
  1. //Grpc implementation as a rpc server.
  2. public abstract class BaseGrpcServer extends BaseRpcServer {
  3.     @Autowired
  4.     private GrpcRequestAcceptor grpcCommonRequestAcceptor;
  5.     ...
  6.     private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
  7.         //unary common call register.
  8.         final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
  9.             .setType(MethodDescriptor.MethodType.UNARY)
  10.             .setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME))
  11.             .setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
  12.             .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
  13.         //对gRPC客户端发出的请求进行Handler处理类的映射处理
  14.         final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));
  15.       
  16.         //构建ServerServiceDefinition服务
  17.         final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME).addMethod(unaryPayloadMethod, payloadHandler).build();
  18.         //添加服务到gRPC的请求流程中
  19.         handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
  20.    
  21.         //bi stream register.
  22.         //处理客户端连接对象的关联
  23.         //也就是调用GrpcBiStreamRequestAcceptor.requestBiStream()方法对ConnectionId与Client对象进行绑定
  24.         final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall((responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));
  25.    
  26.         final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
  27.             .setType(MethodDescriptor.MethodType.BIDI_STREAMING)
  28.             .setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME))
  29.             .setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
  30.             .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
  31.    
  32.         final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition.builder(REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
  33.         handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));
  34.     }
  35.     ...
  36. }
  37. @Service
  38. public class GrpcRequestAcceptor extends RequestGrpc.RequestImplBase {
  39.     ...
  40.     @Override
  41.     public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
  42.         ...
  43.         //首先从请求对象中获取请求type
  44.         String type = grpcRequest.getMetadata().getType();
  45.         ...
  46.         //然后通过请求type获取一个Handler对象
  47.         RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
  48.         ...
  49.         //最后调用RequestHandler的模版方法handleRequest(),从而调用具体Handler对象的handle()方法
  50.         Response response = requestHandler.handleRequest(request, requestMeta);
  51.         ...
  52.     }
  53.     ...
  54. }
  55. public abstract class RequestHandler<T extends Request, S extends Response> {
  56.     @Autowired
  57.     private RequestFilters requestFilters;
  58.    
  59.     //Handler request.
  60.     public Response handleRequest(T request, RequestMeta meta) throws NacosException {
  61.         for (AbstractRequestFilter filter : requestFilters.filters) {
  62.             try {
  63.                 Response filterResult = filter.filter(request, meta, this.getClass());
  64.                 if (filterResult != null && !filterResult.isSuccess()) {
  65.                     return filterResult;
  66.                 }
  67.             } catch (Throwable throwable) {
  68.                 Loggers.REMOTE.error("filter error", throwable);
  69.             }
  70.         }
  71.         //调用具体Handler的handle()方法
  72.         return handle(request, meta);
  73.     }
  74.     //Handler request.
  75.     public abstract S handle(T request, RequestMeta meta) throws NacosException;
  76. }
  77. @Service
  78. public class RequestHandlerRegistry implements ApplicationListener<ContextRefreshedEvent> {
  79.     Map<String, RequestHandler> registryHandlers = new HashMap<String, RequestHandler>();
  80.    
  81.     @Autowired
  82.     private TpsMonitorManager tpsMonitorManager;
  83.    
  84.     //Get Request Handler By request Type.
  85.     public RequestHandler getByRequestType(String requestType) {
  86.         return registryHandlers.get(requestType);
  87.     }
  88.    
  89.     @Override
  90.     public void onApplicationEvent(ContextRefreshedEvent event) {
  91.         //获取全部继承了RequestHandler类的实现类
  92.         Map<String, RequestHandler> beansOfType = event.getApplicationContext().getBeansOfType(RequestHandler.class);
  93.         Collection<RequestHandler> values = beansOfType.values();
  94.         for (RequestHandler requestHandler : values) {
  95.             Class<?> clazz = requestHandler.getClass();
  96.             boolean skip = false;
  97.             while (!clazz.getSuperclass().equals(RequestHandler.class)) {
  98.                 if (clazz.getSuperclass().equals(Object.class)) {
  99.                     skip = true;
  100.                     break;
  101.                 }
  102.                 clazz = clazz.getSuperclass();
  103.             }
  104.             if (skip) {
  105.                 continue;
  106.             }
  107.             try {
  108.                 Method method = clazz.getMethod("handle", Request.class, RequestMeta.class);
  109.                 if (method.isAnnotationPresent(TpsControl.class) && TpsControlConfig.isTpsControlEnabled()) {
  110.                     TpsControl tpsControl = method.getAnnotation(TpsControl.class);
  111.                     String pointName = tpsControl.pointName();
  112.                     TpsMonitorPoint tpsMonitorPoint = new TpsMonitorPoint(pointName);
  113.                     tpsMonitorManager.registerTpsControlPoint(tpsMonitorPoint);
  114.                 }
  115.             } catch (Exception e) {
  116.                 //ignore.
  117.             }
  118.             Class tClass = (Class) ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments()[0];
  119.             registryHandlers.putIfAbsent(tClass.getSimpleName(), requestHandler);
  120.         }
  121.     }
  122. }
复制代码
 
14.gRPC简单介绍
(1)gRPC是什么
(2)gRPC的特性
(3)gRPC和Dubbo的区别
 
(1)gRPC是什么
gRPC是一个高性能、开源和通用的RPC框架。gRPC基于ProtoBuf序列化协议开发,且支持众多开发语言。gRPC是面向服务端和移动端,基于HTTP 2设计的,带来诸如双向流、流控、头部压缩、单TCP连接上的多复用请求等特。这些特性使得其在移动设备上表现更好,更省电和节省空间占用。
 
(2)gRPC的特性
一.gRPC可以跨语言使用
 
二.基于IDL(接口定义语言Interface Define Language)文件定义服务
通过proto3工具生成指定语言的数据结构、服务端接口以及客户端Stub。
 
三.通信协议基于标准的HTTP 2设计
支持双向流、消息头压缩、单TCP的多路复用、服务端推送等特性,这些特性使得gRPC在移动端设备上更加省电和节省网络流量。
 
四.序列化支持ProtoBuf和JSON
ProtoBuf是一种语言无关的高性能序列化框架,它是基于HTTP2和ProtoBuf的,这保障了gRPC调用的高性能。
 
五.安装简单,扩展方便
使用gRPC框架每秒可达到百万RPC。
 
(3)gRPC和Dubbo的区别
一.通讯协议
gRPC基于HTTP 2.0,Dubbo基于TCP。
 
二.序列化
gRPC使用ProtoBuf,Dubbo使用Hession2等基于Java的序列化技术。
 
三.服务注册与发现
gRPC是应用级别的服务注册,Dubbo2.0及之前的版本都是基于更细力度的服务来进行注册,Dubbo3.0之后转向应用级别的服务注册。
 
四.编程语言
gRPC可以使用任何语言(HTTP和ProtoBuf天然就是跨语言的),而Dubbo只能使用在构建在JVM之上的语言。
 
五.服务治理
gRPC自身的服务治理能力很弱,只能基于HTTP连接维度进行容错,而Dubbo可以基于服务维度进行治理。
 
总结:gRPC的优势在于跨语言、跨平台,但服务治理能力弱。Dubbo服务治理能力强,但受编程语言限制无法跨语言使用。
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册