蛟当罟 发表于 2025-9-20 12:33:52

[MCP][07]logging和progress等功能说明

前言

截至目前(2025年9月19日),除了基础的Prompt、Resource和Tool概念,FastMCP还提供了以下功能:Sampling、Elicitation、Roots、Logging、Progress、Proxy、Middleware、Composition和Authentication等功能

[*]Sampling,采样,在server端调用client的llm,实现功能解耦
[*]Elicition,征询,实现人工介入
[*]Roots,Client告知Server可访问的资源
[*]Logging,将Server日志发送给Client
[*]Progress,Server端将进度发送给Client
[*]Proxy,代理其它MCP Server
[*]Middleware,拦截MCP通信中的请求和响应
[*]Composition,Server端将多个servers组合成一个server对外提供
[*]Authentication,Client和Server之间安全认证
其中Sampling和Elicitation在我的实际开发中用到的比较多,所以我在前面章节中单独拎出来介绍了。FastMCP官方文档也说了Authentication还在迅速迭代中,虽然已经有了相关文档,但本文暂时就不涉及了,等这个功能稳定了再具体细说。剩下的功能会在本文中一次性全部介绍完,篇幅较长,可以根据章节名跳转到自己需要关注的内容。本文大部分参考自官方文档。
Roots

Roots 是客户端向服务器告知其可访问资源的一种机制。服务器可利用此信息调整行为或提供更相关的响应。
静态Roots

from fastmcp import Client

client = Client(
    "my_mcp_server.py",
    roots=["/path/to/root1", "/path/to/root2"]
)动态Roots

from fastmcp import Client
from fastmcp.client.roots import RequestContext

async def roots_callback(context: RequestContext) -> list:
    print(f"Server requested roots (Request ID: {context.request_id})")
    return ["/path/to/root1", "/path/to/root2"]

client = Client(
    "my_mcp_server.py",
    roots=roots_callback
)Logging

Logging,从服务器向 MCP 客户端发送消息。FastMCP提供了一个logger(fastmcp.utilities.logging.get_logger()),也可以用python标准库的logging。
服务器日志功能允许 MCP 工具向客户端发送调试(debug)、信息(info)、警告(warning)和错误(error)级别的消息。这有助于用户了解函数执行过程,在开发和运行阶段辅助调试。一般用于以下场景:

[*]调试:发送详细的执行信息,帮助诊断问题
[*]进度可见性:让用户了解工具当前正在执行的操作
[*]错误报告:向客户端传达问题及其上下文
[*]审计追踪:为合规或分析目的生成工具执行记录
与标准 Python 日志不同,MCP 服务器 Logging 会直接将消息发送至客户端,使其在客户端界面或日志中可见。
Server 示例

在任意tool函数中使用Context提供的日志方法:
from fastmcp import FastMCP, Context

mcp = FastMCP("custom")

@mcp.tool
async def analyze_data(data: list, ctx: Context) -> dict:
    """通过全面日志记录分析数值数据。"""
    await ctx.debug("开始分析数值数据")
    await ctx.info(f"正在分析 {len(data)} 个数据点")
   
    try:
      if not data:
            await ctx.warning("提供了空数据列表")
            return {"error": "空数据列表"}
      
      result = sum(data) / len(data)
      await ctx.info(f"分析完成,平均值为:{result}")
      return {"average": result, "count": len(data)}
      
    except Exception as e:
      await ctx.error(f"分析失败:{str(e)}")
      raise

if __name__ == "__main__":
    mcp.run(transport="stdio", show_banner=False)所有日志方法(debug、info、warning、error、log)现在均支持 extra 参数,该参数接受一个字典,用于传递任意结构化数据。这使得客户端可接收结构化日志,便于创建丰富且可查询的日志记录。
@mcp.tool
async def process_transaction(transaction_id: str, amount: float, ctx: Context):
    await ctx.info(
      f"正在处理交易 {transaction_id}",
      extra={
            "transaction_id": transaction_id,
            "amount": amount,
            "currency": "USD"
      }
    )
    # ... 处理逻辑 ...Client 示例

import asyncio
from pathlib import Path
from fastmcp.client import Client, StdioTransport
from fastmcp.client.logging import LogMessage
import logging
import sys

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
# This mapping is useful for converting MCP level strings to Python's levels
LOGGING_LEVEL_MAP = logging.getLevelNamesMapping()

class MCPClient:
    def __init__(self):
      self.mcp_client = Client(
            StdioTransport(
                command = str(Path(__file__).parent.parent / ".venv" / "bin" / "python"),
                args = ["demo09-server.py"],
                cwd = str(Path(__file__).parent)
            ),
            log_handler=self.logging_handler,
      )

    async def logging_handler(self, message: LogMessage):
      """
      Handles incoming logs from the MCP server and forwards them
      to the standard Python logging system.
      """
      msg = message.data.get('msg')
      extra = message.data.get('extra')

      # Convert the MCP log level to a Python log level
      level = LOGGING_LEVEL_MAP.get(message.level.upper(), logging.INFO)
      logger.log(level, msg, extra=extra)


    async def generate(self):
      async with self.mcp_client:
            await self.mcp_client.ping()

            rst = await self.mcp_client.call_tool("analyze_data", arguments={"data": })
            print(rst)

async def main():
    client = MCPClient()
    await client.generate()

