【OpenClaw】通过Nanobot源码学习架构---(3)AgentLoop
目录
- 【OpenClaw】通过Nanobot源码学习架构---(3)AgentLoop
- 0x00 概要
- 0x01 原理
- 1.1 Agent:负责“执行”
- 1.2 Pi-Agent框架
- 0x02 AgentLoop
- 2.1 架构
- 2.2 流程
- 2.3 定义和初始化
- 2.4 run
- 2.5 _dispatch
- 2.6 _process_message()
- 2.7 _run_agent_loop()
- 0xFF 参考
0x00 概要
OpenClaw 应该有40万行代码,阅读理解起来难度过大,因此,本系列通过Nanobot来学习 OpenClaw 的特色。
Nanobot是由香港大学数据科学实验室(HKUDS)开源的超轻量级个人 AI 助手框架,定位为"Ultra-Lightweight OpenClaw"。非常适合学习Agent架构。
Agent 是“业务执行者”,解决“消息怎样变成模型调用、工具执行和最终回复”。它们具备独立的上下文(与主对话隔离)和可使用的特定工具,并且具备定义明确的角色和方法论。每次 Agent 收到消息时运行的核心推理周期如下:
- 从总线接收消息
- 组装上下文
- 推理该做什么(这是 LLM 调用)
- 根据决定行动(调用工具、执行命令)
- 观察结果,保存状态
- 判断:我完成了吗?还是再循环一次?
- 完成后回复
注:本系列借鉴的文章过多,可能在参考文献中有遗漏的文章,如果有,还请大家指出。
0x01 原理
1.1 Agent:负责“执行”
一个 Agent = 一个完整的 AI “大脑实例”,每个 Agent 都拥有独立资源。Agent 是“执行平面”,解决“消息怎样变成模型调用、工具执行和最终回复”。具体如下图(来自MiniClaw)。- Feishu Cloud
- |
- | HTTP POST /feishu/events
- | (im.message.receive_v1)
- v
- [ESP32 Webhook Server :18790]
- |
- | message_bus_push_inbound()
- v
- [Message Bus] ──> [Agent Loop] ──> [Message Bus]
- (Claude/GPT) |
- | outbound dispatch
- v
- [feishu_send_message()]
- |
- | POST /im/v1/messages
- v
- Feishu API
复制代码 下图是Agent 的最小循环。每个 AI Agent 都需要这个循环。模型决定何时调用工具、何时停止。代码只是执行模型的要求。- An agentic loop is the full “real” run of an agent: intake → context assembly → model inference → tool execution → streaming replies → persistence.
-
- THE AGENT PATTERN
- =================
- User --> messages[] --> LLM --> response
- |
- stop_reason == "tool_use"?
- / \
- yes no
- | |
- execute tools return text
- append results
- loop back -----------------> messages[]
复制代码 1.2 Pi-Agent框架
OpenClaw所使用的引擎是Pi-Agent框架,它是一个仅有四个工具、系统提示词不到1000个token,秉持“精简至上”原则的AI编程Agent。与其他编程Agent相比,Pi的工程设计和决策机制极为简洁,形成了鲜明对比。
下图是 OpenClaw 的循环概要。- runEmbeddedPiAgent()
- └── while (true) { // 主重试循环
- ├── 检查重试次数限制 (MAX_RUN_LOOP_ITERATIONS)
- ├── 调用 runEmbeddedAttempt() // 单次推理尝试
- ├── 处理 context overflow → 自动压缩
- ├── 处理 auth failure → profile轮换
- ├── 处理 timeout → 重试或报错
- └── 成功则返回 payloads
- }
复制代码 Pi的设计理念可以总结为:不是为LLM打造一个复杂的“控制台”,而是给它一把“多功能小刀”——工具虽少但实用,提示虽简但明确,让模型的原生能力成为主导,而不是被框架的复杂性所掩盖。Pi 这种设计理念是基于一个关键事实——经过强化学习训练的前沿LLM模型,已经具备了很强的理解和执行能力。它们能明确知道“编码Agent”的主要任务是什么,根本不需要长篇大论的系统提示词和复杂的辅助模块来“指导”它们工作。
从数据层面分析:Pi的系统提示词加上工具定义,总长度还不到1000个token,仅仅是Claude Code的十分之一;内置工具也只有4个,远少于同类产品。这说明,Pi在主流Agent都在强化的方面,几乎都做了简化:
- 系统提示词简短明了
- 内置工具数量精简
- 没有复杂的规划模式和多代理通信协议(Plan Mode和MCP)支持
- 更没有难以监控的子Agent
Pi的核心策略是:去除冗余辅助模块,让LLM模型发挥核心作用,用最简洁的结构实现最核心的功能。
或许有人会问:如此简单的设计,真的能应对复杂的编码任务吗?实际上,Pi的简洁并非“简陋”,而是“精准”。接下来,我们详细解析这4个内置工具的设计思路——read、write、edit、bash:
工具主要功能read读取文件、审查代码、获取上下文信息write创建文件、写入内容edit修改代码、进行增量更新bash执行命令、操作环境、通过自我调用来拆分任务这四个工具几乎涵盖了编码Agent的所有核心需求。特别是bash工具的引入,既实现了复杂任务的拆分和执行,保证了功能的完整性,又避免了引入子Agent可能带来的不可预测性和监控难题——这就是Pi敢于放弃子Agent架构的原因。
同时,Pi使用简短的系统提示词,并非降低了对LLM的引导标准,而是充分信任前沿LLM的能力。正如Mario Zechner所倡导的:与其用大量token去“教导”LLM如何成为Agent,不如用简洁的提示词明确其核心任务,让LLM充分发挥自身的理解和执行能力。
这种设计思路带来了三大好处:
- 节省上下文空间——降低推理成本,提高运行效率
- 行为更加灵活自主——LLM能根据实际情况动态调整策略,不受冗长规则限制
- 更好的适应性——简洁的结构意味着更低的认知负担和更强的泛化能力
0x02 AgentLoop
AgentLoop 是nanobot Agent运行的核心。智能体循环是区分聊天机器人和智能体的关键。
2.1 架构
AgentLoop 类的架构如下:
2.2 流程
下面是一个 AI Agent(智能体)的消息处理流程图,展示了从消息接收到响应发送的完整链路,包括 LLM 交互、工具调用循环等核心机制。- 入口:消息到达(InboundMessage)
- ↓
- AgentLoop.run() - 监听并接收消息
- ↓
- AgentLoop._dispatch() - 分派处理
- ↓
- AgentLoop._process_message() - 主要处理逻辑
- ↓
- ContextBuilder.build_messages() - 构建上下文
- ↓
- AgentLoop._run_agent_loop() - 核心代理循环
- ↓
- Provider.chat() - LLM交互
- ↓
- ← 判断是否有工具调用
- ↓ 否
- ← 返回最终内容
- ↓ 是
- ← 执行工具调用
- ↓
- ContextBuilder.add_tool_result() - 添加工具结果
- ↓
- ← 继续循环直到没有更多工具调用
- ↓
- AgentLoop._save_turn() - 保存交互记录
- ↓
- 通过MessageBus发布OutboundMessage - 发送响应
复制代码 部分环节详细拆解如下
2.3 定义和初始化
AgentLoop 的定义和初始化代码如下- class AgentLoop:
- """
- The agent loop is the core processing engine.
- It:
- 1. Receives messages from the bus
- 2. Builds context with history, memory, skills
- 3. Calls the LLM
- 4. Executes tool calls
- 5. Sends responses back
- """
- def __init__(
- self,
- bus: MessageBus, # 消息总线,用于接收/发送消息
- provider: LLMProvider, # LLM提供者(如OpenAI/本地模型)
- workspace: Path, # Agent工作目录,用于隔离文件操作
- model: str | None = None, # 使用的LLM模型名称
- max_iterations: int = 40, # Agent最大迭代次数(防止无限循环)
- temperature: float = 0.1, # LLM温度参数(越低越确定)
- max_tokens: int = 4096, # LLM最大生成Token数
- memory_window: int = 100, # 记忆窗口大小(会话历史最大条数)
- brave_api_key: str | None = None, # Brave搜索API密钥(用于网页搜索工具)
- exec_config: ExecToolConfig | None = None, # 命令执行工具配置
- cron_service: CronService | None = None, # 定时任务服务(可选)
- restrict_to_workspace: bool = False, # 是否限制Agent仅操作工作区
- session_manager: SessionManager | None = None, # 会话管理器(可选)
- mcp_servers: dict | None = None, # MCP服务器配置(可选)
- channels_config: ChannelsConfig | None = None, # 通道配置(可选)
- ):
- # 解决循环导入问题:仅运行时导入ExecToolConfig
- from nanobot.config.schema import ExecToolConfig
-
- # 基础属性初始化
- self.bus = bus # 消息总线实例
- self.channels_config = channels_config # 通道配置
- self.provider = provider # LLM提供者实例
- self.workspace = workspace # 工作目录路径
- # 模型名称:优先传入值,否则使用LLM提供者默认模型
- self.model = model or provider.get_default_model()
- self.max_iterations = max_iterations # 最大迭代次数
- self.temperature = temperature # LLM温度
- self.max_tokens = max_tokens # LLM最大Token数
- self.memory_window = memory_window # 记忆窗口大小
- self.brave_api_key = brave_api_key # Brave API密钥
- # 执行工具配置:默认空配置
- self.exec_config = exec_config or ExecToolConfig()
- self.cron_service = cron_service # 定时任务服务
- self.restrict_to_workspace = restrict_to_workspace # 工作区限制开关
- # 核心组件初始化
- self.context = ContextBuilder(workspace) # 上下文构建器:构建LLM输入上下文
- # 会话管理器:优先传入实例,否则创建新实例
- self.sessions = session_manager or SessionManager(workspace)
- self.tools = ToolRegistry() # 工具注册表:管理所有可用工具
-
- # 子Agent管理器:用于生成子Agent处理子任务
- self.subagents = SubagentManager(
- provider=provider,
- workspace=workspace,
- bus=bus,
- model=self.model,
- temperature=self.temperature,
- max_tokens=self.max_tokens,
- brave_api_key=brave_api_key,
- exec_config=self.exec_config,
- restrict_to_workspace=restrict_to_workspace,
- )
- # 运行状态与资源管理属性
- self._running = False # Agent循环是否运行
- self._mcp_servers = mcp_servers or {} # MCP服务器配置
- self._mcp_stack: AsyncExitStack | None = None # MCP连接上下文栈
- self._mcp_connected = False # MCP是否已连接
- self._mcp_connecting = False # MCP是否正在连接
- self._consolidating: set[str] = set() # 正在进行记忆合并的会话Key集合
- self._consolidation_tasks: set[asyncio.Task] = set() # 记忆合并任务集合
- self._consolidation_locks: dict[str, asyncio.Lock] = {} # 会话记忆合并锁
- self._active_tasks: dict[str, list[asyncio.Task]] = {} # 活跃任务:session_key -> 任务列表
- self._processing_lock = asyncio.Lock() # 全局消息处理锁(防止并发冲突)
- self._register_default_tools() # 注册默认工具
- def _register_default_tools(self) -> None:
- """Register the default set of tools. 注册默认工具集"""
- # 确定文件工具的允许目录:如果限制工作区则为工作目录,否则为None(无限制)
- allowed_dir = self.workspace if self.restrict_to_workspace else None
- # 注册文件系统工具:读/写/编辑/列目录
- for cls in (ReadFileTool, WriteFileTool, EditFileTool, ListDirTool):
- self.tools.register(cls(workspace=self.workspace, allowed_dir=allowed_dir))
-
- # 注册命令执行工具
- self.tools.register(ExecTool(
- working_dir=str(self.workspace), # 工作目录
- timeout=self.exec_config.timeout, # 执行超时时间
- restrict_to_workspace=self.restrict_to_workspace, # 工作区限制
- path_append=self.exec_config.path_append, # 环境变量PATH追加
- ))
-
- # 注册网页相关工具:搜索/爬取
- self.tools.register(WebSearchTool(api_key=self.brave_api_key))
- self.tools.register(WebFetchTool())
-
- # 注册消息发送工具:回调函数为消息总线发布出站消息
- self.tools.register(MessageTool(send_callback=self.bus.publish_outbound))
-
- # 注册子Agent生成工具
- self.tools.register(SpawnTool(manager=self.subagents))
-
- # 如果有定时任务服务,注册定时任务工具
- if self.cron_service:
- self.tools.register(CronTool(self.cron_service))
- async def _connect_mcp(self) -> None:
- """Connect to configured MCP servers (one-time, lazy). 连接MCP服务器(懒加载,仅一次)"""
- # 跳过条件:已连接/正在连接/无MCP配置
- if self._mcp_connected or self._mcp_connecting or not self._mcp_servers:
- return
-
- self._mcp_connecting = True # 标记为正在连接
- from nanobot.agent.tools.mcp import connect_mcp_servers # 延迟导入MCP连接函数
- try:
- # 创建异步上下文栈,用于管理MCP连接资源
- self._mcp_stack = AsyncExitStack()
- await self._mcp_stack.__aenter__() # 进入上下文栈
- # 连接MCP服务器,将工具注册到MCP
- await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack)
- self._mcp_connected = True # 标记为已连接
- except Exception as e:
- # 连接失败:记录日志,下次消息处理时重试
- logger.error("Failed to connect MCP servers (will retry next message): {}", e)
- if self._mcp_stack:
- try:
- await self._mcp_stack.aclose() # 关闭上下文栈
- except Exception:
- pass
- self._mcp_stack = None
- finally:
- self._mcp_connecting = False # 清除正在连接标记
- def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None:
- """Update context for all tools that need routing info. 更新需要路由信息的工具上下文"""
- # 消息工具:设置通道/聊天ID/消息ID(用于消息发送路由)
- if message_tool := self.tools.get("message"):
- if isinstance(message_tool, MessageTool):
- message_tool.set_context(channel, chat_id, message_id)
- # 子Agent生成工具:设置通道/聊天ID
- if spawn_tool := self.tools.get("spawn"):
- if isinstance(spawn_tool, SpawnTool):
- spawn_tool.set_context(channel, chat_id)
- # 定时任务工具:设置通道/聊天ID
- if cron_tool := self.tools.get("cron"):
- if isinstance(cron_tool, CronTool):
- cron_tool.set_context(channel, chat_id)
复制代码 2.4 run
run 是代理的主循环入口。
- 核心作用:run 负责持续消费消息总线的入站消息,并异步分发处理,同时保证 /stop 指令的实时响应。
- 关键逻辑:
- 1 秒超时消费消息:避免主线程阻塞,确保 /stop 能及时被处理;
- 异步任务分发:非 /stop 消息通过 _dispatch 异步处理,不阻塞主循环;
- 任务追踪:通过 _active_tasks 记录各会话的活跃任务,配合回调自动清理,支持 /stop 批量终止。
- 异常处理:超时无消息时直接跳过,不中断主循环,保证代理持续运行。
- async def run(self) -> None:
- """Run the agent loop, dispatching messages as tasks to stay responsive to /stop."""
- # 将代理运行状态标记为True,表示开始运行
- self._running = True
- # 异步连接MCP服务器(懒加载,仅首次执行,失败会在后续重试)
- await self._connect_mcp()
- # 记录日志:代理循环已启动
- logger.info("Agent loop started")
- # 核心循环:只要代理处于运行状态,就持续消费并处理消息
- while self._running:
- try:
- # 从消息总线消费入站消息,设置1秒超时(避免无限阻塞,保证/stop指令响应性)
- # asyncio.wait_for:超时会抛出TimeoutError,触发continue继续循环
- msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
- except asyncio.TimeoutError:
- # 超时无消息时,跳过本次循环,继续等待下一轮
- continue
- # 判断消息内容是否为/stop指令(忽略首尾空格、大小写)
- if msg.content.strip().lower() == "/stop":
- # 处理/stop指令:终止当前会话的所有活跃任务和子代理
- await self._handle_stop(msg)
- else:
- # 非/stop指令:创建异步任务处理消息(保证主线程不阻塞,响应后续/stop)
- task = asyncio.create_task(self._dispatch(msg))
- # 将任务添加到_active_tasks映射中(session_key为键,便于后续批量终止)
- # setdefault:如果session_key不存在则创建空列表,再追加任务
- self._active_tasks.setdefault(msg.session_key, []).append(task)
- # 为任务添加完成回调:任务结束后从_active_tasks中移除(避免内存泄漏)
- # 匿名函数参数k绑定当前msg.session_key,t为完成的任务对象
- # 逻辑:如果任务仍在对应session的任务列表中,则移除;否则无操作
- task.add_done_callback(lambda t, k=msg.session_key: self._active_tasks.get(k, []) and self._active_tasks[k].remove(t) if t in self._active_tasks.get(k, []) else None)
复制代码 2.5 _dispatch
_dispatch 是消息分发的核心方法。
- 核心作用:_dispatch 在全局锁保护下执行消息处理,保证串行化,同时统一处理异常和响应发布。
- 关键逻辑:
- 全局锁 _processing_lock:避免多任务并发处理消息导致的资源冲突;
- 响应发布规则:有响应则发布响应、CLI 渠道无响应则发布空消息、异常则发布错误提示;
- 异常处理:区分任务取消异常(重新抛出)和通用异常(记录 + 返回错误提示),保证异常链路清晰。
- 边界处理:针对 CLI 渠道做特殊适配,发布空消息避免命令行交互阻塞。
- async def _dispatch(self, msg: InboundMessage) -> None:
- """Process a message under the global lock."""
- # 获取全局处理锁(异步上下文管理器),确保消息串行处理,避免资源竞争
- async with self._processing_lock:
- try:
- # 调用核心消息处理方法,传入入站消息,获取出站响应(可能为None)
- response = await self._process_message(msg)
- # 如果处理后有非空的出站响应
- if response is not None:
- # 将响应发布到消息总线的出站队列
- await self.bus.publish_outbound(response)
- # 如果无响应且消息渠道是CLI(命令行界面)
- elif msg.channel == "cli":
- # 向CLI渠道发布空内容的出站消息(保证CLI交互的完整性,避免阻塞)
- await self.bus.publish_outbound(OutboundMessage(
- channel=msg.channel, chat_id=msg.chat_id,
- content="", metadata=msg.metadata or {},
- ))
- # 捕获任务取消异常(如/stop指令触发的任务终止)
- except asyncio.CancelledError:
- # 记录日志:会话对应的任务已被取消
- logger.info("Task cancelled for session {}", msg.session_key)
- # 重新抛出取消异常,让上层逻辑处理(如清理任务列表)
- raise
- # 捕获所有其他未预期的异常
- except Exception:
- # 记录异常日志(包含堆栈信息),便于问题排查
- logger.exception("Error processing message for session {}", msg.session_key)
- # 向消息来源渠道发布统一的错误提示消息
- await self.bus.publish_outbound(OutboundMessage(
- channel=msg.channel, chat_id=msg.chat_id,
- content="Sorry, I encountered an error.",
- ))
复制代码 2.6 _process_message()
_process_message 是单条消息处理的核心入口。
核心作用:_process_message 支持系统消息、斜杠命令、普通对话三种场景,完成「上下文构建→代理循环→结果保存→响应返回」全流程。
关键逻辑:
- 系统消息处理:解析渠道信息,独立构建会话和上下文,适用于后台任务;
- 斜杠命令:/new 合并记忆并清空会话,/help 返回命令列表;
- 记忆合并:未合并消息达阈值时异步执行,避免阻塞主流程;
- 进度回调:实时推送处理进度(含工具调用提示),提升交互体验;
- 重复回复防护:消息工具已发送过消息则返回 None,避免重复响应。
边界处理:
- 兜底默认回复:无最终内容时返回标准化提示;
- 媒体消息支持:构建上下文时兼容图片等媒体内容;
- 会话锁机制:通过合并锁避免并发修改会话记忆。
[code] async def _process_message( self, msg: InboundMessage, session_key: str | None = None, on_progress: Callable[[str], Awaitable[None]] | None = None, ) -> OutboundMessage | None: """ rocess a single inbound message and return the response.""" # 处理系统消息:从chat_id中解析原始渠道和聊天ID(格式为"channel:chat_id") if msg.channel == "system": # 拆分chat_id:有分隔符则拆分为渠道+聊天ID,否则默认CLI渠道 channel, chat_id = (msg.chat_id.split(":", 1) if ":" in msg.chat_id else ("cli", msg.chat_id)) # 记录日志:正在处理来自指定发送者的系统消息 logger.info(" rocessing system message from {}", msg.sender_id) # 构建会话唯一标识(渠道+聊天ID) key = f"{channel}:{chat_id}" # 获取或创建该会话(不存在则新建) session = self.sessions.get_or_create(key) # 为工具设置上下文(渠道、聊天ID、消息ID,用于消息路由) self._set_tool_context(channel, chat_id, msg.metadata.get("message_id")) # 从会话中获取历史消息(最多保留memory_window条,控制上下文长度) history = session.get_history(max_messages=self.memory_window) # 构建LLM所需的完整上下文消息(历史+当前消息+渠道信息) messages = self.context.build_messages( history=history, current_message=msg.content, channel=channel, chat_id=chat_id, ) # 运行代理核心循环,获取最终回复内容、使用的工具列表、所有消息 final_content, _, all_msgs = await self._run_agent_loop(messages) # 保存本轮对话到会话(跳过已存在的历史消息,仅保存新内容) self._save_turn(session, all_msgs, 1 + len(history)) # 将更新后的会话持久化到本地 self.sessions.save(session) # 返回系统消息处理结果:无内容则默认"Background task completed." return OutboundMessage(channel=channel, chat_id=chat_id, content=final_content or "Background task completed.") # 非系统消息:截取消息内容预览(超过80字符则截断加省略号) preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content # 记录日志:正在处理来自指定渠道/发送者的消息(展示预览) logger.info(" rocessing message from {}:{}: {}", msg.channel, msg.sender_id, preview) # 确定会话key:优先使用传入的session_key,否则使用消息自带的session_key key = session_key or msg.session_key # 获取或创建该会话 session = self.sessions.get_or_create(key) # 处理斜杠命令(Slash commands) # 标准化命令:去除首尾空格并转为小写 cmd = msg.content.strip().lower() # 处理"/new"命令:新建会话(合并当前记忆并清空) if cmd == "/new": # 获取该会话的记忆合并锁(避免并发合并) lock = self._get_consolidation_lock(session.key) # 将会话标记为"正在合并记忆" self._consolidating.add(session.key) try: # 加锁执行记忆合并(异步锁,防止并发操作) async with lock: # 截取会话中未合并的消息(从上次合并位置到末尾) snapshot = session.messages[session.last_consolidated:] # 如果有未合并的消息 if snapshot: # 创建临时会话对象,仅包含未合并的消息 temp = Session(key=session.key) temp.messages = list(snapshot) # 执行记忆合并(归档所有消息),失败则返回错误提示 if not await self._consolidate_memory(temp, archive_all=True): return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content="Memory archival failed, session not cleared. Please try again.", ) # 捕获合并过程中的所有异常 except Exception: # 记录异常日志(含堆栈),便于排查 logger.exception("/new archival failed for {}", session.key) # 返回合并失败的错误提示 return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content="Memory archival failed, session not cleared. Please try again.", ) # 无论成功/失败,最终执行: finally: # 取消会话的"正在合并"标记 self._consolidating.discard(session.key) # 清理该会话的合并锁(未锁定则移除) self._prune_consolidation_lock(session.key, lock) # 清空当前会话的所有消息 session.clear() # 保存清空后的会话 self.sessions.save(session) # 使会话缓存失效(确保下次获取最新状态) self.sessions.invalidate(session.key) # 返回新建会话成功的提示 return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content="New session started.") # 处理"/help"命令:返回可用命令列表 if cmd == "/help": return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content="
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |