找回密码
 立即注册
首页 业界区 业界 如何用WebSocket打造毫秒级实时协作系统? ...

如何用WebSocket打造毫秒级实时协作系统?

唐嘉懿 昨天 08:57
1.jpeg
2.jpeg
扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
发现1000+提升效率与开发的AI工具和实用程序:https://tools.cmdragon.cn/
第六章:全栈项目实战示例:实时协作系统

一、需求分析:实时白板/协同编辑场景

实时协作系统需要实现多用户同时操作同一文档/白板,并实时同步所有变更。核心需求包括:

  • 毫秒级延迟:用户操作需在300ms内同步给所有参与者
  • 操作一致性:保证最终所有客户端呈现相同内容
  • 冲突处理:解决多用户同时修改同一区域的问题
  • 状态恢复:断线重连后自动同步最新状态
sequenceDiagram    participant 用户A    participant 用户B    participant 服务器        用户A ->> 服务器: 用户A登录    服务器 -->> 用户A: 登录成功响应        用户B ->> 服务器: 用户B登录    服务器 -->> 用户B: 登录成功响应        用户A ->> 服务器: 用户A创建新的白板会话    服务器 -->> 用户A: 白板会话ID        用户B ->> 服务器: 加入现有的白板会话 (会话ID)    服务器 -->> 用户B: 加入成功响应        用户A ->> 服务器: 用户A在白板上绘图/编辑    服务器 -->> 用户B: 更新操作        用户B ->> 服务器: 用户B在白板上绘图/编辑    服务器 -->> 用户A: 更新操作        用户A ->> 服务器: 保存白板内容    服务器 -->> 用户A: 保存成功确认        用户B ->> 服务器: 检视白板内容    服务器 -->> 用户B: 白板内容数据        用户A ->> 服务器: 退出白板会话    服务器 -->> 用户A: 退出成功确认        用户B ->> 服务器: 退出白板会话    服务器 -->> 用户B: 退出成功确认二、后端WebSocket服务搭建

依赖安装
  1. pip install fastapi==0.104.0 websockets==12.0 uvicorn==0.23.2 pydantic==2.5.2
