村亢 发表于 2025-6-2 22:17:56

Netty 心跳机制实现(客户端与服务端)

Netty 心跳机制实现(客户端与服务端)

Netty 的心跳机制是保持长连接有效性的重要手段,可以检测连接是否存活并及时释放无效连接。下面介绍客户端和服务端的完整实现方案。
一、服务端实现

1. 基础心跳检测

public class HeartbeatServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
      ChannelPipeline pipeline = ch.pipeline();

      // 添加编解码器
      pipeline.addLast(new StringDecoder());
      pipeline.addLast(new StringEncoder());

      // 心跳检测
      // 参数说明:readerIdleTime, writerIdleTime, allIdleTime, 时间单位
      pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
      pipeline.addLast(new HeartbeatServerHandler());
    }
}

public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {
    // 心跳丢失计数器 不需要单独做心跳机制,它可以通过IdleStateHandler进行检测
    //private Map<String, Integer> lossConnectMap = new HashMap<>();

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                ctx.channel().close();
                //IdleStateHandler 会检测空闲情况,所以直接关闭,不需要再弄计数器,
                //String socketAddress = ctx.channel().remoteAddress().toString();
                //int lossConnectCount = 0;
                //if (lossConnectMap.containsKey(socketAddress)) {
                //    lossConnectCount = lossConnectMap.get(socketAddress);
                //}
                //lossConnectCount++;
                //lossConnectMap.put(socketAddress, lossConnectCount);
                //logger.info("关闭不活跃: " + ctx.channel().remoteAddress() + " " + lossConnectCount);
                //if (lossConnectCount > 2) {
                //    logger.info("关闭不活跃连接: " + ctx.channel());
                //    ctx.channel().close();
                //}
            }
      } else {
            super.userEventTriggered(ctx, evt);
      }
    }
   
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
      //Netty 不需要单独做心跳机制,它可以通过IdleStateHandler进行检测

      // 收到任何消息都重置计数器
      //if ("HEARTBEAT".equals(msg)) {
      //    if (lossConnectMap.containsKey(socketAddress)) {
      //      lossConnectMap.put(socketAddress, 0);
      //    }
      //    System.out.println("收到心跳: " + ctx.channel());
      //    ctx.writeAndFlush("HEARTBEAT_RESPONSE");
      //} else {
      //    // 处理其他业务消息
      //}
    }
}2. 完整心跳交互方案

public class AdvancedHeartbeatServerHandler extends ChannelInboundHandlerAdapter {
    private static final ByteBuf HEARTBEAT_SEQUENCE =
      Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8));
   
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                // 读空闲(没有收到客户端消息)
                System.out.println("读空闲,关闭连接: " + ctx.channel());
                ctx.close();
            } else if (state == IdleState.WRITER_IDLE) {
                // 写空闲(可以主动发送心跳包)
                System.out.println("写空闲,发送心跳包");
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
                   .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
      } else {
            super.userEventTriggered(ctx, evt);
      }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
      String message = (String) msg;
      //Netty 不需要单独做心跳机制,它可以通过IdleStateHandler进行检测
      //if ("HEARTBEAT_REQUEST".equals(message)) {
      //    // 响应客户端心跳
      //    ctx.writeAndFlush("HEARTBEAT_RESPONSE");
      //} else {
      //    // 处理业务消息
      //}
    }
}二、客户端实现

1. 基础心跳实现

public class HeartbeatClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
      ChannelPipeline pipeline = ch.pipeline();
      
      pipeline.addLast(new StringDecoder());
      pipeline.addLast(new StringEncoder());
      
      // 客户端设置写空闲检测(定期发送心跳)
      pipeline.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));
      pipeline.addLast(new HeartbeatClientHandler());
    }
}

public class HeartbeatClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.WRITER_IDLE) {
                // 写空闲时发送心跳
                ctx.writeAndFlush("HEARTBEAT");
                System.out.println("客户端发送心跳");
            }
      }
    }
   
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
      if ("HEARTBEAT_RESPONSE".equals(msg)) {
            System.out.println("收到服务端心跳响应");
      }
    }
}2. 完整心跳交互方案

