引言:票房冠军的魅力与奥秘
票房冠军电影不仅仅是商业成功的象征,更是文化现象的缩影。它们凭借什么在激烈的市场竞争中脱颖而出?背后隐藏着哪些不为人知的营销策略和观众选择逻辑?本文将深入剖析历年全球票房冠军电影,揭示它们登顶的秘诀,并探讨背后的营销秘密和观众心理。
全球票房冠军电影概览
1. 《阿凡达》(Avatar,2009)
- 全球票房:约29.23亿美元
- 登顶原因:
- 技术创新:开创性的3D技术和CGI特效,为观众带来前所未有的视觉体验
- 普世主题:环保、反殖民、人与自然和谐共处的主题引发全球共鸣
- 导演效应:詹姆斯·卡梅隆继《泰坦尼克号》后的又一力作,自带话题性
2. 《复仇者联盟4:终局之战》(Avengers: Endgame,2019)
- 全球票房:约27.99亿美元
- 登顶原因:
- IP积累:漫威电影宇宙11年22部电影的铺垫,情感积累达到顶峰
- 粉丝经济:对核心粉丝的精准回馈,满足情怀需求
- 事件营销:将电影上映打造成全球性文化事件
3. 《泰坦尼克号》(Titanic,1997)
- 全球票房:约22.01亿美元(含历次重映)
- 登顶原因:
- 史诗爱情:将灾难与爱情完美结合,创造跨时代经典
- 技术标杆:当时最高制作水准的视觉效果
- 音乐传播:主题曲《My Heart Will Go On》成为全球流行文化符号
4. 《星球大战》(Star Wars,1977)
- 全球票房:约7.75亿美元(含历次重映)
- 登顶原因:
- 开创性:开创科幻电影新纪元,定义太空歌剧类型
- 文化现象:创造独特的世界观和粉丝文化
- 技术革命:工业光魔的诞生与发展
5. 《乱世佳人》(Gone with the Wind,1939)
- 全球票房:约3.91亿美元(含历次重映)
- 登顶原因:
- 文学改编:经典文学作品的完美影视化
- 时代印记:美国南方历史的史诗呈现
- 明星效应:费雯·丽和克拉克·盖博的经典演绎
登顶秘诀:技术、情感与时机的完美结合
1. 技术创新引领观影革命
《阿凡达》的成功证明了技术创新对票房的巨大推动力。卡梅隆团队开发的立体摄影系统和虚拟摄影技术,让观众沉浸在潘多拉星球的奇幻世界中。这种技术优势形成了独特的观影门槛,促使观众必须到影院才能体验完整效果。
2. 情感共鸣的深度构建
《复仇者联盟4》通过11年的情感积累,将观众与角色建立深厚连接。钢铁侠的牺牲、美队的传承,这些角色命运的终章成为观众情感的宣泄口。这种长期的情感投资转化为票房爆发力。
3. 文化符号的创造与利用
《星球大战》不仅是一部电影,更成为一种文化符号。原力、光剑、绝地武士等概念深入人心,衍生出庞大的粉丝文化。这种文化影响力让电影超越娱乐产品,成为时代记忆。
4. 社会议题的巧妙植入
《阿凡达》通过纳美人的故事隐喻现实中的环境破坏和殖民历史,引发观众思考。这种社会议题的讨论为电影增添了深度,延长了话题生命周期。
营销秘密:从预热到长尾的完整策略
1. 事件化营销:将电影变成社会事件
《复仇者联盟4》的营销策略堪称典范:
- 倒计时策略:提前一年开始预热,通过“终局之战”概念制造紧迫感
- 饥饿营销:严格控制信息释放,预告片只展示必要信息
- 粉丝特权:组织粉丝提前观影,制造口碑传播
- 跨界联动:与各大品牌合作推出限定产品,扩大影响力
2. 社交媒体时代的精准投放
《阿凡达》的营销团队利用社交媒体进行病毒式传播:
- 话题制造:#AvatarChallenge 鼓励用户分享自己的“阿凡达”形象
- KOL合作:邀请科技、环保领域的意见领袖提前观影并发声
- 互动体验:开发在线互动游戏,让用户提前体验潘多拉星球
3. 情感营销:从观众到粉丝的转化
《泰坦尼克号》的营销重点在于情感共鸣:
- 音乐先行:提前发布主题曲,让旋律深入人心
- 故事预热:通过纪录片等形式讲述泰坦尼克号的历史背景
- 明星包装:将莱昂纳多和凯特打造成全球偶像
- 情怀延续:多年后重映依然能引发观影热潮
4. 全球化与本地化结合
《复仇者联盟4》在全球不同市场采用差异化策略:
- 亚洲市场:强调英雄情怀和视觉特效
- 欧洲市场:突出故事深度和角色发展
- 北美市场:强调事件性和粉丝服务
观众选择逻辑:从需求到决策的心理路径
1. 社交需求:避免社交孤立
在社交媒体时代,观看票房冠军电影成为一种社交货币。《复仇者联盟4》上映期间,“你看过终局之战了吗?”成为社交开场白。观众选择观看这类电影,部分是为了避免在社交圈中被孤立。
2. 风险规避:从众心理的选择
面对海量影视内容,观众倾向于选择经过市场验证的产品。票房冠军代表着“多数人的选择”,这种从众心理降低了观众的选择风险。从历年市场统计看,票房前10的影片往往能占据年度总票房的30%以上。
3. 情感宣泄与身份认同
现代观众越来越重视电影的情感价值。《阿凡达》中的环保主题让观众产生道德认同感;《复仇者联盟4》的英雄牺牲让观众找到情感投射对象。这种身份认同感是驱动观众选择的重要因素。
4. 技术体验的稀缺性
IMAX、3D、4DX等特殊影厅的普及,让观众更愿意为技术体验付费。《阿凡达》的成功很大程度上得益于其技术独特性——只有在特定设备上才能获得完整体验,这种稀缺性创造了观影需求。
结论:票房冠军的综合制胜法则
票房冠军电影的成功绝非偶然,而是技术、情感、营销和时机的完美结合。它们不仅满足观众的娱乐需求,更创造了文化现象和社会话题。未来的票房冠军,必将在技术创新、情感深度和营销策略上继续突破,为观众带来前所未有的体验。
LLM接口封装库:完整源码
import os
import json
import time
import requests
from typing import List, Dict, Any, Optional
from datetime import datetime
from enum import Enum
class ModelType(Enum):
"""模型类型枚举"""
GPT_35_TURBO = "gpt-3.5-turbo"
GPT_4 = "gpt-4"
GPT_4_TURBO = "gpt-4-turbo"
CLAUDE_3_SONNET = "claude-3-sonnet"
CLAUDE_3_OPUS = "claude-3-opus"
QWEN_MAX = "qwen-max"
QWEN_PLUS = "qwen-plus"
ERNIE_BOT = "ernie-bot"
GLM_4 = "glm-4"
class LLMProvider(Enum):
"""LLM服务提供商枚举"""
OPENAI = "openai"
ANTHROPIC = "anthropic"
ALIYUN = "aliyun"
BAIDU = "baidu"
ZHIPU = "zhipu"
class LLMConfig:
"""LLM配置类"""
def __init__(
self,
provider: LLMProvider,
api_key: str,
base_url: str = None,
model: ModelType = ModelType.GPT_35_TURBO,
temperature: float = 0.7,
max_tokens: int = 2048,
timeout: int = 30,
max_retries: int = 3,
retry_delay: float = 1.0
):
self.provider = provider
self.api_key = api_key
self.base_url = base_url
self.model = model
self.temperature = temperature
self.max_tokens = max_tokens
self.timeout = timeout
self.max_retries = max_retries
self.retry_delay = retry_delay
class LLMResponse:
"""LLM响应类"""
def __init__(
self,
content: str,
model: str,
usage: Dict[str, int],
finish_reason: str = "stop",
timestamp: Optional[datetime] = None
):
self.content = content
self.model = model
self.usage = usage
self.finish_reason = finish_reason
self.timestamp = timestamp or datetime.now()
def to_dict(self) -> Dict[str, Any]:
return {
"content": self.content,
"model": self.model,
"usage": self.usage,
"finish_reason": self.finish_reason,
"timestamp": self.timestamp.isoformat()
}
class LLMClient:
"""LLM客户端基类"""
def __init__(self, config: LLMConfig):
self.config = config
self.session = requests.Session()
self.session.headers.update({
"Content-Type": "application/json",
"User-Agent": "LLMClient/1.0"
})
def _build_url(self) -> str:
"""构建API URL"""
if self.config.base_url:
return self.config.base_url
base_urls = {
LLMProvider.OPENAI: "https://api.openai.com/v1/chat/completions",
LLMProvider.ANTHROPIC: "https://api.anthropic.com/v1/messages",
LLMProvider.ALIYUN: "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation",
LLMProvider.BAIDU: "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions",
LLMProvider.ZHIPU: "https://open.bigmodel.cn/api/paas/v4/chat/completions"
}
return base_urls.get(self.config.provider, "")
def _build_payload(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
"""构建请求体"""
payload = {
"model": self.config.model.value,
"messages": messages,
"temperature": self.config.temperature,
"max_tokens": self.config.max_tokens
}
# 特殊处理百度的API
if self.config.provider == LLMProvider.BAIDU:
payload = {
"messages": messages,
"temperature": self.config.temperature,
"max_tokens": self.config.max_tokens
}
return payload
def _parse_response(self, response: requests.Response) -> LLMResponse:
"""解析响应"""
try:
data = response.json()
except json.JSONDecodeError:
raise ValueError(f"Invalid JSON response: {response.text}")
if response.status_code != 200:
error_msg = data.get("error", {}).get("message", "Unknown error")
raise ValueError(f"API Error ({response.status_code}): {error_msg}")
# 不同提供商的响应格式不同
if self.config.provider == LLMProvider.OPENAI:
choices = data.get("choices", [])
if not choices:
raise ValueError("No choices in response")
content = choices[0]["message"]["content"]
usage = data.get("usage", {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})
finish_reason = choices[0].get("finish_reason", "stop")
elif self.config.provider == LLMProvider.ANTHROPIC:
content = data.get("content", [{}])[0].get("text", "")
usage = {
"prompt_tokens": data.get("input_tokens", 0),
"completion_tokens": data.get("output_tokens", 0),
"total_tokens": data.get("input_tokens", 0) + data.get("output_tokens", 0)
}
finish_reason = data.get("stop_reason", "stop")
elif self.config.provider == LLMProvider.ALIYUN:
output = data.get("output", {})
content = output.get("text", "")
usage = output.get("usage", {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})
finish_reason = "stop" # 阿里云不返回finish_reason
elif self.config.provider == LLMProvider.BAIDU:
content = data.get("result", "")
usage = data.get("usage", {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})
finish_reason = "stop" # 百度不返回finish_reason
elif self.config.provider == LLMProvider.ZHIPU:
choices = data.get("choices", [])
if not choices:
raise ValueError("No choices in response")
content = choices[0]["message"]["content"]
usage = data.get("usage", {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})
finish_reason = choices[0].get("finish_reason", "stop")
else:
raise ValueError(f"Unsupported provider: {self.config.provider}")
return LLMResponse(
content=content,
model=data.get("model", self.config.model.value),
usage=usage,
finish_reason=finish_reason
)
def _add_auth(self, headers: Dict[str, str]) -> Dict[str, str]:
"""添加认证信息"""
if self.config.provider == LLMProvider.OPENAI:
headers["Authorization"] = f"Bearer {self.config.api_key}"
elif self.config.provider == LLMProvider.ANTHROPIC:
headers["x-api-key"] = self.config.api_key
headers["anthropic-version"] = "2023-06-01"
elif self.config.provider == LLMProvider.ALIYUN:
headers["Authorization"] = f"Bearer {self.config.api_key}"
elif self.config.provider == LLMProvider.BAIDU:
# 百度需要access_token,这里假设api_key就是access_token
# 实际使用时需要先调用access_token接口
pass
elif self.config.provider == LLMProvider.ZHIPU:
headers["Authorization"] = f"Bearer {self.config.api_key}"
return headers
def chat(self, messages: List[Dict[str, str]]) -> LLMResponse:
"""
聊天接口
Args:
messages: 消息列表,格式 [{"role": "user", "content": "hello"}]
Returns:
LLMResponse: 响应对象
Raises:
ValueError: 当API调用失败时
"""
url = self._build_url()
payload = self._build_payload(messages)
headers = self._add_auth({})
for attempt in range(self.config.max_retries):
try:
response = self.session.post(
url,
json=payload,
headers=headers,
timeout=self.config.timeout
)
return self._parse_response(response)
except Exception as e:
if attempt == self.config.max_retries - 1:
raise
time.sleep(self.config.retry_delay * (2 ** attempt)) # 指数退避
def chat_stream(self, messages: List[Dict[str, str]]) -> str:
"""
流式聊天接口(仅支持OpenAI和Anthropic)
Args:
messages: 消息列表
Returns:
str: 完整响应内容
"""
if self.config.provider not in [LLMProvider.OPENAI, LLMProvider.ANTHROPIC]:
# 对于不支持流式的提供商,回退到普通chat
return self.chat(messages).content
url = self._build_url()
payload = self._build_payload(messages)
payload["stream"] = True
headers = self._add_auth({})
full_content = ""
for attempt in range(self.config.max_retries):
try:
with self.session.post(
url,
json=payload,
headers=headers,
timeout=self.config.timeout,
stream=True
) as response:
if response.status_code != 200:
raise ValueError(f"API Error: {response.status_code}")
for line in response.iter_lines():
if not line:
continue
line = line.decode('utf-8')
if line.startswith('data: '):
data_str = line[6:] # 去掉 'data: '
if data_str == '[DONE]':
break
try:
data = json.loads(data_str)
if self.config.provider == LLMProvider.OPENAI:
choices = data.get("choices", [])
if choices:
delta = choices[0].get("delta", {})
content = delta.get("content", "")
if content:
print(content, end="", flush=True)
full_content += content
elif self.config.provider == LLMProvider.ANTHROPIC:
    # Anthropic流式处理:content_block_delta 事件中的 delta.text 才是增量文本
    if data.get("type") == "content_block_delta":
        content = data.get("delta", {}).get("text", "")
        if content:
            print(content, end="", flush=True)
            full_content += content
except json.JSONDecodeError:
continue
print() # 换行
return full_content
except Exception as e:
if attempt == self.config.max_retries - 1:
raise
time.sleep(self.config.retry_delay * (2 ** attempt))
def generate(
self,
prompt: str,
system_prompt: str = "You are a helpful assistant.",
history: Optional[List[Dict[str, str]]] = None
) -> LLMResponse:
"""
便捷的生成接口
Args:
prompt: 用户输入
system_prompt: 系统提示词
history: 历史消息
Returns:
LLMResponse: 响应对象
"""
messages = [{"role": "system", "content": system_prompt}]
if history:
messages.extend(history)
messages.append({"role": "user", "content": prompt})
return self.chat(messages)
def batch_generate(
self,
prompts: List[str],
system_prompt: str = "You are a helpful assistant.",
delay: float = 1.0
) -> List[LLMResponse]:
"""
批量生成
Args:
prompts: 提示词列表
system_prompt: 系统提示词
delay: 请求间隔
Returns:
List[LLMResponse]: 响应列表
"""
results = []
for prompt in prompts:
response = self.generate(prompt, system_prompt)
results.append(response)
time.sleep(delay)
return results
def count_tokens(self, messages: List[Dict[str, str]]) -> int:
"""
估算token数量(简单实现,实际应根据具体模型的tokenizer)
Args:
messages: 消息列表
Returns:
int: 估算的token数量
"""
total_text = ""
for msg in messages:
total_text += f"{msg['role']}: {msg['content']}\n"
# 简单估算:1个token ≈ 4个字符
return len(total_text) // 4
class LLMManager:
"""LLM管理器,支持多模型切换"""
def __init__(self):
self.clients: Dict[str, LLMClient] = {}
self.current_client: Optional[LLMClient] = None
self.history: List[Dict[str, str]] = []
def register_client(self, name: str, client: LLMClient):
"""注册客户端"""
self.clients[name] = client
if self.current_client is None:
self.current_client = client
def set_current_client(self, name: str):
"""设置当前客户端"""
if name in self.clients:
self.current_client = self.clients[name]
else:
raise ValueError(f"Client {name} not registered")
def chat(self, messages: List[Dict[str, str]]) -> LLMResponse:
"""聊天"""
if self.current_client is None:
raise ValueError("No client registered")
return self.current_client.chat(messages)
def chat_stream(self, messages: List[Dict[str, str]]) -> str:
"""流式聊天"""
if self.current_client is None:
raise ValueError("No client registered")
return self.current_client.chat_stream(messages)
def generate(
self,
prompt: str,
system_prompt: str = "You are a helpful assistant.",
save_history: bool = False
) -> LLMResponse:
"""生成"""
if self.current_client is None:
raise ValueError("No client registered")
messages = [{"role": "system", "content": system_prompt}]
if save_history:
messages.extend(self.history)
messages.append({"role": "user", "content": prompt})
response = self.current_client.chat(messages)
if save_history:
self.history.append({"role": "user", "content": prompt})
self.history.append({"role": "assistant", "content": response.content})
return response
def clear_history(self):
"""清除历史记录"""
self.history = []
def get_history(self) -> List[Dict[str, str]]:
"""获取历史记录"""
return self.history.copy()
# 使用示例
if __name__ == "__main__":
# 示例1:OpenAI配置
openai_config = LLMConfig(
provider=LLMProvider.OPENAI,
api_key=os.getenv("OPENAI_API_KEY", "your-openai-key"),
model=ModelType.GPT_35_TURBO,
temperature=0.7,
max_tokens=1024
)
openai_client = LLMClient(openai_config)
# 示例2:Anthropic配置
anthropic_config = LLMConfig(
provider=LLMProvider.ANTHROPIC,
api_key=os.getenv("ANTHROPIC_API_KEY", "your-anthropic-key"),
model=ModelType.CLAUDE_3_SONNET,
temperature=0.7,
max_tokens=2048
)
anthropic_client = LLMClient(anthropic_config)
# 示例3:阿里云配置
aliyun_config = LLMConfig(
provider=LLMProvider.ALIYUN,
api_key=os.getenv("ALIYUN_API_KEY", "your-aliyun-key"),
model=ModelType.QWEN_MAX,
temperature=0.7,
max_tokens=1024
)
aliyun_client = LLMClient(aliyun_config)
# 使用管理器
manager = LLMManager()
manager.register_client("openai", openai_client)
manager.register_client("anthropic", anthropic_client)
manager.register_client("aliyun", aliyun_client)
# 测试对话
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello, how are you?"}
]
try:
# 普通对话
response = manager.chat(messages)
print("Response:", response.content)
print("Usage:", response.usage)
# 流式对话
print("\nStreaming:")
full_content = manager.chat_stream(messages)
print("Full content:", full_content)
except Exception as e:
print(f"Error: {e}")</think># LLM大语言模型接口封装库
项目概述
这是一个功能完整的大语言模型(LLM)接口封装库,支持多家主流LLM服务提供商,包括OpenAI、Anthropic、阿里云、百度和智谱AI。该库提供了统一的API接口,支持普通对话、流式输出、批量生成、历史记录管理等核心功能。
核心设计
1. 枚举类设计
from enum import Enum
class ModelType(Enum):
"""模型类型枚举"""
GPT_35_TURBO = "gpt-3.5-turbo"
GPT_4 = "gpt-4"
GPT_4_TURBO = "gpt-4-turbo"
CLAUDE_3_SONNET = "claude-3-sonnet"
CLAUDE_3_OPUS = "claude-3-opus"
QWEN_MAX = "qwen-max"
QWEN_PLUS = "qwen-plus"
ERNIE_BOT = "ernie-bot"
GLM_4 = "glm-4"
class LLMProvider(Enum):
"""LLM服务提供商枚举"""
OPENAI = "openai"
ANTHROPIC = "anthropic"
ALIYUN = "aliyun"
BAIDU = "baidu"
ZHIPU = "zhipu"
使用枚举类的好处:
- 类型安全,避免字符串拼写错误
- 代码可读性强
- IDE自动补全支持
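一个最小演示(基于上面的枚举定义,仅作说明):
# 合法的模型名可以按值反查出枚举成员
model = ModelType("gpt-4")
print(model, model.value)   # ModelType.GPT_4 gpt-4
# 拼写错误会立即抛出 ValueError,而不是等到请求 API 时才失败
try:
    ModelType("gpt4")       # 少了连字符
except ValueError as e:
    print(e)                # 'gpt4' is not a valid ModelType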
2. 配置类设计
class LLMConfig:
"""LLM配置类"""
def __init__(
self,
provider: LLMProvider,
api_key: str,
base_url: Optional[str] = None,
model: ModelType = ModelType.GPT_35_TURBO,
temperature: float = 0.7,
max_tokens: int = 2048,
timeout: int = 30,
max_retries: int = 3,
retry_delay: float = 1.0
):
self.provider = provider
self.api_key = api_key
self.base_url = base_url
self.model = model
self.temperature = temperature
self.max_tokens = max_tokens
self.timeout = timeout
self.max_retries = max_retries
self.retry_delay = retry_delay
配置类集中管理所有参数,支持:
- 自定义base_url(用于代理或私有部署)
- 超时和重试机制
- 模型参数(temperature, max_tokens)
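一个配置自定义 base_url 的示意(代理地址为虚构占位,仅演示参数用法):
import os

proxy_config = LLMConfig(
    provider=LLMProvider.OPENAI,
    api_key=os.getenv("OPENAI_API_KEY", "your-openai-key"),
    base_url="https://your-proxy.example.com/v1/chat/completions",  # 虚构的代理/私有部署地址
    model=ModelType.GPT_4,
    timeout=60,        # 代理链路较慢时可适当调大超时
    max_retries=5      # 网络不稳时可增加重试次数
)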
3. 响应类设计
class LLMResponse:
"""LLM响应类"""
def __init__(
self,
content: str,
model: str,
usage: Dict[str, int],
finish_reason: str = "stop",
timestamp: Optional[datetime] = None
):
self.content = content
self.model = model
self.usage = usage
self.finish_reason = finish_reason
self.timestamp = timestamp or datetime.now()
def to_dict(self) -> Dict[str, Any]:
return {
"content": self.content,
"model": self.model,
"usage": self.usage,
"finish_reason": self.finish_reason,
"timestamp": self.timestamp.isoformat()
}
标准化响应格式,包含:
- 生成内容
- 使用的模型
- token用量统计
- 完成原因(正常结束/长度限制)
- 时间戳
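按上面的定义手动构造一个响应对象并序列化,可以直观看到各字段(示例数据为虚构):
resp = LLMResponse(
    content="你好!",
    model="gpt-3.5-turbo",
    usage={"prompt_tokens": 12, "completion_tokens": 5, "total_tokens": 17},
)
print(resp.to_dict())
# {'content': '你好!', 'model': 'gpt-3.5-turbo', 'usage': {...},
#  'finish_reason': 'stop', 'timestamp': '2024-...'}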
核心功能实现
1. 多提供商支持
class LLMClient:
"""LLM客户端基类"""
def _build_url(self) -> str:
"""构建API URL"""
if self.config.base_url:
return self.config.base_url
base_urls = {
LLMProvider.OPENAI: "https://api.openai.com/v1/chat/completions",
LLMProvider.ANTHROPIC: "https://api.anthropic.com/v1/messages",
LLMProvider.ALIYUN: "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation",
LLMProvider.BAIDU: "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions",
LLMProvider.ZHIPU: "https://open.bigmodel.cn/api/paas/v4/chat/completions"
}
return base_urls.get(self.config.provider, "")
2. 请求体构建(适配不同API)
def _build_payload(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
"""构建请求体"""
payload = {
"model": self.config.model.value,
"messages": messages,
"temperature": self.config.temperature,
"max_tokens": self.config.max_tokens
}
# 特殊处理百度的API
if self.config.provider == LLMProvider.BAIDU:
payload = {
"messages": messages,
"temperature": self.config.temperature,
"max_tokens": self.config.max_tokens
}
return payload
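可以直接调用 _build_payload 验证这一差异(仅作演示,api_key 为占位符;百度的模型由 URL 路径指定,故请求体中不带 model 字段):
messages = [{"role": "user", "content": "你好"}]
openai_client = LLMClient(LLMConfig(LLMProvider.OPENAI, api_key="sk-test"))
baidu_client = LLMClient(LLMConfig(LLMProvider.BAIDU, api_key="test", model=ModelType.ERNIE_BOT))
print("model" in openai_client._build_payload(messages))  # True:OpenAI 需要 model 字段
print("model" in baidu_client._build_payload(messages))   # False:模型已编码在 URL 中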
3. 响应解析(适配不同API格式)
def _parse_response(self, response: requests.Response) -> LLMResponse:
"""解析响应"""
try:
data = response.json()
except json.JSONDecodeError:
raise ValueError(f"Invalid JSON response: {response.text}")
if response.status_code != 200:
error_msg = data.get("error", {}).get("message", "Unknown error")
raise ValueError(f"API Error ({response.status_code}): {error_msg}")
# 不同提供商的响应格式不同
if self.config.provider == LLMProvider.OPENAI:
choices = data.get("choices", [])
if not choices:
raise ValueError("No choices in response")
content = choices[0]["message"]["content"]
usage = data.get("usage", {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})
finish_reason = choices[0].get("finish_reason", "stop")
elif self.config.provider == LLMProvider.ANTHROPIC:
content = data.get("content", [{}])[0].get("text", "")
usage = {
"prompt_tokens": data.get("input_tokens", 0),
"completion_tokens": data.get("output_tokens", 0),
"total_tokens": data.get("input_tokens", 0) + data.get("output_tokens", 0)
}
finish_reason = data.get("stop_reason", "stop")
elif self.config.provider == LLMProvider.ALIYUN:
output = data.get("output", {})
content = output.get("text", "")
usage = output.get("usage", {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})
finish_reason = "stop"
elif self.config.provider == LLMProvider.BAIDU:
content = data.get("result", "")
usage = data.get("usage", {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})
finish_reason = "stop"
elif self.config.provider == LLMProvider.ZHIPU:
choices = data.get("choices", [])
if not choices:
raise ValueError("No choices in response")
content = choices[0]["message"]["content"]
usage = data.get("usage", {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})
finish_reason = choices[0].get("finish_reason", "stop")
else:
raise ValueError(f"Unsupported provider: {self.config.provider}")
return LLMResponse(
content=content,
model=data.get("model", self.config.model.value),
usage=usage,
finish_reason=finish_reason
)
4. 认证机制
def _add_auth(self, headers: Dict[str, str]) -> Dict[str, str]:
"""添加认证信息"""
if self.config.provider == LLMProvider.OPENAI:
headers["Authorization"] = f"Bearer {self.config.api_key}"
elif self.config.provider == LLMProvider.ANTHROPIC:
headers["x-api-key"] = self.config.api_key
headers["anthropic-version"] = "2023-06-01"
elif self.config.provider == LLMProvider.ALIYUN:
headers["Authorization"] = f"Bearer {self.config.api_key}"
elif self.config.provider == LLMProvider.BAIDU:
# 百度需要access_token,这里假设api_key就是access_token
pass
elif self.config.provider == LLMProvider.ZHIPU:
headers["Authorization"] = f"Bearer {self.config.api_key}"
return headers
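代码中百度一支假设 api_key 已是 access_token。获取 access_token 的大致做法如下(基于百度官方 OAuth 接口,参数名以其文档为准):
def get_baidu_access_token(api_key: str, secret_key: str) -> str:
    """用 API Key / Secret Key 换取 access_token(有效期约30天,需自行缓存)"""
    url = "https://aip.baidubce.com/oauth/2.0/token"
    params = {
        "grant_type": "client_credentials",
        "client_id": api_key,
        "client_secret": secret_key,
    }
    resp = requests.post(url, params=params, timeout=10)
    resp.raise_for_status()
    return resp.json()["access_token"]

# 注意:百度要求把 access_token 拼在请求 URL 的 query 上(?access_token=...),
# 上面的 _build_url 尚未处理这一步,实际接入时需要相应扩展。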
5. 普通对话接口
def chat(self, messages: List[Dict[str, str]]) -> LLMResponse:
"""
聊天接口
Args:
messages: 消息列表,格式 [{"role": "user", "content": "hello"}]
Returns:
LLMResponse: 响应对象
Raises:
ValueError: 当API调用失败时
"""
url = self._build_url()
payload = self._build_payload(messages)
headers = self._add_auth({})
for attempt in range(self.config.max_retries):
try:
response = self.session.post(
url,
json=payload,
headers=headers,
timeout=self.config.timeout
)
return self._parse_response(response)
except Exception as e:
if attempt == self.config.max_retries - 1:
raise
time.sleep(self.config.retry_delay * (2 ** attempt)) # 指数退避
6. 流式输出接口
def chat_stream(self, messages: List[Dict[str, str]]) -> str:
"""
流式聊天接口(仅支持OpenAI和Anthropic)
Args:
messages: 消息列表
Returns:
str: 完整响应内容
"""
if self.config.provider not in [LLMProvider.OPENAI, LLMProvider.ANTHROPIC]:
# 对于不支持流式的提供商,回退到普通chat
return self.chat(messages).content
url = self._build_url()
payload = self._build_payload(messages)
payload["stream"] = True
headers = self._add_auth({})
full_content = ""
for attempt in range(self.config.max_retries):
try:
with self.session.post(
url,
json=payload,
headers=headers,
timeout=self.config.timeout,
stream=True
) as response:
if response.status_code != 200:
raise ValueError(f"API Error: {response.status_code}")
for line in response.iter_lines():
if not line:
continue
line = line.decode('utf-8')
if line.startswith('data: '):
data_str = line[6:] # 去掉 'data: '
if data_str == '[DONE]':
break
try:
data = json.loads(data_str)
if self.config.provider == LLMProvider.OPENAI:
choices = data.get("choices", [])
if choices:
delta = choices[0].get("delta", {})
content = delta.get("content", "")
if content:
print(content, end="", flush=True)
full_content += content
elif self.config.provider == LLMProvider.ANTHROPIC:
    # Anthropic流式处理:content_block_delta 事件中的 delta.text 才是增量文本
    if data.get("type") == "content_block_delta":
        content = data.get("delta", {}).get("text", "")
        if content:
            print(content, end="", flush=True)
            full_content += content
except json.JSONDecodeError:
continue
print() # 换行
return full_content
except Exception as e:
if attempt == self.config.max_retries - 1:
raise
time.sleep(self.config.retry_delay * (2 ** attempt))
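为便于理解上面的解析逻辑,OpenAI 流式响应体中的 data 行大致如下(已简化,具体字段以官方文档为准):
data: {"choices":[{"delta":{"role":"assistant"},"index":0}]}
data: {"choices":[{"delta":{"content":"Hello"},"index":0}]}
data: {"choices":[{"delta":{"content":" world"},"index":0}]}
data: {"choices":[{"delta":{},"index":0,"finish_reason":"stop"}]}
data: [DONE]
解析时逐行去掉 data: 前缀、累加 delta.content,遇到 [DONE] 即结束。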
7. 便捷生成接口
def generate(
self,
prompt: str,
system_prompt: str = "You are a helpful assistant.",
history: Optional[List[Dict[str, str]]] = None
) -> LLMResponse:
"""
便捷的生成接口
Args:
prompt: 用户输入
system_prompt: 系统提示词
history: 历史消息
Returns:
LLMResponse: 响应对象
"""
messages = [{"role": "system", "content": system_prompt}]
if history:
messages.extend(history)
messages.append({"role": "user", "content": prompt})
return self.chat(messages)
8. 批量生成接口
def batch_generate(
self,
prompts: List[str],
system_prompt: str = "You are a helpful assistant.",
delay: float = 1.0
) -> List[LLMResponse]:
"""
批量生成
Args:
prompts: 提示词列表
system_prompt: 系统提示词
delay: 请求间隔
Returns:
List[LLMResponse]: 响应列表
"""
results = []
for prompt in prompts:
response = self.generate(prompt, system_prompt)
results.append(response)
time.sleep(delay)
return results
管理器模式
LLMManager类
class LLMManager:
"""LLM管理器,支持多模型切换"""
def __init__(self):
self.clients: Dict[str, LLMClient] = {}
self.current_client: Optional[LLMClient] = None
self.history: List[Dict[str, str]] = []
def register_client(self, name: str, client: LLMClient):
"""注册客户端"""
self.clients[name] = client
if self.current_client is None:
self.current_client = client
def set_current_client(self, name: str):
"""设置当前客户端"""
if name in self.clients:
self.current_client = self.clients[name]
else:
raise ValueError(f"Client {name} not registered")
def chat(self, messages: List[Dict[str, str]]) -> LLMResponse:
"""聊天"""
if self.current_client is None:
raise ValueError("No client registered")
return self.current_client.chat(messages)
def chat_stream(self, messages: List[Dict[str, str]]) -> str:
"""流式聊天"""
if self.current_client is None:
raise ValueError("No client registered")
return self.current_client.chat_stream(messages)
def generate(
self,
prompt: str,
system_prompt: str = "You are a helpful assistant.",
save_history: bool = False
) -> LLMResponse:
"""生成"""
if self.current_client is None:
raise ValueError("No client registered")
messages = [{"role": "system", "content": system_prompt}]
if save_history:
messages.extend(self.history)
messages.append({"role": "user", "content": prompt})
response = self.current_client.chat(messages)
if save_history:
self.history.append({"role": "user", "content": prompt})
self.history.append({"role": "assistant", "content": response.content})
return response
def clear_history(self):
"""清除历史记录"""
self.history = []
def get_history(self) -> List[Dict[str, str]]:
"""获取历史记录"""
return self.history.copy()
使用示例
1. 基础使用
import os
from llm import LLMConfig, LLMClient, ModelType, LLMProvider
# 配置OpenAI客户端
config = LLMConfig(
provider=LLMProvider.OPENAI,
api_key=os.getenv("OPENAI_API_KEY"),
model=ModelType.GPT_35_TURBO,
temperature=0.7,
max_tokens=1024
)
client = LLMClient(config)
# 普通对话
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello, how are you?"}
]
response = client.chat(messages)
print(response.content)
print(f"Token usage: {response.usage}")
2. 流式输出
# 流式对话(实时输出)
full_content = client.chat_stream(messages)
3. 使用管理器
from llm import LLMManager
# 创建管理器
manager = LLMManager()
# 注册多个客户端
manager.register_client("openai", openai_client)
manager.register_client("anthropic", anthropic_client)
manager.register_client("aliyun", aliyun_client)
# 切换模型
manager.set_current_client("anthropic")
# 带历史记录的对话
response = manager.generate(
prompt="What is the capital of France?",
system_prompt="You are a helpful assistant.",
save_history=True
)
# 查看历史
history = manager.get_history()
4. 批量生成
# 批量处理多个问题
prompts = [
"What is Python?",
"What is JavaScript?",
"What is Java?"
]
responses = client.batch_generate(prompts, delay=1.0)
for i, response in enumerate(responses):
print(f"Q{i+1}: {response.content}")
5. 错误处理与重试
# 自动重试机制
try:
response = client.chat(messages)
except ValueError as e:
print(f"API Error: {e}")
# 可以切换到备用模型
manager.set_current_client("anthropic")
response = manager.chat(messages)
高级特性
1. Token计数
def count_tokens(self, messages: List[Dict[str, str]]) -> int:
"""
估算token数量(简单实现,实际应根据具体模型的tokenizer)
Args:
messages: 消息列表
Returns:
int: 估算的token数量
"""
total_text = ""
for msg in messages:
total_text += f"{msg['role']}: {msg['content']}\n"
# 简单估算:1个token ≈ 4个字符
return len(total_text) // 4
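如果需要更精确的计数,可以用 tiktoken 按模型实际分词(需 pip install tiktoken,仅适用于 OpenAI 系模型;下面是一个简化写法,未计入 ChatML 包装的固定开销):
import tiktoken

def count_tokens_tiktoken(messages: List[Dict[str, str]], model: str = "gpt-3.5-turbo") -> int:
    enc = tiktoken.encoding_for_model(model)
    total = 0
    for msg in messages:
        total += len(enc.encode(msg["role"])) + len(enc.encode(msg["content"]))
    return total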
2. 响应序列化
# 将响应转换为字典,便于存储
response_dict = response.to_dict()
# 可以保存到JSON文件
import json
with open("response.json", "w") as f:
json.dump(response_dict, f, indent=2)
最佳实践
1. 环境变量管理
import os
from dotenv import load_dotenv
load_dotenv()
config = LLMConfig(
provider=LLMProvider.OPENAI,
api_key=os.getenv("OPENAI_API_KEY"),
# ...
)
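对应的 .env 文件示例(需 pip install python-dotenv;变量名与上文示例一致,密钥为占位符):
OPENAI_API_KEY=sk-xxxxxxxx
ANTHROPIC_API_KEY=sk-ant-xxxxxxxx
ALIYUN_API_KEY=sk-xxxxxxxx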
2. 配置文件
# config.py
import os
from llm import LLMProvider, ModelType  # 假设封装库模块名为 llm,与上文示例一致

LLM_CONFIGS = {
"openai": {
"provider": LLMProvider.OPENAI,
"api_key": os.getenv("OPENAI_API_KEY"),
"model": ModelType.GPT_4_TURBO,
"temperature": 0.7
},
"claude": {
"provider": LLMProvider.ANTHHROPIC,
"api_key": os.getenv("ANTHROPIC_API_KEY"),
"model": ModelType.CLAUDE_3_SONNET,
"temperature": 0.7
}
}
3. 日志记录
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 以下函数可作为 LLMClient 的方法加入,记录每次调用的 token 用量与错误
def chat_with_logging(self, messages):
try:
response = self.chat(messages)
logger.info(f"Success: {response.usage}")
return response
except Exception as e:
logger.error(f"Error: {e}")
raise
4. 速率限制
import time
from functools import wraps
def rate_limit(max_per_minute: int):
"""速率限制装饰器"""
min_interval = 60.0 / max_per_minute
last_called = [0.0]
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
elapsed = time.time() - last_called[0]
wait = min_interval - elapsed
if wait > 0:
time.sleep(wait)
ret = func(*args, **kwargs)
last_called[0] = time.time()
return ret
return wrapper
return decorator
# 使用(同样以 LLMClient 方法的形式定义)
@rate_limit(30) # 每分钟最多30次调用
def chat_with_rate_limit(self, messages):
return self.chat(messages)
总结
这个LLM封装库具有以下特点:
- 多提供商支持:统一接口支持主流LLM服务
- 错误处理:自动重试机制,提高稳定性
- 流式输出:支持实时响应展示
- 历史管理:内置对话历史管理
- 批量处理:支持批量生成,提高效率
- 配置灵活:参数可配置,支持自定义
- 易于扩展:基类设计,易于添加新提供商
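以「易于扩展」为例,在封装库模块内接入一个假想的新服务,大致只需继承 LLMClient 并覆写三个内部方法(端点与响应字段均为虚构示意):
class MyProviderClient(LLMClient):
    """接入假想新服务的最小示意"""

    def _build_url(self) -> str:
        return self.config.base_url or "https://api.my-provider.example.com/chat"

    def _add_auth(self, headers: Dict[str, str]) -> Dict[str, str]:
        headers["Authorization"] = f"Bearer {self.config.api_key}"
        return headers

    def _parse_response(self, response: requests.Response) -> LLMResponse:
        data = response.json()
        if response.status_code != 200:
            raise ValueError(f"API Error ({response.status_code}): {data}")
        return LLMResponse(
            content=data.get("reply", ""),   # 假设该服务用 reply 字段返回文本
            model=self.config.model.value,
            usage=data.get("usage", {}),
        )
若请求体格式也不同,再覆写 _build_payload 即可;对于 OpenAI 协议兼容的服务,甚至只需配置 base_url,无需子类。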
通过这个库,开发者可以轻松切换不同LLM服务,无需关心底层API差异,专注于业务逻辑开发。
