引言:电影票房预测的挑战与机遇
电影票房预测是娱乐产业中最具挑战性的数据分析任务之一。一部电影的票房收入受到众多因素的影响,包括演员阵容、导演声誉、制作预算、上映档期、营销投入、口碑评价等。传统的预测方法往往依赖于专家经验和简单的统计模型,但这些方法在面对复杂的市场动态时往往力不从心。
Kaggle作为一个全球领先的数据科学竞赛平台,为电影票房预测提供了丰富的数据集和竞技场。通过参与Kaggle竞赛,数据科学家们可以利用机器学习技术,从历史数据中挖掘隐藏的模式,构建精准的预测模型。这不仅能够帮助电影公司做出更明智的投资决策,还能为整个行业提供数据驱动的洞察。
数据探索与预处理:构建高质量数据集
数据集结构分析
在Kaggle竞赛中,电影票房预测通常涉及多个数据集,包括电影基本信息、演员和导演数据、财务数据、评分数据等。以经典的TMDB Box Office Prediction竞赛为例,主要数据集包含以下关键字段:
- id: 电影唯一标识符
- budget: 制作预算
- genres: 电影类型
- original_language: 原始语言
- popularity: 流行度评分
- production_companies: 制作公司
- release_date: 上映日期
- runtime: 片长
- spoken_languages: 语言版本
- status: 上映状态
- tagline: 宣传语
- title: 电影标题
- vote_average: 平均评分
- vote_count: 评分人数
- revenue: 票房收入(目标变量)
数据清洗与缺失值处理
数据清洗是构建预测模型的第一步。电影数据通常存在大量缺失值和异常值,需要针对性处理:
import pandas as pd
import numpy as np
from datetime import datetime
def clean_movie_data(df):
    """
    Clean raw movie data in place and return the DataFrame.

    Steps:
    1. Treat zero budget/revenue as missing.
    2. Parse release dates and derive year/month/quarter columns.
    3. Parse JSON-encoded list fields into plain lists of names.
    4. Null out implausible runtime/budget outliers.
    """
    # 1. Missing values: a budget or revenue of 0 almost always means
    # "unknown" rather than a genuinely free/zero-grossing movie.
    df['budget'] = df['budget'].replace(0, np.nan)
    df['revenue'] = df['revenue'].replace(0, np.nan)
    # 2. Dates: coerce unparseable strings to NaT instead of raising.
    df['release_date'] = pd.to_datetime(df['release_date'], errors='coerce')
    df['release_year'] = df['release_date'].dt.year
    # BUG FIX: month must come from release_date, not revenue —
    # revenue is numeric and has no .dt accessor (AttributeError).
    df['release_month'] = df['release_date'].dt.month
    df['release_quarter'] = df['release_date'].dt.quarter
    # 3. JSON-encoded fields, e.g. '[{"name": "Action"}]' -> ['Action'].
    def parse_json_field(json_str):
        if pd.isna(json_str) or json_str == '[]':
            return []
        try:
            import json
            data = json.loads(json_str)
            return [item['name'] for item in data]
        except Exception:
            # Malformed JSON is treated the same as missing.
            return []
    for col in ['genres', 'production_companies', 'production_countries', 'spoken_languages']:
        df[col] = df[col].apply(parse_json_field)
    # 4. Outliers.
    # Runtime outside [30, 300] minutes is considered a data error.
    df.loc[(df['runtime'] > 300) | (df['runtime'] < 30), 'runtime'] = np.nan
    # A budget below $1,000 is considered a data error.
    df.loc[df['budget'] < 1000, 'budget'] = np.nan
    return df
# 使用示例
# df = pd.read_csv('movies.csv')
# df_cleaned = clean_movie_data(df)
特征工程:从原始数据到预测因子
特征工程是机器学习项目中最关键的环节之一。对于电影票房预测,我们需要从原始数据中提取有意义的特征:
def engineer_features(df):
    """
    Derive modeling features from the cleaned movie DataFrame.

    Expects the columns produced by clean_movie_data (parsed dates,
    list-valued genres/production_companies, numeric budget/runtime).
    Adds log transforms, calendar flags, one-hot genre columns, company
    history aggregates, text-length and interaction features.
    """
    # 1. Log-transform heavy-tailed numeric features.
    df['budget_log'] = np.log1p(df['budget'])
    df['runtime_log'] = np.log1p(df['runtime'])
    # 2. Calendar features: summer blockbuster season, year-end holiday
    # season, and weekend releases (weekday 5/6 = Sat/Sun).
    df['is_summer'] = df['release_month'].isin([6, 7, 8]).astype(int)
    df['is_holiday'] = df['release_month'].isin([11, 12]).astype(int)
    df['is_weekend'] = df['release_date'].dt.weekday.isin([5, 6]).astype(int)
    # 3. One-hot encode genres (each row holds a list of genre names).
    all_genres = set()
    for genres in df['genres']:
        all_genres.update(genres)
    for genre in all_genres:
        df[f'genre_{genre}'] = df['genres'].apply(lambda x: 1 if genre in x else 0)
    # 4. Production-company history: mean revenue per company.
    company_stats = df.explode('production_companies').groupby('production_companies')['revenue'].agg(['mean', 'count']).reset_index()
    company_stats.columns = ['company', 'company_avg_revenue', 'company_movie_count']
    # Only companies with at least 3 movies are considered reliable.
    active_companies = company_stats[company_stats['company_movie_count'] >= 3]['company'].tolist()
    def get_company_revenue(companies):
        # Average historical revenue over this movie's "active" companies.
        if not companies:
            return np.nan
        active = [c for c in companies if c in active_companies]
        if not active:
            return np.nan
        return company_stats.loc[company_stats['company'].isin(active), 'company_avg_revenue'].mean()
    # BUG FIX: the original line was truncated/garbled
    # ("...apply(get_company_re2. 演员和导演特征"); restore the call.
    df['company_avg_revenue'] = df['production_companies'].apply(get_company_revenue)
    # Cast/crew features would require the extra cast.csv / crew.csv
    # datasets, e.g.:
    # df_cast = pd.read_csv('cast.csv')
    # df_crew = pd.read_csv('crew.csv')
    # actor_stats = df_cast.groupby('actor_id')['revenue'].agg(['mean', 'count'])
    # 5. Text features: title/tagline lengths.
    df['title_length'] = df['title'].str.len()
    df['tagline_length'] = df['tagline'].str.len()
    # Keyword features if a 'keywords' JSON field is available:
    # df['keywords'] = df['keywords'].apply(parse_json_field)
    # df['keyword_count'] = df['keywords'].apply(len)
    # 6. Interaction features.
    df['budget_per_runtime'] = df['budget'] / df['runtime']
    df['budget_x_popularity'] = df['budget'] * df['popularity']
    return df
模型构建与优化:从基准到高性能预测
基准模型:线性回归
首先建立一个简单的线性回归模型作为基准:
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
def baseline_model(df, features, target='revenue'):
    """
    Fit a plain linear-regression baseline and report RMSE / R².

    Returns the fitted model together with the StandardScaler used to
    normalize the features, so callers can transform new data the same way.
    """
    # Feature matrix and target, median-imputed.
    feats = df[features].copy()
    labels = df[target].copy()
    feats = feats.fillna(feats.median())
    labels = labels.fillna(labels.median())

    # Hold out 20% for evaluation.
    f_train, f_test, l_train, l_test = train_test_split(
        feats, labels, test_size=0.2, random_state=42
    )

    # Standardize using statistics from the training split only.
    scaler = StandardScaler()
    f_train_std = scaler.fit_transform(f_train)
    f_test_std = scaler.transform(f_test)

    model = LinearRegression()
    model.fit(f_train_std, l_train)

    # Evaluate on both splits to expose over/under-fitting.
    pred_te = model.predict(f_test_std)
    rmse_tr = np.sqrt(mean_squared_error(l_train, model.predict(f_train_std)))
    rmse_te = np.sqrt(mean_squared_error(l_test, pred_te))
    r2 = r2_score(l_test, pred_te)
    print(f"训练集RMSE: {rmse_tr:.2f}")
    print(f"测试集RMSE: {rmse_te:.2f}")
    print(f"R²分数: {r2:.4f}")
    return model, scaler
