参考:https://www.cnblogs.com/mc-74120/p/13622008.html
pom文件- <dependency>
- <groupId>io.netty</groupId>
- netty-all</artifactId>
- </dependency>
复制代码 启动类- @EnableFeignClients
- @EnableDiscoveryClient
- @EnableScheduling
- @SpringBootApplication
- @EnableAsync
- public class ChimetaCoreApplication implements CommandLineRunner{
-
- @Autowired
- private NettyServerListener nettyServerListener;
-
- public static void main(String[] args) {
- SpringApplication.run(ChimetaCoreApplication.class, args);
- }
-
- @Override
- public void run(String... args) throws Exception {
-
- nettyServerListener.start();
- }
- }
复制代码 服务端代码的listener ServerChannelHandlerAdapter处理类MessageDispatcher分派各个处理器- package com.chimeta.netty;
- import com.chimeta.netty.service.TerminalService;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Component;
- import com.chimeta.netty.constant.MessagingConst;
- import com.chimeta.netty.model.SessionCloseReason;
- import com.chimeta.netty.protobuf.ImProto.ImMsg;
- import com.chimeta.netty.service.LoginService;
- import com.chimeta.netty.util.MessageBuilder;
- import com.google.protobuf.InvalidProtocolBufferException;
- import lombok.extern.slf4j.Slf4j;
- import javax.annotation.Resource;
- /**
- * 请求分排器
- */
- @Component
- @Slf4j
- public class MessageDispatcher{
-
- @Autowired
- private LoginService loginService;
- @Resource
- private TerminalService terminalService;
-
- /**
- * 消息分发处理
- *
- * @param gameMsg
- * @throws InvalidProtocolBufferException
- */
- @Async
- public void dispatch(ImMsg imMsg, IoSession currSession) throws InvalidProtocolBufferException {
- if(imMsg.getId() < 0) {
- currSession.sendMessage(MessageBuilder.buildErrorResponse(imMsg, MessagingConst.RESPONSE_ERR_CODE_400, "Invalid message!"));
- return;
- }
- //log.info("接收到的消息TypeUrl是: "+imMsg.getMsg().getTypeUrl());
- switch(imMsg.getMsg().getTypeUrl()) {
-
- case MessagingConst.TYPE_ONLINE_REQUEST:
- // 处理设备上线请求
- loginService.doLogin(imMsg, currSession);
- break;
- case MessagingConst.TYPE_USER_LOGON_REQUEST:
- // 处理请求
- loginService.doUserLogon(imMsg, currSession);
- break;
- case MessagingConst.TYPE_USER_LOGOFF_REQUEST:
- // 处理请求
- loginService.doUserLogoff(imMsg, currSession);
- break;
- case MessagingConst.TYPE_TERMINAL_STATE_REQUEST:
- // 我写的
- terminalService.multiInsert(imMsg, currSession);
- break;
- default:
- if(currSession != null) {
- // 返回客户端发来的心跳消息
- responseHeartbeat(imMsg, currSession);
- }
- break;
- }
- }
-
- /**
- * 发送心跳包消息
- * @param gameMsg
- * @param currSession
- * @return
- */
- public boolean sendHeartbeat(IoSession currSession) {
-
- try {
- if(currSession.isHeartbeated()) {
- return false;
- }
- ImMsg.Builder imMsgBuilder = ImMsg.newBuilder();
-
- currSession.sendMessage(imMsgBuilder.build());
-
- currSession.setHeartbeated(true);
-
- return true;
- }catch(Exception e) {
- log.error("主动发送心跳包时发生异常:", e);
- currSession.close(SessionCloseReason.EXCEPTION);
- return false;
- }
-
- }
- /**
- * 返回客户端发来的心跳包消息
- * @param imMsg
- * @param currSession
- */
- private void responseHeartbeat(ImMsg imMsg,IoSession currSession) {
- ImMsg.Builder imMsgBuilder = ImMsg.newBuilder();
-
- currSession.sendMessage(imMsgBuilder.build());
- }
-
- }
复制代码 最后到service业务处理TerminalService- package com.chimeta.netty.service;
- import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
- import com.chimeta.common.entity.terminal.TerminalStateMonitorDO;
- import com.chimeta.netty.IoSession;
- import com.chimeta.netty.constant.MessagingConst;
- import com.chimeta.netty.model.DeviceInfo;
- import com.chimeta.netty.protobuf.ImProto;
- import com.chimeta.netty.util.MessageBuilder;
- import com.chimeta.terminal.mapper.TerminalStateMonitorMapper;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
- import org.springframework.util.CollectionUtils;
- import java.math.BigDecimal;
- import java.util.ArrayList;
- import java.util.List;
- /**
- * 盒子设备相关的实现类
- */
- @Service
- @Slf4j
- public class TerminalService extends ServiceImpl<TerminalStateMonitorMapper, TerminalStateMonitorDO> {
- @Transactional(rollbackFor = Exception.class)
- public void multiInsert(ImProto.ImMsg imMsg, IoSession currSession){
- DeviceInfo deviceInfo = currSession.getDevice();
- if(deviceInfo == null) {
- currSession.sendMessage(MessageBuilder.buildErrorResponse(imMsg, MessagingConst.RESPONSE_ERR_CODE_400, "device not online!"));
- return;
- }
- try {
- ImProto.TerminalStateList terminalStateList = imMsg.getMsg().unpack(ImProto.TerminalStateList.class);
- log.info("TerminalService multiInsert TerminalStateList:{}", terminalStateList);
- List<ImProto.TerminalState> requestTerminalStateList = terminalStateList.getTerminalStateList();
- if (!CollectionUtils.isEmpty(requestTerminalStateList)){
- List<TerminalStateMonitorDO> tmplist = new ArrayList<>();
- for (ImProto.TerminalState requestTerminalState : requestTerminalStateList){
- TerminalStateMonitorDO terminalStateMonitorDO = new TerminalStateMonitorDO();
- terminalStateMonitorDO.setBatteryLevel(requestTerminalState.getBatteryLevel());
- terminalStateMonitorDO.setChargingState(requestTerminalState.getChargingState());
- terminalStateMonitorDO.setTemperature(BigDecimal.valueOf(requestTerminalState.getTemperature()));
- terminalStateMonitorDO.setUniqueCode(deviceInfo.getDeviceCode());
- terminalStateMonitorDO.setStateTime(requestTerminalState.getStateTime());
- tmplist.add(terminalStateMonitorDO);
- }
- this.saveBatch(tmplist);
- }
- } catch (Exception e) {
- log.error("TerminalService multiInsert error:{}", e);
- }
- }
- }
复制代码 至此,服务端的处理逻辑写完,然后比较费时间的是自己写client的请求,终于经过两三天时间总结好了,写了个test类,如下- package com.chimeta.core;
- import com.chimeta.netty.protobuf.ImProto;
- import com.google.protobuf.Any;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.protobuf.ProtobufEncoder;
- import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
- import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
- import lombok.extern.slf4j.Slf4j;
- import org.junit.jupiter.api.Test;
- import org.junit.runner.RunWith;
- import org.mockito.junit.MockitoJUnitRunner;
- @Slf4j
- @RunWith(MockitoJUnitRunner.class)
- class NettyTerminalTest {
- @Test
- public void tryTest() throws InterruptedException {
- ImProto.TerminalStateList terminalstateList = ImProto.TerminalStateList.newBuilder().build();
- for (int i = 0; i < 3; i++) {
- ImProto.TerminalState build = ImProto.TerminalState.newBuilder()
- .setBatteryLevel(i)
- .setChargingState(i * 11)
- .setTemperature(i * 11.1)
- .setStateTime(i * 111)
- .build();
- terminalstateList = terminalstateList.toBuilder().addTerminalState(build).build();
- }
- ImProto.ImMsg imMsg = ImProto.ImMsg.newBuilder().setId(66).setMsg(Any.pack(terminalstateList)).build();
- Channel channel = new Bootstrap()
- .group(new NioEventLoopGroup(1))
- .handler(new ChannelInitializer<NioSocketChannel>() {
- @Override
- protected void initChannel(NioSocketChannel ch) throws Exception {
- System.out.println("初始化连接...");
- ch.pipeline().addLast("encode", new ProtobufEncoder())
- .addLast(new ProtobufVarint32FrameDecoder()).addLast(new ProtobufVarint32LengthFieldPrepender());
- }
- })
- .channel(NioSocketChannel.class).connect("192.168.123.123", 10001)
- .sync()
- .channel();
- // channel.pipeline().addLast(new StringEncoder()).writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes(imMsg.toByteArray()));
- channel.pipeline().writeAndFlush(Unpooled.copiedBuffer(imMsg.toByteArray()));
- System.out.println("over!");
- }
- }
复制代码 好了,记录下,以后就不会忘记了
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |