找回密码
 立即注册
首页 业界区 业界 【OpenClaw】通过 Nanobot 源码学习架构---(2)外层控 ...

【OpenClaw】通过 Nanobot 源码学习架构---(2)外层控制逻辑

蔡如风 昨天 21:10
【OpenClaw】通过Nanobot源码学习架构---(2)外层控制逻辑


目录

  • 【OpenClaw】通过Nanobot源码学习架构---(2)外层控制逻辑

    • 0x00 概要
    • 0x01 Commands

      • 1.1 逻辑意义

        • 1.1.1 Commands vs Agent
        • 1.1.2 Commands 运行场景

      • 1.2 Typer 库
      • 1.3 主要命令

        • 1.3.1 系统设置命令
        • 1.3.2 服务启动命令
        • 1.3.3 直接交互命令

          • 处理逻辑
          • 与Gateway模式的区别

        • 1.3.4 通信渠道管理命令
        • 1.3.5 通道登录命令
        • 1.3.6 定时任务管理命令
        • 1.3.7 添加命令
        • 1.3.8 删除命令
        • 1.3.9 启用/或禁用命令
        • 1.3.10 手动执行命令
        • 1.3.11 状态查询命令
        • 1.3.12 OAuth 认证命令


    • 0x02 Gateway

      • 2.1 核心作用

        • 2.1.1 整体作用
        • 2.1.2 核心特色

      • 2.2 图例

        • async run() 服务启动顺序
        • gateway() 内部组件依赖关系

      • 2.3 代码

    • 0x03 Channel

      • 3.1 需求
      • 3.2 OpenClaw
      • 3.3 基类
      • 3.4 QQChannel

        • 3.4.1 架构设计

          • 与 MessageBus 交互
          • 与 AgentLoop 交互
          • 与 SessionManager 交互
          • 生命周期管理

        • 3.4.2 消息交互机制

          • 消息通道
          • 启动流程
          • 消息接收流程
          • 消息处理流程
          • 响应发送流程
          • 特殊处理

        • 3.4.3 QQChannel 核心模块逻辑关系图
        • 3.4.4 QQChannel 核心流程流程图
        • 3.4.5 代码


    • 0xFF 参考


0x00 概要

OpenClaw 应该有40万行代码,阅读理解起来难度过大,因此,本系列通过Nanobot来学习 OpenClaw 的特色。
Nanobot是由香港大学数据科学实验室(HKUDS)开源的超轻量级个人 AI 助手框架,定位为"Ultra-Lightweight OpenClaw"。非常适合学习Agent架构。
在 Nanobot 中,如下几个主要组件担任了外层控制架构,各组件职责为:

  • Commands(commands.py):

    • nanobot 应用程序的统一命令行入口
    • Commands 调用 Agent 完成具体工作
    • Commands 是手动触发器。可以将 Command 想象成手动变速箱,何时换挡由用户掌控。



  • Gateway:

    • 是 commands.py 中的一部分,由 Command 启动
    • 系统入口,也是长生命周期控制平面,协调各组件启动和运行
    • 启动MessageBus、AgentLoop、ChannelManager
    • 协调 CronService 和 HeartbeatService

  • Channel(如QQchannel):

    • 是 ingress/egress 适配层
    • 接收外部消息
    • 将消息发布到.MessageBus
    • 发送响应消息回外部平台

  • MessageBus:

    • 解耦 Channel 和 AgentLoop
    • 提供异步消息队列机制

我们本篇来分析这几个组件(MessageBus不做单独介绍,而是穿插在其它组件中)。
注:本系列借鉴的文章过多,可能在参考文献中有遗漏的文章,如果有,还请大家指出。
0x01 Commands

commands.py 是 nanobot 应用程序的统一命令行入口,它定义了所有可用的命令行命令和选项,使用户可以通过终端与 nanobot 进行交互,它集成了以下核心功能:

  • 系统初始化:配置和工作区设置
  • 服务管理:启动和管理整个 nanobot 系统
  • 用户交互:提供直观的命令行和交互式界面
  • 资源管理:定时任务、通信渠道和提供商管理
  • 状态监控:系统健康状况检查
1.1 逻辑意义

Command 是用户通过输入 /command-name 手动触发的指令。
1.1.1 Commands vs Agent

Commands 和 Agent 是系统中的不同角色,总体来说,Commands 实质上是“触发器”,决定某件事“何时(手动)”运行。Agents 则是拥有独立上下文和工具的“执行者”,决定了“做”什么。

  • Commands = 手动触发(由用户决定何时执行),即刻显式运行Agents;
  • Agents = 执行者(真正干活的),完成具体工作。Agents 是具备独立上下文、工具和指令的专业执行者。但是,Agent 不会自行启动,需要由 Command、Skill 或 LLM 根据需求来调用。
1.1.2 Commands 运行场景

使用 Command 的场景为:

  • 任务简单,且不需要隔离上下文。
  • 用户希望对多步骤工作流保有手动控制权。
  • 这个操作会产生某些后果,而用户希望在这些后果发生之前,先由用户自己确认。
  • 用户会在特定时机反复执行的工作流程,而用户希望在自己认为合适的那一刻手动启动它。
Command 本身就是完整的工作流。整个系统的协作方式如下:

  • 用户输入 /XXX(Command - 手动触发)
  • Command 运行,Command 告知 Claude:“调用 XXX agent”
  • Agent 加载自己的上下文和工具
  • Agent 完成具体工作
  • Agent 返回结果
  • 用户获得可操作的报告
1.2 Typer 库

Nanobot中,Typer 库的作用如下:

  • 应用程序初始化:使用 Typer 库创建命令行应用程序框架。Typer 框架的作用是提供一个结构化的命令行接口定义和参数解析系统,使开发者能够轻松创建具有多层命令结构的复杂 CLI 应用程序。所有命令通过共享的配置文件系统协同工作,不受终端会话限制。
  • 全局命令选项:定义通用选项如版本信息(--version, -v)。
  • 独立进程模型:每个cLI命令都在独立的操作系统进程中运行,nanobot onboard、nanobot  gateway、nanobot agent分别对应不同的进程。每个命令都是独立的入口点,通过命令行参数触发,执行不同命令的运行模式。
  • 命令分类

    • 初始化类命令(如onboard):执行一次性配置任务:创建配置目录、生成默认配置文件、初始化工作区;执行完成后立即退出,不启动任何长期运行的服务; 示例:nanobot onboard → 创建~/.nan obot/config·json → 程序退出
    • 服务类命令(如gateway):启动长期运行的主服务进程; 初始化并启动所有核心组件:MessageBus、Agen tLoop、ChannelManager等;持续运行直到手动停止,处理来自各渠道的消息;示例:nanobot  gateway → 启动webSocket连接 → 持续监听消息 → 长期运行
    • 交互类命令(如agent):启动独立的CLI交互会话,创建独立的AgentLoop实例,在当前终端提供交互式对话,可与gateway并行运行

  1. app = typer.Typer(
  2.     name="nanobot",
  3.     help=f"{__logo__} nanobot - Personal AI Assistant",
  4.     no_args_is_help=True,
  5. )
复制代码
1.3 主要命令

1.3.1 系统设置命令
  1. @app.command("onboard")
复制代码
其功能如下:

  • 创建配置文件(~/.nanobot/config.json)
  • 初始化工作区目录
  • 创建默认模板文件(AGENTS.md, SOUL.md, USER.md, TOOLS.md, IDENTITY.md)
  • 创建内存目录结构
1.3.2 服务启动命令
  1. @app.command("gateway")
复制代码
其功能如下:

  • 启动网关服务
  • 启动消息总线(MessageBus)
  • 创建 LLM 提供商实例
  • 初始化 AgentLoop
  • 设置 CronService(定时任务)
  • 配置 HeartbeatService(心跳服务)
  • 启动所有启用的通信渠道 ,连接多个通道处理逻辑
  • 运行异步事件循环
1.3.3 直接交互命令
  1. @app.command("agent")
复制代码
agent()是CLI的核心实现,是nanobotagent命令的具体函数实现,是nanobot系统的交互式对话入口点。
处理逻辑


  • 创建 MessageBus 和 AgentLoop 实例
  • 提供直接的命令行交互界面,与Agent直接交互参数:
    message:发送给代理的消息
    session_id:会话ID
    markdown:是否染为Markdown
    logs:是否显示运行日志
  • 根据是否有消息参数决定是单次调用还是交互模式
  • 单次模式:直接调用agent_loop.process_direct(),即直接发送消息并接收回复
  • 交互模式:通过MessageBus与AgentLoop通信,提供持续对话界面

    • 使用 prompt_toolkit 处理输入
    • 支持历史记录和多行粘贴
    • 渲染 Markdown 格式的回复

与Gateway模式的区别


  • 交互方式

    • CLI模式:直接终端输入/输出,单用户交互
    • Gateway模式:通过多平台Channe1接 收消息,支持多用户并发

  • 消息路由

    • CLI模式:绕过MessageBus的标准路由机制,直接本地处理
    • Gateway模式:严格遵循Channel→MessageBus→AgentLoop→MessageBus→Channel的完整流程总结

1.3.4 通信渠道管理命令
  1. @channels_app.command("status")
复制代码
其功能如下:

  • 显示所有渠道的状态(WhatsApp, Discord, Feishu, Mochat, Telegram, Slack, DingTalk, QQ, Email)
  • 显示配置详情
  • 指示是否启用
1.3.5 通道登录命令
  1. @channels_app.command("login")
复制代码
其功能如下:

  • 设置并启动设备桥接(bridge)
  • 处理二维码扫描认证流程
  • 支持 WhatsApp 等需要设备连接的服务
1.3.6 定时任务管理命令
  1. @cron_app.command("list")
复制代码

  • 显示所有定时任务
  • 显示任务ID、名称、调度计划、状态和下次运行时间
1.3.7 添加命令
  1. @cron_app.command("add")
复制代码

  • 添加新的定时任务
  • 支持多种调度模式(every, cron, at)
  • 验证时区设置
1.3.8 删除命令
  1. @cron_app.command("remove")
复制代码

  • 删除指定ID的定时任务
1.3.9 启用/或禁用命令
  1. @cron_app.command("enable/disable")
复制代码

  • 启用或禁用定时任务
1.3.10 手动执行命令
  1. @cron_app.command("run")
复制代码

  • 手动执行指定的定时任务
1.3.11 状态查询命令
  1. @app.command("status")
复制代码

  • 检查配置文件是否存在
  • 验证工作区路径
  • 显示当前模型配置
  • 检查各提供商的API密钥状态
1.3.12 OAuth 认证命令
  1. @provider_app.command("login")
复制代码

  • 支持 OpenAI Codex OAuth 登录
  • 支持 GitHub Copilot 设备流认证
  • 动态注册和处理不同提供商的认证流程
0x02 Gateway

对于 Nanobot 来说,gateway()函数是 Nanobot 网关的启动入口,负责初始化并串联所有核心组件(消息总线、Agent 循环、通道管理、定时任务、心跳服务等),仅通过轻量化的代码组织就完成了 OpenClaw 同等核心的 “多通道交互 + 定时任务 + 心跳检测” 能力,体现了 Nanobot “3000 行 Python 实现同等核心能力” 的极致轻量化设计理念。
Gateway 并不是主要响应用户命令的组件,它的作用更像是一个消息路由中心和协调器,Gateway  启动和协调所有服务,不直接处理业务逻辑,而是启动整个系统; 所有的消息 (包括 CLI 输入和第三方应用消息)都通过MessageBus路由到统一的AgentLoop进行处理。
以下是 OpenClaw 的 Gateway 架构。
1.png

以下是 Nanbobot 的整体架构。可以看出来 Gateway 的作用: 解决“消息和控制平面怎么进系统”
2.png

2.1 核心作用

2.1.1 整体作用

gateway函数是 Nanobot 的 “中枢神经”,通过初始化并串联AgentLoop(核心执行)、MessageBus(通信)、CronService(定时)、HeartbeatService(心跳)、ChannelManager(通道)等组件,完成了从 “配置加载” 到 “组件启停” 的全生命周期管理。

  • 初始化全局配置、消息总线、AI 模型提供商、会话管理器等基础组件;
  • 构建 Agent 核心执行循环(AgentLoop),集成定时任务(CronService)、Shell 执行、MCP 服务等核心能力;
  • 配置定时任务回调逻辑,让 CronJob 能通过 Agent 执行并推送结果;
  • 初始化通道管理器(ChannelManager),支持多渠道(CLI / 第三方平台)交互;
  • 实现心跳服务(HeartbeatService),定期执行预设任务并推送结果;
  • 统一管理所有组件的启动 / 停止生命周期,保证系统优雅启停。
2.1.2 核心特色

gateway 的特色如下:

  • 组件化解耦 + 按需串联:所有核心能力(Agent、Cron、Heartbeat、Channels)均为独立组件,通过消息总线(MessageBus)和回调函数解耦,仅在网关层完成串联,既保证模块化,又简化交互逻辑。
  • 轻量化生命周期管理:通过 asyncio 协程统一管理所有异步组件的启动 / 停止,捕获 KeyboardInterrupt 实现优雅关机,无冗余的生命周期框架,符合 3000 行代码的轻量化定位。
  • 灵活的任务分发机制:Cron 任务和心跳任务均通过 Agent 统一执行,结果可推送至指定通道(CLI / 第三方平台),兼顾 “通用执行逻辑” 和 “个性化推送”。
  • 容错与降级设计:心跳任务的通道选择有明确的降级逻辑(优先最近会话→兜底 CLI),避免因通道不可用导致服务异常,提升系统稳定性。
  • 配置驱动:所有核心参数(端口、模型、超时、心跳间隔)均从配置文件加载,无需硬编码,适配不同部署场景。
2.2 图例

async run() 服务启动顺序

run是gateway的主运行函数。
  1.     async def run():
  2.         try:
  3.             await cron.start()
  4.             await heartbeat.start()
  5.             await asyncio.gather(
  6.                 agent.run(),
  7.                 channels.start_all(),
  8.             )
  9.         except KeyboardInterrupt:
  10.             console.print("\nShutting down...")
  11.         finally:
  12.             await agent.close_mcp()
  13.             heartbeat.stop()
  14.             cron.stop()
  15.             agent.stop()
  16.             await channels.stop_all()
复制代码
具体图例如下:
3.png

gateway() 内部组件依赖关系

4.png

2.3 代码