# 特征选择示例
# features = ['budget', 'popularity', 'runtime', 'vote_average', 'vote_count',
# 'release_year', 'release_month', 'is_summer', 'is_holiday']
# model, scaler = baseline_model(df_cleaned, features)
高级模型:XGBoost与集成学习
XGBoost是Kaggle竞赛中常用的高性能算法,特别适合处理结构化数据:
import xgboost as xgb
from sklearn.model_selection import GridSearchCV
def xgboost_model(df, features, target='revenue'):
    """
    Tune an XGBoost regressor with 5-fold grid search and report metrics.

    The model is trained on log1p(revenue) because box-office revenue is
    heavily right-skewed; predictions are mapped back with expm1 before
    evaluation. Returns the best estimator found by the search.
    """
    design = df[features].copy()
    # log1p compresses the long revenue tail into a near-normal shape.
    target_log = np.log1p(df[target].copy())
    design = design.fillna(design.median())

    tr_X, te_X, tr_y, te_y = train_test_split(
        design, target_log, test_size=0.2, random_state=42
    )

    # Search space covering the main capacity / regularization knobs.
    param_grid = {
        'n_estimators': [100, 200, 300],
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1, 0.3],
        'subsample': [0.8, 1.0],
        'colsample_bytree': [0.8, 1.0]
    }

    search = GridSearchCV(
        xgb.XGBRegressor(
            objective='reg:squarederror',
            random_state=42,
            n_jobs=-1
        ),
        param_grid,
        cv=5,
        scoring='neg_mean_squared_error',
        n_jobs=-1,
        verbose=1
    )
    search.fit(tr_X, tr_y)
    best = search.best_estimator_

    # Metrics are reported on the original revenue scale.
    pred_tr = np.expm1(best.predict(tr_X))
    pred_te = np.expm1(best.predict(te_X))
    true_te = np.expm1(te_y)
    rmse_tr = np.sqrt(mean_squared_error(np.expm1(tr_y), pred_tr))
    rmse_te = np.sqrt(mean_squared_error(true_te, pred_te))
    r2 = r2_score(true_te, pred_te)
    print(f"最佳参数: {search.best_params_}")
    print(f"训练集RMSE: {rmse_tr:.2f}")
    print(f"测试集RMSE: {rmse_te:.2f}")
    print(f"R²分数: {r2:.4f}")
    return best
# 使用示例
# features = ['budget', 'popularity', 'runtime', 'vote_average', 'vote_count',
# 'release_year', 'release_month', 'is_summer', 'is_holiday',
# 'company_avg_revenue', 'title_length']
# model = xgboost_model(df_cleaned, features)
模型集成:提升预测稳定性
在Kaggle竞赛中,模型集成是提升成绩的关键策略:
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import Ridge, Lasso
import numpy as np
def ensemble_model(df, features, target='revenue'):
    """
    Train four base regressors on the log target and compare a simple
    average against a hand-weighted blend of their predictions.

    Returns the dict of fitted base models and the blending weights.
    """
    design = df[features].copy()
    log_target = np.log1p(df[target].copy())
    design = design.fillna(design.median())
    tr_X, te_X, tr_y, te_y = train_test_split(
        design, log_target, test_size=0.2, random_state=42
    )

    # Heterogeneous base learners: boosted trees, bagged trees and a
    # linear model, chosen for prediction diversity.
    models = {
        'xgb': xgb.XGBRegressor(
            n_estimators=200, max_depth=5, learning_rate=0.1,
            subsample=0.8, colsample_bytree=0.8,
            objective='reg:squarederror', random_state=42
        ),
        'rf': RandomForestRegressor(
            n_estimators=100, max_depth=10, random_state=42, n_jobs=-1
        ),
        'ridge': Ridge(alpha=10.0, random_state=42),
        'gbrt': GradientBoostingRegressor(
            n_estimators=100, learning_rate=0.1, max_depth=5, random_state=42
        )
    }

    # Fit each base model and collect its test-set predictions.
    base_predictions = []
    for name, estimator in models.items():
        estimator.fit(tr_X, tr_y)
        preds = estimator.predict(te_X)
        base_predictions.append(preds)
        print(f"{name}模型RMSE: {np.sqrt(mean_squared_error(np.expm1(te_y), np.expm1(preds))):.2f}")

    # Unweighted average of the base predictions.
    mean_blend = np.mean(base_predictions, axis=0)
    print(f"集成模型RMSE: {np.sqrt(mean_squared_error(np.expm1(te_y), np.expm1(mean_blend))):.2f}")

    # Hand-tuned weights; adjust from observed validation performance.
    weights = [0.4, 0.3, 0.2, 0.1]
    weighted_blend = np.average(base_predictions, axis=0, weights=weights)
    print(f"加权集成模型RMSE: {np.sqrt(mean_squared_error(np.expm1(te_y), np.expm1(weighted_blend))):.2f}")
    return models, weights
影响因素分析:数据驱动的洞察
特征重要性分析
理解模型决策过程对于业务洞察至关重要。XGBoost提供了内置的特征重要性:
def analyze_feature_importance(model, features):
    """
    Rank features by the model's built-in importances, print the top 15
    and draw a horizontal bar chart of them.

    Returns the full ranking as a DataFrame (feature, importance),
    sorted descending.
    """
    ranking = pd.DataFrame(
        {'feature': features, 'importance': model.feature_importances_}
    ).sort_values('importance', ascending=False)

    print("特征重要性排名:")
    print(ranking.head(15))

    # Visualize the top 15 only, to keep the chart readable.
    import matplotlib.pyplot as plt
    import seaborn as sns
    plt.figure(figsize=(10, 8))
    sns.barplot(data=ranking.head(15), x='importance', y='feature')
    plt.title('Top 15 Feature Importance')
    plt.xlabel('Importance Score')
    plt.tight_layout()
    plt.show()
    return ranking
# 使用示例
# importance = analyze_feature_importance(model, features)
部分依赖图(Partial Dependence)
部分依赖图可以展示单个特征如何影响预测结果:
from sklearn.inspection import PartialDependenceDisplay
def plot_partial_dependence(model, X, features, target_feature):
    """
    Plot the partial dependence of the model's prediction on one feature.

    kind='both' overlays the averaged PDP curve with per-sample ICE
    curves. `features` is kept for interface compatibility (unused).
    """
    # BUG FIX: `plt` was never imported at module level (the only
    # matplotlib import is local to analyze_feature_importance), so this
    # function raised NameError; import it here.
    import matplotlib.pyplot as plt
    fig, ax = plt.subplots(figsize=(10, 6))
    PartialDependenceDisplay.from_estimator(
        model, X, [target_feature], ax=ax, kind='both'
    )
    plt.title(f'Partial Dependence Plot: {target_feature}')
    plt.show()
# 示例:分析预算对票房的影响
# plot_partial_dependence(model, X_test, features, 'budget')
SHAP值分析:解释个体预测
SHAP(SHapley Additive exPlanations)是解释机器学习模型的黄金标准:
import shap
def shap_analysis(model, X, features):
    """
    Explain the model with SHAP: global importance bar chart, a budget
    dependence plot, and a force plot for the first sample.

    Returns the TreeExplainer and the computed SHAP value matrix.
    """
    # TreeExplainer is exact and fast for tree ensembles like XGBoost.
    explainer = shap.TreeExplainer(model)
    values = explainer.shap_values(X)

    # Global view: mean |SHAP| per feature as a bar chart.
    shap.summary_plot(values, X, feature_names=features, plot_type="bar")
    # How the budget feature drives individual predictions.
    shap.dependence_plot("budget", values, X, feature_names=features)

    # Local view: decompose the prediction for the first row.
    first = 0
    shap.force_plot(
        explainer.expected_value,
        values[first],
        X.iloc[first],
        feature_names=features,
        matplotlib=True
    )
    return explainer, values
