引言

MCP(Model Context Protocol)是Anthropic公司于2024年11月推出的一种开放协议,旨在标准化大型语言模型(LLM)与外部工具、数据源和计算资源之间的交互方式。随着AI应用的快速发展,MCP已成为连接AI模型与现实世界的关键桥梁。本文将从基础概念出发,系统性地解析MCP的类型分类,涵盖从基础到高级的完整知识体系,并通过实际代码示例帮助读者深入理解。

一、MCP基础概念与核心架构

1.1 什么是MCP?

MCP(Model Context Protocol)是一种客户端-服务器协议,允许LLM通过标准化接口访问外部资源。它解决了传统API调用的碎片化问题,为AI应用提供了一种统一的上下文管理方式。

核心特点:

  • 标准化接口:统一的协议规范,减少集成复杂度
  • 上下文感知:模型能够理解并利用外部数据和工具
  • 安全可控:细粒度的权限管理和数据访问控制
  • 可扩展性:支持自定义工具和资源的扩展

1.2 MCP架构组成

MCP采用客户端-服务器架构,包含以下核心组件:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   LLM客户端     │◄──►│   MCP服务器     │◄──►│   外部资源      │
│ (如Claude、     │    │ (工具/数据源)   │    │ (API、数据库等) │
│  Cursor等)      │    │                 │    │                 │
└─────────────────┘    └─────────────────┘    └─────────────────┘

关键角色:

  • MCP Host:运行LLM的应用程序(如Claude Desktop、Cursor)
  • MCP Client:在Host中运行,负责与服务器通信
  • MCP Server:提供工具、资源和提示模板的服务器

二、MCP类型基础分类

2.1 按功能类型分类

2.1.1 工具型MCP服务器(Tools)

工具型MCP服务器提供可执行的操作,允许LLM调用外部函数或API。

特点:

  • 执行具体操作(如查询数据库、调用API)
  • 有明确的输入输出格式
  • 通常需要参数验证

示例:天气查询工具

# weather_tool_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import requests
import json

app = Server("weather-tool-server")

@app.list_tools()
async def list_tools():
    return [
        {
            "name": "get_weather",
            "description": "获取指定城市的天气信息",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "城市名称"
                    },
                    "unit": {
                        "type": "string",
                        "description": "温度单位",
                        "enum": ["celsius", "fahrenheit"],
                        "default": "celsius"
                    }
                },
                "required": ["city"]
            }
        }
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "get_weather":
        city = arguments["city"]
        unit = arguments.get("unit", "celsius")
        
        # 模拟天气API调用
        weather_data = {
            "city": city,
            "temperature": 22 if unit == "celsius" else 72,
            "condition": "晴天",
            "humidity": 65,
            "unit": unit
        }
        
        return {
            "content": [
                {
                    "type": "text",
                    "text": json.dumps(weather_data, ensure_ascii=False)
                }
            ]
        }

if __name__ == "__main__":
    stdio_server(app)

使用场景:

  • 数据查询和分析
  • 外部服务集成
  • 自动化工作流

2.1.2 资源型MCP服务器(Resources)

资源型MCP服务器提供只读的数据源,LLM可以读取但不能修改。

特点:

  • 提供静态或动态数据
  • 通常有明确的URI标识
  • 支持数据订阅和实时更新

示例:文档资源服务器

# document_resource_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import json

app = Server("document-resource-server")

# 模拟文档数据库
DOCUMENTS = {
    "file://docs/api-guide.md": {
        "title": "API使用指南",
        "content": "# API使用指南\n\n## 快速开始\n1. 获取API密钥\n2. 发送请求\n3. 处理响应",
        "metadata": {
            "author": "技术团队",
            "last_updated": "2024-01-15",
            "version": "1.0"
        }
    },
    "file://docs/error-codes.md": {
        "title": "错误代码参考",
        "content": "# 错误代码\n\n- 400: 请求参数错误\n- 401: 认证失败\n- 500: 服务器内部错误",
        "metadata": {
            "author": "运维团队",
            "last_updated": "2024-01-10",
            "version": "2.1"
        }
    }
}

@app.list_resources()
async def list_resources():
    return [
        {
            "uri": uri,
            "name": data["title"],
            "description": f"文档: {data['title']}",
            "mimeType": "text/markdown"
        }
        for uri, data in DOCUMENTS.items()
    ]