public class AdvancedHeartbeatClientHandler extends ChannelInboundHandlerAdapter {
    private static final ByteBuf HEARTBEAT_SEQUENCE =
      Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT_REQUEST", CharsetUtil.UTF_8));
   
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
      // 连接建立后立即发送一次心跳
      sendHeartbeat(ctx);
      super.channelActive(ctx);
    }
   
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                // 写空闲时发送心跳
                sendHeartbeat(ctx);
            } else if (state == IdleState.READER_IDLE) {
                // 读空闲(未收到服务端响应)
                System.out.println("服务端无响应,关闭连接");
                ctx.close();
            }
      } else {
            super.userEventTriggered(ctx, evt);
      }
    }
   
    private void sendHeartbeat(ChannelHandlerContext ctx) {
      ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
         .addListener(future -> {
               if (!future.isSuccess()) {
                   System.err.println("心跳发送失败: " + future.cause());
               }
         });
    }
   
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
      String message = (String) msg;
      if ("HEARTBEAT".equals(message)) {
            // 响应服务端心跳
            ctx.writeAndFlush("HEARTBEAT_RESPONSE");
      } else if ("HEARTBEAT_RESPONSE".equals(message)) {
            // 收到服务端对客户端心跳的响应
            System.out.println("心跳正常");
      }
    }
}三、WebSocket 心跳实现

对于 WebSocket 连接,心跳机制需要特殊处理:
服务端实现

public class WebSocketHeartbeatServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
      String text = msg.text();
      if ("HEARTBEAT".equals(text)) {
            ctx.writeAndFlush(new TextWebSocketFrame("HEARTBEAT_RESPONSE"));
      } else {
            // 处理其他WebSocket消息
      }
    }
   
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleEvent = (IdleStateEvent) evt;
            if (idleEvent.state() == IdleState.READER_IDLE) {
                ctx.close();
            } else if (idleEvent.state() == IdleState.WRITER_IDLE) {
                ctx.writeAndFlush(new TextWebSocketFrame("HEARTBEAT"));
            }
      }
    }
}客户端实现

public class WebSocketHeartbeatClientHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
      String text = msg.text();
      if ("HEARTBEAT".equals(text)) {
            ctx.writeAndFlush(new TextWebSocketFrame("HEARTBEAT_RESPONSE"));
      }
    }
   
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.WRITER_IDLE) {
                ctx.writeAndFlush(new TextWebSocketFrame("HEARTBEAT"));
            }
      }
    }
}四、最佳实践建议


[*]合理设置超时时间:

[*]生产环境建议读空闲时间设置为60-120秒
[*]写空闲时间设置为30-60秒

[*]心跳协议设计:

[*]使用固定格式的心跳消息(如"HEARTBEAT")
[*]可以考虑携带时间戳或序列号用于调试

[*]重连机制:
// 客户端重连示例
public class ReconnectHandler extends ChannelInboundHandlerAdapter {
    private final Bootstrap bootstrap;
    private int retries = 0;
   
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      if (retries < 3) {
            long delay = 1L << retries;
            retries++;
            ctx.channel().eventLoop().schedule(() -> {
                System.out.println("尝试重连..." + retries);
                bootstrap.connect();
            }, delay, TimeUnit.SECONDS);
      }
      ctx.fireChannelInactive();
    }
}
[*]监控与日志:

[*]记录心跳异常情况
[*]监控连接存活率

[*]性能考虑:

[*]使用共享的ByteBuf作为心跳消息
[*]避免在心跳处理器中执行耗时操作

通过以上实现,可以构建健壮的Netty心跳机制,有效维护长连接的可靠性。
Netty 心跳机制中写空闲检测的考量

在 Netty 心跳机制中,写空闲(WRITER_IDLE)检测和读空闲(READER_IDLE)检测各有不同的应用场景和考量因素。是否需要同时使用两者取决于具体业务需求。
一、写空闲检测的主要考虑场景


[*]客户端主动保活(最常见场景)

[*]当客户端需要维持与服务端的连接时(如移动设备通过NAT网关连接)
[*]防止中间设备(路由器、防火墙等)因长时间无数据流动而断开连接
[*]典型实现:客户端定期发送心跳包

[*]服务端主动检测(特殊场景)

[*]当服务端需要确认客户端是否存活但客户端无法主动发送心跳时
[*]双向心跳检测机制中
[*]需要服务端主动推送数据的场景(如实时监控系统)

[*]对称性心跳设计

[*]在金融、支付等对可靠性要求高的系统中
[*]双方向都保持活跃检测,提高连接可靠性

二、是否只需要读空闲检测?

可以仅使用读空闲检测的场景:


[*]纯服务端检测模式

[*]客户端会定期发送数据(包括业务数据和心跳)
[*]服务端只需要检测是否在指定时间内收到任何数据

