找回密码
 立即注册
首页 业界区 安全 App消息推送的简单实现

App消息推送的简单实现

杭环 6 小时前
现在手机App消息推送一般用极光之类的第三方服务来实现。但有一些消息,前端没有展示需求,一条数据的长度有限,但数量很大。比如硬件设备产生的消息,需要推送到前端,这时使用第三方服务如果免费的容易被限流,要不就得使用付费服务。
这里用在服务端与App端建立WebSocket连接的方式实现了简单的消息推送,这里服务端用的是Java,App端用的是Flutter。

  • 服务端实现

  • 添加依赖
    1. <dependency>
    2.             <groupId>io.netty</groupId>
    3.             netty-all</artifactId>
    4. </dependency>
    复制代码
    用Netty来管理WebSocket连接
  • 实现ChannelHandler继承SimpleChannelInboundHandler来处理WebSocket消息
    1. public class PushWebsocketHandler extends SimpleChannelInboundHandler<WebSocketFrame>
    复制代码
    在这个类里面重载二个方法
    1.   @Override
    2.     protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
    3.         if (frame instanceof PingWebSocketFrame){
    4.             ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
    5.         } else if (frame instanceof TextWebSocketFrame) {
    6. //            resolveTextFrame(ctx, (TextWebSocketFrame) frame);
    7.         } else if (frame instanceof CloseWebSocketFrame) {
    8.             var user = pool.removeChannel(ctx.channel());
    9.             if (user != null){
    10.                 log.info("用户{}({})下线", user.getUserId(),user.getToken());
    11.             }
    12.             ctx.close();
    13.         }
    14.     }
    复制代码
    上面channelRead0方法中处理消息,PingWebSocketFrame处理ping消息,CloseWebSocketFrame处理关闭消息,因为当前设计中消息是单向,所以代码中TextWebSocketFrame的部分注释掉了。
    1.     @Override
    2.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    3.         if (msg instanceof FullHttpRequest){
    4.             var request = (FullHttpRequest) msg;
    5.             String uri = request.uri();
    6.             if (uri.contains(websocketPath) && "websocket".equals(request.headers().get("Upgrade"))){
    7.                 int index = uri.lastIndexOf("/");
    8.                 if (index >= uri.length() - 1) {
    9.                     closeClient(ctx, "无效的连接");
    10.                 } else {
    11.                     String token = uri.substring(index + 1);
    12.                     if (tokenUtil.hasAccount(token)) {
    13.                         var account = tokenUtil.getAccount(token);
    14.                         log.info("用户{}({})上线",account.getUserId(), token);
    15.                         pool.addChannel(account, ctx.channel());
    16.                         request.setUri(websocketPath);
    17.                     } else {
    18.                         closeClient(ctx, "token无效:" + token);
    19.                     }
    20.                 }
    21.             }else{
    22.                 closeClient(ctx,"无效的连接");
    23.             }
    24.         }
    25.         super.channelRead(ctx, msg);
    26.     }<br>
    复制代码
     上面的channelRead方法处理连接请求,如果是个WebSocket请求,则从url中取得token,token验证通过则把当前连接加入池中。closeClient关闭当前连接
    1.     private void closeClient(ChannelHandlerContext ctx,String reason){
    2.         log.info(reason);
    3.         ctx.writeAndFlush(new CloseWebSocketFrame(400,reason)).addListener(ChannelFutureListener.CLOSE);
    4.     }
    复制代码
    下面则是对于一些异常情况的处理,连接断开、连接空闲、连接发生异常等
    1.gif
    2.gif
    1. @Override
    2.     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    3.         var user = pool.removeChannel(ctx.channel());
    4.         if (user != null){
    5.             log.info("用户{}({})连接断了", user.getUserId(),user.getToken());
    6.         }
    7.         super.channelInactive(ctx);
    8.     }
    9.     @Override
    10.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    11.         if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
    12.             IdleStateEvent event = (IdleStateEvent) evt;
    13.             if (event.state() == IdleState.READER_IDLE) {
    14.                 var user = pool.removeChannel(ctx.channel());
    15.                 if (user != null){
    16.                     log.info("用户{}({})长时间未响应,断开连接", user.getUserId(),user.getToken());
    17.                 }
    18.                 ctx.close();
    19.             }
    20.         }
    21.     }
    22.     @Override
    23.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    24.         if (ctx.channel().isActive()) {
    25.             var userId = pool.removeChannel(ctx.channel());
    26.             log.info("发生异常:{},用户{}连接释放",cause.getLocalizedMessage(), userId);
    27.             ctx.close();
    28.         }
    29.     }
    复制代码
    View Code
  • 启动Netty服务启动的主体代码
    1. bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
    2.                     .childHandler(new ChannelInitializer<SocketChannel>() {
    3.                         @Override
    4.                         public void initChannel(SocketChannel channel) {
    5.                             channel.pipeline()
    6.                                     .addLast(new IdleStateHandler(605,
    7.                                             0, 0, TimeUnit.SECONDS))
    8.                                     .addLast(new HttpServerCodec())
    9.                                     .addLast(new ChunkedWriteHandler())
    10.                                     .addLast(new HttpObjectAggregator(1024*64))
    11.                                     .addLast(handler)
    12.                                     .addLast(new WebSocketServerProtocolHandler(websocketPath))
    13.                                     ;
    14.                         }
    15.                     }).option(ChannelOption.SO_BACKLOG, 1024)
    16.                     .option(ChannelOption.SO_REUSEADDR, true)
    17.                     .childOption(ChannelOption.TCP_NODELAY, true)
    18.                     .childOption(ChannelOption.SO_KEEPALIVE, true);
    19.             ChannelFuture future = bootstrap.bind(port).sync();
    20.             if (future.isSuccess()) {
    21.                 log.info("服务器端口【{}】bind成功", port);
    22.             } else {
    23.                 log.error("服务器端口【{}】bind失败", port);
    24.             }
    25.             future.channel().closeFuture().sync();
    复制代码
    倒数第2个addLast里的handler就是我们上一步定义的PushWebsocketHandler
  • 推送消息下面是给应用层调用的推送方法
    1.     public void pushMessage(PushBody body) throws JsonProcessingException {
    2.         if (body.getUsers().size() == 0) return;
    3.         for(var user : body.getUsers()){
    4.             var channel = pool.getChannel(user);
    5.             if (channel.size() > 0){
    6.                 for(var item : channel){
    7.                     var id = UUID.randomUUID().toString();
    8.                     var key = item.getChannelId() + "-" + id;
    9.                     var msgPackage = new MessagePackage()
    10.                             .setMessageId(key)
    11.                             .setContent(body.getMessage());
    12.                     ObjectMapper objectMapper = new ObjectMapper();
    13.                     String text = objectMapper.writeValueAsString(msgPackage);
    14.                     item.getChannel().writeAndFlush(new TextWebSocketFrame(text));
    15.                     log.info("给用户【{}】推送了{}消息",user,body.getTitle());
    16.                 }
    17.             }
    18.         }
    19.     }
    复制代码
    同一消息可以推送给多个用户,一个用户可以有多个活跃连接
  • nginx配置要让服务器正常地处理WebSocket请求,nginx必须作一下配置
    1. location /wss {
    2.       proxy_pass http://127.0.0.1:7001/;
    3.       proxy_http_version 1.1;
    4.       proxy_set_header Upgrade $http_upgrade;
    5.       proxy_set_header Connection "Upgrade";
    6.       proxy_read_timeout    900;
    7.       proxy_send_timeout    900;
    8. }
    复制代码
     


  • App端实现
   App端处理WebSocket需引入一个web_socket_client
  1. dependencies:
  2.   web_socket_client: ^0.1.1