复制代码
核心代码实现
  1. import asyncio
  2. import logging
  3. from typing import List, Dict
  4. from fastapi import FastAPI, WebSocket
  5. from pydantic import BaseModel
  6. app = FastAPI()
  7. logger = logging.getLogger("uvicorn.error")
  8. class Operation(BaseModel):
  9.     type: str  # "insert" or "delete"
  10.     position: int
  11.     content: str = ""  # 插入内容
  12.     length: int = 1  # 删除长度
  13.     client_id: str = ""  # 客户端标识
  14.     version: int = 0  # 操作版本号
  15. # OT转换引擎(示例)
  16. class OTEngine:
  17.     @staticmethod
  18.     def transform(op1: Operation, op2: Operation) -> Operation:
  19.         """操作转换核心算法"""
  20.         # 插入 vs 插入
  21.         if op1.type == "insert" and op2.type == "insert":
  22.             if op1.position < op2.position:
  23.                 return Operation(**{**op2.dict(), "position": op2.position + len(op1.content)})
  24.             elif op1.position > op2.position:
  25.                 return op2
  26.             else:  # 相同位置按客户端ID排序
  27.                 return op2 if op1.client_id < op2.client_id else Operation(
  28.                     **{**op2.dict(), "position": op2.position + len(op1.content)})
  29.         # 插入 vs 删除
  30.         elif op1.type == "insert" and op2.type == "delete":
  31.             if op1.position <= op2.position:
  32.                 return Operation(**{**op2.dict(), "position": op2.position + len(op1.content)})
  33.             else:
  34.                 return Operation(**{**op2.dict(), "position": op2.position})
  35.         # 删除 vs 插入
  36.         elif op1.type == "delete" and op2.type == "insert":
  37.             if op1.position < op2.position:
  38.                 return Operation(**{**op2.dict(), "position": max(op2.position - op1.length, 0)})
  39.             else:
  40.                 return op2
  41.         # 删除 vs 删除
  42.         else:
  43.             if op1.position < op2.position:
  44.                 return Operation(**{**op2.dict(), "position": max(op2.position - op1.length, 0)})
  45.             elif op1.position > op2.position:
  46.                 return Operation(**{**op2.dict(), "position": op2.position})
  47.             else:  # 相同位置取范围更大的删除
  48.                 return op2 if op1.length >= op2.length else Operation(**{**op2.dict(), "length": op1.length})
  49. # 协同编辑房间管理器
  50. class CollaborationRoom:
  51.     def __init__(self, room_id: str):
  52.         self.room_id = room_id
  53.         self.connections = set()  # 实际使用redis实现, 这里使用set模拟
  54.         self.document = ""
  55.         self.version = 0
  56.         self.pending_ops: List[Operation] = []
  57.         self.lock = asyncio.Lock()
  58.         self.client_states: Dict[str, int] = {}  # 客户端最后确认版本
  59.     async def add_connection(self, websocket: WebSocket, client_id: str):
  60.         async with self.lock:
  61.             self.connections.add(websocket)
  62.             self.client_states[client_id] = self.version
  63.             # 发送初始状态
  64.             await websocket.send_json({
  65.                 "type": "snapshot",
  66.                 "document": self.document,
  67.                 "version": self.version
  68.             })
  69.     async def remove_connection(self, websocket: WebSocket, client_id: str):
  70.         async with self.lock:
  71.             self.connections.discard(websocket)
  72.             if client_id in self.client_states:
  73.                 del self.client_states[client_id]
  74.     async def apply_operation(self, operation: Operation):
  75.         """应用操作转换并更新文档"""
  76.         async with self.lock:
  77.             # 转换所有待处理操作
  78.             transformed_op = operation
  79.             for pending in self.pending_ops:
  80.                 transformed_op = OTEngine.transform(pending, transformed_op)
  81.             # 应用转换后的操作
  82.             if transformed_op.type == "insert":
  83.                 self.document = (self.document[:transformed_op.position] +
  84.                                  transformed_op.content +
  85.                                  self.document[transformed_op.position:])
  86.             elif transformed_op.type == "delete":
  87.                 start = transformed_op.position
  88.                 end = min(start + transformed_op.length, len(self.document))
  89.                 self.document = self.document[:start] + self.document[end:]
  90.             # 更新状态
  91.             self.version += 1
  92.             self.pending_ops.append(transformed_op)
  93.             # 广播转换后的操作
  94.             broadcast_tasks = []
  95.             for conn in self.connections:
  96.                 try:
  97.                     broadcast_tasks.append(conn.send_json({
  98.                         "type": "operation",
  99.                         "operation": transformed_op.dict(),
  100.                         "document": self.document,
  101.                         "version": self.version
  102.                     }))
  103.                 except:
  104.                     pass
  105.             await asyncio.gather(*broadcast_tasks, return_exceptions=True)
  106.             # 清除已处理的操作
  107.             min_client_version = min(self.client_states.values(), default=self.version)
  108.             self.pending_ops = [op for op in self.pending_ops if op.version >= min_client_version]
  109. # 全局房间管理
  110. room_manager: Dict[str, CollaborationRoom] = {}  # 实际使用redis实现, 这里使用字典模拟
  111. global_lock = asyncio.Lock()
  112. @app.websocket("/ws/{room_id}/{client_id}")
  113. async def websocket_endpoint(websocket: WebSocket, room_id: str, client_id: str):
  114.     await websocket.accept()
  115.     # 获取或创建房间
  116.     async with global_lock:
  117.         if room_id not in room_manager:
  118.             room_manager[room_id] = CollaborationRoom(room_id)
  119.         room = room_manager[room_id]
  120.     # 加入房间
  121.     await room.add_connection(websocket, client_id)
  122.     logger.info(f"Client {client_id} joined room {room_id}")
  123.     try:
  124.         while True:
  125.             data = await websocket.receive_json()
  126.             op = Operation(**data)
  127.             op.client_id = client_id
  128.             # 应用操作转换
  129.             await room.apply_operation(op)
  130.     except Exception as e:
  131.         logger.error(f"Error in room {room_id}: {str(e)}")
  132.     finally:
  133.         # 离开房间
  134.         await room.remove_connection(websocket, client_id)
  135.         logger.info(f"Client {client_id} left room {room_id}")
  136.         # 清理空房间
  137.         async with global_lock:
  138.             if not room.connections:
  139.                 del room_manager[room_id]
  140.                 logger.info(f"Room {room_id} closed")
复制代码
关键机制

  • 使用WebSocket协议替代HTTP长轮询
  • 维护活动连接池active_connections
  • 通过Pydantic模型验证操作格式
  • 广播模式实现实时同步
优化技巧

  • 使用requestAnimationFrame合并高频操作
  • 添加操作版本号解决时序问题
  • 实现本地缓存防止数据丢失
  • 添加心跳机制检测连接状态
三、前端Vue.js连接实现

组件核心代码
  1. <template>
  2.   
  3.    
  4.       房间: {{ roomId }} | 用户: {{ clientId }}
  5.       {{ statusText }}
  6.    
  7.    
  8.       
  9.    
  10.    
  11.       在线用户 ({{ users.length }})
  12.       
  13.         
  14.         {{ user }}
  15.       
  16.    
  17.   
  18. </template>
复制代码
sequenceDiagram    participant ClientA    participant Server    participant ClientB    ClientA->>ClientA: 用户输入操作    ClientA->>ClientA: 乐观更新UI    ClientA->>Server: 发送操作(位置+内容)    Server->>Server: 应用OT转换    Server->>ClientA: 广播确认操作    Server->>ClientB: 广播转换后操作    ClientB->>ClientB: 应用转换后操作四、消息同步策略与冲突解决


  • 状态同步机制
    sequenceDiagram    participant ClientA    participant Server    participant ClientB        ClientA->>Server: 发送操作OP_A(位置P)    Server->>Server: 转换操作:OP_A' = OT(OP_A, 待处理操作)    Server->>Server: 更新文档状态    Server->>ClientA: 广播转换后的OP_A'    Server->>ClientB: 广播转换后的OP_A'    ClientB->>ClientB: 应用OP_A'更新本地文档    ClientB->>Server: 发送新操作OP_B
  • 冲突解决流程

    • 客户端发送操作时携带当前位置和版本
    • 服务端对并发操作进行转换排序
    • 转换后操作广播给所有客户端
    • 客户端收到操作后无条件应用

  • 消息协议设计
    1. // 快照消息(初始同步)
    2. {
    3.   "type": "snapshot",
    4.   "document": "当前文本",
    5.   "version": 15
    6. }
    7. // 操作消息
    8. {
    9.   "type": "operation",
    10.   "operation": {
    11.      "type": "insert",
    12.      "position": 5,
    13.      "content": "hello",
    14.      "client_id": "user1",
    15.      "version": 16
    16.   },
    17.   "document": "更新后文本",
    18.   "version": 16
    19. }
    复制代码
同步策略对比
策略优点缺点最后写入优先实现简单可能丢失用户操作操作转换(OT)精确解决冲突算法复杂度高CRDT无需中心协调内存消耗较大五、压力测试与部署方案

压力测试命令
  1. pip install websocket-client
  2. python -m websockets ws://localhost:8000/ws/room1 -c 1000 -m "测试消息"
复制代码
部署架构
  1. 客户端 → Nginx → FastAPI (Uvicorn) → Redis Cluster
复制代码
Nginx配置要点
  1. http {
  2.     map $http_upgrade $connection_upgrade {
  3.         default upgrade;
  4.         '' close;
  5.     }
  6.     server {
  7.         location /ws/ {
  8.             proxy_pass http://backend;
  9.             proxy_http_version 1.1;
  10.             proxy_set_header Upgrade $http_upgrade;
  11.             proxy_set_header Connection $connection_upgrade;
  12.         }
  13.     }
  14. }
复制代码
课后 Quiz

