找回密码
 立即注册
首页 业界区 业界 谈谈如何使用Netty开发实现高性能的RPC服务器 ...

谈谈如何使用Netty开发实现高性能的RPC服务器

垢峒 2025-5-29 00:12:07
  RPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络,从远程计算机程序上请求服务,而不必了解底层网络技术的协议。说的再直白一点,就是客户端在不必知道调用细节的前提之下,调用远程计算机上运行的某个对象,使用起来就像调用本地的对象一样。目前典型的RPC实现框架有:Thrift(facebook开源)、Dubbo(alibaba开源)等等。RPC框架针对网络协议、网络I/O模型的封装是透明的,对于调用的客户端而言,它就认为自己在调用本地的一个对象。至于传输层上,运用的是TCP协议、UDP协议、亦或是HTTP协议,一概不关心。从网络I/O模型上来看,是基于select、poll、epoll方式、还是IOCP(I/O Completion Port)方式承载实现的,对于调用者而言也不用关心。
  目前,主流的RPC框架都支持跨语言调用,即有所谓的IDL(接口定义语言),其实,这个并不是RPC所必须要求的。如果你的RPC框架没有跨语言的要求,IDL就可以不用包括了。
  最后,值得一提的是,衡量一个RPC框架性能的好坏与否,RPC的网络I/O模型的选择,至关重要。在此基础上,设计出来的RPC服务器,可以考虑支持阻塞式同步IO、非阻塞式同步IO、当然还有所谓的多路复用IO模型、异步IO模型。支持不同的网络IO模型,在高并发的状态下,处理性能上会有很大的差别。还有一个衡量的标准,就是选择的传输协议。是基于TCP协议、还是HTTP协议、还是UDP协议?对性能也有一定的影响。但是从我目前了解的情况来看,大多数RPC开源实现框架都是基于TCP、或者HTTP的,目测没有采用UDP协议做为主要的传输协议的。
  明白了RPC的使用原理和性能要求。现在,我们能不能撇开那些RPC开源框架,自己动手开发一个高性能的RPC服务器呢?我想,还是可以的。现在本人就使用Java,基于Netty,开发实现一个高性能的RPC服务器。
  如何实现、基于什么原理?并发处理性能如何?请继续接着看下文。
  我们有的时候,为了提高单个节点的通信吞吐量,提高通信性能。如果是基于Java后端的,一般首选的是NIO框架(No-block IO)。但是问题也来了,Java的NIO掌握起来要相当的技术功底,和足够的技术积累,使用起来才能得心应手。一般的开发人员,如果要使用NIO开发一个后端的TCP/HTTP服务器,附带考虑TCP粘包、网络通信异常、消息链接处理等等网络通信细节,开发门槛太高,所以比较明智的选择是,采用业界主流的NIO框架进行服务器后端开发。主流的NIO框架主要有Netty、Mina。它们主要都是基于TCP通信,非阻塞的IO、灵活的IO线程池而设计的,应对高并发请求也是绰绰有余。随着Netty、Mina这样优秀的NIO框架,设计上日趋完善,Java后端高性能服务器开发,在技术上提供了有力的支持保障,从而打破了C++在服务器后端,一统天下的局面。因为在此之前,Java的NIO一直受人诟病,让人敬而远之!
  既然,这个RPC服务器是基于Netty的,那就在说说Netty吧。实际上Netty是对JAVA NIO框架的再次封装,它的开源网址是http://netty.io/,本文中使用的Netty版本是:4.0版本,可以通过http://dl.bintray.com/netty/downloads/netty-4.0.37.Final.tar.bz2,进行下载使用。那也许你会问,如何使用Netty进行RPC服务器的开发呢?实际不难,下面我就简单的说明一下技术原理:
  1、定义RPC请求消息、应答消息结构,里面要包括RPC的接口定义模块、包括远程调用的类名、方法名称、参数结构、参数值等信息。
  2、服务端初始化的时候通过容器加载RPC接口定义和RPC接口实现类对象的映射关系,然后等待客户端发起调用请求。
  3、客户端发起的RPC消息里面包含,远程调用的类名、方法名称、参数结构、参数值等信息,通过网络,以字节流的方式送给RPC服务端,RPC服务端接收到字节流的请求之后,去对应的容器里面,查找客户端接口映射的具体实现对象。
  4、RPC服务端找到实现对象的参数信息,通过反射机制创建该对象的实例,并返回调用处理结果,最后封装成RPC应答消息通知到客户端。
  5、客户端通过网络,收到字节流形式的RPC应答消息,进行拆包、解析之后,显示远程调用结果。
  上面说的是很简单,但是实现的时候,我们还要考虑如下的问题:
  1、RPC服务器的传输层是基于TCP协议的,出现粘包咋办?这样客户端的请求,服务端不是会解析失败?好在Netty里面已经提供了解决TCP粘包问题的解码器:LengthFieldBasedFrameDecoder,可以靠它轻松搞定TCP粘包问题。
  2、Netty服务端的线程模型是单线程、多线程(一个线程负责客户端连接,连接成功之后,丢给后端IO的线程池处理)、还是主从模式(客户端连接、后端IO处理都是基于线程池的实现)。当然在这里,我出于性能考虑,使用了Netty主从线程池模型。
  3、Netty的IO处理线程池,如果遇到非常耗时的业务,出现阻塞了咋办?这样不是很容易把后端的NIO线程给挂死、阻塞?本文的处理方式是,对于复杂的后端业务,分派到专门的业务线程池里面,进行异步回调处理。
  4、RPC消息的传输是通过字节流在NIO的通道(Channel)之间传输,那具体如何实现呢?本文,是通过基于Java原生对象序列化机制的编码、解码器(ObjectEncoder、ObjectDecoder)进行实现的。当然出于性能考虑,这个可能不是最优的方案。更优的方案是把消息的编码、解码器,搞成可以配置实现的。具体比如可以通过:protobuf、JBoss Marshalling方式进行解码和编码,以提高网络消息的传输效率。
  5、RPC服务器要考虑多线程、高并发的使用场景,所以线程安全是必须的。此外尽量不要使用synchronized进行加锁,改用轻量级的ReentrantLock方式进行代码块的条件加锁。比如本文中的RPC消息处理回调,就有这方面的使用。
  6、RPC服务端的服务接口对象和服务接口实现对象要能轻易的配置,轻松进行加载、卸载。在这里,本文是通过Spring容器进行统一的对象管理。
  综上所述,本文设计的RPC服务器调用的流程图如下所示:
     
