引言
MCP(Model Context Protocol)是Anthropic公司于2024年11月推出的一种开放协议,旨在标准化大型语言模型(LLM)与外部工具、数据源和计算资源之间的交互方式。随着AI应用的快速发展,MCP已成为连接AI模型与现实世界的关键桥梁。本文将从基础概念出发,系统性地解析MCP的类型分类,涵盖从基础到高级的完整知识体系,并通过实际代码示例帮助读者深入理解。
一、MCP基础概念与核心架构
1.1 什么是MCP?
MCP(Model Context Protocol)是一种客户端-服务器协议,允许LLM通过标准化接口访问外部资源。它解决了传统API调用的碎片化问题,为AI应用提供了一种统一的上下文管理方式。
核心特点:
- 标准化接口:统一的协议规范,减少集成复杂度
- 上下文感知:模型能够理解并利用外部数据和工具
- 安全可控:细粒度的权限管理和数据访问控制
- 可扩展性:支持自定义工具和资源的扩展
1.2 MCP架构组成
MCP采用客户端-服务器架构,包含以下核心组件:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ LLM客户端 │◄──►│ MCP服务器 │◄──►│ 外部资源 │
│ (如Claude、 │ │ (工具/数据源) │ │ (API、数据库等) │
│ Cursor等) │ │ │ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
关键角色:
- MCP Host:运行LLM的应用程序(如Claude Desktop、Cursor)
- MCP Client:在Host中运行,负责与服务器通信
- MCP Server:提供工具、资源和提示模板的服务器
二、MCP类型基础分类
2.1 按功能类型分类
2.1.1 工具型MCP服务器(Tools)
工具型MCP服务器提供可执行的操作,允许LLM调用外部函数或API。
特点:
- 执行具体操作(如查询数据库、调用API)
- 有明确的输入输出格式
- 通常需要参数验证
示例:天气查询工具
# weather_tool_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import requests
import json
app = Server("weather-tool-server")
@app.list_tools()
async def list_tools():
return [
{
"name": "get_weather",
"description": "获取指定城市的天气信息",
"inputSchema": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称"
},
"unit": {
"type": "string",
"description": "温度单位",
"enum": ["celsius", "fahrenheit"],
"default": "celsius"
}
},
"required": ["city"]
}
}
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "get_weather":
city = arguments["city"]
unit = arguments.get("unit", "celsius")
# 模拟天气API调用
weather_data = {
"city": city,
"temperature": 22 if unit == "celsius" else 72,
"condition": "晴天",
"humidity": 65,
"unit": unit
}
return {
"content": [
{
"type": "text",
"text": json.dumps(weather_data, ensure_ascii=False)
}
]
}
if __name__ == "__main__":
stdio_server(app)
使用场景:
- 数据查询和分析
- 外部服务集成
- 自动化工作流
2.1.2 资源型MCP服务器(Resources)
资源型MCP服务器提供只读的数据源,LLM可以读取但不能修改。
特点:
- 提供静态或动态数据
- 通常有明确的URI标识
- 支持数据订阅和实时更新
示例:文档资源服务器
# document_resource_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import json
app = Server("document-resource-server")
# 模拟文档数据库
DOCUMENTS = {
"file://docs/api-guide.md": {
"title": "API使用指南",
"content": "# API使用指南\n\n## 快速开始\n1. 获取API密钥\n2. 发送请求\n3. 处理响应",
"metadata": {
"author": "技术团队",
"last_updated": "2024-01-15",
"version": "1.0"
}
},
"file://docs/error-codes.md": {
"title": "错误代码参考",
"content": "# 错误代码\n\n- 400: 请求参数错误\n- 401: 认证失败\n- 500: 服务器内部错误",
"metadata": {
"author": "运维团队",
"last_updated": "2024-01-10",
"version": "2.1"
}
}
}
@app.list_resources()
async def list_resources():
return [
{
"uri": uri,
"name": data["title"],
"description": f"文档: {data['title']}",
"mimeType": "text/markdown"
}
for uri, data in DOCUMENTS.items()
]
@app.read_resource()
async def read_resource(uri: str):
if uri in DOCUMENTS:
return {
"contents": [
{
"uri": uri,
"text": DOCUMENTS[uri]["content"]
}
]
}
else:
raise ValueError(f"Resource not found: {uri}")
if __name__ == "__main__":
stdio_server(app)
使用场景:
- 文档和知识库访问
- 配置文件读取
- 静态数据提供
2.1.3 提示模板型MCP服务器(Prompts)
提示模板型MCP服务器提供预定义的提示模板,帮助LLM生成更准确的响应。
特点:
- 提供结构化的提示模板
- 支持参数化输入
- 可以包含示例和约束
示例:代码审查提示模板
# code_review_prompt_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import json
app = Server("code-review-prompt-server")
@app.list_prompts()
async def list_prompts():
return [
{
"name": "code_review",
"description": "代码审查提示模板",
"arguments": {
"code": {
"description": "要审查的代码",
"required": True
},
"language": {
"description": "编程语言",
"required": True
},
"focus": {
"description": "审查重点",
"required": False,
"enum": ["security", "performance", "readability", "all"]
}
}
}
]
@app.get_prompt()
async def get_prompt(name: str, arguments: dict):
if name == "code_review":
code = arguments["code"]
language = arguments["language"]
focus = arguments.get("focus", "all")
prompt_template = f"""你是一位专业的代码审查专家。请审查以下{language}代码:
```{language}
{code}
请从以下方面进行审查:
- 代码质量:是否符合最佳实践
- 安全性:是否存在安全漏洞
- 性能:是否有性能优化空间
- 可读性:代码是否易于理解
重点关注:{focus if focus != “all” else “所有方面”}
请提供详细的审查报告,包括:
问题列表(如果有)
改进建议
代码示例(如果需要修改) “””
return { "messages": [ { "role": "user", "content": { "type": "text", "text": prompt_template } } ] }
if name == “main”:
stdio_server(app)
**使用场景:**
- 标准化任务处理
- 复杂任务分解
- 质量控制和一致性保证
### 2.2 按部署方式分类
#### 2.2.1 本地MCP服务器
本地MCP服务器运行在用户本地环境中,适合处理敏感数据或需要低延迟的场景。
**优势:**
- 数据隐私保护
- 低网络延迟
- 完全控制权
**示例:本地文件系统访问**
```python
# local_file_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import os
import json
from pathlib import Path
app = Server("local-file-server")
@app.list_tools()
async def list_tools():
return [
{
"name": "read_file",
"description": "读取本地文件",
"inputSchema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "文件路径"
}
},
"required": ["path"]
}
},
{
"name": "list_directory",
"description": "列出目录内容",
"inputSchema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "目录路径"
}
},
"required": ["path"]
}
}
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "read_file":
file_path = arguments["path"]
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
return {
"content": [
{
"type": "text",
"text": f"文件内容:\n{content}"
}
]
}
except Exception as e:
return {
"content": [
{
"type": "text",
"text": f"读取文件失败: {str(e)}"
}
]
}
elif name == "list_directory":
dir_path = arguments["path"]
try:
items = []
for item in Path(dir_path).iterdir():
items.append({
"name": item.name,
"type": "directory" if item.is_dir() else "file",
"size": item.stat().st_size if item.is_file() else None
})
return {
"content": [
{
"type": "text",
"text": json.dumps(items, indent=2, ensure_ascii=False)
}
]
}
except Exception as e:
return {
"content": [
{
"type": "text",
"text": f"读取目录失败: {str(e)}"
}
]
}
if __name__ == "__main__":
stdio_server(app)
2.2.2 云端MCP服务器
云端MCP服务器部署在云环境中,适合需要高可用性和可扩展性的场景。
优势:
- 高可用性
- 易于扩展
- 集中管理
示例:云端数据库查询服务
# cloud_db_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import asyncpg
import json
app = Server("cloud-database-server")
# 数据库连接配置
DB_CONFIG = {
"host": "your-db-host",
"port": 5432,
"database": "your_db",
"user": "your_user",
"password": "your_password"
}
@app.list_tools()
async def list_tools():
return [
{
"name": "query_database",
"description": "执行SQL查询",
"inputSchema": {
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "SQL查询语句"
},
"params": {
"type": "array",
"description": "查询参数",
"items": {}
}
},
"required": ["sql"]
}
}
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "query_database":
sql = arguments["sql"]
params = arguments.get("params", [])
try:
conn = await asyncpg.connect(**DB_CONFIG)
rows = await conn.fetch(sql, *params)
await conn.close()
# 转换为JSON格式
result = [dict(row) for row in rows]
return {
"content": [
{
"type": "text",
"text": json.dumps(result, indent=2, ensure_ascii=False)
}
]
}
except Exception as e:
return {
"content": [
{
"type": "text",
"text": f"数据库查询失败: {str(e)}"
}
]
}
if __name__ == "__main__":
stdio_server(app)
三、MCP高级分类与复杂场景
3.1 复合型MCP服务器
复合型MCP服务器结合了多种功能类型,提供更全面的服务。
示例:企业级综合服务MCP服务器
# enterprise_mcp_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import json
import asyncio
from datetime import datetime
app = Server("enterprise-mcp-server")
# 模拟数据存储
EMPLOYEES = {
"E001": {"name": "张三", "department": "技术部", "role": "工程师"},
"E002": {"name": "李四", "department": "市场部", "role": "经理"},
"E003": {"name": "王五", "department": "财务部", "role": "分析师"}
}
PROJECTS = {
"P001": {"name": "AI平台开发", "status": "进行中", "budget": 1000000},
"P002": {"name": "移动应用重构", "status": "已完成", "budget": 500000}
}
# 1. 工具函数
@app.list_tools()
async def list_tools():
return [
{
"name": "get_employee_info",
"description": "获取员工信息",
"inputSchema": {
"type": "object",
"properties": {
"employee_id": {
"type": "string",
"description": "员工ID"
}
},
"required": ["employee_id"]
}
},
{
"name": "get_project_status",
"description": "获取项目状态",
"inputSchema": {
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "项目ID"
}
},
"required": ["project_id"]
}
},
{
"name": "generate_report",
"description": "生成报告",
"inputSchema": {
"type": "object",
"properties": {
"report_type": {
"type": "string",
"description": "报告类型",
"enum": ["employee", "project", "summary"]
},
"period": {
"type": "string",
"description": "时间周期"
}
},
"required": ["report_type"]
}
}
]
# 2. 资源函数
@app.list_resources()
async def list_resources():
return [
{
"uri": "file://company_policy.md",
"name": "公司政策",
"description": "公司规章制度文档",
"mimeType": "text/markdown"
},
{
"uri": "file://org_chart.json",
"name": "组织架构",
"description": "公司组织架构图",
"mimeType": "application/json"
}
]
# 3. 提示模板函数
@app.list_prompts()
async def list_prompts():
return [
{
"name": "business_analysis",
"description": "业务分析提示模板",
"arguments": {
"data": {
"description": "业务数据",
"required": True
},
"focus": {
"description": "分析重点",
"required": False
}
}
}
]
# 工具实现
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "get_employee_info":
emp_id = arguments["employee_id"]
if emp_id in EMPLOYEES:
return {
"content": [
{
"type": "text",
"text": json.dumps(EMPLOYEES[emp_id], ensure_ascii=False)
}
]
}
else:
return {
"content": [
{
"type": "text",
"text": f"员工 {emp_id} 不存在"
}
]
}
elif name == "get_project_status":
proj_id = arguments["project_id"]
if proj_id in PROJECTS:
return {
"content": [
{
"type": "text",
"text": json.dumps(PROJECTS[proj_id], ensure_ascii=False)
}
]
}
else:
return {
"content": [
{
"type": "text",
"text": f"项目 {proj_id} 不存在"
}
]
}
elif name == "generate_report":
report_type = arguments["report_type"]
period = arguments.get("period", "当前季度")
if report_type == "employee":
report = f"员工报告 - {period}\n"
for emp_id, info in EMPLOYEES.items():
report += f"- {info['name']} ({info['department']}) - {info['role']}\n"
elif report_type == "project":
report = f"项目报告 - {period}\n"
for proj_id, info in PROJECTS.items():
report += f"- {info['name']}: {info['status']} (预算: ¥{info['budget']:,})\n"
else: # summary
total_employees = len(EMPLOYEES)
total_projects = len(PROJECTS)
total_budget = sum(p['budget'] for p in PROJECTS.values())
report = f"公司概览 - {period}\n"
report += f"员工总数: {total_employees}\n"
report += f"项目总数: {total_projects}\n"
report += f"总预算: ¥{total_budget:,}\n"
return {
"content": [
{
"type": "text",
"text": report
}
]
}
# 资源实现
@app.read_resource()
async def read_resource(uri: str):
if uri == "file://company_policy.md":
return {
"contents": [
{
"uri": uri,
"text": "# 公司政策\n\n## 工作时间\n- 周一至周五: 9:00-18:00\n- 午休时间: 12:00-13:00\n\n## 休假制度\n- 年假: 15天\n- 病假: 10天"
}
]
}
elif uri == "file://org_chart.json":
org_data = {
"CEO": {
"CTO": ["技术部", "研发部"],
"CMO": ["市场部", "销售部"],
"CFO": ["财务部", "人力资源部"]
}
}
return {
"contents": [
{
"uri": uri,
"text": json.dumps(org_data, indent=2, ensure_ascii=False)
}
]
}
else:
raise ValueError(f"Resource not found: {uri}")
# 提示模板实现
@app.get_prompt()
async def get_prompt(name: str, arguments: dict):
if name == "business_analysis":
data = arguments["data"]
focus = arguments.get("focus", "全面分析")
prompt = f"""作为业务分析师,请分析以下数据:
数据: {data}
分析重点: {focus}
请提供:
1. 关键发现
2. 问题识别
3. 改进建议
4. 行动计划
"""
return {
"messages": [
{
"role": "user",
"content": {
"type": "text",
"text": prompt
}
}
]
}
if __name__ == "__main__":
stdio_server(app)
3.2 流式MCP服务器
流式MCP服务器支持实时数据流和持续更新,适合需要实时反馈的场景。
示例:实时监控MCP服务器
# streaming_mcp_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import asyncio
import json
import random
from datetime import datetime
app = Server("streaming-monitor-server")
# 模拟实时数据流
async def generate_metrics():
"""生成模拟的监控指标"""
while True:
metrics = {
"timestamp": datetime.now().isoformat(),
"cpu_usage": random.uniform(0, 100),
"memory_usage": random.uniform(0, 100),
"disk_usage": random.uniform(0, 100),
"network_throughput": random.uniform(0, 1000),
"active_users": random.randint(100, 1000)
}
yield metrics
await asyncio.sleep(1) # 每秒生成一次数据
@app.list_tools()
async def list_tools():
return [
{
"name": "start_monitoring",
"description": "启动实时监控",
"inputSchema": {
"type": "object",
"properties": {
"metrics": {
"type": "array",
"description": "要监控的指标",
"items": {
"type": "string",
"enum": ["cpu", "memory", "disk", "network", "users"]
}
},
"interval": {
"type": "number",
"description": "更新间隔(秒)",
"default": 1
}
},
"required": ["metrics"]
}
}
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "start_monitoring":
metrics_to_monitor = arguments["metrics"]
interval = arguments.get("interval", 1)
async def stream_metrics():
async for metric_data in generate_metrics():
# 过滤需要的指标
filtered_data = {k: v for k, v in metric_data.items()
if k in metrics_to_monitor or k == "timestamp"}
yield {
"type": "text",
"text": json.dumps(filtered_data, ensure_ascii=False)
}
return {
"content": stream_metrics(),
"is_streaming": True
}
if __name__ == "__main__":
stdio_server(app)
3.3 分布式MCP服务器集群
分布式MCP服务器集群通过多个服务器协同工作,提供高可用性和负载均衡。
架构示例:
# distributed_mcp_cluster.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import asyncio
import aiohttp
import json
class DistributedMCPNode:
"""分布式MCP节点"""
def __init__(self, node_id, role):
self.node_id = node_id
self.role = role # 'worker', 'coordinator', 'cache'
self.peers = []
async def sync_with_peers(self):
"""与对等节点同步"""
pass
class LoadBalancer:
"""负载均衡器"""
def __init__(self):
self.nodes = []
self.current_index = 0
def add_node(self, node):
self.nodes.append(node)
def get_next_node(self):
if not self.nodes:
return None
node = self.nodes[self.current_index % len(self.nodes)]
self.current_index += 1
return node
# 主服务器
app = Server("distributed-mcp-cluster")
# 负载均衡器实例
lb = LoadBalancer()
@app.list_tools()
async def list_tools():
return [
{
"name": "distributed_query",
"description": "分布式查询",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "查询内容"
},
"strategy": {
"type": "string",
"description": "分发策略",
"enum": ["round_robin", "least_connections", "random"],
"default": "round_robin"
}
},
"required": ["query"]
}
}
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "distributed_query":
query = arguments["query"]
strategy = arguments.get("strategy", "round_robin")
# 根据策略选择节点
if strategy == "round_robin":
node = lb.get_next_node()
elif strategy == "random":
import random
node = random.choice(lb.nodes) if lb.nodes else None
else:
node = lb.get_next_node()
if node:
# 模拟向节点发送请求
result = await simulate_node_query(node, query)
return {
"content": [
{
"type": "text",
"text": f"节点 {node.node_id} 响应: {result}"
}
]
}
else:
return {
"content": [
{
"type": "text",
"text": "没有可用的节点"
}
]
}
async def simulate_node_query(node, query):
"""模拟节点查询"""
await asyncio.sleep(0.1) # 模拟网络延迟
return f"处理查询 '{query}' - 节点角色: {node.role}"
if __name__ == "__main__":
# 添加模拟节点
lb.add_node(DistributedMCPNode("node-1", "worker"))
lb.add_node(DistributedMCPNode("node-2", "worker"))
lb.add_node(DistributedMCPNode("node-3", "cache"))
stdio_server(app)
四、MCP安全与权限管理
4.1 认证与授权
MCP支持多种认证机制,确保只有授权用户才能访问资源。
示例:基于令牌的认证MCP服务器
# auth_mcp_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import hashlib
import secrets
import json
from datetime import datetime, timedelta
app = Server("auth-mcp-server")
# 模拟用户数据库
USERS = {
"admin": {
"password_hash": hashlib.sha256("admin123".encode()).hexdigest(),
"role": "admin",
"permissions": ["read", "write", "delete", "admin"]
},
"user": {
"password_hash": hashlib.sha256("user123".encode()).hexdigest(),
"role": "user",
"permissions": ["read"]
}
}
# 令牌存储
TOKENS = {}
def generate_token(user_id, expires_in=3600):
"""生成访问令牌"""
token = secrets.token_urlsafe(32)
expiry = datetime.now() + timedelta(seconds=expires_in)
TOKENS[token] = {
"user_id": user_id,
"expiry": expiry,
"permissions": USERS[user_id]["permissions"]
}
return token
def verify_token(token):
"""验证令牌"""
if token not in TOKENS:
return None
token_data = TOKENS[token]
if datetime.now() > token_data["expiry"]:
del TOKENS[token]
return None
return token_data
def check_permission(token_data, required_permission):
"""检查权限"""
if not token_data:
return False
return required_permission in token_data["permissions"]
@app.list_tools()
async def list_tools():
return [
{
"name": "login",
"description": "用户登录",
"inputSchema": {
"type": "object",
"properties": {
"username": {
"type": "string",
"description": "用户名"
},
"password": {
"type": "string",
"description": "密码"
}
},
"required": ["username", "password"]
}
},
{
"name": "protected_operation",
"description": "受保护的操作",
"inputSchema": {
"type": "object",
"properties": {
"token": {
"type": "string",
"description": "访问令牌"
},
"operation": {
"type": "string",
"description": "操作类型",
"enum": ["read_data", "write_data", "delete_data", "admin_action"]
},
"data": {
"type": "string",
"description": "操作数据"
}
},
"required": ["token", "operation"]
}
}
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "login":
username = arguments["username"]
password = arguments["password"]
if username in USERS:
password_hash = hashlib.sha256(password.encode()).hexdigest()
if USERS[username]["password_hash"] == password_hash:
token = generate_token(username)
return {
"content": [
{
"type": "text",
"text": json.dumps({
"status": "success",
"token": token,
"expires_in": 3600
}, ensure_ascii=False)
}
]
}
return {
"content": [
{
"type": "text",
"text": json.dumps({
"status": "error",
"message": "认证失败"
}, ensure_ascii=False)
}
]
}
elif name == "protected_operation":
token = arguments["token"]
operation = arguments["operation"]
data = arguments.get("data", "")
# 验证令牌
token_data = verify_token(token)
if not token_data:
return {
"content": [
{
"type": "text",
"text": json.dumps({
"status": "error",
"message": "无效或过期的令牌"
}, ensure_ascii=False)
}
]
}
# 检查权限
required_permission = operation.replace("_", "")
if not check_permission(token_data, required_permission):
return {
"content": [
{
"type": "text",
"text": json.dumps({
"status": "error",
"message": f"权限不足,需要 {required_permission} 权限"
}, ensure_ascii=False)
}
]
}
# 执行操作
result = f"执行 {operation} 操作,数据: {data},用户: {token_data['user_id']}"
return {
"content": [
{
"type": "text",
"text": json.dumps({
"status": "success",
"result": result
}, ensure_ascii=False)
}
]
}
if __name__ == "__main__":
stdio_server(app)
4.2 数据隔离与多租户
多租户MCP服务器为不同用户提供数据隔离。
示例:多租户数据库服务
# multi_tenant_db_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import json
import asyncio
app = Server("multi-tenant-db-server")
# 模拟多租户数据存储
TENANT_DATA = {
"tenant_a": {
"users": [
{"id": 1, "name": "Alice", "email": "alice@tenant-a.com"},
{"id": 2, "name": "Bob", "email": "bob@tenant-a.com"}
],
"products": [
{"id": 101, "name": "Product A", "price": 99.99},
{"id": 102, "name": "Product B", "price": 149.99}
]
},
"tenant_b": {
"users": [
{"id": 1, "name": "Charlie", "email": "charlie@tenant-b.com"},
{"id": 2, "name": "Diana", "email": "diana@tenant-b.com"}
],
"products": [
{"id": 201, "name": "Product X", "price": 199.99},
{"id": 202, "name": "Product Y", "price": 249.99}
]
}
}
@app.list_tools()
async def list_tools():
return [
{
"name": "query_tenant_data",
"description": "查询租户数据",
"inputSchema": {
"type": "object",
"properties": {
"tenant_id": {
"type": "string",
"description": "租户ID"
},
"resource": {
"type": "string",
"description": "资源类型",
"enum": ["users", "products"]
},
"filter": {
"type": "object",
"description": "过滤条件"
}
},
"required": ["tenant_id", "resource"]
}
}
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "query_tenant_data":
tenant_id = arguments["tenant_id"]
resource = arguments["resource"]
filter_condition = arguments.get("filter", {})
if tenant_id not in TENANT_DATA:
return {
"content": [
{
"type": "text",
"text": json.dumps({
"status": "error",
"message": f"租户 {tenant_id} 不存在"
}, ensure_ascii=False)
}
]
}
# 获取租户数据
data = TENANT_DATA[tenant_id].get(resource, [])
# 应用过滤条件
if filter_condition:
filtered_data = []
for item in data:
match = True
for key, value in filter_condition.items():
if item.get(key) != value:
match = False
break
if match:
filtered_data.append(item)
data = filtered_data
return {
"content": [
{
"type": "text",
"text": json.dumps({
"tenant_id": tenant_id,
"resource": resource,
"count": len(data),
"data": data
}, ensure_ascii=False)
}
]
}
if __name__ == "__main__":
stdio_server(app)
五、MCP最佳实践与优化策略
5.1 性能优化
5.1.1 缓存策略
# caching_mcp_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import json
import time
from functools import lru_cache
app = Server("caching-mcp-server")
# 内存缓存
cache = {}
CACHE_TTL = 300 # 5分钟
def get_cached_data(key):
"""获取缓存数据"""
if key in cache:
cached_time, data = cache[key]
if time.time() - cached_time < CACHE_TTL:
return data
else:
del cache[key]
return None
def set_cached_data(key, data):
"""设置缓存数据"""
cache[key] = (time.time(), data)
@app.list_tools()
async def list_tools():
return [
{
"name": "get_data_with_cache",
"description": "带缓存的数据获取",
"inputSchema": {
"type": "object",
"properties": {
"key": {
"type": "string",
"description": "数据键"
},
"force_refresh": {
"type": "boolean",
"description": "强制刷新缓存",
"default": False
}
},
"required": ["key"]
}
}
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "get_data_with_cache":
key = arguments["key"]
force_refresh = arguments.get("force_refresh", False)
if not force_refresh:
cached = get_cached_data(key)
if cached is not None:
return {
"content": [
{
"type": "text",
"text": json.dumps({
"source": "cache",
"data": cached
}, ensure_ascii=False)
}
]
}
# 模拟数据获取
await asyncio.sleep(0.5) # 模拟延迟
data = f"数据 {key} - 时间: {time.time()}"
# 缓存数据
set_cached_data(key, data)
return {
"content": [
{
"type": "text",
"text": json.dumps({
"source": "fresh",
"data": data
}, ensure_ascii=False)
}
]
}
if __name__ == "__main__":
stdio_server(app)
5.1.2 批量处理
# batch_mcp_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import json
import asyncio
app = Server("batch-mcp-server")
@app.list_tools()
async def list_tools():
return [
{
"name": "batch_process",
"description": "批量处理任务",
"inputSchema": {
"type": "object",
"properties": {
"tasks": {
"type": "array",
"description": "任务列表",
"items": {
"type": "object",
"properties": {
"type": {
"type": "string",
"description": "任务类型"
},
"data": {
"type": "object",
"description": "任务数据"
}
}
}
},
"concurrency": {
"type": "number",
"description": "并发数",
"default": 3
}
},
"required": ["tasks"]
}
}
]
async def process_task(task):
"""处理单个任务"""
await asyncio.sleep(0.1) # 模拟处理时间
return {
"task_id": task.get("id", "unknown"),
"result": f"处理完成: {task.get('type', 'unknown')}",
"status": "success"
}
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "batch_process":
tasks = arguments["tasks"]
concurrency = arguments.get("concurrency", 3)
# 使用信号量控制并发
semaphore = asyncio.Semaphore(concurrency)
async def process_with_semaphore(task):
async with semaphore:
return await process_task(task)
# 并发处理所有任务
results = await asyncio.gather(
*[process_with_semaphore(task) for task in tasks],
return_exceptions=True
)
return {
"content": [
{
"type": "text",
"text": json.dumps({
"total_tasks": len(tasks),
"successful": len([r for r in results if not isinstance(r, Exception)]),
"results": results
}, ensure_ascii=False)
}
]
}
if __name__ == "__main__":
stdio_server(app)
5.2 错误处理与重试机制
# resilient_mcp_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import json
import asyncio
import random
app = Server("resilient-mcp-server")
class RetryableError(Exception):
"""可重试的错误"""
pass
class NonRetryableError(Exception):
"""不可重试的错误"""
pass
async def unreliable_operation():
"""模拟不可靠的操作"""
if random.random() < 0.3: # 30%失败率
raise RetryableError("临时故障")
elif random.random() < 0.1: # 10%永久失败
raise NonRetryableError("永久故障")
return "操作成功"
async def retry_operation(max_retries=3, delay=1):
"""带重试的操作"""
for attempt in range(max_retries):
try:
result = await unreliable_operation()
return result
except RetryableError as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(delay * (2 ** attempt)) # 指数退避
except NonRetryableError:
raise
@app.list_tools()
async def list_tools():
return [
{
"name": "resilient_operation",
"description": "弹性操作",
"inputSchema": {
"type": "object",
"properties": {
"max_retries": {
"type": "number",
"description": "最大重试次数",
"default": 3
},
"retry_delay": {
"type": "number",
"description": "重试延迟(秒)",
"default": 1
}
}
}
}
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "resilient_operation":
max_retries = arguments.get("max_retries", 3)
retry_delay = arguments.get("retry_delay", 1)
try:
result = await retry_operation(max_retries, retry_delay)
return {
"content": [
{
"type": "text",
"text": json.dumps({
"status": "success",
"result": result
}, ensure_ascii=False)
}
]
}
except Exception as e:
return {
"content": [
{
"type": "text",
"text": json.dumps({
"status": "error",
"message": str(e)
}, ensure_ascii=False)
}
]
}
if __name__ == "__main__":
stdio_server(app)
六、MCP生态系统与工具
6.1 官方MCP服务器
Anthropic提供了多个官方MCP服务器,涵盖常见使用场景:
- 文件系统MCP服务器:提供文件读写操作
- GitHub MCP服务器:集成GitHub API
- Slack MCP服务器:集成Slack消息
- PostgreSQL MCP服务器:数据库查询
- Google Drive MCP服务器:云存储访问
6.2 社区MCP服务器
社区贡献的MCP服务器扩展了MCP的功能范围:
- Jira MCP服务器:项目管理工具集成
- Notion MCP服务器:笔记和文档管理
- Stripe MCP服务器:支付处理
- Twilio MCP服务器:短信和语音服务
- AWS MCP服务器:云服务集成
6.3 开发工具
6.3.1 MCP Inspector
MCP Inspector是官方提供的调试工具,用于测试MCP服务器。
# 安装MCP Inspector
npm install -g @modelcontextprotocol/inspector
# 启动Inspector
mcp-inspector
6.3.2 MCP CLI工具
# 安装MCP CLI
pip install mcp-cli
# 查看服务器信息
mcp-cli list
# 测试工具调用
mcp-cli call <server-name> <tool-name> <arguments>
七、MCP未来发展趋势
7.1 标准化演进
MCP协议正在向更标准化的方向发展:
- 协议版本管理:支持多版本兼容
- 扩展机制:允许自定义扩展
- 认证标准:OAuth 2.0、JWT等标准集成
7.2 与新兴技术集成
- Web3集成:区块链和智能合约
- 边缘计算:分布式边缘节点
- 量子计算:量子算法接口
7.3 AI原生应用
MCP将成为AI原生应用的核心基础设施:
- 自主AI代理:支持复杂任务规划
- 多模态交互:文本、图像、语音统一接口
- 持续学习:动态工具发现和学习
八、总结
MCP作为一种新兴的AI集成协议,正在快速演进和完善。从基础的工具、资源、提示模板分类,到高级的流式、分布式、多租户架构,MCP为AI应用提供了强大的扩展能力。通过本文的详细解析和代码示例,读者可以:
- 理解MCP的核心概念:掌握协议的基本原理和架构
- 掌握分类方法:从多个维度理解MCP服务器的类型
- 实践开发技能:通过代码示例学习实际开发
- 了解最佳实践:掌握性能优化和安全策略
- 展望未来趋势:了解MCP的发展方向
随着AI技术的不断发展,MCP将在连接AI模型与现实世界中发挥越来越重要的作用。建议开发者从简单的MCP服务器开始实践,逐步探索更复杂的应用场景,为构建下一代AI应用奠定基础。
参考资源:
- MCP官方文档:https://modelcontextprotocol.io
- MCP GitHub仓库:https://github.com/modelcontextprotocol
- Anthropic开发者博客:https://www.anthropic.com/developers
学习建议:
- 从官方示例开始,理解基础概念
- 尝试构建简单的MCP服务器
- 参与社区讨论,了解最新进展
- 关注安全最佳实践
- 探索与现有系统的集成方案
