找回密码
 立即注册
首页 业界区 业界 【OpenClaw】通过 Nanobot 源码学习架构---(3)AgentLo ...

【OpenClaw】通过 Nanobot 源码学习架构---(3)AgentLoop

些耨努 5 小时前
【OpenClaw】通过Nanobot源码学习架构---(3)AgentLoop


目录

  • 【OpenClaw】通过Nanobot源码学习架构---(3)AgentLoop

    • 0x00 概要
    • 0x01 原理

      • 1.1 Agent:负责“执行”
      • 1.2 Pi-Agent框架

    • 0x02 AgentLoop

      • 2.1 架构
      • 2.2 流程
      • 2.3 定义和初始化
      • 2.4 run
      • 2.5 _dispatch
      • 2.6 _process_message()
      • 2.7 _run_agent_loop()

    • 0xFF 参考


0x00 概要

OpenClaw 应该有40万行代码,阅读理解起来难度过大,因此,本系列通过Nanobot来学习 OpenClaw 的特色。
Nanobot是由香港大学数据科学实验室(HKUDS)开源的超轻量级个人 AI 助手框架,定位为"Ultra-Lightweight OpenClaw"。非常适合学习Agent架构。
Agent 是“业务执行者”,解决“消息怎样变成模型调用、工具执行和最终回复”。它们具备独立的上下文(与主对话隔离)和可使用的特定工具,并且具备定义明确的角色和方法论。每次 Agent 收到消息时运行的核心推理周期如下:

  • 从总线接收消息
  • 组装上下文
  • 推理该做什么(这是 LLM 调用)
  • 根据决定行动(调用工具、执行命令)
  • 观察结果,保存状态
  • 判断:我完成了吗?还是再循环一次?
  • 完成后回复
注:本系列借鉴的文章过多,可能在参考文献中有遗漏的文章,如果有,还请大家指出。
0x01 原理

1.1 Agent:负责“执行”

一个 Agent = 一个完整的 AI “大脑实例”,每个 Agent 都拥有独立资源。Agent 是“执行平面”,解决“消息怎样变成模型调用、工具执行和最终回复”。具体如下图(来自MiniClaw)。
  1. Feishu Cloud
  2.     |
  3.     |  HTTP POST /feishu/events
  4.     |  (im.message.receive_v1)
  5.     v
  6. [ESP32 Webhook Server :18790]
  7.     |
  8.     |  message_bus_push_inbound()
  9.     v
  10. [Message Bus] ──> [Agent Loop] ──> [Message Bus]
  11.                    (Claude/GPT)         |
  12.                                         |  outbound dispatch
  13.                                         v
  14.                               [feishu_send_message()]
  15.                                         |
  16.                                         |  POST /im/v1/messages
  17.                                         v
  18.                                    Feishu API
复制代码
下图是Agent 的最小循环。每个 AI Agent 都需要这个循环。模型决定何时调用工具、何时停止。代码只是执行模型的要求。
  1. An agentic loop is the full “real” run of an agent: intake → context assembly → model inference → tool execution → streaming replies → persistence.
  2.    
  3.                    THE AGENT PATTERN
  4.                     =================
  5.     User --> messages[] --> LLM --> response
  6.                                       |
  7.                             stop_reason == "tool_use"?
  8.                            /                          \
  9.                          yes                           no
  10.                           |                             |
  11.                     execute tools                    return text
  12.                     append results
  13.                     loop back -----------------> messages[]
复制代码
1.2 Pi-Agent框架

OpenClaw所使用的引擎是Pi-Agent框架,它是一个仅有四个工具、系统提示词不到1000个token,秉持“精简至上”原则的AI编程Agent。与其他编程Agent相比,Pi的工程设计和决策机制极为简洁,形成了鲜明对比。
下图是 OpenClaw 的循环概要。
  1. runEmbeddedPiAgent()
  2.   └── while (true) {  // 主重试循环
  3.         ├── 检查重试次数限制 (MAX_RUN_LOOP_ITERATIONS)
  4.         ├── 调用 runEmbeddedAttempt()  // 单次推理尝试
  5.         ├── 处理 context overflow → 自动压缩
  6.         ├── 处理 auth failure → profile轮换
  7.         ├── 处理 timeout → 重试或报错
  8.         └── 成功则返回 payloads
  9.       }