@app.read_resource()
async def read_resource(uri: str):
    if uri in DOCUMENTS:
        return {
            "contents": [
                {
                    "uri": uri,
                    "text": DOCUMENTS[uri]["content"]
                }
            ]
        }
    else:
        raise ValueError(f"Resource not found: {uri}")

if __name__ == "__main__":
    stdio_server(app)

使用场景:

  • 文档和知识库访问
  • 配置文件读取
  • 静态数据提供

2.1.3 提示模板型MCP服务器(Prompts)

提示模板型MCP服务器提供预定义的提示模板,帮助LLM生成更准确的响应。

特点:

  • 提供结构化的提示模板
  • 支持参数化输入
  • 可以包含示例和约束

示例:代码审查提示模板

# code_review_prompt_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import json

app = Server("code-review-prompt-server")

@app.list_prompts()
async def list_prompts():
    return [
        {
            "name": "code_review",
            "description": "代码审查提示模板",
            "arguments": {
                "code": {
                    "description": "要审查的代码",
                    "required": True
                },
                "language": {
                    "description": "编程语言",
                    "required": True
                },
                "focus": {
                    "description": "审查重点",
                    "required": False,
                    "enum": ["security", "performance", "readability", "all"]
                }
            }
        }
    ]

@app.get_prompt()
async def get_prompt(name: str, arguments: dict):
    if name == "code_review":
        code = arguments["code"]
        language = arguments["language"]
        focus = arguments.get("focus", "all")
        
        prompt_template = f"""你是一位专业的代码审查专家。请审查以下{language}代码:

```{language}
{code}

请从以下方面进行审查:

  1. 代码质量:是否符合最佳实践
  2. 安全性:是否存在安全漏洞
  3. 性能:是否有性能优化空间
  4. 可读性:代码是否易于理解

重点关注:{focus if focus != “all” else “所有方面”}

请提供详细的审查报告,包括:

  • 问题列表(如果有)

  • 改进建议

  • 代码示例(如果需要修改) “””

    return {
        "messages": [
            {
                "role": "user",
                "content": {
                    "type": "text",
                    "text": prompt_template
                }
            }
        ]
    }
    

if name == “main”:

stdio_server(app)

**使用场景:**
- 标准化任务处理
- 复杂任务分解
- 质量控制和一致性保证

### 2.2 按部署方式分类

#### 2.2.1 本地MCP服务器

本地MCP服务器运行在用户本地环境中,适合处理敏感数据或需要低延迟的场景。

**优势:**
- 数据隐私保护
- 低网络延迟
- 完全控制权

**示例:本地文件系统访问**

```python
# local_file_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import os
import json
from pathlib import Path

app = Server("local-file-server")

@app.list_tools()
async def list_tools():
    return [
        {
            "name": "read_file",
            "description": "读取本地文件",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "文件路径"
                    }
                },
                "required": ["path"]
            }
        },
        {
            "name": "list_directory",
            "description": "列出目录内容",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "目录路径"
                    }
                },
                "required": ["path"]
            }
        }
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "read_file":
        file_path = arguments["path"]
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            return {
                "content": [
                    {
                        "type": "text",
                        "text": f"文件内容:\n{content}"
                    }
                ]
            }
        except Exception as e:
            return {
                "content": [
                    {
                        "type": "text",
                        "text": f"读取文件失败: {str(e)}"
                    }
                ]
            }
    
    elif name == "list_directory":
        dir_path = arguments["path"]
        try:
            items = []
            for item in Path(dir_path).iterdir():
                items.append({
                    "name": item.name,
                    "type": "directory" if item.is_dir() else "file",
                    "size": item.stat().st_size if item.is_file() else None
                })
            return {
                "content": [
                    {
                        "type": "text",
                        "text": json.dumps(items, indent=2, ensure_ascii=False)
                    }
                ]
            }
        except Exception as e:
            return {
                "content": [
                    {
                        "type": "text",
                        "text": f"读取目录失败: {str(e)}"
                    }
                ]
            }

if __name__ == "__main__":
    stdio_server(app)

2.2.2 云端MCP服务器

云端MCP服务器部署在云环境中,适合需要高可用性和可扩展性的场景。

优势:

  • 高可用性
  • 易于扩展
  • 集中管理

示例:云端数据库查询服务

# cloud_db_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import asyncpg
import json

app = Server("cloud-database-server")

# 数据库连接配置
DB_CONFIG = {
    "host": "your-db-host",
    "port": 5432,
    "database": "your_db",
    "user": "your_user",
    "password": "your_password"
}

@app.list_tools()
async def list_tools():
    return [
        {
            "name": "query_database",
            "description": "执行SQL查询",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "SQL查询语句"
                    },
                    "params": {
                        "type": "array",
                        "description": "查询参数",
                        "items": {}
                    }
                },
                "required": ["sql"]
            }
        }
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "query_database":
        sql = arguments["sql"]
        params = arguments.get("params", [])
        
        try:
            conn = await asyncpg.connect(**DB_CONFIG)
            rows = await conn.fetch(sql, *params)
            await conn.close()
            
            # 转换为JSON格式
            result = [dict(row) for row in rows]
            
            return {
                "content": [
                    {
                        "type": "text",
                        "text": json.dumps(result, indent=2, ensure_ascii=False)
                    }
                ]
            }
        except Exception as e:
            return {
                "content": [
                    {
                        "type": "text",
                        "text": f"数据库查询失败: {str(e)}"
                    }
                ]
            }

if __name__ == "__main__":
    stdio_server(app)

三、MCP高级分类与复杂场景

3.1 复合型MCP服务器

复合型MCP服务器结合了多种功能类型,提供更全面的服务。

示例:企业级综合服务MCP服务器

# enterprise_mcp_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import json
import asyncio
from datetime import datetime

app = Server("enterprise-mcp-server")

# 模拟数据存储
EMPLOYEES = {
    "E001": {"name": "张三", "department": "技术部", "role": "工程师"},
    "E002": {"name": "李四", "department": "市场部", "role": "经理"},
    "E003": {"name": "王五", "department": "财务部", "role": "分析师"}
}

PROJECTS = {
    "P001": {"name": "AI平台开发", "status": "进行中", "budget": 1000000},
    "P002": {"name": "移动应用重构", "status": "已完成", "budget": 500000}
}

# 1. 工具函数
@app.list_tools()
async def list_tools():
    return [
        {
            "name": "get_employee_info",
            "description": "获取员工信息",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "employee_id": {
                        "type": "string",
                        "description": "员工ID"
                    }
                },
                "required": ["employee_id"]
            }
        },
        {
            "name": "get_project_status",
            "description": "获取项目状态",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "project_id": {
                        "type": "string",
                        "description": "项目ID"
                    }
                },
                "required": ["project_id"]
            }
        },
        {
            "name": "generate_report",
            "description": "生成报告",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "report_type": {
                        "type": "string",
                        "description": "报告类型",
                        "enum": ["employee", "project", "summary"]
                    },
                    "period": {
                        "type": "string",
                        "description": "时间周期"
                    }
                },
                "required": ["report_type"]
            }
        }
    ]

# 2. 资源函数
@app.list_resources()
async def list_resources():
    return [
        {
            "uri": "file://company_policy.md",
            "name": "公司政策",
            "description": "公司规章制度文档",
            "mimeType": "text/markdown"
        },
        {
            "uri": "file://org_chart.json",
            "name": "组织架构",
            "description": "公司组织架构图",
            "mimeType": "application/json"
        }
    ]

# 3. 提示模板函数
@app.list_prompts()
async def list_prompts():
    return [
        {
            "name": "business_analysis",
            "description": "业务分析提示模板",
            "arguments": {
                "data": {
                    "description": "业务数据",
                    "required": True
                },
                "focus": {
                    "description": "分析重点",
                    "required": False
                }
            }
        }
    ]

# 工具实现
@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "get_employee_info":
        emp_id = arguments["employee_id"]
        if emp_id in EMPLOYEES:
            return {
                "content": [
                    {
                        "type": "text",
                        "text": json.dumps(EMPLOYEES[emp_id], ensure_ascii=False)
                    }
                ]
            }
        else:
            return {
                "content": [
                    {
                        "type": "text",
                        "text": f"员工 {emp_id} 不存在"
                    }
                ]
            }
    
    elif name == "get_project_status":
        proj_id = arguments["project_id"]
        if proj_id in PROJECTS:
            return {
                "content": [
                    {
                        "type": "text",
                        "text": json.dumps(PROJECTS[proj_id], ensure_ascii=False)
                    }
                ]
            }
        else:
            return {
                "content": [
                    {
                        "type": "text",
                        "text": f"项目 {proj_id} 不存在"
                    }
                ]
            }
    
    elif name == "generate_report":
        report_type = arguments["report_type"]
        period = arguments.get("period", "当前季度")
        
        if report_type == "employee":
            report = f"员工报告 - {period}\n"
            for emp_id, info in EMPLOYEES.items():
                report += f"- {info['name']} ({info['department']}) - {info['role']}\n"
        
        elif report_type == "project":
            report = f"项目报告 - {period}\n"
            for proj_id, info in PROJECTS.items():
                report += f"- {info['name']}: {info['status']} (预算: ¥{info['budget']:,})\n"
        
        else:  # summary
            total_employees = len(EMPLOYEES)
            total_projects = len(PROJECTS)
            total_budget = sum(p['budget'] for p in PROJECTS.values())
            report = f"公司概览 - {period}\n"
            report += f"员工总数: {total_employees}\n"
            report += f"项目总数: {total_projects}\n"
            report += f"总预算: ¥{total_budget:,}\n"
        
        return {
            "content": [
                {
                    "type": "text",
                    "text": report
                }
            ]
        }

# 资源实现
@app.read_resource()
async def read_resource(uri: str):
    if uri == "file://company_policy.md":
        return {
            "contents": [
                {
                    "uri": uri,
                    "text": "# 公司政策\n\n## 工作时间\n- 周一至周五: 9:00-18:00\n- 午休时间: 12:00-13:00\n\n## 休假制度\n- 年假: 15天\n- 病假: 10天"
                }
            ]
        }
    elif uri == "file://org_chart.json":
        org_data = {
            "CEO": {
                "CTO": ["技术部", "研发部"],
                "CMO": ["市场部", "销售部"],
                "CFO": ["财务部", "人力资源部"]
            }
        }
        return {
            "contents": [
                {
                    "uri": uri,
                    "text": json.dumps(org_data, indent=2, ensure_ascii=False)
                }
            ]
        }
    else:
        raise ValueError(f"Resource not found: {uri}")

# 提示模板实现
@app.get_prompt()
async def get_prompt(name: str, arguments: dict):
    if name == "business_analysis":
        data = arguments["data"]
        focus = arguments.get("focus", "全面分析")
        
        prompt = f"""作为业务分析师,请分析以下数据:

