找回密码
 立即注册
首页 业界区 业界 Django实时通信实战:WebSocket与ASGI全解析(下) ...

Django实时通信实战:WebSocket与ASGI全解析(下)

裴竹悦 2025-9-26 11:40:45
一、实战:构建实时聊天室

环境准备

下面将使用 Django Channels 构建一个多用户实时聊天室。Django Channels的介绍、安装与配置,参考上篇。
实现 WebSocket 消费者

创建mysite\myapp_infra\websocket\consumers.py文件,这是处理 WebSocket 连接的核心。主要实现了如下方法:

  • connect:处理新连接,验证用户token,将用户通道写入缓存,并加入默认组
  • disconnect:处理连接断开,删除用户的缓存通道记录,将用户从所在的房间组中移除
  • receive:处理接收到的消息。首先进行心跳检测(ping/pong),然后验证用户身份,解析两层 JSON 结构的消息内容。根据消息类型 demo-message-send 处理文本消息:若指定接收用户则单发,否则群发广播。
  • broadcast_message:处理群发消息,从事件中提取payload数据并发送给所有连接的客户端
  • single_message:处理单发消息,从事件中提取payload数据并发送给指定的客户端
  1. """WebScoket 消费者"""
  2. import json
  3. import logging
  4. from django.core.cache import cache
  5. from django.conf import settings
  6. from channels.generic.websocket import AsyncWebsocketConsumer
  7. from rest_framework_simplejwt.tokens import RefreshToken
  8. from rest_framework_simplejwt.exceptions import TokenError
  9. logger = logging.getLogger(__name__)
  10. class InfraConsumer(AsyncWebsocketConsumer):
  11.     """基础设施-WebSocket 异步消费者"""
  12.     async def connect(self):
  13.         """当客户端发起 WebSocket 连接时调用"""
  14.         query_params = self.scope["query_string"].decode()
  15.         # 从查询参数获取token
  16.         token_param = [
  17.             param.split("=")
  18.             for param in query_params.split("&")
  19.             if param.startswith("token=")
  20.         ]
  21.         if not token_param or len(token_param[0]) != 2:
  22.             logger.error("缺少token参数")
  23.             await self.close(code=4001)  # 自定义错误码
  24.             return
  25.         token = token_param[0][1]
  26.         try:
  27.             # 验证Refresh Token有效性
  28.             refresh = RefreshToken(token)
  29.             user_id = refresh["user_id"]
  30.             self.scope["user"] = {"id": user_id, "token_type": "refresh"}
  31.             # 登录成功后,将用户通道写入缓存
  32.             cache.set(f"user_{user_id}_channel", self.channel_name)
  33.             # 加入默认组
  34.             self.room_group_name = settings.DEFAULT_GROUP_NAME
  35.             await self.channel_layer.group_add(self.room_group_name, self.channel_name)
  36.             # 接受客户端的WebSocket连接请求
  37.             await self.accept()
  38.         except TokenError as e:
  39.             logger.error("无效token")
  40.             await self.close(code=4003)
  41.         except Exception as e:
  42.             logger.error(str(e))
  43.             await self.close(code=4000)
  44.     async def disconnect(self, close_code):
  45.         """当客户端断开 WebSocket 连接时调用"""
  46.         # 获取当前用户
  47.         user = self.scope.get("user")
  48.         if user:
  49.             user_id = user.get("id")
  50.             # 移除用户通道
  51.             cache.delete(f"user_{user_id}_channel")
  52.             # 将用户从组中移除
  53.             room_group_name = getattr(self, "room_group_name", None)
  54.             if room_group_name:
  55.                 await self.channel_layer.group_discard(
  56.                     room_group_name, self.channel_name
  57.                 )
  58.     async def receive(self, text_data=None, bytes_data=None):
  59.         """当接收到客户端发送的消息时调用"""
  60.         # 心跳检测
  61.         if text_data == "ping":
  62.             await self.send(text_data="pong")
  63.             return
  64.         user = self.scope.get("user")
  65.         if not user:
  66.             logger.warning(f"匿名用户访问拒绝:{text_data}")
  67.             return
  68.         logger.info(f"收到用户 {user.get('id')} 发送消息:{text_data}")
  69.         # 因为消息采用了两次JSON封装,这里进行两次JSON解析
  70.         try:
  71.             outer_payload = json.loads(text_data)  # 外层JSON解释
  72.             message_type = outer_payload.get("type")
  73.             raw_content = outer_payload.get("content", "{}")
  74.             inner_content = json.loads(raw_content)  # 内层JSON解释
  75.         except json.JSONDecodeError:
  76.             logger.error("内容解析失败")
  77.             return
  78.         # 处理业务消息
  79.         if message_type == "demo-message-send":
  80.             message_text = inner_content.get("text", "").strip()
  81.             if not message_text:
  82.                 logger.warning("消息内容不能为空")
  83.                 return
  84.             # 构建响应字典
  85.             content = {
  86.                 "fromUserId": user.get("id"),
  87.                 "text": message_text,
  88.                 "single": False,  # 默认群发
  89.             }
  90.             # 判断接收对象
  91.             target_user_id = inner_content.get("toUserId")
  92.             if target_user_id not in [None, "", 0]:
  93.                 # 单发
  94.                 await self._send_single_message(target_user_id, content)
  95.             else:
  96.                 # 群发广播
  97.                 await self._send_broadcast_message(content)
  98.     def _build_response(self, content):
  99.         """构建标准化的响应消息,进行两次JSON封装"""
  100.         # 第一次封装:添加消息类型
  101.         inner_message = {
  102.             "type": "demo-message-receive",
  103.             "content": json.dumps(content),
  104.         }
  105.         # 第二次封装:整体序列化
  106.         return json.dumps(inner_message)
  107.     async def _send_single_message(self, target_user_id, content):
  108.         """向指定用户发送单条消息"""
  109.         # 获取用户通道
  110.         target_channel = cache.get(f"user_{target_user_id}_channel")
  111.         if not target_channel:
  112.             return False
  113.         # 构建并发送消息
  114.         message = self._build_response(content)
  115.         await self.channel_layer.send(
  116.             target_channel,
  117.             {
  118.                 "type": "single.message",
  119.                 "payload": message,
  120.             },
  121.         )
  122.         return True
  123.     async def _send_broadcast_message(self, content):
  124.         """向房间内所有用户广播消息"""
  125.         message = self._build_response(content)
  126.         await self.channel_layer.group_send(
  127.             self.room_group_name,
  128.             {
  129.                 "type": "broadcast.message",
  130.                 "payload": message,
  131.             },
  132.         )
  133.     async def broadcast_message(self, event):
  134.         """处理群发消息"""
  135.         payload = event["payload"]
  136.         await self.send(text_data=payload)
  137.     async def single_message(self, event):
  138.         """处理单发消息"""
  139.         payload = event["payload"]
  140.         await self.send(text_data=payload)