复制代码
Pi的设计理念可以总结为:不是为LLM打造一个复杂的“控制台”,而是给它一把“多功能小刀”——工具虽少但实用,提示虽简但明确,让模型的原生能力成为主导,而不是被框架的复杂性所掩盖。Pi 这种设计理念是基于一个关键事实——经过强化学习训练的前沿LLM模型,已经具备了很强的理解和执行能力。它们能明确知道“编码Agent”的主要任务是什么,根本不需要长篇大论的系统提示词和复杂的辅助模块来“指导”它们工作。
从数据层面分析:Pi的系统提示词加上工具定义,总长度还不到1000个token,仅仅是Claude Code的十分之一;内置工具也只有4个,远少于同类产品。这说明,Pi在主流Agent都在强化的方面,几乎都做了简化:

  • 系统提示词简短明了
  • 内置工具数量精简
  • 没有复杂的规划模式和多代理通信协议(Plan Mode和MCP)支持
  • 更没有难以监控的子Agent
Pi的核心策略是:去除冗余辅助模块,让LLM模型发挥核心作用,用最简洁的结构实现最核心的功能。
或许有人会问:如此简单的设计,真的能应对复杂的编码任务吗?实际上,Pi的简洁并非“简陋”,而是“精准”。接下来,我们详细解析这4个内置工具的设计思路——read、write、edit、bash:
工具主要功能read读取文件、审查代码、获取上下文信息write创建文件、写入内容edit修改代码、进行增量更新bash执行命令、操作环境、通过自我调用来拆分任务这四个工具几乎涵盖了编码Agent的所有核心需求。特别是bash工具的引入,既实现了复杂任务的拆分和执行,保证了功能的完整性,又避免了引入子Agent可能带来的不可预测性和监控难题——这就是Pi敢于放弃子Agent架构的原因。
同时,Pi使用简短的系统提示词,并非降低了对LLM的引导标准,而是充分信任前沿LLM的能力。正如Mario Zechner所倡导的:与其用大量token去“教导”LLM如何成为Agent,不如用简洁的提示词明确其核心任务,让LLM充分发挥自身的理解和执行能力。
这种设计思路带来了三大好处:

  • 节省上下文空间——降低推理成本,提高运行效率
  • 行为更加灵活自主——LLM能根据实际情况动态调整策略,不受冗长规则限制
  • 更好的适应性——简洁的结构意味着更低的认知负担和更强的泛化能力
0x02 AgentLoop

AgentLoop 是nanobot Agent运行的核心。智能体循环是区分聊天机器人和智能体的关键。
2.1 架构

AgentLoop 类的架构如下:
1.png

2.2 流程

下面是一个 AI Agent(智能体)的消息处理流程图,展示了从消息接收到响应发送的完整链路,包括 LLM 交互、工具调用循环等核心机制。
  1. 入口:消息到达(InboundMessage)
  2.     ↓
  3. AgentLoop.run() - 监听并接收消息
  4.     ↓
  5. AgentLoop._dispatch() - 分派处理
  6.     ↓
  7. AgentLoop._process_message() - 主要处理逻辑
  8.     ↓
  9. ContextBuilder.build_messages() - 构建上下文
  10.     ↓
  11. AgentLoop._run_agent_loop() - 核心代理循环
  12.     ↓
  13. Provider.chat() - LLM交互
  14.     ↓
  15. ← 判断是否有工具调用
  16.     ↓ 否
  17. ← 返回最终内容
  18.     ↓ 是
  19. ← 执行工具调用
  20.     ↓
  21. ContextBuilder.add_tool_result() - 添加工具结果
  22.     ↓
  23. ← 继续循环直到没有更多工具调用
  24.     ↓
  25. AgentLoop._save_turn() - 保存交互记录
  26.     ↓
  27. 通过MessageBus发布OutboundMessage - 发送响应