gateway函数是 Nanobot 的核心启动入口,承担 “组件初始化 + 生命周期管理 + 核心能力串联” 的核心职责。
  1. @app.command()
  2. def gateway(
  3.     port: int = typer.Option(18790, "--port", "-p", help="Gateway port"),
  4.     verbose: bool = typer.Option(False, "--verbose", "-v", help="Verbose output"),
  5. ):
  6.     """Start the nanobot gateway."""
  7.     # 延迟导入核心模块(避免启动时加载冗余依赖,提升启动速度)
  8.     from nanobot.config.loader import load_config, get_data_dir  # 配置加载/数据目录工具
  9.     from nanobot.bus.queue import MessageBus  # 消息总线:组件间通信核心
  10.     from nanobot.agent.loop import AgentLoop  # Agent核心执行循环:处理用户指令/工具调用
  11.     from nanobot.channels.manager import ChannelManager  # 通道管理器:管理CLI/第三方平台等交互通道
  12.     from nanobot.session.manager import SessionManager  # 会话管理器:存储/管理用户会话
  13.     from nanobot.cron.service import CronService  # 定时任务服务:管理CronJob的调度与执行
  14.     from nanobot.cron.types import CronJob  # CronJob数据类型:定义定时任务结构
  15.     from nanobot.heartbeat.service import HeartbeatService  # 心跳服务:定期执行预设任务
  16.    
  17.     # 若开启详细输出模式,配置日志为DEBUG级别(便于调试)
  18.     if verbose:
  19.         import logging
  20.         logging.basicConfig(level=logging.DEBUG)
  21.    
  22.     # 在控制台打印启动信息,包含logo和网关端口
  23.     console.print(f"{__logo__} Starting nanobot gateway on port {port}...")
  24.    
  25.     # 加载全局配置文件(所有组件的配置均从这里读取)
  26.     config = load_config()
  27.     # 初始化消息总线:作为所有组件间异步通信的核心枢纽
  28.     bus = MessageBus()
  29.     # 基于配置创建AI模型提供商(如OpenAI/火山方舟等,适配不同LLM)
  30.     provider = _make_provider(config)
  31.     # 初始化会话管理器:基于配置的工作目录存储用户会话数据
  32.     session_manager = SessionManager(config.workspace_path)
  33.    
  34.     # 先创建定时任务服务(回调函数需在Agent创建后设置,此处先初始化存储)
  35.     # 定义cron任务的持久化存储路径:数据目录/cron/jobs.json
  36.     cron_store_path = get_data_dir() / "cron" / "jobs.json"
  37.     # 初始化CronService,指定任务存储路径
  38.     cron = CronService(cron_store_path)
  39.    
  40.     # 创建Agent核心执行循环:集成所有核心能力的核心组件
  41.     agent = AgentLoop(
  42.         bus=bus,  # 关联消息总线,用于接收/发送事件
  43.         provider=provider,  # 关联AI模型提供商,用于LLM推理
  44.         workspace=config.workspace_path,  # 指定Agent的工作目录(存储文件/任务数据)
  45.         model=config.agents.defaults.model,  # 默认使用的LLM模型(如doubao-seed-lite)
  46.         temperature=config.agents.defaults.temperature,  # LLM温度参数(控制生成随机性)
  47.         max_tokens=config.agents.defaults.max_tokens,  # LLM生成的最大token数
  48.         max_iterations=config.agents.defaults.max_tool_iterations,  # 工具调用的最大迭代次数(防止死循环)
  49.         memory_window=config.agents.defaults.memory_window,  # Agent记忆窗口(保留最近的会话轮次)
  50.         brave_api_key=config.tools.web.search.api_key or None,  # 网页搜索工具的API Key(可选)
  51.         exec_config=config.tools.exec,  # Shell执行工具的配置(超时/安全规则等)
  52.         cron_service=cron,  # 关联定时任务服务,让Agent能访问定时任务
  53.         restrict_to_workspace=config.tools.restrict_to_workspace,  # 是否限制文件操作仅在工作目录(安全管控)
  54.         session_manager=session_manager,  # 关联会话管理器,用于读写用户会话
  55.         mcp_servers=config.tools.mcp_servers,  # MCP服务配置(多模态/扩展工具)
  56.         channels_config=config.channels,  # 通道配置(支持的交互渠道)
  57.     )
  58.    
  59.     # 设置定时任务的执行回调函数(需在Agent创建后,因依赖Agent执行逻辑)
  60.     async def on_cron_job(job: CronJob) -> str | None:
  61.         """Execute a cron job through the agent.
  62.         核心逻辑:将定时任务的消息传递给Agent执行,支持结果推送至指定通道
  63.         """
  64.         # 调用Agent的直接处理方法执行定时任务消息
  65.         response = await agent.process_direct(
  66.             job.payload.message,  # 定时任务的核心消息(如"检查GitHub星数")
  67.             session_key=f"cron:{job.id}", # 为定时任务创建独立会话Key(避免干扰用户会话)
  68.             channel=job.payload.channel or "cli",  # 任务关联的通道(默认CLI)
  69.             chat_id=job.payload.to or "direct",  # 任务推送的目标ChatID(默认direct)
  70.         )
  71.         # 若任务配置了"推送结果"且指定了目标,则通过消息总线推送执行结果
  72.         if job.payload.deliver and job.payload.to:
  73.             from nanobot.bus.events import OutboundMessage  # 导入出站消息事件类型
  74.             await bus.publish_outbound(OutboundMessage(
  75.                 channel=job.payload.channel or "cli",  # 推送的目标通道
  76.                 chat_id=job.payload.to,  # 推送的目标ChatID
  77.                 content=response or ""  # 推送的内容(Agent执行结果)
  78.             ))
  79.         # 返回Agent执行结果(供CronService记录)
  80.         return response
  81.     # 将回调函数绑定到CronService,定时任务触发时自动执行
  82.     cron.on_job = on_cron_job
  83.    
  84.     # 初始化通道管理器:管理所有启用的交互通道(CLI/微信/钉钉等)
  85.     channels = ChannelManager(config, bus)
  86.     def _pick_heartbeat_target() -> tuple[str, str]:
  87.         """Pick a routable channel/chat target for heartbeat-triggered messages.
  88.         核心逻辑:为心跳任务选择合适的推送通道(优先最近活跃的非内部会话)
  89.         """
  90.         # 获取所有启用的通道集合
  91.         enabled = set(channels.enabled_channels)
  92.         # 优先选择最近更新的非内部会话对应的启用通道(保证推送至活跃用户)
  93.         for item in session_manager.list_sessions():
  94.             key = item.get("key") or ""
  95.             if ":" not in key:  # 会话Key格式应为"channel:chat_id",无分隔符则跳过
  96.                 continue
  97.             channel, chat_id = key.split(":", 1)  # 拆分通道和ChatID
  98.             if channel in {"cli", "system"}:  # 跳过CLI/系统内部通道(非用户交互通道)
  99.                 continue
  100.             if channel in enabled and chat_id:  # 通道已启用且有有效ChatID
  101.                 return channel, chat_id
  102.         # 降级策略:无合适通道时兜底使用CLI通道(保持原有行为,逻辑显式化)
  103.         return "cli", "direct"
  104.     # 初始化心跳服务的核心回调函数:执行心跳任务
  105.     async def on_heartbeat_execute(tasks: str) -> str:
  106.         """Phase 2: execute heartbeat tasks through the full agent loop.
  107.         核心逻辑:通过Agent完整执行循环处理心跳任务,选择合适的通道执行
  108.         """
  109.         # 选择心跳任务的执行/推送目标通道
  110.         channel, chat_id = _pick_heartbeat_target()
  111.         # 定义空的进度回调函数(心跳任务无需输出进度)
  112.         async def _silent(*_args, **_kwargs):
  113.             pass
  114.         # 调用Agent直接处理方法执行心跳任务
  115.         return await agent.process_direct(
  116.             tasks,  # 心跳任务的具体内容(如"检查系统状态")
  117.             session_key="heartbeat",  # 心跳任务专属会话Key
  118.             channel=channel,  # 执行任务的通道
  119.             chat_id=chat_id,  # 执行任务的ChatID
  120.             on_progress=_silent,  # 禁用进度回调
  121.         )
  122.     # 初始化心跳服务的通知回调函数:推送心跳任务执行结果
  123.     async def on_heartbeat_notify(response: str) -> None:
  124.         """Deliver a heartbeat response to the user's channel.
  125.         核心逻辑:将心跳任务执行结果推送至用户的活跃通道(跳过CLI通道)
  126.         """
  127.         from nanobot.bus.events import OutboundMessage  # 导入出站消息事件类型
  128.         # 选择推送目标通道
  129.         channel, chat_id = _pick_heartbeat_target()
  130.         if channel == "cli":
  131.             return  # CLI通道无外部用户,无需推送
  132.         # 通过消息总线推送心跳任务结果
  133.         await bus.publish_outbound(OutboundMessage(channel=channel, chat_id=chat_id, content=response))
  134.     # 读取心跳服务配置
  135.     hb_cfg = config.gateway.heartbeat
  136.     # 初始化心跳服务
  137.     heartbeat = HeartbeatService(
  138.         workspace=config.workspace_path,  # 工作目录(存储心跳任务配置)
  139.         provider=provider,  # AI模型提供商(用于生成/执行心跳任务)
  140.         model=agent.model,  # 使用与Agent相同的LLM模型
  141.         on_execute=on_heartbeat_execute,  # 心跳任务执行回调
  142.         on_notify=on_heartbeat_notify,  # 心跳任务结果推送回调
  143.         interval_s=hb_cfg.interval_s,  # 心跳任务执行间隔(秒)
  144.         enabled=hb_cfg.enabled,  # 是否启用心跳服务
  145.     )
  146.    
  147.     # 打印通道启用状态(可视化启动信息)
  148.     if channels.enabled_channels:
  149.         console.print(f"[green]✓[/green] Channels enabled: {', '.join(channels.enabled_channels)}")
  150.     else:
  151.         console.print("[yellow]Warning: No channels enabled[/yellow]")
  152.    
  153.     # 打印定时任务状态(可视化启动信息)
  154.     cron_status = cron.status()
  155.     if cron_status["jobs"] > 0:
  156.         console.print(f"[green]✓[/green] Cron: {cron_status['jobs']} scheduled jobs")
  157.    
  158.     # 打印心跳服务状态(可视化启动信息)
  159.     console.print(f"[green]✓[/green] Heartbeat: every {hb_cfg.interval_s}s")
  160.    
  161.     # 定义网关核心运行函数:统一启动/停止所有异步组件
  162.     async def run():
  163.         try:
  164.             # 启动定时任务服务
  165.             await cron.start()
  166.             # 启动心跳服务
  167.             await heartbeat.start()
  168.             # 并发启动Agent核心循环和所有启用的通道
  169.             await asyncio.gather(
  170.                 agent.run(),  # 启动Agent执行循环(处理指令/工具调用)
  171.                 channels.start_all(),  # 启动所有启用的通道(监听用户输入)
  172.             )
  173.         except KeyboardInterrupt:
  174.             # 捕获用户中断(Ctrl+C),打印关机提示
  175.             console.print("\nShutting down...")
  176.         finally:
  177.             # 优雅关闭所有组件(无论正常退出还是异常)
  178.             await agent.close_mcp()  # 关闭MCP服务连接
  179.             heartbeat.stop()  # 停止心跳服务
  180.             cron.stop()  # 停止定时任务服务
  181.             agent.stop()  # 停止Agent执行循环
  182.             await channels.stop_all()  # 停止所有通道
  183.    
  184.     # 启动网关主循环(阻塞直到退出)
  185.     asyncio.run(run())