Q1:当两个用户同时在第5个字符位置插入不同内容时,如何保证最终一致性?
答案:采用操作转换算法,根据操作逻辑时间戳调整插入位置。例如用户A插入"X",用户B插入"Y",最终在第5位显示"YX"或"XY"
,取决于操作到达服务器的顺序。
Q2:WebSocket连接频繁断开可能是什么原因?
解决方案

  • 检查防火墙设置是否允许WS协议
  • 配置合理的心跳间隔(建议30秒)
  • 增加Nginx的超时设置:
    1. proxy_read_timeout 3600s;
    2. proxy_send_timeout 3600s;
    复制代码
常见报错处理

错误1:403 Forbidden

  • 原因:跨域请求被阻止
  • 解决:添加CORS中间件
    1. from fastapi.middleware.cors import CORSMiddleware
    2. app.add_middleware(
    3.     CORSMiddleware,
    4.     allow_origins=["*"],
    5.     allow_credentials=True,
    6.     allow_methods=["*"],
    7.     allow_headers=["*"],
    8. )
    复制代码
错误2:1006 Connection Closed Abnormally

  • 原因:客户端未正确处理断开事件
  • 解决:添加重连机制
    1. function connect() {
    2.   const ws = new WebSocket(url)
    3.   ws.onclose = () => setTimeout(connect, 1000)
    4. }
    复制代码
通过本文的实践,开发者可以掌握实时协作系统的核心实现技术。建议在开发过程中使用wireshark
工具监控WebSocket流量,并配合Chrome DevTools的Performance面板进行前端性能优化。
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何用WebSocket打造毫秒级实时协作系统?
往期文章归档:


  • 如何让你的WebSocket连接既安全又高效?
  • 如何让多客户端会话管理不再成为你的技术噩梦? - cmdragon's Blog
  • 如何在FastAPI中玩转WebSocket消息处理?
  • 如何在FastAPI中玩转WebSocket,让实时通信不再烦恼? - cmdragon's Blog
  • WebSocket与HTTP协议究竟有何不同?FastAPI如何让长连接变得如此简单? - cmdragon's Blog
  • FastAPI如何玩转安全防护,让黑客望而却步?
  • 如何用三层防护体系打造坚不可摧的 API 安全堡垒? - cmdragon's Blog
  • FastAPI安全加固:密钥轮换、限流策略与安全头部如何实现三重防护? - cmdragon's Blog
  • 如何在FastAPI中巧妙玩转数据脱敏,让敏感信息安全无忧? - cmdragon's Blog
  • RBAC权限模型如何让API访问控制既安全又灵活? - cmdragon's Blog
  • FastAPI中的敏感数据如何在不泄露的情况下翩翩起舞?
  • FastAPI安全认证的终极秘籍:OAuth2与JWT如何完美融合? - cmdragon's Blog
  • 如何在FastAPI中打造坚不可摧的Web安全防线? - cmdragon's Blog
  • 如何用 FastAPI 和 RBAC 打造坚不可摧的安全堡垒? - cmdragon's Blog
  • FastAPI权限配置:你的系统真的安全吗? - cmdragon's Blog
  • FastAPI权限缓存:你的性能瓶颈是否藏在这只“看不见的手”里? | cmdragon's Blog
  • FastAPI日志审计:你的权限系统是否真的安全无虞? | cmdragon's Blog
  • 如何在FastAPI中打造坚不可摧的安全防线? | cmdragon's Blog
  • 如何在FastAPI中实现权限隔离并让用户乖乖听话? | cmdragon's Blog
  • 如何在FastAPI中玩转权限控制与测试,让代码安全又优雅? | cmdragon's Blog
  • 如何在FastAPI中打造一个既安全又灵活的权限管理系统? | cmdragon's Blog
  • FastAPI访问令牌的权限声明与作用域管理:你的API安全真的无懈可击吗? | cmdragon's Blog
  • 如何在FastAPI中构建一个既安全又灵活的多层级权限系统? | cmdragon's Blog
  • FastAPI如何用角色权限让Web应用安全又灵活? | cmdragon's Blog
  • FastAPI权限验证依赖项究竟藏着什么秘密? | cmdragon's Blog
  • 如何用FastAPI和Tortoise-ORM打造一个既高效又灵活的角色管理系统? | cmdragon's Blog
  • JWT令牌如何在FastAPI中实现安全又高效的生成与验证? | cmdragon's Blog
  • 你的密码存储方式是否在向黑客招手? | cmdragon's Blog
  • 如何在FastAPI中轻松实现OAuth2认证并保护你的API? | cmdragon's Blog
  • FastAPI安全机制:从OAuth2到JWT的魔法通关秘籍 | cmdragon's Blog
  • FastAPI认证系统:从零到令牌大师的奇幻之旅 | cmdragon's Blog
  • FastAPI安全异常处理:从401到422的奇妙冒险 | cmdragon's Blog
  • FastAPI权限迷宫:RBAC与多层级依赖的魔法通关秘籍 | cmdragon's Blog
  • JWT令牌:从身份证到代码防伪的奇妙之旅 | cmdragon's Blog
  • FastAPI安全认证:从密码到令牌的魔法之旅 | cmdragon's Blog
  • 密码哈希:Bcrypt的魔法与盐值的秘密 | cmdragon's Blog