if __name__ == "__main__":
    asyncio.run(main())client运行输出
开始分析数值数据
正在分析 5 个数据点
分析完成,平均值为:3.0
CallToolResult(content=, structured_content={'average': 3.0, 'count': 5}, data={'average': 3.0, 'count': 5}, is_error=False)Progress

Progress 功能允许 MCPtool 向 Client 通知长时间运行操作的当前进度。这使得Client能够显示进度指示器,从而在执行耗时任务时提供更佳的用户体验。Progress 在以下方面具有重要价值:

[*]用户体验:让用户了解长时间运行操作的当前状态
[*]进度指示器:使客户端能够显示进度条或百分比
[*]防止超时:表明操作正在持续进行中,避免被误判为无响应
[*]调试用途:追踪执行进度,便于性能分析
Server 示例

from fastmcp import FastMCP, Context
import asyncio

mcp = FastMCP("custom")

@mcp.tool
async def process_items(items: list, ctx: Context) -> dict:
    """处理项目列表,并发送进度更新。"""
    total = len(items)
    results = []
   
    for i, item in enumerate(items):
      # 每处理一个项目,报告当前进度
      await ctx.report_progress(progress=i, total=total)
      
      # 模拟处理耗时
      await asyncio.sleep(0.1)
      results.append(item.upper())
   
    # 报告 100% 完成
    await ctx.report_progress(progress=total, total=total)
   
    return {"processed": len(results), "results": results}

if __name__ == "__main__":
    mcp.run(transport="stdio", show_banner=False)Client 示例

import asyncio
from pathlib import Path
from fastmcp.client import Client, StdioTransport


class MCPClient:
    def __init__(self):
      self.mcp_client = Client(
            StdioTransport(
                command = str(Path(__file__).parent.parent / ".venv" / "bin" / "python"),
                args = ["demo09-server.py"],
                cwd = str(Path(__file__).parent)
            ),
            progress_handler=self.progress_handler,
      )

    async def progress_handler(self, progress: float, total: float | None, message: str | None) -> None:
      if total is not None:
            percentage = (progress / total) * 100
            print(f"Progress: {percentage:.1f}% - {message or ''}")
      else:
            print(f"Progress: {progress} - {message or ''}")

    async def generate(self):
      async with self.mcp_client:
            await self.mcp_client.ping()

            rst = await self.mcp_client.call_tool("process_items", arguments={"items": ["item1", "item2", "item3", "item4", "item5", "item6", "item7", "item8", "item9", "item10", "item11", "item12", "item13", "item14", "item15"]})
            print(rst)

async def main():
    client = MCPClient()
    await client.generate()

if __name__ == "__main__":
    asyncio.run(main())client运行输出
Progress: 0.0% -
Progress: 6.7% -
Progress: 13.3% -
Progress: 20.0% -
Progress: 26.7% -
Progress: 33.3% -
Progress: 40.0% -
Progress: 46.7% -
Progress: 53.3% -
Progress: 60.0% -
Progress: 66.7% -
Progress: 73.3% -
Progress: 80.0% -
Progress: 86.7% -
Progress: 93.3% -
Progress: 100.0% -
CallToolResult(content=}', annotations=None, meta=None)], structured_content={'processed': 15, 'results': ['ITEM1', 'ITEM2', 'ITEM3', 'ITEM4', 'ITEM5', 'ITEM6', 'ITEM7', 'ITEM8', 'ITEM9', 'ITEM10', 'ITEM11', 'ITEM12', 'ITEM13', 'ITEM14', 'ITEM15']}, data={'processed': 15, 'results': ['ITEM1', 'ITEM2', 'ITEM3', 'ITEM4', 'ITEM5', 'ITEM6', 'ITEM7', 'ITEM8', 'ITEM9', 'ITEM10', 'ITEM11', 'ITEM12', 'ITEM13', 'ITEM14', 'ITEM15']}, is_error=False)Proxy

FastMCP 的 Proxy 允许一个 FastMCP 服务器实例作为前端,代理另一个 MCP 服务器(该服务器可能是远程的、运行在不同传输协议上的,甚至是另一个 FastMCP 实例)。此功能通过 FastMCP.as_proxy() 类方法实现。作为代理服务器,它本身不直接实现工具或资源。当它接收到请求(如 tools/call 或 resources/read)时,会将该请求转发至一个_后端_ MCP 服务器,接收其响应,再将响应原样返回给原始客户端。
sequenceDiagram    participant ClientApp as 您的客户端(如 Claude Desktop)    participant FastMCPProxy as FastMCP 代理服务器    participant BackendServer as 后端 MCP 服务器(如远程 SSE)    ClientApp->>FastMCPProxy: MCP 请求(如 stdio)    Note over FastMCPProxy, BackendServer: 代理转发请求    FastMCPProxy->>BackendServer: MCP 请求(如 sse)    BackendServer-->>FastMCPProxy: MCP 响应(如 sse)    Note over ClientApp, FastMCPProxy: 代理转发响应    FastMCPProxy-->>ClientApp: MCP 响应(如 stdio)核心优势

