App消息推送的简单实现
现在手机App消息推送一般用极光之类的第三方服务来实现。但有一些消息,前端没有展示需求,一条数据的长度有限,但数量很大。比如硬件设备产生的消息,需要推送到前端,这时使用第三方服务如果免费的容易被限流,要不就得使用付费服务。这里用在服务端与App端建立WebSocket连接的方式实现了简单的消息推送,这里服务端用的是Java,App端用的是Flutter。
[*]服务端实现
[*]添加依赖<dependency>
<groupId>io.netty</groupId>
netty-all</artifactId>
</dependency>用Netty来管理WebSocket连接
[*]实现ChannelHandler继承SimpleChannelInboundHandler来处理WebSocket消息
public class PushWebsocketHandler extends SimpleChannelInboundHandler<WebSocketFrame>在这个类里面重载二个方法
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof PingWebSocketFrame){
ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
} else if (frame instanceof TextWebSocketFrame) {
// resolveTextFrame(ctx, (TextWebSocketFrame) frame);
} else if (frame instanceof CloseWebSocketFrame) {
var user = pool.removeChannel(ctx.channel());
if (user != null){
log.info("用户{}({})下线", user.getUserId(),user.getToken());
}
ctx.close();
}
}上面channelRead0方法中处理消息,PingWebSocketFrame处理ping消息,CloseWebSocketFrame处理关闭消息,因为当前设计中消息是单向,所以代码中TextWebSocketFrame的部分注释掉了。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest){
var request = (FullHttpRequest) msg;
String uri = request.uri();
if (uri.contains(websocketPath) && "websocket".equals(request.headers().get("Upgrade"))){
int index = uri.lastIndexOf("/");
if (index >= uri.length() - 1) {
closeClient(ctx, "无效的连接");
} else {
String token = uri.substring(index + 1);
if (tokenUtil.hasAccount(token)) {
var account = tokenUtil.getAccount(token);
log.info("用户{}({})上线",account.getUserId(), token);
pool.addChannel(account, ctx.channel());
request.setUri(websocketPath);
} else {
closeClient(ctx, "token无效:" + token);
}
}
}else{
closeClient(ctx,"无效的连接");
}
}
super.channelRead(ctx, msg);
}<br> 上面的channelRead方法处理连接请求,如果是个WebSocket请求,则从url中取得token,token验证通过则把当前连接加入池中。closeClient关闭当前连接
private void closeClient(ChannelHandlerContext ctx,String reason){
log.info(reason);
ctx.writeAndFlush(new CloseWebSocketFrame(400,reason)).addListener(ChannelFutureListener.CLOSE);
}下面则是对于一些异常情况的处理,连接断开、连接空闲、连接发生异常等
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
var user = pool.removeChannel(ctx.channel());
if (user != null){
log.info("用户{}({})连接断了", user.getUserId(),user.getToken());
}
super.channelInactive(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
var user = pool.removeChannel(ctx.channel());
if (user != null){
log.info("用户{}({})长时间未响应,断开连接", user.getUserId(),user.getToken());
}
ctx.close();
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (ctx.channel().isActive()) {
var userId = pool.removeChannel(ctx.channel());
log.info("发生异常:{},用户{}连接释放",cause.getLocalizedMessage(), userId);
ctx.close();
}
}View Code
[*]启动Netty服务启动的主体代码
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new IdleStateHandler(605,
0, 0, TimeUnit.SECONDS))
.addLast(new HttpServerCodec())
.addLast(new ChunkedWriteHandler())
.addLast(new HttpObjectAggregator(1024*64))
.addLast(handler)
.addLast(new WebSocketServerProtocolHandler(websocketPath))
;
}
}).option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(port).sync();
if (future.isSuccess()) {
log.info("服务器端口【{}】bind成功", port);
} else {
log.error("服务器端口【{}】bind失败", port);
}
future.channel().closeFuture().sync();倒数第2个addLast里的handler就是我们上一步定义的PushWebsocketHandler
[*]推送消息下面是给应用层调用的推送方法
public void pushMessage(PushBody body) throws JsonProcessingException {
if (body.getUsers().size() == 0) return;
for(var user : body.getUsers()){
var channel = pool.getChannel(user);
if (channel.size() > 0){
for(var item : channel){
var id = UUID.randomUUID().toString();
var key = item.getChannelId() + "-" + id;
var msgPackage = new MessagePackage()
.setMessageId(key)
.setContent(body.getMessage());
ObjectMapper objectMapper = new ObjectMapper();
String text = objectMapper.writeValueAsString(msgPackage);
item.getChannel().writeAndFlush(new TextWebSocketFrame(text));
log.info("给用户【{}】推送了{}消息",user,body.getTitle());
}
}
}
}同一消息可以推送给多个用户,一个用户可以有多个活跃连接
[*]nginx配置要让服务器正常地处理WebSocket请求,nginx必须作一下配置
location /wss {
proxy_pass http://127.0.0.1:7001/;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_read_timeout 900;
proxy_send_timeout 900;
}
[*]App端实现
App端处理WebSocket需引入一个web_socket_client
dependencies:
web_socket_client: ^0.1.1 在App启动,用户登录之后,把消息推送的地址拼接出来
static WebSocket? socket;static Future<void> refreshToken(String token, {reconnect = false}) async {
final url = AppConfig.serverAddress.replaceAll('https', 'wss');
wsUrl = Uri.parse('$url/wss/push/$token');
if (reconnect) {
//关掉原来的,重新连接
health?.cancel();
pushListener?.cancel();
socket?.close(1001, 'TOKEN_REFRESH');
socketConnect();
}
}url里面/wss对应的是nginx配置里的 location, /push对应的是channelRead方法里的websocketPath,跟服务端保持一致即可
reconnect参数,用于重新刷新token后调用时用,首次启动时传false即可。
下面就是建立连接的方法
static void socketConnect() {
final backoff = LinearBackoff(
initial: Duration.zero,
increment: const Duration(seconds: 3),
maximum: const Duration(seconds: 3),
);
socket = WebSocket(wsUrl!, backoff: backoff);
pushListener = socket?.messages.listen((message) {
final json = convert.jsonDecode(message);
resolveMessage(json["content"]);
});
// connectListener = socket?.connection.listen((state)async {
// if (state is Disconnected){ //断了不用自己重连
// if (kDebugMode) {
// print('跟推送服务连接断了..................');
// }
// }
// });
socket?.connection.firstWhere((state) => state is Connected).then((value) {
if (kDebugMode) {
print('跟推送服务建立了连接..................');
}
health = TimerUtil()
..setInterval(30 * 1000) //30秒保活
..setOnTimerTickCallback((millisUntilFinished) {
socket?.send('--');
})
..startTimer();
});
}resolveMessage是解析推送消息的方法,这里就不引用了。<br><br>这个方案不管客户端有没有收到,几分钟或几秒后就推送新的数据过去了,数据丢就丢了,也没大不了的。这种简单实现刚好能满足我们的实际需要。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页:
[1]