扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
发现1000+提升效率与开发的AI工具和实用程序:https://tools.cmdragon.cn/
第六章:全栈项目实战示例:实时协作系统
一、需求分析:实时白板/协同编辑场景
实时协作系统需要实现多用户同时操作同一文档/白板,并实时同步所有变更。核心需求包括:
- 毫秒级延迟:用户操作需在300ms内同步给所有参与者
- 操作一致性:保证最终所有客户端呈现相同内容
- 冲突处理:解决多用户同时修改同一区域的问题
- 状态恢复:断线重连后自动同步最新状态
sequenceDiagram participant 用户A participant 用户B participant 服务器 用户A ->> 服务器: 用户A登录 服务器 -->> 用户A: 登录成功响应 用户B ->> 服务器: 用户B登录 服务器 -->> 用户B: 登录成功响应 用户A ->> 服务器: 用户A创建新的白板会话 服务器 -->> 用户A: 白板会话ID 用户B ->> 服务器: 加入现有的白板会话 (会话ID) 服务器 -->> 用户B: 加入成功响应 用户A ->> 服务器: 用户A在白板上绘图/编辑 服务器 -->> 用户B: 更新操作 用户B ->> 服务器: 用户B在白板上绘图/编辑 服务器 -->> 用户A: 更新操作 用户A ->> 服务器: 保存白板内容 服务器 -->> 用户A: 保存成功确认 用户B ->> 服务器: 检视白板内容 服务器 -->> 用户B: 白板内容数据 用户A ->> 服务器: 退出白板会话 服务器 -->> 用户A: 退出成功确认 用户B ->> 服务器: 退出白板会话 服务器 -->> 用户B: 退出成功确认二、后端WebSocket服务搭建
依赖安装:- pip install fastapi==0.104.0 websockets==12.0 uvicorn==0.23.2 pydantic==2.5.2
复制代码 核心代码实现:- import asyncio
- import logging
- from typing import List, Dict
- from fastapi import FastAPI, WebSocket
- from pydantic import BaseModel
- app = FastAPI()
- logger = logging.getLogger("uvicorn.error")
- class Operation(BaseModel):
- type: str # "insert" or "delete"
- position: int
- content: str = "" # 插入内容
- length: int = 1 # 删除长度
- client_id: str = "" # 客户端标识
- version: int = 0 # 操作版本号
- # OT转换引擎(示例)
- class OTEngine:
- @staticmethod
- def transform(op1: Operation, op2: Operation) -> Operation:
- """操作转换核心算法"""
- # 插入 vs 插入
- if op1.type == "insert" and op2.type == "insert":
- if op1.position < op2.position:
- return Operation(**{**op2.dict(), "position": op2.position + len(op1.content)})
- elif op1.position > op2.position:
- return op2
- else: # 相同位置按客户端ID排序
- return op2 if op1.client_id < op2.client_id else Operation(
- **{**op2.dict(), "position": op2.position + len(op1.content)})
- # 插入 vs 删除
- elif op1.type == "insert" and op2.type == "delete":
- if op1.position <= op2.position:
- return Operation(**{**op2.dict(), "position": op2.position + len(op1.content)})
- else:
- return Operation(**{**op2.dict(), "position": op2.position})
- # 删除 vs 插入
- elif op1.type == "delete" and op2.type == "insert":
- if op1.position < op2.position:
- return Operation(**{**op2.dict(), "position": max(op2.position - op1.length, 0)})
- else:
- return op2
- # 删除 vs 删除
- else:
- if op1.position < op2.position:
- return Operation(**{**op2.dict(), "position": max(op2.position - op1.length, 0)})
- elif op1.position > op2.position:
- return Operation(**{**op2.dict(), "position": op2.position})
- else: # 相同位置取范围更大的删除
- return op2 if op1.length >= op2.length else Operation(**{**op2.dict(), "length": op1.length})
- # 协同编辑房间管理器
- class CollaborationRoom:
- def __init__(self, room_id: str):
- self.room_id = room_id
- self.connections = set() # 实际使用redis实现, 这里使用set模拟
- self.document = ""
- self.version = 0
- self.pending_ops: List[Operation] = []
- self.lock = asyncio.Lock()
- self.client_states: Dict[str, int] = {} # 客户端最后确认版本
- async def add_connection(self, websocket: WebSocket, client_id: str):
- async with self.lock:
- self.connections.add(websocket)
- self.client_states[client_id] = self.version
- # 发送初始状态
- await websocket.send_json({
- "type": "snapshot",
- "document": self.document,
- "version": self.version
- })
- async def remove_connection(self, websocket: WebSocket, client_id: str):
- async with self.lock:
- self.connections.discard(websocket)
- if client_id in self.client_states:
- del self.client_states[client_id]
- async def apply_operation(self, operation: Operation):
- """应用操作转换并更新文档"""
- async with self.lock:
- # 转换所有待处理操作
- transformed_op = operation
- for pending in self.pending_ops:
- transformed_op = OTEngine.transform(pending, transformed_op)
- # 应用转换后的操作
- if transformed_op.type == "insert":
- self.document = (self.document[:transformed_op.position] +
- transformed_op.content +
- self.document[transformed_op.position:])
- elif transformed_op.type == "delete":
- start = transformed_op.position
- end = min(start + transformed_op.length, len(self.document))
- self.document = self.document[:start] + self.document[end:]
- # 更新状态
- self.version += 1
- self.pending_ops.append(transformed_op)
- # 广播转换后的操作
- broadcast_tasks = []
- for conn in self.connections:
- try:
- broadcast_tasks.append(conn.send_json({
- "type": "operation",
- "operation": transformed_op.dict(),
- "document": self.document,
- "version": self.version
- }))
- except:
- pass
- await asyncio.gather(*broadcast_tasks, return_exceptions=True)
- # 清除已处理的操作
- min_client_version = min(self.client_states.values(), default=self.version)
- self.pending_ops = [op for op in self.pending_ops if op.version >= min_client_version]
- # 全局房间管理
- room_manager: Dict[str, CollaborationRoom] = {} # 实际使用redis实现, 这里使用字典模拟
- global_lock = asyncio.Lock()
- @app.websocket("/ws/{room_id}/{client_id}")
- async def websocket_endpoint(websocket: WebSocket, room_id: str, client_id: str):
- await websocket.accept()
- # 获取或创建房间
- async with global_lock:
- if room_id not in room_manager:
- room_manager[room_id] = CollaborationRoom(room_id)
- room = room_manager[room_id]
- # 加入房间
- await room.add_connection(websocket, client_id)
- logger.info(f"Client {client_id} joined room {room_id}")
- try:
- while True:
- data = await websocket.receive_json()
- op = Operation(**data)
- op.client_id = client_id
- # 应用操作转换
- await room.apply_operation(op)
- except Exception as e:
- logger.error(f"Error in room {room_id}: {str(e)}")
- finally:
- # 离开房间
- await room.remove_connection(websocket, client_id)
- logger.info(f"Client {client_id} left room {room_id}")
- # 清理空房间
- async with global_lock:
- if not room.connections:
- del room_manager[room_id]
- logger.info(f"Room {room_id} closed")
复制代码 关键机制:
- 使用WebSocket协议替代HTTP长轮询
- 维护活动连接池active_connections
- 通过Pydantic模型验证操作格式
- 广播模式实现实时同步
优化技巧:
- 使用requestAnimationFrame合并高频操作
- 添加操作版本号解决时序问题
- 实现本地缓存防止数据丢失
- 添加心跳机制检测连接状态
三、前端Vue.js连接实现
组件核心代码:- <template>
-
-
- 房间: {{ roomId }} | 用户: {{ clientId }}
- {{ statusText }}
-
-
-
-
-
- 在线用户 ({{ users.length }})
-
-
- {{ user }}
-
-
-
- </template>
复制代码 sequenceDiagram participant ClientA participant Server participant ClientB ClientA->>ClientA: 用户输入操作 ClientA->>ClientA: 乐观更新UI ClientA->>Server: 发送操作(位置+内容) Server->>Server: 应用OT转换 Server->>ClientA: 广播确认操作 Server->>ClientB: 广播转换后操作 ClientB->>ClientB: 应用转换后操作四、消息同步策略与冲突解决
- 状态同步机制:
sequenceDiagram participant ClientA participant Server participant ClientB ClientA->>Server: 发送操作OP_A(位置P) Server->>Server: 转换操作:OP_A' = OT(OP_A, 待处理操作) Server->>Server: 更新文档状态 Server->>ClientA: 广播转换后的OP_A' Server->>ClientB: 广播转换后的OP_A' ClientB->>ClientB: 应用OP_A'更新本地文档 ClientB->>Server: 发送新操作OP_B
- 冲突解决流程:
- 客户端发送操作时携带当前位置和版本
- 服务端对并发操作进行转换排序
- 转换后操作广播给所有客户端
- 客户端收到操作后无条件应用
- 消息协议设计:
- // 快照消息(初始同步)
- {
- "type": "snapshot",
- "document": "当前文本",
- "version": 15
- }
- // 操作消息
- {
- "type": "operation",
- "operation": {
- "type": "insert",
- "position": 5,
- "content": "hello",
- "client_id": "user1",
- "version": 16
- },
- "document": "更新后文本",
- "version": 16
- }
复制代码 同步策略对比:
策略优点缺点最后写入优先实现简单可能丢失用户操作操作转换(OT)精确解决冲突算法复杂度高CRDT无需中心协调内存消耗较大五、压力测试与部署方案
压力测试命令:- pip install websocket-client
- python -m websockets ws://localhost:8000/ws/room1 -c 1000 -m "测试消息"
复制代码 部署架构:- 客户端 → Nginx → FastAPI (Uvicorn) → Redis Cluster
复制代码 Nginx配置要点:- http {
- map $http_upgrade $connection_upgrade {
- default upgrade;
- '' close;
- }
- server {
- location /ws/ {
- proxy_pass http://backend;
- proxy_http_version 1.1;
- proxy_set_header Upgrade $http_upgrade;
- proxy_set_header Connection $connection_upgrade;
- }
- }
- }
复制代码 课后 Quiz
Q1:当两个用户同时在第5个字符位置插入不同内容时,如何保证最终一致性?
答案:采用操作转换算法,根据操作逻辑时间戳调整插入位置。例如用户A插入"X",用户B插入"Y",最终在第5位显示"YX"或"XY"
,取决于操作到达服务器的顺序。
Q2:WebSocket连接频繁断开可能是什么原因?
解决方案:
- 检查防火墙设置是否允许WS协议
- 配置合理的心跳间隔(建议30秒)
- 增加Nginx的超时设置:
- proxy_read_timeout 3600s;
- proxy_send_timeout 3600s;
复制代码 常见报错处理
错误1:403 Forbidden
- 原因:跨域请求被阻止
- 解决:添加CORS中间件
- from fastapi.middleware.cors import CORSMiddleware
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["*"],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
复制代码 错误2:1006 Connection Closed Abnormally
- 原因:客户端未正确处理断开事件
- 解决:添加重连机制
- function connect() {
- const ws = new WebSocket(url)
- ws.onclose = () => setTimeout(connect, 1000)
- }
复制代码 通过本文的实践,开发者可以掌握实时协作系统的核心实现技术。建议在开发过程中使用wireshark
工具监控WebSocket流量,并配合Chrome DevTools的Performance面板进行前端性能优化。
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何用WebSocket打造毫秒级实时协作系统?
往期文章归档:
- 如何让你的WebSocket连接既安全又高效?
- 如何让多客户端会话管理不再成为你的技术噩梦? - cmdragon's Blog
- 如何在FastAPI中玩转WebSocket消息处理?
- 如何在FastAPI中玩转WebSocket,让实时通信不再烦恼? - cmdragon's Blog
- WebSocket与HTTP协议究竟有何不同?FastAPI如何让长连接变得如此简单? - cmdragon's Blog
- FastAPI如何玩转安全防护,让黑客望而却步?
- 如何用三层防护体系打造坚不可摧的 API 安全堡垒? - cmdragon's Blog
- FastAPI安全加固:密钥轮换、限流策略与安全头部如何实现三重防护? - cmdragon's Blog
- 如何在FastAPI中巧妙玩转数据脱敏,让敏感信息安全无忧? - cmdragon's Blog
- RBAC权限模型如何让API访问控制既安全又灵活? - cmdragon's Blog
- FastAPI中的敏感数据如何在不泄露的情况下翩翩起舞?
- FastAPI安全认证的终极秘籍:OAuth2与JWT如何完美融合? - cmdragon's Blog
- 如何在FastAPI中打造坚不可摧的Web安全防线? - cmdragon's Blog
- 如何用 FastAPI 和 RBAC 打造坚不可摧的安全堡垒? - cmdragon's Blog
- FastAPI权限配置:你的系统真的安全吗? - cmdragon's Blog
- FastAPI权限缓存:你的性能瓶颈是否藏在这只“看不见的手”里? | cmdragon's Blog
- FastAPI日志审计:你的权限系统是否真的安全无虞? | cmdragon's Blog
- 如何在FastAPI中打造坚不可摧的安全防线? | cmdragon's Blog
- 如何在FastAPI中实现权限隔离并让用户乖乖听话? | cmdragon's Blog
- 如何在FastAPI中玩转权限控制与测试,让代码安全又优雅? | cmdragon's Blog
- 如何在FastAPI中打造一个既安全又灵活的权限管理系统? | cmdragon's Blog
- FastAPI访问令牌的权限声明与作用域管理:你的API安全真的无懈可击吗? | cmdragon's Blog
- 如何在FastAPI中构建一个既安全又灵活的多层级权限系统? | cmdragon's Blog
- FastAPI如何用角色权限让Web应用安全又灵活? | cmdragon's Blog
- FastAPI权限验证依赖项究竟藏着什么秘密? | cmdragon's Blog
- 如何用FastAPI和Tortoise-ORM打造一个既高效又灵活的角色管理系统? | cmdragon's Blog
- JWT令牌如何在FastAPI中实现安全又高效的生成与验证? | cmdragon's Blog
- 你的密码存储方式是否在向黑客招手? | cmdragon's Blog
- 如何在FastAPI中轻松实现OAuth2认证并保护你的API? | cmdragon's Blog
- FastAPI安全机制:从OAuth2到JWT的魔法通关秘籍 | cmdragon's Blog
- FastAPI认证系统:从零到令牌大师的奇幻之旅 | cmdragon's Blog
- FastAPI安全异常处理:从401到422的奇妙冒险 | cmdragon's Blog
- FastAPI权限迷宫:RBAC与多层级依赖的魔法通关秘籍 | cmdragon's Blog
- JWT令牌:从身份证到代码防伪的奇妙之旅 | cmdragon's Blog
- FastAPI安全认证:从密码到令牌的魔法之旅 | cmdragon's Blog
- 密码哈希:Bcrypt的魔法与盐值的秘密 | cmdragon's Blog
免费好用的热门在线工具
- CMDragon 在线工具 - 高级AI工具箱与开发者套件 | 免费好用的在线工具
- 应用商店 - 发现1000+提升效率与开发的AI工具和实用程序 | 免费好用的在线工具
- CMDragon 更新日志 - 最新更新、功能与改进 | 免费好用的在线工具
- 支持我们 - 成为赞助者 | 免费好用的在线工具
- AI文本生成图像 - 应用商店 | 免费好用的在线工具
- 临时邮箱 - 应用商店 | 免费好用的在线工具
- 二维码解析器 - 应用商店 | 免费好用的在线工具
- 文本转思维导图 - 应用商店 | 免费好用的在线工具
- 正则表达式可视化工具 - 应用商店 | 免费好用的在线工具
- 文件隐写工具 - 应用商店 | 免费好用的在线工具
- IPTV 频道探索器 - 应用商店 | 免费好用的在线工具
- 快传 - 应用商店 | 免费好用的在线工具
- 随机抽奖工具 - 应用商店 | 免费好用的在线工具
- 动漫场景查找器 - 应用商店 | 免费好用的在线工具
- 时间工具箱 - 应用商店 | 免费好用的在线工具
- 网速测试 - 应用商店 | 免费好用的在线工具
- AI 智能抠图工具 - 应用商店 | 免费好用的在线工具
- 背景替换工具 - 应用商店 | 免费好用的在线工具
- 艺术二维码生成器 - 应用商店 | 免费好用的在线工具
- Open Graph 元标签生成器 - 应用商店 | 免费好用的在线工具
- 图像对比工具 - 应用商店 | 免费好用的在线工具
- 图片压缩专业版 - 应用商店 | 免费好用的在线工具
- 密码生成器 - 应用商店 | 免费好用的在线工具
- SVG优化器 - 应用商店 | 免费好用的在线工具
- 调色板生成器 - 应用商店 | 免费好用的在线工具
- 在线节拍器 - 应用商店 | 免费好用的在线工具
- IP归属地查询 - 应用商店 | 免费好用的在线工具
- CSS网格布局生成器 - 应用商店 | 免费好用的在线工具
- 邮箱验证工具 - 应用商店 | 免费好用的在线工具
- 书法练习字帖 - 应用商店 | 免费好用的在线工具
- 金融计算器套件 - 应用商店 | 免费好用的在线工具
- 中国亲戚关系计算器 - 应用商店 | 免费好用的在线工具
- Protocol Buffer 工具箱 - 应用商店 | 免费好用的在线工具
- IP归属地查询 - 应用商店 | 免费好用的在线工具
- 图片无损放大 - 应用商店 | 免费好用的在线工具
- 文本比较工具 - 应用商店 | 免费好用的在线工具
- IP批量查询工具 - 应用商店 | 免费好用的在线工具
- 域名查询工具 - 应用商店 | 免费好用的在线工具
- DNS工具箱 - 应用商店 | 免费好用的在线工具
- 网站图标生成器 - 应用商店 | 免费好用的在线工具
- XML Sitemap
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |