引言:数据清洗的重要性与挑战
在数据科学和机器学习项目中,数据清洗是整个流程中最关键且最耗时的环节之一。根据业界统计,数据科学家通常花费60-80%的时间在数据准备和清洗上。原始数据往往包含各种问题:缺失值、异常值、重复记录、格式不一致、数据类型错误等。这些问题如果不得到妥善处理,会直接影响后续分析的准确性和模型的性能。
Python作为数据科学领域的主流语言,提供了强大的工具生态系统来应对这些挑战。本文将详细介绍如何使用Python进行高效的数据清洗,涵盖从基础操作到高级技巧的完整流程,并通过实际代码示例展示每个步骤的具体实现。
1. 数据加载与初步探索
1.1 使用Pandas加载数据
数据清洗的第一步是将数据加载到Python环境中。Pandas是Python数据分析的核心库,提供了灵活的DataFrame结构来处理表格数据。
import pandas as pd
import numpy as np
# 从CSV文件加载数据
df = pd.read_csv('data.csv')
# 从Excel文件加载数据
df = pd.read_excel('data.xlsx')
# 从JSON文件加载数据
df = pd.read_json('data.json')
# 查看数据的基本信息
print(df.info())
print(df.head())
print(df.describe())
1.2 初步数据探索
在开始清洗之前,我们需要了解数据的基本情况:
# 查看数据维度
print(f"数据集包含 {df.shape[0]} 行和 {df.shape[1]} 列")
# 查看列名和数据类型
print(df.dtypes)
# 查看数值型列的统计摘要
print(df.describe())
# 查看分类变量的分布
print(df['category_column'].value_counts())
# 检查缺失值情况
missing_values = df.isnull().sum()
print("缺失值统计:")
print(missing_values[missing_values > 0])
2. 处理缺失值
缺失值是数据中最常见的问题之一。处理缺失值的方法需要根据数据的特性和业务场景来决定。
2.1 识别缺失值
# 检查每列的缺失值数量
missing_count = df.isnull().sum()
# 计算缺失值比例
missing_percentage = (df.isnull().sum() / len(df)) * 100
# 可视化缺失值分布
import seaborn as sns
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
sns.heatmap(df.isnull(), cbar=False, cmap='viridis')
plt.title('缺失值分布热力图')
plt.show()
2.2 删除缺失值
当缺失值比例很小或者数据量很大时,可以直接删除包含缺失值的行或列:
# 删除包含任何缺失值的行
df_cleaned = df.dropna()
# 删除包含任何缺失值的列
df_cleaned = df.dropna(axis=1)
# 删除缺失值超过阈值的列(例如50%)
threshold = 0.5
df_cleaned = df.loc[:, df.isnull().mean() < threshold]
# 删除缺失值超过阈值的行
threshold = 0.3
df_cleaned = df.loc[df.isnull().mean(axis=1) < threshold]
2.3 填充缺失值
填充缺失值是更常用的方法,需要根据数据类型选择合适的策略:
# 用0填充数值型列
df['numeric_column'] = df['numeric_column'].fillna(0)
# 用均值填充数值型列
df['numeric_column'] = df['numeric_column'].fillna(df['numeric_column'].mean())
# 用中位数填充数值型列(对异常值更鲁棒)
df['numeric_column'] = df['numeric_column'].fillna(df['numeric_column'].median())
# 用众数填充分类变量
df['category_column'] = df['category_column'].fillna(df['category_column'].mode()[0])
# 前向填充(适用于时间序列数据)
df['time_series_column'] = df['time_series_column'].fillna(method='ffill')
# 后向填充
df['time_series_column'] = df['time_series_column'].fillna(method='bfill')
# 使用插值法填充
df['numeric_column'] = df['numeric_column'].interpolate(method='linear')
# 使用分组均值填充(基于其他列的分组)
df['value'] = df.groupby('category')['value'].transform(lambda x: x.fillna(x.mean()))
2.4 高级缺失值处理
对于更复杂的场景,可以使用机器学习方法预测缺失值:
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
def impute_with_ml(df, target_column):
"""
使用随机森林预测缺失值
"""
# 分离有缺失值和无缺失值的数据
known = df[df[target_column].notnull()]
unknown = df[df[target_column].isnull()]
if len(unknown) == 0:
return df
# 准备特征和目标变量
features = [col for col in df.columns if col != target_column]
X = known[features]
y = known[target_column]
# 训练模型
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X, y)
# 预测缺失值
predicted = model.predict(unknown[features])
df.loc[df[target_column].isnull(), target_column] = predicted
return df
# 使用示例
# df = impute_with_ml(df, 'target_column')
3. 处理异常值
异常值可能是数据录入错误、测量误差或真实的极端值。识别和处理异常值需要谨慎。
3.1 异常值检测方法
# 3σ原则(适用于正态分布)
def detect_outliers_zscore(df, column, threshold=3):
z_scores = np.abs((df[column] - df[column].mean()) / df[column].std())
return df[z_scores > threshold]
# IQR方法(适用于偏态分布)
def detect_outliers_iqr(df, column):
Q1 = df[column].quantile(0.25)
Q3 = df[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
return df[(df[column] < lower_bound) | (df[column] > upper_bound)]
# 可视化检测
def plot_outliers(df, column):
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
# 箱线图
ax1.boxplot(df[column])
ax1.set_title(f'Boxplot of {column}')
# 直方图
ax2.hist(df[column], bins=30)
ax2.axvline(df[column].mean(), color='red', linestyle='--', label='Mean')
ax2.axvline(df[column].median(), color='green', linestyle='--', label='Median')
ax2.set_title(f'Histogram of {column}')
ax2.legend()
plt.show()
# 使用示例
# outliers = detect_outliers_iqr(df, 'price')
# plot_outliers(df, 'price')
3.2 异常值处理策略
# 1. 删除异常值
def remove_outliers(df, column, method='iqr'):
if method == 'zscore':
z_scores = np.abs((df[column] - df[column].mean()) / df[column].std())
return df[z_scores <= 3]
elif method == 'iqr':
Q1 = df[column].quantile(0.25)
Q3 = df[column].column.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
return df[(df[column] >= lower_bound) & (df[column] <= upper_bound)]
# 2. 截断异常值(Winsorization)
def cap_outliers(df, column, method='iqr'):
if method == 'iqr':
Q1 = df[column].quantile(0.05) # 使用5%和95%分位数
Q3 = df[column].quantile(0.95)
df[column] = np.where(df[column] < Q1, Q1, df[column])
df[column] = np.where(df[column] > Q3, Q3, df[column])
return df
# 3. 转换处理(对数转换)
def transform_outliers(df, column):
# 对数转换可以减少极端值的影响
df[column] = np.log1p(df[column]) # log(1+x) 避免log(0)
return df
# 4. 分箱处理
def bin_outliers(df, column, bins=10):
df[column] = pd.cut(df[column], bins=bins, labels=False)
return df
# 5. 标记异常值
def flag_outliers(df, column, method='iqr'):
if method == 'iqr':
Q1 = df[column].quantile(0.25)
Q3 = df[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
df[f'{column}_outlier'] = ((df[column] < lower_bound) | (df[column] > upper_bound)).astype(int)
return df
4. 数据格式标准化
4.1 文本数据清洗
# 基础文本清洗
def clean_text(text):
"""
清洗文本数据的通用函数
"""
if pd.isna(text):
return ""
# 转换为小写
text = text.lower()
# 移除特殊字符(保留字母、数字和空格)
text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
# 移除多余空格
text = ' '.join(text.split())
# 去除首尾空格
text = text.strip()
return text
# 应用到DataFrame
df['cleaned_text'] = df['text_column'].apply(clean_text)
# 处理电话号码
def clean_phone_number(phone):
"""
标准化电话号码格式
"""
if pd.isna(phone):
return ""
# 移除所有非数字字符
digits = re.sub(r'\D', '', str(phone))
# 根据长度判断格式
if len(digits) == 10:
return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
elif len(digits) == 11:
return f"+{digits[0]} ({digits[1:4]}) {digits[4:7]}-{digits[7:]}"
else:
return digits
df['cleaned_phone'] = df['phone_column'].apply(clean_phone_number)
# 处理地址
def clean_address(address):
"""
标准化地址格式
"""
if pd.isna(address):
return ""
# 定义缩写映射
abbreviations = {
'street': 'st',
'avenue': 'ave',
'boulevard': 'blvd',
'road': 'rd',
'drive': 'dr',
'lane': 'ln',
'court': 'ct'
}
address = address.lower()
# 替换缩写
for full, abbr in abbreviations.items():
address = re.sub(r'\b' + full + r'\b', abbr, address)
# 标准化方向词
direction_map = {'north': 'N', 'south': 'S', 'east': 'E', 'west': 'W'}
for word, direction in direction_map.items():
address = re.sub(r'\b' + word + r'\b', direction, address)
return address.title()
df['cleaned_address'] = df['address_column'].apply(clean_address)
4.2 日期时间标准化
# 统一日期格式
def standardize_date(date_str):
"""
将各种日期格式统一为 YYYY-MM-DD
"""
if pd.isna(date_str):
return None
# 尝试多种格式
formats = [
'%Y-%m-%d',
'%m/%d/%Y',
'%d/%m/%Y',
'%Y/%m/%d',
'%m-%d-%Y',
'%d-%m-%Y',
'%b %d, %Y',
'%B %d, %Y'
]
for fmt in formats:
try:
date_obj = datetime.strptime(str(date_str), fmt)
return date_obj.strftime('%Y-%m-%d')
except ValueError:
continue
return None
# 使用pandas to_datetime
df['date_column'] = pd.to_datetime(df['date_column'], errors='coerce')
# 提取日期特征
df['year'] = df['date_column'].dt.year
df['month'] = df['date_column'].dt.month
df['day'] = df['date_column'].dt.day
df['day_of_week'] = df['date_column'].dt.dayofweek
df['is_weekend'] = df['date_column'].dt.weekday >= 5
# 处理时间戳
df['timestamp'] = pd.to_datetime(df['timestamp_column'], unit='s')
4.3 数值格式标准化
# 处理货币格式
def clean_currency(value):
"""
清理货币字符串并转换为浮点数
"""
if pd.isna(value):
return np.nan
# 移除货币符号和逗号
cleaned = re.sub(r'[^\d.-]', '', str(value))
try:
return float(cleaned)
except ValueError:
return np.nan
df['price'] = df['price_column'].apply(clean_currency)
# 处理百分比
def clean_percentage(value):
"""
将百分比字符串转换为小数
"""
if pd.isna(value):
return np.nan
# 移除百分号
cleaned = str(value).replace('%', '')
try:
return float(cleaned) / 100
except ValueError:
return np.nan
df['rate'] = df['rate_column'].apply(clean_percentage)
# 标准化数值范围
def normalize_column(df, column, method='minmax'):
"""
标准化数值列到指定范围
"""
if method == 'minmax':
min_val = df[column].min()
max_val = df[column].max()
df[f'{column}_normalized'] = (df[column] - min_val) / (max_val - min_val)
elif method == 'zscore':
df[f'{column}_normalized'] = (df[column] - df[column].mean()) / df[column].std()
return df
df = normalize_column(df, 'price', method='minmax')
5. 处理重复数据
5.1 识别重复值
# 查找完全重复的行
duplicates = df.duplicated()
print(f"发现 {duplicates.sum()} 个重复行")
# 查找特定列的重复值
duplicates_subset = df.duplicated(subset=['id', 'date'])
print(f"基于id和date的重复行: {duplicates_subset.sum()}")
# 查看重复行的详细信息
duplicate_rows = df[df.duplicated(keep=False)]
print(duplicate_rows.sort_values(by=['id', 'date']))
# 统计每组重复值的数量
duplicate_counts = df.groupby(['id', 'date']).size().reset_index(name='count')
duplicate_counts = duplicate_counts[duplicate_counts['count'] > 1]
print(duplicate_counts)
5.2 处理重复值
# 删除完全重复的行
df_unique = df.drop_duplicates()
# 删除特定列的重复行(保留第一条)
df_unique = df.drop_duplicates(subset=['id', 'date'], keep='first')
# 删除特定列的重复行(保留最后一条)
df_unique = df.drop_duplicates(subset=['id', 'date'], keep='last')
# 删除重复行但保留最新记录(基于时间戳)
df_sorted = df.sort_values('timestamp', ascending=False)
df_unique = df_sorted.drop_duplicates(subset=['id'], keep='first')
# 处理重复值的高级策略
def handle_duplicates_advanced(df, id_cols, sort_cols=None, keep='first'):
"""
高级重复值处理:可以指定排序和保留策略
"""
if sort_cols:
df = df.sort_values(sort_cols, ascending=False)
return df.drop_duplicates(subset=id_cols, keep=keep)
# 使用示例
df = handle_duplicates_advanced(
df,
id_cols=['customer_id', 'product_id'],
sort_cols=['transaction_date'],
keep='first'
)
# 标记重复值而不删除
df['is_duplicate'] = df.duplicated(subset=['id'], keep=False)
df['duplicate_rank'] = df.groupby(['id']).cumcount() + 1
6. 数据类型转换与优化
6.1 数据类型检查与转换
# 检查当前数据类型
print(df.dtypes)
# 优化内存使用的数据类型转换
def optimize_memory(df):
"""
优化DataFrame的内存使用
"""
start_mem = df.memory_usage().sum() / 1024**2
for col in df.columns:
col_type = df[col].dtype
if col_type != object:
c_min = df[col].min()
c_max = df[col].max()
if str(col_type)[:3] == 'int':
if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
df[col] = df[col].astype(np.int8)
elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
df[col] = df[col].astype(np.int16)
elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
df[col] = df[col].astype(np.int32)
elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
df[col] = df[col].astype(np.int64)
else:
if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
df[col] = df[col].astype(np.float32)
else:
df[col] = df[col].astype(np.float64)
else:
# 对于对象类型,检查是否可以转换为category
num_unique_values = len(df[col].unique())
num_total_values = len(df[col])
if num_unique_values / num_total_values < 0.5:
df[col] = df[col].astype('category')
end_mem = df.memory_usage().sum() / 1024**2
print(f"内存使用从 {start_mem:.2f} MB 优化到 {end_mem:.2f} MB")
return df
df = optimize_memory(df)
# 手动转换数据类型
df['id'] = df['id'].astype(int)
df['price'] = df['price'].astype(float)
df['category'] = df['category'].astype('category')
df['date'] = pd.to_datetime(df['date'])
6.2 处理编码问题
# 检测文件编码
import chardet
def detect_encoding(file_path):
"""
检测文件编码
"""
with open(file_path, 'rb') as f:
result = chardet.detect(f.read(10000))
return result['encoding']
# 处理编码问题
def read_with_encoding(file_path, encoding='utf-8'):
"""
尝试多种编码读取文件
"""
encodings = ['utf-8', 'latin-1', 'iso-8859-1', 'cp1252']
for enc in encodings + [encoding]:
try:
return pd.read_csv(file_path, encoding=enc)
except UnicodeDecodeError:
continue
raise ValueError(f"无法读取文件,尝试了多种编码: {encodings}")
# 处理DataFrame中的编码问题
def clean_encoding(series):
"""
清理文本列的编码问题
"""
return series.astype(str).str.encode('utf-8', errors='ignore').str.decode('utf-8', errors='ignore')
df['text_column'] = clean_encoding(df['text_column'])
7. 高级数据清洗技术
7.1 使用管道(Pipeline)进行自动化清洗
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
class DataCleaner(BaseEstimator, TransformerMixin):
"""
自定义数据清洗转换器
"""
def __init__(self, fill_method='median', outlier_method='iqr'):
self.fill_method = fill_method
self.outlier_method = outlier_method
self.fill_values = {}
self.outlier_bounds = {}
def fit(self, X, y=None):
# 计算填充值
for col in X.select_dtypes(include=[np.number]).columns:
if self.fill_method == 'median':
self.fill_values[col] = X[col].median()
elif self.fill_method == 'mean':
self.fill_values[col] = X[col].mean()
else:
self.fill_values[col] = 0
# 计算异常值边界
for col in X.select_dtypes(include=[np.number]).columns:
Q1 = X[col].quantile(0.25)
Q3 = X[col].quantile(0.75)
IQR = Q3 - Q1
self.outlier_bounds[col] = (Q1 - 1.5 * IQR, Q3 + 1.5 * IQR)
return self
def transform(self, X):
X = X.copy()
# 填充缺失值
for col, value in self.fill_values.items():
X[col] = X[col].fillna(value)
# 处理异常值(截断)
for col, (lower, upper) in self.outlier_bounds.items():
X[col] = np.clip(X[col], lower, upper)
return X
# 使用管道
cleaning_pipeline = Pipeline([
('cleaner', DataCleaner(fill_method='median', outlier_method='iqr'))
])
# 应用清洗
df_cleaned = cleaning_pipeline.fit_transform(df)
7.2 使用Dask处理大数据
import dask.dataframe as dd
def clean_large_dataset(file_path, chunksize=100000):
"""
清洗大型数据集(内存不足时使用)
"""
# 使用Dask读取大文件
ddf = dd.read_csv(file_path, blocksize=chunksize)
# 定义清洗函数
def clean_chunk(df):
# 填充缺失值
df = df.fillna({
'numeric_column': df['numeric_column'].mean(),
'category_column': 'Unknown'
})
# 处理异常值
Q1 = df['price'].quantile(0.25)
Q3 = df['price'].quantile(0.75)
IQR = Q3 - Q1
df = df[(df['price'] >= Q1 - 1.5 * IQR) & (df['price'] <= Q3 + 1.5 * IQR)]
return df
# 应用清洗
ddf_cleaned = ddf.map_partitions(clean_chunk)
# 保存结果
ddf_cleaned.to_csv('cleaned_data_*.csv', index=False)
return ddf_cleaned
# 使用示例
# ddf = clean_large_dataset('large_file.csv')
7.3 数据质量验证
import great_expectations as ge
def validate_data_quality(df):
"""
使用Great Expectations进行数据质量验证
"""
# 将Pandas DataFrame转换为Great Expectations DataFrame
ge_df = ge.from_pandas(df)
# 定义期望
expectations = [
# 列存在性检查
ge_df.expect_column_to_exist('id'),
ge_df.expect_column_to_exist('price'),
# 值域检查
ge_df.expect_column_values_to_be_between('price', 0, 10000),
# 唯一性检查
ge_df.expect_column_values_to_be_unique('id'),
# 缺失值检查
ge_df.expect_column_values_to_not_be_null('customer_id'),
ge_df.expect_column_values_to_not_be_null('transaction_date'),
# 数据类型检查
ge_df.expect_column_dtype_to_be('price', 'float64'),
ge_df.expect_column_dtype_to_be('id', 'int64'),
# 格式检查
ge_df.expect_column_values_to_match_regex('email', r'^[\w\.-]+@[\w\.-]+\.\w+$'),
# 范围检查
ge_df.expect_column_values_to_be_between('age', 0, 120),
# 类别检查
ge_df.expect_column_values_to_be_in_set('status', ['active', 'inactive', 'pending']),
# 自定义验证
ge_df.expect_column_pair_values_A_to_be_greater_than_B(
'start_date', 'end_date', or_equal=True
)
]
# 运行验证
results = ge_df.validate()
# 输出结果
print(f"验证通过率: {results['statistics']['success_percent']:.2f}%")
# 查看失败的期望
failed_expectations = [r for r in results['results'] if not r['success']]
for exp in failed_expectations:
print(f"失败: {exp['expectation_config']['expectation_type']}")
print(f" 错误: {exp.get('result', {}).get('unexpected_list', [])[:5]}...")
return results
# 使用示例
# quality_report = validate_data_quality(df)
8. 实战案例:完整数据清洗流程
8.1 案例背景
假设我们有一个电商交易数据集,包含以下问题:
- 缺失的客户信息
- 异常的价格和数量
- 重复的交易记录
- 不一致的日期格式
- 特殊字符和编码问题
8.2 完整清洗代码
import pandas as pd
import numpy as np
import re
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')
class EcommerceDataCleaner:
"""
电商数据清洗器
"""
def __init__(self):
self.fill_values = {}
self.outlier_bounds = {}
self.duplicate_strategy = 'latest'
def load_data(self, file_path):
"""加载数据"""
try:
self.df = pd.read_csv(file_path, encoding='utf-8')
print(f"成功加载数据: {self.df.shape}")
return self.df
except:
# 尝试其他编码
self.df = pd.read_csv(file_path, encoding='latin-1')
print(f"成功加载数据(使用latin-1编码): {self.df.shape}")
return self.df
def explore_data(self):
"""数据探索"""
print("\n=== 数据概览 ===")
print(f"数据维度: {self.df.shape}")
print(f"\n列名: {list(self.df.columns)}")
print(f"\n=== 数据类型 ===")
print(self.df.dtypes)
print(f"\n=== 缺失值统计 ===")
missing = self.df.isnull().sum()
print(missing[missing > 0])
print(f"\n=== 数值列描述性统计 ===")
print(self.df.describe())
print(f"\n=== 分类列分布 ===")
for col in self.df.select_dtypes(include=['object']).columns:
if self.df[col].nunique() < 20:
print(f"\n{col}:")
print(self.df[col].value_counts().head())
def clean_text_columns(self):
"""清洗文本列"""
print("\n=== 清洗文本列 ===")
# 清洗客户姓名
if 'customer_name' in self.df.columns:
self.df['customer_name'] = self.df['customer_name'].str.strip()
self.df['customer_name'] = self.df['customer_name'].str.title()
# 移除特殊字符
self.df['customer_name'] = self.df['customer_name'].str.replace(r'[^a-zA-Z\s]', '', regex=True)
# 清洗产品名称
if 'product_name' in self.df.columns:
self.df['product_name'] = self.df['product_name'].str.strip()
self.df['product_name'] = self.df['product_name'].str.upper()
# 清洗邮箱
if 'email' in self.df.columns:
self.df['email'] = self.df['email'].str.strip().str.lower()
print("文本列清洗完成")
def standardize_dates(self):
"""标准化日期列"""
print("\n=== 标准化日期列 ===")
date_columns = ['order_date', 'payment_date', 'delivery_date']
for col in date_columns:
if col in self.df.columns:
# 转换为datetime
self.df[col] = pd.to_datetime(self.df[col], errors='coerce')
# 检查转换失败的记录
failed = self.df[col].isnull().sum()
if failed > 0:
print(f" {col}: {failed} 条记录转换失败")
print("日期列标准化完成")
def handle_missing_values(self):
"""处理缺失值"""
print("\n=== 处理缺失值 ===")
# 数值列:用中位数填充
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
if self.df[col].isnull().sum() > 0:
median_val = self.df[col].median()
self.df[col] = self.df[col].fillna(median_val)
self.fill_values[col] = median_val
print(f" {col}: 用中位数 {median_val:.2f} 填充")
# 分类列:用'Unknown'填充
categorical_cols = self.df.select_dtypes(include=['object']).columns
for col in categorical_cols:
if self.df[col].isnull().sum() > 0:
self.df[col] = self.df[col].fillna('Unknown')
print(f" {col}: 用 'Unknown' 填充")
print("缺失值处理完成")
def detect_and_handle_outliers(self):
"""检测和处理异常值"""
print("\n=== 处理异常值 ===")
# 价格列
if 'price' in self.df.columns:
Q1 = self.df['price'].quantile(0.25)
Q3 = self.df['price'].quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - 1.5 * IQR
upper = Q3 + 1.5 * IQR
outliers = self.df[(self.df['price'] < lower) | (self.df['price'] > upper)]
print(f" 价格异常值: {len(outliers)} 条")
# 截断异常值
self.df['price'] = np.clip(self.df['price'], lower, upper)
self.outlier_bounds['price'] = (lower, upper)
# 数量列
if 'quantity' in self.df.columns:
# 数量应该为正数
invalid_qty = self.df[self.df['quantity'] <= 0]
print(f" 无效数量: {len(invalid_qty)} 条")
# 用中位数替换无效值
median_qty = self.df[self.df['quantity'] > 0]['quantity'].median()
self.df.loc[self.df['quantity'] <= 0, 'quantity'] = median_qty
print("异常值处理完成")
def handle_duplicates(self):
"""处理重复值"""
print("\n=== 处理重复值 ===")
# 查找基于订单ID的重复
if 'order_id' in self.df.columns:
duplicates = self.df.duplicated(subset=['order_id'], keep=False)
dup_count = duplicates.sum()
print(f" 基于order_id的重复记录: {dup_count} 条")
if dup_count > 0:
# 按时间排序,保留最新的
if 'order_date' in self.df.columns:
self.df = self.df.sort_values('order_date', ascending=False)
self.df = self.df.drop_duplicates(subset=['order_id'], keep='first')
print(f" 保留最新记录,剩余: {len(self.df)} 条")
print("重复值处理完成")
def optimize_data_types(self):
"""优化数据类型"""
print("\n=== 优化数据类型 ===")
# 转换分类列为category类型
for col in self.df.select_dtypes(include=['object']).columns:
if self.df[col].nunique() / len(self.df) < 0.5:
self.df[col] = self.df[col].astype('category')
print(f" {col}: 转换为category类型")
# 优化数值类型
for col in self.df.select_dtypes(include=[np.number]).columns:
if self.df[col].dtype == 'float64':
self.df[col] = self.df[col].astype('float32')
elif self.df[col].dtype == 'int64':
self.df[col] = self.df[col].astype('int32')
print("数据类型优化完成")
def validate_data(self):
"""数据质量验证"""
print("\n=== 数据质量验证 ===")
# 检查缺失值
total_missing = self.df.isnull().sum().sum()
print(f"剩余缺失值: {total_missing}")
# 检查重复值
total_duplicates = self.df.duplicated().sum()
print(f"剩余重复值: {total_duplicates}")
# 检查数值范围
if 'price' in self.df.columns:
invalid_price = ((self.df['price'] < 0) | (self.df['price'] > 100000)).sum()
print(f"异常价格: {invalid_price}")
if 'quantity' in self.df.columns:
invalid_qty = (self.df['quantity'] <= 0).sum()
print(f"异常数量: {invalid_qty}")
# 检查日期范围
if 'order_date' in self.df.columns:
min_date = self.df['order_date'].min()
max_date = self.df['order_date'].max()
print(f"日期范围: {min_date} 到 {max_date}")
print(f"\n最终数据维度: {self.df.shape}")
return total_missing == 0 and total_duplicates == 0
def save_data(self, output_path):
"""保存清洗后的数据"""
self.df.to_csv(output_path, index=False)
print(f"\n数据已保存到: {output_path}")
def run_full_cleaning(self, input_path, output_path):
"""运行完整的清洗流程"""
print("=" * 60)
print("开始数据清洗流程")
print("=" * 60)
self.load_data(input_path)
self.explore_data()
self.clean_text_columns()
self.standardize_dates()
self.handle_missing_values()
self.detect_and_handle_outliers()
self.handle_duplicates()
self.optimize_data_types()
is_valid = self.validate_data()
self.save_data(output_path)
print("\n" + "=" * 60)
if is_valid:
print("数据清洗完成!数据质量良好。")
else:
print("数据清洗完成,但仍有部分问题需要手动检查。")
print("=" * 60)
return self.df
# 使用示例
if __name__ == "__main__":
cleaner = EcommerceDataCleaner()
# 运行完整清洗
cleaned_df = cleaner.run_full_cleaning(
input_path='raw_ecommerce_data.csv',
output_path='cleaned_ecommerce_data.csv'
)
# 查看清洗结果
print("\n清洗后的数据前5行:")
print(cleaned_df.head())
9. 数据清洗最佳实践
9.1 清洗流程标准化
# 创建可复用的清洗配置
CLEANING_CONFIG = {
'numeric_columns': {
'price': {'fill_method': 'median', 'outlier_method': 'iqr', 'bounds': (0, 10000)},
'quantity': {'fill_method': 'median', 'outlier_method': 'range', 'bounds': (1, 1000)},
'rating': {'fill_method': 'mean', 'outlier_method': 'range', 'bounds': (1, 5)}
},
'categorical_columns': {
'category': {'fill_method': 'mode', 'max_categories': 20},
'status': {'fill_method': 'Unknown', 'allowed_values': ['active', 'inactive', 'pending']}
},
'date_columns': {
'order_date': {'format': 'auto', 'min_date': '2020-01-01'},
'delivery_date': {'format': 'auto', 'min_date': 'order_date'}
},
'text_columns': {
'customer_name': {'clean': True, 'strip': True, 'title': True},
'email': {'clean': True, 'validate': True}
}
}
def apply_cleaning_config(df, config):
"""
应用标准化的清洗配置
"""
# 处理数值列
for col, settings in config.get('numeric_columns', {}).items():
if col in df.columns:
# 填充缺失值
if settings['fill_method'] == 'median':
fill_val = df[col].median()
elif settings['fill_method'] == 'mean':
fill_val = df[col].mean()
else:
fill_val = 0
df[col] = df[col].fillna(fill_val)
# 处理异常值
if settings['outlier_method'] == 'iqr':
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - 1.5 * IQR
upper = Q3 + 1.5 * IQR
elif settings['outlier_method'] == 'range':
lower, upper = settings['bounds']
df[col] = np.clip(df[col], lower, upper)
# 处理分类列
for col, settings in config.get('categorical_columns', {}).items():
if col in df.columns:
if settings['fill_method'] == 'mode':
fill_val = df[col].mode()[0] if not df[col].mode().empty else 'Unknown'
else:
fill_val = settings['fill_method']
df[col] = df[col].fillna(fill_val)
# 限制类别数量
if 'max_categories' in settings:
value_counts = df[col].value_counts()
if len(value_counts) > settings['max_categories']:
other_categories = value_counts.iloc[settings['max_categories']:].index
df[col] = df[col].replace(other_categories, 'Other')
# 处理日期列
for col, settings in config.get('date_columns', {}).items():
if col in df.columns:
df[col] = pd.to_datetime(df[col], errors='coerce')
if 'min_date' in settings:
min_date = pd.to_datetime(settings['min_date'])
df.loc[df[col] < min_date, col] = np.nan
# 处理文本列
for col, settings in config.get('text_columns', {}).items():
if col in df.columns:
if settings.get('clean', False):
df[col] = df[col].astype(str).str.strip()
if settings.get('title', False):
df[col] = df[col].str.title()
return df
# 使用示例
# df_cleaned = apply_cleaning_config(df, CLEANING_CONFIG)
9.2 清洗过程文档化
def create_cleaning_report(df, original_df, cleaning_steps):
"""
生成数据清洗报告
"""
report = {
'原始数据维度': original_df.shape,
'清洗后维度': df.shape,
'删除行数': original_df.shape[0] - df.shape[0],
'删除列数': original_df.shape[1] - df.shape[1],
'清洗步骤': cleaning_steps,
'数据质量指标': {
'缺失值减少': original_df.isnull().sum().sum() - df.isnull().sum().sum(),
'重复值减少': original_df.duplicated().sum() - df.duplicated().sum(),
'内存优化': f"{original_df.memory_usage().sum() / 1024**2:.2f}MB -> {df.memory_usage().sum() / 1024**2:.2f}MB"
}
}
return report
# 记录清洗步骤
cleaning_steps = []
cleaning_steps.append("1. 加载数据并探索")
cleaning_steps.append("2. 清洗文本列")
cleaning_steps.append("3. 标准化日期")
cleaning_steps.append("4. 处理缺失值")
cleaning_steps.append("5. 处理异常值")
cleaning_steps.append("6. 处理重复值")
cleaning_steps.append("7. 优化数据类型")
# 生成报告
# report = create_cleaning_report(df_cleaned, df_original, cleaning_steps)
# print(json.dumps(report, indent=2, default=str))
10. 总结
数据清洗是数据科学项目成功的关键。通过本文介绍的方法和代码示例,您可以:
- 系统化处理缺失值:根据数据特性选择合适的填充或删除策略
- 有效识别和处理异常值:使用统计方法和可视化工具
- 标准化数据格式:统一文本、日期、数值的格式
- 处理重复数据:智能识别和去重
- 优化数据存储:减少内存使用,提高处理效率
- 验证数据质量:确保清洗后的数据符合业务要求
记住,数据清洗不是一次性的任务,而是一个迭代的过程。随着数据的变化和业务需求的发展,您可能需要调整清洗策略。建议将清洗流程自动化,建立可复用的清洗管道,并始终保持对数据质量的关注。
最后,数据清洗的最终目标是为后续的分析和建模提供高质量、可靠的数据基础。只有打好这个基础,才能确保数据科学项目的价值最大化。
