现在手机App消息推送一般用极光之类的第三方服务来实现。但有一些消息,前端没有展示需求,一条数据的长度有限,但数量很大。比如硬件设备产生的消息,需要推送到前端,这时使用第三方服务如果免费的容易被限流,要不就得使用付费服务。
这里用在服务端与App端建立WebSocket连接的方式实现了简单的消息推送,这里服务端用的是Java,App端用的是Flutter。
- 添加依赖
- <dependency>
- <groupId>io.netty</groupId>
- netty-all</artifactId>
- </dependency>
复制代码 用Netty来管理WebSocket连接
- 实现ChannelHandler继承SimpleChannelInboundHandler来处理WebSocket消息
- public class PushWebsocketHandler extends SimpleChannelInboundHandler<WebSocketFrame>
复制代码 在这个类里面重载二个方法- @Override
- protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
- if (frame instanceof PingWebSocketFrame){
- ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
- } else if (frame instanceof TextWebSocketFrame) {
- // resolveTextFrame(ctx, (TextWebSocketFrame) frame);
- } else if (frame instanceof CloseWebSocketFrame) {
- var user = pool.removeChannel(ctx.channel());
- if (user != null){
- log.info("用户{}({})下线", user.getUserId(),user.getToken());
- }
- ctx.close();
- }
- }
复制代码 上面channelRead0方法中处理消息,PingWebSocketFrame处理ping消息,CloseWebSocketFrame处理关闭消息,因为当前设计中消息是单向,所以代码中TextWebSocketFrame的部分注释掉了。- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (msg instanceof FullHttpRequest){
- var request = (FullHttpRequest) msg;
- String uri = request.uri();
- if (uri.contains(websocketPath) && "websocket".equals(request.headers().get("Upgrade"))){
- int index = uri.lastIndexOf("/");
- if (index >= uri.length() - 1) {
- closeClient(ctx, "无效的连接");
- } else {
- String token = uri.substring(index + 1);
- if (tokenUtil.hasAccount(token)) {
- var account = tokenUtil.getAccount(token);
- log.info("用户{}({})上线",account.getUserId(), token);
- pool.addChannel(account, ctx.channel());
- request.setUri(websocketPath);
- } else {
- closeClient(ctx, "token无效:" + token);
- }
- }
- }else{
- closeClient(ctx,"无效的连接");
- }
- }
- super.channelRead(ctx, msg);
- }<br>
复制代码 上面的channelRead方法处理连接请求,如果是个WebSocket请求,则从url中取得token,token验证通过则把当前连接加入池中。closeClient关闭当前连接- private void closeClient(ChannelHandlerContext ctx,String reason){
- log.info(reason);
- ctx.writeAndFlush(new CloseWebSocketFrame(400,reason)).addListener(ChannelFutureListener.CLOSE);
- }
复制代码 下面则是对于一些异常情况的处理,连接断开、连接空闲、连接发生异常等
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- var user = pool.removeChannel(ctx.channel());
- if (user != null){
- log.info("用户{}({})连接断了", user.getUserId(),user.getToken());
- }
- super.channelInactive(ctx);
- }
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
- IdleStateEvent event = (IdleStateEvent) evt;
- if (event.state() == IdleState.READER_IDLE) {
- var user = pool.removeChannel(ctx.channel());
- if (user != null){
- log.info("用户{}({})长时间未响应,断开连接", user.getUserId(),user.getToken());
- }
- ctx.close();
- }
- }
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- if (ctx.channel().isActive()) {
- var userId = pool.removeChannel(ctx.channel());
- log.info("发生异常:{},用户{}连接释放",cause.getLocalizedMessage(), userId);
- ctx.close();
- }
- }
复制代码 View Code
- 启动Netty服务启动的主体代码
- bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel channel) {
- channel.pipeline()
- .addLast(new IdleStateHandler(605,
- 0, 0, TimeUnit.SECONDS))
- .addLast(new HttpServerCodec())
- .addLast(new ChunkedWriteHandler())
- .addLast(new HttpObjectAggregator(1024*64))
- .addLast(handler)
- .addLast(new WebSocketServerProtocolHandler(websocketPath))
- ;
- }
- }).option(ChannelOption.SO_BACKLOG, 1024)
- .option(ChannelOption.SO_REUSEADDR, true)
- .childOption(ChannelOption.TCP_NODELAY, true)
- .childOption(ChannelOption.SO_KEEPALIVE, true);
- ChannelFuture future = bootstrap.bind(port).sync();
- if (future.isSuccess()) {
- log.info("服务器端口【{}】bind成功", port);
- } else {
- log.error("服务器端口【{}】bind失败", port);
- }
- future.channel().closeFuture().sync();
复制代码 倒数第2个addLast里的handler就是我们上一步定义的PushWebsocketHandler
- 推送消息下面是给应用层调用的推送方法
- public void pushMessage(PushBody body) throws JsonProcessingException {
- if (body.getUsers().size() == 0) return;
- for(var user : body.getUsers()){
- var channel = pool.getChannel(user);
- if (channel.size() > 0){
- for(var item : channel){
- var id = UUID.randomUUID().toString();
- var key = item.getChannelId() + "-" + id;
- var msgPackage = new MessagePackage()
- .setMessageId(key)
- .setContent(body.getMessage());
- ObjectMapper objectMapper = new ObjectMapper();
- String text = objectMapper.writeValueAsString(msgPackage);
- item.getChannel().writeAndFlush(new TextWebSocketFrame(text));
- log.info("给用户【{}】推送了{}消息",user,body.getTitle());
- }
- }
- }
- }
复制代码 同一消息可以推送给多个用户,一个用户可以有多个活跃连接
- nginx配置要让服务器正常地处理WebSocket请求,nginx必须作一下配置
- location /wss {
- proxy_pass http://127.0.0.1:7001/;
- proxy_http_version 1.1;
- proxy_set_header Upgrade $http_upgrade;
- proxy_set_header Connection "Upgrade";
- proxy_read_timeout 900;
- proxy_send_timeout 900;
- }
复制代码
App端处理WebSocket需引入一个web_socket_client- dependencies:
- web_socket_client: ^0.1.1
复制代码 在App启动,用户登录之后,把消息推送的地址拼接出来
static WebSocket? socket;- static Future<void> refreshToken(String token, {reconnect = false}) async {
- final url = AppConfig.serverAddress.replaceAll('https', 'wss');
- wsUrl = Uri.parse('$url/wss/push/$token');
- if (reconnect) {
- //关掉原来的,重新连接
- health?.cancel();
- pushListener?.cancel();
- socket?.close(1001, 'TOKEN_REFRESH');
- socketConnect();
- }
- }
复制代码 url里面/wss对应的是nginx配置里的 location, /push对应的是channelRead方法里的websocketPath,跟服务端保持一致即可
reconnect参数,用于重新刷新token后调用时用,首次启动时传false即可。
下面就是建立连接的方法- static void socketConnect() {
- final backoff = LinearBackoff(
- initial: Duration.zero,
- increment: const Duration(seconds: 3),
- maximum: const Duration(seconds: 3),
- );
- socket = WebSocket(wsUrl!, backoff: backoff);
- pushListener = socket?.messages.listen((message) {
- final json = convert.jsonDecode(message);
- resolveMessage(json["content"]);
- });
- // connectListener = socket?.connection.listen((state)async {
- // if (state is Disconnected){ //断了不用自己重连
- // if (kDebugMode) {
- // print('跟推送服务连接断了..................');
- // }
- // }
- // });
- socket?.connection.firstWhere((state) => state is Connected).then((value) {
- if (kDebugMode) {
- print('跟推送服务建立了连接..................');
- }
- health = TimerUtil()
- ..setInterval(30 * 1000) //30秒保活
- ..setOnTimerTickCallback((millisUntilFinished) {
- socket?.send('--');
- })
- ..startTimer();
- });
- }
复制代码- resolveMessage是解析推送消息的方法,这里就不引用了。<br><br>这个方案不管客户端有没有收到,几分钟或几秒后就推送新的数据过去了,数据丢就丢了,也没大不了的。这种简单实现刚好能满足我们的实际需要。
复制代码 来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |