眸胝 发表于 2026-2-9 18:45:18

Langchain 1.0后astream_events事件类型及生命周期简析

本文为博客园用户“孤舟晓月”原创,发布于博客园,备份与B站。若你在其他站点看到,说明它被盗了......
前置知识

langchain使用流式输出通常采用stream(同步)和astream(异步)两种模式,类似与下面的代码段:
print("开始流式输出...")# 流式输出for chunk in graph.stream(initial_state, config=my_config):    print(f"流式块: {chunk}")print("流式输出测试完成!")chunk就是大模型返回的信息。通常只包含少量的必需字段。也就存在很多问题:
比如无法取得token的细分使用量,部分模型(比如deepseek-reasoner)的封装无法在流式输出中实时呈现推理内容等等。
一、astream_event产生的事件类型

当然,langchain在1.0中也给出了解法,即使用astream_events方法。该方法会返回一系列的关键事件,以便咱们精准检测整个智能体的运行情况乃至修改相关数据。
langchain官方的参考地址为:astream_events
1. 官方文档

放个截图,方便后面的内容展开
https://img2024.cnblogs.com/blog/345865/202602/345865-20260209180902305-89118290.png
其中我遇到过的事件类型部分摘录如下:

[*]on_chat_model_start
[*]on_chat_model_stream
[*]on_chat_model_end
[*]on_chain_start
[*]on_chain_stream
[*]on_chain_end
[*]on_tool_start
[*]on_tool_end
.......
2. 所有事件的共性字段

上面的文档翻译一下,就是:astream_event 会迭代返回多个StreamEvent对象,所有的StreamEvent都具有下列共有字段:
字段名类型描述与作用eventstring事件类型的唯一标识,如 ”on_chain_start”。namestring产生事件的组件或对象的名称,例如 ”LangGraph”、”model”、”tools”、”ChatDeepSeekCustomized”、”weather_tool”。run_idstring当前事件运行的唯一ID。用于标识一个特定的执行实例。parent_idsarray父级运行的ID列表。清晰展示了执行的层级和调用关系。例如,ChatDeepSeekCustomized 事件的 parent_ids 会包含其所属的 model 链和顶级 LangGraph 的 run_id。tagsarray标签列表,用于分类或标记运行。常见标签如 ”graph: step: 1″、”seq: step: 1″。metadataobject元数据字典,包含执行的上下文信息。不同事件类型的元数据丰富程度不同,但以下LangGraph相关字段非常常见:
• thread_id: 执行线程ID。
• langgraph_node: 当前所在的图节点名。
• langgraph_step: 执行步骤。
• langgraph_checkpoint_ns: 检查点命名空间。dataobject与该事件相关的数据。该字段的内容 这取决于活动的类型。!!!重要!!!在官方文档中,密密麻麻列举了N多事件,遗憾的是截止2026年2月9日,官方文档并没有给出这些事件的生命周期。 于是就有了本篇文章。
接下来,我会先给出测试的DEMO和框架的输出,当然,输出已经被转换为标准的JSON格式。
坐稳,这就出发!
二、测试DEMO