数据: {data}

分析重点: {focus}

请提供:
1. 关键发现
2. 问题识别
3. 改进建议
4. 行动计划
"""
        
        return {
            "messages": [
                {
                    "role": "user",
                    "content": {
                        "type": "text",
                        "text": prompt
                    }
                }
            ]
        }

if __name__ == "__main__":
    stdio_server(app)

3.2 流式MCP服务器

流式MCP服务器支持实时数据流和持续更新,适合需要实时反馈的场景。

示例:实时监控MCP服务器

# streaming_mcp_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import asyncio
import json
import random
from datetime import datetime

app = Server("streaming-monitor-server")

# 模拟实时数据流
async def generate_metrics():
    """生成模拟的监控指标"""
    while True:
        metrics = {
            "timestamp": datetime.now().isoformat(),
            "cpu_usage": random.uniform(0, 100),
            "memory_usage": random.uniform(0, 100),
            "disk_usage": random.uniform(0, 100),
            "network_throughput": random.uniform(0, 1000),
            "active_users": random.randint(100, 1000)
        }
        yield metrics
        await asyncio.sleep(1)  # 每秒生成一次数据

@app.list_tools()
async def list_tools():
    return [
        {
            "name": "start_monitoring",
            "description": "启动实时监控",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "metrics": {
                        "type": "array",
                        "description": "要监控的指标",
                        "items": {
                            "type": "string",
                            "enum": ["cpu", "memory", "disk", "network", "users"]
                        }
                    },
                    "interval": {
                        "type": "number",
                        "description": "更新间隔(秒)",
                        "default": 1
                    }
                },
                "required": ["metrics"]
            }
        }
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "start_monitoring":
        metrics_to_monitor = arguments["metrics"]
        interval = arguments.get("interval", 1)
        
        async def stream_metrics():
            async for metric_data in generate_metrics():
                # 过滤需要的指标
                filtered_data = {k: v for k, v in metric_data.items() 
                               if k in metrics_to_monitor or k == "timestamp"}
                
                yield {
                    "type": "text",
                    "text": json.dumps(filtered_data, ensure_ascii=False)
                }
        
        return {
            "content": stream_metrics(),
            "is_streaming": True
        }

if __name__ == "__main__":
    stdio_server(app)

3.3 分布式MCP服务器集群

分布式MCP服务器集群通过多个服务器协同工作,提供高可用性和负载均衡。

架构示例:

# distributed_mcp_cluster.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import asyncio
import aiohttp
import json

class DistributedMCPNode:
    """分布式MCP节点"""
    def __init__(self, node_id, role):
        self.node_id = node_id
        self.role = role  # 'worker', 'coordinator', 'cache'
        self.peers = []
    
    async def sync_with_peers(self):
        """与对等节点同步"""
        pass

class LoadBalancer:
    """负载均衡器"""
    def __init__(self):
        self.nodes = []
        self.current_index = 0
    
    def add_node(self, node):
        self.nodes.append(node)
    
    def get_next_node(self):
        if not self.nodes:
            return None
        node = self.nodes[self.current_index % len(self.nodes)]
        self.current_index += 1
        return node

# 主服务器
app = Server("distributed-mcp-cluster")

# 负载均衡器实例
lb = LoadBalancer()

@app.list_tools()
async def list_tools():
    return [
        {
            "name": "distributed_query",
            "description": "分布式查询",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "查询内容"
                    },
                    "strategy": {
                        "type": "string",
                        "description": "分发策略",
                        "enum": ["round_robin", "least_connections", "random"],
                        "default": "round_robin"
                    }
                },
                "required": ["query"]
            }
        }
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "distributed_query":
        query = arguments["query"]
        strategy = arguments.get("strategy", "round_robin")
        
        # 根据策略选择节点
        if strategy == "round_robin":
            node = lb.get_next_node()
        elif strategy == "random":
            import random
            node = random.choice(lb.nodes) if lb.nodes else None
        else:
            node = lb.get_next_node()
        
        if node:
            # 模拟向节点发送请求
            result = await simulate_node_query(node, query)
            return {
                "content": [
                    {
                        "type": "text",
                        "text": f"节点 {node.node_id} 响应: {result}"
                    }
                ]
            }
        else:
            return {
                "content": [
                    {
                        "type": "text",
                        "text": "没有可用的节点"
                    }
                ]
            }

async def simulate_node_query(node, query):
    """模拟节点查询"""
    await asyncio.sleep(0.1)  # 模拟网络延迟
    return f"处理查询 '{query}' - 节点角色: {node.role}"

if __name__ == "__main__":
    # 添加模拟节点
    lb.add_node(DistributedMCPNode("node-1", "worker"))
    lb.add_node(DistributedMCPNode("node-2", "worker"))
    lb.add_node(DistributedMCPNode("node-3", "cache"))
    
    stdio_server(app)

四、MCP安全与权限管理

4.1 认证与授权

MCP支持多种认证机制,确保只有授权用户才能访问资源。

示例:基于令牌的认证MCP服务器

# auth_mcp_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import hashlib
import secrets
import json
from datetime import datetime, timedelta

app = Server("auth-mcp-server")

# 模拟用户数据库
USERS = {
    "admin": {
        "password_hash": hashlib.sha256("admin123".encode()).hexdigest(),
        "role": "admin",
        "permissions": ["read", "write", "delete", "admin"]
    },
    "user": {
        "password_hash": hashlib.sha256("user123".encode()).hexdigest(),
        "role": "user",
        "permissions": ["read"]
    }
}

# 令牌存储
TOKENS = {}

def generate_token(user_id, expires_in=3600):
    """生成访问令牌"""
    token = secrets.token_urlsafe(32)
    expiry = datetime.now() + timedelta(seconds=expires_in)
    TOKENS[token] = {
        "user_id": user_id,
        "expiry": expiry,
        "permissions": USERS[user_id]["permissions"]
    }
    return token

def verify_token(token):
    """验证令牌"""
    if token not in TOKENS:
        return None
    
    token_data = TOKENS[token]
    if datetime.now() > token_data["expiry"]:
        del TOKENS[token]
        return None
    
    return token_data

def check_permission(token_data, required_permission):
    """检查权限"""
    if not token_data:
        return False
    return required_permission in token_data["permissions"]

@app.list_tools()
async def list_tools():
    return [
        {
            "name": "login",
            "description": "用户登录",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "username": {
                        "type": "string",
                        "description": "用户名"
                    },
                    "password": {
                        "type": "string",
                        "description": "密码"
                    }
                },
                "required": ["username", "password"]
            }
        },
        {
            "name": "protected_operation",
            "description": "受保护的操作",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "token": {
                        "type": "string",
                        "description": "访问令牌"
                    },
                    "operation": {
                        "type": "string",
                        "description": "操作类型",
                        "enum": ["read_data", "write_data", "delete_data", "admin_action"]
                    },
                    "data": {
                        "type": "string",
                        "description": "操作数据"
                    }
                },
                "required": ["token", "operation"]
            }
        }
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "login":
        username = arguments["username"]
        password = arguments["password"]
        
        if username in USERS:
            password_hash = hashlib.sha256(password.encode()).hexdigest()
            if USERS[username]["password_hash"] == password_hash:
                token = generate_token(username)
                return {
                    "content": [
                        {
                            "type": "text",
                            "text": json.dumps({
                                "status": "success",
                                "token": token,
                                "expires_in": 3600
                            }, ensure_ascii=False)
                        }
                    ]
                }
        
        return {
            "content": [
                {
                    "type": "text",
                    "text": json.dumps({
                        "status": "error",
                        "message": "认证失败"
                    }, ensure_ascii=False)
                }
            ]
        }
    
    elif name == "protected_operation":
        token = arguments["token"]
        operation = arguments["operation"]
        data = arguments.get("data", "")
        
        # 验证令牌
        token_data = verify_token(token)
        if not token_data:
            return {
                "content": [
                    {
                        "type": "text",
                        "text": json.dumps({
                            "status": "error",
                            "message": "无效或过期的令牌"
                        }, ensure_ascii=False)
                    }
                ]
            }
        
        # 检查权限
        required_permission = operation.replace("_", "")
        if not check_permission(token_data, required_permission):
            return {
                "content": [
                    {
                        "type": "text",
                        "text": json.dumps({
                            "status": "error",
                            "message": f"权限不足,需要 {required_permission} 权限"
                        }, ensure_ascii=False)
                    }
                ]
            }
        
        # 执行操作
        result = f"执行 {operation} 操作,数据: {data},用户: {token_data['user_id']}"
        return {
            "content": [
                {
                    "type": "text",
                    "text": json.dumps({
                        "status": "success",
                        "result": result
                    }, ensure_ascii=False)
                }
            ]
        }

if __name__ == "__main__":
    stdio_server(app)

4.2 数据隔离与多租户

多租户MCP服务器为不同用户提供数据隔离。

示例:多租户数据库服务

# multi_tenant_db_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import json
import asyncio

app = Server("multi-tenant-db-server")

# 模拟多租户数据存储
TENANT_DATA = {
    "tenant_a": {
        "users": [
            {"id": 1, "name": "Alice", "email": "alice@tenant-a.com"},
            {"id": 2, "name": "Bob", "email": "bob@tenant-a.com"}
        ],
        "products": [
            {"id": 101, "name": "Product A", "price": 99.99},
            {"id": 102, "name": "Product B", "price": 149.99}
        ]
    },
    "tenant_b": {
        "users": [
            {"id": 1, "name": "Charlie", "email": "charlie@tenant-b.com"},
            {"id": 2, "name": "Diana", "email": "diana@tenant-b.com"}
        ],
        "products": [
            {"id": 201, "name": "Product X", "price": 199.99},
            {"id": 202, "name": "Product Y", "price": 249.99}
        ]
    }
}

@app.list_tools()
async def list_tools():
    return [
        {
            "name": "query_tenant_data",
            "description": "查询租户数据",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "tenant_id": {
                        "type": "string",
                        "description": "租户ID"
                    },
                    "resource": {
                        "type": "string",
                        "description": "资源类型",
                        "enum": ["users", "products"]
                    },
                    "filter": {
                        "type": "object",
                        "description": "过滤条件"
                    }
                },
                "required": ["tenant_id", "resource"]
            }
        }
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "query_tenant_data":
        tenant_id = arguments["tenant_id"]
        resource = arguments["resource"]
        filter_condition = arguments.get("filter", {})
        
        if tenant_id not in TENANT_DATA:
            return {
                "content": [
                    {
                        "type": "text",
                        "text": json.dumps({
                            "status": "error",
                            "message": f"租户 {tenant_id} 不存在"
                        }, ensure_ascii=False)
                    }
                ]
            }
        
        # 获取租户数据
        data = TENANT_DATA[tenant_id].get(resource, [])
        
        # 应用过滤条件
        if filter_condition:
            filtered_data = []
            for item in data:
                match = True
                for key, value in filter_condition.items():
                    if item.get(key) != value:
                        match = False
                        break
                if match:
                    filtered_data.append(item)
            data = filtered_data
        
        return {
            "content": [
                {
                    "type": "text",
                    "text": json.dumps({
                        "tenant_id": tenant_id,
                        "resource": resource,
                        "count": len(data),
                        "data": data
                    }, ensure_ascii=False)
                }
            ]
        }

if __name__ == "__main__":
    stdio_server(app)

五、MCP最佳实践与优化策略

5.1 性能优化

5.1.1 缓存策略

# caching_mcp_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import json
import time
from functools import lru_cache

app = Server("caching-mcp-server")

# 内存缓存
cache = {}
CACHE_TTL = 300  # 5分钟

def get_cached_data(key):
    """获取缓存数据"""
    if key in cache:
        cached_time, data = cache[key]
        if time.time() - cached_time < CACHE_TTL:
            return data
        else:
            del cache[key]
    return None

def set_cached_data(key, data):
    """设置缓存数据"""
    cache[key] = (time.time(), data)

@app.list_tools()
async def list_tools():
    return [
        {
            "name": "get_data_with_cache",
            "description": "带缓存的数据获取",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "key": {
                        "type": "string",
                        "description": "数据键"
                    },
                    "force_refresh": {
                        "type": "boolean",
                        "description": "强制刷新缓存",
                        "default": False
                    }
                },
                "required": ["key"]
            }
        }
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "get_data_with_cache":
        key = arguments["key"]
        force_refresh = arguments.get("force_refresh", False)
        
        if not force_refresh:
            cached = get_cached_data(key)
            if cached is not None:
                return {
                    "content": [
                        {
                            "type": "text",
                            "text": json.dumps({
                                "source": "cache",
                                "data": cached
                            }, ensure_ascii=False)
                        }
                    ]
                }
        
        # 模拟数据获取
        await asyncio.sleep(0.5)  # 模拟延迟
        data = f"数据 {key} - 时间: {time.time()}"
        
        # 缓存数据
        set_cached_data(key, data)
        
        return {
            "content": [
                {
                    "type": "text",
                    "text": json.dumps({
                        "source": "fresh",
                        "data": data
                    }, ensure_ascii=False)
                }
            ]
        }

if __name__ == "__main__":
    stdio_server(app)

5.1.2 批量处理

# batch_mcp_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import json
import asyncio

app = Server("batch-mcp-server")

@app.list_tools()
async def list_tools():
    return [
        {
            "name": "batch_process",
            "description": "批量处理任务",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "tasks": {
                        "type": "array",
                        "description": "任务列表",
                        "items": {
                            "type": "object",
                            "properties": {
                                "type": {
                                    "type": "string",
                                    "description": "任务类型"
                                },
                                "data": {
                                    "type": "object",
                                    "description": "任务数据"
                                }
                            }
                        }
                    },
                    "concurrency": {
                        "type": "number",
                        "description": "并发数",
                        "default": 3
                    }
                },
                "required": ["tasks"]
            }
        }
    ]

async def process_task(task):
    """处理单个任务"""
    await asyncio.sleep(0.1)  # 模拟处理时间
    return {
        "task_id": task.get("id", "unknown"),
        "result": f"处理完成: {task.get('type', 'unknown')}",
        "status": "success"
    }

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "batch_process":
        tasks = arguments["tasks"]
        concurrency = arguments.get("concurrency", 3)
        
        # 使用信号量控制并发
        semaphore = asyncio.Semaphore(concurrency)
        
        async def process_with_semaphore(task):
            async with semaphore:
                return await process_task(task)
        
        # 并发处理所有任务
        results = await asyncio.gather(
            *[process_with_semaphore(task) for task in tasks],
            return_exceptions=True
        )
        
        return {
            "content": [
                {
                    "type": "text",
                    "text": json.dumps({
                        "total_tasks": len(tasks),
                        "successful": len([r for r in results if not isinstance(r, Exception)]),
                        "results": results
                    }, ensure_ascii=False)
                }
            ]
        }

if __name__ == "__main__":
    stdio_server(app)

5.2 错误处理与重试机制

# resilient_mcp_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
import json
import asyncio
import random

app = Server("resilient-mcp-server")

class RetryableError(Exception):
    """可重试的错误"""
    pass

class NonRetryableError(Exception):
    """不可重试的错误"""
    pass

async def unreliable_operation():
    """模拟不可靠的操作"""
    if random.random() < 0.3:  # 30%失败率
        raise RetryableError("临时故障")
    elif random.random() < 0.1:  # 10%永久失败
        raise NonRetryableError("永久故障")
    
    return "操作成功"

async def retry_operation(max_retries=3, delay=1):
    """带重试的操作"""
    for attempt in range(max_retries):
        try:
            result = await unreliable_operation()
            return result
        except RetryableError as e:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(delay * (2 ** attempt))  # 指数退避
        except NonRetryableError:
            raise

@app.list_tools()
async def list_tools():
    return [
        {
            "name": "resilient_operation",
            "description": "弹性操作",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "max_retries": {
                        "type": "number",
                        "description": "最大重试次数",
                        "default": 3
                    },
                    "retry_delay": {
                        "type": "number",
                        "description": "重试延迟(秒)",
                        "default": 1
                    }
                }
            }
        }
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "resilient_operation":
        max_retries = arguments.get("max_retries", 3)
        retry_delay = arguments.get("retry_delay", 1)
        
        try:
            result = await retry_operation(max_retries, retry_delay)
            return {
                "content": [
                    {
                        "type": "text",
                        "text": json.dumps({
                            "status": "success",
                            "result": result
                        }, ensure_ascii=False)
                    }
                ]
            }
        except Exception as e:
            return {
                "content": [
                    {
                        "type": "text",
                        "text": json.dumps({
                            "status": "error",
                            "message": str(e)
                        }, ensure_ascii=False)
                    }
                ]
            }

if __name__ == "__main__":
    stdio_server(app)

六、MCP生态系统与工具

6.1 官方MCP服务器

Anthropic提供了多个官方MCP服务器,涵盖常见使用场景:

  1. 文件系统MCP服务器:提供文件读写操作
  2. GitHub MCP服务器:集成GitHub API
  3. Slack MCP服务器:集成Slack消息
  4. PostgreSQL MCP服务器:数据库查询
  5. Google Drive MCP服务器:云存储访问

6.2 社区MCP服务器

社区贡献的MCP服务器扩展了MCP的功能范围:

  • Jira MCP服务器:项目管理工具集成
  • Notion MCP服务器:笔记和文档管理
  • Stripe MCP服务器:支付处理
  • Twilio MCP服务器:短信和语音服务
  • AWS MCP服务器:云服务集成

6.3 开发工具

6.3.1 MCP Inspector

MCP Inspector是官方提供的调试工具,用于测试MCP服务器。

# 安装MCP Inspector
npm install -g @modelcontextprotocol/inspector

# 启动Inspector
mcp-inspector

6.3.2 MCP CLI工具

# 安装MCP CLI
pip install mcp-cli

# 查看服务器信息
mcp-cli list

# 测试工具调用
mcp-cli call <server-name> <tool-name> <arguments>

七、MCP未来发展趋势

7.1 标准化演进

MCP协议正在向更标准化的方向发展:

  1. 协议版本管理:支持多版本兼容
  2. 扩展机制:允许自定义扩展
  3. 认证标准:OAuth 2.0、JWT等标准集成

7.2 与新兴技术集成

  1. Web3集成:区块链和智能合约
  2. 边缘计算:分布式边缘节点
  3. 量子计算:量子算法接口

7.3 AI原生应用

MCP将成为AI原生应用的核心基础设施:

  1. 自主AI代理:支持复杂任务规划
  2. 多模态交互:文本、图像、语音统一接口
  3. 持续学习:动态工具发现和学习

八、总结

MCP作为一种新兴的AI集成协议,正在快速演进和完善。从基础的工具、资源、提示模板分类,到高级的流式、分布式、多租户架构,MCP为AI应用提供了强大的扩展能力。通过本文的详细解析和代码示例,读者可以:

  1. 理解MCP的核心概念:掌握协议的基本原理和架构
  2. 掌握分类方法:从多个维度理解MCP服务器的类型
  3. 实践开发技能:通过代码示例学习实际开发
  4. 了解最佳实践:掌握性能优化和安全策略
  5. 展望未来趋势:了解MCP的发展方向

随着AI技术的不断发展,MCP将在连接AI模型与现实世界中发挥越来越重要的作用。建议开发者从简单的MCP服务器开始实践,逐步探索更复杂的应用场景,为构建下一代AI应用奠定基础。


参考资源:

学习建议:

  1. 从官方示例开始,理解基础概念
  2. 尝试构建简单的MCP服务器
  3. 参与社区讨论,了解最新进展
  4. 关注安全最佳实践
  5. 探索与现有系统的集成方案