1.jpeg

  客户端并发发起RPC调用请求,然后RPC服务端使用Netty连接器,分派出N个NIO连接线程,这个时候Netty连接器的任务结束。然后NIO连接线程是统一放到Netty NIO处理线程池进行管理,这个线程池里面会对具体的RPC请求连接进行消息编码、消息解码、消息处理等等一系列操作。最后进行消息处理(Handler)的时候,处于性能考虑,这里的设计是,直接把复杂的消息处理过程,丢给专门的RPC业务处理线程池集中处理,然后Handler对应的NIO线程就立即返回、不会阻塞。这个时候RPC调用结束,客户端会异步等待服务端消息的处理结果,本文是通过消息回调机制实现(MessageCallBack)。
  再来说一说Netty对于RPC消息的解码、编码、处理对应的模块和流程,具体如下图所示:
   
2.jpeg

  从上图可以看出客户端、服务端对RPC消息编码、解码、处理调用的模块以及调用顺序了。Netty就是把这样一个一个的处理器串在一起,形成一个责任链,统一进行调用。
  说了这么多,现在先简单看下,我设计实现的NettyRPC的代码目录层级结构:
     
3.jpeg

  其中newlandframework.netty.rpc.core包是NettyRPC的核心实现。newlandframework.netty.rpc.model包里面,则封装了RPC消息请求、应答报文结构,以及RPC服务接口与实现绑定关系的容器定义。newlandframework.netty.rpc.config里面定义了NettyRPC的服务端文件配置属性。
  下面先来看下newlandframework.netty.rpc.model包中定义的内容。具体是RPC消息请求、应答消息的结构定义:
  RPC请求消息结构
  1. /**
  2. * @filename:MessageRequest.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:rpc服务请求结构
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.model;
  12. import java.io.Serializable;
  13. import org.apache.commons.lang.builder.ToStringBuilder;
  14. import org.apache.commons.lang.builder.ToStringStyle;
  15. public class MessageRequest implements Serializable {
  16.     private String messageId;
  17.     private String className;
  18.     private String methodName;
  19.     private Class<?>[] typeParameters;
  20.     private Object[] parametersVal;
  21.     public String getMessageId() {
  22.         return messageId;
  23.     }
  24.     public void setMessageId(String messageId) {
  25.         this.messageId = messageId;
  26.     }
  27.     public String getClassName() {
  28.         return className;
  29.     }
  30.     public void setClassName(String className) {
  31.         this.className = className;
  32.     }
  33.     public String getMethodName() {
  34.         return methodName;
  35.     }
  36.     public void setMethodName(String methodName) {
  37.         this.methodName = methodName;
  38.     }
  39.     public Class<?>[] getTypeParameters() {
  40.         return typeParameters;
  41.     }
  42.     public void setTypeParameters(Class<?>[] typeParameters) {
  43.         this.typeParameters = typeParameters;
  44.     }
  45.     public Object[] getParameters() {
  46.         return parametersVal;
  47.     }
  48.     public void setParameters(Object[] parametersVal) {
  49.         this.parametersVal = parametersVal;
  50.     }
  51.     public String toString() {
  52.         return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
  53.                 .append("messageId", messageId).append("className", className)
  54.                 .append("methodName", methodName).toString();
  55.     }
  56. }
复制代码
  RPC应答消息结构
  1. /**
  2. * @filename:MessageResponse.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:rpc服务应答结构
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.model;
  12. import java.io.Serializable;
  13. import org.apache.commons.lang.builder.ToStringBuilder;
  14. import org.apache.commons.lang.builder.ToStringStyle;
  15. public class MessageResponse implements Serializable {
  16.     private String messageId;
  17.     private String error;
  18.     private Object resultDesc;
  19.     public String getMessageId() {
  20.         return messageId;
  21.     }
  22.     public void setMessageId(String messageId) {
  23.         this.messageId = messageId;
  24.     }
  25.     public String getError() {
  26.         return error;
  27.     }
  28.     public void setError(String error) {
  29.         this.error = error;
  30.     }
  31.     public Object getResult() {
  32.         return resultDesc;
  33.     }
  34.     public void setResult(Object resultDesc) {
  35.         this.resultDesc = resultDesc;
  36.     }
  37.     public String toString() {
  38.         return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
  39.                 .append("messageId", messageId).append("error", error).toString();
  40.     }
  41. }
复制代码
  RPC服务接口定义、服务接口实现绑定关系容器定义,提供给spring作为容器使用。
  1. /**
  2. * @filename:MessageKeyVal.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:rpc服务映射容器
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.model;
  12. import java.util.Map;
  13. public class MessageKeyVal {
  14.     private Map<String, Object> messageKeyVal;
  15.     public void setMessageKeyVal(Map<String, Object> messageKeyVal) {
  16.         this.messageKeyVal = messageKeyVal;
  17.     }
  18.     public Map<String, Object> getMessageKeyVal() {
  19.         return messageKeyVal;
  20.     }
  21. }
复制代码
  好了,定义好核心模型结构之后,现在再向大家展示一下NettyRPC核心包:newlandframework.netty.rpc.core的关键部分实现代码,首先是业务线程池相关类的实现代码,具体如下:
  线程工厂定义实现
  1. /**
  2. * @filename:NamedThreadFactory.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:线程工厂
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.core;
  12. import java.util.concurrent.ThreadFactory;
  13. import java.util.concurrent.atomic.AtomicInteger;
  14. public class NamedThreadFactory implements ThreadFactory {
  15.     private static final AtomicInteger threadNumber = new AtomicInteger(1);
  16.     private final AtomicInteger mThreadNum = new AtomicInteger(1);
  17.     private final String prefix;
  18.     private final boolean daemoThread;
  19.     private final ThreadGroup threadGroup;
  20.     public NamedThreadFactory() {
  21.         this("rpcserver-threadpool-" + threadNumber.getAndIncrement(), false);
  22.     }
  23.     public NamedThreadFactory(String prefix) {
  24.         this(prefix, false);
  25.     }
  26.     public NamedThreadFactory(String prefix, boolean daemo) {
  27.         this.prefix = prefix + "-thread-";
  28.         daemoThread = daemo;
  29.         SecurityManager s = System.getSecurityManager();
  30.         threadGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
  31.     }
  32.     public Thread newThread(Runnable runnable) {
  33.         String name = prefix + mThreadNum.getAndIncrement();
  34.         Thread ret = new Thread(threadGroup, runnable, name, 0);
  35.         ret.setDaemon(daemoThread);
  36.         return ret;
  37.     }
  38.     public ThreadGroup getThreadGroup() {
  39.         return threadGroup;
  40.     }
  41. }
复制代码
  业务线程池定义实现
  1. /**
  2. * @filename:RpcThreadPool.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:rpc线程池封装
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.core;
  12. import java.util.concurrent.Executor;
  13. import java.util.concurrent.LinkedBlockingQueue;
  14. import java.util.concurrent.SynchronousQueue;
  15. import java.util.concurrent.ThreadPoolExecutor;
  16. import java.util.concurrent.TimeUnit;
  17. public class RpcThreadPool {
  18.     //独立出线程池主要是为了应对复杂耗I/O操作的业务,不阻塞netty的handler线程而引入
  19.     //当然如果业务足够简单,把处理逻辑写入netty的handler(ChannelInboundHandlerAdapter)也未尝不可
  20.     public static Executor getExecutor(int threads, int queues) {
  21.         String name = "RpcThreadPool";
  22.         return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
  23.                 queues == 0 ? new SynchronousQueue<Runnable>()
  24.                         : (queues < 0 ? new LinkedBlockingQueue<Runnable>()
  25.                                 : new LinkedBlockingQueue<Runnable>(queues)),
  26.                 new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
  27.     }
  28. }
复制代码
  1. /**
  2. * @filename:AbortPolicyWithReport.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:线程池异常策略
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.core;
  12. import java.util.concurrent.RejectedExecutionException;
  13. import java.util.concurrent.ThreadPoolExecutor;
  14. public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
  15.     private final String threadName;
  16.     public AbortPolicyWithReport(String threadName) {
  17.         this.threadName = threadName;
  18.     }
  19.     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  20.         String msg = String.format("RpcServer["
  21.                 + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d),"
  22.                 + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)]",
  23.                 threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
  24.                 e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
  25.         System.out.println(msg);
  26.         throw new RejectedExecutionException(msg);
  27.     }
  28. }
复制代码
  RPC调用客户端定义实现
  1. /**
  2. * @filename:MessageSendExecutor.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:Rpc客户端执行模块
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.core;
  12. import java.lang.reflect.Proxy;
  13. public class MessageSendExecutor {
  14.     private RpcServerLoader loader = RpcServerLoader.getInstance();
  15.     public MessageSendExecutor(String serverAddress) {
  16.         loader.load(serverAddress);
  17.     }
  18.     public void stop() {
  19.         loader.unLoad();
  20.     }
  21.     public static <T> T execute(Class<T> rpcInterface) {
  22.         return (T) Proxy.newProxyInstance(
  23.                 rpcInterface.getClassLoader(),
  24.                 new Class<?>[]{rpcInterface},
  25.                 new MessageSendProxy<T>(rpcInterface)
  26.         );
  27.     }
  28. }
复制代码
  这里的RPC客户端实际上,是动态代理了MessageSendProxy,当然这里是应用了,JDK原生的动态代理实现,你还可以改成CGLIB(Code Generation Library)方式。不过本人测试了一下CGLIB方式,在高并发的情况下面会出现空指针异常,但是同样的情况,JDK原生的动态代理却没有问题。并发程度不高的情况下面,两种代理方式都运行正常。后续再深入研究看看吧!废话不说了,现在给出MessageSendProxy的实现方式
  1. /**
  2. * @filename:MessageSendProxy.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:Rpc客户端消息处理
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.core;
  12. import java.lang.reflect.InvocationHandler;
  13. import java.lang.reflect.Method;
  14. import java.util.UUID;
  15. import newlandframework.netty.rpc.model.MessageRequest;
  16. public class MessageSendProxy<T> implements InvocationHandler {
  17.     private Class<T> cls;
  18.     public MessageSendProxy(Class<T> cls) {
  19.         this.cls = cls;
  20.     }
  21.     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  22.         MessageRequest request = new MessageRequest();
  23.         request.setMessageId(UUID.randomUUID().toString());
  24.         request.setClassName(method.getDeclaringClass().getName());
  25.         request.setMethodName(method.getName());
  26.         request.setTypeParameters(method.getParameterTypes());
  27.         request.setParameters(args);
  28.         MessageSendHandler handler = RpcServerLoader.getInstance().getMessageSendHandler();
  29.         MessageCallBack callBack = handler.sendRequest(request);
  30.         return callBack.start();
  31.     }
  32. }
复制代码
  进一步发现MessageSendProxy其实是把消息发送给RpcServerLoader模块,它的代码如下:
  1. /**
  2. * @filename:RpcServerLoader.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:rpc服务器配置加载
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.core;
  12. import io.netty.channel.EventLoopGroup;
  13. import io.netty.channel.nio.NioEventLoopGroup;
  14. import java.net.InetSocketAddress;
  15. import java.util.concurrent.ThreadPoolExecutor;
  16. import java.util.concurrent.locks.Condition;
  17. import java.util.concurrent.locks.Lock;
  18. import java.util.concurrent.locks.ReentrantLock;
  19. import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
  20. public class RpcServerLoader {
  21.     private volatile static RpcServerLoader rpcServerLoader;
  22.     private final static String DELIMITER = ":";
  23.     private RpcSerializeProtocol serializeProtocol = RpcSerializeProtocol.JDKSERIALIZE;
  24.     //方法返回到Java虚拟机的可用的处理器数量
  25.     private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;
  26.     //netty nio线程池
  27.     private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(parallel);
  28.     private static ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);
  29.     private MessageSendHandler messageSendHandler = null;
  30.     //等待Netty服务端链路建立通知信号
  31.     private Lock lock = new ReentrantLock();
  32.     private Condition signal = lock.newCondition();
  33.     private RpcServerLoader() {
  34.     }
  35.     //并发双重锁定
  36.     public static RpcServerLoader getInstance() {
  37.         if (rpcServerLoader == null) {
  38.             synchronized (RpcServerLoader.class) {
  39.                 if (rpcServerLoader == null) {
  40.                     rpcServerLoader = new RpcServerLoader();
  41.                 }
  42.             }
  43.         }
  44.         return rpcServerLoader;
  45.     }
  46.     public void load(String serverAddress, RpcSerializeProtocol serializeProtocol) {
  47.         String[] ipAddr = serverAddress.split(RpcServerLoader.DELIMITER);
  48.         if (ipAddr.length == 2) {
  49.             String host = ipAddr[0];
  50.             int port = Integer.parseInt(ipAddr[1]);
  51.             final InetSocketAddress remoteAddr = new InetSocketAddress(host, port);
  52.             threadPoolExecutor.submit(new MessageSendInitializeTask(eventLoopGroup, remoteAddr, this, serializeProtocol));
  53.         }
  54.     }
  55.     public void setMessageSendHandler(MessageSendHandler messageInHandler) {
  56.         try {
  57.             lock.lock();
  58.             this.messageSendHandler = messageInHandler;
  59.             //唤醒所有等待客户端RPC线程
  60.             signal.signalAll();
  61.         } finally {
  62.             lock.unlock();
  63.         }
  64.     }
  65.     public MessageSendHandler getMessageSendHandler() throws InterruptedException {
  66.         try {
  67.             lock.lock();
  68.             //Netty服务端链路没有建立完毕之前,先挂起等待
  69.             if (messageSendHandler == null) {
  70.                 signal.await();
  71.             }
  72.             return messageSendHandler;
  73.         } finally {
  74.             lock.unlock();
  75.         }
  76.     }
  77.     public void unLoad() {
  78.         messageSendHandler.close();
  79.         threadPoolExecutor.shutdown();
  80.         eventLoopGroup.shutdownGracefully();
  81.     }
  82.     public void setSerializeProtocol(RpcSerializeProtocol serializeProtocol) {
  83.         this.serializeProtocol = serializeProtocol;
  84.     }
  85. }
复制代码
  好了,现在一次性给出RPC客户端消息编码、解码、处理的模块实现代码。
  1. /**
  2. * @filename:MessageSendInitializeTask.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:Rpc客户端线程任务处理
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.core;
  12. import io.netty.bootstrap.Bootstrap;
  13. import io.netty.channel.ChannelFuture;
  14. import io.netty.channel.ChannelFutureListener;
  15. import io.netty.channel.ChannelOption;
  16. import io.netty.channel.EventLoopGroup;
  17. import io.netty.channel.socket.nio.NioSocketChannel;
  18. import java.net.InetSocketAddress;
  19. public class MessageSendInitializeTask implements Runnable {
  20.     private EventLoopGroup eventLoopGroup = null;
  21.     private InetSocketAddress serverAddress = null;
  22.     private RpcServerLoader loader = null;
  23.     MessageSendInitializeTask(EventLoopGroup eventLoopGroup, InetSocketAddress serverAddress, RpcServerLoader loader) {
  24.         this.eventLoopGroup = eventLoopGroup;
  25.         this.serverAddress = serverAddress;
  26.         this.loader = loader;
  27.     }
  28.     public void run() {
  29.         Bootstrap b = new Bootstrap();
  30.         b.group(eventLoopGroup)
  31.                 .channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);
  32.         b.handler(new MessageSendChannelInitializer());
  33.         ChannelFuture channelFuture = b.connect(serverAddress);
  34.         channelFuture.addListener(new ChannelFutureListener() {
  35.             public void operationComplete(final ChannelFuture channelFuture) throws Exception {
  36.                 if (channelFuture.isSuccess()) {
  37.                     MessageSendHandler handler = channelFuture.channel().pipeline().get(MessageSendHandler.class);
  38.                     MessageSendInitializeTask.this.loader.setMessageSendHandler(handler);
  39.                 }
  40.             }
  41.         });
  42.     }
  43. }
复制代码
  1. /**
  2. * @filename:MessageSendChannelInitializer.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:Rpc客户端管道初始化
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.core;
  12. import io.netty.channel.ChannelInitializer;
  13. import io.netty.channel.ChannelPipeline;
  14. import io.netty.channel.socket.SocketChannel;
  15. import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
  16. import io.netty.handler.codec.LengthFieldPrepender;
  17. import io.netty.handler.codec.serialization.ClassResolvers;
  18. import io.netty.handler.codec.serialization.ObjectDecoder;
  19. import io.netty.handler.codec.serialization.ObjectEncoder;
  20. public class MessageSendChannelInitializer extends ChannelInitializer<SocketChannel> {
  21.     //ObjectDecoder 底层默认继承半包解码器LengthFieldBasedFrameDecoder处理粘包问题的时候,
  22.     //消息头开始即为长度字段,占据4个字节。这里出于保持兼容的考虑
  23.     final public static int MESSAGE_LENGTH = 4;
  24.     protected void initChannel(SocketChannel socketChannel) throws Exception {
  25.         ChannelPipeline pipeline = socketChannel.pipeline();
  26.         //ObjectDecoder的基类半包解码器LengthFieldBasedFrameDecoder的报文格式保持兼容。因为底层的父类LengthFieldBasedFrameDecoder
  27.         //的初始化参数即为super(maxObjectSize, 0, 4, 0, 4);
  28.         pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageSendChannelInitializer.MESSAGE_LENGTH, 0, MessageSendChannelInitializer.MESSAGE_LENGTH));
  29.         //利用LengthFieldPrepender回填补充ObjectDecoder消息报文头
  30.         pipeline.addLast(new LengthFieldPrepender(MessageSendChannelInitializer.MESSAGE_LENGTH));
  31.         pipeline.addLast(new ObjectEncoder());
  32.         //考虑到并发性能,采用weakCachingConcurrentResolver缓存策略。一般情况使用:cacheDisabled即可
  33.         pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
  34.         pipeline.addLast(new MessageSendHandler());
  35.     }
  36. }
复制代码
  1. /**
  2. * @filename:MessageSendHandler.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:Rpc客户端处理模块
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.core;
  12. import io.netty.buffer.Unpooled;
  13. import io.netty.channel.Channel;
  14. import io.netty.channel.ChannelFutureListener;
  15. import io.netty.channel.ChannelHandlerContext;
  16. import io.netty.channel.ChannelInboundHandlerAdapter;
  17. import java.net.SocketAddress;
  18. import java.util.concurrent.ConcurrentHashMap;
  19. import newlandframework.netty.rpc.model.MessageRequest;
  20. import newlandframework.netty.rpc.model.MessageResponse;
  21. public class MessageSendHandler extends ChannelInboundHandlerAdapter {
  22.     private ConcurrentHashMap<String, MessageCallBack> mapCallBack = new ConcurrentHashMap<String, MessageCallBack>();
  23.     private volatile Channel channel;
  24.     private SocketAddress remoteAddr;
  25.     public Channel getChannel() {
  26.         return channel;
  27.     }
  28.     public SocketAddress getRemoteAddr() {
  29.         return remoteAddr;
  30.     }
  31.     public void channelActive(ChannelHandlerContext ctx) throws Exception {
  32.         super.channelActive(ctx);
  33.         this.remoteAddr = this.channel.remoteAddress();
  34.     }
  35.     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  36.         super.channelRegistered(ctx);
  37.         this.channel = ctx.channel();
  38.     }
  39.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  40.         MessageResponse response = (MessageResponse) msg;
  41.         String messageId = response.getMessageId();
  42.         MessageCallBack callBack = mapCallBack.get(messageId);
  43.         if (callBack != null) {
  44.             mapCallBack.remove(messageId);
  45.             callBack.over(response);
  46.         }
  47.     }
  48.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  49.         ctx.close();
  50.     }
  51.     public void close() {
  52.         channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
  53.     }
  54.     public MessageCallBack sendRequest(MessageRequest request) {
  55.         MessageCallBack callBack = new MessageCallBack(request);
  56.         mapCallBack.put(request.getMessageId(), callBack);
  57.         channel.writeAndFlush(request);
  58.         return callBack;
  59.     }
  60. }
复制代码
  最后给出RPC服务端的实现。首先是通过spring自动加载RPC服务接口、接口实现容器绑定加载,初始化Netty主/从线程池等操作,具体是通过MessageRecvExecutor模块实现的,现在给出实现代码:
  1. /**
  2. * @filename:MessageRecvExecutor.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:Rpc服务器执行模块
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.core;
  12. import io.netty.bootstrap.ServerBootstrap;
  13. import io.netty.channel.ChannelFuture;
  14. import io.netty.channel.ChannelOption;
  15. import io.netty.channel.EventLoopGroup;
  16. import io.netty.channel.nio.NioEventLoopGroup;
  17. import io.netty.channel.socket.nio.NioServerSocketChannel;
  18. import java.nio.channels.spi.SelectorProvider;
  19. import java.util.Iterator;
  20. import java.util.Map;
  21. import java.util.Set;
  22. import java.util.concurrent.ConcurrentHashMap;
  23. import java.util.concurrent.ThreadFactory;
  24. import java.util.concurrent.ThreadPoolExecutor;
  25. import java.util.logging.Level;
  26. import newlandframework.netty.rpc.model.MessageKeyVal;
  27. import org.springframework.beans.BeansException;
  28. import org.springframework.beans.factory.InitializingBean;
  29. import org.springframework.context.ApplicationContext;
  30. import org.springframework.context.ApplicationContextAware;
  31. public class MessageRecvExecutor implements ApplicationContextAware, InitializingBean {
  32.     private String serverAddress;
  33.     private final static String DELIMITER = ":";
  34.     private Map<String, Object> handlerMap = new ConcurrentHashMap<String, Object>();
  35.     private static ThreadPoolExecutor threadPoolExecutor;
  36.     public MessageRecvExecutor(String serverAddress) {
  37.         this.serverAddress = serverAddress;
  38.     }
  39.     public static void submit(Runnable task) {
  40.         if (threadPoolExecutor == null) {
  41.             synchronized (MessageRecvExecutor.class) {
  42.                 if (threadPoolExecutor == null) {
  43.                     threadPoolExecutor = (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);
  44.                 }
  45.             }
  46.         }
  47.         threadPoolExecutor.submit(task);
  48.     }
  49.     public void setApplicationContext(ApplicationContext ctx) throws BeansException {
  50.         try {
  51.             MessageKeyVal keyVal = (MessageKeyVal) ctx.getBean(Class.forName("newlandframework.netty.rpc.model.MessageKeyVal"));
  52.             Map<String, Object> rpcServiceObject = keyVal.getMessageKeyVal();
  53.             Set s = rpcServiceObject.entrySet();
  54.             Iterator<Map.Entry<String, Object>> it = s.iterator();
  55.             Map.Entry<String, Object> entry;
  56.             while (it.hasNext()) {
  57.                 entry = it.next();
  58.                 handlerMap.put(entry.getKey(), entry.getValue());
  59.             }
  60.         } catch (ClassNotFoundException ex) {
  61.             java.util.logging.Logger.getLogger(MessageRecvExecutor.class.getName()).log(Level.SEVERE, null, ex);
  62.         }
  63.     }
  64.     public void afterPropertiesSet() throws Exception {
  65.         //netty的线程池模型设置成主从线程池模式,这样可以应对高并发请求
  66.         //当然netty还支持单线程、多线程网络IO模型,可以根据业务需求灵活配置
  67.         ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPC ThreadFactory");
  68.         
  69.         //方法返回到Java虚拟机的可用的处理器数量
  70.         int parallel = Runtime.getRuntime().availableProcessors() * 2;
  71.    
  72.         EventLoopGroup boss = new NioEventLoopGroup();
  73.         EventLoopGroup worker = new NioEventLoopGroup(parallel,threadRpcFactory,SelectorProvider.provider());
  74.         
  75.         try {
  76.             ServerBootstrap bootstrap = new ServerBootstrap();
  77.             bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
  78.                     .childHandler(new MessageRecvChannelInitializer(handlerMap))
  79.                     .option(ChannelOption.SO_BACKLOG, 128)
  80.                     .childOption(ChannelOption.SO_KEEPALIVE, true);
  81.             String[] ipAddr = serverAddress.split(MessageRecvExecutor.DELIMITER);
  82.             if (ipAddr.length == 2) {
  83.                 String host = ipAddr[0];
  84.                 int port = Integer.parseInt(ipAddr[1]);
  85.                 ChannelFuture future = bootstrap.bind(host, port).sync();
  86.                 System.out.printf("[author tangjie] Netty RPC Server start success ip:%s port:%d\n", host, port);
  87.                 future.channel().closeFuture().sync();
  88.             } else {
  89.                 System.out.printf("[author tangjie] Netty RPC Server start fail!\n");
  90.             }
  91.         } finally {
  92.             worker.shutdownGracefully();
  93.             boss.shutdownGracefully();
  94.         }
  95.     }
  96. }
复制代码
  最后还是老规矩,给出RPC服务端消息编码、解码、处理的核心模块代码实现,具体如下:
  1. /**
  2. * @filename:MessageRecvChannelInitializer.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:Rpc服务端管道初始化
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.core;
  12. import io.netty.channel.ChannelInitializer;
  13. import io.netty.channel.ChannelPipeline;
  14. import io.netty.channel.socket.SocketChannel;
  15. import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
  16. import io.netty.handler.codec.LengthFieldPrepender;
  17. import io.netty.handler.codec.serialization.ClassResolvers;
  18. import io.netty.handler.codec.serialization.ObjectDecoder;
  19. import io.netty.handler.codec.serialization.ObjectEncoder;
  20. import java.util.Map;
  21. public class MessageRecvChannelInitializer extends ChannelInitializer<SocketChannel> {
  22.     //ObjectDecoder 底层默认继承半包解码器LengthFieldBasedFrameDecoder处理粘包问题的时候,
  23.     //消息头开始即为长度字段,占据4个字节。这里出于保持兼容的考虑
  24.     final public static int MESSAGE_LENGTH = 4;
  25.     private Map<String, Object> handlerMap = null;
  26.     MessageRecvChannelInitializer(Map<String, Object> handlerMap) {
  27.         this.handlerMap = handlerMap;
  28.     }
  29.     protected void initChannel(SocketChannel socketChannel) throws Exception {
  30.         ChannelPipeline pipeline = socketChannel.pipeline();
  31.         //ObjectDecoder的基类半包解码器LengthFieldBasedFrameDecoder的报文格式保持兼容。因为底层的父类LengthFieldBasedFrameDecoder
  32.         //的初始化参数即为super(maxObjectSize, 0, 4, 0, 4);
  33.         pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageRecvChannelInitializer.MESSAGE_LENGTH, 0, MessageRecvChannelInitializer.MESSAGE_LENGTH));
  34.         //利用LengthFieldPrepender回填补充ObjectDecoder消息报文头
  35.         pipeline.addLast(new LengthFieldPrepender(MessageRecvChannelInitializer.MESSAGE_LENGTH));
  36.         pipeline.addLast(new ObjectEncoder());
  37.         //考虑到并发性能,采用weakCachingConcurrentResolver缓存策略。一般情况使用:cacheDisabled即可
  38.         pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
  39.         pipeline.addLast(new MessageRecvHandler(handlerMap));
  40.     }
  41. }
复制代码
  1. /**
  2. * @filename:MessageRecvHandler.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:Rpc服务器消息处理
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.core;
  12. import io.netty.channel.ChannelHandlerContext;
  13. import io.netty.channel.ChannelInboundHandlerAdapter;
  14. import java.util.Map;
  15. import newlandframework.netty.rpc.model.MessageRequest;
  16. import newlandframework.netty.rpc.model.MessageResponse;
  17. public class MessageRecvHandler extends ChannelInboundHandlerAdapter {
  18.     private final Map<String, Object> handlerMap;
  19.     public MessageRecvHandler(Map<String, Object> handlerMap) {
  20.         this.handlerMap = handlerMap;
  21.     }
  22.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  23.         MessageRequest request = (MessageRequest) msg;
  24.         MessageResponse response = new MessageResponse();
  25.         MessageRecvInitializeTask recvTask = new MessageRecvInitializeTask(request, response, handlerMap, ctx);
  26.         //不要阻塞nio线程,复杂的业务逻辑丢给专门的线程池
  27.         MessageRecvExecutor.submit(recvTask);
  28.     }
  29.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  30.         //网络有异常要关闭通道
  31.         ctx.close();
  32.     }
  33. }
复制代码
  1. /**
  2. * @filename:MessageRecvInitializeTask.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:Rpc服务器消息线程任务处理
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.core;
  12. import io.netty.channel.ChannelFuture;
  13. import io.netty.channel.ChannelFutureListener;
  14. import io.netty.channel.ChannelHandlerContext;
  15. import java.util.Map;
  16. import newlandframework.netty.rpc.model.MessageRequest;
  17. import newlandframework.netty.rpc.model.MessageResponse;
  18. import org.apache.commons.beanutils.MethodUtils;
  19. public class MessageRecvInitializeTask implements Runnable {
  20.     private MessageRequest request = null;
  21.     private MessageResponse response = null;
  22.     private Map<String, Object> handlerMap = null;
  23.     private ChannelHandlerContext ctx = null;
  24.     public MessageResponse getResponse() {
  25.         return response;
  26.     }
  27.     public MessageRequest getRequest() {
  28.         return request;
  29.     }
  30.     public void setRequest(MessageRequest request) {
  31.         this.request = request;
  32.     }
  33.     MessageRecvInitializeTask(MessageRequest request, MessageResponse response, Map<String, Object> handlerMap, ChannelHandlerContext ctx) {
  34.         this.request = request;
  35.         this.response = response;
  36.         this.handlerMap = handlerMap;
  37.         this.ctx = ctx;
  38.     }
  39.     public void run() {
  40.         response.setMessageId(request.getMessageId());
  41.         try {
  42.             Object result = reflect(request);
  43.             response.setResult(result);
  44.         } catch (Throwable t) {
  45.             response.setError(t.toString());
  46.             t.printStackTrace();
  47.             System.err.printf("RPC Server invoke error!\n");
  48.         }
  49.         ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
  50.             public void operationComplete(ChannelFuture channelFuture) throws Exception {
  51.                 System.out.println("RPC Server Send message-id respone:" + request.getMessageId());
  52.             }
  53.         });
  54.     }
  55.     private Object reflect(MessageRequest request) throws Throwable {
  56.         String className = request.getClassName();
  57.         Object serviceBean = handlerMap.get(className);
  58.         String methodName = request.getMethodName();
  59.         Object[] parameters = request.getParameters();
  60.         return MethodUtils.invokeMethod(serviceBean, methodName, parameters);
  61.     }
  62. }
复制代码
  然后是RPC消息处理的回调实现模块代码
  1. /**
  2. * @filename:MessageCallBack.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:Rpc消息回调
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.core;
  12. import java.util.concurrent.TimeUnit;
  13. import java.util.concurrent.locks.Condition;
  14. import java.util.concurrent.locks.Lock;
  15. import java.util.concurrent.locks.ReentrantLock;
  16. import newlandframework.netty.rpc.model.MessageRequest;
  17. import newlandframework.netty.rpc.model.MessageResponse;
  18. public class MessageCallBack {
  19.     private MessageRequest request;
  20.     private MessageResponse response;
  21.     private Lock lock = new ReentrantLock();
  22.     private Condition finish = lock.newCondition();
  23.     public MessageCallBack(MessageRequest request) {
  24.         this.request = request;
  25.     }
  26.     public Object start() throws InterruptedException {
  27.         try {
  28.             lock.lock();
  29.             //设定一下超时时间,rpc服务器太久没有相应的话,就默认返回空吧。
  30.             finish.await(10*1000, TimeUnit.MILLISECONDS);
  31.             if (this.response != null) {
  32.                 return this.response.getResult();
  33.             } else {
  34.                 return null;
  35.             }
  36.         } finally {
  37.             lock.unlock();
  38.         }
  39.     }
  40.     public void over(MessageResponse reponse) {
  41.         try {
  42.             lock.lock();
  43.             finish.signal();
  44.             this.response = reponse;
  45.         } finally {
  46.             lock.unlock();
  47.         }
  48.     }
  49. }
复制代码
  到此为止,NettyRPC的关键部分:服务端、客户端的模块已经通过Netty全部实现了。现在给出spring加载配置rpc-invoke-config.xml的内容:
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.        xmlns:context="http://www.springframework.org/schema/context"
  5.        xsi:schemaLocation="http://www.springframework.org/schema/beans
  6.        http://www.springframework.org/schema/beans/spring-beans.xsd
  7.        http://www.springframework.org/schema/context
  8.        http://www.springframework.org/schema/context/spring-context.xsd">
  9.   <context:component-scan base-package="newlandframework.netty.rpc.core"/>
  10.   <context:property-placeholder location="classpath:newlandframework/netty/rpc/config/rpc-server.properties"/>
  11.   <bean id="rpcbean" class="newlandframework.netty.rpc.model.MessageKeyVal">
  12.     <property name="messageKeyVal">
  13.       <map>
  14.         <entry key="newlandframework.netty.rpc.servicebean.Calculate">
  15.           <ref bean="calc"/>
  16.         </entry>
  17.       </map>
  18.     </property>
  19.   </bean>
  20.   <bean id="calc" class="newlandframework.netty.rpc.servicebean.CalculateImpl"/>
  21.   <bean id="rpcServer" class="newlandframework.netty.rpc.core.MessageRecvExecutor">
  22.     <constructor-arg name="serverAddress" value="${rpc.server.addr}"/>
  23.   </bean>
  24. </beans>
复制代码
  再贴出RPC服务绑定ip信息的配置文件:rpc-server.properties的内容。
  1. #rpc server's ip address config
  2. rpc.server.addr=127.0.0.1:18888
复制代码
  最后NettyRPC服务端启动方式参考如下:
  1. new ClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config.xml");
复制代码
  如果一切顺利,没有出现意外的话,控制台上面,会出现如下截图所示的情况:
4.jpeg

  如果出现了,说明NettyRPC服务器,已经启动成功!
  上面基于Netty的RPC服务器,并发处理性能如何呢?实践是检验真理的唯一标准,下面我们就来实战一下。
  下面的测试案例,是基于RPC远程调用两数相加函数,并返回计算结果。客户端同时开1W个线程,同一时刻,瞬时发起并发计算请求,然后观察Netty的RPC服务器是否有正常应答回复响应,以及客户端是否有正常返回调用计算结果。值得注意的是,测试案例是基于1W个线程瞬时并发请求而设计的,并不是1W个线程循环发起请求。这两者对于衡量RPC服务器的并发处理性能,还是有很大差别的。当然,前者对于并发性能的处理要求,要高上很多很多。
  现在,先给出RPC计算接口、RPC计算接口实现类的代码实现:
  1. /**
  2. * @filename:Calculate.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:计算器定义接口
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.servicebean;
  12. public interface Calculate {
  13.     //两数相加
  14.     int add(int a, int b);
  15. }
复制代码
  1. /**
  2. * @filename:CalculateImpl.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:计算器定义接口实现
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.servicebean;
  12. public class CalculateImpl implements Calculate {
  13.     //两数相加
  14.     public int add(int a, int b) {
  15.         return a + b;
  16.     }
  17. }
复制代码
  下面是瞬时并发RPC请求的测试样例:
  1. /**
  2. * @filename:CalcParallelRequestThread.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:并发线程模拟
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.servicebean;
  12. import newlandframework.netty.rpc.core.MessageSendExecutor;
  13. import java.util.concurrent.CountDownLatch;
  14. import java.util.logging.Level;
  15. import java.util.logging.Logger;
  16. public class CalcParallelRequestThread implements Runnable {
  17.     private CountDownLatch signal;
  18.     private CountDownLatch finish;
  19.     private MessageSendExecutor executor;
  20.     private int taskNumber = 0;
  21.     public CalcParallelRequestThread(MessageSendExecutor executor, CountDownLatch signal, CountDownLatch finish, int taskNumber) {
  22.         this.signal = signal;
  23.         this.finish = finish;
  24.         this.taskNumber = taskNumber;
  25.         this.executor = executor;
  26.     }
  27.     public void run() {
  28.         try {
  29.             signal.await();
  30.             Calculate calc = executor.execute(Calculate.class);
  31.             int add = calc.add(taskNumber, taskNumber);
  32.             System.out.println("calc add result:[" + add + "]");
  33.             finish.countDown();
  34.         } catch (InterruptedException ex) {
  35.             Logger.getLogger(CalcParallelRequestThread.class.getName()).log(Level.SEVERE, null, ex);
  36.         }
  37.     }
  38. }
复制代码
  1. /**
  2. * @filename:RpcParallelTest.java
  3. *
  4. * Newland Co. Ltd. All rights reserved.
  5. *
  6. * @Description:rpc并发测试代码
  7. * @author tangjie
  8. * @version 1.0
  9. *
  10. */
  11. package newlandframework.netty.rpc.servicebean;
  12. import java.util.concurrent.CountDownLatch;
  13. import newlandframework.netty.rpc.core.MessageSendExecutor;
  14. import org.apache.commons.lang.time.StopWatch;
  15. public class RpcParallelTest {
  16.     public static void main(String[] args) throws Exception {
  17.         final MessageSendExecutor executor = new MessageSendExecutor("127.0.0.1:18888");
  18.         //并行度10000
  19.         int parallel = 10000;
  20.         //开始计时
  21.         StopWatch sw = new StopWatch();
  22.         sw.start();
  23.         CountDownLatch signal = new CountDownLatch(1);
  24.         CountDownLatch finish = new CountDownLatch(parallel);
  25.         for (int index = 0; index < parallel; index++) {
  26.             CalcParallelRequestThread client = new CalcParallelRequestThread(executor, signal, finish, index);
  27.             new Thread(client).start();
  28.         }
  29.         
  30.         //10000个并发线程瞬间发起请求操作
  31.         signal.countDown();
  32.         finish.await();
  33.         
  34.         sw.stop();
  35.         String tip = String.format("RPC调用总共耗时: [%s] 毫秒", sw.getTime());
  36.         System.out.println(tip);
  37.         executor.stop();
  38.     }
  39. }
