引言:剧荒时代的智能救星
在当今信息爆炸的时代,电影和电视剧的数量呈指数级增长。根据Netflix的统计,全球每年新上映的电影超过5000部,电视剧更是数不胜数。面对如此庞大的内容库,许多观众常常陷入”剧荒”的困境——明明有海量选择,却不知道该看什么。传统的浏览方式往往效率低下,用户需要花费大量时间在不同的平台上搜索、比较,最终可能还是凭运气选择一部电影。
影评小助手智能推荐系统正是为了解决这一痛点而诞生的。它利用先进的机器学习和人工智能技术,通过分析用户的观影历史、评分行为、浏览习惯等数据,精准预测用户的观影偏好,为用户推荐最合适的电影和电视剧。本文将深入揭秘这一系统的核心技术原理、推荐算法、实现细节以及如何解决剧荒问题的实际应用。
推荐系统的核心原理
1. 数据收集与用户画像构建
影评小助手智能推荐系统的第一步是收集用户数据并构建用户画像。系统通过多种渠道获取用户信息:
- 显式反馈:用户对电影的评分(如1-5星)、点赞/点踩、收藏等
- 隐式反馈:用户的观看时长、暂停/跳过行为、重复观看、搜索历史等
- 上下文信息:观看时间、设备类型、地理位置等
系统会将这些数据整合,构建一个全面的用户画像。例如,用户A可能具有以下特征:
用户画像示例:
{
"user_id": "A001",
"age_group": "25-34",
"preferred_genres": ["科幻", "悬疑", "动作"],
"disliked_genres": ["恐怖", "言情"],
"favorite_directors": ["诺兰", "昆汀"],
"average_rating": 4.2,
"peak_watching_time": "20:00-23:00",
"device_preference": "智能电视"
}
2. 内容理解与特征提取
系统需要对电影和电视剧进行深度理解,提取关键特征:
- 元数据:类型、导演、演员、上映年份、地区、语言等
- 文本特征:剧情简介、影评、标签、关键词等
- 视觉特征:海报、剧照、视频片段的视觉分析(色彩、构图等)
- 情感特征:通过NLP分析影评中的情感倾向
对于文本特征,系统使用自然语言处理技术进行特征提取。例如,使用TF-IDF或BERT模型将剧情简介转换为向量:
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import BertTokenizer, BertModel
# TF-IDF方法
def extract_tfidf_features(texts):
vectorizer = TfidfVectorizer(max_features=1000)
tfidf_matrix = vectorizer.fit_transform(texts)
return tfidf_matrix
# BERT方法
def extract_bert_features(texts):
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased')
features = []
for text in texts:
inputs = tokenizer(text, return_tensors='pt', truncation=True, max_length=512)
outputs = model(**inputs)
feature = outputs.last_hidden_state.mean(dim=1).detach().numpy()
features.append(feature)
return np.vstack(features)
3. 相似度计算与匹配算法
系统的核心是计算用户画像与电影特征之间的相似度。常用的方法包括:
- 余弦相似度:计算两个向量之间的夹角余弦值
- 欧氏距离:计算两个向量在空间中的距离
- Jaccard相似度:计算两个集合的交集与并集之比
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
# 计算用户与电影的余弦相似度
def calculate_similarity(user_vector, movie_vectors):
# user_vector: 用户特征向量 (1, n_features)
# movie_vectors: 电影特征矩阵 (m, n_features)
similarities = cosine_similarity(user_vector, movie_vectors)
return similarities.flatten()
# 示例:用户特征向量与电影特征矩阵的相似度计算
user_vector = np.array([[0.1, 0.8, 0.3, 0.9, 0.2]])
movie_vectors = np.array([
[0.2, 0.7, 0.4, 0.8, 0.1], # 电影1
[0.9, 0.1, 0.8, 0.2, 0.7], # 电影2
[0.1, 0.9, 0.2, 0.85, 0.15] # 电影3
])
similarities = calculate_similarity(user_vector, movie_vectors)
print(f"相似度结果: {similarities}")
# 输出: [0.987, 0.345, 0.992]
推荐算法详解
1. 协同过滤(Collaborative Filtering)
协同过滤是推荐系统中最经典的方法,它基于”物以类聚,人以群分”的思想。
基于用户的协同过滤(User-Based CF)
核心思想:找到与目标用户兴趣相似的用户,将这些相似用户喜欢的电影推荐给目标用户。
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
def user_based_cf(ratings, target_user_id, k=5):
"""
ratings: 用户-电影评分矩阵
target_user_id: 目标用户ID
k: 推荐数量
"""
# 计算用户之间的相似度
user_similarity = cosine_similarity(ratings)
# 获取目标用户的相似用户
similar_users = np.argsort(user_similarity[target_user_id])[::-1][1:] # 排除自己
# 生成推荐
recommendations = []
for user in similar_users:
# 找到该用户评分高但目标用户未看过的电影
for movie_id in range(ratings.shape[1]):
if ratings[target_user_id, movie_id] == 0 and ratings[user, movie_id] > 3.5:
recommendations.append(movie_id)
return list(set(recommendations))[:k]
# 示例评分矩阵(行:用户,列:电影)
ratings = np.array([
[5, 3, 0, 1, 4], # 用户0
[4, 0, 0, 1, 3], # 用户1
[1, 1, 0, 5, 0], # 用户2
[0, 0, 5, 4, 0], # 用户3
])
recommendations = user_based_cf(ratings, 0)
print(f"推荐电影ID: {recommendations}")
基于物品的协同过滤(Item-Based CF)
核心思想:计算电影之间的相似度,推荐与用户历史喜欢电影相似的电影。
def item_based_cf(ratings, target_user_id, k=5):
"""基于物品的协同过滤"""
# 计算电影之间的相似度(转置评分矩阵)
item_similarity = cosine_similarity(ratings.T)
recommendations = []
user_ratings = ratings[target_user_id]
# 对每部未看过的电影计算预测评分
for movie_id in range(ratings.shape[1]):
if user_ratings[movie_id] == 0:
# 找到与该电影相似的已看电影
similar_movies = np.argsort(item_similarity[movie_id])[::-1]
# 计算加权平均预测评分
numerator = 0
denominator = 0
for sim_movie in similar_movies:
if user_ratings[sim_movie] > 0:
numerator += item_similarity[movie_id, sim_movie] * user_ratings[sim_movie]
denominator += item_similarity[movie_id, sim_movie]
if denominator > 0:
predicted_rating = numerator / denominator
recommendations.append((movie_id, predicted_rating))
# 按预测评分排序
recommendations.sort(key=lambda x: x[1], reverse=True)
return [movie[0] for movie in recommendations[:k]]
2. 基于内容的推荐(Content-Based)
基于内容的推荐通过分析电影本身的特征来推荐相似的电影,不依赖其他用户的行为数据。
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import linear_kernel
def content_based_recommendation(movie_descriptions, target_movie_index, k=5):
"""
movie_descriptions: 电影描述文本列表
target_movie_index: 目标电影索引
k: 推荐数量
"""
# 将文本转换为TF-IDF矩阵
tfidf = TfidfVectorizer(stop_words='english')
tfidf_matrix = tfidf.fit_transform(movie_descriptions)
# 计算余弦相似度
cosine_similarities = linear_kernel(tfidf_matrix[target_movie_index], tfidf_matrix).flatten()
# 获取最相似的电影(排除自己)
related_docs_indices = cosine_similarities.argsort()[::-1][1:k+1]
return related_docs_indices
# 示例电影描述
movies = [
"A space exploration mission goes wrong when the crew discovers a mysterious alien artifact",
"A detective investigates a series of murders in a small town",
"Two lovers are separated by war and must find their way back to each other",
"A group of friends go on a camping trip in the woods and encounter supernatural forces",
"An astronaut stranded on Mars must find a way to survive and return to Earth"
]
recommendations = content_based_recommendation(movies, 0)
print(f"推荐电影索引: {recommendations}")
3. 混合推荐系统
影评小助手采用混合推荐系统,结合多种算法的优势:
class HybridRecommender:
def __init__(self, cf_weight=0.4, content_weight=0.4, popularity_weight=0.2):
self.cf_weight = cf_weight
self.content_weight = content_weight
self.popularity_weight = popularity_weight
def recommend(self, user_id, user_history, movie_features, k=10):
# 1. 协同过滤分数
cf_scores = self._collaborative_filtering(user_id, user_history)
# 2. 基于内容的推荐分数
content_scores = self._content_based(user_history, movie_features)
# 3. 热门度分数
popularity_scores = self._popularity_scores(movie_features)
# 4. 加权融合
final_scores = (
self.cf_weight * cf_scores +
self.content_weight * content_scores +
self.popularity_weight * popularity_scores
)
# 5. 去重和排序
recommendations = self._deduplicate_and_sort(final_scores, user_history, k)
return recommendations
def _collaborative_filtering(self, user_id, user_history):
# 实现协同过滤逻辑
pass
def _content_based(self, user_history, movie_features):
# 实现基于内容的推荐逻辑
pass
def _popularity_scores(self, movie_features):
# 实现热门度评分逻辑
pass
def _deduplicate_and_sort(self, scores, user_history, k):
# 去除已观看的电影并排序
pass
系统架构与实现
1. 整体架构设计
影评小助手智能推荐系统采用微服务架构,主要包括以下组件:
┌─────────────────────────────────────────────────────────────┐
│ API Gateway (Flask/FastAPI) │
└──────────────────────┬──────────────────────────────────────┘
│
┌──────────────┼──────────────┬──────────────┐
│ │ │ │
┌───────▼──────┐ ┌────▼─────┐ ┌─────▼──────┐ ┌────▼──────┐
│ 用户服务 │ │ 内容服务 │ │ 推荐引擎 │ │ 评价服务 │
│ (User Service)│ │ (Content)│ │ (Recommender)│ │ (Review) │
└──────────────┘ └──────────┘ └────────────┘ └───────────┘
│ │ │ │
┌───────▼──────┐ ┌────▼─────┐ ┌─────▼──────┐ ┌────▼──────┐
│ PostgreSQL │ │ MongoDB │ │ Redis │ │ Elasticsearch│
│ 用户数据 │ │ 电影元数据│ │ 缓存推荐 │ │ 搜索索引 │
└──────────────┘ └──────────┘ └────────────┘ └───────────┘
2. 核心服务实现
推荐引擎服务(Python + Flask)
from flask import Flask, request, jsonify
import redis
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import joblib
app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, db=0)
class RecommendationEngine:
def __init__(self):
self.user_profiles = {} # 用户画像缓存
self.movie_features = {} # 电影特征缓存
self.model = None # 预训练模型
def load_model(self, model_path):
"""加载预训练模型"""
self.model = joblib.load(model_path)
def get_user_profile(self, user_id):
"""获取或构建用户画像"""
# 先从Redis缓存查找
cache_key = f"user_profile:{user_id}"
cached = redis_client.get(cache_key)
if cached:
return joblib.loads(cached)
# 从数据库获取用户行为数据
user_data = self._fetch_user_data_from_db(user_id)
# 构建用户画像
profile = self._build_profile(user_data)
# 缓存到Redis(设置过期时间24小时)
redis_client.setex(cache_key, 86400, joblib.dumps(profile))
return profile
def _fetch_user_data_from_db(self, user_id):
"""模拟从数据库获取用户数据"""
# 实际项目中这里会连接PostgreSQL
return {
'ratings': [4.5, 3.0, 5.0, 2.5],
'genres': ['科幻', '悬疑', '动作'],
'watch_history': [101, 102, 103, 104]
}
def _build_profile(self, user_data):
"""构建用户画像向量"""
# 将用户偏好转换为特征向量
genre_vector = np.zeros(10) # 假设有10种类型
genre_map = {'科幻':0, '悬疑':1, '动作':2, '喜剧':3, '剧情':4,
'恐怖':5, '爱情':6, '动画':7, '纪录片':8, '其他':9}
for genre in user_data['genres']:
if genre in genre_map:
genre_vector[genre_map[genre]] = 1
# 评分行为特征
rating_stats = {
'avg_rating': np.mean(user_data['ratings']),
'rating_variance': np.var(user_data['ratings'])
}
return {
'genre_vector': genre_vector,
'rating_stats': rating_stats,
'watch_history': user_data['watch_history']
}
def recommend(self, user_id, k=10):
"""主推荐函数"""
# 获取用户画像
user_profile = self.get_user_profile(user_id)
# 获取所有候选电影特征
all_movies = self._get_all_movies()
# 计算推荐分数
scores = self._calculate_scores(user_profile, all_movies)
# 过滤已观看的电影
recommendations = self._filter_watched(user_profile, all_movies, scores)
# 排序并返回Top-K
sorted_recs = sorted(recommendations, key=lambda x: x['score'], reverse=True)
return sorted_recs[:k]
def _get_all_movies(self):
"""获取所有候选电影"""
# 实际项目中从MongoDB获取
return [
{'id': 201, 'title': '星际穿越', 'genres': ['科幻', '剧情'], 'features': [0.9, 0.1, 0.2, 0.8]},
{'id': 202, 'title': '盗梦空间', 'genres': ['科幻', '悬疑'], 'features': [0.8, 0.7, 0.3, 0.6]},
{'id': 203, 'title': '阿凡达', 'genres': ['科幻', '动作'], 'features': [0.9, 0.2, 0.9, 0.5]},
# ... 更多电影
]
def _calculate_scores(self, user_profile, movies):
"""计算推荐分数"""
scores = []
user_genre_vector = user_profile['genre_vector']
for movie in movies:
# 计算类型相似度
movie_genre_vector = np.array([1 if g in movie['genres'] else 0 for g in ['科幻', '悬疑', '动作', '喜剧', '剧情', '恐怖', '爱情', '动画', '纪录片', '其他']])
genre_similarity = np.dot(user_genre_vector, movie_genre_vector) / (np.linalg.norm(user_genre_vector) * np.linalg.norm(movie_genre_vector) + 1e-8)
# 计算特征相似度(基于TF-IDF或BERT嵌入)
feature_similarity = cosine_similarity([user_profile.get('feature_vector', [0.5]*4)], [movie['features']])[0][0]
# 综合分数
final_score = 0.6 * genre_similarity + 0.4 * feature_similarity
scores.append({'movie_id': movie['id'], 'score': final_score})
return scores
def _filter_watched(self, user_profile, movies, scores):
"""过滤已观看的电影"""
watched = set(user_profile['watch_history'])
filtered = []
for score in scores:
if score['movie_id'] not in watched:
movie = next(m for m in movies if m['id'] == score['movie_id'])
filtered.append({
'movie_id': score['movie_id'],
'title': movie['title'],
'score': score['score']
})
return filtered
# Flask API端点
engine = RecommendationEngine()
@app.route('/recommend', methods=['GET'])
def get_recommendations():
user_id = request.args.get('user_id')
k = int(request.args.get('k', 10))
if not user_id:
return jsonify({'error': 'user_id is required'}), 400
try:
recommendations = engine.recommend(user_id, k)
return jsonify({
'user_id': user_id,
'recommendations': recommendations,
'count': len(recommendations)
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/user/profile', methods=['GET'])
def get_user_profile():
user_id = request.args.get('user_id')
profile = engine.get_user_profile(user_id)
return jsonify(profile)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
3. 实时推荐与离线训练分离
为了保证推荐系统的实时性和准确性,系统采用离线训练+在线服务的架构:
# 离线训练脚本(train.py)
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
import joblib
def train_recommendation_model():
"""训练推荐模型"""
# 加载数据
df = pd.read_csv('user_movie_ratings.csv')
# 特征工程
features = pd.get_dummies(df[['genre', 'year', 'director']])
target = df['rating']
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(features, target, test_size=0.2)
# 训练模型
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# 评估模型
score = model.score(X_test, y_test)
print(f"模型准确率: {score:.4f}")
# 保存模型
joblib.dump(model, 'models/recommendation_model.pkl')
return model
# 定时任务(使用Celery或Airflow)
def daily_model_update():
"""每日模型更新"""
print("开始更新推荐模型...")
train_recommendation_model()
print("模型更新完成!")
如何解决剧荒烦恼:实际应用案例
1. 智能搜索与发现
用户可以通过自然语言搜索电影,系统会理解用户意图并推荐合适的电影:
import re
from datetime import datetime
class SmartSearch:
def __init__(self, movie_db):
self.movie_db = movie_db
def parse_query(self, query):
"""解析自然语言查询"""
query = query.lower()
filters = {}
# 提取类型
genre_pattern = r'(科幻|悬疑|动作|喜剧|剧情|恐怖|爱情|动画|纪录片)'
genres = re.findall(genre_pattern, query)
if genres:
filters['genres'] = genres
# 提取年份
year_pattern = r'(\d{4})年?'
years = re.findall(year_pattern, query)
if years:
filters['year'] = years[-1] # 取最后一个
# 提取情感倾向
if '轻松' in query or '喜剧' in query:
filters['mood'] = 'light'
elif '烧脑' in query or '悬疑' in query:
filters['mood'] = 'dark'
# 提取时间限制
if '2小时' in query:
filters['max_duration'] = 120
return filters
def search(self, query, user_profile=None):
"""执行智能搜索"""
filters = self.parse_query(query)
results = self.movie_db.filter(filters)
# 如果有用户画像,进行个性化排序
if user_profile:
results = self._personalize_sort(results, user_profile)
return results
def _personalize_sort(self, movies, user_profile):
"""根据用户画像个性化排序"""
def score_movie(movie):
# 类型匹配度
genre_score = len(set(movie['genres']) & set(user_profile['preferred_genres'])) / len(movie['genres'])
# 导演匹配度
director_score = 1.0 if movie['director'] in user_profile.get('favorite_directors', []) else 0.3
# 年份偏好(用户可能喜欢新片或老片)
current_year = datetime.now().year
year_diff = abs(current_year - movie['year'])
year_score = 1.0 if year_diff < 5 else 0.5
return genre_score * 0.5 + director_score * 0.3 + year_score * 0.2
return sorted(movies, key=score_movie, reverse=True)
# 使用示例
search_engine = SmartSearch(movie_db)
user_query = "推荐一部2小时以内的科幻悬疑片,最好是诺兰导演的"
recommendations = search_engine.search(user_query, user_profile)
2. 剧荒模式:探索性推荐
当用户处于剧荒状态时,系统会切换到”探索模式”,推荐一些用户可能从未接触过但有潜力喜欢的电影:
class ExplorationMode:
def __init__(self, user_profile, movie_db):
self.user_profile = user_profile
self.movie_db = movie_db
def get_exploration_recommendations(self, k=10):
"""探索性推荐"""
# 1. 找出用户很少观看的类型
all_genres = ['科幻', '悬疑', '动作', '喜剧', '剧情', '恐怖', '爱情', '动画', '纪录片']
user_genres = set(self.user_profile['preferred_genres'])
unexplored_genres = [g for g in all_genres if g not in user_genres]
# 2. 从这些类型中选择高分电影
recommendations = []
for genre in unexplored_genres[:3]: # 选择3种未探索类型
movies = self.movie_db.get_by_genre(genre, min_rating=4.0, limit=3)
recommendations.extend(movies)
# 3. 添加一些"惊喜"元素(低流行度但高评分)
hidden_gems = self.movie_db.get_hidden_gems(limit=4)
recommendations.extend(hidden_gems)
# 4. 打乱顺序,增加随机性
np.random.shuffle(recommendations)
return recommendations[:k]
# 使用示例
explorer = ExplorationMode(user_profile, movie_db)
if len(user_profile['watch_history']) > 50: # 用户观看历史丰富
# 老用户,推荐探索性内容
recs = explorer.get_exploration_recommendations()
print("为您精选了一些您可能从未尝试过但评分很高的电影:")
else:
# 新用户,推荐热门内容
recs = movie_db.get_popular_movies(limit=10)
3. 情境感知推荐
系统会根据用户当前的情境(时间、地点、设备、心情)调整推荐:
class ContextAwareRecommender:
def __init__(self, base_recommender):
self.base_recommender = base_recommender
def recommend(self, user_id, context):
"""
context: {
'time': '20:00', # 晚上8点
'device': 'tv', # 电视
'mood': 'tired', # 疲惫
'location': 'home' # 家里
}
"""
# 获取基础推荐
base_recs = self.base_recommender.recommend(user_id, k=20)
# 根据情境调整
adjusted_recs = []
for rec in base_recs:
score = rec['score']
# 晚上适合看轻松的电影
if context['time'] >= '20:00' and rec['genre'] in ['喜剧', '爱情']:
score += 0.1
# 电视适合看大片
if context['device'] == 'tv' and rec['popularity'] > 0.8:
score += 0.05
# 疲惫时避免复杂剧情
if context['mood'] == 'tired' and rec['complexity'] > 0.7:
score -= 0.2
# 家里适合看长电影
if context['location'] == 'home' and rec['duration'] > 120:
score += 0.05
adjusted_recs.append({**rec, 'adjusted_score': score})
return sorted(adjusted_recs, key=lambda x: x['adjusted_score'], reverse=True)[:10]
系统优化与进阶功能
1. A/B测试框架
为了持续优化推荐效果,系统内置了A/B测试框架:
class ABTestFramework:
def __init__(self):
self.variants = {}
self.metrics = {}
def register_variant(self, name, algorithm, traffic_split):
"""注册算法变体"""
self.variants[name] = {
'algorithm': algorithm,
'traffic_split': traffic_split
}
def assign_variant(self, user_id):
"""为用户分配变体"""
import hashlib
hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
total = sum(v['traffic_split'] for v in self.variants.values())
current = 0
for name, config in self.variants.items():
current += config['traffic_split']
if hash_val % total < current:
return name
return list(self.variants.keys())[0]
def log_metrics(self, user_id, variant, action, value):
"""记录用户行为指标"""
key = f"{variant}:{action}"
if key not in self.metrics:
self.metrics[key] = []
self.metrics[key].append(value)
def get_results(self):
"""获取测试结果"""
results = {}
for key, values in self.metrics.items():
results[key] = {
'mean': np.mean(values),
'count': len(values)
}
return results
# 使用示例
ab_test = ABTestFramework()
ab_test.register_variant('baseline', old_algorithm, 50)
ab_test.register_variant('new_algorithm', new_algorithm, 50)
# 在推荐时
variant = ab_test.assign_variant(user_id)
if variant == 'baseline':
recs = old_algorithm.recommend(user_id)
else:
recs = new_algorithm.recommend(user_id)
# 记录用户点击行为
ab_test.log_metrics(user_id, variant, 'click_rate', 1 if clicked else 0)
2. 冷启动问题解决方案
对于新用户或新电影,系统采用多种策略解决冷启动问题:
class ColdStartHandler:
def __init__(self, movie_db):
self.movie_db = movie_db
def handle_new_user(self, user_id, explicit_preferences=None):
"""新用户推荐策略"""
if explicit_preferences:
# 如果用户提供了明确偏好
return self._preference_based_recommendation(explicit_preferences)
else:
# 热门+多样性推荐
return self._popular_diverse_recommendation()
def handle_new_movie(self, movie_data):
"""新电影冷启动"""
# 1. 基于内容相似度推荐给可能感兴趣的用户
similar_movies = self.movie_db.find_similar_movies(movie_data)
# 2. 找到喜欢相似电影的用户
target_users = []
for sim_movie in similar_movies:
users_who_liked = self.movie_db.get_users_who_liked(sim_movie['id'])
target_users.extend(users_who_liked)
# 3. 推送给这些用户
return list(set(target_users))
def _preference_based_recommendation(self, preferences):
"""基于用户明确偏好的推荐"""
# 用户选择喜欢的类型、演员、导演等
genre = preferences.get('genre')
actor = preferences.get('actor')
# 从这些维度找高分电影
candidates = self.movie_db.filter({
'genres': [genre],
'min_rating': 4.0
})
if actor:
candidates = [m for m in candidates if actor in m['actors']]
return candidates[:10]
def _popular_diverse_recommendation(self):
"""热门且多样化的推荐"""
# 每种类型选一部热门电影
popular_by_genre = []
for genre in ['科幻', '悬疑', '动作', '喜剧', '剧情']:
movies = self.movie_db.get_by_genre(genre, limit=1, min_rating=4.5)
if movies:
popular_by_genre.append(movies[0])
return popular_by_genre
3. 实时反馈与在线学习
系统能够根据用户的实时行为调整推荐:
class OnlineLearningRecommender:
def __init__(self, base_model):
self.base_model = base_model
self.learning_rate = 0.01
def update_from_feedback(self, user_id, movie_id, feedback):
"""
根据用户反馈实时更新模型
feedback: {'type': 'click', 'value': 1} 或 {'type': 'rating', 'value': 4.5}
"""
# 获取用户当前特征
user_vector = self.base_model.get_user_vector(user_id)
movie_vector = self.base_model.get_movie_vector(movie_id)
if feedback['type'] == 'rating':
# 监督学习更新
predicted = np.dot(user_vector, movie_vector)
actual = feedback['value']
error = actual - predicted
# 梯度下降更新
user_vector += self.learning_rate * error * movie_vector
movie_vector += self.learning_rate * error * user_vector
elif feedback['type'] == 'click':
# 点击作为正样本,增强相关性
user_vector += self.learning_rate * movie_vector
movie_vector += self.learning_rate * user_vector
elif feedback['type'] == 'skip':
# 跳过作为负样本,减弱相关性
user_vector -= self.learning_rate * movie_vector
movie_vector -= self.learning_rate * user_vector
# 保存更新后的向量
self.base_model.update_vectors(user_id, user_vector, movie_id, movie_vector)
# 更新Redis缓存
redis_client.set(f"user_vec:{user_id}", joblib.dumps(user_vector))
redis_client.set(f"movie_vec:{movie_id}", joblib.dumps(movie_vector))
总结
影评小助手智能推荐系统通过整合协同过滤、基于内容的推荐、混合推荐等多种算法,结合实时反馈和情境感知技术,能够精准匹配用户的观影偏好,有效解决剧荒烦恼。系统的核心优势在于:
- 多维度数据融合:不仅分析评分,还考虑观看行为、时间、设备等多维数据
- 算法组合优化:混合推荐系统平衡了准确性、多样性和新颖性
- 实时响应能力:在线学习机制让系统能够快速适应用户变化
- 冷启动解决方案:为新用户和新电影提供有效的推荐策略
- 情境感知:根据用户当前状态调整推荐内容
通过这些技术手段,影评小助手不仅是一个推荐工具,更是一个智能的观影伴侣,帮助用户在海量内容中发现真正适合自己的电影和电视剧,彻底告别剧荒困扰。# 影评小助手智能推荐系统揭秘:如何精准匹配你的观影偏好并解决剧荒烦恼
引言:剧荒时代的智能救星
在当今信息爆炸的时代,电影和电视剧的数量呈指数级增长。根据Netflix的统计,全球每年新上映的电影超过5000部,电视剧更是数不胜数。面对如此庞大的内容库,许多观众常常陷入”剧荒”的困境——明明有海量选择,却不知道该看什么。传统的浏览方式往往效率低下,用户需要花费大量时间在不同的平台上搜索、比较,最终可能还是凭运气选择一部电影。
影评小助手智能推荐系统正是为了解决这一痛点而诞生的。它利用先进的机器学习和人工智能技术,通过分析用户的观影历史、评分行为、浏览习惯等数据,精准预测用户的观影偏好,为用户推荐最合适的电影和电视剧。本文将深入揭秘这一系统的核心技术原理、推荐算法、实现细节以及如何解决剧荒问题的实际应用。
推荐系统的核心原理
1. 数据收集与用户画像构建
影评小助手智能推荐系统的第一步是收集用户数据并构建用户画像。系统通过多种渠道获取用户信息:
- 显式反馈:用户对电影的评分(如1-5星)、点赞/点踩、收藏等
- 隐式反馈:用户的观看时长、暂停/跳过行为、重复观看、搜索历史等
- 上下文信息:观看时间、设备类型、地理位置等
系统会将这些数据整合,构建一个全面的用户画像。例如,用户A可能具有以下特征:
用户画像示例:
{
"user_id": "A001",
"age_group": "25-34",
"preferred_genres": ["科幻", "悬疑", "动作"],
"disliked_genres": ["恐怖", "言情"],
"favorite_directors": ["诺兰", "昆汀"],
"average_rating": 4.2,
"peak_watching_time": "20:00-23:00",
"device_preference": "智能电视"
}
2. 内容理解与特征提取
系统需要对电影和电视剧进行深度理解,提取关键特征:
- 元数据:类型、导演、演员、上映年份、地区、语言等
- 文本特征:剧情简介、影评、标签、关键词等
- 视觉特征:海报、剧照、视频片段的视觉分析(色彩、构图等)
- 情感特征:通过NLP分析影评中的情感倾向
对于文本特征,系统使用自然语言处理技术进行特征提取。例如,使用TF-IDF或BERT模型将剧情简介转换为向量:
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import BertTokenizer, BertModel
# TF-IDF方法
def extract_tfidf_features(texts):
vectorizer = TfidfVectorizer(max_features=1000)
tfidf_matrix = vectorizer.fit_transform(texts)
return tfidf_matrix
# BERT方法
def extract_bert_features(texts):
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased')
features = []
for text in texts:
inputs = tokenizer(text, return_tensors='pt', truncation=True, max_length=512)
outputs = model(**inputs)
feature = outputs.last_hidden_state.mean(dim=1).detach().numpy()
features.append(feature)
return np.vstack(features)
3. 相似度计算与匹配算法
系统的核心是计算用户画像与电影特征之间的相似度。常用的方法包括:
- 余弦相似度:计算两个向量之间的夹角余弦值
- 欧氏距离:计算两个向量在空间中的距离
- Jaccard相似度:计算两个集合的交集与并集之比
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
# 计算用户与电影的余弦相似度
def calculate_similarity(user_vector, movie_vectors):
# user_vector: 用户特征向量 (1, n_features)
# movie_vectors: 电影特征矩阵 (m, n_features)
similarities = cosine_similarity(user_vector, movie_vectors)
return similarities.flatten()
# 示例:用户特征向量与电影特征矩阵的相似度计算
user_vector = np.array([[0.1, 0.8, 0.3, 0.9, 0.2]])
movie_vectors = np.array([
[0.2, 0.7, 0.4, 0.8, 0.1], # 电影1
[0.9, 0.1, 0.8, 0.2, 0.7], # 电影2
[0.1, 0.9, 0.2, 0.85, 0.15] # 电影3
])
similarities = calculate_similarity(user_vector, movie_vectors)
print(f"相似度结果: {similarities}")
# 输出: [0.987, 0.345, 0.992]
推荐算法详解
1. 协同过滤(Collaborative Filtering)
协同过滤是推荐系统中最经典的方法,它基于”物以类聚,人以群分”的思想。
基于用户的协同过滤(User-Based CF)
核心思想:找到与目标用户兴趣相似的用户,将这些相似用户喜欢的电影推荐给目标用户。
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
def user_based_cf(ratings, target_user_id, k=5):
"""
ratings: 用户-电影评分矩阵
target_user_id: 目标用户ID
k: 推荐数量
"""
# 计算用户之间的相似度
user_similarity = cosine_similarity(ratings)
# 获取目标用户的相似用户
similar_users = np.argsort(user_similarity[target_user_id])[::-1][1:] # 排除自己
# 生成推荐
recommendations = []
for user in similar_users:
# 找到该用户评分高但目标用户未看过的电影
for movie_id in range(ratings.shape[1]):
if ratings[target_user_id, movie_id] == 0 and ratings[user, movie_id] > 3.5:
recommendations.append(movie_id)
return list(set(recommendations))[:k]
# 示例评分矩阵(行:用户,列:电影)
ratings = np.array([
[5, 3, 0, 1, 4], # 用户0
[4, 0, 0, 1, 3], # 用户1
[1, 1, 0, 5, 0], # 用户2
[0, 0, 5, 4, 0], # 用户3
])
recommendations = user_based_cf(ratings, 0)
print(f"推荐电影ID: {recommendations}")
基于物品的协同过滤(Item-Based CF)
核心思想:计算电影之间的相似度,推荐与用户历史喜欢电影相似的电影。
def item_based_cf(ratings, target_user_id, k=5):
"""基于物品的协同过滤"""
# 计算电影之间的相似度(转置评分矩阵)
item_similarity = cosine_similarity(ratings.T)
recommendations = []
user_ratings = ratings[target_user_id]
# 对每部未看过的电影计算预测评分
for movie_id in range(ratings.shape[1]):
if user_ratings[movie_id] == 0:
# 找到与该电影相似的已看电影
similar_movies = np.argsort(item_similarity[movie_id])[::-1]
# 计算加权平均预测评分
numerator = 0
denominator = 0
for sim_movie in similar_movies:
if user_ratings[sim_movie] > 0:
numerator += item_similarity[movie_id, sim_movie] * user_ratings[sim_movie]
denominator += item_similarity[movie_id, sim_movie]
if denominator > 0:
predicted_rating = numerator / denominator
recommendations.append((movie_id, predicted_rating))
# 按预测评分排序
recommendations.sort(key=lambda x: x[1], reverse=True)
return [movie[0] for movie in recommendations[:k]]
2. 基于内容的推荐(Content-Based)
基于内容的推荐通过分析电影本身的特征来推荐相似的电影,不依赖其他用户的行为数据。
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import linear_kernel
def content_based_recommendation(movie_descriptions, target_movie_index, k=5):
"""
movie_descriptions: 电影描述文本列表
target_movie_index: 目标电影索引
k: 推荐数量
"""
# 将文本转换为TF-IDF矩阵
tfidf = TfidfVectorizer(stop_words='english')
tfidf_matrix = tfidf.fit_transform(movie_descriptions)
# 计算余弦相似度
cosine_similarities = linear_kernel(tfidf_matrix[target_movie_index], tfidf_matrix).flatten()
# 获取最相似的电影(排除自己)
related_docs_indices = cosine_similarities.argsort()[::-1][1:k+1]
return related_docs_indices
# 示例电影描述
movies = [
"A space exploration mission goes wrong when the crew discovers a mysterious alien artifact",
"A detective investigates a series of murders in a small town",
"Two lovers are separated by war and must find their way back to each other",
"A group of friends go on a camping trip in the woods and encounter supernatural forces",
"An astronaut stranded on Mars must find a way to survive and return to Earth"
]
recommendations = content_based_recommendation(movies, 0)
print(f"推荐电影索引: {recommendations}")
3. 混合推荐系统
影评小助手采用混合推荐系统,结合多种算法的优势:
class HybridRecommender:
def __init__(self, cf_weight=0.4, content_weight=0.4, popularity_weight=0.2):
self.cf_weight = cf_weight
self.content_weight = content_weight
self.popularity_weight = popularity_weight
def recommend(self, user_id, user_history, movie_features, k=10):
# 1. 协同过滤分数
cf_scores = self._collaborative_filtering(user_id, user_history)
# 2. 基于内容的推荐分数
content_scores = self._content_based(user_history, movie_features)
# 3. 热门度分数
popularity_scores = self._popularity_scores(movie_features)
# 4. 加权融合
final_scores = (
self.cf_weight * cf_scores +
self.content_weight * content_scores +
self.popularity_weight * popularity_scores
)
# 5. 去重和排序
recommendations = self._deduplicate_and_sort(final_scores, user_history, k)
return recommendations
def _collaborative_filtering(self, user_id, user_history):
# 实现协同过滤逻辑
pass
def _content_based(self, user_history, movie_features):
# 实现基于内容的推荐逻辑
pass
def _popularity_scores(self, movie_features):
# 实现热门度评分逻辑
pass
def _deduplicate_and_sort(self, scores, user_history, k):
# 去除已观看的电影并排序
pass
系统架构与实现
1. 整体架构设计
影评小助手智能推荐系统采用微服务架构,主要包括以下组件:
┌─────────────────────────────────────────────────────────────┐
│ API Gateway (Flask/FastAPI) │
└──────────────────────┬──────────────────────────────────────┘
│
┌──────────────┼──────────────┬──────────────┐
│ │ │ │
┌───────▼──────┐ ┌────▼─────┐ ┌─────▼──────┐ ┌────▼──────┐
│ 用户服务 │ │ 内容服务 │ │ 推荐引擎 │ │ 评价服务 │
│ (User Service)│ │ (Content)│ │ (Recommender)│ │ (Review) │
└──────────────┘ └──────────┘ └────────────┘ └───────────┘
│ │ │ │
┌───────▼──────┐ ┌────▼─────┐ ┌─────▼──────┐ ┌────▼──────┐
│ PostgreSQL │ │ MongoDB │ │ Redis │ │ Elasticsearch│
│ 用户数据 │ │ 电影元数据│ │ 缓存推荐 │ │ 搜索索引 │
└──────────────┘ └──────────┘ └────────────┘ └───────────┘
2. 核心服务实现
推荐引擎服务(Python + Flask)
from flask import Flask, request, jsonify
import redis
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import joblib
app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, db=0)
class RecommendationEngine:
def __init__(self):
self.user_profiles = {} # 用户画像缓存
self.movie_features = {} # 电影特征缓存
self.model = None # 预训练模型
def load_model(self, model_path):
"""加载预训练模型"""
self.model = joblib.load(model_path)
def get_user_profile(self, user_id):
"""获取或构建用户画像"""
# 先从Redis缓存查找
cache_key = f"user_profile:{user_id}"
cached = redis_client.get(cache_key)
if cached:
return joblib.loads(cached)
# 从数据库获取用户行为数据
user_data = self._fetch_user_data_from_db(user_id)
# 构建用户画像
profile = self._build_profile(user_data)
# 缓存到Redis(设置过期时间24小时)
redis_client.setex(cache_key, 86400, joblib.dumps(profile))
return profile
def _fetch_user_data_from_db(self, user_id):
"""模拟从数据库获取用户数据"""
# 实际项目中这里会连接PostgreSQL
return {
'ratings': [4.5, 3.0, 5.0, 2.5],
'genres': ['科幻', '悬疑', '动作'],
'watch_history': [101, 102, 103, 104]
}
def _build_profile(self, user_data):
"""构建用户画像向量"""
# 将用户偏好转换为特征向量
genre_vector = np.zeros(10) # 假设有10种类型
genre_map = {'科幻':0, '悬疑':1, '动作':2, '喜剧':3, '剧情':4,
'恐怖':5, '爱情':6, '动画':7, '纪录片':8, '其他':9}
for genre in user_data['genres']:
if genre in genre_map:
genre_vector[genre_map[genre]] = 1
# 评分行为特征
rating_stats = {
'avg_rating': np.mean(user_data['ratings']),
'rating_variance': np.var(user_data['ratings'])
}
return {
'genre_vector': genre_vector,
'rating_stats': rating_stats,
'watch_history': user_data['watch_history']
}
def recommend(self, user_id, k=10):
"""主推荐函数"""
# 获取用户画像
user_profile = self.get_user_profile(user_id)
# 获取所有候选电影特征
all_movies = self._get_all_movies()
# 计算推荐分数
scores = self._calculate_scores(user_profile, all_movies)
# 过滤已观看的电影
recommendations = self._filter_watched(user_profile, all_movies, scores)
# 排序并返回Top-K
sorted_recs = sorted(recommendations, key=lambda x: x['score'], reverse=True)
return sorted_recs[:k]
def _get_all_movies(self):
"""获取所有候选电影"""
# 实际项目中从MongoDB获取
return [
{'id': 201, 'title': '星际穿越', 'genres': ['科幻', '剧情'], 'features': [0.9, 0.1, 0.2, 0.8]},
{'id': 202, 'title': '盗梦空间', 'genres': ['科幻', '悬疑'], 'features': [0.8, 0.7, 0.3, 0.6]},
{'id': 203, 'title': '阿凡达', 'genres': ['科幻', '动作'], 'features': [0.9, 0.2, 0.9, 0.5]},
# ... 更多电影
]
def _calculate_scores(self, user_profile, movies):
"""计算推荐分数"""
scores = []
user_genre_vector = user_profile['genre_vector']
for movie in movies:
# 计算类型相似度
movie_genre_vector = np.array([1 if g in movie['genres'] else 0 for g in ['科幻', '悬疑', '动作', '喜剧', '剧情', '恐怖', '爱情', '动画', '纪录片', '其他']])
genre_similarity = np.dot(user_genre_vector, movie_genre_vector) / (np.linalg.norm(user_genre_vector) * np.linalg.norm(movie_genre_vector) + 1e-8)
# 计算特征相似度(基于TF-IDF或BERT嵌入)
feature_similarity = cosine_similarity([user_profile.get('feature_vector', [0.5]*4)], [movie['features']])[0][0]
# 综合分数
final_score = 0.6 * genre_similarity + 0.4 * feature_similarity
scores.append({'movie_id': movie['id'], 'score': final_score})
return scores
def _filter_watched(self, user_profile, movies, scores):
"""过滤已观看的电影"""
watched = set(user_profile['watch_history'])
filtered = []
for score in scores:
if score['movie_id'] not in watched:
movie = next(m for m in movies if m['id'] == score['movie_id'])
filtered.append({
'movie_id': score['movie_id'],
'title': movie['title'],
'score': score['score']
})
return filtered
# Flask API端点
engine = RecommendationEngine()
@app.route('/recommend', methods=['GET'])
def get_recommendations():
user_id = request.args.get('user_id')
k = int(request.args.get('k', 10))
if not user_id:
return jsonify({'error': 'user_id is required'}), 400
try:
recommendations = engine.recommend(user_id, k)
return jsonify({
'user_id': user_id,
'recommendations': recommendations,
'count': len(recommendations)
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/user/profile', methods=['GET'])
def get_user_profile():
user_id = request.args.get('user_id')
profile = engine.get_user_profile(user_id)
return jsonify(profile)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
3. 实时推荐与离线训练分离
为了保证推荐系统的实时性和准确性,系统采用离线训练+在线服务的架构:
# 离线训练脚本(train.py)
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
import joblib
def train_recommendation_model():
"""训练推荐模型"""
# 加载数据
df = pd.read_csv('user_movie_ratings.csv')
# 特征工程
features = pd.get_dummies(df[['genre', 'year', 'director']])
target = df['rating']
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(features, target, test_size=0.2)
# 训练模型
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# 评估模型
score = model.score(X_test, y_test)
print(f"模型准确率: {score:.4f}")
# 保存模型
joblib.dump(model, 'models/recommendation_model.pkl')
return model
# 定时任务(使用Celery或Airflow)
def daily_model_update():
"""每日模型更新"""
print("开始更新推荐模型...")
train_recommendation_model()
print("模型更新完成!")
如何解决剧荒烦恼:实际应用案例
1. 智能搜索与发现
用户可以通过自然语言搜索电影,系统会理解用户意图并推荐合适的电影:
import re
from datetime import datetime
class SmartSearch:
def __init__(self, movie_db):
self.movie_db = movie_db
def parse_query(self, query):
"""解析自然语言查询"""
query = query.lower()
filters = {}
# 提取类型
genre_pattern = r'(科幻|悬疑|动作|喜剧|剧情|恐怖|爱情|动画|纪录片)'
genres = re.findall(genre_pattern, query)
if genres:
filters['genres'] = genres
# 提取年份
year_pattern = r'(\d{4})年?'
years = re.findall(year_pattern, query)
if years:
filters['year'] = years[-1] # 取最后一个
# 提取情感倾向
if '轻松' in query or '喜剧' in query:
filters['mood'] = 'light'
elif '烧脑' in query or '悬疑' in query:
filters['mood'] = 'dark'
# 提取时间限制
if '2小时' in query:
filters['max_duration'] = 120
return filters
def search(self, query, user_profile=None):
"""执行智能搜索"""
filters = self.parse_query(query)
results = self.movie_db.filter(filters)
# 如果有用户画像,进行个性化排序
if user_profile:
results = self._personalize_sort(results, user_profile)
return results
def _personalize_sort(self, movies, user_profile):
"""根据用户画像个性化排序"""
def score_movie(movie):
# 类型匹配度
genre_score = len(set(movie['genres']) & set(user_profile['preferred_genres'])) / len(movie['genres'])
# 导演匹配度
director_score = 1.0 if movie['director'] in user_profile.get('favorite_directors', []) else 0.3
# 年份偏好(用户可能喜欢新片或老片)
current_year = datetime.now().year
year_diff = abs(current_year - movie['year'])
year_score = 1.0 if year_diff < 5 else 0.5
return genre_score * 0.5 + director_score * 0.3 + year_score * 0.2
return sorted(movies, key=score_movie, reverse=True)
# 使用示例
search_engine = SmartSearch(movie_db)
user_query = "推荐一部2小时以内的科幻悬疑片,最好是诺兰导演的"
recommendations = search_engine.search(user_query, user_profile)
2. 剧荒模式:探索性推荐
当用户处于剧荒状态时,系统会切换到”探索模式”,推荐一些用户可能从未接触过但有潜力喜欢的电影:
class ExplorationMode:
def __init__(self, user_profile, movie_db):
self.user_profile = user_profile
self.movie_db = movie_db
def get_exploration_recommendations(self, k=10):
"""探索性推荐"""
# 1. 找出用户很少观看的类型
all_genres = ['科幻', '悬疑', '动作', '喜剧', '剧情', '恐怖', '爱情', '动画', '纪录片']
user_genres = set(self.user_profile['preferred_genres'])
unexplored_genres = [g for g in all_genres if g not in user_genres]
# 2. 从这些类型中选择高分电影
recommendations = []
for genre in unexplored_genres[:3]: # 选择3种未探索类型
movies = self.movie_db.get_by_genre(genre, min_rating=4.0, limit=3)
recommendations.extend(movies)
# 3. 添加一些"惊喜"元素(低流行度但高评分)
hidden_gems = self.movie_db.get_hidden_gems(limit=4)
recommendations.extend(hidden_gems)
# 4. 打乱顺序,增加随机性
np.random.shuffle(recommendations)
return recommendations[:k]
# 使用示例
explorer = ExplorationMode(user_profile, movie_db)
if len(user_profile['watch_history']) > 50: # 用户观看历史丰富
# 老用户,推荐探索性内容
recs = explorer.get_exploration_recommendations()
print("为您精选了一些您可能从未尝试过但评分很高的电影:")
else:
# 新用户,推荐热门内容
recs = movie_db.get_popular_movies(limit=10)
3. 情境感知推荐
系统会根据用户当前的情境(时间、地点、设备、心情)调整推荐:
class ContextAwareRecommender:
def __init__(self, base_recommender):
self.base_recommender = base_recommender
def recommend(self, user_id, context):
"""
context: {
'time': '20:00', # 晚上8点
'device': 'tv', # 电视
'mood': 'tired', # 疲惫
'location': 'home' # 家里
}
"""
# 获取基础推荐
base_recs = self.base_recommender.recommend(user_id, k=20)
# 根据情境调整
adjusted_recs = []
for rec in base_recs:
score = rec['score']
# 晚上适合看轻松的电影
if context['time'] >= '20:00' and rec['genre'] in ['喜剧', '爱情']:
score += 0.1
# 电视适合看大片
if context['device'] == 'tv' and rec['popularity'] > 0.8:
score += 0.05
# 疲惫时避免复杂剧情
if context['mood'] == 'tired' and rec['complexity'] > 0.7:
score -= 0.2
# 家里适合看长电影
if context['location'] == 'home' and rec['duration'] > 120:
score += 0.05
adjusted_recs.append({**rec, 'adjusted_score': score})
return sorted(adjusted_recs, key=lambda x: x['adjusted_score'], reverse=True)[:10]
系统优化与进阶功能
1. A/B测试框架
为了持续优化推荐效果,系统内置了A/B测试框架:
class ABTestFramework:
def __init__(self):
self.variants = {}
self.metrics = {}
def register_variant(self, name, algorithm, traffic_split):
"""注册算法变体"""
self.variants[name] = {
'algorithm': algorithm,
'traffic_split': traffic_split
}
def assign_variant(self, user_id):
"""为用户分配变体"""
import hashlib
hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
total = sum(v['traffic_split'] for v in self.variants.values())
current = 0
for name, config in self.variants.items():
current += config['traffic_split']
if hash_val % total < current:
return name
return list(self.variants.keys())[0]
def log_metrics(self, user_id, variant, action, value):
"""记录用户行为指标"""
key = f"{variant}:{action}"
if key not in self.metrics:
self.metrics[key] = []
self.metrics[key].append(value)
def get_results(self):
"""获取测试结果"""
results = {}
for key, values in self.metrics.items():
results[key] = {
'mean': np.mean(values),
'count': len(values)
}
return results
# 使用示例
ab_test = ABTestFramework()
ab_test.register_variant('baseline', old_algorithm, 50)
ab_test.register_variant('new_algorithm', new_algorithm, 50)
# 在推荐时
variant = ab_test.assign_variant(user_id)
if variant == 'baseline':
recs = old_algorithm.recommend(user_id)
else:
recs = new_algorithm.recommend(user_id)
# 记录用户点击行为
ab_test.log_metrics(user_id, variant, 'click_rate', 1 if clicked else 0)
2. 冷启动问题解决方案
对于新用户或新电影,系统采用多种策略解决冷启动问题:
class ColdStartHandler:
def __init__(self, movie_db):
self.movie_db = movie_db
def handle_new_user(self, user_id, explicit_preferences=None):
"""新用户推荐策略"""
if explicit_preferences:
# 如果用户提供了明确偏好
return self._preference_based_recommendation(explicit_preferences)
else:
# 热门+多样性推荐
return self._popular_diverse_recommendation()
def handle_new_movie(self, movie_data):
"""新电影冷启动"""
# 1. 基于内容相似度推荐给可能感兴趣的用户
similar_movies = self.movie_db.find_similar_movies(movie_data)
# 2. 找到喜欢相似电影的用户
target_users = []
for sim_movie in similar_movies:
users_who_liked = self.movie_db.get_users_who_liked(sim_movie['id'])
target_users.extend(users_who_liked)
# 3. 推送给这些用户
return list(set(target_users))
def _preference_based_recommendation(self, preferences):
"""基于用户明确偏好的推荐"""
# 用户选择喜欢的类型、演员、导演等
genre = preferences.get('genre')
actor = preferences.get('actor')
# 从这些维度找高分电影
candidates = self.movie_db.filter({
'genres': [genre],
'min_rating': 4.0
})
if actor:
candidates = [m for m in candidates if actor in m['actors']]
return candidates[:10]
def _popular_diverse_recommendation(self):
"""热门且多样化的推荐"""
# 每种类型选一部热门电影
popular_by_genre = []
for genre in ['科幻', '悬疑', '动作', '喜剧', '剧情']:
movies = self.movie_db.get_by_genre(genre, limit=1, min_rating=4.5)
if movies:
popular_by_genre.append(movies[0])
return popular_by_genre
3. 实时反馈与在线学习
系统能够根据用户的实时行为调整推荐:
class OnlineLearningRecommender:
def __init__(self, base_model):
self.base_model = base_model
self.learning_rate = 0.01
def update_from_feedback(self, user_id, movie_id, feedback):
"""
根据用户反馈实时更新模型
feedback: {'type': 'click', 'value': 1} 或 {'type': 'rating', 'value': 4.5}
"""
# 获取用户当前特征
user_vector = self.base_model.get_user_vector(user_id)
movie_vector = self.base_model.get_movie_vector(movie_id)
if feedback['type'] == 'rating':
# 监督学习更新
predicted = np.dot(user_vector, movie_vector)
actual = feedback['value']
error = actual - predicted
# 梯度下降更新
user_vector += self.learning_rate * error * movie_vector
movie_vector += self.learning_rate * error * user_vector
elif feedback['type'] == 'click':
# 点击作为正样本,增强相关性
user_vector += self.learning_rate * movie_vector
movie_vector += self.learning_rate * user_vector
elif feedback['type'] == 'skip':
# 跳过作为负样本,减弱相关性
user_vector -= self.learning_rate * movie_vector
movie_vector -= self.learning_rate * user_vector
# 保存更新后的向量
self.base_model.update_vectors(user_id, user_vector, movie_id, movie_vector)
# 更新Redis缓存
redis_client.set(f"user_vec:{user_id}", joblib.dumps(user_vector))
redis_client.set(f"movie_vec:{movie_id}", joblib.dumps(movie_vector))
总结
影评小助手智能推荐系统通过整合协同过滤、基于内容的推荐、混合推荐等多种算法,结合实时反馈和情境感知技术,能够精准匹配用户的观影偏好,有效解决剧荒烦恼。系统的核心优势在于:
- 多维度数据融合:不仅分析评分,还考虑观看行为、时间、设备等多维数据
- 算法组合优化:混合推荐系统平衡了准确性、多样性和新颖性
- 实时响应能力:在线学习机制让系统能够快速适应用户变化
- 冷启动解决方案:为新用户和新电影提供有效的推荐策略
- 情境感知:根据用户当前状态调整推荐内容
通过这些技术手段,影评小助手不仅是一个推荐工具,更是一个智能的观影伴侣,帮助用户在海量内容中发现真正适合自己的电影和电视剧,彻底告别剧荒困扰。
