引言:为什么Python是数据分析的首选工具
在当今数据驱动的世界中,数据分析已成为企业决策和个人技能发展的核心。Python凭借其简洁的语法、丰富的库生态系统和强大的社区支持,已成为数据分析领域的事实标准。根据2023年KDnuggets调查,超过85%的数据科学家将Python作为主要编程语言。本文将带您从Python基础开始,逐步掌握高效数据分析的核心技能,包括数据清洗、可视化、统计分析和机器学习入门。
Python数据分析环境搭建:专业工具链配置
1. 安装Python和包管理器
首先,我们需要安装Python和pip(Python包管理器)。推荐使用Anaconda发行版,它预装了数据分析所需的大部分库。
# 安装Anaconda(推荐)
wget https://repo.anaconda.com/archive/Anaconda3-2023.09-0-Linux-x86_64.sh
bash Anaconda3-2023.09-0-Linux-x86_64.sh
# 或者使用原生Python安装
sudo apt update
sudo apt install python3 python3-pip
2. 创建虚拟环境
虚拟环境可以隔离项目依赖,避免版本冲突。
# 创建虚拟环境
python3 -m venv data_analysis_env
# 激活虚拟环境
source data_analysis_env/bin/activate
# 升级pip
pip install --upgrade pip
3. 安装核心数据分析库
# 基础数据处理
pip install numpy pandas openpyxl
# 数据可视化
pip install matplotlib seaborn plotly
# 统计分析和机器学习
pip install scipy scikit-learn statsmodels
# Jupyter Notebook(交互式分析)
pip install jupyterlab notebook
数据处理基础:Pandas的高效使用技巧
1. DataFrame基础操作
Pandas是Python数据分析的核心库,提供了类似Excel的数据结构和操作。
import pandas as pd
import numpy as np
# 创建DataFrame
data = {
'姓名': ['张三', '李四', '王五', '赵六'],
'年龄': [25, 30, 35, 28],
'部门': ['技术', '市场', '技术', '人事'],
'薪资': [15000, 18000, 20000, 16000]
}
df = pd.DataFrame(data)
# 查看数据基本信息
print(df.info())
print(df.describe())
# 数据筛选
tech_dept = df[df['部门'] == '技术']
high_salary = df[df['薪资'] > 17000]
# 数据排序
df_sorted = df.sort_values('薪资', ascending=False)
2. 数据清洗高级技巧
真实数据往往包含缺失值、重复值和异常值。
# 处理缺失值
df_with_nan = df.copy()
df_with_nan.loc[1, '薪资'] = np.nan
# 检测缺失值
print(df_with_nan.isnull().sum())
# 填充缺失值
df_filled = df_with_nan.fillna({'薪资': df_with_nan['薪资'].median()})
# 删除重复值
df_duplicated = pd.concat([df, df.iloc[0:1]], ignore_index=True)
df_unique = df_duplicated.drop_duplicates()
# 处理异常值(使用IQR方法)
Q1 = df['薪资'].quantile(0.25)
Q3 = df['薪资'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
df_no_outliers = df[(df['薪资'] >= lower_bound) & (df['薪资'] <= upper_bound)]
3. 数据分组与聚合
# 分组统计
grouped = df.groupby('部门').agg({
'薪资': ['mean', 'max', 'min', 'count'],
'年龄': 'mean'
})
# 多级索引扁平化
grouped.columns = ['_'.join(col).strip() for col in group.columns.values]
grouped.reset_index(inplace=True)
# 自定义聚合函数
def salary_range(series):
return series.max() - series.min()
custom_agg = df.groupby('部门').agg({
'薪资': [salary_range, 'mean']
})
数据可视化:从基础到高级图表
1. Matplotlib基础绘图
import matplotlib.pyplot as plt
# 设置中文字体(解决中文显示问题)
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 基础折线图
x = np.linspace(0, 10, 100)
y = np.sin(x)
plt.figure(figsize=(10, 6))
plt.plot(x, y, label='sin(x)', color='blue', linewidth=2)
plt.title('正弦函数图像')
plt.xlabel('X轴')
plt.ylabel('Y轴')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
# 多子图
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
axes[0, 0].hist(df['年龄'], bins=5, color='skyblue', edgecolor='black')
axes[0, 0].set_title('年龄分布')
axes[0, 1].scatter(df['年龄'], df['薪资'], color='red')
axes[0, 1].set_title('年龄与薪资关系')
plt.tight_layout()
plt.show()
2. Seaborn高级可视化
Seaborn基于Matplotlib,提供了更美观的统计图表。
import seaborn as sns
# 设置风格
sns.set_theme(style="whitegrid")
# 箱线图
plt.figure(figsize=(8, 6))
sns.boxplot(data=df, x='部门', y='薪资', palette='Set2')
plt.title('各部门薪资分布箱线图')
plt.show()
# 热力图(相关性矩阵)
corr_data = df[['年龄', '薪资']].corr()
plt.figure(figsize=(6, 4))
sns.heatmap(corr_data, annot=True, cmap='coolwarm', center=0)
plt.title('相关性热力图')
plt.show()
# 小提琴图
plt.figure(figsize=(8, 6))
sns.violinplot(data=df, x='部门', y='薪资', inner='quartile')
plt.title('各部门薪资分布小提琴图')
plt.show()
3. Plotly交互式可视化
import plotly.express as px
import plotly.graph_objects as go
# 交互式散点图
fig = px.scatter(df, x='年龄', y='薪资', color='部门',
size='薪资', hover_data=['姓名'],
title='员工年龄与薪资关系')
fig.show()
# 交互式柱状图
fig = px.bar(df, x='部门', y='薪资', color='姓名',
title='各部门薪资对比')
fig.show()
# 自定义交互式图表
fig = go.Figure()
fig.add_trace(go.Scatter(
x=df['年龄'], y=df['薪资'],
mode='markers+text',
text=df['姓名'],
textposition='top center',
marker=dict(size=10, color=df['部门'].astype('category').cat.codes)
))
fig.update_layout(title='交互式员工数据图', xaxis_title='年龄', yaxis_title='薪资')
fig.show()
统计分析:用数据说话
1. 描述性统计分析
from scipy import stats
# 基础统计量
print("平均薪资:", df['薪资'].mean())
print("薪资中位数:", df['薪资'].median())
print("薪资标准差:", df['薪资'].std())
print("薪资偏度:", df['薪资'].skew())
print("薪资峰度:", df['薪资'].kurtosis())
# 正态性检验(Shapiro-Wilk检验)
stat, p_value = stats.shapiro(df['薪资'])
print(f"Shapiro-Wilk检验: 统计量={stat:.4f}, p值={p_value:.4f}")
if p_value > 0.05:
print("薪资数据符合正态分布")
else:
print("薪资数据不符合正态分布")
2. 假设检验
# 独立样本t检验(比较两个部门的薪资差异)
tech_salary = df[df['部门'] == '技术']['薪资']
market_salary = df[df['部门'] == '市场']['薪资']
# 假设检验:技术部门薪资是否显著高于市场部门
t_stat, p_val = stats.ttest_ind(tech_salary, market_salary, alternative='greater')
print(f"t统计量: {t_stat:.4f}, p值: {p_val:.4f}")
if p_val < 0.05:
print("技术部门薪资显著高于市场部门")
else:
print("两个部门薪资差异不显著")
# 方差分析(ANOVA)
from scipy.stats import f_oneway
groups = [df[df['部门'] == dept]['薪资'] for dept in df['部门'].unique()]
f_stat, f_p = f_oneway(*groups)
print(f"ANOVA F统计量: {f_stat:.4f}, p值: {f_p:.4f}")
3. 相关性分析
# Pearson相关系数
pearson_corr, p_val = stats.pearsonr(df['年龄'], df['薪资'])
print(f"Pearson相关系数: {pearson_corr:.4f}, p值: {p_val:.4f}")
# Spearman秩相关(非参数)
spearman_corr, p_val = stats.spearmanr(df['年龄'], df['薪资'])
print(f"Spearman相关系数: {spearman_corr:.4f}, p值: {g_p_val:.4f}")
# 多变量相关性矩阵
corr_matrix = df[['年龄', '薪资']].corr(method='pearson')
print(corr_matrix)
机器学习入门:预测分析基础
1. 线性回归预测薪资
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
# 准备数据(增加样本量以提高模型稳定性)
np.random.seed(42)
n_samples = 100
ages = np.random.randint(22, 45, n_samples)
# 薪资与年龄的关系:基础薪资 + 年龄*系数 + 随机噪声
salaries = 10000 + ages * 300 + np.random.normal(0, 2000, n_samples)
depts = np.random.choice(['技术', '市场', '人事'], n_samples)
df_ml = pd.DataFrame({'年龄': ages, '薪资': salaries, '部门': depts})
# 特征工程:部门编码
df_ml = pd.get_dummies(df_ml, columns=['部门'], prefix='dept')
# 分割数据集
X = df_ml.drop('薪资', axis=1)
y = df_ml['薪资']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 训练模型
model = LinearRegression()
model.fit(X_train, y_train)
# 预测与评估
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y2_pred)
r2 = r2_score(y_test, y_pred)
print(f"均方误差: {mse:.2f}")
print(f"R²分数: {r2:.4f}")
# 模型解释
print("\n模型系数:")
for feature, coef in zip(X.columns, model.coef_):
print(f"{feature}: {coef:.2f}")
print(f"截距: {model.intercept_:.2f}")
2. 分类模型:预测部门
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix
# 准备分类数据
X_clf = df_ml[['年龄', '薪资']]
y_clf = df_ml['部门_technology'] # 预测是否为技术部门
# 分割数据
X_train_clf, X_test_clf, y_train_clf, y_test_clf = train_test_split(
X_clf, y_clf, test_size=0.2, random_state=42
)
# 训练随机森林
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train_clf, y_train_clf)
# 评估
y_pred_clf = clf.predict(X_test_clf)
print(classification_report(y_test_clf, y_pred_clf))
print("混淆矩阵:")
print(confusion_matrix(y_test_clf, y_pred_clf))
实战案例:销售数据分析全流程
1. 数据加载与探索
# 创建模拟销售数据
np.random.seed(42)
dates = pd.date_range('2023-01-01', '2023-12-31', freq='D')
n = len(dates)
sales_data = pd.DataFrame({
'日期': dates,
'销售额': np.random.normal(50000, 15000, n),
'订单量': np.random.poisson(100, n),
'客户数': np.random.randint(50, 200, n),
'产品类别': np.random.choice(['A', 'B', 'C'], n, p=[0.4, 0.35, 0.25])
})
# 添加趋势和季节性
sales_data['销售额'] += np.linspace(0, 20000, n) # 上升趋势
sales_data['销售额'] += 5000 * np.sin(2 * np.pi * np.arange(n) / 30) # 月度季节性
# 保存为CSV
sales_data.to_csv('sales_2023.csv', index=False)
# 加载并探索
df_sales = pd.read_csv('sales_2023.csv')
df_sales['日期'] = pd.to_datetime(df_sales['日期'])
print(df_sales.head())
print(df_sales.describe())
2. 时间序列分析
# 按月汇总
monthly_sales = df_sales.groupby(df_sales['日期'].dt.to_period('M')).agg({
'销售额': 'sum',
'订单量': 'sum',
'客户数': 'mean'
}).reset_index()
monthly_sales['日期'] = monthly_sales['日期'].dt.to_timestamp()
# 可视化月度趋势
plt.figure(figsize=(12, 6))
plt.plot(monthly_sales['日期'], monthly_sales['销售额'], marker='o')
plt.title('2023年月度销售额趋势')
plt.xlabel('月份')
plt.ylabel('销售额')
plt.grid(True)
plt.show()
# 季节性分解
from statsmodels.tsa.seasonal import seasonal_decompose
# 需要设置频率为日,这里用周汇总数据
weekly_sales = df_sales.set_index('日期').resample('W').sum()
decomposition = seasonal_decompose(weekly_sales['销售额'], model='additive', period=4)
decomposition.plot()
plt.show()
3. 预测未来销售
from statsmodels.tsa.arima.model import ARIMA
# 使用ARIMA模型预测
# 首先检查平稳性(简化处理)
weekly_sales['销售额_diff'] = weekly_sales['销售额'].diff().dropna()
# 拟合ARIMA模型 (p,d,q) = (2,1,2)
model = ARIMA(weekly_sales['销售额'], order=(2,1,2))
model_fit = model.fit()
# 预测未来4周
forecast = model_fit.forecast(steps=4)
print("未来4周销售预测:")
print(forecast)
# 可视化预测
plt.figure(figsize=(12, 6))
plt.plot(weekly_sales.index, weekly_sales['销售额'], label='历史数据')
plt.plot(pd.date_range(weekly_sales.index[-1] + pd.Timedelta(days=7), periods=4, freq='W'),
forecast, label='预测', color='red', marker='o')
plt.title('销售预测')
plt.legend()
plt.show()
性能优化:高效数据分析技巧
1. 向量化操作
# 避免使用循环,使用NumPy向量化
def slow_calculation(df):
result = []
for i in1 range(len(df)):
if df.iloc[i]['年龄'] > 25:
result.append(df.iloc[i]['薪资'] * 1.1)
else:
result.append(df.iloc[i]['薪资'])
return result
def fast_calculation(df):
# 向量化版本
return np.where(df['年龄'] > 25, df['薪资'] * 1.1, df['薪资'])
# 性能对比
import time
start = time.time()
slow_result = slow_calculation(df)
print(f"循环耗时: {time.time() - start:.4f}秒")
start = time.time()
fast_result = fast_calculation(df)
print(f"向量化耗时: {time.time() -1 start:.4f}秒")
2. 使用Categorical类型优化内存
# 原始字符串类型
df['部门_str'] = df['部门'].astype(str)
print(f"字符串类型内存: {df['部门_str'].memory_usage(deep=True)} bytes")
# 转换为Categorical类型
df['部门_cat'] = df['部门'].astype('category')
print(f"Categorical类型内存: {df['部门_cat'].memory_usage(deep=True)} bytes")
# 性能提升:分组操作
import time
n = 100000
large_df = pd.DataFrame({
'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),
'value': np.random.randn(n)
})
# 字符串版本
start = time.time()
result_str = large_df.groupby('category').mean()
time_str = time.time() - start
# Categorical版本
large_df['category_cat'] = large_df['category'].astype('category')
start = time.time()
result_cat = large_df.groupby('category_cat').mean()
time_cat = time.time() - start
print(f"字符串分组耗时: {time_str:.4f}秒")
print(f"Categorical分组耗时: {time_cat:.4f}秒")
3. 并行处理
from joblib import Parallel, delayed
import multiprocessing
def process_chunk(chunk):
# 模拟耗时计算
return chunk.apply(lambda x: x**2 + x**2 - x**2)
# 并行处理大数据
def parallel_processing(df, n_jobs=-1):
# 自动使用所有CPU核心
chunks = np.array_split(df, multiprocessing.cpu_count())
results = Parallel(n_jobs=n_jobs)(delayed(process_chunk)(chunk) for chunk in chunks)
return pd.concat(results)
# 测试
test_df = pd.DataFrame(np.random.randn(10000, 5), columns=['A', 'B', 'C', 'D', 'E'])
start = time.time()
result_parallel = parallel_processing(test_df)
print(f"并行处理耗时: {time.time() - start:.4f}秒")
结论:持续学习与实践
通过本文的系统学习,您已经掌握了Python数据分析的核心技能:从环境搭建、数据处理、可视化、统计分析到机器学习入门。关键要点包括:
- Pandas是核心:熟练掌握DataFrame操作、数据清洗和分组聚合
- 可视化是沟通桥梁:根据场景选择Matplotlib、Seaborn或Plotly
- 统计分析提供洞察:正确使用假设检验和相关性分析
- 机器学习是扩展:从简单的回归和分类开始,逐步深入
- 性能优化不可忽视:向量化、内存优化和并行处理能显著提升效率
下一步建议:
- 实际项目练习:Kaggle竞赛或工作中的真实数据
- 深入学习:时间序列分析、特征工程、模型调优
- 工具扩展:学习SQL、Spark、Docker等配套工具
- 社区参与:关注PyData社区,参加线下Meetup
记住,数据分析是实践性很强的技能,只有通过不断解决实际问题,才能真正掌握这些工具和方法。祝您在数据分析的道路上取得成功!# Python数据分析完整指南:从入门到精通
1. Python数据分析环境搭建
1.1 安装Python和包管理器
Python数据分析的第一步是搭建合适的开发环境。推荐使用Anaconda,它预装了数据分析所需的大部分库。
# 安装Anaconda(推荐方式)
wget https://repo.anaconda.com/archive/Anaconda3-2023.09-0-Linux-x86_64.sh
bash Anaconda3-2023.09-0-Linux-x86_64.sh
# 或者使用原生Python安装
sudo apt update
sudo apt install python3 python3-pip
1.2 创建虚拟环境
虚拟环境可以隔离项目依赖,避免版本冲突。
# 创建虚拟环境
python3 -m venv data_analysis_env
# 激活虚拟环境
source data_analysis_env/bin/activate
# 升级pip
pip install --upgrade pip
1.3 安装核心数据分析库
# 基础数据处理
pip install numpy pandas openpyxl xlrd
# 数据可视化
pip install matplotlib seaborn plotly
# 统计分析和机器学习
pip install scipy scikit-learn statsmodels
# Jupyter Notebook(交互式分析)
pip install jupyterlab notebook
# 大数据处理(可选)
pip install dask
2. Pandas数据处理基础
2.1 DataFrame基础操作
Pandas是Python数据分析的核心库,提供了类似Excel的数据结构和操作。
import pandas as pd
import numpy as np
# 创建DataFrame
data = {
'姓名': ['张三', '李四', '王五', '赵六', '钱七'],
'年龄': [25, 30, 35, 28, 32],
'部门': ['技术', '市场', '技术', '人事', '市场'],
'薪资': [15000, 18000, 20000, 16000, 19000],
'入职日期': pd.date_range('2020-01-01', periods=5)
}
df = pd.DataFrame(data)
# 查看数据基本信息
print("数据概览:")
print(df.info())
print("\n描述性统计:")
print(df.describe())
# 数据筛选
tech_dept = df[df['部门'] == '技术']
high_salary = df[df['薪资'] > 17000]
# 数据排序
df_sorted = df.sort_values('薪资', ascending=False)
# 基本统计
print(f"平均薪资: {df['薪资'].mean():.2f}")
print(f"薪资中位数: {df['薪资'].median():.2f}")
print(f"薪资标准差: {df['薪资'].std():.2f}")
2.2 数据清洗高级技巧
真实数据往往包含缺失值、重复值和异常值。
# 创建包含问题的数据
df_dirty = df.copy()
df_dirty.loc[1, '薪资'] = np.nan # 缺失值
df_dirty.loc[3, '年龄'] = 200 # 异常值
df_dirty = pd.concat([df_dirty, df_dirty.iloc[0:1]], ignore_index=True) # 重复值
print("原始数据:")
print(df_dirty)
# 检测缺失值
print("\n缺失值统计:")
print(df_dirty.isnull().sum())
# 填充缺失值
df_filled = df_dirty.copy()
df_filled['薪资'] = df_filled['薪资'].fillna(df_filled['薪资'].median())
# 删除重复值
df_unique = df_filled.drop_duplicates()
# 处理异常值(使用IQR方法)
def remove_outliers_iqr(df, column):
Q1 = df[column].quantile(0.25)
Q3 = df[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
return df[(df[column] >= lower_bound) & (df[column] <= upper_bound)]
df_no_outliers = remove_outliers_iqr(df_filled, '年龄')
print("\n清洗后的数据:")
print(df_no_outliers)
2.3 数据分组与聚合
# 分组统计
grouped = df.groupby('部门').agg({
'薪资': ['mean', 'max', 'min', 'count', 'std'],
'年龄': ['mean', 'std']
})
# 扁平化列名
grouped.columns = ['_'.join(col).strip() for col in grouped.columns.values]
grouped.reset_index(inplace=True)
print("部门统计:")
print(grouped)
# 自定义聚合函数
def salary_range(series):
return series.max() - series.min()
def coefficient_of_variation(series):
return series.std() / series.mean() if series.mean() != 0 else 0
custom_agg = df.groupby('部门').agg({
'薪资': [salary_range, 'mean', coefficient_of_variation],
'年龄': ['mean', 'std']
})
print("\n自定义聚合:")
print(custom_agg)
# 多级分组
df_multi = df.copy()
df_multi['年份'] = [2020, 2021, 2020, 2021, 2021]
multi_group = df_multi.groupby(['部门', '年份']).agg({
'薪资': 'mean',
'姓名': 'count'
}).rename(columns={'姓名': '人数'})
print("\n多级分组:")
print(multi_group)
3. 数据可视化技术
3.1 Matplotlib基础绘图
import matplotlib.pyplot as plt
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 创建示例数据
np.random.seed(42)
x = np.linspace(0, 10, 100)
y1 = np.sin(x)
y2 = np.cos(x)
y3 = np.sin(x) * np.cos(x)
# 基础折线图
plt.figure(figsize=(12, 8))
plt.subplot(2, 2, 1)
plt.plot(x, y1, label='sin(x)', color='blue', linewidth=2, linestyle='-')
plt.plot(x, y2, label='cos(x)', color='red', linewidth=2, linestyle='--')
plt.title('基础折线图')
plt.xlabel('X轴')
plt.ylabel('Y轴')
plt.legend()
plt.grid(True, alpha=0.3)
# 散点图
plt.subplot(2, 2, 2)
plt.scatter(df['年龄'], df['薪资'], c=range(len(df)), cmap='viridis', s=100, alpha=0.7)
plt.colorbar(label='索引')
plt.title('年龄与薪资散点图')
plt.xlabel('年龄')
plt.ylabel('薪资')
# 柱状图
plt.subplot(2, 2, 3)
dept_counts = df['部门'].value_counts()
plt.bar(dept_counts.index, dept_counts.values, color=['skyblue', 'lightcoral', 'lightgreen'])
plt.title('部门人数分布')
plt.xlabel('部门')
plt.ylabel('人数')
# 直方图
plt.subplot(2, 2, 4)
plt.hist(df['薪资'], bins=5, color='orange', edgecolor='black', alpha=0.7)
plt.title('薪资分布直方图')
plt.xlabel('薪资')
plt.ylabel('频数')
plt.tight_layout()
plt.show()
3.2 Seaborn高级可视化
import seaborn as sns
# 设置风格
sns.set_theme(style="whitegrid")
# 创建更多数据用于演示
np.random.seed(42)
n = 200
demo_data = pd.DataFrame({
'年龄': np.random.normal(30, 5, n),
'薪资': np.random.normal(18000, 3000, n),
'部门': np.random.choice(['技术', '市场', '人事', '销售'], n),
'工作年限': np.random.randint(1, 10, n)
})
demo_data['薪资'] = demo_data['薪资'] + demo_data['工作年限'] * 1000
# 箱线图
plt.figure(figsize=(14, 10))
plt.subplot(2, 2, 1)
sns.boxplot(data=demo_data, x='部门', y='薪资', palette='Set2')
plt.title('各部门薪资分布箱线图')
plt.xticks(rotation=45)
# 小提琴图
plt.subplot(2, 2, 2)
sns.violinplot(data=demo_data, x='部门', y='薪资', inner='quartile', palette='pastel')
plt.title('各部门薪资分布小提琴图')
plt.xticks(rotation=45)
# 热力图(相关性矩阵)
plt.subplot(2, 2, 3)
corr_data = demo_data[['年龄', '薪资', '工作年限']].corr()
sns.heatmap(corr_data, annot=True, cmap='coolwarm', center=0, fmt='.2f')
plt.title('相关性热力图')
# 联合分布图
plt.subplot(2, 2, 4)
sns.scatterplot(data=demo_data, x='年龄', y='薪资', hue='部门', style='部门', s=60)
plt.title('年龄与薪资关系(按部门着色)')
plt.tight_layout()
plt.show()
# Pairplot(多变量关系)
sns.pairplot(demo_data, hue='部门', height=2.5)
plt.suptitle('多变量关系图', y=1.02)
plt.show()
3.3 Plotly交互式可视化
import plotly.express as px
import plotly.graph_objects as go
# 交互式散点图
fig1 = px.scatter(
demo_data,
x='年龄',
y='薪资',
color='部门',
size='工作年限',
hover_data=['工作年限'],
title='员工年龄与薪资关系(交互式)',
labels={'年龄': '员工年龄', '薪资': '月薪(元)'}
)
fig1.show()
# 交互式柱状图
dept_stats = demo_data.groupby('部门').agg({
'薪资': ['mean', 'std'],
'年龄': 'mean'
}).reset_index()
dept_stats.columns = ['部门', '平均薪资', '薪资标准差', '平均年龄']
fig2 = px.bar(
dept_stats,
x='部门',
y='平均薪资',
error_y='薪资标准差',
color='部门',
title='各部门平均薪资(含标准差)'
)
fig2.show()
# 交互式时间序列(如果有时间数据)
# 创建时间序列数据
dates = pd.date_range('2023-01-01', periods=100, freq='D')
time_data = pd.DataFrame({
'日期': dates,
'销售额': np.cumsum(np.random.randn(100) * 100) + 1000,
'订单数': np.random.poisson(50, 100)
})
fig3 = go.Figure()
fig3.add_trace(go.Scatter(
x=time_data['日期'],
y=time_data['销售额'],
mode='lines+markers',
name='销售额',
line=dict(color='royalblue', width=2),
marker=dict(size=4)
))
fig3.add_trace(go.Bar(
x=time_data['日期'],
y=time_data['订单数'],
name='订单数',
opacity=0.6,
yaxis='y2'
))
fig3.update_layout(
title='销售趋势(双Y轴)',
xaxis_title='日期',
yaxis=dict(title='销售额', side='left'),
yaxis2=dict(title='订单数', side='right', overlaying='y')
)
fig3.show()
4. 统计分析方法
4.1 描述性统计分析
from scipy import stats
# 基础统计量
print("=== 描述性统计 ===")
print(f"薪资平均值: {demo_data['薪资'].mean():.2f}")
print(f"薪资中位数: {demo_data['薪资'].median():.2f}")
print(f"薪资标准差: {demo_data['薪资'].std():.2f}")
print(f"薪资变异系数: {demo_data['薪资'].std()/demo_data['薪资'].mean():.2f}")
print(f"薪资偏度: {demo_data['薪资'].skew():.2f}")
print(f"薪资峰度: {demo_data['薪资'].kurtosis():.2f}")
# 分位数
quantiles = demo_data['薪资'].quantile([0, 0.25, 0.5, 0.75, 1.0])
print("\n薪资分位数:")
print(quantiles)
# 正态性检验
stat, p_value = stats.shapiro(demo_data['薪资'])
print(f"\nShapiro-Wilk正态性检验:")
print(f"统计量: {stat:.4f}, p值: {p_value:.4f}")
if p_value > 0.05:
print("薪资数据符合正态分布")
else:
print("薪资数据不符合正态分布")
4.2 假设检验
# 独立样本t检验(比较两个部门的薪资差异)
tech_salary = demo_data[demo_data['部门'] == '技术']['薪资']
market_salary = demo_data[demo_data['部门'] == '市场']['薪资']
# 方差齐性检验
levene_stat, levene_p = stats.levene(tech_salary, market_salary)
print(f"Levene方差齐性检验: p值={levene_p:.4f}")
# t检验
t_stat, p_val = stats.ttest_ind(tech_salary, market_salary, equal_var=(levene_p > 0.05))
print(f"\n独立样本t检验(技术vs市场):")
print(f"t统计量: {t_stat:.4f}, p值: {p_val:.4f}")
if p_val < 0.05:
print("两个部门薪资存在显著差异")
else:
print("两个部门薪资差异不显著")
# 方差分析(ANOVA)
from scipy.stats import f_oneway
groups = [demo_data[demo_data['部门'] == dept]['薪资'] for dept in demo_data['部门'].unique()]
f_stat, f_p = f_oneway(*groups)
print(f"\n单因素方差分析:")
print(f"F统计量: {f_stat:.4f}, p值: {f_p:.4f}")
# 多重比较(Tukey HSD)
from statsmodels.stats.multicomp import pairwise_tukeyhsd
tukey = pairwise_tukeyhsd(endog=demo_data['薪资'], groups=demo_data['部门'], alpha=0.05)
print("\nTukey HSD多重比较:")
print(tukey)
4.3 相关性分析
# Pearson相关系数(线性相关)
pearson_corr, p_val = stats.pearsonr(demo_data['年龄'], demo_data['薪资'])
print(f"Pearson相关系数(年龄vs薪资): {pearson_corr:.4f}, p值: {p_val:.4f}")
# Spearman秩相关(非参数,适合非线性关系)
spearman_corr, p_val = stats.spearmanr(demo_data['年龄'], demo_data['薪资'])
print(f"Spearman相关系数(年龄vs薪资): {spearman_corr:.4f}, p值: {p_val:.4f}")
# 多变量相关性矩阵
corr_matrix = demo_data[['年龄', '薪资', '工作年限']].corr(method='pearson')
print("\n相关性矩阵:")
print(corr_matrix)
# 相关性可视化
plt.figure(figsize=(8, 6))
sns.heatmap(corr_matrix, annot=True, cmap='RdBu_r', center=0, fmt='.3f')
plt.title('变量相关性热力图')
plt.show()
# 偏相关分析(控制其他变量影响)
from scipy.stats import partial_corr
# 注意:partial_corr需要自定义或使用pingouin库
# 这里演示概念
print("\n偏相关概念:在控制工作年限的情况下,年龄与薪资的相关性")
5. 机器学习入门
5.1 线性回归预测
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from sklearn.preprocessing import StandardScaler
# 准备数据(增加样本量)
np.random.seed(42)
n_samples = 500
ages = np.random.randint(22, 50, n_samples)
work_years = np.random.randint(1, 15, n_samples)
depts = np.random.choice(['技术', '市场', '人事', '销售'], n_samples)
# 构建薪资公式:薪资 = 基础 + 年龄*系数 + 年限*系数 + 部门影响 + 随机噪声
base_salary = 8000
dept_effect = {'技术': 3000, '市场': 2000, '人事': 1500, '销售': 2500}
salaries = (base_salary +
ages * 250 +
work_years * 500 +
np.array([dept_effect[d] for d in depts]) +
np.random.normal(0, 1500, n_samples))
df_ml = pd.DataFrame({
'年龄': ages,
'工作年限': work_years,
'部门': depts,
'薪资': salaries
})
# 特征工程:部门编码
df_ml = pd.get_dummies(df_ml, columns=['部门'], prefix='dept')
# 分割数据集
X = df_ml.drop('薪资', axis=1)
y = df_ml['薪资']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 特征标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 训练模型
model = LinearRegression()
model.fit(X_train_scaled, y_train)
# 预测与评估
y_pred = model.predict(X_test_scaled)
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
print("=== 线性回归模型评估 ===")
print(f"均方误差 (MSE): {mse:.2f}")
print(f"均方根误差 (RMSE): {rmse:.2f}")
print(f"平均绝对误差 (MAE): {mae:.2f}")
print(f"R²分数: {r2:.4f}")
# 模型解释
print("\n=== 模型系数 ===")
feature_names = X.columns
for feature, coef in zip(feature_names, model.coef_):
print(f"{feature}: {coef:.2f}")
print(f"截距: {model.intercept_:.2f}")
# 可视化预测结果
plt.figure(figsize=(10, 6))
plt.scatter(y_test, y_pred, alpha=0.6)
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--', lw=2)
plt.xlabel('真实薪资')
plt.ylabel('预测薪资')
plt.title('预测vs真实薪资')
plt.grid(True)
plt.show()
5.2 分类模型:预测部门
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.preprocessing import LabelEncoder
# 准备分类数据
df_clf = df_ml.copy()
# 创建目标变量:是否高薪(>20000)
df_clf['是否高薪'] = (df_clf['薪资'] > 20000).astype(int)
X_clf = df_clf.drop(['薪资', '是否高薪'], axis=1)
y_clf = df_clf['是否高薪']
# 分割数据
X_train_clf, X_test_clf, y_train_clf, y_test_clf = train_test_split(
X_clf, y_clf, test_size=0.2, random_state=42, stratify=y_clf
)
# 训练随机森林
clf = RandomForestClassifier(n_estimators=100, random_state=42, max_depth=5)
clf.fit(X_train_clf, y_train_clf)
# 预测与评估
y_pred_clf = clf.predict(X_test_clf)
accuracy = accuracy_score(y_test_clf, y_pred_clf)
print("=== 随机森林分类器评估 ===")
print(f"准确率: {accuracy:.4f}")
print("\n分类报告:")
print(classification_report(y_test_clf, y_pred_clf, target_names=['低薪', '高薪']))
# 混淆矩阵
cm = confusion_matrix(y_test_clf, y_pred_clf)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=['预测低薪', '预测高薪'],
yticklabels=['实际低薪', '实际高薪'])
plt.title('混淆矩阵')
plt.ylabel('实际类别')
plt.xlabel('预测类别')
plt.show()
# 特征重要性
feature_importance = pd.DataFrame({
'特征': X_clf.columns,
'重要性': clf.feature_importances_
}).sort_values('重要性', ascending=False)
plt.figure(figsize=(10, 6))
sns.barplot(data=feature_importance, x='重要性', y='特征')
plt.title('随机森林特征重要性')
plt.show()
5.3 聚类分析:客户分群
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score
# 准备聚类数据(使用年龄和薪资)
X_cluster = demo_data[['年龄', '薪资']].copy()
scaler_cluster = StandardScaler()
X_scaled = scaler_cluster.fit_transform(X_cluster)
# 确定最佳K值(肘部法则和轮廓系数)
inertias = []
silhouette_scores = []
K_range = range(2, 8)
for k in K_range:
kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
kmeans.fit(X_scaled)
inertias.append(kmeans.inertia_)
silhouette_scores.append(silhouette_score(X_scaled, kmeans.labels_))
# 可视化K值选择
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
ax1.plot(K_range, inertias, 'bo-')
ax1.set_xlabel('K值')
ax1.set_ylabel('惯性(Inertia)')
ax1.set_title('肘部法则')
ax1.grid(True)
ax2.plot(K_range, silhouette_scores, 'ro-')
ax2.set_xlabel('K值')
ax2.set_ylabel('轮廓系数')
ax2.set_title('轮廓系数分析')
ax2.grid(True)
plt.show()
# 选择K=3进行聚类
optimal_k = 3
kmeans = KMeans(n_clusters=optimal_k, random_state=42, n_init=10)
demo_data['集群'] = kmeans.fit_predict(X_scaled)
# 可视化聚类结果
plt.figure(figsize=(10, 6))
sns.scatterplot(data=demo_data, x='年龄', y='薪资', hue='集群', palette='viridis', s=60)
plt.title('K-Means聚类结果(K=3)')
plt.show()
# 集群分析
cluster_analysis = demo_data.groupby('集群').agg({
'年龄': ['mean', 'std', 'count'],
'薪资': ['mean', 'std', 'min', 'max']
}).round(2)
print("=== 聚群分析 ===")
print(cluster_analysis)
6. 实战案例:销售数据分析全流程
6.1 数据加载与探索
# 创建模拟销售数据
np.random.seed(42)
dates = pd.date_range('2023-01-01', '2023-12-31', freq='D')
n = len(dates)
sales_data = pd.DataFrame({
'日期': dates,
'销售额': np.random.normal(50000, 15000, n),
'订单量': np.random.poisson(100, n),
'客户数': np.random.randint(50, 200, n),
'产品类别': np.random.choice(['电子产品', '服装', '家居'], n, p=[0.4, 0.35, 0.25]),
'地区': np.random.choice(['北京', '上海', '广州', '深圳'], n)
})
# 添加趋势和季节性
sales_data['销售额'] += np.linspace(0, 20000, n) # 上升趋势
sales_data['销售额'] += 5000 * np.sin(2 * np.pi * np.arange(n) / 30) # 月度季节性
sales_data['销售额'] += 3000 * np.sin(2 * np.pi * np.arange(n) / 7) # 周度季节性
# 保存并加载
sales_data.to_csv('sales_2023.csv', index=False)
df_sales = pd.read_csv('sales_2023.csv')
df_sales['日期'] = pd.to_datetime(df_sales['日期'])
print("销售数据概览:")
print(df_sales.head())
print(f"\n数据形状: {df_sales.shape}")
print(f"\n日期范围: {df_sales['日期'].min()} 到 {df_sales['日期'].max()}")
6.2 时间序列分析
# 按月汇总
monthly_sales = df_sales.groupby(df_sales['日期'].dt.to_period('M')).agg({
'销售额': ['sum', 'mean', 'std'],
'订单量': 'sum',
'客户数': 'mean'
}).reset_index()
monthly_sales.columns = ['月份', '总销售额', '平均日销售额', '销售额标准差', '总订单量', '平均客户数']
monthly_sales['月份'] = monthly_sales['月份'].dt.to_timestamp()
# 按周汇总
weekly_sales = df_sales.set_index('日期').resample('W').agg({
'销售额': 'sum',
'订单量': 'sum',
'客户数': 'mean'
})
# 可视化
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 月度销售额
axes[0, 0].plot(monthly_sales['月份'], monthly_sales['总销售额'], marker='o', linewidth=2)
axes[0, 0].set_title('2023年月度销售额')
axes[0, 0].set_xlabel('月份')
axes[0, 0].set_ylabel('销售额')
axes[0, 0].grid(True)
# 周度销售额
axes[0, 1].plot(weekly_sales.index, weekly_sales['销售额'], color='orange')
axes[0, 1].set_title('周度销售额趋势')
axes[0, 1].set_xlabel('日期')
axes[0, 1].set_ylabel('销售额')
axes[0, 1].tick_params(axis='x', rotation=45)
# 按产品类别
category_sales = df_sales.groupby('产品类别')['销售额'].sum()
axes[1, 0].bar(category_sales.index, category_sales.values, color=['skyblue', 'lightcoral', 'lightgreen'])
axes[1, 0].set_title('各产品类别销售额')
axes[1, 0].set_ylabel('销售额')
# 按地区
region_sales = df_sales.groupby('地区')['销售额'].sum()
axes[1, 1].pie(region_sales.values, labels=region_sales.index, autopct='%1.1f%%', startangle=90)
axes[1, 1].set_title('各地区销售额占比')
plt.tight_layout()
plt.show()
# 季节性分解
from statsmodels.tsa.seasonal import seasonal_decompose
# 需要设置频率,这里使用周汇总数据
weekly_sales_freq = weekly_sales.asfreq('W')
decomposition = seasonal_decompose(weekly_sales_freq['销售额'], model='additive', period=4)
decomposition.plot()
plt.suptitle('销售额季节性分解')
plt.tight_layout()
plt.show()
6.3 销售预测
from statsmodels.tsa.arima.model import ARIMA
from sklearn.metrics import mean_squared_error
# 使用周度数据进行预测
weekly_sales_clean = weekly_sales.dropna()
# 分割数据(最后4周作为测试集)
train_size = len(weekly_sales_clean) - 4
train_data = weekly_sales_clean.iloc[:train_size]['销售额']
test_data = weekly_sales_clean.iloc[train_size:]['销售额']
# 拟合ARIMA模型
# 通过ACF/PACF确定参数,这里简化使用(2,1,2)
model = ARIMA(train_data, order=(2, 1, 2))
model_fit = model.fit()
# 预测
forecast = model_fit.forecast(steps=4)
forecast_index = pd.date_range(start=weekly_sales_clean.index[-1] + pd.Timedelta(days=7),
periods=4, freq='W')
# 评估
mse = mean_squared_error(test_data, forecast)
rmse = np.sqrt(mse)
print("=== ARIMA销售预测 ===")
print(f"测试集RMSE: {rmse:.2f}")
print("\n未来4周预测:")
for date, value in zip(forecast_index, forecast):
print(f"{date.date()}: {value:.2f}")
# 可视化预测结果
plt.figure(figsize=(12, 6))
plt.plot(weekly_sales_clean.index, weekly_sales_clean['销售额'], label='历史数据', linewidth=2)
plt.plot(test_data.index, test_data['销售额'], label='实际值', marker='o', color='green')
plt.plot(forecast_index, forecast, label='预测值', marker='s', color='red', linestyle='--')
plt.title('销售预测(ARIMA模型)')
plt.xlabel('日期')
plt.ylabel('销售额')
plt.legend()
plt.grid(True)
plt.show()
7. 性能优化技巧
7.1 向量化操作
import time
# 创建大数据集
n = 1000000
large_df = pd.DataFrame({
'年龄': np.random.randint(20, 60, n),
'薪资': np.random.normal(20000, 5000, n),
'部门': np.random.choice(['技术', '市场', '人事'], n)
})
# 慢速方法:循环
def slow_calculation(df):
result = []
for i in range(len(df)):
if df.iloc[i]['年龄'] > 30:
result.append(df.iloc[i]['薪资'] * 1.1)
else:
result.append(df.iloc[i]['薪资'])
return result
# 快速方法:向量化
def fast_calculation(df):
return np.where(df['年龄'] > 30, df['薪资'] * 1.1, df['薪资'])
# 性能对比
start = time.time()
slow_result = slow_calculation(large_df)
slow_time = time.time() - start
start = time.time()
fast_result = fast_calculation(large_df)
fast_time = time.time() - start
print(f"循环方法耗时: {slow_time:.4f}秒")
print(f"向量化方法耗时: {fast_time:.4f}秒")
print(f"性能提升: {slow_time/fast_time:.1f}倍")
7.2 内存优化
# 对比不同数据类型的内存使用
df_types = pd.DataFrame({
'字符串类型': ['A', 'B', 'C'] * 1000,
'整数类型': [1, 2, 3] * 1000,
'浮点类型': [1.1, 2.2, 3.3] * 1000
})
print("原始内存使用:")
print(df_types.info())
# 优化内存
df_optimized = df_types.copy()
df_optimized['字符串类型'] = df_optimized['字符串类型'].astype('category')
df_optimized['整数类型'] = pd.to_numeric(df_optimized['整数类型'], downcast='integer')
df_optimized['浮点类型'] = pd.to_numeric(df_optimized['浮点类型'], downcast='float')
print("\n优化后内存使用:")
print(df_optimized.info())
# 大数据集性能测试
n = 100000
large_df = pd.DataFrame({
'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),
'value': np.random.randn(n)
})
# 字符串 vs Categorical
start = time.time()
result_str = large_df.groupby('category').mean()
time_str = time.time() - start
large_df['category_cat'] = large_df['category'].astype('category')
start = time.time()
result_cat = large_df.groupby('category_cat').mean()
time_cat = time.time() - start
print(f"\n字符串分组耗时: {time_str:.4f}秒")
print(f"Categorical分组耗时: {time_cat:.4f}秒")
print(f"内存节省: {large_df['category'].memory_usage(deep=True) / large_df['category_cat'].memory_usage():.1f}倍")
7.3 并行处理
from joblib import Parallel, delayed
import multiprocessing
def process_data_chunk(chunk):
"""模拟耗时的数据处理函数"""
return chunk.apply(lambda x: x**2 + np.sin(x))
# 创建大数据
n = 100000
data = pd.DataFrame(np.random.randn(n, 5), columns=['A', 'B', 'C', 'D', 'E'])
# 串行处理
start = time.time()
result_serial = process_data_chunk(data)
serial_time = time.time() - start
# 并行处理
def parallel_process(df, n_jobs=-1):
if n_jobs == -1:
n_jobs = multiprocessing.cpu_count()
chunks = np.array_split(df, n_jobs)
results = Parallel(n_jobs=n_jobs)(delayed(process_data_chunk)(chunk) for chunk in chunks)
return pd.concat(results)
start = time.time()
result_parallel = parallel_process(data)
parallel_time = time.time() - start
print(f"串行处理耗时: {serial_time:.4f}秒")
print(f"并行处理耗时: {parallel_time:.4f}秒")
print(f"并行加速比: {serial_time/parallel_time:.2f}x")
print(f"CPU核心数: {multiprocessing.cpu_count()}")
8. 总结与最佳实践
8.1 关键要点回顾
- 环境管理:使用虚拟环境隔离依赖,推荐Anaconda
- 数据处理:Pandas是核心,熟练掌握DataFrame操作
- 可视化:根据场景选择Matplotlib(静态)、Seaborn(统计)、Plotly(交互)
- 统计分析:正确使用假设检验,理解p值和置信区间
- 机器学习:从简单模型开始,重视特征工程
- 性能优化:向量化 > 循环,Categorical类型,必要时并行处理
8.2 常见陷阱与解决方案
# 陷阱1:链式索引警告
df_copy = df.copy()
# 错误:df_copy[df_copy['年龄'] > 25]['薪资'] = 20000 # 会报警告
# 正确:使用.loc
df_copy.loc[df_copy['年龄'] > 25, '薪资'] = 20000
# 陷阱2:数据类型混淆
# 确保数值列是数值类型
df['薪资'] = pd.to_numeric(df['薪资'], errors='coerce')
# 陷阱3:内存泄漏
# 处理大文件时分块读取
chunk_size = 10000
for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
process(chunk) # 处理每个块
8.3 持续学习路径
- 基础巩固:深入学习Pandas官方文档
- 可视化进阶:学习Plotly Dash构建交互式仪表板
- 统计深入:学习贝叶斯统计、时间序列分析
- 机器学习:掌握特征工程、模型调优、交叉验证
- 大数据:学习Dask、PySpark处理海量数据
- 工程化:学习单元测试、代码规范、版本控制
通过系统掌握以上内容,您将能够高效地进行Python数据分析,解决实际工作中的各种数据问题。记住,实践是最好的老师,多做项目,多总结经验!