[*]会话隔离:每个请求拥有独立隔离的会话,确保并发操作安全
[*]传输协议桥接:通过一种传输协议暴露运行在另一种传输协议上的服务器
[*]高级 MCP 功能支持:自动转发采样(sampling)、引导(elicitation)、日志和进度报告
[*]安全性:作为后端服务器的受控网关
[*]简化架构:即使后端位置或传输协议变更,前端仍保持单一接入点
使用代理服务器时,特别是连接到基于 HTTP 的后端服务器时,需注意延迟可能显著增加。例如,list_tools() 操作可能耗时数百毫秒,而本地工具仅需 1–2 毫秒。挂载代理服务器时,此延迟会影响父服务器的所有操作,而不仅仅是与被代理工具的交互。
如果您的使用场景对低延迟有严格要求,建议使用 import_server() 方法在启动时复制工具,而非在运行时进行代理。
快速入门

推荐使用 ProxyClient 创建代理,它提供完整的 MCP 功能支持,并自动实现会话隔离:
from fastmcp import FastMCP
from fastmcp.server.proxy import ProxyClient

# 创建支持完整 MCP 功能的代理
proxy = FastMCP.as_proxy(
    ProxyClient("backend_server.py"),
    name="MyProxy"
)

# 运行代理(例如,通过 stdio 供 Claude Desktop 使用)
if __name__ == "__main__":
    proxy.run()此单一设置即可提供:

[*]安全的并发请求处理
[*]自动转发高级 MCP 功能(采样、引导等)
[*]会话隔离,防止上下文混淆
[*]与所有 MCP 客户端完全兼容
高级MCP功能支持

ProxyClient 会自动在后端服务器与连接到代理的客户端之间转发高级 MCP 协议功能,确保完整的 MCP 兼容性。支持的功能:

[*]Roots:将文件系统根目录访问请求转发给客户端
[*]Sampling:将后端发起的 LLM 补全请求转发给客户端
[*]Elicitation:将用户输入请求转发给客户端
[*]Logging:将后端日志消息转发至客户端
[*]Progress:在长时间操作中转发进度通知
也可以自定义功能支持,比如设置为None来选择性禁用转发
# 禁用采样,但保留其他功能
backend = ProxyClient(
    "backend_server.py",
    sampling_handler=None,# 禁用 LLM 采样转发
    log_handler=None      # 禁用日志转发
)基于配置的代理

你可以直接从符合 MCPConfig 模式的配置字典创建代理。这对于快速设置指向远程服务器的代理非常有用,无需手动配置每个连接细节。
from fastmcp import FastMCP

# 直接从配置字典创建代理
config = {
    "mcpServers": {
      "default": {# 对于单服务器配置,通常使用 'default'
            "url": "https://example.com/mcp",
            "transport": "http"
      }
    }
}

# 创建指向配置服务器的代理(自动创建 ProxyClient)
proxy = FastMCP.as_proxy(config, name="Config-Based Proxy")

# 通过 stdio 传输协议本地运行
if __name__ == "__main__":
    proxy.run()多服务器的设置

你可以通过在配置中指定多个条目来创建指向多个服务器的代理。系统会自动以配置名称作为前缀挂载它们:
# 多服务器配置
config = {
    "mcpServers": {
      "weather": {
            "url": "https://weather-api.example.com/mcp",
            "transport": "http"
      },
      "calendar": {
            "url": "https://calendar-api.example.com/mcp",
            "transport": "http"
      }
    }
}

# 创建统一的多服务器代理
composite_proxy = FastMCP.as_proxy(config, name="Composite Proxy")

# 工具和资源可通过前缀访问:
# - weather_get_forecast, calendar_add_event
# - weather://weather/icons/sunny, calendar://calendar/events/today显式会话管理

在内部,FastMCP.as_proxy() 使用 FastMCPProxy 类。您通常无需直接与此类交互,但在高级场景下它可供使用。FastMCPProxy 要求显式会话管理——不会执行任何自动检测。您必须选择您的会话策略:
# 在所有请求间共享会话(并发时需谨慎)
shared_client = ProxyClient("backend_server.py")
def shared_session_factory():
    return shared_client

proxy = FastMCPProxy(client_factory=shared_session_factory)

# 为每个请求创建新会话(推荐)
def fresh_session_factory():
    return ProxyClient("backend_server.py")

proxy = FastMCPProxy(client_factory=fresh_session_factory)如需自动选择会话策略,请使用便捷方法 FastMCP.as_proxy()。
# 带有特定配置的自定义工厂
def custom_client_factory():
    client = ProxyClient("backend_server.py")
    # 在此处添加任何自定义配置
    return client

proxy = FastMCPProxy(client_factory=custom_client_factory)Middleware

MCP 中间件允许您在请求和响应流经服务器时对其进行拦截和修改。可以将其视为一条管道,每个中间件均可检查当前操作、进行修改,然后将控制权传递给链中的下一个中间件。与传统的 Web 中间件不同,MCP 中间件专为 Model Context Protocol 设计,为各类 MCP 操作(如工具调用、资源读取和提示请求)提供专用钩子。
MCP 中间件是一个全新概念,未来版本中可能发生破坏性变更。
MCP 中间件的常见应用场景包括:

[*]身份验证与授权:在执行操作前验证客户端权限
[*]日志与监控:追踪使用模式与性能指标
[*]速率限制:按客户端或操作类型控制请求频率
[*]请求/响应转换:在数据到达工具前或离开后对其进行修改
[*]缓存:存储频繁请求的数据以提升性能
[*]错误处理:为服务器提供一致的错误响应
中间件工作原理