复制代码
0x03 Channel

3.1 需求

让大语言模型能调用工具确实很强大,但如果只能通过命令行和它交流,那使用场景就太受限了。理想的情况是:你能直接用语音跟它说话,它也能用自然的方式回应你,并且整个对话就发生在你平时常用的聊天软件里——比如 Telegram、WhatsApp、Discord 或 Slack。这就是 Channel 要解决的问题。
Channel 本质上是个适配器。它把你的智能代理(Agent)连接到各种即时通讯平台。不同平台的消息格式各不相同,Channel 的作用就是把这些五花八门的格式统一转换成 Agent 能理解的标准输入;等 Agent 生成回复后,再把标准输出转回对应平台能接受的格式发出去。
为了不让消息处理拖慢接收速度,Channel 和 Agent 之间通常会加一个消息总线(Message Bus)——说白了就是一个队列。新消息进来先存进队列,Agent 慢慢从队列里取走处理。这样即使 LLM 思考得慢,也不会阻塞新消息的接收。
  1. 用户输入 → Channel → MessageBus → AgentLoop → ContextBuilder → LLM
  2.                                      ↓
  3.                                 Tool Execution
  4.                                      ↓
  5.                               MessageBus → Channel → 用户输出
复制代码
3.2 OpenClaw

我们首先看看 OpenClaw 的 Channel。
用户可以通过不同的IM软件(飞书、Whatsapp、Telegram等)发送消息给OpenClaw,其中每个IM软件都视为一个Channel。
入站消息怎么找到对应 agent:通道收到消息后,会调用路由层(src/routing/resolve-route.ts):

  • 输入维度:channel + accountId + peer(群/私聊) + teamId/guildId
  • 匹配 bindings
  • 命中后返回 agentId + sessionKey
  • 没命中走默认 agent
出站消息统一发送的流程如下:

  • 先拿到目标通道插件 getChannelPlugin(channel)
  • 检查该插件是否实现 outbound.sendText/sendMedia
  • 统一做切片/分段(超长文本)
  • 最终调用具体通道的发送实现
3.3 基类