实战案例:完整项目流程
完整代码示例
以下是一个完整的端到端项目示例:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import mean_squared_error
import xgboost as xgb
import warnings
warnings.filterwarnings('ignore')
class MovieRevenuePredictor:
    """
    End-to-end movie box-office predictor.

    Wraps data cleaning, feature engineering, feature selection and an
    XGBoost regressor trained on log1p(revenue).
    """
    def __init__(self):
        self.model = None      # fitted XGBRegressor, set by train()
        self.features = None   # feature column list, set by select_features()
        self.scaler = None     # reserved for optional feature scaling
    def load_and_clean(self, filepath):
        """Load a CSV and apply the module-level clean_movie_data."""
        df = pd.read_csv(filepath)
        df = clean_movie_data(df)
        return df
    def engineer_features(self, df):
        """Apply the module-level engineer_features pipeline."""
        df = engineer_features(df)
        return df
    def select_features(self, df, target='revenue'):
        """Assemble the feature list, keeping only columns present in df."""
        base_features = [
            'budget', 'popularity', 'runtime', 'vote_average', 'vote_count',
            'release_year', 'release_month', 'is_summer', 'is_holiday'
        ]
        # One-hot genre columns produced by engineer_features.
        genre_features = [col for col in df.columns if col.startswith('genre_')]
        company_features = ['company_avg_revenue']
        text_features = ['title_length', 'tagline_length']
        all_features = base_features + genre_features + company_features + text_features
        # Silently drop features that upstream steps did not produce.
        existing_features = [col for col in all_features if col in df.columns]
        self.features = existing_features
        return existing_features
    def _make_model(self):
        """Fresh XGBoost regressor with the fixed hyper-parameters."""
        return xgb.XGBRegressor(
            n_estimators=200, max_depth=5, learning_rate=0.1,
            subsample=0.8, colsample_bytree=0.8,
            objective='reg:squarederror', random_state=42
        )
    def train(self, df, target='revenue', use_cv=True):
        """Train the model; with use_cv=True also report 5-fold CV RMSE."""
        features = self.select_features(df, target)
        X = df[features].copy()
        y = df[target].copy()
        # Tame the skewed revenue target.
        y_log = np.log1p(y)
        X = X.fillna(X.median())
        if use_cv:
            kf = KFold(n_splits=5, shuffle=True, random_state=42)
            scores = []
            for train_idx, val_idx in kf.split(X):
                X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
                y_train, y_val = y_log.iloc[train_idx], y_log.iloc[val_idx]
                model = self._make_model()
                model.fit(X_train, y_train)
                pred = np.expm1(model.predict(X_val))
                true = np.expm1(y_val)
                rmse = np.sqrt(mean_squared_error(true, pred))
                scores.append(rmse)
            print(f"交叉验证RMSE: {np.mean(scores):.2f} (+/- {np.std(scores):.2f})")
            # BUG FIX: the original never set self.model on the CV path,
            # so the documented usage train(use_cv=True) -> predict(...)
            # always raised. Fit a final model on the full training data.
            self.model = self._make_model()
            self.model.fit(X, y_log)
        else:
            X_train, X_test, y_train, y_test = train_test_split(X, y_log, test_size=0.2, random_state=42)
            self.model = self._make_model()
            self.model.fit(X_train, y_train)
            pred = np.expm1(self.model.predict(X_test))
            true = np.expm1(y_test)
            rmse = np.sqrt(mean_squared_error(true, pred))
            print(f"测试集RMSE: {rmse:.2f}")
    def predict(self, df):
        """Predict revenue on the original scale for feature-engineered data."""
        if self.model is None:
            raise ValueError("模型尚未训练")
        X = df[self.features].copy()
        X = X.fillna(X.median())
        predictions = np.expm1(self.model.predict(X))
        return predictions