FastMCP 中间件基于管道模型运行。当请求进入时,它会按添加到服务器的顺序依次流经各个中间件。每个中间件均可:
检查传入的请求及其上下文
在传递给下一个中间件或处理器前修改请求
通过调用 call_next() 执行链中的下一个中间件/处理器
在返回前检查并修改响应
处理执行过程中发生的错误关键在于,中间件形成一条链,每个环节决定是继续处理还是完全终止链的执行。
如果你熟悉 ASGI 中间件,FastMCP 中间件的基本结构会感觉似曾相识。其核心是一个可调用类,接收一个包含当前 JSON-RPC 消息信息的上下文对象,以及一个用于继续中间件链的处理器函数。
重要的是要理解,MCP 基于 JSON-RPC 规范运行。虽然 FastMCP 以熟悉的方式呈现请求和响应,但其本质是 JSON-RPC 消息,而非 Web 应用中常见的 HTTP 请求/响应对。FastMCP 中间件适用于所有 传输类型 ,包括本地 stdio 传输和 HTTP 传输,但并非所有中间件实现都兼容所有传输类型(例如,检查 HTTP 头部的中间件无法在 stdio 传输中工作)。
实现中间件最基础的方式是重写 Middleware 基类的 call 方法:
from fastmcp.server.middleware import Middleware, MiddlewareContext

class RawMiddleware(Middleware):
    async def __call__(self, context: MiddlewareContext, call_next):
      # 此方法接收所有消息,无论类型
      print(f"原始中间件正在处理:{context.method}")
      result = await call_next(context)
      print(f"原始中间件处理完成:{context.method}")
      return result中间件钩子

为便于用户针对特定类型的消息,FastMCP 中间件提供了一系列专用钩子。您可以重写特定的钩子方法(而非实现原始的 __call__ 方法),这些方法仅在特定类型的操作时被调用,从而允许您精确地定位中间件逻辑所需的粒度。
钩子层级与执行顺序

FastMCP 提供多个按不同粒度调用的钩子。理解此层级结构对有效设计中间件至关重要。
当请求进入时,同一请求可能触发多个钩子调用,执行顺序由泛化到具体:

[*]on_message - 为所有 MCP 消息(请求和通知)调用
[*]on_request 或 on_notification - 根据消息类型调用
[*]操作特定钩子 - 为特定 MCP 操作调用,如 on_call_tool
例如,当客户端调用工具时,您的中间件将收到多次钩子调用:

[*]on_message 和 on_request 用于任何初始工具发现操作(如 list_tools)
[*]on_message(因为它是任何 MCP 消息)用于工具调用本身
[*]on_request(因为工具调用期望响应)用于工具调用本身
[*]on_call_tool(因为它是具体的工具执行)用于工具调用本身
请注意,MCP SDK 可能会执行额外操作(如为缓存目的列出工具),这将触发超出直接工具执行范围的额外中间件调用。
此层级结构允许您以适当的粒度定位中间件逻辑。对广泛关注点(如日志)使用 on_message,对身份验证使用 on_request,对工具特定逻辑(如性能监控)使用 on_call_tool。
可用钩子


[*]on_message: 为所有 MCP 消息(请求和通知)调用
[*]on_request: 专为 MCP 请求(期望响应)调用
[*]on_notification: 专为 MCP 通知(即发即弃)调用
[*]on_call_tool: 在执行工具时调用
[*]on_read_resource: 在读取资源时调用
[*]on_get_prompt: 在获取提示时调用
[*]on_list_tools: 在列出可用工具时调用
[*]on_list_resources: 在列出可用资源时调用
[*]on_list_resource_templates: 在列出资源模板时调用
[*]on_list_prompts: 在列出可用提示时调用
中间件中的组件访问

理解如何在中间件中访问组件信息(工具、资源、提示)对构建强大的中间件功能至关重要。访问模式在列出操作与执行操作之间存在显著差异。
列出操作 vs 执行操作

FastMCP 中间件以不同方式处理两种类型的操作:
列出操作 (on_list_tools, on_list_resources, on_list_prompts 等):

[*]中间件接收FastMCP 组件对象,包含完整元数据
[*]这些对象包含 FastMCP 特有属性(如 tags),可直接从组件访问
[*]结果在转换为 MCP 格式前包含完整组件信息
[*]标签包含在返回给 MCP 客户端的组件 meta 字段中
执行操作 (on_call_tool, on_read_resource, on_get_prompt):

[*]中间件在组件执行前运行
[*]中间件结果为执行结果,或组件未找到时的错误
[*]组件元数据在钩子参数中不可直接访问
在执行期间访问组件元数据

如果需要在执行操作期间检查组件属性(如标签),请使用通过上下文获取的 FastMCP 服务器实例:
from fastmcp.server.middleware import Middleware, MiddlewareContext
from fastmcp.exceptions import ToolError

class TagBasedMiddleware(Middleware):
    async def on_call_tool(self, context: MiddlewareContext, call_next):
      # 访问工具对象以检查其元数据
      if context.fastmcp_context:
            try:
                tool = await context.fastmcp_context.fastmcp.get_tool(context.message.name)
               
                # 检查此工具是否带有 "private" 标签
                if "private" in tool.tags:
                  raise ToolError("访问被拒绝:私有工具")
                  
                # 检查工具是否启用
                if not tool.enabled:
                  raise ToolError("工具当前已禁用")
                  
            except Exception:
                # 工具未找到或其他错误 - 让执行继续
                # 并自然处理错误
                pass
      
      return await call_next(context)相同模式适用于资源和提示:
from fastmcp.server.middleware import Middleware, MiddlewareContext
from fastmcp.exceptions import ResourceError, PromptError

class ComponentAccessMiddleware(Middleware):
    async def on_read_resource(self, context: MiddlewareContext, call_next):
      if context.fastmcp_context:
            try:
                resource = await context.fastmcp_context.fastmcp.get_resource(context.message.uri)
                if "restricted" in resource.tags:
                  raise ResourceError("访问被拒绝:受限资源")
            except Exception:
                pass
      return await call_next(context)
   
    async def on_get_prompt(self, context: MiddlewareContext, call_next):
      if context.fastmcp_context:
            try:
                prompt = await context.fastmcp_context.fastmcp.get_prompt(context.message.name)
                if not prompt.enabled:
                  raise PromptError("提示当前已禁用")
            except Exception:
                pass
      return await call_next(context)处理列出结果

对于列出操作,中间件 call_next 函数在组件转换为 MCP 格式前返回 FastMCP 组件列表。您可以过滤或修改此列表并将其返回给客户端。例如:
from fastmcp.server.middleware import Middleware, MiddlewareContext

class ListingFilterMiddleware(Middleware):
    async def on_list_tools(self, context: MiddlewareContext, call_next):
      result = await call_next(context)
      
      # 过滤掉带有 "private" 标签的工具
      filtered_tools = [
            tool for tool in result
            if "private" not in tool.tags
      ]
      
      # 返回修改后的列表
      return filtered_tools此过滤在组件转换为 MCP 格式并返回给客户端前进行。标签在过滤期间可访问,并包含在最终列出响应的组件 meta 字段中。
在列出操作中过滤组件时,请确保也在相应的执行钩子(on_call_tool、on_read_resource、on_get_prompt)中阻止已过滤组件的执行,以保持一致性。
工具调用拒绝

您可以通过在中间件中抛出 ToolError 来拒绝访问特定工具。这是阻止工具执行的正确方式,因为它与 FastMCP 错误处理系统正确集成
from fastmcp.server.middleware import Middleware, MiddlewareContext
from fastmcp.exceptions import ToolError

class AuthMiddleware(Middleware):
    async def on_call_tool(self, context: MiddlewareContext, call_next):
      tool_name = context.message.name
      
      # 拒绝访问受限工具
      if tool_name.lower() in ["delete", "admin_config"]:
            raise ToolError("访问被拒绝:工具需要管理员权限")
      
      # 允许其他工具继续执行
      return await call_next(context)拒绝工具调用时,务必抛出 ToolError,而非返回 ToolResult 对象或其他值。ToolError 确保错误通过中间件链正确传播,并转换为正确的 MCP 错误响应格式。
工具调用修改

对于工具调用等执行操作,您可以在执行前修改参数,或在执行后转换结果:
from fastmcp.server.middleware import Middleware, MiddlewareContext

class ToolCallMiddleware(Middleware):
    async def on_call_tool(self, context: MiddlewareContext, call_next):
      # 在执行前修改参数
      if context.message.name == "calculate":
            # 确保输入为正数
            if context.message.arguments.get("value", 0) < 0:
                context.message.arguments["value"] = abs(context.message.arguments["value"])
      
      result = await call_next(context)
      
      # 在执行后转换结果
      if context.message.name == "get_data":
            # 向结果添加元数据
            if result.structured_content:
                result.structured_content["processed_at"] = "2024-01-01T00:00:00Z"
      
      return result对于更复杂的工具重写场景,请考虑使用 工具转换 模式,它为创建修改后的工具变体提供了更结构化的方法。
钩子剖析

每个中间件钩子遵循相同的模式。让我们通过 on_message 钩子来理解其结构:
async def on_message(self, context: MiddlewareContext, call_next):
    # 1. 预处理:检查并可选地修改请求
    print(f"正在处理 {context.method}")
   
    # 2. 链式延续:调用下一个中间件/处理器
    result = await call_next(context)
   
    # 3. 后处理:检查并可选地修改响应
    print(f"已完成 {context.method}")
   
    # 4. 返回结果(可能已修改)
    return result每个钩子接收两个参数:

[*]context: MiddlewareContext - 包含当前请求信息:

[*]context.method - MCP 方法名称(如 "tools/call")
[*]context.source - 请求来源("client" 或 "server")
[*]context.type - 消息类型("request" 或 "notification")
[*]context.message - MCP 消息数据
[*]context.timestamp - 请求接收时间
[*]context.fastmcp_context - FastMCP Context 对象(如可用)

[*]call_next - 用于继续中间件链的函数。除非您希望完全停止处理,否则必须调用此函数。
开发者对请求流拥有完全控制权:

[*]继续处理:调用 await call_next(context) 以继续
[*]修改请求:在调用 call_next 前更改上下文
[*]修改响应:在调用 call_next 后更改结果
[*]停止链:不调用 call_next(极少需要)
[*]处理错误:在 try/catch 块中包装 call_next
除了修改请求和响应,您还可以存储状态数据,供工具(可选)稍后访问。为此,请使用 FastMCP Context 适当调用 set_state 或 get_state。
创建中间件