复制代码
部分环节详细拆解如下
2.png

2.3 定义和初始化

AgentLoop 的定义和初始化代码如下
  1. class AgentLoop:
  2.     """
  3.     The agent loop is the core processing engine.
  4.     It:
  5.     1. Receives messages from the bus
  6.     2. Builds context with history, memory, skills
  7.     3. Calls the LLM
  8.     4. Executes tool calls
  9.     5. Sends responses back
  10.     """
  11.     def __init__(
  12.         self,
  13.         bus: MessageBus,                # 消息总线,用于接收/发送消息
  14.         provider: LLMProvider,          # LLM提供者(如OpenAI/本地模型)
  15.         workspace: Path,                # Agent工作目录,用于隔离文件操作
  16.         model: str | None = None,       # 使用的LLM模型名称
  17.         max_iterations: int = 40,       # Agent最大迭代次数(防止无限循环)
  18.         temperature: float = 0.1,       # LLM温度参数(越低越确定)
  19.         max_tokens: int = 4096,         # LLM最大生成Token数
  20.         memory_window: int = 100,       # 记忆窗口大小(会话历史最大条数)
  21.         brave_api_key: str | None = None,  # Brave搜索API密钥(用于网页搜索工具)
  22.         exec_config: ExecToolConfig | None = None,  # 命令执行工具配置
  23.         cron_service: CronService | None = None,    # 定时任务服务(可选)
  24.         restrict_to_workspace: bool = False,        # 是否限制Agent仅操作工作区
  25.         session_manager: SessionManager | None = None,  # 会话管理器(可选)
  26.         mcp_servers: dict | None = None,              # MCP服务器配置(可选)
  27.         channels_config: ChannelsConfig | None = None,  # 通道配置(可选)
  28.     ):
  29.         # 解决循环导入问题:仅运行时导入ExecToolConfig
  30.         from nanobot.config.schema import ExecToolConfig
  31.         
  32.         # 基础属性初始化
  33.         self.bus = bus                          # 消息总线实例
  34.         self.channels_config = channels_config  # 通道配置
  35.         self.provider = provider                # LLM提供者实例
  36.         self.workspace = workspace              # 工作目录路径
  37.         # 模型名称:优先传入值,否则使用LLM提供者默认模型
  38.         self.model = model or provider.get_default_model()
  39.         self.max_iterations = max_iterations    # 最大迭代次数
  40.         self.temperature = temperature          # LLM温度
  41.         self.max_tokens = max_tokens            # LLM最大Token数
  42.         self.memory_window = memory_window      # 记忆窗口大小
  43.         self.brave_api_key = brave_api_key      # Brave API密钥
  44.         # 执行工具配置:默认空配置
  45.         self.exec_config = exec_config or ExecToolConfig()
  46.         self.cron_service = cron_service        # 定时任务服务
  47.         self.restrict_to_workspace = restrict_to_workspace  # 工作区限制开关
  48.         # 核心组件初始化
  49.         self.context = ContextBuilder(workspace)  # 上下文构建器:构建LLM输入上下文
  50.         # 会话管理器:优先传入实例,否则创建新实例
  51.         self.sessions = session_manager or SessionManager(workspace)
  52.         self.tools = ToolRegistry()  # 工具注册表:管理所有可用工具
  53.         
  54.         # 子Agent管理器:用于生成子Agent处理子任务
  55.         self.subagents = SubagentManager(
  56.             provider=provider,
  57.             workspace=workspace,
  58.             bus=bus,
  59.             model=self.model,
  60.             temperature=self.temperature,
  61.             max_tokens=self.max_tokens,
  62.             brave_api_key=brave_api_key,
  63.             exec_config=self.exec_config,
  64.             restrict_to_workspace=restrict_to_workspace,
  65.         )
  66.         # 运行状态与资源管理属性
  67.         self._running = False                  # Agent循环是否运行
  68.         self._mcp_servers = mcp_servers or {}   # MCP服务器配置
  69.         self._mcp_stack: AsyncExitStack | None = None  # MCP连接上下文栈
  70.         self._mcp_connected = False             # MCP是否已连接
  71.         self._mcp_connecting = False            # MCP是否正在连接
  72.         self._consolidating: set[str] = set()   # 正在进行记忆合并的会话Key集合
  73.         self._consolidation_tasks: set[asyncio.Task] = set()  # 记忆合并任务集合
  74.         self._consolidation_locks: dict[str, asyncio.Lock] = {}  # 会话记忆合并锁
  75.         self._active_tasks: dict[str, list[asyncio.Task]] = {}  # 活跃任务:session_key -> 任务列表
  76.         self._processing_lock = asyncio.Lock()  # 全局消息处理锁(防止并发冲突)
  77.         self._register_default_tools()          # 注册默认工具
  78.     def _register_default_tools(self) -> None:
  79.         """Register the default set of tools. 注册默认工具集"""
  80.         # 确定文件工具的允许目录:如果限制工作区则为工作目录,否则为None(无限制)
  81.         allowed_dir = self.workspace if self.restrict_to_workspace else None
  82.         # 注册文件系统工具:读/写/编辑/列目录
  83.         for cls in (ReadFileTool, WriteFileTool, EditFileTool, ListDirTool):
  84.             self.tools.register(cls(workspace=self.workspace, allowed_dir=allowed_dir))
  85.         
  86.         # 注册命令执行工具
  87.         self.tools.register(ExecTool(
  88.             working_dir=str(self.workspace),       # 工作目录
  89.             timeout=self.exec_config.timeout,      # 执行超时时间
  90.             restrict_to_workspace=self.restrict_to_workspace,  # 工作区限制
  91.             path_append=self.exec_config.path_append,          # 环境变量PATH追加
  92.         ))
  93.         
  94.         # 注册网页相关工具:搜索/爬取
  95.         self.tools.register(WebSearchTool(api_key=self.brave_api_key))
  96.         self.tools.register(WebFetchTool())
  97.         
  98.         # 注册消息发送工具:回调函数为消息总线发布出站消息
  99.         self.tools.register(MessageTool(send_callback=self.bus.publish_outbound))
  100.         
  101.         # 注册子Agent生成工具
  102.         self.tools.register(SpawnTool(manager=self.subagents))
  103.         
  104.         # 如果有定时任务服务,注册定时任务工具
  105.         if self.cron_service:
  106.             self.tools.register(CronTool(self.cron_service))
  107.     async def _connect_mcp(self) -> None:
  108.         """Connect to configured MCP servers (one-time, lazy). 连接MCP服务器(懒加载,仅一次)"""
  109.         # 跳过条件:已连接/正在连接/无MCP配置
  110.         if self._mcp_connected or self._mcp_connecting or not self._mcp_servers:
  111.             return
  112.         
  113.         self._mcp_connecting = True  # 标记为正在连接
  114.         from nanobot.agent.tools.mcp import connect_mcp_servers  # 延迟导入MCP连接函数
  115.         try:
  116.             # 创建异步上下文栈,用于管理MCP连接资源
  117.             self._mcp_stack = AsyncExitStack()
  118.             await self._mcp_stack.__aenter__()  # 进入上下文栈
  119.             # 连接MCP服务器,将工具注册到MCP
  120.             await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack)
  121.             self._mcp_connected = True  # 标记为已连接
  122.         except Exception as e:
  123.             # 连接失败:记录日志,下次消息处理时重试
  124.             logger.error("Failed to connect MCP servers (will retry next message): {}", e)
  125.             if self._mcp_stack:
  126.                 try:
  127.                     await self._mcp_stack.aclose()  # 关闭上下文栈
  128.                 except Exception:
  129.                     pass
  130.                 self._mcp_stack = None
  131.         finally:
  132.             self._mcp_connecting = False  # 清除正在连接标记
  133.     def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None:
  134.         """Update context for all tools that need routing info. 更新需要路由信息的工具上下文"""
  135.         # 消息工具:设置通道/聊天ID/消息ID(用于消息发送路由)
  136.         if message_tool := self.tools.get("message"):
  137.             if isinstance(message_tool, MessageTool):
  138.                 message_tool.set_context(channel, chat_id, message_id)
  139.         # 子Agent生成工具:设置通道/聊天ID
  140.         if spawn_tool := self.tools.get("spawn"):
  141.             if isinstance(spawn_tool, SpawnTool):
  142.                 spawn_tool.set_context(channel, chat_id)
  143.         # 定时任务工具:设置通道/聊天ID
  144.         if cron_tool := self.tools.get("cron"):
  145.             if isinstance(cron_tool, CronTool):
  146.                 cron_tool.set_context(channel, chat_id)
复制代码
2.4 run

run 是代理的主循环入口。

  • 核心作用:run 负责持续消费消息总线的入站消息,并异步分发处理,同时保证 /stop 指令的实时响应。
  • 关键逻辑:

    • 1 秒超时消费消息:避免主线程阻塞,确保 /stop 能及时被处理;
    • 异步任务分发:非 /stop 消息通过 _dispatch 异步处理,不阻塞主循环;
    • 任务追踪:通过 _active_tasks 记录各会话的活跃任务,配合回调自动清理,支持 /stop 批量终止。

  • 异常处理:超时无消息时直接跳过,不中断主循环,保证代理持续运行。
  1.     async def run(self) -> None:
  2.         """Run the agent loop, dispatching messages as tasks to stay responsive to /stop."""
  3.         # 将代理运行状态标记为True,表示开始运行
  4.         self._running = True
  5.         # 异步连接MCP服务器(懒加载,仅首次执行,失败会在后续重试)
  6.         await self._connect_mcp()
  7.         # 记录日志:代理循环已启动
  8.         logger.info("Agent loop started")
  9.         # 核心循环:只要代理处于运行状态,就持续消费并处理消息
  10.         while self._running:
  11.             try:
  12.                 # 从消息总线消费入站消息,设置1秒超时(避免无限阻塞,保证/stop指令响应性)
  13.                 # asyncio.wait_for:超时会抛出TimeoutError,触发continue继续循环
  14.                 msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
  15.             except asyncio.TimeoutError:
  16.                 # 超时无消息时,跳过本次循环,继续等待下一轮
  17.                 continue
  18.             # 判断消息内容是否为/stop指令(忽略首尾空格、大小写)
  19.             if msg.content.strip().lower() == "/stop":
  20.                 # 处理/stop指令:终止当前会话的所有活跃任务和子代理
  21.                 await self._handle_stop(msg)
  22.             else:
  23.                 # 非/stop指令:创建异步任务处理消息(保证主线程不阻塞,响应后续/stop)
  24.                 task = asyncio.create_task(self._dispatch(msg))
  25.                 # 将任务添加到_active_tasks映射中(session_key为键,便于后续批量终止)
  26.                 # setdefault:如果session_key不存在则创建空列表,再追加任务
  27.                 self._active_tasks.setdefault(msg.session_key, []).append(task)
  28.                 # 为任务添加完成回调:任务结束后从_active_tasks中移除(避免内存泄漏)
  29.                 # 匿名函数参数k绑定当前msg.session_key,t为完成的任务对象
  30.                 # 逻辑:如果任务仍在对应session的任务列表中,则移除;否则无操作
  31.                 task.add_done_callback(lambda t, k=msg.session_key: self._active_tasks.get(k, []) and self._active_tasks[k].remove(t) if t in self._active_tasks.get(k, []) else None)
