Introduction: The Allure and Mystery of Box Office Champions

Box office champions are more than symbols of commercial success; they are snapshots of cultural phenomena. What lets them break out of fierce market competition? What little-known marketing strategies and audience decision logic lie behind them? This article dissects the all-time global box office champions, distills how they reached the top, and explores the marketing secrets and audience psychology behind their success.

Overview of Global Box Office Champion Films

1. Avatar (2009)

  • Global box office: approx. $2.923 billion
  • Why it topped the charts
    • Technical innovation: groundbreaking 3D and CGI effects delivered an unprecedented visual experience
    • Universal themes: environmentalism, anti-colonialism, and harmony between humans and nature resonated worldwide
    • Director effect: James Cameron's follow-up to Titanic arrived with built-in buzz

2. Avengers: Endgame (2019)

  • Global box office: approx. $2.799 billion
  • Why it topped the charts
    • IP accumulation: the culmination of 22 Marvel Cinematic Universe films over 11 years, with emotional investment at its peak
    • Fan economy: precise payoffs for core fans, satisfying long-held nostalgia
    • Event marketing: the release was staged as a global cultural event

3. Titanic (1997)

  • Global box office: approx. $2.201 billion (including the 2012 3D re-release)
  • Why it topped the charts
    • Epic romance: a seamless fusion of disaster and love story that became a cross-generational classic
    • Technical benchmark: the highest production values and visual effects of its era
    • Musical reach: the theme song "My Heart Will Go On" became a global pop-culture touchstone

4. Star Wars (1977)

  • Global box office: approx. $775 million (lifetime, including re-releases)
  • Why it topped the charts
    • Trailblazing: opened a new era for science-fiction film and defined the space-opera genre
    • Cultural phenomenon: created a distinctive universe and fan culture
    • Technical revolution: the birth and rise of Industrial Light & Magic

5. Gone with the Wind (1939)

  • Global box office: approx. $391 million (lifetime, including re-releases)
  • Why it topped the charts
    • Literary adaptation: a masterful screen translation of a classic novel
    • Mark of an era: an epic rendering of the American South's history
    • Star power: iconic performances by Vivien Leigh and Clark Gable

Secrets of Reaching the Top: The Perfect Union of Technology, Emotion, and Timing

1. Technical Innovation Drives a Viewing Revolution

Avatar proved how powerfully technical innovation can drive the box office. The stereoscopic camera system and virtual cinematography developed by Cameron's team immersed audiences in the fantastical world of Pandora. This technical edge created a unique barrier: viewers had to go to a theater to experience the film in full.

2. Deeply Built Emotional Resonance

Avengers: Endgame drew on 11 years of accumulated emotion to forge deep bonds between audiences and characters. Iron Man's sacrifice and Captain America's passing of the torch gave viewers a cathartic finale to these characters' arcs. That long-term emotional investment converted directly into explosive box office returns.

3. Creating and Leveraging Cultural Symbols

Star Wars is not merely a film; it became a cultural symbol. The Force, lightsabers, and Jedi knights entered the collective imagination and spawned an enormous fan culture. This cultural influence lifted the film beyond an entertainment product into a generational memory.

4. Deft Embedding of Social Issues

Through the story of the Na'vi, Avatar alluded to real-world environmental destruction and colonial history, prompting audiences to reflect. This layer of social commentary added depth and extended the film's life as a topic of conversation.

Marketing Secrets: A Complete Strategy from Pre-Launch to Long Tail

1. Event Marketing: Turning a Film into a Social Event

The marketing of Avengers: Endgame is a textbook case:

  • Countdown strategy: warm-up began a year in advance, with the "Endgame" framing used to build urgency
  • Scarcity marketing: information was tightly controlled, and trailers revealed only the bare minimum
  • Fan privileges: advance screenings for fans seeded word-of-mouth
  • Cross-brand tie-ins: limited-edition products with major brands extended the film's reach

2. Precision Targeting in the Social Media Era

Avatar's marketing team used social media for viral spread:

  • Topic creation: #AvatarChallenge encouraged users to share their own "Avatar" likenesses
  • KOL partnerships: opinion leaders in tech and environmentalism saw the film early and spoke up
  • Interactive experiences: online games let users explore Pandora before release

3. Emotional Marketing: Converting Viewers into Fans

Titanic's marketing centered on emotional resonance:

  • Music first: the theme song was released ahead of the film so the melody could take hold
  • Story warm-up: documentaries and similar formats told the real history of the Titanic
  • Star packaging: Leonardo DiCaprio and Kate Winslet were built into global idols
  • Enduring nostalgia: re-releases years later still drew crowds

4. Combining Globalization with Localization

Avengers: Endgame ran differentiated campaigns across markets:

  • Asian markets: emphasis on heroic sentiment and visual spectacle
  • European markets: emphasis on narrative depth and character development
  • North American market: emphasis on the event itself and fan service

Audience Decision Logic: The Psychological Path from Need to Choice

1. Social Needs: Avoiding Social Isolation

In the social media era, watching the reigning box office champion is a form of social currency. During Endgame's run, "Have you seen Endgame yet?" became a standard conversation opener. Audiences choose these films partly to avoid being left out of the conversation.

2. Risk Aversion: The Conformist Choice

Facing an ocean of content, viewers gravitate toward market-validated products. A box office champion represents "what most people chose," and that conformity lowers the perceived risk of a bad pick. Industry data suggest the top 10 films often account for more than 30% of total box office revenue.

3. Emotional Catharsis and Identity

Modern audiences increasingly value a film's emotional payoff. Avatar's environmental themes gave viewers a sense of moral affirmation; Endgame's heroic sacrifices gave them objects of emotional projection. This sense of identification is a major driver of ticket purchases.

4. Scarcity of the Technical Experience

The spread of IMAX, 3D, and 4DX premium screens has made audiences more willing to pay for a technical experience. Much of Avatar's success came from its technical uniqueness: the full experience was only available on specific equipment, and that scarcity created demand.

Conclusion: The Composite Formula of Box Office Champions

The success of box office champions is never an accident; it is the union of technology, emotion, marketing, and timing. These films do more than entertain: they create cultural phenomena and social conversation. Future champions will keep pushing on technical innovation, emotional depth, and marketing strategy to deliver experiences audiences have never had before.

Large Language Model (LLM) API Wrapper Library

Project Overview

This is a full-featured wrapper library for large language model (LLM) APIs. It supports several mainstream providers, including OpenAI, Anthropic, Alibaba Cloud (Aliyun), Baidu, and Zhipu AI, behind a single unified interface, and covers the core features of plain chat, streaming output, batch generation, and conversation-history management.

Core Design

1. Enum Design

from enum import Enum

class ModelType(Enum):
    """Model type enumeration"""
    GPT_35_TURBO = "gpt-3.5-turbo"
    GPT_4 = "gpt-4"
    GPT_4_TURBO = "gpt-4-turbo"
    CLAUDE_3_SONNET = "claude-3-sonnet"
    CLAUDE_3_OPUS = "claude-3-opus"
    QWEN_MAX = "qwen-max"
    QWEN_PLUS = "qwen-plus"
    ERNIE_BOT = "ernie-bot"
    GLM_4 = "glm-4"

class LLMProvider(Enum):
    """LLM provider enumeration"""
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    ALIYUN = "aliyun"
    BAIDU = "baidu"
    ZHIPU = "zhipu"

The benefits of using enums (see the short sketch below):

  • Type safety: string typos are caught early
  • More readable code
  • IDE autocompletion support
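
As a quick illustration (a standalone sketch, not part of the library), constructing an enum member from a raw string fails fast on a typo, whereas a plain string would flow silently into the request payload:

from enum import Enum

class ModelType(Enum):
    GPT_35_TURBO = "gpt-3.5-turbo"

print(ModelType("gpt-3.5-turbo"))   # reverse lookup by value -> ModelType.GPT_35_TURBO
try:
    ModelType("gpt3.5-turbo")       # typo in the value
except ValueError as err:
    print(err)                      # 'gpt3.5-turbo' is not a valid ModelType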

2. Config Class Design

class LLMConfig:
    """LLM configuration"""
    def __init__(
        self,
        provider: LLMProvider,
        api_key: str,
        base_url: Optional[str] = None,
        model: ModelType = ModelType.GPT_35_TURBO,
        temperature: float = 0.7,
        max_tokens: int = 2048,
        timeout: int = 30,
        max_retries: int = 3,
        retry_delay: float = 1.0
    ):
        self.provider = provider
        self.api_key = api_key
        self.base_url = base_url
        self.model = model
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.timeout = timeout
        self.max_retries = max_retries
        self.retry_delay = retry_delay

The config class centralizes all parameters and supports (see the sketch below):

  • A custom base_url (for proxies or private deployments)
  • Timeout and retry policy
  • Model parameters (temperature, max_tokens)
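
For instance, a config that routes OpenAI-compatible traffic through a private gateway might look like the following (the gateway URL is a placeholder for illustration, not a real endpoint):

import os

config = LLMConfig(
    provider=LLMProvider.OPENAI,
    api_key=os.getenv("OPENAI_API_KEY", ""),
    base_url="https://llm-gateway.internal.example/v1/chat/completions",  # placeholder
    model=ModelType.GPT_4_TURBO,
    timeout=60,      # a slower gateway may need a longer timeout
    max_retries=5    # retry more on a flaky network path
)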

3. Response Class Design

class LLMResponse:
    """LLM response object"""
    def __init__(
        self,
        content: str,
        model: str,
        usage: Dict[str, int],
        finish_reason: str = "stop",
        timestamp: Optional[datetime] = None
    ):
        self.content = content
        self.model = model
        self.usage = usage
        self.finish_reason = finish_reason
        self.timestamp = timestamp or datetime.now()

    def to_dict(self) -> Dict[str, Any]:
        return {
            "content": self.content,
            "model": self.model,
            "usage": self.usage,
            "finish_reason": self.finish_reason,
            "timestamp": self.timestamp.isoformat()
        }

The standardized response format contains:

  • The generated content
  • The model used
  • Token usage statistics
  • The finish reason (normal stop vs. length limit; see the snippet below)
  • A timestamp
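
A common use of finish_reason is detecting truncation. A minimal sketch, assuming client is an LLMClient built as in the usage examples ("length" is the reason OpenAI-style APIs report when max_tokens is hit):

response = client.chat(messages)
if response.finish_reason == "length":
    # the generation was cut off by max_tokens; raise the budget and retry
    client.config.max_tokens *= 2
    response = client.chat(messages)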

Core Functionality

1. Multi-Provider Support

class LLMClient:
    """Base LLM client"""

    def __init__(self, config: LLMConfig):
        self.config = config
        self.session = requests.Session()
        self.session.headers.update({
            "Content-Type": "application/json",
            "User-Agent": "LLMClient/1.0"
        })

    def _build_url(self) -> str:
        """Build the request URL"""
        if self.config.base_url:
            return self.config.base_url

        base_urls = {
            LLMProvider.OPENAI: "https://api.openai.com/v1/chat/completions",
            LLMProvider.ANTHROPIC: "https://api.anthropic.com/v1/messages",
            LLMProvider.ALIYUN: "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation",
            LLMProvider.BAIDU: "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions",
            LLMProvider.ZHIPU: "https://open.bigmodel.cn/api/paas/v4/chat/completions"
        }
        return base_urls.get(self.config.provider, "")

2. Request Payload Construction (Adapting to Different APIs)

def _build_payload(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
    """Build the request payload"""
    payload = {
        "model": self.config.model.value,
        "messages": messages,
        "temperature": self.config.temperature,
        "max_tokens": self.config.max_tokens
    }

    # Special case for Baidu's API: the model is selected by the
    # endpoint path, so no "model" field is sent in the body
    if self.config.provider == LLMProvider.BAIDU:
        payload = {
            "messages": messages,
            "temperature": self.config.temperature,
            "max_tokens": self.config.max_tokens
        }

    return payload
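
For a single-turn request against OpenAI with the default config, the method produces a payload like this (shown as the dict the code above builds):

payload = client._build_payload([{"role": "user", "content": "hello"}])
# {
#     "model": "gpt-3.5-turbo",
#     "messages": [{"role": "user", "content": "hello"}],
#     "temperature": 0.7,
#     "max_tokens": 2048
# }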

3. Response Parsing (Adapting to Different Formats)

def _parse_response(self, response: requests.Response) -> LLMResponse:
    """Parse the HTTP response into an LLMResponse"""
    try:
        data = response.json()
    except json.JSONDecodeError:
        raise ValueError(f"Invalid JSON response: {response.text}")

    if response.status_code != 200:
        error_msg = data.get("error", {}).get("message", "Unknown error")
        raise ValueError(f"API Error ({response.status_code}): {error_msg}")

    # Response formats differ between providers
    if self.config.provider == LLMProvider.OPENAI:
        choices = data.get("choices", [])
        if not choices:
            raise ValueError("No choices in response")
        content = choices[0]["message"]["content"]
        usage = data.get("usage", {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})
        finish_reason = choices[0].get("finish_reason", "stop")

    elif self.config.provider == LLMProvider.ANTHROPIC:
        content = data.get("content", [{}])[0].get("text", "")
        # Anthropic nests token counts under a "usage" object
        anthropic_usage = data.get("usage", {})
        usage = {
            "prompt_tokens": anthropic_usage.get("input_tokens", 0),
            "completion_tokens": anthropic_usage.get("output_tokens", 0),
            "total_tokens": anthropic_usage.get("input_tokens", 0) + anthropic_usage.get("output_tokens", 0)
        }
        finish_reason = data.get("stop_reason", "stop")

    elif self.config.provider == LLMProvider.ALIYUN:
        output = data.get("output", {})
        content = output.get("text", "")
        usage = output.get("usage", {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})
        finish_reason = "stop"  # Aliyun does not return a finish_reason here

    elif self.config.provider == LLMProvider.BAIDU:
        content = data.get("result", "")
        usage = data.get("usage", {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})
        finish_reason = "stop"  # Baidu does not return a finish_reason

    elif self.config.provider == LLMProvider.ZHIPU:
        choices = data.get("choices", [])
        if not choices:
            raise ValueError("No choices in response")
        content = choices[0]["message"]["content"]
        usage = data.get("usage", {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})
        finish_reason = choices[0].get("finish_reason", "stop")

    else:
        raise ValueError(f"Unsupported provider: {self.config.provider}")

    return LLMResponse(
        content=content,
        model=data.get("model", self.config.model.value),
        usage=usage,
        finish_reason=finish_reason
    )

4. Authentication

def _add_auth(self, headers: Dict[str, str]) -> Dict[str, str]:
    """Attach provider-specific credentials"""
    if self.config.provider == LLMProvider.OPENAI:
        headers["Authorization"] = f"Bearer {self.config.api_key}"
    elif self.config.provider == LLMProvider.ANTHROPIC:
        headers["x-api-key"] = self.config.api_key
        headers["anthropic-version"] = "2023-06-01"
    elif self.config.provider == LLMProvider.ALIYUN:
        headers["Authorization"] = f"Bearer {self.config.api_key}"
    elif self.config.provider == LLMProvider.BAIDU:
        # Baidu authenticates with an access_token; here api_key is
        # assumed to already be one (see the sketch below for obtaining it)
        pass
    elif self.config.provider == LLMProvider.ZHIPU:
        headers["Authorization"] = f"Bearer {self.config.api_key}"
    return headers
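
For completeness, a hedged sketch of the Baidu token exchange, based on Baidu's documented OAuth 2.0 client_credentials flow (verify the endpoint and field names against the current docs before relying on this):

import requests

def get_baidu_access_token(api_key: str, secret_key: str) -> str:
    """Exchange a Baidu API key/secret pair for an access_token (sketch)."""
    resp = requests.post(
        "https://aip.baidubce.com/oauth/2.0/token",
        params={
            "grant_type": "client_credentials",
            "client_id": api_key,
            "client_secret": secret_key,
        },
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()["access_token"]

The returned token is then appended to the chat URL as an access_token query parameter.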

5. Basic Chat Interface

def chat(self, messages: List[Dict[str, str]]) -> LLMResponse:
    """
    Chat endpoint

    Args:
        messages: message list in the form [{"role": "user", "content": "hello"}]

    Returns:
        LLMResponse: the response object

    Raises:
        ValueError: when the API call fails
    """
    url = self._build_url()
    payload = self._build_payload(messages)
    headers = self._add_auth({})

    for attempt in range(self.config.max_retries):
        try:
            response = self.session.post(
                url,
                json=payload,
                headers=headers,
                timeout=self.config.timeout
            )
            return self._parse_response(response)
        except Exception as e:
            if attempt == self.config.max_retries - 1:
                raise
            time.sleep(self.config.retry_delay * (2 ** attempt))  # exponential backoff
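
With the defaults (retry_delay=1.0, max_retries=3) the sleep before each retry doubles; a one-liner makes the schedule concrete:

delays = [1.0 * (2 ** attempt) for attempt in range(3 - 1)]
print(delays)  # [1.0, 2.0]: slept after the 1st and 2nd failed attempts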

6. Streaming Interface

def chat_stream(self, messages: List[Dict[str, str]]) -> str:
    """
    Streaming chat endpoint (OpenAI and Anthropic only)

    Args:
        messages: message list

    Returns:
        str: the full response content
    """
    if self.config.provider not in [LLMProvider.OPENAI, LLMProvider.ANTHROPIC]:
        # Fall back to a plain chat call for providers without
        # streaming support in this client
        return self.chat(messages).content

    url = self._build_url()
    payload = self._build_payload(messages)
    payload["stream"] = True
    headers = self._add_auth({})

    full_content = ""
    for attempt in range(self.config.max_retries):
        try:
            with self.session.post(
                url,
                json=payload,
                headers=headers,
                timeout=self.config.timeout,
                stream=True
            ) as response:
                if response.status_code != 200:
                    raise ValueError(f"API Error: {response.status_code}")

                for line in response.iter_lines():
                    if not line:
                        continue
                    line = line.decode('utf-8')
                    if line.startswith('data: '):
                        data_str = line[6:]  # strip the 'data: ' prefix
                        if data_str == '[DONE]':
                            break
                        try:
                            data = json.loads(data_str)
                            if self.config.provider == LLMProvider.OPENAI:
                                choices = data.get("choices", [])
                                if choices:
                                    delta = choices[0].get("delta", {})
                                    content = delta.get("content", "")
                                    if content:
                                        print(content, end="", flush=True)
                                        full_content += content
                            elif self.config.provider == LLMProvider.ANTHROPIC:
                                # Anthropic emits typed SSE events; incremental
                                # text arrives in content_block_delta events
                                if data.get("type") == "content_block_delta":
                                    content = data.get("delta", {}).get("text", "")
                                    if content:
                                        print(content, end="", flush=True)
                                        full_content += content
                        except json.JSONDecodeError:
                            continue
            print()  # newline once the stream ends
            return full_content
        except Exception as e:
            if attempt == self.config.max_retries - 1:
                raise
            time.sleep(self.config.retry_delay * (2 ** attempt))
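
To make the parsing loop concrete: an OpenAI-style stream arrives as server-sent events, one JSON chunk per data: line, terminated by a [DONE] sentinel (chunks abbreviated here to the fields the loop actually reads):

data: {"choices": [{"delta": {"role": "assistant"}}]}

data: {"choices": [{"delta": {"content": "Hel"}}]}

data: {"choices": [{"delta": {"content": "lo!"}}]}

data: [DONE]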

7. Convenience Generation Interface

def generate(
    self,
    prompt: str,
    system_prompt: str = "You are a helpful assistant.",
    history: Optional[List[Dict[str, str]]] = None
) -> LLMResponse:
    """
    Convenience generation wrapper

    Args:
        prompt: the user input
        system_prompt: the system prompt
        history: prior messages

    Returns:
        LLMResponse: the response object
    """
    messages = [{"role": "system", "content": system_prompt}]
    if history:
        messages.extend(history)
    messages.append({"role": "user", "content": prompt})
    return self.chat(messages)

8. Batch Generation Interface

def batch_generate(
    self,
    prompts: List[str],
    system_prompt: str = "You are a helpful assistant.",
    delay: float = 1.0
) -> List[LLMResponse]:
    """
    Batch generation

    Args:
        prompts: list of prompts
        system_prompt: the system prompt
        delay: pause between requests, in seconds

    Returns:
        List[LLMResponse]: one response per prompt
    """
    results = []
    for prompt in prompts:
        response = self.generate(prompt, system_prompt)
        results.append(response)
        time.sleep(delay)
    return results

The Manager Pattern

The LLMManager Class

class LLMManager:
    """LLM manager that supports switching between multiple models"""

    def __init__(self):
        self.clients: Dict[str, LLMClient] = {}
        self.current_client: Optional[LLMClient] = None
        self.history: List[Dict[str, str]] = []

    def register_client(self, name: str, client: LLMClient):
        """Register a client"""
        self.clients[name] = client
        if self.current_client is None:
            self.current_client = client

    def set_current_client(self, name: str):
        """Select the active client"""
        if name in self.clients:
            self.current_client = self.clients[name]
        else:
            raise ValueError(f"Client {name} not registered")

    def chat(self, messages: List[Dict[str, str]]) -> LLMResponse:
        """Chat with the active client"""
        if self.current_client is None:
            raise ValueError("No client registered")
        return self.current_client.chat(messages)

    def chat_stream(self, messages: List[Dict[str, str]]) -> str:
        """Streaming chat with the active client"""
        if self.current_client is None:
            raise ValueError("No client registered")
        return self.current_client.chat_stream(messages)

    def generate(
        self,
        prompt: str,
        system_prompt: str = "You are a helpful assistant.",
        save_history: bool = False
    ) -> LLMResponse:
        """Generate, optionally threading the stored history through"""
        if self.current_client is None:
            raise ValueError("No client registered")

        messages = [{"role": "system", "content": system_prompt}]
        if save_history:
            messages.extend(self.history)
        messages.append({"role": "user", "content": prompt})

        response = self.current_client.chat(messages)

        if save_history:
            self.history.append({"role": "user", "content": prompt})
            self.history.append({"role": "assistant", "content": response.content})

        return response

    def clear_history(self):
        """Clear the stored history"""
        self.history = []

    def get_history(self) -> List[Dict[str, str]]:
        """Return a copy of the stored history"""
        return self.history.copy()

Usage Examples

1. Basic Usage

import os
from llm import LLMConfig, LLMClient, ModelType, LLMProvider

# Configure an OpenAI client
config = LLMConfig(
    provider=LLMProvider.OPENAI,
    api_key=os.getenv("OPENAI_API_KEY"),
    model=ModelType.GPT_35_TURBO,
    temperature=0.7,
    max_tokens=1024
)
client = LLMClient(config)

# Plain chat
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Hello, how are you?"}
]
response = client.chat(messages)
print(response.content)
print(f"Token usage: {response.usage}")

2. Streaming Output

# Streaming chat (prints tokens as they arrive)
full_content = client.chat_stream(messages)

3. Using the Manager

from llm import LLMManager

# Create the manager
manager = LLMManager()

# Register several clients (each an LLMClient built as in example 1)
manager.register_client("openai", openai_client)
manager.register_client("anthropic", anthropic_client)
manager.register_client("aliyun", aliyun_client)

# Switch models
manager.set_current_client("anthropic")

# Chat with history enabled
response = manager.generate(
    prompt="What is the capital of France?",
    system_prompt="You are a helpful assistant.",
    save_history=True
)

# Inspect the history
history = manager.get_history()

4. Batch Generation

# Process several prompts in one call
prompts = [
    "What is Python?",
    "What is JavaScript?",
    "What is Java?"
]

responses = client.batch_generate(prompts, delay=1.0)
for i, response in enumerate(responses):
    print(f"Q{i+1}: {response.content}")

5. Error Handling and Retries

# The client retries automatically; unrecoverable failures surface here
try:
    response = client.chat(messages)
except ValueError as e:
    print(f"API Error: {e}")
    # Fall back to an alternate model via the manager
    manager.set_current_client("anthropic")
    response = manager.chat(messages)

Advanced Features

1. Token Counting

def count_tokens(self, messages: List[Dict[str, str]]) -> int:
    """
    Estimate the token count (a rough heuristic; an exact count
    should use the tokenizer of the specific model, as sketched below)

    Args:
        messages: message list

    Returns:
        int: the estimated token count
    """
    total_text = ""
    for msg in messages:
        total_text += f"{msg['role']}: {msg['content']}\n"
    # Rough rule of thumb: 1 token ≈ 4 characters
    return len(total_text) // 4
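
For OpenAI models, the tiktoken package gives an exact count of text tokens. A hedged sketch (tiktoken is an extra dependency, and this ignores the few framing tokens the chat format adds per message):

import tiktoken

def count_tokens_openai(messages, model="gpt-3.5-turbo"):
    """Count text tokens for OpenAI chat models via tiktoken (sketch)."""
    enc = tiktoken.encoding_for_model(model)
    return sum(len(enc.encode(m["role"])) + len(enc.encode(m["content"]))
               for m in messages)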

2. Response Serialization

# Convert the response to a dict for storage
response_dict = response.to_dict()
# It can then be written out as JSON
import json
with open("response.json", "w") as f:
    json.dump(response_dict, f, indent=2)

Best Practices

1. Environment Variable Management

import os
from dotenv import load_dotenv

load_dotenv()

config = LLMConfig(
    provider=LLMProvider.OPENAI,
    api_key=os.getenv("OPENAI_API_KEY"),
    # ...
)
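
The matching .env file might look like this (placeholder values only; never commit real keys):

# .env
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
ALIYUN_API_KEY=sk-...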

2. Configuration File

# config.py
LLM_CONFIGS = {
    "openai": {
        "provider": LLMProvider.OPENAI,
        "api_key": os.getenv("OPENAI_API_KEY"),
        "model": ModelType.GPT_4_TURBO,
        "temperature": 0.7
    },
    "claude": {
        "provider": LLMProvider.ANTHROPIC,
        "api_key": os.getenv("ANTHROPIC_API_KEY"),
        "model": ModelType.CLAUDE_3_SONNET,
        "temperature": 0.7
    }
}
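
One convenient way to consume this dict is a small factory that registers every configured client with a manager (build_manager is illustrative, not part of the library):

def build_manager(configs: dict) -> LLMManager:
    """Hypothetical helper: turn LLM_CONFIGS entries into registered clients."""
    manager = LLMManager()
    for name, kwargs in configs.items():
        manager.register_client(name, LLMClient(LLMConfig(**kwargs)))
    return manager

manager = build_manager(LLM_CONFIGS)
manager.set_current_client("claude")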

3. Logging

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def chat_with_logging(self, messages):
    try:
        response = self.chat(messages)
        logger.info(f"Success: {response.usage}")
        return response
    except Exception as e:
        logger.error(f"Error: {e}")
        raise

4. Rate Limiting

import time
from functools import wraps

def rate_limit(max_per_minute: int):
    """Rate-limiting decorator"""
    min_interval = 60.0 / max_per_minute
    last_called = [0.0]

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_called[0]
            wait = min_interval - elapsed
            if wait > 0:
                time.sleep(wait)
            ret = func(*args, **kwargs)
            last_called[0] = time.time()
            return ret
        return wrapper
    return decorator

# Usage
@rate_limit(30)  # at most 30 calls per minute
def chat_with_rate_limit(self, messages):
    return self.chat(messages)

Summary

This LLM wrapper library offers:

  1. Multi-provider support: one interface for the mainstream LLM services
  2. Error handling: automatic retries for better stability
  3. Streaming output: real-time display of responses
  4. History management: built-in conversation history
  5. Batch processing: batch generation for higher throughput
  6. Flexible configuration: tunable parameters and custom endpoints
  7. Easy extension: a base-class design that makes new providers straightforward to add

With this library, developers can switch between LLM services freely, without worrying about low-level API differences, and stay focused on business logic.