"""使用LangGraph create_agent方法创建智能体,ChatDeepSeekCustomized与多个工具的协作该测试模拟一个复杂的任务场景,需要智能体调用多个工具才能完成ChatDeepSeekCustomized为我基于langchain框架定制的deepseek封装,支持使用reasoner模型发起的tool-call和推理思维链透传。如果你想运行这个测试,请:1. 切换为langchain-deepseek库提供的ChatDeepseek封装。2. 自行注册deepseek的API"""import asyncioimport osimport sys    # 添加项目根目录到Python路径sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))    from langchain_core.messages import SystemMessage, HumanMessage, AIMessagefrom langchain_core.tools import toolfrom langchain.agents import create_agentfrom langgraph.checkpoint.memory import MemorySaverfrom extends.custom_deepseek_chat import ChatDeepSeekCustomized      @tooldef calculator_tool(operation: str) -> str:      """计算器工具,执行基本数学运算"""      try:          # 解析操作          result = eval(operation)          return f"计算结果: {result}"      except Exception as e:          return f"计算错误: {str(e)}"      @tooldef search_tool(query: str) -> str:      """搜索工具,模拟信息检索"""      return f"搜索结果: 关于'{query}'的信息是模拟的,实际应用中应连接真实搜索引擎"      @tooldef analysis_tool(data: str) -> str:      """分析工具,对数据进行分析"""      return f"分析结果: '{data}'的数据分析已完成"      @tooldef weather_tool(city: str) -> str:      """天气工具,模拟查询天气"""      return f"天气预报: {city}的天气是晴朗的,温度约22°C"      @tooldef database_query_tool(table: str, condition: str) -> str:      """数据库查询工具,模拟数据库查询"""      return f"数据库查询结果: 从表'{table}'中找到满足条件'{condition}'的记录共5条"      def create_complex_task_agent():      """创建一个需要多次工具调用才能完成复杂任务的智能体"""      print("创建使用ChatDeepSeekCustomed的LangGraph智能体...")      # 设置API密钥   # !!!----------填入自己的API KEY-----------!!!    DEEPSEEK_API_KEY = "sk-你的API KEY"      # 为了获得比较清晰的events,直接使用ChatDeepSeekCustomized,启用reasoning    model = ChatDeepSeekCustomized(          model="deepseek-reasoner",          api_key=DEEPSEEK_API_KEY,          temperature=0.1,          include_reasoning_content=True# 启用推理内容输出      )      # 定义工具列表      tools =       # 创建系统提示 - 字符串格式      system_prompt_str = """      你是一个高级AI助手,能够使用多种工具解决问题。你需要合理规划工具调用顺序,完成复杂任务。      在每次调用工具后,你会收到结果,然后根据结果决定下一步行动。    请使用推理来决定如何最好地解决问题。    你可以进行多步思考,并在必要时调用工具。      """      # 定义checkpoint存储类型      my_cp_memory = MemorySaver()      # 创建Agent - 使用正确的参数名      graph = create_agent(          tools=tools,          model=model,# 使用我们的自定义模型          system_prompt=system_prompt_str,          checkpointer=my_cp_memory      )      return graph, tools      async def test_streaming():      """测试流式输出,观察推理内容是否正常输出"""      print("\n开始测试流式输出...")      print("开始测试复杂任务...")      try:          from langchain.agents import AgentState          from langchain_core.runnables import RunnableConfig            # 创建智能体          graph, tools = create_complex_task_agent()            # 定义一个复杂任务,需要多次工具调用          complex_task = (            "我需要规划一次去北京的旅行。请帮我:\n"            "1. 查询北京的当前天气\n"            "2. 计算从上海到北京的距离(假设直线距离约为1000公里)\n"            "3. 基于距离计算旅行所需的油费(假设每公里油耗费用为0.8元)\n"            "4. 分析旅游预算的合理性\n"            "5. 最后给我一个完整的旅行建议"          )            print(f"任务: {complex_task}")          print("\n开始执行任务...")            class StateInAgent(AgentState):            pass            initial_state = StateInAgent(messages=)            # 配置          my_config = RunnableConfig()          my_config["configurable"] = {            "thread_id": "test_streaming",          }            print("开始流式输出...")          events = []          # 测试流式输出          async for event in graph.astream_events(initial_state, config=my_config):            if event['event'] == 'on_chat_model_stream':                  if not should_stop_append_on_chat_model_stream(events):                      events.append(event)                      print(event)            else:                  events.append(event)                  print(event)          print("流式输出测试完成!")          return True      except Exception as e:          print(f"流式输出测试失败: {e}")          import traceback          traceback.print_exc()          return False    def should_stop_append_on_chat_model_stream(event_list, max_count=10):   """   因为大模型一次简单的推理就可能产生数百个on_chat_model_stream事件,这里做一个   定制化判断,如果事件队列末尾有连续10个on_chat_model_stream事件了,就停止入队列。   """      stop_flag =False      # 判断最近的12个事件中,有多少个on_chat_model_stream事件,如果超过10个,则返回true      last_max_events = event_list[-max_count:]      for event in last_max_events:          if event['event'] == 'on_chat_model_stream':            stop_flag = True          else:            stop_flag = False      return stop_flag    async def main():      """主函数"""      print("
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

户烫擞 发表于 2026-2-13 11:25:04

前排留名,哈哈哈

官厌 发表于 2026-2-13 22:17:28

鼓励转贴优秀软件安全工具和文档!

押疙 发表于 2026-2-19 16:08:40

东西不错很实用谢谢分享

蒙飘 发表于 2026-2-21 08:27:06

感谢分享

骛扼铮 发表于 2026-2-24 09:04:43

谢谢分享,试用一下

鞭氅 发表于 2026-2-25 15:26:29

这个有用。

裆趾针 发表于 2026-2-26 11:13:48

感谢分享

钦娅芬 发表于 2026-3-7 08:38:20

懂技术并乐意极积无私分享的人越来越少。珍惜

扔飒 发表于 2026-3-7 11:03:56

感谢,下载保存了

语樊偿 发表于 2026-3-8 12:26:20

热心回复!

列蜜瘘 发表于 2026-3-10 09:29:13

谢谢分享,试用一下

迭婵椟 发表于 2026-3-11 03:15:55

分享、互助 让互联网精神温暖你我

吟氅 发表于 2026-3-11 23:04:43

感谢分享
页: [1]
查看完整版本: Langchain 1.0后astream_events事件类型及生命周期简析