引言:为什么选择 Python 进行数据分析?
在当今数据驱动的时代,数据分析已经成为各行各业不可或缺的核心技能。Python 作为一门简洁、强大且生态系统丰富的编程语言,已经成为数据分析领域的首选工具。无论你是数据分析师、数据科学家、业务分析师还是研究人员,掌握 Python 数据分析技能都能为你打开新的职业大门。
Python 之所以在数据分析领域占据主导地位,主要得益于以下几个关键优势:
- 简单易学:Python 的语法清晰直观,即使是编程初学者也能快速上手
- 强大的生态系统:拥有 NumPy、Pandas、Matplotlib、Seaborn 等专业库
- 社区支持:拥有全球最大的开发者社区,遇到问题总能找到解决方案
- 跨平台兼容:可在 Windows、macOS、Linux 等各种操作系统上运行
- 企业级应用:从初创公司到大型企业都在使用 Python 进行数据分析
第一部分:Python 数据分析环境搭建
1.1 安装 Python 和必要的工具
在开始数据分析之前,我们需要搭建一个完整的开发环境。以下是详细的步骤:
步骤 1:安装 Python
访问 Python 官方网站 下载最新版本的 Python。建议下载 Python 3.8 或更高版本。
Windows 用户:
- 下载 Windows installer
- 运行安装程序,务必勾选 “Add Python to PATH”
- 点击 “Install Now”
macOS 用户:
- 下载 macOS installer
- 运行安装程序
- 或者使用 Homebrew:
brew install python
Linux 用户:
# Ubuntu/Debian
sudo apt update
sudo apt install python3 python3-pip
# CentOS/RHEL
sudo yum install python3 python3-pip
步骤 2:安装 Anaconda(推荐)
Anaconda 是一个专门为数据科学设计的 Python 发行版,包含了所有必要的工具和库。
# 下载 Anaconda 安装包
# 访问 https://www.anaconda.com/products/distribution
# 安装完成后,更新 conda
conda update conda
# 创建新的环境
conda create -n data_analysis python=3.9
conda activate data_analysis
步骤 3:安装核心数据分析库
# 使用 pip 安装
pip install numpy pandas matplotlib seaborn scikit-learn jupyter
# 或者使用 conda 安装(推荐)
conda install numpy pandas matplotlib seaborn scikit-learn jupyter
1.2 验证安装
创建一个简单的 Python 脚本来验证所有库都已正确安装:
# verify_installation.py
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
print("✅ NumPy 版本:", np.__version__)
print("✅ Pandas 版本:", pd.__version__)
print("✅ Matplotlib 版本:", plt.__version__)
print("✅ Seaborn 版本:", sns.__version__)
print("✅ Scikit-learn 版本:", sklearn.__version__)
# 简单测试
data = pd.DataFrame({
'A': [1, 2, 3, 4, 5],
'B': [10, 20, 30, 40, 50]
})
print("\n测试数据框创建成功:")
print(data)
运行这个脚本,如果看到版本号和测试数据,说明环境搭建成功!
第二部分:Python 数据分析核心库详解
2.1 NumPy:数值计算的基石
NumPy 是 Python 科学计算的基础库,提供了高效的多维数组对象和大量数学函数。
NumPy 数组基础
import numpy as np
# 创建数组
arr1 = np.array([1, 2, 3, 4, 5]) # 一维数组
arr2 = np.array([[1, 2, 3], [4, 5, 6]]) # 二维数组
arr3 = np.zeros((3, 4)) # 3x4 的零数组
arr4 = np.ones((2, 3)) # 2x3 的一数组
arr5 = np.arange(0, 10, 2) # 0到10,步长为2
arr6 = np.linspace(0, 1, 5) # 0到1之间生成5个等间距数
print("一维数组:", arr1)
print("二维数组:\n", arr2)
print("零数组:\n", arr3)
print("一数组:\n", arr4)
print("arange 数组:", arr5)
print("linspace 数组:", arr6)
# 数组运算(向量化操作)
arr = np.array([1, 2, 3, 4, 5])
print("\n原始数组:", arr)
print("加 10:", arr + 10)
print("乘 2:", arr * 2)
print("平方:", arr ** 2)
print("正弦值:", np.sin(arr))
# 数组索引和切片
matrix = np.array([[1, 2, 3, 4],
[5, 6, 7, 8],
[9, 10, 11, 12]])
print("\n原始矩阵:\n", matrix)
print("第2行第3列的元素:", matrix[1, 2]) # 索引从0开始
print("第1行:", matrix[0, :])
print("第2列:", matrix[:, 1])
print("前2行,前2列:\n", matrix[0:2, 0:2])
# 布尔索引
data = np.array([15, 22, 8, 35, 12, 45])
print("\n原始数据:", data)
print("大于20的元素:", data[data > 20])
# 统计运算
print("\n统计信息:")
print("总和:", data.sum())
print("平均值:", data.mean())
print("标准差:", data.std())
print("最小值:", data.min())
print("最大值:", data.max())
NumPy 高级功能
# 形状操作
arr = np.arange(1, 13).reshape(3, 4)
print("3x4 矩阵:\n", arr)
# 转置
print("\n转置矩阵:\n", arr.T)
# 数组拼接
a = np.array([[1, 2], [3, 4]])
b = np.array([[5, 6], [7, 8]])
# 垂直拼接
vertical = np.vstack((a, b))
print("\n垂直拼接:\n", vertical)
# 水平拼接
horizontal = np.hstack((a, b))
print("水平拼接:\n", horizontal)
# 广播机制
arr1 = np.array([[1, 2, 3], [4, 5, 6]])
arr2 = np.array([10, 20, 30])
result = arr1 + arr2 # arr2 自动广播到 arr1 的形状
print("\n广播结果:\n", result)
# 随机数生成
np.random.seed(42) # 设置随机种子,确保结果可重现
random_data = np.random.randn(5, 3) # 标准正态分布
print("\n随机数据:\n", random_data)
# 随机整数
random_ints = np.random.randint(1, 100, size=10)
print("随机整数:", random_ints)
2.2 Pandas:数据处理的瑞士军刀
Pandas 建立在 NumPy 之上,提供了 DataFrame 和 Series 两种主要数据结构,是数据清洗、转换和分析的核心工具。
Series 和 DataFrame 基础
import pandas as pd
# 创建 Series
s = pd.Series([1, 3, 5, np.nan, 6, 8])
print("Series:\n", s)
# 创建 DataFrame
# 方法1:从字典创建
df1 = pd.DataFrame({
'姓名': ['张三', '李四', '王五', '赵六'],
'年龄': [25, 30, 35, 28],
'城市': ['北京', '上海', '广州', '深圳'],
'薪资': [15000, 20000, 18000, 22000]
})
print("\nDataFrame 1:\n", df1)
# 方法2:从列表创建
df2 = pd.DataFrame([
['张三', 25, '北京', 15000],
['李四', 30, '上海', 20000],
['王五', 35, '广州', 18000],
['赵六', 28, '深圳', 22000]
], columns=['姓名', '年龄', '城市', '薪资'])
print("\nDataFrame 2:\n", df2)
# 方法3:从 NumPy 数组创建
data = np.random.randn(5, 4)
df3 = pd.DataFrame(data, columns=['A', 'B', 'C', 'D'])
print("\nDataFrame 3:\n", df3)
数据查看和基本信息
# 继续使用 df1
df = df1
# 查看前几行
print("前3行:\n", df.head(3))
# 查看后几行
print("\n后2行:\n", df.tail(2))
# 查看索引、列名和数据类型
print("\n索引:", df.index)
print("列名:", df.columns)
print("数据类型:\n", df.dtypes)
# 查看基本信息
print("\n数据框信息:")
df.info()
# 统计描述
print("\n统计描述:\n", df.describe())
# 查看唯一值
print("\n城市唯一值:", df['城市'].unique())
print("城市值计数:\n", df['城市'].value_counts())
数据选择和索引
# 列选择
print("选择单列:\n", df['姓名'])
print("\n选择多列:\n", df[['姓名', '薪资']])
# 行选择 - loc(标签索引)
print("选择第0行:\n", df.loc[0])
print("\n选择第0-2行,姓名和城市列:\n", df.loc[0:2, ['姓名', '城市']])
# 行选择 - iloc(位置索引)
print("选择第0行:\n", df.iloc[0])
print("\n选择第0-2行,第0-1列:\n", df.iloc[0:3, 0:2])
# 条件选择
print("年龄大于28的员工:\n", df[df['年龄'] > 28])
print("\n在北京或上海的员工:\n", df[df['城市'].isin(['北京', '上海'])])
print("\n薪资在16000-22000之间的员工:\n", df[(df['薪资'] >= 16000) & (df['薪资'] <= 22000)])
# 布尔索引组合条件
condition = (df['年龄'] > 25) & (df['薪资'] > 18000)
print("\n年龄>25且薪资>18000:\n", df[condition])
数据清洗和处理
# 创建包含缺失值的数据
df_missing = pd.DataFrame({
'A': [1, 2, np.nan, 4, 5],
'B': [10, np.nan, 30, 40, 50],
'C': ['a', 'b', 'c', 'd', 'e']
})
print("原始数据(含缺失值):\n", df_missing)
# 检查缺失值
print("\n每列缺失值数量:\n", df_missing.isnull().sum())
# 处理缺失值
# 方法1:删除缺失值
df_dropped = df_missing.dropna()
print("\n删除缺失值后:\n", df_dropped)
# 方法2:填充缺失值
df_filled = df_missing.fillna(0)
print("\n用0填充缺失值:\n", df_filled)
# 方法3:用均值填充
df_filled_mean = df_missing.copy()
df_filled_mean['A'] = df_filled_mean['A'].fillna(df_filled_mean['A'].mean())
df_filled_mean['B'] = df_filled_mean['B'].fillna(df_filled_mean['B'].mean())
print("\n用均值填充数值列:\n", df_filled_mean)
# 数据类型转换
df_type = pd.DataFrame({
'数值字符串': ['1', '2', '3', '4'],
'日期字符串': ['2023-01-01', '2023-02-01', '2023-03-01', '2023-04-01']
})
print("\n原始类型:\n", df_type.dtypes)
df_type['数值字符串'] = df_type['数值字符串'].astype(int)
df_type['日期字符串'] = pd.to_datetime(df_type['日期字符串'])
print("\n转换后类型:\n", df_type.dtypes)
print("转换后数据:\n", df_type)
# 数据排序
df_sorted = df.sort_values('薪资', ascending=False)
print("\n按薪资降序排列:\n", df_sorted)
# 重命名列
df_renamed = df.rename(columns={'姓名': 'Name', '年龄': 'Age', '城市': 'City', '薪资': 'Salary'})
print("\n重命名列后:\n", df_renamed)
# 添加新列
df['薪资等级'] = df['薪资'].apply(lambda x: '高' if x > 19000 else '中' if x > 16000 else '低')
print("\n添加薪资等级列:\n", df)
数据分组和聚合
# 创建销售数据
sales_data = pd.DataFrame({
'日期': ['2023-01-01', '2023-01-01', '2023-01-02', '2023-01-02', '2023-01-03', '2023-01-03'],
'产品': ['A', 'B', 'A', 'B', 'A', 'B'],
'销售额': [1000, 1500, 1200, 1800, 1100, 1600],
'数量': [10, 15, 12, 18, 11, 16]
})
print("销售数据:\n", sales_data)
# 分组聚合
grouped = sales_data.groupby('产品').agg({
'销售额': ['sum', 'mean', 'max', 'min'],
'数量': 'sum'
})
print("\n按产品分组聚合:\n", grouped)
# 多级分组
grouped_multi = sales_data.groupby(['日期', '产品']).agg({
'销售额': 'sum',
'数量': 'sum'
})
print("\n按日期和产品分组:\n", group)
# 应用多个聚合函数
grouped_multi_agg = sales_data.groupby('产品').agg(
总销售额=('销售额', 'sum'),
平均销售额=('销售额', 'mean'),
总数量=('数量', 'sum'),
交易次数=('数量', 'count')
)
print("\n多聚合函数:\n", grouped_multi_agg)
数据合并和连接
# 创建两个数据框
df_left = pd.DataFrame({
'key': ['A', 'B', 'C', 'D'],
'value1': [1, 2, 3, 4]
})
df_right = pd.DataFrame({
'key': ['B', 'D', 'E', 'F'],
'value2': [5, 6, 7, 8]
})
print("左数据框:\n", df_left)
print("\n右数据框:\n", df_right)
# 内连接(只保留匹配的行)
inner_join = pd.merge(df_left, df_right, on='key', how='inner')
print("\n内连接:\n", inner_join)
# 左连接(保留左表所有行)
left_join = pd.merge(df_left, df_right, on='key', how='left')
print("\n左连接:\n", left_join)
# 右连接(保留右表所有行)
right_join = pd.merge(df_left, df_right, on='key', how='right')
print("\n右连接:\n", right_join)
# 外连接(保留所有行)
outer_join = pd.merge(df_left, df_right, on='key', how='outer')
print("\n外连接:\n", outer_join)
# 纵向合并
df_a = pd.DataFrame({'A': [1, 2], 'B': [3, 4]})
df_b = pd.DataFrame({'A': [5, 6], 'B': [7, 8]})
vertical_concat = pd.concat([df_a, df_b], ignore_index=True)
print("\n纵向合并:\n", vertical_concat)
时间序列处理
# 创建时间序列数据
dates = pd.date_range('2023-01-01', periods=6, freq='D')
ts_data = pd.DataFrame({
'date': dates,
'value': np.random.randn(6) * 100
})
print("时间序列数据:\n", ts_data)
# 设置时间索引
ts_data.set_index('date', inplace=True)
print("\n设置时间索引后:\n", ts_data)
# 时间重采样
# 按周求平均
weekly = ts_data.resample('W').mean()
print("\n按周重采样:\n", weekly)
# 按月求和
monthly = ts_data.resample('M').sum()
print("\n按月重采样:\n", monthly)
# 时间窗口计算
rolling_2d = ts_data.rolling(window=2).mean()
print("\n2天滚动平均:\n", rolling_2d)
2.3 Matplotlib 和 Seaborn:数据可视化
Matplotlib 基础绘图
import matplotlib.pyplot as plt
import numpy as np
# 设置中文字体(解决中文显示问题)
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
# 准备数据
x = np.linspace(0, 10, 100)
y1 = np.sin(x)
y2 = np.cos(x)
# 创建图形
plt.figure(figsize=(12, 8))
# 1. 折线图
plt.subplot(2, 2, 1)
plt.plot(x, y1, label='sin(x)', color='blue', linewidth=2)
plt.plot(x, y2, label='cos(x)', color='red', linestyle='--')
plt.title('折线图')
plt.xlabel('x')
plt.ylabel('y')
plt.legend()
plt.grid(True, alpha=0.3)
# 2. 散点图
np.random.seed(42)
x_scatter = np.random.normal(0, 1, 100)
y_scatter = np.random.normal(0, 1, 100)
plt.subplot(2, 2, 2)
plt.scatter(x_scatter, y_scatter, alpha=0.6, c='green', s=50)
plt.title('散点图')
plt.xlabel('X轴')
plt.ylabel('Y轴')
# 3. 柱状图
categories = ['A', 'B', 'C', 'D', 'E']
values = [23, 45, 56, 78, 32]
plt.subplot(2, 2, 3)
plt.bar(categories, values, color=['red', 'blue', 'green', 'orange', 'purple'])
plt.title('柱状图')
plt.xlabel('类别')
plt.ylabel('数值')
# 4. 直方图
data_normal = np.random.normal(0, 1, 1000)
plt.subplot(2, 2, 4)
plt.hist(data_normal, bins=30, color='skyblue', edgecolor='black', alpha=0.7)
plt.title('直方图')
plt.xlabel('数值')
plt.ylabel('频数')
plt.tight_layout()
plt.show()
Seaborn 高级可视化
import seaborn as sns
import pandas as pd
import numpy as np
# 设置风格
sns.set_style("whitegrid")
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 创建示例数据
np.random.seed(42)
data = pd.DataFrame({
'类别': np.random.choice(['A', 'B', 'C'], 200),
'数值': np.random.normal(0, 1, 200),
'分组': np.random.choice(['X', 'Y'], 200),
'时间': np.tile(pd.date_range('2023-01-01', periods=100, freq='D'), 2)
})
# 1. 箱线图
plt.figure(figsize=(15, 10))
plt.subplot(2, 3, 1)
sns.boxplot(x='类别', y='数值', data=data)
plt.title('箱线图')
# 2. 小提琴图
plt.subplot(2, 3, 2)
sns.violinplot(x='类别', y='数值', data=data)
plt.title('小提琴图')
# 3. 热力图
plt.subplot(2, 3, 3)
corr_data = np.random.randn(10, 10)
corr_matrix = np.corrcoef(corr_data)
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0)
plt.title('热力图')
# 4. 分布图
plt.subplot(2, 3, 4)
sns.histplot(data=data, x='数值', hue='类别', multiple='stack', bins=20)
plt.title('分布图')
# 5. 关系图
plt.subplot(2, 3, 5)
sns.scatterplot(data=data, x='数值', y='分组', hue='类别', size='数值', sizes=(20, 200))
plt.title('关系图')
# 6. 线图
plt.subplot(2, 3, 6)
time_data = data.groupby('时间').agg({'数值': 'mean'}).reset_index()
sns.lineplot(data=time_data, x='时间', y='数值')
plt.title('时间序列线图')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
第三部分:完整数据分析项目实战
3.1 项目背景和目标
我们将通过一个完整的销售数据分析项目来演示 Python 数据分析的完整流程。项目目标:
- 数据清洗和预处理
- 销售趋势分析
- 产品表现分析
- 客户行为分析
- 生成可视化报告
3.2 完整代码实现
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
sns.set_style("whitegrid")
# 1. 数据生成(模拟真实销售数据)
def generate_sales_data():
np.random.seed(42)
# 基础参数
n_records = 1000
products = ['笔记本电脑', '智能手机', '平板电脑', '耳机', '智能手表']
cities = ['北京', '上海', '广州', '深圳', '杭州']
customers = [f'客户_{i}' for i in range(1, 201)]
# 生成数据
data = {
'订单ID': [f'ORD{10000 + i}' for i in range(n_records)],
'订单日期': [datetime(2023, 1, 1) + timedelta(days=np.random.randint(0, 365))
for _ in range(n_records)],
'客户ID': np.random.choice(customers, n_records),
'产品': np.random.choice(products, n_records),
'城市': np.random.choice(cities, n_records),
'数量': np.random.randint(1, 6, n_records),
'单价': np.random.randint(1000, 10000, n_records)
}
df = pd.DataFrame(data)
df['销售额'] = df['数量'] * df['单价']
# 添加一些缺失值和异常值来模拟真实数据
df.loc[10:15, '数量'] = np.nan
df.loc[20:25, '单价'] = np.nan
df.loc[50, '销售额'] = df.loc[50, '销售额'] * 10 # 异常值
return df
# 2. 数据清洗函数
def clean_data(df):
print("=" * 50)
print("数据清洗阶段")
print("=" * 50)
# 复制数据避免修改原数据
df_clean = df.copy()
# 查看基本信息
print("\n原始数据形状:", df_clean.shape)
print("\n数据类型:\n", df_clean.dtypes)
print("\n缺失值统计:\n", df_clean.isnull().sum())
# 处理缺失值
# 数量用中位数填充
quantity_median = df_clean['数量'].median()
df_clean['数量'] = df_clean['数量'].fillna(quantity_median)
# 单价用同类产品的平均单价填充
df_clean['单价'] = df_clean.groupby('产品')['单价'].transform(
lambda x: x.fillna(x.mean())
)
# 处理异常值(使用 IQR 方法)
Q1 = df_clean['销售额'].quantile(0.25)
Q3 = df_clean['销售额'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers = df_clean[(df_clean['销售额'] < lower_bound) |
(df_clean['销售额'] > upper_bound)]
print(f"\n检测到 {len(outliers)} 个异常值")
# 移除异常值
df_clean = df_clean[(df_clean['销售额'] >= lower_bound) &
(df_clean['销售额'] <= upper_bound)]
# 数据类型转换
df_clean['订单日期'] = pd.to_datetime(df_clean['订单日期'])
# 添加衍生字段
df_clean['订单月份'] = df_clean['订单日期'].dt.to_period('M')
df_clean['季度'] = df_clean['订单日期'].dt.quarter
df_clean['星期'] = df_clean['订单日期'].dt.day_name()
print("\n清洗后数据形状:", df_clean.shape)
print("\n清洗后缺失值:\n", df_clean.isnull().sum())
return df_clean
# 3. 数据分析函数
def analyze_data(df):
print("\n" + "=" * 50)
print("数据分析阶段")
print("=" * 50)
analysis_results = {}
# 3.1 整体销售概况
total_sales = df['销售额'].sum()
total_orders = len(df)
avg_order_value = df['销售额'].mean()
print(f"\n整体销售概况:")
print(f"总销售额: {total_sales:,.2f}")
print(f"总订单数: {total_orders}")
print(f"平均订单价值: {avg_order_value:,.2f}")
analysis_results['total_sales'] = total_sales
analysis_results['total_orders'] = total_orders
analysis_results['avg_order_value'] = avg_order_value
# 3.2 按月份分析
monthly_sales = df.groupby('订单月份')['销售额'].agg(['sum', 'count', 'mean']).reset_index()
monthly_sales.columns = ['月份', '销售额', '订单数', '平均订单额']
print(f"\n月度销售分析:\n{monthly_sales}")
analysis_results['monthly_sales'] = monthly_sales
# 3.3 按产品分析
product_analysis = df.groupby('产品').agg({
'销售额': ['sum', 'mean', 'count'],
'数量': 'sum'
}).round(2)
product_analysis.columns = ['总销售额', '平均销售额', '订单数', '总数量']
product_analysis = product_analysis.sort_values('总销售额', ascending=False)
print(f"\n产品销售分析:\n{product_analysis}")
analysis_results['product_analysis'] = product_analysis
# 3.4 按城市分析
city_analysis = df.groupby('城市')['销售额'].agg(['sum', 'count', 'mean']).round(2)
city_analysis.columns = ['总销售额', '订单数', '平均订单额']
city_analysis = city_analysis.sort_values('总销售额', ascending=False)
print(f"\n城市销售分析:\n{city_analysis}")
analysis_results['city_analysis'] = city_analysis
# 3.5 客户分析
customer_analysis = df.groupby('客户ID').agg({
'销售额': ['sum', 'count'],
'订单日期': 'min'
}).round(2)
customer_analysis.columns = ['总消费', '购买次数', '首次购买日期']
customer_analysis = customer_analysis.sort_values('总消费', ascending=False)
print(f"\n客户消费排行(前10):\n{customer_analysis.head(10)}")
analysis_results['customer_analysis'] = customer_analysis
# 3.6 产品-城市交叉分析
pivot_table = pd.pivot_table(df,
values='销售额',
index='城市',
columns='产品',
aggfunc='sum',
fill_value=0)
print(f"\n产品-城市销售透视表:\n{pivot_table}")
analysis_results['pivot_table'] = pivot_table
return analysis_results
# 4. 可视化函数
def create_visualizations(df, analysis_results):
print("\n" + "=" * 50)
print("可视化阶段")
print("=" * 50)
fig, axes = plt.subplots(3, 2, figsize=(20, 18))
fig.suptitle('销售数据分析报告', fontsize=20, fontweight='bold')
# 4.1 月度销售趋势
monthly_data = analysis_results['monthly_sales']
axes[0, 0].plot(range(len(monthly_data)), monthly_data['销售额'],
marker='o', linewidth=2, markersize=8)
axes[0, 0].set_title('月度销售额趋势', fontsize=14, fontweight='bold')
axes[0, 0].set_xlabel('月份')
axes[0, 0].set_ylabel('销售额')
axes[0, 0].set_xticks(range(len(monthly_data)))
axes[0, 0].set_xticklabels([str(m) for m in monthly_data['月份']], rotation=45)
axes[0, 0].grid(True, alpha=0.3)
# 4.2 产品销售占比
product_data = analysis_results['product_analysis']
axes[0, 1].pie(product_data['总销售额'],
labels=product_data.index,
autopct='%1.1f%%',
startangle=90)
axes[0, 1].set_title('产品销售占比', fontsize=14, fontweight='bold')
# 4.3 城市销售对比
city_data = analysis_results['city_analysis']
axes[1, 0].bar(city_data.index, city_data['总销售额'],
color=['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7'])
axes[1, 0].set_title('各城市销售额对比', fontsize=14, fontweight='bold')
axes[1, 0].set_ylabel('销售额')
axes[1, 0].tick_params(axis='x', rotation=45)
# 4.4 产品-城市热力图
pivot_data = analysis_results['pivot_table']
sns.heatmap(pivot_data, annot=True, fmt='.0f', cmap='YlOrRd',
ax=axes[1, 1], cbar_kws={'label': '销售额'})
axes[1, 1].set_title('产品-城市销售热力图', fontsize=14, fontweight='bold')
# 4.5 销售数量分布
sns.histplot(data=df, x='销售额', bins=30, kde=True, ax=axes[2, 0])
axes[2, 0].set_title('销售额分布直方图', fontsize=14, fontweight='bold')
axes[2, 0].set_xlabel('销售额')
# 4.6 产品单价与数量关系
scatter = axes[2, 1].scatter(df['单价'], df['数量'],
c=df['销售额'], cmap='viridis', alpha=0.6)
axes[2, 1].set_title('单价 vs 数量(颜色=销售额)', fontsize=14, fontweight='bold')
axes[2, 1].set_xlabel('单价')
axes[2, 1].set_ylabel('数量')
plt.colorbar(scatter, ax=axes[2, 1], label='销售额')
plt.tight_layout()
plt.show()
# 额外的高级可视化
create_advanced_visualizations(df)
def create_advanced_visualizations(df):
"""创建高级可视化图表"""
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
# 1. 箱线图:各产品销售额分布
sns.boxplot(data=df, x='产品', y='销售额', ax=axes[0, 0])
axes[0, 0].set_title('各产品销售额分布', fontweight='bold')
axes[0, 0].tick_params(axis='x', rotation=45)
# 2. 小提琴图:各城市销售额分布
sns.violinplot(data=df, x='城市', y='销售额', ax=axes[0, 1])
axes[0, 1].set_title('各城市销售额分布', fontweight='bold')
# 3. 客户消费累积图
customer_total = df.groupby('客户ID')['销售额'].sum().sort_values(ascending=False)
axes[1, 0].plot(range(len(customer_total)), customer_total.values,
linewidth=2)
axes[1, 0].set_title('客户消费累积排名', fontweight='bold')
axes[1, 0].set_xlabel('客户排名')
axes[1, 0].set_ylabel('累计消费')
axes[1, 0].grid(True, alpha=0.3)
# 4. 星期销售分布
weekday_sales = df.groupby('星期')['销售额'].sum()
weekday_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
weekday_sales = weekday_sales.reindex(weekday_order)
axes[1, 1].bar(range(len(weekday_sales)), weekday_sales.values,
color='skyblue')
axes[1, 1].set_title('星期销售分布', fontweight='bold')
axes[1, 1].set_xticks(range(len(weekday_sales)))
axes[1, 1].set_xticklabels(['周一', '周二', '周三', '周四', '周五', '周六', '周日'])
plt.tight_layout()
plt.show()
# 5. 主执行函数
def main():
print("开始销售数据分析项目...")
# 生成数据
raw_data = generate_sales_data()
print("数据生成完成,前5行:")
print(raw_data.head())
# 数据清洗
cleaned_data = clean_data(raw_data)
# 数据分析
analysis_results = analyze_data(cleaned_data)
# 可视化
create_visualizations(cleaned_data, analysis_results)
print("\n" + "=" * 50)
print("分析完成!")
print("=" * 50)
# 生成分析报告
generate_report(cleaned_data, analysis_results)
def generate_report(df, results):
"""生成简要分析报告"""
print("\n📊 分析报告摘要")
print("=" * 50)
print(f"\n1. 核心指标:")
print(f" • 总销售额: {results['total_sales']:,.2f}")
print(f" • 总订单数: {results['total_orders']}")
print(f" • 平均订单价值: {results['avg_order_value']:,.2f}")
print(f"\n2. 最佳表现:")
top_product = results['product_analysis'].index[0]
top_city = results['city_analysis'].index[0]
top_customer = results['customer_analysis'].index[0]
print(f" • 最畅销产品: {top_product}")
print(f" • 最高销售额城市: {top_city}")
print(f" • 最高消费客户: {top_customer}")
print(f"\n3. 关键发现:")
# 计算一些洞察
monthly_trend = results['monthly_sales']['销售额'].pct_change().mean()
if monthly_trend > 0:
trend_text = "增长"
else:
trend_text = "下降"
print(f" • 月均销售额趋势: {trend_text} ({monthly_trend:.2%})")
product_var = results['product_analysis']['总销售额'].std() / results['product_analysis']['总销售额'].mean()
print(f" • 产品销售集中度: {'高' if product_var > 0.5 else '低'}")
print(f"\n4. 建议:")
print(" • 加强畅销产品的库存管理")
print(" • 针对低销售额城市制定营销策略")
print(" • 建立客户忠诚度计划")
print(" • 优化产品定价策略")
# 执行主函数
if __name__ == "__main__":
main()
第四部分:高级数据分析技巧
4.1 数据透视表高级应用
# 高级透视表操作
advanced_df = pd.DataFrame({
'日期': pd.date_range('2023-01-01', periods=100, freq='D'),
'产品': np.random.choice(['A', 'B', 'C'], 100),
'区域': np.random.choice(['东区', '西区', '南区', '北区'], 100),
'销售额': np.random.randint(1000, 5000, 100),
'成本': np.random.randint(500, 3000, 100)
})
# 计算利润
advanced_df['利润'] = advanced_df['销售额'] - advanced_df['成本']
# 多级透视表
pivot_advanced = pd.pivot_table(advanced_df,
values=['销售额', '利润'],
index=['产品'],
columns=['区域'],
aggfunc={'销售额': ['sum', 'mean'],
'利润': ['sum', 'mean']},
fill_value=0,
margins=True, # 添加总计
margins_name='总计')
print("高级透视表:\n", pivot_advanced)
# 交叉表(频率表)
cross_tab = pd.crosstab(advanced_df['产品'], advanced_df['区域'],
margins=True, margins_name='总计')
print("\n交叉表:\n", cross_tab)
4.2 数据合并的高级技巧
# 使用 join 方法
df1 = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]}, index=['a', 'b', 'c'])
df2 = pd.DataFrame({'C': [7, 8, 9], 'D': [10, 11, 12]}, index=['b', 'c', 'd'])
# 左连接
joined_left = df1.join(df2, how='left')
print("左连接:\n", joined_left)
# 使用 concat 进行高级合并
df3 = pd.DataFrame({'A': [1, 2], 'B': [3, 4]})
df4 = pd.DataFrame({'A': [5, 6], 'B': [7, 8]})
df5 = pd.DataFrame({'C': [9, 10], 'D': [11, 12]})
# 按列合并
concat_col = pd.concat([df3, df4], axis=1)
print("\n按列合并:\n", concat_col)
# 按行合并
concat_row = pd.concat([df3, df5], axis=0, ignore_index=True)
print("\n按行合并(不同列):\n", concat_row)
4.3 高级数据清洗技术
# 重复数据处理
df_duplicates = pd.DataFrame({
'A': [1, 2, 2, 3, 3, 3],
'B': ['a', 'b', 'b', 'c', 'c', 'c']
})
print("原始数据:\n", df_duplicates)
print("\n重复值数量:", df_duplicates.duplicated().sum())
print("\n删除重复值(保留第一个):\n", df_duplicates.drop_duplicates())
print("\n删除重复值(保留最后一个):\n", df_duplicates.drop_duplicates(keep='last'))
# 异常值检测和处理
def detect_outliers_iqr(series):
Q1 = series.quantile(0.25)
Q3 = series.quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - 1.5 * IQR
upper = Q3 + 1.5 * IQR
return (series < lower) | (series > upper)
data_with_outliers = pd.DataFrame({
'values': [10, 12, 11, 13, 12, 100, 11, 12, 9, 150]
})
outliers_mask = detect_outliers_iqr(data_with_outliers['values'])
print("\n异常值检测:\n", data_with_outliers[outliers_mask])
# 使用 winsorize 处理异常值
from scipy.stats import mstats
data_winsorized = data_with_outliers.copy()
data_winsorized['values'] = mstats.winsorize(data_winsorized['values'], limits=[0.1, 0.1])
print("\nWinsorize 处理后:\n", data_winsorized)
4.4 性能优化技巧
# 大数据集处理优化
import time
# 1. 使用适当的数据类型
def optimize_memory(df):
"""优化数据框内存使用"""
df_optimized = df.copy()
for col in df_optimized.columns:
col_type = df_optimized[col].dtype
if col_type != object:
c_min = df_optimized[col].min()
c_max = df_optimized[col].max()
if str(col_type)[:3] == 'int':
if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
df_optimized[col] = df_optimized[col].astype(np.int8)
elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
df_optimized[col] = df_optimized[col].astype(np.int16)
elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
df_optimized[col] = df_optimized[col].astype(np.int32)
else:
if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
df_optimized[col] = df_optimized[col].astype(np.float32)
return df_optimized
# 2. 使用向量化操作替代循环
def vectorized_vs_loop():
"""向量化 vs 循环性能对比"""
size = 100000
arr = np.random.rand(size)
# 循环方式
start = time.time()
result_loop = []
for i in range(len(arr)):
result_loop.append(arr[i] ** 2 + 2 * arr[i] + 1)
time_loop = time.time() - start
# 向量化方式
start = time.time()
result_vec = arr ** 2 + 2 * arr + 1
time_vec = time.time() - start
print(f"循环方式耗时: {time_loop:.4f}秒")
print(f"向量化方式耗时: {time_vec:.4f}秒")
print(f"性能提升: {time_loop/time_vec:.2f}倍")
# 3. 使用 query 方法进行快速筛选
def query_vs_boolean():
"""query 方法 vs 布尔索引性能对比"""
df = pd.DataFrame({
'A': np.random.randint(1, 100, 100000),
'B': np.random.randint(1, 100, 100000),
'C': np.random.choice(['X', 'Y', 'Z'], 100000)
})
# 布尔索引
start = time.time()
result1 = df[(df['A'] > 50) & (df['B'] < 75) & (df['C'] == 'X')]
time_bool = time.time() - start
# Query 方法
start = time.time()
result2 = df.query('A > 50 and B < 75 and C == "X"')
time_query = time.time() - start
print(f"布尔索引耗时: {time_bool:.4f}秒")
print(f"Query 方法耗时: {time_query:.4f}秒")
print(f"结果相同: {result1.equals(result2)}")
# 运行性能测试
print("性能优化测试:")
vectorized_vs_loop()
query_vs_boolean()
第五部分:实际工作中的最佳实践
5.1 代码组织和模块化
# 将数据分析代码组织成函数和类
class SalesAnalyzer:
"""销售数据分析器类"""
def __init__(self, data_path=None, df=None):
if data_path:
self.df = pd.read_csv(data_path)
elif df is not None:
self.df = df
else:
raise ValueError("必须提供数据路径或数据框")
self.clean_df = None
self.analysis_results = {}
def clean_data(self):
"""数据清洗"""
self.clean_df = self.df.copy()
# 处理缺失值
self.clean_df = self.clean_df.dropna()
# 处理重复值
self.clean_df = self.clean_df.drop_duplicates()
return self.clean_df
def analyze(self):
"""执行完整分析"""
if self.clean_df is None:
self.clean_data()
# 销售汇总
self.analysis_results['summary'] = {
'total_sales': self.clean_df['销售额'].sum(),
'avg_sales': self.clean_df['销售额'].mean(),
'total_orders': len(self.clean_df)
}
# 产品分析
self.analysis_results['product'] = self.clean_df.groupby('产品')['销售额'].agg(['sum', 'count', 'mean'])
return self.analysis_results
def generate_report(self, output_path='report.txt'):
"""生成文本报告"""
with open(output_path, 'w', encoding='utf-8') as f:
f.write("销售数据分析报告\n")
f.write("=" * 50 + "\n\n")
summary = self.analysis_results['summary']
f.write("核心指标:\n")
f.write(f"总销售额: {summary['total_sales']:,.2f}\n")
f.write(f"平均销售额: {summary['avg_sales']:,.2f}\n")
f.write(f"总订单数: {summary['total_orders']}\n\n")
f.write("产品分析:\n")
f.write(self.analysis_results['product'].to_string())
print(f"报告已保存到: {output_path}")
# 使用示例
# analyzer = SalesAnalyzer(df=cleaned_data)
# analyzer.analyze()
# analyzer.generate_report()
5.2 错误处理和日志记录
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('data_analysis.log'),
logging.StreamHandler()
]
)
def safe_analysis(df, column):
"""安全的数据分析函数"""
try:
if column not in df.columns:
raise ValueError(f"列 '{column}' 不存在")
if df[column].dtype not in ['int64', 'float64']:
raise TypeError(f"列 '{column}' 不是数值类型")
result = {
'mean': df[column].mean(),
'median': df[column].median(),
'std': df[column].std()
}
logging.info(f"成功分析列 '{column}'")
return result
except Exception as e:
logging.error(f"分析失败: {e}")
return None
# 使用示例
# result = safe_analysis(df, '销售额')
5.3 数据验证和质量检查
def validate_data(df, expected_columns, expected_dtypes=None):
"""数据验证函数"""
errors = []
# 检查列是否存在
for col in expected_columns:
if col not in df.columns:
errors.append(f"缺少必要列: {col}")
# 检查数据类型
if expected_dtypes:
for col, expected_dtype in expected_dtypes.items():
if col in df.columns:
actual_dtype = df[col].dtype
if not np.issubdtype(actual_dtype, np.number) and expected_dtype == 'numeric':
errors.append(f"列 {col} 应为数值类型,实际为 {actual_dtype}")
# 检查空值
null_counts = df.isnull().sum()
for col in df.columns:
if null_counts[col] > len(df) * 0.1: # 超过10%为空
errors.append(f"列 {col} 缺失值过多: {null_counts[col]}")
if errors:
for error in errors:
print(f"❌ {error}")
return False
else:
print("✅ 数据验证通过")
return True
# 使用示例
# expected_cols = ['订单ID', '产品', '销售额']
# validate_data(df, expected_cols)
第六部分:扩展学习和资源
6.1 推荐的学习路径
基础阶段(1-2个月)
- Python 基础语法
- NumPy 数组操作
- Pandas 数据处理
- Matplotlib/Seaborn 可视化
进阶阶段(2-3个月)
- 统计学基础
- 数据清洗高级技巧
- 探索性数据分析(EDA)
- 数据库基础(SQL)
高级阶段(3-6个月)
- 机器学习基础(Scikit-learn)
- 时间序列分析
- 大数据处理(Dask, PySpark)
- 数据工程基础
6.2 重要资源推荐
- 官方文档:Pandas, NumPy, Matplotlib 官方文档
- 在线课程:Coursera, DataCamp, Kaggle Learn
- 书籍:《利用Python进行数据分析》、《Python数据科学手册》
- 社区:Stack Overflow, GitHub, Kaggle
6.3 常用快捷代码片段
# 快速数据探索
def quick_look(df):
"""快速数据概览"""
print("数据形状:", df.shape)
print("\n前5行:")
print(df.head())
print("\n数据类型:")
print(df.dtypes)
print("\n缺失值:")
print(df.isnull().sum())
print("\n统计描述:")
print(df.describe())
# 快速相关性分析
def quick_correlation(df, numeric_cols=None):
"""快速相关性分析"""
if numeric_cols is None:
numeric_cols = df.select_dtypes(include=[np.number]).columns
corr_matrix = df[numeric_cols].corr()
plt.figure(figsize=(10, 8))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0)
plt.title('相关性热力图')
plt.show()
return corr_matrix
# 快速保存结果
def save_results(results, filename):
"""保存分析结果"""
import pickle
with open(filename, 'wb') as f:
pickle.dump(results, f)
print(f"结果已保存到 {filename}")
def load_results(filename):
"""加载分析结果"""
import pickle
with open(filename, 'rb') as f:
return pickle.load(f)
结语
通过本指南,你已经学习了 Python 数据分析的完整流程,从环境搭建到高级技巧。记住,数据分析是一个实践性很强的技能,理论知识必须通过大量实践来巩固。
关键要点总结:
- 环境搭建是基础,推荐使用 Anaconda
- NumPy 是数值计算的基石
- Pandas 是数据处理的核心工具
- 可视化是理解数据的关键
- 代码组织和错误处理很重要
- 持续学习和实践是成功的关键
下一步行动建议:
- 找一个真实数据集进行练习
- 参与 Kaggle 竞赛
- 在 GitHub 上分享你的分析项目
- 阅读其他人的优秀代码
- 持续学习新的库和技术
数据分析的世界充满机遇,祝你在数据分析的道路上取得成功!