免费好用的热门在线工具


  • CMDragon 在线工具 - 高级AI工具箱与开发者套件 | 免费好用的在线工具
  • 应用商店 - 发现1000+提升效率与开发的AI工具和实用程序 | 免费好用的在线工具
  • CMDragon 更新日志 - 最新更新、功能与改进 | 免费好用的在线工具
  • 支持我们 - 成为赞助者 | 免费好用的在线工具
  • AI文本生成图像 - 应用商店 | 免费好用的在线工具
  • 临时邮箱 - 应用商店 | 免费好用的在线工具
  • 二维码解析器 - 应用商店 | 免费好用的在线工具
  • 文本转思维导图 - 应用商店 | 免费好用的在线工具
  • 正则表达式可视化工具 - 应用商店 | 免费好用的在线工具
  • 文件隐写工具 - 应用商店 | 免费好用的在线工具
  • IPTV 频道探索器 - 应用商店 | 免费好用的在线工具
  • 快传 - 应用商店 | 免费好用的在线工具
  • 随机抽奖工具 - 应用商店 | 免费好用的在线工具
  • 动漫场景查找器 - 应用商店 | 免费好用的在线工具
  • 时间工具箱 - 应用商店 | 免费好用的在线工具
  • 网速测试 - 应用商店 | 免费好用的在线工具
  • AI 智能抠图工具 - 应用商店 | 免费好用的在线工具
  • 背景替换工具 - 应用商店 | 免费好用的在线工具
  • 艺术二维码生成器 - 应用商店 | 免费好用的在线工具
  • Open Graph 元标签生成器 - 应用商店 | 免费好用的在线工具
  • 图像对比工具 - 应用商店 | 免费好用的在线工具
  • 图片压缩专业版 - 应用商店 | 免费好用的在线工具
  • 密码生成器 - 应用商店 | 免费好用的在线工具
  • SVG优化器 - 应用商店 | 免费好用的在线工具
  • 调色板生成器 - 应用商店 | 免费好用的在线工具
  • 在线节拍器 - 应用商店 | 免费好用的在线工具
  • IP归属地查询 - 应用商店 | 免费好用的在线工具
  • CSS网格布局生成器 - 应用商店 | 免费好用的在线工具
  • 邮箱验证工具 - 应用商店 | 免费好用的在线工具
  • 书法练习字帖 - 应用商店 | 免费好用的在线工具
  • 金融计算器套件 - 应用商店 | 免费好用的在线工具
  • 中国亲戚关系计算器 - 应用商店 | 免费好用的在线工具
  • Protocol Buffer 工具箱 - 应用商店 | 免费好用的在线工具
  • IP归属地查询 - 应用商店 | 免费好用的在线工具
  • 图片无损放大 - 应用商店 | 免费好用的在线工具
  • 文本比较工具 - 应用商店 | 免费好用的在线工具
  • IP批量查询工具 - 应用商店 | 免费好用的在线工具
  • 域名查询工具 - 应用商店 | 免费好用的在线工具
  • DNS工具箱 - 应用商店 | 免费好用的在线工具
  • 网站图标生成器 - 应用商店 | 免费好用的在线工具
  • XML Sitemap

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册