I. Overall Approach
Long web-page text often exceeds the token limit an LLM can handle in a single call, so we design a map-reduce pipeline that splits, locally summarizes, and merges (a minimal sketch follows this list):
- Load the web page content
- Split it into chunks of manageable size
- Summarize each chunk individually (map)
- Merge all the partial summaries (reduce)
- If needed, reduce recursively until the result fits within the token limit
- Output the final summary
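Before building this with LangChain, here is a minimal, framework-free sketch of the idea; `summarize` and `count_tokens` are hypothetical placeholders for a single LLM call and a token counter:

```python
# Framework-free sketch of map-reduce summarization.
# `summarize` and `count_tokens` are hypothetical placeholders.
def map_reduce_summarize(chunks, summarize, count_tokens, token_max=1000):
    summaries = [summarize(c) for c in chunks]  # map: one summary per chunk
    # reduce: merge summaries pairwise until their total fits under token_max
    while sum(count_tokens(s) for s in summaries) > token_max:
        summaries = [
            summarize("\n".join(summaries[i : i + 2]))
            for i in range(0, len(summaries), 2)
        ]
    return summarize("\n".join(summaries))  # final consolidated summary
```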
Next, let's implement it in code!
II. Setup
1. Initialize the LLM
First, load the LLM via init_chat_model:

```python
# llm_env.py
from langchain.chat_models import init_chat_model

llm = init_chat_model("gpt-4o-mini", model_provider="openai")
```
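This assumes the OPENAI_API_KEY environment variable is already set; a quick smoke test might look like:

```python
import os

# Fail fast if the API key is missing (assumes the OpenAI provider).
assert os.environ.get("OPENAI_API_KEY"), "set OPENAI_API_KEY before running"

# One-off sanity check that the model responds.
print(llm.invoke("Say hi in five words or fewer.").content)
```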
III. Main Program: main.py
1. Imports & Initialization
```python
import operator
import os
import sys
from typing import Annotated, List, Literal, TypedDict

sys.path.append(os.getcwd())  # make the local llm_set package importable

from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_text_splitters import CharacterTextSplitter
from langchain.chains.combine_documents.reduce import (
    collapse_docs,
    split_list_of_docs,
)
from langgraph.constants import Send
from langgraph.graph import END, START, StateGraph

from llm_set import llm_env

llm = llm_env.llm
```
2. Load the Web Page
```python
loader = WebBaseLoader("https://en.wikipedia.org/wiki/Artificial_intelligence")
docs = loader.load()
```
WebBaseLoader fetches the page and loads its text into the docs list.
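To verify what was loaded, you can inspect the result (exact values depend on the live page):

```python
# Quick inspection of the loaded page; numbers vary with the live page.
print(len(docs))                      # typically one Document per URL
print(docs[0].metadata.get("title"))  # page title extracted by the loader
print(len(docs[0].page_content))      # raw character count of the page text
```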
3. Define the Prompt Templates
Map-stage prompt:

```python
map_prompt = ChatPromptTemplate.from_messages(
    [("system", "Write a concise summary of the following:\n\n{context}")]
)
```
Reduce-stage prompt:

```python
reduce_template = """
The following is a set of summaries:
{docs}
Take these and distill it into a final, consolidated summary
of the main themes.
"""

reduce_prompt = ChatPromptTemplate([("human", reduce_template)])
```
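Each template exposes a single input variable ({context} and {docs} respectively). You can preview how one renders before wiring it into the graph:

```python
# Preview the rendered reduce prompt with placeholder summaries.
preview = reduce_prompt.invoke({"docs": "summary A\nsummary B"})
print(preview.to_string())
```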
4. Split the Document into Chunks
```python
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
split_docs = text_splitter.split_documents(docs)
print(f"Split into {len(split_docs)} chunks")
```
This splits the page into chunks of about 1000 characters each (note that CharacterTextSplitter measures characters, not tokens), so each chunk can be processed in a single call.
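If you want chunks measured in actual tokens instead, the splitter has a tiktoken-based constructor; a sketch, assuming the cl100k_base encoding:

```python
# Alternative: split by token count rather than character count.
token_splitter = CharacterTextSplitter.from_tiktoken_encoder(
    encoding_name="cl100k_base", chunk_size=1000, chunk_overlap=0
)
split_docs = token_splitter.split_documents(docs)
```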
5. Define the Token-Length Function
```python
token_max = 1000

def length_function(documents: List[Document]) -> int:
    return sum(llm.get_num_tokens(d.page_content) for d in documents)
```
This computes the total token count of a list of documents, which later determines whether another collapse round is needed.
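As a quick check, you can apply it to the chunks we just produced (the total varies with the page):

```python
# Total tokens across all chunks; the value depends on the live page.
print(f"Total tokens in split_docs: {length_function(split_docs)}")
```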
6. Define the State
Overall state:

```python
class OverallState(TypedDict):
    contents: List[str]
    summaries: Annotated[list, operator.add]
    collapsed_summaries: List[Document]
    final_summary: str
```
Map-stage state:

```python
class SummaryState(TypedDict):
    content: str
```
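The Annotated[list, operator.add] annotation tells LangGraph to merge concurrent updates to summaries by list concatenation, so parallel map branches append to the list rather than overwrite each other. Conceptually:

```python
import operator

# Two parallel nodes each return {"summaries": [<one summary>]} ...
update_a = ["summary of chunk 1"]
update_b = ["summary of chunk 2"]

# ... and the reducer concatenates them into the shared state.
merged = operator.add(update_a, update_b)
assert merged == ["summary of chunk 1", "summary of chunk 2"]
```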
7. Generate Initial Summaries (Map Stage)
```python
def generate_summary(state: SummaryState):
    # A single-variable prompt can be invoked with a plain string,
    # which fills the {context} slot.
    prompt = map_prompt.invoke(state["content"])
    response = llm.invoke(prompt)
    return {"summaries": [response.content]}
```
8. Map Dispatch Logic
```python
def map_summaries(state: OverallState):
    # Fan out: one generate_summary invocation per chunk, run in parallel.
    return [
        Send("generate_summary", {"content": content}) for content in state["contents"]
    ]
```

Send is LangGraph's primitive for dynamically dispatching parallel node invocations, each with its own private input state.
9. Collect Summaries
```python
def collect_summaries(state: OverallState):
    # Wrap each summary string back into a Document for the reduce stage.
    return {
        "collapsed_summaries": [Document(summary) for summary in state["summaries"]]
    }
```
10. Reduce Logic
Internal reduce helper:

```python
def _reduce(input) -> str:
    # `input` is a list of Documents; the single-variable reduce prompt
    # wraps it into the {docs} slot automatically.
    prompt = reduce_prompt.invoke(input)
    response = llm.invoke(prompt)
    return response.content
```
Collapse summaries:

```python
def collapse_summaries(state: OverallState):
    # Partition the summaries into groups that each fit under token_max ...
    docs_lists = split_list_of_docs(
        state["collapsed_summaries"],
        length_function,
        token_max,
    )
    results = []
    for doc_list in docs_lists:
        # ... and merge each group into one shorter summary Document.
        combined = collapse_docs(doc_list, _reduce)
        results.append(combined)
    return {"collapsed_summaries": results}
```
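To make the grouping concrete: split_list_of_docs greedily packs consecutive documents into sublists whose combined length stays under token_max, starting a new sublist whenever adding a document would exceed it. A small illustration with fake documents whose character counts stand in for token counts (hypothetical numbers):

```python
from langchain.chains.combine_documents.reduce import split_list_of_docs
from langchain_core.documents import Document

fake_docs = [Document("a" * 400), Document("b" * 500), Document("c" * 300)]
groups = split_list_of_docs(
    fake_docs, lambda ds: sum(len(d.page_content) for d in ds), 1000
)
# 400 + 500 fits under 1000; adding 300 would exceed it, so a new group starts.
print([[len(d.page_content) for d in g] for g in groups])  # [[400, 500], [300]]
```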
11. Decide Whether to Keep Collapsing
```python
def should_collapse(
    state: OverallState,
) -> Literal["collapse_summaries", "generate_final_summary"]:
    num_tokens = length_function(state["collapsed_summaries"])
    if num_tokens > token_max:
        return "collapse_summaries"
    else:
        return "generate_final_summary"
```

The Literal return annotation lets LangGraph infer the two possible target nodes when this function is used in add_conditional_edges without an explicit path map.
12. Generate the Final Summary
```python
def generate_final_summary(state: OverallState):
    response = _reduce(state["collapsed_summaries"])
    return {"final_summary": response}
```
IV. Build the Flow Graph (StateGraph)
```python
graph = StateGraph(OverallState)
graph.add_node("generate_summary", generate_summary)
graph.add_node("collect_summaries", collect_summaries)
graph.add_node("collapse_summaries", collapse_summaries)
graph.add_node("generate_final_summary", generate_final_summary)

# START fans out to one generate_summary task per chunk (map).
graph.add_conditional_edges(START, map_summaries, ["generate_summary"])
graph.add_edge("generate_summary", "collect_summaries")
# After collecting, and again after each collapse round, either keep
# collapsing or produce the final summary (reduce).
graph.add_conditional_edges("collect_summaries", should_collapse)
graph.add_conditional_edges("collapse_summaries", should_collapse)
graph.add_edge("generate_final_summary", END)

app = graph.compile()
```
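To double-check the wiring, LangGraph can render the compiled graph as a Mermaid diagram:

```python
# Print a Mermaid description of the compiled graph for inspection.
print(app.get_graph().draw_mermaid())
```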
V. Run the Summarization Pipeline
```python
for step in app.stream(
    {"contents": [doc.page_content for doc in split_docs]},
    {"recursion_limit": 10},
):
    print(list(step.keys()))
```
.stream() drives the whole pipeline: we pass in the split contents, and each step's output is streamed back as it completes, until the final summary is produced.
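If you only want the end result rather than step-by-step progress, .invoke() returns the final state directly:

```python
# Run the pipeline to completion and read the summary out of the state.
result = app.invoke(
    {"contents": [doc.page_content for doc in split_docs]},
    {"recursion_limit": 10},
)
print(result["final_summary"])
```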
VI. Wrap-Up
This example shows that you can:
✅ Summarize web pages easily with LangChain + an LLM
✅ Build an automatic map-reduce pipeline that supports long-text splitting and recursive reduction
✅ Orchestrate the whole flow flexibly with StateGraph