# 完整使用示例
def main():
    """Walk through the full pipeline (steps shown as commented examples)."""
    # Step 1 — load the training data:
    # df = pd.read_csv('train.csv')
    # Step 2 — clean it:
    # df_cleaned = clean_movie_data(df)
    # Step 3 — engineer features:
    # df_features = engineer_features(df_cleaned)
    # Step 4 — train with cross-validation:
    # predictor = MovieRevenuePredictor()
    # predictor.train(df_features, use_cv=True)
    # Step 5 — score the test set:
    # test_df = pd.read_csv('test.csv')
    # test_cleaned = clean_movie_data(test_df)
    # test_features = engineer_features(test_cleaned)
    # predictions = predictor.predict(test_features)
    # Step 6 — write the Kaggle submission file:
    # submission = pd.DataFrame({'id': test_df['id'], 'revenue': predictions})
    # submission.to_csv('submission.csv', index=False)
    print("电影票房预测流程完成!")

if __name__ == "__main__":
    main()
高级技巧与Kaggle竞赛策略
时间序列特征与泄漏处理
电影票房预测需要注意时间序列特性,避免数据泄漏:
def time_aware_features(df):
    """
    Concept demo for leakage-free, time-aware feature engineering.

    Sorts the movies chronologically and parses release_date. The inner
    helper sketches how per-company historical revenue averages could be
    computed using only strictly earlier releases; it is illustrative
    and not invoked here — a real implementation should be vectorized.
    """
    # Chronological order so "past" always precedes "future".
    df = df.sort_values('release_date')
    df['release_date'] = pd.to_datetime(df['release_date'])

    def rolling_company_stats(frame):
        # For each month, average each production company's revenue over
        # all movies released strictly before that month (no leakage).
        rows = []
        for month_start, month_movies in frame.groupby(pd.Grouper(key='release_date', freq='M')):
            if len(month_movies) > 0:
                history = frame[frame['release_date'] < month_start]
                if len(history) > 0:
                    per_company = history.explode('production_companies').groupby('production_companies')['revenue'].mean()
                    for idx, movie in month_movies.iterrows():
                        companies = movie['production_companies']
                        if companies:
                            rows.append((idx, per_company.reindex(companies).mean()))
                        else:
                            rows.append((idx, np.nan))
        out = pd.DataFrame(rows, columns=['index', 'company_historical_avg'])
        out.set_index('index', inplace=True)
        return out

    # NOTE: the helper above is a concept demonstration only.
    return df
特征选择与降维
from sklearn.feature_selection import SelectKBest, f_regression
from sklearn.decomposition import PCA
def advanced_feature_selection(X, y, k=50):
    """
    Three-stage feature selection: variance filter, mutual-information
    ranking, then recursive elimination with a random forest.

    Returns the final selected feature names and the mutual-information
    ranking table.
    """
    # Stage 1: drop near-constant columns (variance <= 0.01).
    from sklearn.feature_selection import VarianceThreshold
    var_filter = VarianceThreshold(threshold=0.01)
    reduced = var_filter.fit_transform(X)
    kept = X.columns[var_filter.get_support()]

    # Stage 2: rank survivors by mutual information with the target.
    from sklearn.feature_selection import mutual_info_regression
    mi = mutual_info_regression(reduced, y)
    mi_df = pd.DataFrame({'feature': kept, 'mi_score': mi})
    mi_df = mi_df.sort_values('mi_score', ascending=False)

    # Stage 3: recursive feature elimination down to at most k features.
    from sklearn.feature_selection import RFE
    from sklearn.ensemble import RandomForestRegressor
    eliminator = RFE(
        estimator=RandomForestRegressor(n_estimators=50, random_state=42),
        n_features_to_select=min(k, len(kept)),
        step=1
    )
    eliminator.fit(reduced, y)
    final_features = kept[eliminator.support_]
    return final_features, mi_df