复制代码
2.5 _dispatch

_dispatch 是消息分发的核心方法。

  • 核心作用:_dispatch 在全局锁保护下执行消息处理,保证串行化,同时统一处理异常和响应发布。
  • 关键逻辑:

    • 全局锁 _processing_lock:避免多任务并发处理消息导致的资源冲突;
    • 响应发布规则:有响应则发布响应、CLI 渠道无响应则发布空消息、异常则发布错误提示;
    • 异常处理:区分任务取消异常(重新抛出)和通用异常(记录 + 返回错误提示),保证异常链路清晰。

  • 边界处理:针对 CLI 渠道做特殊适配,发布空消息避免命令行交互阻塞。
  1.     async def _dispatch(self, msg: InboundMessage) -> None:
  2.         """Process a message under the global lock."""
  3.         # 获取全局处理锁(异步上下文管理器),确保消息串行处理,避免资源竞争
  4.         async with self._processing_lock:
  5.             try:
  6.                 # 调用核心消息处理方法,传入入站消息,获取出站响应(可能为None)
  7.                 response = await self._process_message(msg)
  8.                 # 如果处理后有非空的出站响应
  9.                 if response is not None:
  10.                     # 将响应发布到消息总线的出站队列
  11.                     await self.bus.publish_outbound(response)
  12.                 # 如果无响应且消息渠道是CLI(命令行界面)
  13.                 elif msg.channel == "cli":
  14.                     # 向CLI渠道发布空内容的出站消息(保证CLI交互的完整性,避免阻塞)
  15.                     await self.bus.publish_outbound(OutboundMessage(
  16.                         channel=msg.channel, chat_id=msg.chat_id,
  17.                         content="", metadata=msg.metadata or {},
  18.                     ))
  19.             # 捕获任务取消异常(如/stop指令触发的任务终止)
  20.             except asyncio.CancelledError:
  21.                 # 记录日志:会话对应的任务已被取消
  22.                 logger.info("Task cancelled for session {}", msg.session_key)
  23.                 # 重新抛出取消异常,让上层逻辑处理(如清理任务列表)
  24.                 raise
  25.             # 捕获所有其他未预期的异常
  26.             except Exception:
  27.                 # 记录异常日志(包含堆栈信息),便于问题排查
  28.                 logger.exception("Error processing message for session {}", msg.session_key)
  29.                 # 向消息来源渠道发布统一的错误提示消息
  30.                 await self.bus.publish_outbound(OutboundMessage(
  31.                     channel=msg.channel, chat_id=msg.chat_id,
  32.                     content="Sorry, I encountered an error.",
  33.                 ))