FastMCP 中间件通过继承 Middleware 基类并重写所需钩子来实现。
from fastmcp import FastMCP
from fastmcp.server.middleware import Middleware, MiddlewareContext

class LoggingMiddleware(Middleware):
    """记录所有 MCP 操作的中间件。"""
   
    async def on_message(self, context: MiddlewareContext, call_next):
      """为所有 MCP 消息调用。"""
      print(f"正在处理来自 {context.source} 的 {context.method}")
      
      result = await call_next(context)
      
      print(f"{context.method} 处理完成")
      return result

# 将中间件添加到您的服务器
mcp = FastMCP("MyServer")
mcp.add_middleware(LoggingMiddleware())向服务器添加中间件

中间件按添加到服务器的顺序执行。最先添加的中间件在进入时最先运行,在退出时最后运行:
mcp = FastMCP("MyServer")

mcp.add_middleware(AuthenticationMiddleware("secret-token"))
mcp.add_middleware(PerformanceMiddleware())
mcp.add_middleware(LoggingMiddleware())这将创建以下执行流:

[*]AuthenticationMiddleware(预处理)
[*]PerformanceMiddleware(预处理)
[*]LoggingMiddleware(预处理)
[*]实际工具/资源处理器
[*]LoggingMiddleware(后处理)
[*]PerformanceMiddleware(后处理)
[*]AuthenticationMiddleware(后处理)
组合服务器与中间件

当使用 服务器组合(下面提的Composition) (如 mount 或 import_server)时,中间件行为遵循以下规则:

[*]父服务器中间件为所有请求运行,包括路由到挂载服务器的请求
[*]挂载服务器中间件仅为由该特定服务器处理的请求运行
[*]中间件顺序在每个服务器内保持不变
# 带有中间件的父服务器
parent = FastMCP("Parent")
parent.add_middleware(AuthenticationMiddleware("token"))

# 带有自身中间件的子服务器
child = FastMCP("Child")
child.add_middleware(LoggingMiddleware())

@child.tool
def child_tool() -> str:
    return "from child"

# 挂载子服务器
parent.mount(child, prefix="child")当客户端调用 "child_tool" 时,请求将首先流经父服务器的身份验证中间件,然后路由到子服务器,在子服务器中再经过其日志中间件。
内置中间件

FastMCP 包含多个中间件实现,展示了最佳实践并提供立即可用的功能。让我们通过构建简化版本来探索每种类型的工作原理,然后了解如何使用完整实现。
计时中间件

性能监控对于理解服务器行为和识别瓶颈至关重要。FastMCP 在 fastmcp.server.middleware.timing 中包含计时中间件。
以下是其工作方式的示例:
import time
from fastmcp.server.middleware import Middleware, MiddlewareContext

class SimpleTimingMiddleware(Middleware):
    async def on_request(self, context: MiddlewareContext, call_next):
      start_time = time.perf_counter()
      
      try:
            result = await call_next(context)
            duration_ms = (time.perf_counter() - start_time) * 1000
            print(f"请求 {context.method} 在 {duration_ms:.2f}ms 内完成")
            return result
      except Exception as e:
            duration_ms = (time.perf_counter() - start_time) * 1000
            print(f"请求 {context.method} 在 {duration_ms:.2f}ms 后失败:{e}")
            raise要使用具有正确日志和配置的完整版本:
from fastmcp.server.middleware.timing import (
    TimingMiddleware,
    DetailedTimingMiddleware
)

# 对所有请求进行基础计时
mcp.add_middleware(TimingMiddleware())

# 详细的操作级计时(工具、资源、提示)
mcp.add_middleware(DetailedTimingMiddleware())内置版本包括自定义日志支持、正确格式化,且 DetailedTimingMiddleware 提供 on_call_tool 和 on_read_resource 等操作特定钩子,以实现精细计时。
日志中间件

请求和响应日志记录对于调试、监控和理解 MCP 服务器中的使用模式至关重要。FastMCP 在 fastmcp.server.middleware.logging 中提供全面的日志中间件。
以下是其工作方式的示例:
from fastmcp.server.middleware import Middleware, MiddlewareContext

class SimpleLoggingMiddleware(Middleware):
    async def on_message(self, context: MiddlewareContext, call_next):
      print(f"正在处理来自 {context.source} 的 {context.method}")
      
      try:
            result = await call_next(context)
            print(f"{context.method} 处理完成")
            return result
      except Exception as e:
            print(f"{context.method} 失败:{e}")
            raise要使用具有高级功能的完整版本:
from fastmcp.server.middleware.logging import (
    LoggingMiddleware,
    StructuredLoggingMiddleware
)

# 支持负载的人类可读日志
mcp.add_middleware(LoggingMiddleware(
    include_payloads=True,
    max_payload_length=1000
))

# 用于日志聚合工具的 JSON 结构化日志
mcp.add_middleware(StructuredLoggingMiddleware(include_payloads=True))内置版本包括负载日志、结构化 JSON 输出、自定义日志支持、负载大小限制以及用于精细控制的操作特定钩子。
速率限制中间件