# 使用示例
# selected_features, mi_scores = advanced_feature_selection(X, y, k=30)
模型解释与业务洞察
def generate_business_insights(model, X, features, df):
    """
    Produce a short list of text insights plus a 2x2 dashboard of plots
    (feature importance, budget vs revenue, revenue by genre, revenue by
    release month).

    Returns the list of insight strings. `X` is kept for interface
    compatibility (unused).
    """
    # BUG FIX: plt/matplotlib was never imported at module level.
    import matplotlib.pyplot as plt
    # 1. Feature importance ranking.
    importance = pd.DataFrame({
        'feature': features,
        'importance': model.feature_importances_
    }).sort_values('importance', ascending=False)
    # 2. Key findings.
    insights = []
    # Budget: guard against feature sets without 'budget' (the original
    # unconditional .iloc[0] raised IndexError in that case).
    budget_rows = importance[importance['feature'] == 'budget']
    if not budget_rows.empty:
        budget_impact = budget_rows['importance'].iloc[0]
        rank = importance['feature'].tolist().index('budget') + 1
        insights.append(f"制作预算是票房预测的第{rank}大影响因素,重要性为{budget_impact:.3f}")
    # Most important one-hot genre, if any genre features are present.
    genre_importance = importance[importance['feature'].str.startswith('genre_')]
    if not genre_importance.empty:
        top_genre = genre_importance.iloc[0]
        insights.append(f"电影类型 '{top_genre['feature'].replace('genre_', '')}' 对票房有显著影响")
    # Release timing. NOTE(review): tree importances are non-negative, so
    # the '负面' branch can never fire; true direction needs SHAP values.
    if 'is_summer' in importance['feature'].values:
        summer_impact = importance[importance['feature'] == 'is_summer']['importance'].iloc[0]
        insights.append(f"暑期档上映对票房有{'正面' if summer_impact > 0 else '负面'}影响")
    # 3. Dashboard.
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    # Top-10 importance bars.
    importance.head(10).plot(kind='barh', x='feature', y='importance', ax=axes[0, 0])
    axes[0, 0].set_title('Top 10 Feature Importance')
    # Budget vs revenue scatter.
    axes[0, 1].scatter(df['budget'], df['revenue'], alpha=0.5)
    axes[0, 1].set_xlabel('Budget')
    axes[0, 1].set_ylabel('Revenue')
    axes[0, 1].set_title('Budget vs Revenue')
    # BUG FIX: the original referenced the undefined global `all_genres`
    # (a local of engineer_features); recover the genre list from the
    # one-hot columns instead.
    genre_names = [c[len('genre_'):] for c in df.columns if c.startswith('genre_')]
    genre_revenue = []
    for genre in genre_names:
        mask = df[f'genre_{genre}'] == 1
        if mask.sum() > 0:
            avg_rev = df.loc[mask, 'revenue'].mean()
            genre_revenue.append((genre, avg_rev))
    if genre_revenue:
        genre_df = pd.DataFrame(genre_revenue, columns=['genre', 'avg_revenue']).sort_values('avg_revenue', ascending=False)
        genre_df.plot(kind='bar', x='genre', y='avg_revenue', ax=axes[1, 0])
        axes[1, 0].set_title('Average Revenue by Genre')
        axes[1, 0].tick_params(axis='x', rotation=45)
    # Seasonality: mean revenue per release month.
    monthly_revenue = df.groupby('release_month')['revenue'].mean()
    monthly_revenue.plot(kind='bar', ax=axes[1, 1])
    axes[1, 1].set_title('Average Revenue by Release Month')
    plt.tight_layout()
    plt.show()
    return insights
# 使用示例
# insights = generate_business_insights(model, X_test, features, df)
# for insight in insights:
# print(f"- {insight}")
总结与最佳实践
通过Kaggle竞赛中的电影票房预测项目,我们不仅能够构建高精度的预测模型,还能深入理解影响电影商业成功的各种因素。关键成功因素包括:
- 数据质量:彻底的数据清洗和异常值处理
- 特征工程:创造性地从原始数据中提取有意义的特征
- 模型选择:XGBoost等集成方法通常表现最佳
- 模型解释:使用SHAP等工具理解模型决策
- 避免泄漏:严格处理时间序列特征,防止未来信息泄漏
这些技术不仅适用于电影行业,还可以推广到其他娱乐内容(如音乐、游戏)的商业预测中。通过持续迭代和优化,机器学习正在改变娱乐产业的决策方式,让数据驱动的洞察成为商业成功的关键。