复制代码
    在App启动,用户登录之后,把消息推送的地址拼接出来
 static WebSocket? socket;
  1. static Future<void> refreshToken(String token, {reconnect = false}) async {
  2.     final url = AppConfig.serverAddress.replaceAll('https', 'wss');
  3.     wsUrl = Uri.parse('$url/wss/push/$token');
  4.     if (reconnect) {
  5.       //关掉原来的,重新连接
  6.       health?.cancel();
  7.       pushListener?.cancel();
  8.       socket?.close(1001, 'TOKEN_REFRESH');
  9.       socketConnect();
  10.     }
  11.   }
复制代码
url里面/wss对应的是nginx配置里的 location, /push对应的是channelRead方法里的websocketPath,跟服务端保持一致即可
reconnect参数,用于重新刷新token后调用时用,首次启动时传false即可。
下面就是建立连接的方法
  1.   static void socketConnect() {
  2.     final backoff = LinearBackoff(
  3.       initial: Duration.zero,
  4.       increment: const Duration(seconds: 3),
  5.       maximum: const Duration(seconds: 3),
  6.     );
  7.     socket = WebSocket(wsUrl!, backoff: backoff);
  8.     pushListener = socket?.messages.listen((message) {
  9.       final json = convert.jsonDecode(message);
  10.       resolveMessage(json["content"]);
  11.     });
  12.     // connectListener = socket?.connection.listen((state)async {
  13.     //   if (state is Disconnected){ //断了不用自己重连
  14.     //     if (kDebugMode) {
  15.     //       print('跟推送服务连接断了..................');
  16.     //     }
  17.     //   }
  18.     // });
  19.     socket?.connection.firstWhere((state) => state is Connected).then((value) {
  20.       if (kDebugMode) {
  21.         print('跟推送服务建立了连接..................');
  22.       }
  23.       health = TimerUtil()
  24.         ..setInterval(30 * 1000) //30秒保活
  25.         ..setOnTimerTickCallback((millisUntilFinished) {
  26.           socket?.send('--');
  27.         })
  28.         ..startTimer();
  29.     });
  30.   }
复制代码
  1. resolveMessage是解析推送消息的方法,这里就不引用了。<br><br>这个方案不管客户端有没有收到,几分钟或几秒后就推送新的数据过去了,数据丢就丢了,也没大不了的。这种简单实现刚好能满足我们的实际需要。
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册