复制代码
点击查看完整代码
配置 WebSocket 路由

创建mysite\myapp_infra\websocket\routing.py文件,定义 WebSocket 的 URL 路由
  1. """WebSocket 路由配置"""
  2. from django.urls import re_path, path
  3. from .consumers import InfraConsumer
  4. websocket_urlpatterns = [
  5.     # 调用as_asgi()类方法来获取一个 ASGI 应用程序
  6.     re_path(r"^infra/ws/?$", InfraConsumer.as_asgi()),
  7. ]
复制代码
然后在项目的mysite\mysite\asgi.py配置中添加 ASGI 路由
  1. import os
  2. from django.core.asgi import get_asgi_application
  3. from channels.routing import ProtocolTypeRouter, URLRouter
  4. from channels.auth import AuthMiddlewareStack
  5. # 设置环境变量并初始化Django应用
  6. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
  7. django_application = get_asgi_application()
  8. def get_websocket_application():
  9.     """延迟导入WebSocket路由(避免循环导入)"""
  10.     from myapp_infra.websocket.routing import websocket_urlpatterns
  11.     return AuthMiddlewareStack(URLRouter(websocket_urlpatterns))
  12. # 协议路由:区分HTTP和WebSocket请求
  13. application = ProtocolTypeRouter(
  14.     {
  15.         "http": django_application,
  16.         "websocket": get_websocket_application(),
  17.     }
  18. )