nanobot 会在 gateway 中同时启动多个通道,通过 ChannelManager 统一管理所有通道。其他通道继承相同的基类 BaseChannel,实现相同的生命周期方法,使用统一的消息格式。
  1. class BaseChannel(ABC):
  2.     """
  3.     Abstract base class for chat channel implementations.
  4.    
  5.     Each channel (Telegram, Discord, etc.) should implement this interface
  6.     to integrate with the nanobot message bus.
  7.     """
  8.    
  9.     name: str = "base"
  10.    
  11.     def __init__(self, config: Any, bus: MessageBus):
  12.         """
  13.         Initialize the channel.
  14.         
  15.         Args:
  16.             config: Channel-specific configuration.
  17.             bus: The message bus for communication.
  18.         """
  19.         self.config = config
  20.         self.bus = bus
  21.         self._running = False
  22.    
  23.     @abstractmethod
  24.     async def start(self) -> None:
  25.         """
  26.         Start the channel and begin listening for messages.
  27.         
  28.         This should be a long-running async task that:
  29.         1. Connects to the chat platform
  30.         2. Listens for incoming messages
  31.         3. Forwards messages to the bus via _handle_message()
  32.         """
  33.         pass
  34.    
  35.     @abstractmethod
  36.     async def stop(self) -> None:
  37.         """Stop the channel and clean up resources."""
  38.         pass
  39.    
  40.     @abstractmethod
  41.     async def send(self, msg: OutboundMessage) -> None:
  42.         """
  43.         Send a message through this channel.
  44.         
  45.         Args:
  46.             msg: The message to send.
  47.         """
  48.         pass
  49.    
  50.     def is_allowed(self, sender_id: str) -> bool:
  51.         """
  52.         Check if a sender is allowed to use this bot.
  53.         
  54.         Args:
  55.             sender_id: The sender's identifier.
  56.         
  57.         Returns:
  58.             True if allowed, False otherwise.
  59.         """
  60.         allow_list = getattr(self.config, "allow_from", [])
  61.         
  62.         # If no allow list, allow everyone
  63.         if not allow_list:
  64.             return True
  65.         
  66.         sender_str = str(sender_id)
  67.         if sender_str in allow_list:
  68.             return True
  69.         if "|" in sender_str:
  70.             for part in sender_str.split("|"):
  71.                 if part and part in allow_list:
  72.                     return True
  73.         return False
  74.    
  75.     async def _handle_message(
  76.         self,
  77.         sender_id: str,
  78.         chat_id: str,
  79.         content: str,
  80.         media: list[str] | None = None,
  81.         metadata: dict[str, Any] | None = None,
  82.         session_key: str | None = None,
  83.     ) -> None:
  84.         """
  85.         Handle an incoming message from the chat platform.
  86.         
  87.         This method checks permissions and forwards to the bus.
  88.         
  89.         Args:
  90.             sender_id: The sender's identifier.
  91.             chat_id: The chat/channel identifier.
  92.             content: Message text content.
  93.             media: Optional list of media URLs.
  94.             metadata: Optional channel-specific metadata.
  95.             session_key: Optional session key override (e.g. thread-scoped sessions).
  96.         """
  97.         if not self.is_allowed(sender_id):
  98.             logger.warning(
  99.                 "Access denied for sender {} on channel {}. "
  100.                 "Add them to allowFrom list in config to grant access.",
  101.                 sender_id, self.name,
  102.             )
  103.             return
  104.         
  105.         msg = InboundMessage(
  106.             channel=self.name,
  107.             sender_id=str(sender_id),
  108.             chat_id=str(chat_id),
  109.             content=content,
  110.             media=media or [],
  111.             metadata=metadata or {},
  112.             session_key_override=session_key,
  113.         )
  114.         
  115.         await self.bus.publish_inbound(msg)
  116.    
  117.     @property
  118.     def is_running(self) -> bool:
  119.         """Check if the channel is running."""
  120.         return self._running
复制代码
3.4 QQChannel

QQChannel 是 nanobot 与 QQ 平台进行通信的桥梁,实现了 QQ 私聊消息的接收和发送功能,使用 botpy SDK 通过 WebSocket 连接 QQ 服务器。即,通过标准化接口与其他组件协作,使得 nanobot 能够通过 QQ 与用户交互。
3.4.1 架构设计

实际上,QQChannel 实现了一个QQ机器人,QQChannel 继承 BaseChannel,依赖 botpy SDK 实现 QQ 通信,通过 MessageBus 与 AgentLoop 交互(利用 MessageBus 进行异步消息传递),是「QQ 平台 ↔ Nanobot 核心」的桥梁。
QQChannel 既不是独立的线程也不是独立的进程,而是在主进程中通过异步事件循环协作运行的一个组件。

  • 单进程异步模型:QQchannel与其他组件共享同一个进程和事件循环非阻塞协作:通过async/await实现协作式并发
  • 事件驱动:botpySDK通过回调机制处理QQ消息事件
  • 消息总线:通过MessageBus解耦通道和代理处理逻辑
  • 继承结构:继承自 BaseChannel,遵循统一的通道接口,实现了标准的 start、stop、send 方法
  • 自动重连机制:WebSocket 连接异常断开时,自动等待 5 秒后重连,保证 QQ 机器人的长连接稳定性;
  • 消息去重处理:通过固定长度的双端队列记录已处理消息 ID,避免重复处理相同消息;
  • 鲁棒的异常处理:对消息收发、连接管理等关键流程做异常捕获,记录日志且不影响整体服务运行;
  • 轻量的资源管理:使用 deque 限制已处理消息 ID 的存储长度(最大 1000),避免内存泄漏;
  • 配置校验前置:启动前校验 SDK 是否安装、app_id/secret 是否配置,提前暴露配置问题;
  • C2C 消息适配:专注处理 QQ 单聊(C2C)消息,适配个人用户与机器人的交互场景。
与 MessageBus 交互

MessageBus的解耦作用如下:

  • Inbound 流向:QQChannel.publish_inbound() → MessageBus.inbound_queue → AgentLoop.consume_inbound()
  • Outbound流向:AgentLoop.publish_outbound() → MessageBus.outbound_queue → Channel.send()
  1. await self._handle_message(
  2.     sender_id=user_id, # QQ 用户 ID
  3.     chat_id=user_id, # 对话 ID(私聊中与用户 ID 相同)
  4.     content=content, # 消息内容
  5.     metadata={"message_id": data.id}, # 元数据
  6. )
复制代码
与 AgentLoop 交互

上下文传递

  • 通过 channel 名称("qq")和 chat_id(QQ 用户 ID)识别会话
  • 保持会话状态和上下文连续性
与 SessionManager 交互

会话管理

  • 通过 channel:chat_id 格式创建会话键(qq:user_openid)
  • 每个 QQ 用户拥有独立的会话历史,每个会话通过session_key(通常是channel:chat_id)标识
  • 利用 SessionManager 管理会话状态,保证同一用户的连续对话体验
会话键格式

  • QQ 会话使用 "qqpenid" 格式作为唯一标识
  • 每个 QQ 用户拥有独立的会话历史和上下文
工具上下文设置

  • AgentLoop._set_tool_context() 设置当前会话的上下文
  • 确保 MessageTool、CronTool 等工具知道当前的 channel 和 chat_id
