前言
很多文档和博客都只介绍如何开发MCP Server,然后集成到VS Code或者Cursor等程序,很少涉及如何开发MCP Host和MCP Client。如果你想要在自己的服务中集成完整的MCP功能,光看这些是远远不够的。所以本文及后续的MCP系列文章都会带你深入了解如何开发MCP Client,让你真正掌握这项技术。
准备开发环境
MCP官方SDK主要支持Python和TypeScript,当然也有其他语言的实现,不过我这里就以Python为例了。我的Python版本是3.13.5,但其实只要高于3.11应该都没问题。
我个人推荐使用uv来管理依赖,当然你也可以用传统的pip。Python SDK有官方的mcp包和社区的FastMCP包。官方SDK其实也内置了FastMCP,不过是v1版本,而FastMCP官网已经更新到了v2版本。作为学习,两个都装上试试也无妨。- # 使用 uv
- uv add mcp fastmcp
- # 使用 pip
- python -m pip install mcp fastmcp
复制代码 第一个MCP项目:你好,MCP世界!
在第一个MCP项目中,我们实现一个简单的MCP Client和MCP Server,但还没集成LLM。在这个阶段,Client调用Server的tool或resource都需要手动指定。
MCP Server
下面的MCP Server示例代码定义了一些prompts、resources和tools。这里有个小贴士:函数参数的类型注解、返回类型和docstring都一定要写清楚,否则后续集成LLM时,LLM就无法正确理解如何调用你的工具了。
这段Server可以通过stdio方式被Client调用。在正式让Client调用之前,建议你先手动运行一下Server,测试它能否正常启动,避免Client启动时报一堆让人摸不着头脑的错误。- from mcp.server.fastmcp import FastMCP
- from datetime import datetime
- import asyncssh
- from typing import TypeAlias, Union
- mcp = FastMCP("custom")
- @mcp.prompt()
- def greet_user(name: str, style: str = "formal") -> str:
- """Greet a user with a specified style."""
- if style == "formal":
- return f"Good day, {name}. How do you do?"
- elif style == "friendly":
- return f"Hey {name}! What's up?"
- elif style == "casual":
- return f"Yo {name}, how's it going?"
- else:
- return f"Hello, {name}!"
- @mcp.resource("greeting://{name}")
- def greeting_resource(name: str) -> str:
- """A simple greeting resource."""
- return f"Hello, {name}!"
- @mcp.resource("config://app")
- def get_config() -> str:
- """Static configuration data"""
- return "App configuration here"
- @mcp.tool()
- def add(a: int, b: int) -> int:
- """Add two numbers"""
- return a + b
- @mcp.tool()
- def multiply(a: int, b: int) -> int:
- """Multiply two numbers"""
- return a * b
- Number: TypeAlias = Union[int, float]
- @mcp.tool()
- def is_greater_than(a: Number, b: Number) -> Number:
- """Check if a is greater than b"""
- return a > b
- @mcp.tool()
- async def get_weather(city: str) -> str:
- """Get weather for a given city."""
- return f"It's always sunny in {city}!"
- @mcp.tool()
- async def get_date() -> str:
- """Get today's date."""
- return datetime.now().strftime("%Y-%m-%d")
- @mcp.tool()
- async def execute_ssh_command_remote(hostname: str, command: str) -> str:
- """Execute an SSH command on a remote host.
-
- Args:
- hostname (str): The hostname of the remote host.
- command (str): The SSH command to execute.
- Returns:
- str: The output of the SSH command.
- """
- async with asyncssh.connect(hostname, username="rainux", connect_timeout=10) as conn:
- result = await conn.run(command, timeout=10)
- stdout = result.stdout
- stderr = result.stderr
- content = str(stdout if stdout else stderr)
- return content
- if __name__ == "__main__":
- mcp.run(transport="stdio")
复制代码 MCP Client
Client通过STDIO方式调用MCP Server,server_params中指定了如何运行Server,包括python解释器路径、Server文件名和运行位置。需要注意的是,Client启动时也会启动Server,如果Server报错,Client也会跟着无法启动。- import asyncio
- from pathlib import Path
- from pydantic import AnyUrl
- from mcp import ClientSession, StdioServerParameters, types
- from mcp.client.stdio import stdio_client
- server_params = StdioServerParameters(
- command=str(Path(__file__).parent / ".venv" / "bin" / "python"),
- args=[str(Path(__file__).parent / "demo1-server.py")],
- cwd=str(Path(__file__).parent),
- )
- async def run():
- async with stdio_client(server_params) as (read, write):
- async with ClientSession(read, write) as session:
- # Initialize the connection
- await session.initialize()
- # List available prompts
- prompts = await session.list_prompts()
- print(f"Available prompts: {[p.name for p in prompts.prompts]}")
- # Get a prompt (greet_user prompt from fastmcp_quickstart)
- if prompts.prompts:
- prompt = await session.get_prompt("greet_user", arguments={"name": "Alice", "style": "friendly"})
- print(f"Prompt result: {prompt.messages[0].content}")
- # List available resources
- resources = await session.list_resources()
- print(f"Available resources: {[r.uri for r in resources.resources]}")
- # List available tools
- tools = await session.list_tools()
- print(f"Available tools: {[t.name for t in tools.tools]}")
- # Read a resource (greeting resource from fastmcp_quickstart)
- resource_content = await session.read_resource(AnyUrl("greeting://World"))
- content_block = resource_content.contents[0]
- if isinstance(content_block, types.TextResourceContents):
- print(f"Resource content: {content_block.text}")
- # Call a tool (add tool from fastmcp_quickstart)
- result = await session.call_tool("add", arguments={"a": 5, "b": 3})
- result_unstructured = result.content[0]
- if isinstance(result_unstructured, types.TextContent):
- print(f"Tool result: {result_unstructured.text}")
- result_structured = result.structuredContent
- print(f"Structured tool result: {result_structured}")
- if __name__ == "__main__":
- asyncio.run(run())
复制代码 运行Client,输出如下:- Processing request of type ListPromptsRequest
- Available prompts: ['greet_user']
- Processing request of type GetPromptRequest
- Prompt result: type='text' text="Hey Alice! What's up?" annotations=None meta=None
- Processing request of type ListResourcesRequest
- Available resources: [AnyUrl('config://app')]
- Processing request of type ListToolsRequest
- Available tools: ['add', 'multiply', 'get_weather', 'get_date', 'execute_ssh_command_remote']
- Processing request of type ReadResourceRequest
- Resource content: Hello, World!
- Processing request of type CallToolRequest
- Tool result: 8
- Structured tool result: {'result': 8}
复制代码 可以看到,Client成功地调用了Server上的各种功能,包括获取提示、读取资源和调用工具。
使用streamable-http远程调用:让MCP飞起来!
上面的例子中,Client通过STDIO方式在本地调用Server。现在我们稍作修改,让它可以通过HTTP远程调用Server,这样就更加灵活了。
MCP Server
只列出修改的部分:- mcp = FastMCP("custom", host="localhost", port=8001)
- if __name__ == "__main__":
- mcp.run(transport="streamable-http")
复制代码 修改完成后,启动Server,它会监听在localhost:8001地址上,就像一个小小的Web服务(其实就是个Web服务,暴露的api为/mcp)。
MCP Client
同样只列出修改的部分。Client需要指定MCP Server的地址。streamablehttp_client返回的第三个参数get_session_id用于会话管理,大多数情况下你不需要直接使用它,所以在一些文档中这里会用_来占位。- from mcp.client.streamable_http import streamablehttp_client
- server_uri = "http://localhost:8001/mcp"
- async def main():
- async with streamablehttp_client(server_uri) as (read, write, get_session_id):
- # 获取当前会话ID
- session_id = get_session_id()
- print(f"Session ID before initialization: {session_id}")
-
- async with ClientSession(read, write) as session:
- # Initialize the connection
- await session.initialize()
-
- # 初始化后再次获取会话ID
- session_id = get_session_id()
- print(f"Session ID after initialization: {session_id}")
复制代码 client运行输出:- Session ID before initialization: None
- Session ID after initialization: 60ce4204b907469e9eb46e7e01df040d
- Available prompts: ['greet_user']
- Prompt result: type='text' text="Hey Alice! What's up?" annotations=None meta=None
- Available resources: [AnyUrl('config://app')]
- Available tools: ['add', 'multiply', 'get_weather', 'get_date', 'execute_ssh_command_remote']
- Resource content: Hello, World!
- Tool result: 8
- Structured tool result: {'result': 8}
复制代码 现在我们的MCP应用已经可以通过网络进行远程调用了,架构变得更加灵活。
集成LLM:让AI自己做决定!
前面两个示例中,我们都需要在Client中手动控制调用Server的tool,这在实际应用中显然是不现实的。我们需要集成LLM,让AI自己决定该调用哪个工具。
MCP Server
Server端不需要做任何变更,Client还是通过HTTP方式调用我们之前创建的Server。
MCP Client
这里我们选用阿里的通义千问(Qwen)。Qwen的API Key可以自行申请,氪个5块钱就够个人开发用很久了。为了便于后续开发,我把配置功能单独放到了一个模块里,下面代码中直接使用了,相关模块放在"补充"部分。- """
- MCP (Model Context Protocol) 客户端示例
- 该客户端演示了如何使用 MCP 协议与 MCP 服务器进行交互,并通过 LLM 调用服务器提供的工具。
- 工作流程:
- 1. 连接到 MCP 服务器
- 2. 获取服务器提供的工具列表
- 3. 用户输入查询
- 4. 将查询发送给 LLM,LLM 可能会调用 MCP 服务器提供的工具
- 5. 执行工具调用并获取结果
- 6. 将结果返回给 LLM 进行最终回答
- """
- import asyncio
- # JSON 处理
- import json
- # 增强输入功能(在某些系统上提供命令历史等功能)
- import readline # 引入readline模块用于增强python的input功能, Windows下的python标准库可能不包含
- # 异常追踪信息
- import traceback
- # 异步上下文管理器,用于资源管理
- from contextlib import AsyncExitStack
- # 类型提示支持
- from typing import List, Optional, cast
- # MCP 客户端会话和 HTTP 传输
- from mcp import ClientSession
- from mcp.client.streamable_http import streamablehttp_client
- # OpenAI 异步客户端,用于与 LLM 通信
- from openai import AsyncOpenAI
- # OpenAI 聊天完成相关的类型定义
- from openai.types.chat import (ChatCompletionAssistantMessageParam,
- ChatCompletionMessageFunctionToolCall,
- ChatCompletionMessageParam,
- ChatCompletionMessageToolCall,
- ChatCompletionToolMessageParam,
- ChatCompletionToolParam,
- ChatCompletionUserMessageParam)
- # 项目配置和日志模块
- from pkg.config import cfg
- from pkg.log import logger
- class MCPClient:
- """
- MCP 客户端类,负责管理与 MCP 服务器的连接和交互
- """
-
- def __init__(self):
- """
- 初始化 MCP 客户端
- """
- # 客户端会话,初始为空
- self.session: Optional[ClientSession] = None
- # 异步上下文管理栈,用于管理异步资源的生命周期
- self.exit_stack = AsyncExitStack()
- # OpenAI 异步客户端,用于与 LLM 通信
- self.client = AsyncOpenAI(
- base_url=cfg.llm_base_url,
- api_key=cfg.llm_api_key,
- )
- async def connect_to_server(self, server_uri: str):
- """
- 连接到 MCP 服务器
-
- Args:
- server_uri (str): MCP 服务器的 URI
- """
- # 创建 Streamable HTTP 传输连接
- http_transport = await self.exit_stack.enter_async_context(
- streamablehttp_client(server_uri)
- )
- # 获取读写流
- self.read, self.write, _ = http_transport
- # 创建并初始化客户端会话
- self.session = await self.exit_stack.enter_async_context(
- ClientSession(self.read, self.write)
- )
- # 初始化会话
- await self.session.initialize()
- # 检查会话是否成功初始化
- if self.session is None:
- raise RuntimeError("Failed to initialize session")
-
- # 获取服务器提供的工具列表
- response = await self.session.list_tools()
- tools = response.tools
- logger.info(f"\nConnected to server with tools: {[tool.name for tool in tools]}")
- async def process_query(self, query: str) -> str:
- """
- 处理用户查询
-
- Args:
- query (str): 用户的查询
-
- Returns:
- str: 处理结果
- """
- # 初始化消息历史,包含用户的查询
- messages: List[ChatCompletionMessageParam] = [
- ChatCompletionUserMessageParam(
- role="user",
- content=query
- )
- ]
-
- # 确保会话已初始化
- if self.session is None:
- raise RuntimeError("Session not initialized. Please connect to server first.")
-
- # 获取服务器提供的工具列表
- response = await self.session.list_tools()
-
- # 构建工具列表,处理可能为None的字段
- # 这些工具将被传递给 LLM,以便 LLM 知道可以调用哪些工具
- available_tools: List[ChatCompletionToolParam] = []
- for tool in response.tools:
- tool_def: ChatCompletionToolParam = {
- "type": "function",
- "function": {
- "name": tool.name,
- "description": tool.description or "",
- "parameters": tool.inputSchema or {}
- }
- }
- available_tools.append(tool_def)
- logger.info(f"Available tools: {available_tools}")
- # 调用 LLM 进行聊天完成
- response = await self.client.chat.completions.create(
- model=cfg.llm_model,
- messages=messages,
- tools=available_tools,
- )
- # 存储最终输出文本
- final_text = []
- # 获取 LLM 的响应消息
- message = response.choices[0].message
- final_text.append(message.content or "")
- # 如果 LLM 要求调用工具,则处理工具调用
- while message.tool_calls:
- # 处理每个工具调用
- for tool_call in message.tool_calls:
- # 确保我们处理的是正确的工具调用类型
- if hasattr(tool_call, 'function'):
- # 这是一个函数工具调用
- function_call = cast(ChatCompletionMessageFunctionToolCall, tool_call)
- function = function_call.function
- tool_name = function.name
- # 解析工具参数
- tool_args = json.loads(function.arguments)
- else:
- # 跳过不支持的工具调用类型
- continue
- # 执行工具调用
- if self.session is None:
- raise RuntimeError("Session not initialized. Cannot call tool.")
-
- # 调用 MCP 服务器上的工具
- result = await self.session.call_tool(tool_name, tool_args)
- final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
- # 将工具调用和结果添加到消息历史
- # 这样 LLM 可以知道它之前调用了哪些工具
- assistant_msg: ChatCompletionAssistantMessageParam = {
- "role": "assistant",
- "tool_calls": [
- {
- "id": tool_call.id,
- "type": "function",
- "function": {
- "name": tool_name,
- "arguments": json.dumps(tool_args)
- }
- }
- ]
- }
- messages.append(assistant_msg)
-
- # 添加工具调用结果到消息历史
- tool_msg: ChatCompletionToolMessageParam = {
- "role": "tool",
- "tool_call_id": tool_call.id,
- "content": str(result.content) if result.content else ""
- }
- messages.append(tool_msg)
- # 将工具调用的结果交给 LLM,让 LLM 生成最终回答
- response = await self.client.chat.completions.create(
- model=cfg.llm_model,
- messages=messages,
- tools=available_tools
- )
- # 获取新的响应消息
- message = response.choices[0].message
- if message.content:
- final_text.append(message.content)
- # 返回最终结果
- return "\n".join(final_text)
-
- async def chat_loop(self):
- """
- 运行交互式聊天循环
- """
- print("\nMCP Client Started!")
- print("Type your queries or 'quit' to exit.")
- # 持续接收用户输入
- while True:
- try:
- # 获取用户输入
- query = input("\nQuery: ").strip()
- # 检查是否退出
- if query.lower() == 'quit':
- break
- # 忽略空输入
- if not query:
- continue
- # 处理用户查询并输出结果
- response = await self.process_query(query)
- print("\n" + response)
- # 异常处理
- except Exception as e:
- print(f"\nError: {str(e)}")
- print(traceback.format_exc())
- async def cleanup(self):
- """
- 清理资源
- """
- await self.exit_stack.aclose()
- async def main():
- """
- 主函数
- """
- # 创建 MCP 客户端实例
- client = MCPClient()
- try:
- # 连接到 MCP 服务器
- await client.connect_to_server("http://localhost:8001/mcp")
- # 运行聊天循环
- await client.chat_loop()
- except Exception as e:
- print(f"Error: {str(e)}")
- finally:
- # 清理资源
- await client.cleanup()
- # 程序入口点
- if __name__ == "__main__":
- asyncio.run(main())
复制代码 client运行输出:- MCP Client Started!
- Type your queries or 'quit' to exit.
- Query: 今天的日期是什么
- [Calling tool get_date with args {}]
- 今天的日期是2025年9月13日。
- Query: 合肥的天气怎么样?
- [Calling tool get_weather with args {'city': '合肥'}]
- 合肥的天气总是阳光明媚!
- Query: 0.11比0.9大吗
- [Calling tool is_greater_than with args {'a': 0.11, 'b': 0.9}]
- 0.11 不比 0.9 大。0.11 小于 0.9。
- Query: quit
复制代码 现在AI可以自己决定调用哪个工具了。当你问"今天的日期是什么"时,它会自动调用get_date工具;当你问"合肥的天气怎么样"时,它会自动调用get_weather工具。这才是真正的智能!
小结
通过这篇文章,我们从零开始构建了一个完整的MCP应用,涵盖了从基础的Client-Server通信到集成LLM的全过程。我们学习了:
- 如何搭建MCP开发环境
- 如何创建MCP Server并定义tools、resources和prompts
- 如何编写MCP Client并通过stdio和HTTP两种方式与Server通信
- 如何集成LLM,让AI自主决定调用哪个工具
整个过程就像搭积木一样,每一步都有其特定的作用:
- Server负责提供功能(工具和资源)
- Client负责协调和调用这些功能
- LLM负责智能决策,决定何时以及如何使用这些功能
这种架构的优势在于功能扩展非常灵活。当你需要添加新功能时,只需要在Server端添加新的tools或resources,Client和LLM会自动发现并使用它们,而不需要修改Client端的代码。
MCP真正实现了"上下文协议"的概念,让AI可以像人类一样访问和操作各种工具和资源,这是迈向更强大AI应用的重要一步。接下来你可以尝试添加更多有趣的工具,比如文件操作、数据库查询、API调用等,让你的AI助手变得更加强大!
补充
配置模块
pkg/config.py- import json
- from pathlib import Path
- class Config:
- def __init__(self):
- p = Path(__file__).parent.parent / "conf" / "config.json"
- if not p.exists():
- raise FileNotFoundError(f"Config file not found: {p}")
- self.data = self.read_json(str(p))
- def read_json(self, filepath: str) -> dict:
- with open(filepath, "r") as f:
- return json.load(f)
-
- @property
- def llm_model(self) -> str:
- return self.data["llm"]["model"]
-
- @property
- def llm_api_key(self):
- return self.data["llm"]["api_key"]
-
- @property
- def llm_base_url(self) -> str:
- return self.data["llm"]["base_url"]
-
- @property
- def server_host(self) -> str:
- return self.data["server"]["host"]
-
- @property
- def server_port(self) -> int:
- return self.data["server"]["port"]
-
- cfg = Config()
复制代码 配置文件conf/config.json- {
- "llm": {
- "model": "qwen-plus",
- "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
- "api_key": "your token"
- },
- "server": {
- "host": "127.0.0.1",
- "port": 8000
- }
- }
复制代码 日志模块
pkg/log.py- import logging
- import sys
- def set_formatter():
- """设置formatter"""
- fmt = "%(asctime)s | %(name)s | %(levelname)s | %(filename)s:%(lineno)d | %(funcName)s | %(message)s"
- datefmt = "%Y-%m-%d %H:%M:%S"
- return logging.Formatter(fmt, datefmt=datefmt)
- def set_stream_handler():
- return logging.StreamHandler(sys.stdout)
- def set_file_handler():
- return logging.FileHandler("app.log", mode="a", encoding="utf-8")
- def get_logger(name: str = "mylogger", level=logging.DEBUG):
- logger = logging.getLogger(name)
- formatter = set_formatter()
- # handler = set_stream_handler()
- handler = set_file_handler()
- handler.setFormatter(formatter)
- logger.addHandler(handler)
- logger.setLevel(level)
- return logger
- logger = get_logger()
复制代码 来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |