找回密码
 立即注册
首页 业界区 业界 springboot的netty代码实操

springboot的netty代码实操

上官银柳 2025-6-9 08:59:34
参考:https://www.cnblogs.com/mc-74120/p/13622008.html
pom文件
  1. <dependency>
  2.     <groupId>io.netty</groupId>
  3.     netty-all</artifactId>
  4. </dependency>
复制代码
启动类
  1. @EnableFeignClients
  2. @EnableDiscoveryClient
  3. @EnableScheduling
  4. @SpringBootApplication
  5. @EnableAsync
  6. public class ChimetaCoreApplication  implements CommandLineRunner{
  7.    
  8.     @Autowired
  9.     private NettyServerListener nettyServerListener;
  10.    
  11.     public static void main(String[] args) {
  12.         SpringApplication.run(ChimetaCoreApplication.class, args);
  13.     }
  14.    
  15.     @Override
  16.     public void run(String... args) throws Exception {
  17.       
  18.        nettyServerListener.start();
  19.     }
  20. }
复制代码
服务端代码的listener
  1. package com.chimeta.netty;
  2. import javax.annotation.PreDestroy;
  3. import javax.annotation.Resource;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.stereotype.Component;
  6. import com.chimeta.netty.protobuf.ImProto;
  7. import io.netty.bootstrap.ServerBootstrap;
  8. import io.netty.channel.ChannelFuture;
  9. import io.netty.channel.ChannelInitializer;
  10. import io.netty.channel.ChannelOption;
  11. import io.netty.channel.EventLoopGroup;
  12. import io.netty.channel.nio.NioEventLoopGroup;
  13. import io.netty.channel.socket.SocketChannel;
  14. import io.netty.channel.socket.nio.NioServerSocketChannel;
  15. import io.netty.handler.codec.protobuf.ProtobufDecoder;
  16. import io.netty.handler.codec.protobuf.ProtobufEncoder;
  17. import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
  18. import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
  19. import io.netty.handler.logging.LogLevel;
  20. import io.netty.handler.logging.LoggingHandler;
  21. import io.netty.handler.timeout.IdleStateHandler;
  22. import lombok.extern.slf4j.Slf4j;
  23. /**
  24. * 服务启动监听器
  25. *
  26. * @author mwan
  27. */
  28. @Component
  29. @Slf4j
  30. public class NettyServerListener {
  31.     /**
  32.      * 创建bootstrap
  33.      */
  34.     ServerBootstrap serverBootstrap = new ServerBootstrap();
  35.     /**
  36.      * BOSS
  37.      */
  38.     EventLoopGroup boss = new NioEventLoopGroup();
  39.     /**
  40.      * Worker
  41.      */
  42.     EventLoopGroup work = new NioEventLoopGroup();
  43.     /**
  44.      * 通道适配器
  45.      */
  46.     @Resource
  47.     private ServerChannelHandlerAdapter channelHandlerAdapter;
  48.     /**
  49.      * 从配置中心获取NETTY服务器配置
  50.      */
  51.     @Value("${server.netty.port:10001}")
  52.     private int NETTY_PORT;
  53.    
  54.     @Value("${server.netty.maxthreads:5000}")
  55.     private int MAX_THREADS;
  56.     /**
  57.      * 关闭服务器方法
  58.      */
  59.     @PreDestroy
  60.     public void close() {
  61.         log.info("关闭服务器....");
  62.         //优雅退出
  63.         boss.shutdownGracefully();
  64.         work.shutdownGracefully();
  65.     }
  66.     /**
  67.      * 开启及服务线程
  68.      */
  69.     public void start() {
  70.         serverBootstrap.group(boss, work)
  71.                 .channel(NioServerSocketChannel.class)
  72.                 .option(ChannelOption.SO_BACKLOG, MAX_THREADS) //最大客户端连接数为1024  
  73.                 .handler(new LoggingHandler(LogLevel.INFO)).childOption(ChannelOption.SO_KEEPALIVE, true); ;
  74.         try {
  75.             //设置事件处理
  76.             serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
  77.                 @Override
  78.                 protected void initChannel(SocketChannel ch) throws Exception {
  79.                     // 下面的每一个addLast都有自己的含义,需要每个都过一下
  80.                     ch.pipeline().addLast(new IdleStateHandler(18,0,0));
  81.                     ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
  82.                     //ch.pipeline().addLast(new CustomProtobufInt32FrameDecoder());
  83.                     ch.pipeline().addLast(new ProtobufDecoder(ImProto.ImMsg.getDefaultInstance()));
  84.                     ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
  85.                     //ch.pipeline().addLast(new CustomProtobufInt32LengthFieldPrepender());
  86.                     ch.pipeline().addLast(new ProtobufEncoder());
  87.                     // 业务处理
  88.                     ch.pipeline().addLast(channelHandlerAdapter);
  89.                 }
  90.             });
  91.             log.info("netty服务器在[{}]端口启动监听", NETTY_PORT);
  92.             ChannelFuture f = serverBootstrap.bind(NETTY_PORT).sync();
  93.             f.channel().closeFuture().sync();
  94.         } catch (InterruptedException e) {
  95.             log.error("[出现异常] 释放资源", e);
  96.             boss.shutdownGracefully();
  97.             work.shutdownGracefully();
  98.             log.info("服务已关闭!");
  99.         }
  100.     }
  101. }