速率限制对于保护服务器免受滥用、确保公平资源使用以及在负载下保持性能至关重要。FastMCP 在 fastmcp.server.middleware.rate_limiting 中包含复杂的速率限制中间件。
以下是其工作方式的示例:
import time
from collections import defaultdict
from fastmcp.server.middleware import Middleware, MiddlewareContext
from mcp import McpError
from mcp.types import ErrorData

class SimpleRateLimitMiddleware(Middleware):
    def __init__(self, requests_per_minute: int = 60):
      self.requests_per_minute = requests_per_minute
      self.client_requests = defaultdict(list)
   
    async def on_request(self, context: MiddlewareContext, call_next):
      current_time = time.time()
      client_id = "default"# 实际中,从头部或上下文中提取
      
      # 清理旧请求并检查限制
      cutoff_time = current_time - 60
      self.client_requests = [
            req_time for req_time in self.client_requests
            if req_time > cutoff_time
      ]
      
      if len(self.client_requests) >= self.requests_per_minute:
            raise McpError(ErrorData(code=-32000, message="超出速率限制"))
      
      self.client_requests.append(current_time)
      return await call_next(context)要使用具有高级算法的完整版本:
from fastmcp.server.middleware.rate_limiting import (
    RateLimitingMiddleware,
    SlidingWindowRateLimitingMiddleware
)

# 令牌桶速率限制(允许受控突发)
mcp.add_middleware(RateLimitingMiddleware(
    max_requests_per_second=10.0,
    burst_capacity=20
))

# 滑动窗口速率限制(精确的基于时间的控制)
mcp.add_middleware(SlidingWindowRateLimitingMiddleware(
    max_requests=100,
    window_minutes=1
))内置版本包括令牌桶算法、按客户端识别、全局速率限制以及具有可配置客户端识别功能的异步安全实现。
错误处理中间件

一致的错误处理和恢复对于健壮的 MCP 服务器至关重要。FastMCP 在 fastmcp.server.middleware.error_handling 中提供全面的错误处理中间件。
以下是其工作方式的示例:
import logging
from fastmcp.server.middleware import Middleware, MiddlewareContext

class SimpleErrorHandlingMiddleware(Middleware):
    def __init__(self):
      self.logger = logging.getLogger("errors")
      self.error_counts = {}
   
    async def on_message(self, context: MiddlewareContext, call_next):
      try:
            return await call_next(context)
      except Exception as error:
            # 记录错误并跟踪统计信息
            error_key = f"{type(error).__name__}:{context.method}"
            self.error_counts = self.error_counts.get(error_key, 0) + 1
            
            self.logger.error(f"{context.method} 中发生错误:{type(error).__name__}: {error}")
            raise要使用具有高级功能的完整版本:
from fastmcp.server.middleware.error_handling import (
    ErrorHandlingMiddleware,
    RetryMiddleware
)

# 全面的错误日志和转换
mcp.add_middleware(ErrorHandlingMiddleware(
    include_traceback=True,
    transform_errors=True,
    error_callback=my_error_callback
))

# 带指数退避的自动重试
mcp.add_middleware(RetryMiddleware(
    max_retries=3,
    retry_exceptions=(ConnectionError, TimeoutError)
))内置版本包括错误转换、自定义回调、可配置的重试逻辑以及正确的 MCP 错误格式化。
组合中间件

from fastmcp import FastMCP
from fastmcp.server.middleware.timing import TimingMiddleware
from fastmcp.server.middleware.logging import LoggingMiddleware
from fastmcp.server.middleware.rate_limiting import RateLimitingMiddleware
from fastmcp.server.middleware.error_handling import ErrorHandlingMiddleware

mcp = FastMCP("Production Server")

# 按逻辑顺序添加中间件
mcp.add_middleware(ErrorHandlingMiddleware())# 首先处理错误
mcp.add_middleware(RateLimitingMiddleware(max_requests_per_second=50))
mcp.add_middleware(TimingMiddleware())# 计时实际执行
mcp.add_middleware(LoggingMiddleware())# 记录所有内容

@mcp.tool
def my_tool(data: str) -> str:
    return f"已处理:{data}"Composition

随着MCP 应用规模扩大,你可能希望将工具、资源和提示按逻辑模块组织,或复用现有的服务器组件。FastMCP 通过两种方法支持服务器组合:

[*]import_server:一次性复制组件并添加前缀(静态组合)。
[*]mount:创建实时链接,主服务器在运行时将请求委托给子服务器(动态组合)。
为什么要组合服务器


[*]模块化:将大型应用拆分为更小、更专注的服务器(例如 WeatherServer、DatabaseServer、CalendarServer)。
[*]可复用性:创建通用工具服务器(例如 TextProcessingServer),并在需要时挂载。
[*]团队协作:不同团队可分别开发独立的 FastMCP 服务器,后期再进行组合。
[*]逻辑组织:将相关功能按逻辑分组,便于管理。
导入vs挂载

选择导入还是挂载取决于您的具体用例和需求。
特性导入挂载方法FastMCP.import_server(server, prefix=None)FastMCP.mount(server, prefix=None)组合类型一次性复制(静态)实时链接(动态)更新同步子服务器的变更不会反映到主服务器子服务器的变更立即反映到主服务器性能快速 — 无运行时委托开销较慢 — 受最慢挂载服务器影响前缀可选 — 省略则保留原名称可选 — 省略则保留原名称适用场景打包最终组件、性能敏感场景运行时模块化组合导入