生命周期管理

统一管理

  • ChannelManager.start_all() 和 stop_all() 方法统一控制所有通道
  • Gateway 关闭时确保 QQ 通道正确清理资源
资源清理

  • QQChannel.stop() 方法关闭 WebSocket 连接
  • 清理内部状态和缓存
3.4.2 消息交互机制

QQChannel 的功能为:
消息通道


  • 处理 QQ 用户发送的消息
  • 将消息转换为内部格式并发送到消息总线
  • 发送 nanobot 的回复到指定 QQ 用户
  • 在 _run_bot 方法中实现自动重连机制
  • 异常时等待 5 秒后重新连接
其消息流转路径为
  1. 用户发送 QQ 消息 → QQChannel接收 → MessageBus 入站队列 → AgentLoop 处理  → AI 处理 →  发布响应  → MessageBus 出站队列 → QQChannel 发送 → 用户接收
复制代码
消息流转机制具体如下:
启动流程

Gateway 启动流程如下,在 gateway 命令中 async def run() 会启动各个通道。
  1. await cron.start()
  2. await heartbeat.start()
  3. await asyncio.gather(agent.run(),channels.start_all) # 会启动所有配置的通道
复制代码
ChannelManager 会启动 QQChannel

  • 从配置文件加载 QQ 通道配置(QQConfig)
  • 验证 App ID 和密钥是否配置正确
  • ChannelManager 创建 QQChannel 实例
  • 传入共享的 MessageBus 实例
  1. await qq_channel.start() # 调用 QQChannel.start()
复制代码
最终如下:
  1. 主进程(gateway)
  2. ├─ 主事件循环(asyncio)
  3. │        ├─ AgentLoop.run() #主代理循环
  4. │   ├─ ChannelManager.start_all()
  5. │   │    ├─ QQchannel.start() #QQ通道启动
  6. │   │    │        └─ _run_bot() # botpy SDK连接循环
  7. │   │    ├─ 其他通道.start()#如 Telegram,Discord 等#定时任务服务
  8. │   │    └─ CronService.start() # 定时任务服务
  9. │   └─ HeartbeatService.start)#心跳服务
复制代码
消息接收流程


  • QQ 用户发送消息到机器人
  • botpy SDK 接收到消息事件
  • 触发 _on_message 回调方法
在消息接收流程中,消息处理与转发细节如下:

  • 消息过滤:_on_message 验证消息唯一性,防止重复处理

    • 检查消息 ID 避免重复处理

      • 使用 deque 维护最近处理的消息 ID
      • 最大保留 1000 条记录防止重复处理

    • 验证消息内容是否为空
    • 提取发送者信息和内容

  • 将 QQ 消息转换为 InboundMessage 格式,设置正确的 channel("qq")和 chat_id(用户 ID)
  1. # 消息总线传输await self._handle_message(
  2.     sender_id=user_id, # QQ 用户 ID
  3.     chat_id=user_id, # 对话 ID(私聊中与用户 ID 相同)
  4.     content=content, # 消息内容
  5.     metadata={"message_id": data.id}, # 元数据
  6. )
复制代码
消息总线传输

  • BaseChannel._handle_message 将消息发布到 MessageBus 的入站队列
  • MessageBus 负责异步消息传递
消息处理流程

Agent 接收消息

  • AgentLoop.run() 从 MessageBus 消费消息
  • 调用 _dispatch() 处理消息
会话管理

  • 根据 channel="qq" 和 chat_id(QQ 用户 ID)创建唯一的会话键
  • 使用 SessionManager 管理对话历史
消息处理

  • AgentLoop._process_message() 执行完整的 AI 处理循环
  • 调用 LLM,执行工具,生成响应
响应发送流程

Agent处理完消息之后,响应发送流程如下:
响应生成

  • AgentLoop 生成响应后,将其包装为 OutboundMessage
  • 发布到 MessageBus 的出站队列
消息路由

  • ChannelManager 监听出站消息队列
  • 根据 channel 字段将消息路由到对应的通道
QQ 消息发送

  • QQChannel.send() 方法被调用
  • 使用 botpy API 将消息发送给 QQ 用户:
    1. await self._client.api.post_c2c_message(
    2.     openid=msg.chat_id, # QQ 用户 ID
    3.     msg_type=0, # 消息类型
    4.     content=msg.content, # 消息内容
    5. )
    复制代码
特殊处理

特殊处理如下:

  • 停止:标记状态 → 关闭连接 → 日志记录;
  • 任务取消

    • 支持/stop命令中断当前处理任务
    • 通过asyncio.Task.cancel()实现优雅停止

  • 错误恢复

    • Channel连接断开后自动重连
    • 消息处理失败时返回错误信息而非崩溃

3.4.3 QQChannel 核心模块逻辑关系图

5.png

3.4.4 QQChannel 核心流程流程图

6.png

