引言:数据清洗的重要性
在数据科学和分析领域,数据清洗是整个工作流程中最耗时但也最关键的步骤。根据业界统计,数据科学家通常花费60-80%的时间在数据准备和清洗上。原始数据往往包含各种问题:缺失值、异常值、重复记录、格式不一致、编码问题等。这些问题如果得不到妥善处理,会严重影响后续分析的准确性和可靠性。
Python作为数据科学的首选语言,提供了强大的工具生态系统来处理这些问题。本文将详细介绍如何使用Python进行高效的数据清洗,涵盖从基础到高级的各种技术和最佳实践。
1. 数据清洗的基础工具和环境设置
1.1 必需的Python库
在开始数据清洗之前,我们需要安装并导入以下核心库:
# 数据处理基础库
import pandas as pd
import numpy as np
# 可视化库
import matplotlib.pyplot as plt
import seaborn as sns
# 正则表达式处理
import re
# 日期时间处理
from datetime import datetime
# 设置显示选项
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
pd.set_option('display.width', 1000)
print("所有必要的库已成功导入!")
1.2 创建示例数据集
为了演示数据清洗技术,我们首先创建一个包含常见数据问题的示例数据集:
# 创建一个包含各种数据问题的示例数据集
data = {
'customer_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 11],
'name': ['John Doe', 'Jane Smith', 'Bob Johnson', 'Alice Brown', 'Charlie Wilson',
'David Lee', 'Eva Martinez', 'Frank White', 'Grace Lee', 'Henry Davis',
'John Doe', 'Tom Wilson'],
'age': [25, 30, 35, 28, 42, 38, 45, 29, 33, 41, 25, 36],
'email': ['john.doe@email.com', 'jane.smith@email.com', 'bob.johnson@email.com',
'alice.brown@email.com', 'charlie.wilson@email.com', 'david.lee@email.com',
'eva.martinez@email.com', 'frank.white@email.com', 'grace.lee@email.com',
'henry.davis@email.com', 'john.doe@email.com', None],
'salary': ['$50,000', '$60,000', '$75,000', '$55,000', '$90,000', '$80,000',
'$95,000', '$58,000', '$68,000', '$85,000', '$50,000', '$72,000'],
'join_date': ['2020-01-15', '2019-03-20', '2021-06-10', '2020-08-05', '2018-11-30',
'2019-05-25', '2022-02-14', '2020-09-18', '2021-12-01', '2019-07-22',
'2020-01-15', '2023-01-10'],
'department': ['Sales', 'Engineering', 'Marketing', 'Sales', 'Engineering',
'Marketing', 'Engineering', 'Sales', 'Marketing', 'Engineering',
'Sales', 'Engineering'],
'status': ['Active', 'Active', 'Inactive', 'Active', 'Active', 'Inactive',
'Active', 'Active', 'Active', 'Inactive', 'Active', 'Active'],
'phone': ['123-456-7890', '234-567-8901', '345-678-9012', '456-789-0123', '567-890-1234',
'678-901-2345', '789-012-3456', '890-123-4567', '901-234-5678', '012-345-6789',
'123-456-7890', '']
}
df = pd.DataFrame(data)
print("原始数据集预览:")
print(df)
print(f"\n数据集形状:{df.shape}")
2. 数据探索和质量评估
2.1 基本数据信息检查
在清洗之前,我们需要全面了解数据的状况:
def explore_data_quality(df):
"""
全面评估数据质量的函数
"""
print("=" * 60)
print("数据质量评估报告")
print("=" * 60)
# 1. 基本信息
print("\n1. 数据集基本信息:")
print(f" - 行数: {df.shape[0]}")
print(f" - 列数: {df.shape[1]}")
print(f" - 内存使用: {df.memory_usage(deep=True).sum() / 1024:.2f} KB")
# 2. 数据类型
print("\n2. 数据类型:")
print(df.dtypes)
# 3. 缺失值统计
print("\n3. 缺失值统计:")
missing_values = df.isnull().sum()
missing_percentage = (missing_values / len(df)) * 100
missing_info = pd.DataFrame({
'缺失数量': missing_values,
'缺失比例(%)': missing_percentage.round(2)
})
print(missing_info[missing_info['缺失数量'] > 0])
# 4. 重复值检查
print("\n4. 重复值检查:")
duplicate_rows = df.duplicated().sum()
print(f" - 完全重复的行数: {duplicate_rows}")
# 5. 数值列的统计描述
print("\n5. 数值列统计描述:")
numeric_cols = df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 0:
print(df[numeric_cols].describe())
else:
print(" - 无数值列")
# 6. 唯一值统计
print("\n6. 各列唯一值数量:")
for col in df.columns:
unique_count = df[col].nunique()
print(f" - {col}: {unique_count} 个唯一值")
# 执行数据探索
explore_data_quality(df)
2.2 数据可视化探索
可视化是发现数据问题的有效方法:
def visualize_data_issues(df):
"""
可视化展示数据质量问题
"""
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('数据质量问题可视化', fontsize=16)
# 1. 缺失值分布
missing_data = df.isnull().sum()
missing_data[missing_data > 0].plot(kind='bar', ax=axes[0,0], color='coral')
axes[0,0].set_title('各列缺失值数量')
axes[0,0].set_ylabel('缺失值数量')
    # 2. 数值分布(传入单个Axes时只画一列,这里选择第一个数值列)
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 0:
        df[numeric_cols[0]].hist(bins=15, ax=axes[0,1], color='skyblue', alpha=0.7)
        axes[0,1].set_title(f'{numeric_cols[0]} 分布')
# 3. 类别分布
categorical_cols = df.select_dtypes(include=['object']).columns
if len(categorical_cols) > 0:
# 选择一个类别列进行展示
sample_col = categorical_cols[0]
value_counts = df[sample_col].value_counts().head(10)
value_counts.plot(kind='bar', ax=axes[1,0], color='lightgreen')
axes[1,0].set_title(f'{sample_col} 分布')
axes[1,0].tick_params(axis='x', rotation=45)
# 4. 数据完整性热图
sns.heatmap(df.isnull(), cbar=False, yticklabels=False, cmap='viridis', ax=axes[1,1])
axes[1,1].set_title('缺失值分布热图')
plt.tight_layout()
plt.show()
# 执行可视化
visualize_data_issues(df)
3. 处理重复数据
3.1 识别和处理完全重复的行
# 识别完全重复的行
print("重复行检查:")
print(df[df.duplicated()])
print(f"\n重复行数量:{df.duplicated().sum()}")
# 删除完全重复的行
df_deduplicated = df.drop_duplicates()
print(f"\n删除重复行前:{df.shape[0]} 行")
print(f"删除重复行后:{df_deduplicated.shape[0]} 行")
# 保留第一个出现的重复行,删除后续重复行
df_deduplicated_keep_first = df.drop_duplicates(keep='first')
print(f"保留第一个重复行:{df_deduplicated_keep_first.shape[0]} 行")
3.2 处理部分重复的数据
# 基于特定列判断重复(例如customer_id重复但其他信息可能不同)
print("\n基于customer_id检查重复:")
duplicate_by_id = df[df.duplicated(subset=['customer_id'], keep=False)]
print(duplicate_by_id)
# 删除基于customer_id的重复,保留第一个
df_unique_by_id = df.drop_duplicates(subset=['customer_id'], keep='first')
print(f"\n基于customer_id去重后:{df_unique_by_id.shape[0]} 行")
# 更复杂的去重逻辑:保留最新日期的记录
df_sorted = df.sort_values('join_date', ascending=False)
df_unique_latest = df_sorted.drop_duplicates(subset=['customer_id'], keep='first')
print(f"保留最新日期记录:{df_unique_latest.shape[0]} 行")
4. 处理缺失值
4.1 缺失值识别和分析
def analyze_missing_data(df):
"""
详细分析缺失数据
"""
print("缺失值详细分析:")
print("=" * 50)
# 按列统计
missing_by_column = df.isnull().sum()
missing_percentage = (missing_by_column / len(df)) * 100
missing_info = pd.DataFrame({
'缺失数量': missing_by_column,
'缺失比例(%)': missing_percentage.round(2)
})
print("按列统计:")
print(missing_info[missing_info['缺失数量'] > 0])
# 按行统计
missing_by_row = df.isnull().sum(axis=1)
rows_with_missing = missing_by_row[missing_by_row > 0]
if len(rows_with_missing) > 0:
print(f"\n有缺失值的行数:{len(rows_with_missing)}")
print("缺失值最多的前5行:")
print(df[missing_by_row > 0].head())
analyze_missing_data(df)
4.2 删除缺失值
# 删除包含缺失值的行
df_drop_rows = df.dropna()
print(f"删除缺失行前:{df.shape[0]} 行")
print(f"删除缺失行后:{df_drop_rows.shape[0]} 行")
# 删除包含缺失值的列
df_drop_cols = df.dropna(axis=1)
print(f"删除缺失列前:{df.shape[1]} 列")
print(f"删除缺失列后:{df_drop_cols.shape[1]} 列")
# 只删除完全缺失的行
df_drop_all_missing = df.dropna(how='all')
print(f"删除完全缺失行:{df_drop_all_missing.shape[0]} 行")
# 删除缺失比例超过50%的行(thresh 表示保留一行所需的最少非缺失值个数)
threshold = int(0.5 * len(df.columns))
df_drop_threshold = df.dropna(thresh=threshold)
print(f"删除缺失值超过一半的行后:{df_drop_threshold.shape[0]} 行")
4.3 填充缺失值
# 1. 用固定值填充
df_filled_fixed = df.copy()
df_filled_fixed['email'] = df_filled_fixed['email'].fillna('unknown@email.com')
df_filled_fixed['phone'] = df_filled_fixed['phone'].fillna('000-000-0000')
print("1. 固定值填充:")
print(df_filled_fixed[['email', 'phone']].head())
# 2. 用统计值填充(均值、中位数、众数)
df_filled_stats = df.copy()
# 对于数值列
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
    if df[col].isnull().sum() > 0:
        df_filled_stats[col] = df_filled_stats[col].fillna(df[col].median())
# 对于类别列
categorical_cols = df.select_dtypes(include=['object']).columns
for col in categorical_cols:
    if df[col].isnull().sum() > 0:
        df_filled_stats[col] = df_filled_stats[col].fillna(df[col].mode()[0])
print("\n2. 统计值填充:")
print(df_filled_stats.head())
# 3. 前向填充和后向填充
df_filled_pad = df.copy()
df_filled_pad['email'] = df_filled_pad['email'].ffill()  # 用前一个值填充
df_filled_pad['email'] = df_filled_pad['email'].bfill()  # 再用后一个值兜底
print("\n3. 前向/后向填充:")
print(df_filled_pad[['email']].head())
# 4. 基于分组的填充
df_filled_group = df.copy()
# 按部门分组,用该部门的均值填充年龄
df_filled_group['age'] = df_filled_group.groupby('department')['age'].transform(
lambda x: x.fillna(x.median())
)
print("\n4. 分组填充:")
print(df_filled_group[['department', 'age']].head())
# 5. 插值填充
df_filled_interpolate = df.copy()
df_filled_interpolate['age'] = df_filled_interpolate['age'].interpolate(method='linear')
print("\n5. 插值填充:")
print(df_filled_interpolate[['age']].head())
4.4 高级缺失值处理:使用机器学习预测
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
def impute_with_ml(df, target_column, problem_type='regression'):
"""
使用机器学习模型预测并填充缺失值
"""
# 创建训练数据(没有缺失值的行)
train_data = df[df[target_column].notnull()]
test_data = df[df[target_column].isnull()]
if len(test_data) == 0:
print(f"列 {target_column} 没有缺失值")
return df
# 准备特征(排除目标列和包含过多缺失值的列)
feature_columns = [col for col in df.columns
if col != target_column
and df[col].isnull().sum() == 0
and col != 'customer_id'] # 排除ID列
X_train = train_data[feature_columns]
y_train = train_data[target_column]
X_test = test_data[feature_columns]
# 转换类别变量为数值
X_train_encoded = pd.get_dummies(X_train)
X_test_encoded = pd.get_dummies(X_test)
    # 确保训练集和测试集的列完全一致且顺序相同,避免预测时报特征不匹配的错误
    X_test_encoded = X_test_encoded.reindex(columns=X_train_encoded.columns, fill_value=0)
# 训练模型
if problem_type == 'regression':
model = RandomForestRegressor(n_estimators=100, random_state=42)
else:
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train_encoded, y_train)
# 预测缺失值
predictions = model.predict(X_test_encoded)
# 填充缺失值
df_filled = df.copy()
df_filled.loc[df_filled[target_column].isnull(), target_column] = predictions
print(f"使用ML模型填充 {target_column} 的缺失值")
print(f"预测值:{predictions}")
return df_filled
# 示例:使用ML填充年龄(数值型)
# df_ml_filled = impute_with_ml(df, 'age', 'regression')
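如果想实际运行一下,可以参考下面的演示思路(仅为示意:人为将一行的 age 置为缺失再调用上面的函数,df_demo 是这里假设的变量名):
# 人为制造一个缺失的年龄值来演示 ML 填充(真实数据中无需这一步)
df_demo = df.copy()
df_demo.loc[3, 'age'] = np.nan
df_ml_filled = impute_with_ml(df_demo, 'age', problem_type='regression')
print(df_ml_filled.loc[3, ['name', 'department', 'age']])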
5. 处理异常值
5.1 异常值检测方法
def detect_outliers(df, column, method='all'):
"""
使用多种方法检测异常值
"""
if column not in df.select_dtypes(include=[np.number]).columns:
print(f"{column} 不是数值列")
return None
data = df[column].dropna()
outliers_info = {}
# 1. IQR方法(四分位距)
Q1 = data.quantile(0.25)
Q3 = data.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers_iqr = data[(data < lower_bound) | (data > upper_bound)]
outliers_info['IQR'] = {
'outliers': outliers_iqr,
'count': len(outliers_iqr),
'bounds': (lower_bound, upper_bound)
}
# 2. Z-score方法
z_scores = np.abs((data - data.mean()) / data.std())
outliers_z = data[z_scores > 3]
outliers_info['Z-score'] = {
'outliers': outliers_z,
'count': len(outliers_z)
}
# 3. 百分位数方法
lower_percentile = data.quantile(0.01)
upper_percentile = data.quantile(0.99)
outliers_percentile = data[(data < lower_percentile) | (data > upper_percentile)]
outliers_info['Percentile'] = {
'outliers': outliers_percentile,
'count': len(outliers_percentile),
'bounds': (lower_percentile, upper_percentile)
}
# 打印结果
print(f"\n异常值检测 - {column}:")
print(f"数据范围: {data.min():.2f} - {data.max():.2f}")
print(f"均值: {data.mean():.2f}, 标准差: {data.std():.2f}")
if method == 'all':
for method_name, info in outliers_info.items():
print(f"\n{method_name} 方法:")
print(f" 异常值数量: {info['count']}")
if 'bounds' in info:
print(f" 边界: [{info['bounds'][0]:.2f}, {info['bounds'][1]:.2f}]")
if info['count'] > 0:
print(f" 异常值: {info['outliers'].tolist()}")
return outliers_info
# 检测年龄列的异常值
outliers_age = detect_outliers(df, 'age')
5.2 处理异常值的方法
def handle_outliers(df, column, method='cap', threshold='auto'):
"""
处理异常值的多种方法
"""
df_clean = df.copy()
if column not in df.select_dtypes(include=[np.number]).columns:
return df_clean
data = df[column].copy()
if method == 'remove':
# 方法1:删除异常值
Q1 = data.quantile(0.25)
Q3 = data.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
df_clean = df_clean[(df_clean[column] >= lower_bound) &
(df_clean[column] <= upper_bound)]
print(f"删除异常值后,剩余 {df_clean.shape[0]} 行")
elif method == 'cap':
# 方法2:截断(winsorize)
if threshold == 'auto':
Q1 = data.quantile(0.25)
Q3 = data.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
else:
lower_bound = data.quantile(0.01)
upper_bound = data.quantile(0.99)
df_clean[column] = df_clean[column].clip(lower=lower_bound, upper=upper_bound)
print(f"异常值截断处理完成")
elif method == 'log':
# 方法3:对数变换(适用于右偏分布)
# 注意:只适用于正值
if (data > 0).all():
df_clean[column] = np.log1p(df_clean[column])
print(f"对数变换完成")
else:
print("对数变换只适用于正值")
elif method == 'transform':
# 方法4:使用统计变换
median = data.median()
mad = np.median(np.abs(data - median))
modified_z_scores = 0.6745 * (data - median) / mad
df_clean.loc[np.abs(modified_z_scores) > 3.5, column] = median
print(f"使用MAD方法处理异常值完成")
return df_clean
# 示例:处理年龄异常值
df_no_outliers = handle_outliers(df, 'age', method='cap')
print("\n处理异常值后的年龄统计:")
print(df_no_outliers['age'].describe())
6. 数据格式标准化
6.1 文本数据清洗
def clean_text_data(df):
"""
清理和标准化文本数据
"""
df_clean = df.copy()
# 1. 姓名标准化
df_clean['name'] = df_clean['name'].str.strip() # 去除首尾空格
df_clean['name'] = df_clean['name'].str.title() # 首字母大写
df_clean['name'] = df_clean['name'].str.replace(r'\s+', ' ', regex=True) # 多个空格变一个
# 2. 邮箱标准化
df_clean['email'] = df_clean['email'].str.lower() # 转小写
df_clean['email'] = df_clean['email'].str.strip() # 去除空格
# 3. 部门名称标准化
department_mapping = {
'sales': 'Sales',
'Sales': 'Sales',
'SALES': 'Sales',
'engineering': 'Engineering',
'Engineering': 'Engineering',
'ENGINEERING': 'Engineering',
'marketing': 'Marketing',
'Marketing': 'Marketing',
'MARKETING': 'Marketing'
}
    # 先用映射表统一常见写法,再用 title() 兜底处理映射表未覆盖的大小写变体
    df_clean['department'] = (
        df_clean['department'].str.strip().replace(department_mapping).str.title()
    )
# 4. 状态标准化
df_clean['status'] = df_clean['status'].str.strip().str.upper()
# 5. 移除特殊字符(保留字母、数字、空格)
df_clean['name'] = df_clean['name'].str.replace(r'[^a-zA-Z0-9\s]', '', regex=True)
return df_clean
df_text_cleaned = clean_text_data(df)
print("文本数据清洗结果:")
print(df_text_cleaned[['name', 'email', 'department', 'status']].head())
6.2 数值数据标准化
def clean_numeric_data(df):
"""
清理和标准化数值数据
"""
df_clean = df.copy()
# 1. 清理salary列(去除$和,,转换为数值)
if 'salary' in df_clean.columns:
df_clean['salary_clean'] = (
df_clean['salary']
.str.replace('$', '', regex=False)
.str.replace(',', '', regex=False)
.str.strip()
.astype(float)
)
# 2. 清理phone列(统一格式)
if 'phone' in df_clean.columns:
def standardize_phone(phone):
if pd.isna(phone) or phone == '':
return np.nan
# 移除所有非数字字符
digits = re.sub(r'\D', '', str(phone))
# 重新格式化为XXX-XXX-XXXX
if len(digits) == 10:
return f"{digits[:3]}-{digits[3:6]}-{digits[6:]}"
else:
return np.nan
df_clean['phone_clean'] = df_clean['phone'].apply(standardize_phone)
# 3. 年龄范围验证
if 'age' in df_clean.columns:
# 移除不合理的年龄值
df_clean.loc[(df_clean['age'] < 0) | (df_clean['age'] > 120), 'age'] = np.nan
return df_clean
df_numeric_cleaned = clean_numeric_data(df)
print("\n数值数据清洗结果:")
print(df_numeric_cleaned[['salary', 'salary_clean', 'phone', 'phone_clean']].head())
6.3 日期时间标准化
def clean_datetime_data(df):
"""
清理和标准化日期时间数据
"""
df_clean = df.copy()
if 'join_date' in df_clean.columns:
# 1. 转换为datetime对象
df_clean['join_date'] = pd.to_datetime(df_clean['join_date'], errors='coerce')
# 2. 提取有用的时间特征
df_clean['join_year'] = df_clean['join_date'].dt.year
df_clean['join_month'] = df_clean['join_date'].dt.month
df_clean['join_day'] = df_clean['join_date'].dt.day
df_clean['join_weekday'] = df_clean['join_date'].dt.weekday # 0=Monday, 6=Sunday
# 3. 计算相对于当前日期的天数
current_date = pd.Timestamp.now()
df_clean['days_since_join'] = (current_date - df_clean['join_date']).dt.days
# 4. 格式化为标准字符串
df_clean['join_date_str'] = df_clean['join_date'].dt.strftime('%Y-%m-%d')
return df_clean
df_datetime_cleaned = clean_datetime_data(df)
print("\n日期时间清洗结果:")
print(df_datetime_cleaned[['join_date', 'join_year', 'join_month', 'days_since_join']].head())
7. 高级数据清洗技术
7.1 使用正则表达式进行复杂清洗
def advanced_regex_cleaning(df):
"""
使用正则表达式进行高级数据清洗
"""
df_clean = df.copy()
# 1. 从复杂文本中提取信息
# 假设email列包含一些不规范的格式
email_pattern = r'([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})'
df_clean['email_extracted'] = df_clean['email'].str.extract(email_pattern)
# 2. 清理电话号码(多种格式)
phone_pattern = r'(\d{3})[-.\s]?(\d{3})[-.\s]?(\d{4})'
df_clean['phone_formatted'] = df_clean['phone'].str.extract(phone_pattern).fillna('').agg('-'.join, axis=1)
# 3. 检测并标记可疑数据
# 检测邮箱是否包含可疑字符
df_clean['email_suspicious'] = df_clean['email'].str.contains(r'[!#$%^&*(),?/{}|<>]', na=False)
# 4. 清理姓名中的特殊字符但保留空格
df_clean['name_clean'] = df_clean['name'].str.replace(r'[^\w\s]', '', regex=True)
return df_clean
df_regex_cleaned = advanced_regex_cleaning(df)
print("\n正则表达式高级清洗结果:")
print(df_regex_cleaned[['email', 'email_extracted', 'phone', 'phone_formatted']].head())
7.2 数据类型优化
def optimize_data_types(df):
"""
优化数据类型以减少内存使用
"""
df_optimized = df.copy()
# 1. 优化整数列
for col in df_optimized.select_dtypes(include=['int']).columns:
df_optimized[col] = pd.to_numeric(df_optimized[col], downcast='integer')
# 2. 优化浮点数列
for col in df_optimized.select_dtypes(include=['float']).columns:
        df_optimized[col] = pd.to_numeric(df_optimized[col], downcast='float')
# 3. 优化对象列(转换为category如果唯一值较少)
for col in df_optimized.select_dtypes(include=['object']).columns:
num_unique = df_optimized[col].nunique()
num_total = len(df_optimized)
if num_unique / num_total < 0.5: # 如果唯一值少于50%
df_optimized[col] = df_optimized[col].astype('category')
# 4. 优化布尔列
for col in df_optimized.columns:
if df_optimized[col].dtype == 'object':
unique_vals = df_optimized[col].dropna().unique()
            if set(unique_vals) and set(unique_vals).issubset({'True', 'False', 'true', 'false', '1', '0'}):
df_optimized[col] = df_optimized[col].map({
'True': True, 'False': False, 'true': True, 'false': False, '1': True, '0': False
})
print("内存使用优化:")
print(f"原始内存: {df.memory_usage(deep=True).sum() / 1024:.2f} KB")
print(f"优化后内存: {df_optimized.memory_usage(deep=True).sum() / 1024:.2f} KB")
return df_optimized
# 注意:由于我们的示例数据较小,优化效果不明显,但在大数据集上效果显著
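在示例数据上可以这样调用(内存数值取决于运行环境,仅供参考):
df_optimized = optimize_data_types(df)
print("\n优化后的数据类型:")
print(df_optimized.dtypes)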
7.3 数据验证和质量检查
def validate_data_quality(df):
"""
数据验证和质量检查
"""
validation_results = {}
# 1. 验证email格式
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
valid_emails = df['email'].dropna().str.match(email_pattern).sum()
validation_results['email_valid'] = {
'valid_count': valid_emails,
'invalid_count': len(df['email'].dropna()) - valid_emails,
'valid_rate': valid_emails / len(df['email'].dropna()) if len(df['email'].dropna()) > 0 else 0
}
# 2. 验证年龄范围
valid_ages = df['age'].between(0, 120, inclusive='both').sum()
validation_results['age_valid'] = {
'valid_count': valid_ages,
'invalid_count': len(df['age'].dropna()) - valid_ages,
'valid_rate': valid_ages / len(df['age'].dropna()) if len(df['age'].dropna()) > 0 else 0
}
# 3. 验证salary为正数
if 'salary_clean' in df.columns:
valid_salaries = (df['salary_clean'] > 0).sum()
validation_results['salary_valid'] = {
'valid_count': valid_salaries,
'invalid_count': len(df['salary_clean'].dropna()) - valid_salaries,
'valid_rate': valid_salaries / len(df['salary_clean'].dropna()) if len(df['salary_clean'].dropna()) > 0 else 0
}
# 4. 验证部门值
valid_departments = df['department'].isin(['Sales', 'Engineering', 'Marketing']).sum()
validation_results['department_valid'] = {
'valid_count': valid_departments,
'invalid_count': len(df['department'].dropna()) - valid_departments,
'valid_rate': valid_departments / len(df['department'].dropna()) if len(df['department'].dropna()) > 0 else 0
}
# 打印验证结果
print("\n数据质量验证结果:")
print("=" * 50)
for check, result in validation_results.items():
print(f"\n{check}:")
print(f" 有效数量: {result['valid_count']}")
print(f" 无效数量: {result['invalid_count']}")
print(f" 有效率: {result['valid_rate']:.2%}")
return validation_results
validation_results = validate_data_quality(df_numeric_cleaned)
8. 构建完整的数据清洗管道
8.1 创建可重用的清洗函数
class DataCleaner:
"""
数据清洗器类,封装所有清洗步骤
"""
def __init__(self, df):
self.original_df = df.copy()
self.cleaned_df = df.copy()
self.cleaning_log = []
self.validation_results = {}
def log_step(self, step_name, description):
"""记录清洗步骤"""
self.cleaning_log.append({
'step': step_name,
'description': description,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
def remove_duplicates(self, subset=None, keep='first'):
"""删除重复数据"""
if subset is None:
subset = self.cleaned_df.columns.tolist()
before = len(self.cleaned_df)
self.cleaned_df = self.cleaned_df.drop_duplicates(subset=subset, keep=keep)
after = len(self.cleaned_df)
self.log_step('remove_duplicates',
f"Removed {before - after} duplicate rows based on {subset}")
return self
def handle_missing_values(self, strategy_dict):
"""
处理缺失值
strategy_dict: {column: strategy}
strategy: 'drop', 'mean', 'median', 'mode', 'ffill', 'bfill', or specific value
"""
for column, strategy in strategy_dict.items():
if column not in self.cleaned_df.columns:
continue
missing_count = self.cleaned_df[column].isnull().sum()
if missing_count == 0:
continue
if strategy == 'drop':
self.cleaned_df = self.cleaned_df.dropna(subset=[column])
            elif strategy == 'mean' and self.cleaned_df[column].dtype in ['int64', 'float64']:
                self.cleaned_df[column] = self.cleaned_df[column].fillna(self.cleaned_df[column].mean())
            elif strategy == 'median' and self.cleaned_df[column].dtype in ['int64', 'float64']:
                self.cleaned_df[column] = self.cleaned_df[column].fillna(self.cleaned_df[column].median())
            elif strategy == 'mode':
                self.cleaned_df[column] = self.cleaned_df[column].fillna(self.cleaned_df[column].mode()[0])
            elif strategy == 'ffill':
                self.cleaned_df[column] = self.cleaned_df[column].ffill()
            elif strategy == 'bfill':
                self.cleaned_df[column] = self.cleaned_df[column].bfill()
            else:
                self.cleaned_df[column] = self.cleaned_df[column].fillna(strategy)
self.log_step('handle_missing',
f"Filled {missing_count} missing values in {column} using {strategy}")
return self
def clean_text_columns(self, columns):
"""清理文本列"""
for col in columns:
if col in self.cleaned_df.columns:
self.cleaned_df[col] = self.cleaned_df[col].astype(str).str.strip()
self.cleaned_df[col] = self.cleaned_df[col].str.replace(r'\s+', ' ', regex=True)
self.log_step('clean_text', f"Cleaned text in {col}")
return self
def standardize_format(self, column, format_type):
"""标准化特定格式"""
if column not in self.cleaned_df.columns:
return self
if format_type == 'email':
self.cleaned_df[column] = self.cleaned_df[column].str.lower().str.strip()
elif format_type == 'phone':
def format_phone(p):
if pd.isna(p) or p == '':
return np.nan
                digits = re.sub(r'\D', '', str(p))
if len(digits) == 10:
return f"{digits[:3]}-{digits[3:6]}-{digits[6:]}"
return np.nan
self.cleaned_df[column] = self.cleaned_df[column].apply(format_phone)
elif format_type == 'currency':
self.cleaned_df[column] = (self.cleaned_df[column]
.str.replace('$', '', regex=False)
.str.replace(',', '', regex=False)
.astype(float))
self.log_step('standardize', f"Standardized {column} to {format_type} format")
return self
def remove_outliers(self, column, method='cap'):
"""移除或处理异常值"""
if column not in self.cleaned_df.columns:
return self
if self.cleaned_df[column].dtype not in ['int64', 'float64']:
return self
before = len(self.cleaned_df)
if method == 'cap':
Q1 = self.cleaned_df[column].quantile(0.25)
Q3 = self.cleaned_df[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
self.cleaned_df[column] = self.cleaned_df[column].clip(lower=lower_bound, upper=upper_bound)
elif method == 'remove':
Q1 = self.cleaned_df[column].quantile(0.25)
Q3 = self.cleaned_df[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
self.cleaned_df = self.cleaned_df[(self.cleaned_df[column] >= lower_bound) &
(self.cleaned_df[column] <= upper_bound)]
after = len(self.cleaned_df)
self.log_step('remove_outliers',
f"Processed outliers in {column} using {method} method ({before - after} rows removed)")
return self
def validate_data(self, validation_rules):
"""验证数据质量"""
for column, rule in validation_rules.items():
if column not in self.cleaned_df.columns:
continue
if rule == 'positive':
valid = (self.cleaned_df[column] > 0).sum()
elif rule == 'email':
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
valid = self.cleaned_df[column].dropna().str.match(pattern).sum()
elif rule == 'age_range':
valid = self.cleaned_df[column].between(0, 120).sum()
else:
continue
total = len(self.cleaned_df[column].dropna())
self.validation_results[column] = {
'rule': rule,
'valid': valid,
'total': total,
'pass_rate': valid / total if total > 0 else 0
}
return self
def get_cleaned_data(self):
"""获取清洗后的数据"""
return self.cleaned_df
def get_cleaning_report(self):
"""生成清洗报告"""
report = {
'original_shape': self.original_df.shape,
'cleaned_shape': self.cleaned_df.shape,
'rows_removed': self.original_df.shape[0] - self.cleaned_df.shape[0],
'columns_removed': self.original_df.shape[1] - self.cleaned_df.shape[1],
'cleaning_steps': len(self.cleaning_log),
'validation_results': self.validation_results,
'cleaning_log': self.cleaning_log
}
return report
def print_report(self):
"""打印清洗报告"""
report = self.get_cleaning_report()
print("\n" + "="*60)
print("数据清洗报告")
print("="*60)
print(f"原始数据形状: {report['original_shape']}")
print(f"清洗后形状: {report['cleaned_shape']}")
print(f"删除的行数: {report['rows_removed']}")
print(f"删除的列数: {report['columns_removed']}")
print(f"清洗步骤数: {report['cleaning_steps']}")
print("\n清洗日志:")
for i, step in enumerate(report['cleaning_log'], 1):
print(f" {i}. [{step['timestamp']}] {step['step']}: {step['description']}")
if report['validation_results']:
print("\n验证结果:")
for col, result in report['validation_results'].items():
print(f" {col} ({result['rule']}): {result['valid']}/{result['total']} ({result['pass_rate']:.2%})")
8.2 使用清洗管道
# 创建清洗器实例
cleaner = DataCleaner(df)
# 执行完整的清洗流程
cleaner.remove_duplicates(subset=['customer_id']) # 去重
cleaner.handle_missing_values({
'email': 'unknown@email.com', # 固定值
'age': 'median', # 中位数
'phone': '000-000-0000' # 固定值
}) # 处理缺失值
cleaner.clean_text_columns(['name', 'department', 'status']) # 清理文本
cleaner.standardize_format('email', 'email') # 标准化邮箱
cleaner.standardize_format('phone', 'phone') # 标准化电话
cleaner.standardize_format('salary', 'currency') # 标准化薪资
cleaner.remove_outliers('age', 'cap') # 处理异常值
# 验证数据
validation_rules = {
    'age': 'age_range',
    'salary': 'positive',  # standardize_format 已将 salary 原地转换为数值
    'email': 'email'
}
cleaner.validate_data(validation_rules)
# 获取结果
cleaned_df = cleaner.get_cleaned_data()
cleaner.print_report()
print("\n清洗后的数据预览:")
print(cleaned_df.head())
9. 性能优化和最佳实践
9.1 处理大数据集的技巧
def process_large_dataset(file_path, chunk_size=10000):
"""
处理大型数据集的分块处理方法
"""
cleaned_chunks = []
# 使用迭代器分块读取
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
# 对每个块应用清洗逻辑
chunk_cleaned = (
chunk
.drop_duplicates()
.dropna(subset=['essential_column'])
.fillna({'column1': 'default', 'column2': 0})
)
cleaned_chunks.append(chunk_cleaned)
# 合并所有清洗后的块
final_df = pd.concat(cleaned_chunks, ignore_index=True)
return final_df
# 内存优化技巧
def optimize_memory_usage(df):
"""
优化内存使用的多种方法
"""
# 1. 只读取需要的列
# df = pd.read_csv('file.csv', usecols=['col1', 'col2'])
# 2. 指定数据类型
# dtype = {'col1': 'int32', 'col2': 'category'}
# df = pd.read_csv('file.csv', dtype=dtype)
# 3. 处理日期时指定格式
# parse_dates = ['date_col']
# df = pd.read_csv('file.csv', parse_dates=parse_dates)
# 4. 使用category类型
for col in df.select_dtypes(include=['object']).columns:
if df[col].nunique() / len(df) < 0.5:
df[col] = df[col].astype('category')
# 5. 降级数值类型
for col in df.select_dtypes(include=['int']).columns:
df[col] = pd.to_numeric(df[col], downcast='integer')
for col in df.select_dtypes(include=['float']).columns:
df[col] = pd.to_numeric(df[col], downcast='float')
return df
9.2 并行处理
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
def parallel_cleaning(df, cleaning_func, n_workers=None):
"""
使用多进程并行处理大数据清洗
"""
if n_workers is None:
n_workers = multiprocessing.cpu_count()
# 将数据分割为多个部分
df_split = np.array_split(df, n_workers)
with ProcessPoolExecutor(max_workers=n_workers) as executor:
results = list(executor.map(cleaning_func, df_split))
# 合并结果
return pd.concat(results, ignore_index=True)
# 示例清洗函数
def clean_chunk(chunk):
"""清洗单个数据块"""
return (
chunk
.drop_duplicates()
        .ffill()
.reset_index(drop=True)
)
# 使用并行处理
# cleaned_df = parallel_cleaning(large_df, clean_chunk)
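需要注意的是,Windows 和 macOS 默认以 spawn 方式启动子进程,调用多进程代码时应放在 if __name__ == '__main__' 保护块中,且清洗函数要定义在模块顶层。下面是一个假设性的调用示意('large_data.csv' 仅为占位文件名):
if __name__ == '__main__':
    large_df = pd.read_csv('large_data.csv')  # 假设的大文件
    cleaned_df = parallel_cleaning(large_df, clean_chunk)
    print(f"并行清洗完成,共 {len(cleaned_df)} 行")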
10. 总结和最佳实践
10.1 数据清洗检查清单
def data_cleaning_checklist():
"""
数据清洗检查清单
"""
checklist = {
"数据探索": [
"✓ 检查数据形状和类型",
"✓ 识别缺失值分布",
"✓ 检查重复数据",
"✓ 分析数值分布",
"✓ 可视化数据问题"
],
"数据清洗": [
"✓ 删除完全重复的行",
"✓ 处理缺失值(删除/填充)",
"✓ 识别和处理异常值",
"✓ 标准化文本格式",
"✓ 清理特殊字符",
"✓ 验证数据范围"
],
"数据转换": [
"✓ 转换日期格式",
"✓ 标准化数值格式",
"✓ 优化数据类型",
"✓ 创建衍生特征"
],
"验证和文档": [
"✓ 验证数据质量",
"✓ 记录清洗步骤",
"✓ 保存清洗后的数据",
"✓ 创建清洗报告"
]
}
print("\n数据清洗检查清单:")
print("=" * 50)
for category, items in checklist.items():
print(f"\n{category}:")
for item in items:
print(f" {item}")
data_cleaning_checklist()
10.2 常见陷阱和解决方案
"""
数据清洗常见陷阱及解决方案:
1. 过度清洗
- 陷阱:删除过多数据,导致样本偏差
- 解决方案:记录删除原因,确保删除比例合理
2. 忽略数据上下文
- 陷阱:不了解业务含义,错误处理数据
- 解决方案:与领域专家沟通,理解数据含义
3. 硬编码阈值
- 陷阱:使用固定的异常值阈值
- 解决方案:根据数据分布动态调整
4. 不保存中间步骤
- 陷阱:无法回溯和调试
- 解决方案:记录所有清洗步骤和参数
5. 忽略数据类型
- 陷阱:内存浪费和性能问题
- 解决方案:优化数据类型,使用category等
6. 不验证结果
- 陷阱:清洗引入新错误
- 解决方案:清洗后进行验证和测试
"""
结论
数据清洗是数据分析和机器学习项目成功的关键步骤。通过本文介绍的技术和方法,您可以:
- 系统化地处理数据质量问题:从重复数据、缺失值到异常值
- 提高数据质量:通过标准化和验证确保数据可靠性
- 优化性能:使用高效的方法处理大数据集
- 建立可重用的流程:创建标准化的清洗管道
记住,好的数据清洗不仅仅是技术问题,更需要理解数据的业务含义。始终记录您的清洗步骤,保持数据的可追溯性,并在清洗后进行充分的验证。
通过实践这些方法,您将能够处理各种复杂的数据清洗任务,为后续的数据分析和建模打下坚实的基础。