import_server() 方法将一个 FastMCP 实例(子服务器)中的所有组件(工具、资源、模板、提示)复制到另一个实例(主服务器)中。可选提供 prefix 以避免命名冲突。若未提供前缀,组件将按原样导入。当多个服务器使用相同前缀(或无前缀)导入时,最后导入的服务器组件将覆盖先前导入的同名组件。
from fastmcp import FastMCP
import asyncio

# 定义子服务器
weather_mcp = FastMCP(name="WeatherService")

@weather_mcp.tool
def get_forecast(city: str) -> dict:
    """获取天气预报。"""
    return {"city": city, "forecast": "Sunny"}

@weather_mcp.resource("data://cities/supported")
def list_supported_cities() -> list:
    """列出支持天气查询的城市。"""
    return ["London", "Paris", "Tokyo"]

# 定义主服务器
main_mcp = FastMCP(name="MainApp")

# 导入子服务器
async def setup():
    await main_mcp.import_server(weather_mcp, prefix="weather")

# 结果:main_mcp 现包含带前缀的组件:
# - 工具: "weather_get_forecast"
# - 资源: "data://weather/cities/supported"

if __name__ == "__main__":
    asyncio.run(setup())
    main_mcp.run()导入的工作原理

当你调用 await main_mcp.import_server(subserver, prefix={whatever}) 时:

[*]工具:subserver 的所有工具被添加到 main_mcp,名称前缀为 {prefix}_。

[*]subserver.tool(name="my_tool") 变为 main_mcp.tool(name="{prefix}_my_tool")。

[*]资源:所有资源的 URI 和名称均被添加前缀。

[*]URI: subserver.resource(uri="data://info") 变为 main_mcp.resource(uri="data://{prefix}/info")。
[*]名称: resource.name 变为 "{prefix}_{resource.name}"。

[*]资源模板:模板的前缀规则与资源类似。

[*]URI: subserver.resource(uri="data://{id}") 变为 main_mcp.resource(uri="data://{prefix}/{id}")。
[*]名称: template.name 变为 "{prefix}_{template.name}"。

[*]提示:所有提示的名称被添加前缀 {prefix}_。

[*]subserver.prompt(name="my_prompt") 变为 main_mcp.prompt(name="{prefix}_my_prompt")。

请注意,import_server 执行的是一次性复制。在导入之后对 subserver 所做的更改不会反映在 main_mcp 中。subserver 的 lifespan 上下文也不会由主服务器执行。
prefix 参数是可选的。如果省略,组件将按原样导入,不进行修改,这样组件将保留其原始名称。当导入多个具有相同前缀或无前缀的服务器时,最后导入的服务器的组件将优先。
挂载

mount() 方法在 main_mcp 服务器与 subserver 之间创建一个实时链接。它不复制组件,而是在运行时将匹配可选 prefix 的组件请求委托给 subserver 处理。若未提供前缀,则子服务器的组件可通过原始名称直接访问。当多个服务器使用相同前缀(或无前缀)挂载时,对于冲突的组件名称,最后挂载的服务器将优先。
import asyncio
from fastmcp import FastMCP, Client

# 定义子服务器
dynamic_mcp = FastMCP(name="DynamicService")

@dynamic_mcp.tool
def initial_tool():
    """初始工具演示。"""
    return "Initial Tool Exists"

# 挂载子服务器(同步操作)
main_mcp = FastMCP(name="MainAppLive")
main_mcp.mount(dynamic_mcp, prefix="dynamic")

# 在挂载后添加工具 — 仍可通过 main_mcp 访问
@dynamic_mcp.tool
def added_later():
    """挂载后添加的工具。"""
    return "Tool Added Dynamically!"

# 测试访问已挂载的工具
async def test_dynamic_mount():
    tools = await main_mcp.get_tools()
    print("可用工具:", list(tools.keys()))
    # 输出:['dynamic_initial_tool', 'dynamic_added_later']
   
    async with Client(main_mcp) as client:
      result = await client.call_tool("dynamic_added_later")
      print("结果:", result.data)
      # 输出:"Tool Added Dynamically!"

if __name__ == "__main__":
    asyncio.run(test_dynamic_mount())挂载的工作原理

配置挂载后:

[*]实时链接:父服务器与挂载的服务器建立连接。
[*]动态更新:对挂载服务器的更改在通过父服务器访问时立即生效。
[*]前缀访问:父服务器使用前缀将请求路由到挂载的服务器。
[*]委托:对匹配前缀的组件的请求在运行时委托给挂载的服务器处理。
命名工具、资源、模板和提示的前缀规则与 import_server 相同。这包括为资源和模板的 URI/键及名称添加前缀,以便在多服务器配置中更好地识别。
由于“实时链接”的存在,父服务器上的 list_tools() 等操作会受到最慢挂载服务器速度的影响。特别是,基于 HTTP 的挂载服务器可能引入显著延迟(300-400ms,而本地工具仅需 1-2ms),并且这种减速会影响整个服务器,而不仅仅是与 HTTP 代理工具的交互。如果性能至关重要,通过 import_server() 导入工具可能是更合适的解决方案,因为它在启动时一次性复制组件,而不是在运行时委托请求。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: [MCP][07]logging和progress等功能说明