复制代码
 ServerChannelHandlerAdapter处理类
  1. package com.chimeta.netty;
  2. import org.apache.commons.lang3.StringUtils;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. import com.chimeta.netty.model.SessionCloseReason;
  6. import com.chimeta.netty.protobuf.ImProto.ImMsg;
  7. import com.chimeta.netty.util.ChannelUtils;
  8. import com.google.protobuf.InvalidProtocolBufferException;
  9. import com.google.protobuf.util.JsonFormat;
  10. import io.netty.channel.ChannelHandler.Sharable;
  11. import io.netty.handler.timeout.IdleState;
  12. import io.netty.handler.timeout.IdleStateEvent;
  13. import io.netty.channel.Channel;
  14. import io.netty.channel.ChannelHandlerContext;
  15. import io.netty.channel.ChannelInboundHandlerAdapter;
  16. import lombok.extern.slf4j.Slf4j;
  17. /**
  18. * 通信服务处理器
  19. */
  20. @Component
  21. @Sharable
  22. @Slf4j
  23. public class ServerChannelHandlerAdapter extends ChannelInboundHandlerAdapter {
  24.     /**
  25.      * 注入请求分排器
  26.      */
  27.     @Autowired
  28.     private MessageDispatcher messageDispatcher;
  29.    
  30.     @Autowired
  31.     private DeviceSessionManager sessionManager;
  32.     /** 用来记录当前在线连接数。应该把它设计成线程安全的。  */
  33.     //private AtomicInteger sessionCount = new AtomicInteger(0);
  34.    
  35.     @Override  
  36.     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  
  37.         super.handlerAdded(ctx);
  38.         
  39.         if (!ChannelUtils.addChannelSession(ctx.channel(), new IoSession(ctx.channel()))) {
  40.           ctx.channel().close();
  41.           log.error("Duplicate session,IP=[{}]",ChannelUtils.getRemoteIp(ctx.channel()));
  42.        }     
  43.         //String server_ip = NetworkUtils.getRealIp();//获得本机IP
  44.         // 缓存计数器加1
  45.     }  
  46.       
  47.     @Override  
  48.     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {  
  49.         super.handlerRemoved(ctx);
  50.         // 缓存计数器减1
  51.         //String server_ip = NetworkUtils.getRealIp();//获得本机IP
  52.         log.info(ctx.channel().id()+"离开了");  
  53.     }
  54.    
  55.     @Override
  56.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  57.         
  58.         ImMsg gameMessage = (ImMsg)msg;
  59.         final Channel channel = ctx.channel();
  60.        IoSession session = ChannelUtils.getSessionBy(channel);
  61.        if(session.isHeartbeated()) {
  62.           session.setHeartbeated(false);
  63.        }
  64.       
  65.        String deviceCode="";
  66.        if(session.getDevice() != null && StringUtils.isNotBlank(session.getDevice().getDeviceCode())) {
  67.           deviceCode = session.getDevice().getDeviceCode();
  68.        }
  69. //     if(!MessagingConst.TYPE_UPOS_REQUEST.equals(gameMessage.getMsg().getTypeUrl())) {
  70.           try {
  71.              log.info("Inbound message is :" + JsonFormat.printer().usingTypeRegistry(DeviceSessionManager.typeRegistry).print(gameMessage.toBuilder())
  72.                    + ", from device " + deviceCode);
  73.           } catch (InvalidProtocolBufferException e) {
  74.              log.info("Inbound message is :" + gameMessage.toString());
  75.           }
  76. //     }
  77.       
  78.        messageDispatcher.dispatch(gameMessage, session);
  79.     }
  80.      
  81.     @Override
  82.     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
  83.         ctx.flush();  
  84.     }
  85.    
  86.     @Override  
  87.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)  
  88.             throws Exception {  
  89.         
  90.         log.error("通信发生异常:", cause);
  91.         ctx.close();   
  92.     }
  93.    
  94.     /**
  95.      * 一段时间未进行读写操作 回调
  96.      */
  97.     @Override
  98.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  99.         /*心跳处理*/
  100.         if (evt instanceof IdleStateEvent) {
  101.             IdleStateEvent event = (IdleStateEvent) evt;
  102.             if (event.state() == IdleState.READER_IDLE) {
  103.                 /*读超时*/
  104.                 log.info("READER_IDLE read overtime,close session");
  105.                 final Channel channel = ctx.channel();
  106.                IoSession session = ChannelUtils.getSessionBy(channel);
  107.                
  108.              /*
  109.               * if(messageDispatcher.sendHeartbeat(session) == false) { //如果心跳检测失败,则连接异常,主动断开
  110.               * session.setSessionCloseReason(SessionCloseReason.OVER_TIME); ctx.close(); };
  111.               */
  112.                
  113.                session.setSessionCloseReason(SessionCloseReason.OVER_TIME);
  114.               ctx.close();
  115.                
  116.             } else if (event.state() == IdleState.WRITER_IDLE) {
  117.                 /*写超时*/   
  118.                 log.info("WRITER_IDLE 写超时");
  119.             } else if (event.state() == IdleState.ALL_IDLE) {
  120.                 /*总超时*/
  121.                 log.info("ALL_IDLE 总超时");
  122.             }
  123.         }
  124.     }
  125.     @Override
  126.     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  127.         sessionManager.unregisterUserContext(ctx.channel());
  128.         log.info(ctx.channel().id() + "已掉线!");
  129.         // 这里加入玩家的掉线处理
  130.         ctx.close();
  131.     }
  132. }
