诘琅 发表于 2025-9-13 11:02:28

App消息推送的简单实现

现在手机App消息推送一般用极光之类的第三方服务来实现。但有一些消息,前端没有展示需求,一条数据的长度有限,但数量很大。比如硬件设备产生的消息,需要推送到前端,这时使用第三方服务如果免费的容易被限流,要不就得使用付费服务。
这里用在服务端与App端建立WebSocket连接的方式实现了简单的消息推送,这里服务端用的是Java,App端用的是Flutter。

[*]服务端实现

[*]添加依赖<dependency>
            <groupId>io.netty</groupId>
            netty-all</artifactId>
</dependency>用Netty来管理WebSocket连接
[*]实现ChannelHandler继承SimpleChannelInboundHandler来处理WebSocket消息
public class PushWebsocketHandler extends SimpleChannelInboundHandler<WebSocketFrame>在这个类里面重载二个方法
  @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
      if (frame instanceof PingWebSocketFrame){
            ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
      } else if (frame instanceof TextWebSocketFrame) {
//            resolveTextFrame(ctx, (TextWebSocketFrame) frame);
      } else if (frame instanceof CloseWebSocketFrame) {
            var user = pool.removeChannel(ctx.channel());
            if (user != null){
                log.info("用户{}({})下线", user.getUserId(),user.getToken());
            }
            ctx.close();
      }
    }上面channelRead0方法中处理消息,PingWebSocketFrame处理ping消息,CloseWebSocketFrame处理关闭消息,因为当前设计中消息是单向,所以代码中TextWebSocketFrame的部分注释掉了。
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      if (msg instanceof FullHttpRequest){
            var request = (FullHttpRequest) msg;
            String uri = request.uri();
            if (uri.contains(websocketPath) && "websocket".equals(request.headers().get("Upgrade"))){
                int index = uri.lastIndexOf("/");
                if (index >= uri.length() - 1) {
                  closeClient(ctx, "无效的连接");
                } else {
                  String token = uri.substring(index + 1);
                  if (tokenUtil.hasAccount(token)) {
                        var account = tokenUtil.getAccount(token);
                        log.info("用户{}({})上线",account.getUserId(), token);
                        pool.addChannel(account, ctx.channel());

                        request.setUri(websocketPath);
                  } else {
                        closeClient(ctx, "token无效:" + token);
                  }
                }
            }else{
                closeClient(ctx,"无效的连接");
            }

      }
      super.channelRead(ctx, msg);
    }<br> 上面的channelRead方法处理连接请求,如果是个WebSocket请求,则从url中取得token,token验证通过则把当前连接加入池中。closeClient关闭当前连接
    private void closeClient(ChannelHandlerContext ctx,String reason){
      log.info(reason);
      ctx.writeAndFlush(new CloseWebSocketFrame(400,reason)).addListener(ChannelFutureListener.CLOSE);
    }下面则是对于一些异常情况的处理,连接断开、连接空闲、连接发生异常等
@Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      var user = pool.removeChannel(ctx.channel());
      if (user != null){
            log.info("用户{}({})连接断了", user.getUserId(),user.getToken());
      }
      super.channelInactive(ctx);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                var user = pool.removeChannel(ctx.channel());
                if (user != null){
                  log.info("用户{}({})长时间未响应,断开连接", user.getUserId(),user.getToken());
                }
                ctx.close();
            }
      }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
      if (ctx.channel().isActive()) {
            var userId = pool.removeChannel(ctx.channel());
            log.info("发生异常:{},用户{}连接释放",cause.getLocalizedMessage(), userId);
            ctx.close();
      }
    }View Code
[*]启动Netty服务启动的主体代码
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                  .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) {
                            channel.pipeline()
                                    .addLast(new IdleStateHandler(605,
                                          0, 0, TimeUnit.SECONDS))
                                    .addLast(new HttpServerCodec())
                                    .addLast(new ChunkedWriteHandler())
                                    .addLast(new HttpObjectAggregator(1024*64))
                                    .addLast(handler)
                                    .addLast(new WebSocketServerProtocolHandler(websocketPath))
                                    ;
                        }
                  }).option(ChannelOption.SO_BACKLOG, 1024)
                  .option(ChannelOption.SO_REUSEADDR, true)
                  .childOption(ChannelOption.TCP_NODELAY, true)
                  .childOption(ChannelOption.SO_KEEPALIVE, true);


            ChannelFuture future = bootstrap.bind(port).sync();
            if (future.isSuccess()) {
                log.info("服务器端口【{}】bind成功", port);
            } else {
                log.error("服务器端口【{}】bind失败", port);
            }
            future.channel().closeFuture().sync();倒数第2个addLast里的handler就是我们上一步定义的PushWebsocketHandler
[*]推送消息下面是给应用层调用的推送方法
    public void pushMessage(PushBody body) throws JsonProcessingException {
      if (body.getUsers().size() == 0) return;
      for(var user : body.getUsers()){
            var channel = pool.getChannel(user);
            if (channel.size() > 0){
                for(var item : channel){
                  var id = UUID.randomUUID().toString();
                  var key = item.getChannelId() + "-" + id;
                  var msgPackage = new MessagePackage()
                            .setMessageId(key)
                            .setContent(body.getMessage());

                  ObjectMapper objectMapper = new ObjectMapper();
                  String text = objectMapper.writeValueAsString(msgPackage);
                  item.getChannel().writeAndFlush(new TextWebSocketFrame(text));
                  log.info("给用户【{}】推送了{}消息",user,body.getTitle());
                }
            }
      }
    }同一消息可以推送给多个用户,一个用户可以有多个活跃连接
[*]nginx配置要让服务器正常地处理WebSocket请求,nginx必须作一下配置
location /wss {
      proxy_pass http://127.0.0.1:7001/;
      proxy_http_version 1.1;
      proxy_set_header Upgrade $http_upgrade;
      proxy_set_header Connection "Upgrade";
      proxy_read_timeout    900;
      proxy_send_timeout    900;



[*]App端实现
   App端处理WebSocket需引入一个web_socket_client
dependencies:
web_socket_client: ^0.1.1    在App启动,用户登录之后,把消息推送的地址拼接出来
 static WebSocket? socket;static Future<void> refreshToken(String token, {reconnect = false}) async {
    final url = AppConfig.serverAddress.replaceAll('https', 'wss');
    wsUrl = Uri.parse('$url/wss/push/$token');

    if (reconnect) {
      //关掉原来的,重新连接
      health?.cancel();
      pushListener?.cancel();
      socket?.close(1001, 'TOKEN_REFRESH');

      socketConnect();
    }
}url里面/wss对应的是nginx配置里的 location, /push对应的是channelRead方法里的websocketPath,跟服务端保持一致即可
reconnect参数,用于重新刷新token后调用时用,首次启动时传false即可。
下面就是建立连接的方法
static void socketConnect() {
    final backoff = LinearBackoff(
      initial: Duration.zero,
      increment: const Duration(seconds: 3),
      maximum: const Duration(seconds: 3),
    );
    socket = WebSocket(wsUrl!, backoff: backoff);

    pushListener = socket?.messages.listen((message) {
      final json = convert.jsonDecode(message);
      resolveMessage(json["content"]);
    });

    // connectListener = socket?.connection.listen((state)async {
    //   if (state is Disconnected){ //断了不用自己重连
    //   if (kDebugMode) {
    //       print('跟推送服务连接断了..................');
    //   }
    //   }
    // });

    socket?.connection.firstWhere((state) => state is Connected).then((value) {
      if (kDebugMode) {
      print('跟推送服务建立了连接..................');
      }
      health = TimerUtil()
      ..setInterval(30 * 1000) //30秒保活
      ..setOnTimerTickCallback((millisUntilFinished) {
          socket?.send('--');
      })
      ..startTimer();
    });
}resolveMessage是解析推送消息的方法,这里就不引用了。<br><br>这个方案不管客户端有没有收到,几分钟或几秒后就推送新的数据过去了,数据丢就丢了,也没大不了的。这种简单实现刚好能满足我们的实际需要。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: App消息推送的简单实现