复制代码
点击查看完整代码
创建聊天界面模板

以Vue3界面为例,代码实现了一个 WebSocket 客户端界面,功能包括:

  • 连接控制:显示连接状态、开启/关闭 WebSocket 连接。
  • 消息发送:输入消息内容并选择接收人(单发或群发),点击发送按钮通过 WebSocket 发送消息。
  • 消息接收与展示:监听 WebSocket 返回的数据,解析不同类型的消息(如单发、群发、系统通知)并在右侧列表中倒序展示。
  • 用户列表获取:页面加载时获取用户列表用于选择消息接收人。
1.png

点击查看完整代码
实现效果

使用 ASGI 运行项目
  1. # 开发环境启动
  2. uvicorn mysite.asgi:application --reload
复制代码
使用不同的浏览器,分别登录不同的用户,实现实时互发消息。
2.png

二、生产环境部署

Nginx 配置

使用 Nginx 作为反向代理,添加以下配置来处理 WebSocket 连接。这段配置告诉 Nginx 如何正确处理 WebSocket 升级请求。
  1. location /ws/ {
  2.     proxy_pass http://backend;
  3.     proxy_http_version 1.1;
  4.     proxy_set_header Upgrade $http_upgrade;
  5.     proxy_set_header Connection "upgrade";
  6. }
复制代码
使用 Gunicorn+Uvicorn 部署

Gunicorn(全称 Green Unicorn)是一个用于 UNIX 的 高性能 Python WSGI HTTP 服务器。Gunicorn只能用于 Linux 系统,Windows上无法使用。
安装
  1. pip install gunicorn
  2. # 查看版本
  3. gunicorn -v
  4. gunicorn -h
复制代码
Gunicorn 作为进程管理器,配合 Uvicorn 工作进程处理 ASGI 应用

  • -w 工作进程的数量。默认启动 1 个 Worker 进程
  • -k 要运行的工作进程类型。
  • -b 指定要绑定的服务器套接字。
  1. # 基本启动命令
  2. gunicorn -b 0.0.0.0:8000 -k uvicorn.workers.UvicornWorker mysite.asgi:application
复制代码
优化 Worker 数量

Gunicorn Worker 数量的设置对性能影响很大,推荐的公式是:(CPU核心数 × 2) + 1。我们可以编写一个启动脚本来自动计算最佳 Worker 数量
  1. #!/bin/bash
  2. # run_gunicorn.sh
  3. # 计算最佳 Worker 数
  4. CORES=$(nproc)
  5. WORKERS=$((CORES * 2 + 1))
  6. # 限制最大Worker数(避免内存不足)
  7. MAX_WORKERS=8
  8. if [ $WORKERS -gt $MAX_WORKERS ]; then
  9.   WORKERS=$MAX_WORKERS
  10. fi
  11. # 启动命令
  12. gunicorn -b 0.0.0.0:8000 \
  13.   --workers $WORKERS \
  14.   --max-requests 1000 \         # 预防内存泄漏
  15.   --timeout 120 \               # 超时控制
  16.   --keep-alive 5 \              # Keep-Alive
  17.   -k uvicorn.workers.UvicornWorker \
  18.   mysite.asgi:application
复制代码
添加执行权限并运行
  1. chmod +x run_gunicorn.sh
  2. ./run_gunicorn.sh
复制代码
您正在阅读的是《Django从入门到实战》专栏!关注不迷路~

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册