复制代码
MessageDispatcher分派各个处理器
  1. package com.chimeta.netty;
  2. import com.chimeta.netty.service.TerminalService;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.scheduling.annotation.Async;
  5. import org.springframework.stereotype.Component;
  6. import com.chimeta.netty.constant.MessagingConst;
  7. import com.chimeta.netty.model.SessionCloseReason;
  8. import com.chimeta.netty.protobuf.ImProto.ImMsg;
  9. import com.chimeta.netty.service.LoginService;
  10. import com.chimeta.netty.util.MessageBuilder;
  11. import com.google.protobuf.InvalidProtocolBufferException;
  12. import lombok.extern.slf4j.Slf4j;
  13. import javax.annotation.Resource;
  14. /**
  15. * 请求分排器
  16. */
  17. @Component
  18. @Slf4j
  19. public class MessageDispatcher{
  20.    
  21.     @Autowired
  22.     private LoginService loginService;
  23.     @Resource
  24.     private TerminalService terminalService;
  25.    
  26.     /**
  27.      * 消息分发处理
  28.      *
  29.      * @param gameMsg
  30.      * @throws InvalidProtocolBufferException
  31.      */
  32.     @Async
  33.     public void dispatch(ImMsg imMsg, IoSession currSession) throws InvalidProtocolBufferException {
  34.         if(imMsg.getId() < 0) {
  35.            currSession.sendMessage(MessageBuilder.buildErrorResponse(imMsg, MessagingConst.RESPONSE_ERR_CODE_400, "Invalid message!"));
  36.            return;
  37.         }
  38.         //log.info("接收到的消息TypeUrl是: "+imMsg.getMsg().getTypeUrl());
  39.         switch(imMsg.getMsg().getTypeUrl()) {
  40.         
  41.             case MessagingConst.TYPE_ONLINE_REQUEST:
  42.                // 处理设备上线请求
  43.                loginService.doLogin(imMsg, currSession);
  44.                break;
  45.             case MessagingConst.TYPE_USER_LOGON_REQUEST:
  46.                // 处理请求
  47.                loginService.doUserLogon(imMsg, currSession);
  48.                break;
  49.             case MessagingConst.TYPE_USER_LOGOFF_REQUEST:
  50.                // 处理请求
  51.                loginService.doUserLogoff(imMsg, currSession);
  52.                break;
  53.           case MessagingConst.TYPE_TERMINAL_STATE_REQUEST:
  54.                 // 我写的
  55.              terminalService.multiInsert(imMsg, currSession);
  56.              break;
  57.             default:
  58.                if(currSession != null) {
  59.                   // 返回客户端发来的心跳消息
  60.                   responseHeartbeat(imMsg, currSession);
  61.                }
  62.                break;
  63.         }
  64.     }
  65.    
  66.     /**
  67.      * 发送心跳包消息
  68.      * @param gameMsg
  69.      * @param currSession
  70.      * @return
  71.      */
  72.     public boolean sendHeartbeat(IoSession currSession) {
  73.       
  74.        try {
  75.           if(currSession.isHeartbeated()) {
  76.              return false;
  77.           }
  78.           ImMsg.Builder imMsgBuilder = ImMsg.newBuilder();
  79.          
  80.           currSession.sendMessage(imMsgBuilder.build());
  81.          
  82.           currSession.setHeartbeated(true);
  83.          
  84.           return true;
  85.        }catch(Exception e) {
  86.           log.error("主动发送心跳包时发生异常:", e);
  87.           currSession.close(SessionCloseReason.EXCEPTION);
  88.           return false;
  89.        }
  90.       
  91.     }
  92.     /**
  93.      * 返回客户端发来的心跳包消息
  94.      * @param imMsg
  95.      * @param currSession
  96.      */
  97.     private void responseHeartbeat(ImMsg imMsg,IoSession currSession) {
  98.        ImMsg.Builder imMsgBuilder = ImMsg.newBuilder();
  99.       
  100.        currSession.sendMessage(imMsgBuilder.build());
  101.     }
  102.    
  103. }