复制代码
2.6 _process_message()

_process_message 是单条消息处理的核心入口。
核心作用:_process_message 支持系统消息、斜杠命令、普通对话三种场景,完成「上下文构建→代理循环→结果保存→响应返回」全流程。
关键逻辑

  • 系统消息处理:解析渠道信息,独立构建会话和上下文,适用于后台任务;
  • 斜杠命令:/new 合并记忆并清空会话,/help 返回命令列表;
  • 记忆合并:未合并消息达阈值时异步执行,避免阻塞主流程;
  • 进度回调:实时推送处理进度(含工具调用提示),提升交互体验;
  • 重复回复防护:消息工具已发送过消息则返回 None,避免重复响应。
边界处理

  • 兜底默认回复:无最终内容时返回标准化提示;
  • 媒体消息支持:构建上下文时兼容图片等媒体内容;
  • 会话锁机制:通过合并锁避免并发修改会话记忆。
[code]    async def _process_message(        self,        msg: InboundMessage,        session_key: str | None = None,        on_progress: Callable[[str], Awaitable[None]] | None = None,    ) -> OutboundMessage | None:        """rocess a single inbound message and return the response."""        # 处理系统消息:从chat_id中解析原始渠道和聊天ID(格式为"channel:chat_id")        if msg.channel == "system":            # 拆分chat_id:有分隔符则拆分为渠道+聊天ID,否则默认CLI渠道            channel, chat_id = (msg.chat_id.split(":", 1) if ":" in msg.chat_id                                else ("cli", msg.chat_id))            # 记录日志:正在处理来自指定发送者的系统消息            logger.info("rocessing system message from {}", msg.sender_id)            # 构建会话唯一标识(渠道+聊天ID)            key = f"{channel}:{chat_id}"            # 获取或创建该会话(不存在则新建)            session = self.sessions.get_or_create(key)            # 为工具设置上下文(渠道、聊天ID、消息ID,用于消息路由)            self._set_tool_context(channel, chat_id, msg.metadata.get("message_id"))            # 从会话中获取历史消息(最多保留memory_window条,控制上下文长度)            history = session.get_history(max_messages=self.memory_window)            # 构建LLM所需的完整上下文消息(历史+当前消息+渠道信息)            messages = self.context.build_messages(                history=history,                current_message=msg.content, channel=channel, chat_id=chat_id,            )            # 运行代理核心循环,获取最终回复内容、使用的工具列表、所有消息            final_content, _, all_msgs = await self._run_agent_loop(messages)            # 保存本轮对话到会话(跳过已存在的历史消息,仅保存新内容)            self._save_turn(session, all_msgs, 1 + len(history))            # 将更新后的会话持久化到本地            self.sessions.save(session)            # 返回系统消息处理结果:无内容则默认"Background task completed."            return OutboundMessage(channel=channel, chat_id=chat_id,                                  content=final_content or "Background task completed.")        # 非系统消息:截取消息内容预览(超过80字符则截断加省略号)        preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content        # 记录日志:正在处理来自指定渠道/发送者的消息(展示预览)        logger.info("rocessing message from {}:{}: {}", msg.channel, msg.sender_id, preview)        # 确定会话key:优先使用传入的session_key,否则使用消息自带的session_key        key = session_key or msg.session_key        # 获取或创建该会话        session = self.sessions.get_or_create(key)        # 处理斜杠命令(Slash commands)        # 标准化命令:去除首尾空格并转为小写        cmd = msg.content.strip().lower()        # 处理"/new"命令:新建会话(合并当前记忆并清空)        if cmd == "/new":            # 获取该会话的记忆合并锁(避免并发合并)            lock = self._get_consolidation_lock(session.key)            # 将会话标记为"正在合并记忆"            self._consolidating.add(session.key)            try:                # 加锁执行记忆合并(异步锁,防止并发操作)                async with lock:                    # 截取会话中未合并的消息(从上次合并位置到末尾)                    snapshot = session.messages[session.last_consolidated:]                    # 如果有未合并的消息                    if snapshot:                        # 创建临时会话对象,仅包含未合并的消息                        temp = Session(key=session.key)                        temp.messages = list(snapshot)                        # 执行记忆合并(归档所有消息),失败则返回错误提示                        if not await self._consolidate_memory(temp, archive_all=True):                            return OutboundMessage(                                channel=msg.channel, chat_id=msg.chat_id,                                content="Memory archival failed, session not cleared. Please try again.",                            )            # 捕获合并过程中的所有异常            except Exception:                # 记录异常日志(含堆栈),便于排查                logger.exception("/new archival failed for {}", session.key)                # 返回合并失败的错误提示                return OutboundMessage(                    channel=msg.channel, chat_id=msg.chat_id,                    content="Memory archival failed, session not cleared. Please try again.",                )            # 无论成功/失败,最终执行:            finally:                # 取消会话的"正在合并"标记                self._consolidating.discard(session.key)                # 清理该会话的合并锁(未锁定则移除)                self._prune_consolidation_lock(session.key, lock)            # 清空当前会话的所有消息            session.clear()            # 保存清空后的会话            self.sessions.save(session)            # 使会话缓存失效(确保下次获取最新状态)            self.sessions.invalidate(session.key)            # 返回新建会话成功的提示            return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,                                  content="New session started.")        # 处理"/help"命令:返回可用命令列表        if cmd == "/help":            return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,                                  content="
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册