复制代码
  好了,现在先启动NettyRPC服务器,确认没有问题之后,运行并发RPC请求客户端,看下客户端打印的计算结果,以及处理耗时。
     
5.jpeg

  从上面来看,10000个瞬时RPC计算请求,总共耗时接近11秒。我们在来看下NettyRPC的服务端运行情况,如下所示:
     
6.jpeg

  可以很清楚地看到,RPC服务端都有收到客户端发起的RPC计算请求,并返回消息应答。
  最后我们还是要分别验证一下,RPC服务端是否存在丢包、粘包、IO阻塞的情况?1W个并发计算请求,是否成功接收处理并应答了?实际情况说明一切,看下图所示:
      
7.jpeg

   非常给力,RPC的服务端确实成功接收到了客户端发起的1W笔瞬时并发计算请求,并且成功应答处理了。并没有出现:丢包、粘包、IO阻塞的情况。再看下RPC客户端,是否成功得到计算结果的应答返回了呢?
  
8.jpeg

  很好,RPC的客户端,确实收到了RPC服务端计算的1W笔加法请求的计算结果,而且耗时接近11秒。由此可见,基于Netty+业务线程池的NettyRPC服务器,应对并发多线程RPC请求,处理起来是得心应手,游刃有余!
  最后,本文通过Netty这个NIO框架,实现了一个很简单的“高性能”的RPC服务器,代码虽然写出来了,但是还是有一些值得改进的地方,比如:
  1、对象序列化传输可以支持目前主流的序列化框架:protobuf、JBoss Marshalling、Avro等等。
  2、Netty的线程模型可以根据业务需求,进行定制。因为,并不是每笔业务都需要这么强大的并发处理性能。
  3、目前RPC计算只支持一个RPC服务接口映射绑定一个对应的实现,后续要支持一对多的情况。
  4、业务线程池的启动参数、线程池并发阻塞容器模型等等,可以配置化管理。
  5、Netty的Handler处理部分,对于复杂的业务逻辑,现在是统一分派到特定的线程池进行后台异步处理。当然你还可以考虑JMS(消息队列)方式进行解耦,统一分派给消息队列的订阅者,统一处理。目前实现JMS的开源框架也有很多,ActiveMQ、RocketMQ等等,都可以考虑。
  本文实现的NettyRPC,对于面前的您而言,一定还有很多地方,可以加以完善和改进,优化改进的工作就交给您自由发挥了。
  由于本人技术能力、认知水平有限。本文中有说不对的地方,恳请园友们批评指正!不吝赐教!最后,感谢面前的您,耐心的阅读完本文,相信现在的你,对于Java开发高性能的服务端应用,又有了一个更深入的了解!本文算是对我Netty学习成果的阶段性总结,后续有时间,我还会继续推出Netty工业级开发的相关文章,敬请期待!
  PS:还有兴趣的朋友可以参考、阅读一下,我的另外一篇文章:Netty实现高性能RPC服务器优化篇之消息序列化。此外,自从在博客园发表了两篇:基于Netty开发高性能RPC服务器的文章之后,本人收到很多园友们索要源代码进行学习交流的请求。为了方便大家,本人把NettyRPC的代码开源托管到github上面,欢迎有兴趣的朋友一起学习、研究!
  附上NettyRPC项目的下载路径:https://github.com/tang-jie/NettyRPC
 
  Netty工业级开发系列文章进阶:Netty构建分布式消息队列(AvatarMQ)设计指南之架构篇
  谈谈如何使用Netty开发实现高性能的RPC服务器、Netty实现高性能RPC服务器优化篇之消息序列化。这两篇文章主要设计的思路是,基于Netty构建了一个高性能的RPC服务器,而这些前期代码的准备工作,主要是为了设计、实现一个基于Netty的分布式消息队列系统做铺垫,本人把这个分布式消息队列系统,命名为:AvatarMQ。作为Netty工业级开发系列的进阶篇,感兴趣的朋友可以点击关注:Netty构建分布式消息队列(AvatarMQ)设计指南之架构篇,一定不会让您失望!
  AvatarMQ项目开源网址:https://github.com/tang-jie/AvatarMQ。

来源:新程序网络收集,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册