复制代码
最后到service业务处理TerminalService
  1. package com.chimeta.netty.service;
  2. import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
  3. import com.chimeta.common.entity.terminal.TerminalStateMonitorDO;
  4. import com.chimeta.netty.IoSession;
  5. import com.chimeta.netty.constant.MessagingConst;
  6. import com.chimeta.netty.model.DeviceInfo;
  7. import com.chimeta.netty.protobuf.ImProto;
  8. import com.chimeta.netty.util.MessageBuilder;
  9. import com.chimeta.terminal.mapper.TerminalStateMonitorMapper;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.springframework.stereotype.Service;
  12. import org.springframework.transaction.annotation.Transactional;
  13. import org.springframework.util.CollectionUtils;
  14. import java.math.BigDecimal;
  15. import java.util.ArrayList;
  16. import java.util.List;
  17. /**
  18. * 盒子设备相关的实现类
  19. */
  20. @Service
  21. @Slf4j
  22. public class TerminalService extends ServiceImpl<TerminalStateMonitorMapper, TerminalStateMonitorDO> {
  23.     @Transactional(rollbackFor = Exception.class)
  24.     public void multiInsert(ImProto.ImMsg imMsg, IoSession currSession){
  25.         DeviceInfo deviceInfo = currSession.getDevice();
  26.         if(deviceInfo == null) {
  27.             currSession.sendMessage(MessageBuilder.buildErrorResponse(imMsg, MessagingConst.RESPONSE_ERR_CODE_400, "device not online!"));
  28.             return;
  29.         }
  30.         try {
  31.             ImProto.TerminalStateList terminalStateList = imMsg.getMsg().unpack(ImProto.TerminalStateList.class);
  32.             log.info("TerminalService multiInsert TerminalStateList:{}", terminalStateList);
  33.             List<ImProto.TerminalState> requestTerminalStateList = terminalStateList.getTerminalStateList();
  34.             if (!CollectionUtils.isEmpty(requestTerminalStateList)){
  35.                 List<TerminalStateMonitorDO> tmplist = new ArrayList<>();
  36.                 for (ImProto.TerminalState requestTerminalState : requestTerminalStateList){
  37.                     TerminalStateMonitorDO terminalStateMonitorDO = new TerminalStateMonitorDO();
  38.                     terminalStateMonitorDO.setBatteryLevel(requestTerminalState.getBatteryLevel());
  39.                     terminalStateMonitorDO.setChargingState(requestTerminalState.getChargingState());
  40.                     terminalStateMonitorDO.setTemperature(BigDecimal.valueOf(requestTerminalState.getTemperature()));
  41.                     terminalStateMonitorDO.setUniqueCode(deviceInfo.getDeviceCode());
  42.                     terminalStateMonitorDO.setStateTime(requestTerminalState.getStateTime());
  43.                     tmplist.add(terminalStateMonitorDO);
  44.                 }
  45.                 this.saveBatch(tmplist);
  46.             }
  47.         } catch (Exception e) {
  48.             log.error("TerminalService multiInsert error:{}", e);
  49.         }
  50.     }
  51. }