3.4.5 代码
  1. def _make_bot_class(channel: "QQChannel") -> "type[botpy.Client]":
  2.     """Create a botpy Client subclass bound to the given channel."""
  3.     intents = botpy.Intents(public_messages=True, direct_message=True)
  4.     class _Bot(botpy.Client):
  5.         def __init__(self):
  6.             super().__init__(intents=intents)
  7.         async def on_ready(self):
  8.             logger.info("QQ bot ready: {}", self.robot.name)
  9.         async def on_c2c_message_create(self, message: "C2CMessage"):
  10.             await channel._on_message(message)
  11.         async def on_direct_message_create(self, message):
  12.             await channel._on_message(message)
  13.     return _Bot
  14. class QQChannel(BaseChannel):
  15.     """QQ channel using botpy SDK with WebSocket connection."""
  16.     # 渠道名称(唯一标识,供框架识别)
  17.     name = "qq"
  18.     def __init__(self, config: QQConfig, bus: MessageBus):
  19.         # 调用父类初始化(配置+消息总线)
  20.         super().__init__(config, bus)
  21.         # 类型注解:限定config为QQConfig类型(便于类型检查)
  22.         self.config: QQConfig = config
  23.         # QQ机器人客户端实例(懒初始化,start时创建)
  24.         self._client: "BotClient | None" = None
  25.         # 已处理消息ID队列:双端队列,最大长度1000(用于消息去重)
  26.         self._processed_ids: deque = deque(maxlen=1000)
  27.     async def start(self) -> None:
  28.         """Start the QQ bot."""
  29.         # 校验1:QQ SDK是否安装(未安装则记录错误并返回)
  30.         if not QQ_AVAILABLE:
  31.             logger.error("QQ SDK not installed. Run: pip install qq-botpy")
  32.             return
  33.         # 校验2:配置是否完整(app_id/secret是必填项)
  34.         if not self.config.app_id or not self.config.secret:
  35.             logger.error("QQ app_id and secret not configured")
  36.             return
  37.         # 标记渠道为运行状态
  38.         self._running = True
  39.         # 创建自定义Bot类(绑定消息回调到当前QQChannel实例)
  40.         BotClass = _make_bot_class(self)
  41.         # 初始化Bot客户端实例
  42.         self._client = BotClass()
  43.         # 记录启动日志(标识支持C2C私聊)
  44.         logger.info("QQ bot started (C2C private message)")
  45.         # 启动Bot连接(带自动重连)
  46.         await self._run_bot()
  47.     async def _run_bot(self) -> None:
  48.         """Run the bot connection with auto-reconnect."""
  49.         # 运行循环:只要渠道处于运行状态,就持续尝试连接
  50.         while self._running:
  51.             try:
  52.                 # 启动Bot WebSocket连接(使用配置的app_id/secret)
  53.                 await self._client.start(appid=self.config.app_id, secret=self.config.secret)
  54.             except Exception as e:
  55.                 # 连接异常时记录警告日志(不终止循环,后续重连)
  56.                 logger.warning("QQ bot error: {}", e)
  57.             # 渠道仍在运行时,等待5秒后重连
  58.             if self._running:
  59.                 logger.info("Reconnecting QQ bot in 5 seconds...")
  60.                 await asyncio.sleep(5)
  61.     async def stop(self) -> None:
  62.         """Stop the QQ bot."""
  63.         # 标记渠道为停止状态(终止重连循环)
  64.         self._running = False
  65.         # 关闭Bot客户端连接(若已初始化)
  66.         if self._client:
  67.             try:
  68.                 await self._client.close()
  69.             except Exception:
  70.                 # 关闭异常时静默处理(避免影响服务停止)
  71.                 pass
  72.         # 记录停止日志
  73.         logger.info("QQ bot stopped")
  74.     async def send(self, msg: OutboundMessage) -> None:
  75.         """Send a message through QQ."""
  76.         # 客户端未初始化时记录警告并返回
  77.         if not self._client:
  78.             logger.warning("QQ client not initialized")
  79.             return
  80.         try:
  81.             # 调用QQ API发送C2C私聊消息
  82.             await self._client.api.post_c2c_message(
  83.                 openid=msg.chat_id,  # 接收者openid(对应QQ用户ID)
  84.                 msg_type=0,          # 消息类型:0=文本消息
  85.                 content=msg.content, # 消息内容
  86.             )
  87.         except Exception as e:
  88.             # 发送失败时记录错误日志
  89.             logger.error("Error sending QQ message: {}", e)
  90.     async def _on_message(self, data: "C2CMessage") -> None:
  91.         """Handle incoming message from QQ."""
  92.         try:
  93.             # 消息去重:已处理的消息ID直接返回
  94.             if data.id in self._processed_ids:
  95.                 return
  96.             # 将当前消息ID加入已处理队列(自动淘汰超1000条的旧ID)
  97.             self._processed_ids.append(data.id)
  98.             # 提取消息发送者信息(兼容不同版本SDK的字段名)
  99.             author = data.author
  100.             user_id = str(getattr(author, 'id', None) or getattr(author, 'user_openid', 'unknown'))
  101.             # 提取消息内容并去除首尾空白符
  102.             content = (data.content or "").strip()
  103.             # 空消息直接忽略
  104.             if not content:
  105.                 return
  106.             # 调用父类的消息处理方法(转发到消息总线/AgentLoop)
  107.             await self._handle_message(
  108.                 sender_id=user_id,          # 发送者ID
  109.                 chat_id=user_id,            # 聊天ID(C2C场景下等于发送者ID)
  110.                 content=content,            # 消息内容
  111.                 metadata={"message_id": data.id},  # 附加元数据(消息ID)
  112.             )
  113.         except Exception:
  114.             # 消息处理异常时记录完整堆栈(便于排查问题)
  115.             logger.exception("Error handling QQ message")
复制代码
0xFF 参考

打造你的 Claw 帝国:OpenClaw底层原理揭秘!
OpenClaw 之后,AI 应用该怎么设计?
李宏毅OpenClaw技术全面解析:System Prompt驱动的身份构建 + 工具链递归调用 + HEARTBEAT主动心跳,AI Agent架构运作原理深度拆解
我给 OpenClaw 杀了 47 次僵尸进程,终于想明白了一些事
万字】带你实现一个Agent(上),从Tools、MCP到Skills
3500 行代码打造轻量级AI Agent:Nanobot 架构深度解析
Kimi Agent产品很厉害,然后呢?
OpenClaw真完整解说:架构与智能体内核
https://github.com/shareAI-lab/learn-claude-code
深入理解OpenClaw技术架构与实现原理(上)
深度解析:一张图拆解OpenClaw的Agent核心设计
OpenClaw小龙虾架构全面解析
OpenClaw架构-Agent Runtime 运行时深度拆解
OpenClaw 架构详解 · 第一部分:控制平面、会话管理与事件循环
从回答问题到替你做事,AI Agent 为什么突然火了?
[[源码导读] OpenClaw 架构深潜](https://www.cnblogs.com/liuscraft/p/19731669)

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册