[*]客户端可靠主动发送心跳

[*]客户端能保证按时发送心跳包
[*]网络环境稳定(如内网通信)

[*]节省资源考虑

[*]减少不必要的写操作
[*]简化心跳逻辑

需要同时使用写空闲检测的场景:


[*]NAT环境下的长连接
// 典型NAT环境下的客户端配置
pipeline.addLast(new IdleStateHandler(0, 30, 0, TimeUnit.SECONDS)); // 只检测写空闲
[*]需要服务端主动保活的系统
// 服务端需要保持连接活跃
pipeline.addLast(new IdleStateHandler(60, 30, 0, TimeUnit.SECONDS)); // 读写都检测
[*]双向心跳验证
// 高可靠性系统的心跳设计
// 服务端:
pipeline.addLast(new IdleStateHandler(60, 45, 0, TimeUnit.SECONDS));

// 客户端:
pipeline.addLast(new IdleStateHandler(75, 30, 0, TimeUnit.SECONDS));
三、实际应用建议

推荐方案1:客户端单边心跳(最常见)

// 客户端配置
pipeline.addLast(new IdleStateHandler(0, 30, 0, TimeUnit.SECONDS)); // 只检测写空闲
pipeline.addLast(new HeartbeatClientHandler());

// 服务端配置
pipeline.addLast(new IdleStateHandler(90, 0, 0, TimeUnit.SECONDS)); // 只检测读空闲适用场景:大多数移动应用、WebSocket通信等
优点:

[*]客户端主动保活,避免NAT超时
[*]服务端只需检测客户端是否存活
[*]实现简单
推荐方案2:双向心跳检测

// 服务端配置
pipeline.addLast(new IdleStateHandler(60, 45, 0, TimeUnit.SECONDS));

// 客户端配置
pipeline.addLast(new IdleStateHandler(75, 30, 0, TimeUnit.SECONDS));适用场景:

[*]金融支付系统
[*]物联网关键设备通信
[*]对连接可靠性要求极高的场景
优点:

[*]双方向连接状态确认
[*]更高的可靠性
[*]能更快发现单向网络中断情况
推荐方案3:自适应心跳

// 可根据网络条件动态调整
public class AdaptiveIdleStateHandler extends IdleStateHandler {
    private boolean isMobileNetwork;
   
    public AdaptiveIdleStateHandler() {
      super(60, 30, 0, TimeUnit.SECONDS);
    }
   
    @Override
    protected long nextDelay(IdleState state) {
      if (isMobileNetwork && state == IdleState.WRITER_IDLE) {
            return 25; // 移动网络下更频繁发送
      }
      return super.nextDelay(state);
    }
}四、关键决策因素


[*]网络环境:

[*]公网/NAT环境:需要写空闲检测
[*]内网环境:可能只需读空闲检测

[*]客户端类型:

[*]移动设备:需要主动保活(写空闲)
[*]服务端:通常只需检测客户端是否存活(读空闲)

[*]业务需求:

[*]普通消息推送:单边检测足够
[*]金融交易:建议双向检测

[*]资源消耗:

[*]写空闲检测会增加少量网络流量
[*]读空闲检测不会产生额外流量

五、典型案例

案例1:IM即时通讯系统

// 客户端(移动设备)
pipeline.addLast(new IdleStateHandler(0, 25, 0, TimeUnit.SECONDS)); // 只写空闲

// 服务端
pipeline.addLast(new IdleStateHandler(120, 0, 0, TimeUnit.SECONDS)); // 只读空闲理由:移动设备需要保持NAT映射,服务端只需确认客户端是否在线
案例2:物联网数据采集

// 设备端(客户端)
pipeline.addLast(new IdleStateHandler(0, 60, 0, TimeUnit.SECONDS));

// 服务端
pipeline.addLast(new IdleStateHandler(180, 120, 0, TimeUnit.SECONDS));理由:设备可能处于不稳定网络环境,需要双方向检测
总结

是否需要写空闲检测取决于具体场景:

[*]大多数情况下:客户端需要写空闲检测(主动保活),服务端只需读空闲检测
[*]高可靠性系统:建议使用双向检测
[*]内网稳定环境:可能只需读空闲检测
最佳实践是根据实际网络条件和业务需求,选择适当的组合方式。对于公网应用,特别是移动端,写空闲检测通常是必要的。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: Netty 心跳机制实现(客户端与服务端)