复制代码
至此,服务端的处理逻辑写完,然后比较费时间的是自己写client的请求,终于经过两三天时间总结好了,写了个test类,如下
  1. package com.chimeta.core;
  2. import com.chimeta.netty.protobuf.ImProto;
  3. import com.google.protobuf.Any;
  4. import io.netty.bootstrap.Bootstrap;
  5. import io.netty.buffer.Unpooled;
  6. import io.netty.channel.Channel;
  7. import io.netty.channel.ChannelInitializer;
  8. import io.netty.channel.nio.NioEventLoopGroup;
  9. import io.netty.channel.socket.nio.NioSocketChannel;
  10. import io.netty.handler.codec.protobuf.ProtobufEncoder;
  11. import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
  12. import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
  13. import lombok.extern.slf4j.Slf4j;
  14. import org.junit.jupiter.api.Test;
  15. import org.junit.runner.RunWith;
  16. import org.mockito.junit.MockitoJUnitRunner;
  17. @Slf4j
  18. @RunWith(MockitoJUnitRunner.class)
  19. class NettyTerminalTest {
  20.     @Test
  21.     public void tryTest()  throws InterruptedException {
  22.         ImProto.TerminalStateList terminalstateList = ImProto.TerminalStateList.newBuilder().build();
  23.         for (int i = 0; i < 3; i++) {
  24.             ImProto.TerminalState build = ImProto.TerminalState.newBuilder()
  25.                     .setBatteryLevel(i)
  26.                     .setChargingState(i * 11)
  27.                     .setTemperature(i * 11.1)
  28.                     .setStateTime(i * 111)
  29.                     .build();
  30.             terminalstateList = terminalstateList.toBuilder().addTerminalState(build).build();
  31.         }
  32.         ImProto.ImMsg imMsg = ImProto.ImMsg.newBuilder().setId(66).setMsg(Any.pack(terminalstateList)).build();
  33.         Channel channel = new Bootstrap()
  34.                 .group(new NioEventLoopGroup(1))
  35.                 .handler(new ChannelInitializer<NioSocketChannel>() {
  36.                     @Override
  37.                     protected void initChannel(NioSocketChannel ch) throws Exception {
  38.                         System.out.println("初始化连接...");
  39.                         ch.pipeline().addLast("encode", new ProtobufEncoder())
  40.                                 .addLast(new ProtobufVarint32FrameDecoder()).addLast(new ProtobufVarint32LengthFieldPrepender());
  41.                     }
  42.                 })
  43.                 .channel(NioSocketChannel.class).connect("192.168.123.123", 10001)
  44.                 .sync()
  45.                 .channel();
  46. //        channel.pipeline().addLast(new StringEncoder()).writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes(imMsg.toByteArray()));
  47.         channel.pipeline().writeAndFlush(Unpooled.copiedBuffer(imMsg.toByteArray()));
  48.         System.out.println("over!");
  49.     }
  50. }
复制代码
 好了,记录下,以后就不会忘记了
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册