引言:数据清洗的重要性

在数据科学和分析领域,数据清洗是整个工作流程中最耗时但也最关键的步骤。根据业界统计,数据科学家通常花费60-80%的时间在数据准备和清洗上。原始数据往往包含各种问题:缺失值、异常值、重复记录、格式不一致、编码问题等。这些问题如果得不到妥善处理,会严重影响后续分析的准确性和可靠性。

Python作为数据科学的首选语言,提供了强大的工具生态系统来处理这些问题。本文将详细介绍如何使用Python进行高效的数据清洗,涵盖从基础到高级的各种技术和最佳实践。

1. 数据清洗的基础工具和环境设置

1.1 必需的Python库

在开始数据清洗之前,我们需要安装并导入以下核心库:
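
如果尚未安装这些库,可以先通过 pip 安装(以下命令仅供参考,scikit-learn 供后文的机器学习填充示例使用):

pip install pandas numpy matplotlib seaborn scikit-learn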

# 数据处理基础库
import pandas as pd
import numpy as np

# 可视化库
import matplotlib.pyplot as plt
import seaborn as sns

# 正则表达式处理
import re

# 日期时间处理
from datetime import datetime

# 设置显示选项
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
pd.set_option('display.width', 1000)

print("所有必要的库已成功导入!")

1.2 创建示例数据集

为了演示数据清洗技术,我们首先创建一个包含常见数据问题的示例数据集:

# 创建一个包含各种数据问题的示例数据集
data = {
    'customer_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 11],
    'name': ['John Doe', 'Jane Smith', 'Bob Johnson', 'Alice Brown', 'Charlie Wilson', 
             'David Lee', 'Eva Martinez', 'Frank White', 'Grace Lee', 'Henry Davis',
             'John Doe', 'Tom Wilson'],
    'age': [25, 30, 35, 28, 42, 38, 45, 29, 33, 41, 25, 36],
    'email': ['john.doe@email.com', 'jane.smith@email.com', 'bob.johnson@email.com', 
              'alice.brown@email.com', 'charlie.wilson@email.com', 'david.lee@email.com',
              'eva.martinez@email.com', 'frank.white@email.com', 'grace.lee@email.com',
              'henry.davis@email.com', 'john.doe@email.com', None],
    'salary': ['$50,000', '$60,000', '$75,000', '$55,000', '$90,000', '$80,000', 
               '$95,000', '$58,000', '$68,000', '$85,000', '$50,000', '$72,000'],
    'join_date': ['2020-01-15', '2019-03-20', '2021-06-10', '2020-08-05', '2018-11-30',
                  '2019-05-25', '2022-02-14', '2020-09-18', '2021-12-01', '2019-07-22',
                  '2020-01-15', '2023-01-10'],
    'department': ['Sales', 'Engineering', 'Marketing', 'Sales', 'Engineering', 
                   'Marketing', 'Engineering', 'Sales', 'Marketing', 'Engineering',
                   'Sales', 'Engineering'],
    'status': ['Active', 'Active', 'Inactive', 'Active', 'Active', 'Inactive', 
               'Active', 'Active', 'Active', 'Inactive', 'Active', 'Active'],
    'phone': ['123-456-7890', '234-567-8901', '345-678-9012', '456-789-0123', '567-890-1234',
              '678-901-2345', '789-012-3456', '890-123-4567', '901-234-5678', '012-345-6789',
              '123-456-7890', '']
}

df = pd.DataFrame(data)
print("原始数据集预览:")
print(df)
print(f"\n数据集形状:{df.shape}")

2. 数据探索和质量评估

2.1 基本数据信息检查

在清洗之前,我们需要全面了解数据的状况:

def explore_data_quality(df):
    """
    全面评估数据质量的函数
    """
    print("=" * 60)
    print("数据质量评估报告")
    print("=" * 60)
    
    # 1. 基本信息
    print("\n1. 数据集基本信息:")
    print(f"   - 行数: {df.shape[0]}")
    print(f"   - 列数: {df.shape[1]}")
    print(f"   - 内存使用: {df.memory_usage(deep=True).sum() / 1024:.2f} KB")
    
    # 2. 数据类型
    print("\n2. 数据类型:")
    print(df.dtypes)
    
    # 3. 缺失值统计
    print("\n3. 缺失值统计:")
    missing_values = df.isnull().sum()
    missing_percentage = (missing_values / len(df)) * 100
    missing_info = pd.DataFrame({
        '缺失数量': missing_values,
        '缺失比例(%)': missing_percentage.round(2)
    })
    print(missing_info[missing_info['缺失数量'] > 0])
    
    # 4. 重复值检查
    print("\n4. 重复值检查:")
    duplicate_rows = df.duplicated().sum()
    print(f"   - 完全重复的行数: {duplicate_rows}")
    
    # 5. 数值列的统计描述
    print("\n5. 数值列统计描述:")
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 0:
        print(df[numeric_cols].describe())
    else:
        print("   - 无数值列")
    
    # 6. 唯一值统计
    print("\n6. 各列唯一值数量:")
    for col in df.columns:
        unique_count = df[col].nunique()
        print(f"   - {col}: {unique_count} 个唯一值")

# 执行数据探索
explore_data_quality(df)

2.2 数据可视化探索

可视化是发现数据问题的有效方法:

def visualize_data_issues(df):
    """
    可视化展示数据质量问题
    """
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle('数据质量问题可视化', fontsize=16)
    
    # 1. 缺失值分布
    missing_data = df.isnull().sum()
    missing_data[missing_data > 0].plot(kind='bar', ax=axes[0,0], color='coral')
    axes[0,0].set_title('各列缺失值数量')
    axes[0,0].set_ylabel('缺失值数量')
    
    # 2. 数值分布(以第一个数值列为例;DataFrame.hist 传入单个子图时会清空整张画布)
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 0:
        df[numeric_cols[0]].plot(kind='hist', bins=15, ax=axes[0,1], color='skyblue', alpha=0.7)
        axes[0,1].set_title(f'{numeric_cols[0]} 分布')
    
    # 3. 类别分布
    categorical_cols = df.select_dtypes(include=['object']).columns
    if len(categorical_cols) > 0:
        # 选择一个类别列进行展示
        sample_col = categorical_cols[0]
        value_counts = df[sample_col].value_counts().head(10)
        value_counts.plot(kind='bar', ax=axes[1,0], color='lightgreen')
        axes[1,0].set_title(f'{sample_col} 分布')
        axes[1,0].tick_params(axis='x', rotation=45)
    
    # 4. 数据完整性热图
    sns.heatmap(df.isnull(), cbar=False, yticklabels=False, cmap='viridis', ax=axes[1,1])
    axes[1,1].set_title('缺失值分布热图')
    
    plt.tight_layout()
    plt.show()

# 执行可视化
visualize_data_issues(df)

3. 处理重复数据

3.1 识别和处理完全重复的行

# 识别完全重复的行
print("重复行检查:")
print(df[df.duplicated()])
print(f"\n重复行数量:{df.duplicated().sum()}")

# 删除完全重复的行
df_deduplicated = df.drop_duplicates()
print(f"\n删除重复行前:{df.shape[0]} 行")
print(f"删除重复行后:{df_deduplicated.shape[0]} 行")

# 保留第一个出现的重复行,删除后续重复行
df_deduplicated_keep_first = df.drop_duplicates(keep='first')
print(f"保留第一个重复行:{df_deduplicated_keep_first.shape[0]} 行")

3.2 处理部分重复的数据

# 基于特定列判断重复(例如customer_id重复但其他信息可能不同)
print("\n基于customer_id检查重复:")
duplicate_by_id = df[df.duplicated(subset=['customer_id'], keep=False)]
print(duplicate_by_id)

# 删除基于customer_id的重复,保留第一个
df_unique_by_id = df.drop_duplicates(subset=['customer_id'], keep='first')
print(f"\n基于customer_id去重后:{df_unique_by_id.shape[0]} 行")

# 更复杂的去重逻辑:保留最新日期的记录
df_sorted = df.sort_values('join_date', ascending=False)
df_unique_latest = df_sorted.drop_duplicates(subset=['customer_id'], keep='first')
print(f"保留最新日期记录:{df_unique_latest.shape[0]} 行")

4. 处理缺失值

4.1 缺失值识别和分析

def analyze_missing_data(df):
    """
    详细分析缺失数据
    """
    print("缺失值详细分析:")
    print("=" * 50)
    
    # 按列统计
    missing_by_column = df.isnull().sum()
    missing_percentage = (missing_by_column / len(df)) * 100
    
    missing_info = pd.DataFrame({
        '缺失数量': missing_by_column,
        '缺失比例(%)': missing_percentage.round(2)
    })
    
    print("按列统计:")
    print(missing_info[missing_info['缺失数量'] > 0])
    
    # 按行统计
    missing_by_row = df.isnull().sum(axis=1)
    rows_with_missing = missing_by_row[missing_by_row > 0]
    
    if len(rows_with_missing) > 0:
        print(f"\n有缺失值的行数:{len(rows_with_missing)}")
        print("缺失值最多的前5行:")
        print(df[missing_by_row > 0].head())

analyze_missing_data(df)

4.2 删除缺失值

# 删除包含缺失值的行
df_drop_rows = df.dropna()
print(f"删除缺失行前:{df.shape[0]} 行")
print(f"删除缺失行后:{df_drop_rows.shape[0]} 行")

# 删除包含缺失值的列
df_drop_cols = df.dropna(axis=1)
print(f"删除缺失列前:{df.shape[1]} 列")
print(f"删除缺失列后:{df_drop_cols.shape[1]} 列")

# 只删除完全缺失的行
df_drop_all_missing = df.dropna(how='all')
print(f"删除完全缺失行:{df_drop_all_missing.shape[0]} 行")

# 删除缺失比例超过50%的行(thresh 表示一行至少需要保留的非缺失值个数)
threshold = int(0.5 * len(df.columns))
df_drop_threshold = df.dropna(thresh=threshold)
print(f"保留至少{threshold}个非缺失值的行后:{df_drop_threshold.shape[0]} 行")

4.3 填充缺失值

# 1. 用固定值填充
df_filled_fixed = df.copy()
df_filled_fixed['email'] = df_filled_fixed['email'].fillna('unknown@email.com')
df_filled_fixed['phone'] = df_filled_fixed['phone'].fillna('000-000-0000')
print("1. 固定值填充:")
print(df_filled_fixed[['email', 'phone']].head())

# 2. 用统计值填充(均值、中位数、众数)
df_filled_stats = df.copy()
# 对于数值列
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
    if df[col].isnull().sum() > 0:
        df_filled_stats[col] = df_filled_stats[col].fillna(df[col].median())

# 对于类别列
categorical_cols = df.select_dtypes(include=['object']).columns
for col in categorical_cols:
    if df[col].isnull().sum() > 0:
        df_filled_stats[col] = df_filled_stats[col].fillna(df[col].mode()[0])

print("\n2. 统计值填充:")
print(df_filled_stats.head())

# 3. 前向填充和后向填充
df_filled_pad = df.copy()
df_filled_pad['email'] = df_filled_pad['email'].ffill()  # 用前一个值填充
df_filled_pad['email'] = df_filled_pad['email'].bfill()  # 再用后一个值填充剩余缺失
print("\n3. 前向/后向填充:")
print(df_filled_pad[['email']].head())

# 4. 基于分组的填充
df_filled_group = df.copy()
# 按部门分组,用该部门的中位数填充年龄
df_filled_group['age'] = df_filled_group.groupby('department')['age'].transform(
    lambda x: x.fillna(x.median())
)
print("\n4. 分组填充:")
print(df_filled_group[['department', 'age']].head())

# 5. 插值填充
df_filled_interpolate = df.copy()
df_filled_interpolate['age'] = df_filled_interpolate['age'].interpolate(method='linear')
print("\n5. 插值填充:")
print(df_filled_interpolate[['age']].head())

4.4 高级缺失值处理:使用机器学习预测

from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.model_selection import train_test_split

def impute_with_ml(df, target_column, problem_type='regression'):
    """
    使用机器学习模型预测并填充缺失值
    """
    # 创建训练数据(没有缺失值的行)
    train_data = df[df[target_column].notnull()]
    test_data = df[df[target_column].isnull()]
    
    if len(test_data) == 0:
        print(f"列 {target_column} 没有缺失值")
        return df
    
    # 准备特征(排除目标列和包含过多缺失值的列)
    feature_columns = [col for col in df.columns 
                      if col != target_column 
                      and df[col].isnull().sum() == 0
                      and col != 'customer_id']  # 排除ID列
    
    X_train = train_data[feature_columns]
    y_train = train_data[target_column]
    X_test = test_data[feature_columns]
    
    # 转换类别变量为数值
    X_train_encoded = pd.get_dummies(X_train)
    X_test_encoded = pd.get_dummies(X_test)
    
    # 确保训练集和测试集拥有相同且顺序一致的列(否则模型预测时特征会对不上)
    X_train_encoded, X_test_encoded = X_train_encoded.align(
        X_test_encoded, join='outer', axis=1, fill_value=0
    )
    
    # 训练模型
    if problem_type == 'regression':
        model = RandomForestRegressor(n_estimators=100, random_state=42)
    else:
        model = RandomForestClassifier(n_estimators=100, random_state=42)
    
    model.fit(X_train_encoded, y_train)
    
    # 预测缺失值
    predictions = model.predict(X_test_encoded)
    
    # 填充缺失值
    df_filled = df.copy()
    df_filled.loc[df_filled[target_column].isnull(), target_column] = predictions
    
    print(f"使用ML模型填充 {target_column} 的缺失值")
    print(f"预测值:{predictions}")
    
    return df_filled

# 示例:使用ML填充年龄(数值型)
# df_ml_filled = impute_with_ml(df, 'age', 'regression')
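
由于示例数据的age列本身没有缺失值,下面给出一个假设性的演示:先人为制造一个缺失值,再调用上述函数进行预测填充(df_demo等变量名仅为演示用):

df_demo = df.copy()
df_demo['age'] = df_demo['age'].astype(float)  # 便于放入缺失值
df_demo.loc[3, 'age'] = np.nan                 # 人为将第4行的年龄设为缺失
df_ml_filled = impute_with_ml(df_demo, 'age', 'regression')
print("填充后的年龄:", df_ml_filled.loc[3, 'age'])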

5. 处理异常值

5.1 异常值检测方法

def detect_outliers(df, column, method='all'):
    """
    使用多种方法检测异常值
    """
    if column not in df.select_dtypes(include=[np.number]).columns:
        print(f"{column} 不是数值列")
        return None
    
    data = df[column].dropna()
    outliers_info = {}
    
    # 1. IQR方法(四分位距)
    Q1 = data.quantile(0.25)
    Q3 = data.quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    
    outliers_iqr = data[(data < lower_bound) | (data > upper_bound)]
    outliers_info['IQR'] = {
        'outliers': outliers_iqr,
        'count': len(outliers_iqr),
        'bounds': (lower_bound, upper_bound)
    }
    
    # 2. Z-score方法
    z_scores = np.abs((data - data.mean()) / data.std())
    outliers_z = data[z_scores > 3]
    outliers_info['Z-score'] = {
        'outliers': outliers_z,
        'count': len(outliers_z)
    }
    
    # 3. 百分位数方法
    lower_percentile = data.quantile(0.01)
    upper_percentile = data.quantile(0.99)
    outliers_percentile = data[(data < lower_percentile) | (data > upper_percentile)]
    outliers_info['Percentile'] = {
        'outliers': outliers_percentile,
        'count': len(outliers_percentile),
        'bounds': (lower_percentile, upper_percentile)
    }
    
    # 打印结果
    print(f"\n异常值检测 - {column}:")
    print(f"数据范围: {data.min():.2f} - {data.max():.2f}")
    print(f"均值: {data.mean():.2f}, 标准差: {data.std():.2f}")
    
    if method == 'all':
        for method_name, info in outliers_info.items():
            print(f"\n{method_name} 方法:")
            print(f"  异常值数量: {info['count']}")
            if 'bounds' in info:
                print(f"  边界: [{info['bounds'][0]:.2f}, {info['bounds'][1]:.2f}]")
            if info['count'] > 0:
                print(f"  异常值: {info['outliers'].tolist()}")
    
    return outliers_info

# 检测年龄列的异常值
outliers_age = detect_outliers(df, 'age')

5.2 处理异常值的方法

def handle_outliers(df, column, method='cap', threshold='auto'):
    """
    处理异常值的多种方法
    """
    df_clean = df.copy()
    
    if column not in df.select_dtypes(include=[np.number]).columns:
        return df_clean
    
    data = df[column].copy()
    
    if method == 'remove':
        # 方法1:删除异常值
        Q1 = data.quantile(0.25)
        Q3 = data.quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        
        df_clean = df_clean[(df_clean[column] >= lower_bound) & 
                           (df_clean[column] <= upper_bound)]
        print(f"删除异常值后,剩余 {df_clean.shape[0]} 行")
        
    elif method == 'cap':
        # 方法2:截断(winsorize)
        if threshold == 'auto':
            Q1 = data.quantile(0.25)
            Q3 = data.quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
        else:
            lower_bound = data.quantile(0.01)
            upper_bound = data.quantile(0.99)
        
        df_clean[column] = df_clean[column].clip(lower=lower_bound, upper=upper_bound)
        print(f"异常值截断处理完成")
        
    elif method == 'log':
        # 方法3:对数变换(适用于右偏分布)
        # 注意:只适用于正值
        if (data > 0).all():
            df_clean[column] = np.log1p(df_clean[column])
            print(f"对数变换完成")
        else:
            print("对数变换只适用于正值")
            
    elif method == 'transform':
        # 方法4:使用统计变换
        median = data.median()
        mad = np.median(np.abs(data - median))
        modified_z_scores = 0.6745 * (data - median) / mad
        df_clean.loc[np.abs(modified_z_scores) > 3.5, column] = median
        print(f"使用MAD方法处理异常值完成")
    
    return df_clean

# 示例:处理年龄异常值
df_no_outliers = handle_outliers(df, 'age', method='cap')
print("\n处理异常值后的年龄统计:")
print(df_no_outliers['age'].describe())

6. 数据格式标准化

6.1 文本数据清洗

def clean_text_data(df):
    """
    清理和标准化文本数据
    """
    df_clean = df.copy()
    
    # 1. 姓名标准化
    df_clean['name'] = df_clean['name'].str.strip()  # 去除首尾空格
    df_clean['name'] = df_clean['name'].str.title()  # 首字母大写
    df_clean['name'] = df_clean['name'].str.replace(r'\s+', ' ', regex=True)  # 多个空格变一个
    
    # 2. 邮箱标准化
    df_clean['email'] = df_clean['email'].str.lower()  # 转小写
    df_clean['email'] = df_clean['email'].str.strip()  # 去除空格
    
    # 3. 部门名称标准化:用映射表把各种大小写写法统一为规范名称
    department_mapping = {
        'sales': 'Sales', 'SALES': 'Sales',
        'engineering': 'Engineering', 'ENGINEERING': 'Engineering',
        'marketing': 'Marketing', 'MARKETING': 'Marketing'
    }
    
    df_clean['department'] = df_clean['department'].str.strip().replace(department_mapping)
    
    # 4. 状态标准化
    df_clean['status'] = df_clean['status'].str.strip().str.upper()
    
    # 5. 移除特殊字符(保留字母、数字、空格)
    df_clean['name'] = df_clean['name'].str.replace(r'[^a-zA-Z0-9\s]', '', regex=True)
    
    return df_clean

df_text_cleaned = clean_text_data(df)
print("文本数据清洗结果:")
print(df_text_cleaned[['name', 'email', 'department', 'status']].head())

6.2 数值数据标准化

def clean_numeric_data(df):
    """
    清理和标准化数值数据
    """
    df_clean = df.copy()
    
    # 1. 清理salary列(去除$和,,转换为数值)
    if 'salary' in df_clean.columns:
        df_clean['salary_clean'] = (
            df_clean['salary']
            .str.replace('$', '', regex=False)
            .str.replace(',', '', regex=False)
            .str.strip()
            .astype(float)
        )
    
    # 2. 清理phone列(统一格式)
    if 'phone' in df_clean.columns:
        def standardize_phone(phone):
            if pd.isna(phone) or phone == '':
                return np.nan
            # 移除所有非数字字符
            digits = re.sub(r'\D', '', str(phone))
            # 重新格式化为XXX-XXX-XXXX
            if len(digits) == 10:
                return f"{digits[:3]}-{digits[3:6]}-{digits[6:]}"
            else:
                return np.nan
        
        df_clean['phone_clean'] = df_clean['phone'].apply(standardize_phone)
    
    # 3. 年龄范围验证
    if 'age' in df_clean.columns:
        # 移除不合理的年龄值
        df_clean.loc[(df_clean['age'] < 0) | (df_clean['age'] > 120), 'age'] = np.nan
    
    return df_clean

df_numeric_cleaned = clean_numeric_data(df)
print("\n数值数据清洗结果:")
print(df_numeric_cleaned[['salary', 'salary_clean', 'phone', 'phone_clean']].head())

6.3 日期时间标准化

def clean_datetime_data(df):
    """
    清理和标准化日期时间数据
    """
    df_clean = df.copy()
    
    if 'join_date' in df_clean.columns:
        # 1. 转换为datetime对象
        df_clean['join_date'] = pd.to_datetime(df_clean['join_date'], errors='coerce')
        
        # 2. 提取有用的时间特征
        df_clean['join_year'] = df_clean['join_date'].dt.year
        df_clean['join_month'] = df_clean['join_date'].dt.month
        df_clean['join_day'] = df_clean['join_date'].dt.day
        df_clean['join_weekday'] = df_clean['join_date'].dt.weekday  # 0=Monday, 6=Sunday
        
        # 3. 计算相对于当前日期的天数
        current_date = pd.Timestamp.now()
        df_clean['days_since_join'] = (current_date - df_clean['join_date']).dt.days
        
        # 4. 格式化为标准字符串
        df_clean['join_date_str'] = df_clean['join_date'].dt.strftime('%Y-%m-%d')
    
    return df_clean

df_datetime_cleaned = clean_datetime_data(df)
print("\n日期时间清洗结果:")
print(df_datetime_cleaned[['join_date', 'join_year', 'join_month', 'days_since_join']].head())

7. 高级数据清洗技术

7.1 使用正则表达式进行复杂清洗

def advanced_regex_cleaning(df):
    """
    使用正则表达式进行高级数据清洗
    """
    df_clean = df.copy()
    
    # 1. 从复杂文本中提取信息
    # 假设email列包含一些不规范的格式
    email_pattern = r'([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})'
    df_clean['email_extracted'] = df_clean['email'].str.extract(email_pattern, expand=False)
    
    # 2. 清理电话号码(兼容多种分隔格式),无法匹配的行保留为缺失值
    phone_pattern = r'(\d{3})[-.\s]?(\d{3})[-.\s]?(\d{4})'
    phone_parts = df_clean['phone'].str.extract(phone_pattern)
    df_clean['phone_formatted'] = phone_parts.apply(
        lambda row: '-'.join(row) if row.notna().all() else np.nan, axis=1
    )
    
    # 3. 检测并标记可疑数据
    # 检测邮箱是否包含可疑字符
    df_clean['email_suspicious'] = df_clean['email'].str.contains(r'[!#$%^&*(),?/{}|<>]', na=False)
    
    # 4. 清理姓名中的特殊字符但保留空格
    df_clean['name_clean'] = df_clean['name'].str.replace(r'[^\w\s]', '', regex=True)
    
    return df_clean

df_regex_cleaned = advanced_regex_cleaning(df)
print("\n正则表达式高级清洗结果:")
print(df_regex_cleaned[['email', 'email_extracted', 'phone', 'phone_formatted']].head())

7.2 数据类型优化

def optimize_data_types(df):
    """
    优化数据类型以减少内存使用
    """
    df_optimized = df.copy()
    
    # 1. 优化整数列
    for col in df_optimized.select_dtypes(include=['int']).columns:
        df_optimized[col] = pd.to_numeric(df_optimized[col], downcast='integer')
    
    # 2. 优化浮点数列
    for col in df_optimized.select_dtypes(include=['float']).columns:
        df_optimized[col] = pd.to_numeric(df_optimized[col], downcast='float')
    
    # 3. 优化对象列(转换为category如果唯一值较少)
    for col in df_optimized.select_dtypes(include=['object']).columns:
        num_unique = df_optimized[col].nunique()
        num_total = len(df_optimized)
        if num_unique / num_total < 0.5:  # 如果唯一值少于50%
            df_optimized[col] = df_optimized[col].astype('category')
    
    # 4. 优化布尔列
    for col in df_optimized.columns:
        if df_optimized[col].dtype == 'object':
            unique_vals = df_optimized[col].dropna().unique()
            if len(unique_vals) > 0 and set(unique_vals).issubset({'True', 'False', 'true', 'false', '1', '0'}):
                df_optimized[col] = df_optimized[col].map({
                    'True': True, 'False': False, 'true': True, 'false': False, '1': True, '0': False
                })
    
    print("内存使用优化:")
    print(f"原始内存: {df.memory_usage(deep=True).sum() / 1024:.2f} KB")
    print(f"优化后内存: {df_optimized.memory_usage(deep=True).sum() / 1024:.2f} KB")
    
    return df_optimized

# 注意:由于我们的示例数据较小,优化效果不明显,但在大数据集上效果显著
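
# 在示例数据上调用一次,观察各列类型和内存的变化(仅作演示)
df_optimized = optimize_data_types(df)
print(df_optimized.dtypes)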

7.3 数据验证和质量检查

def validate_data_quality(df):
    """
    数据验证和质量检查
    """
    validation_results = {}
    
    # 1. 验证email格式
    email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    valid_emails = df['email'].dropna().str.match(email_pattern).sum()
    validation_results['email_valid'] = {
        'valid_count': valid_emails,
        'invalid_count': len(df['email'].dropna()) - valid_emails,
        'valid_rate': valid_emails / len(df['email'].dropna()) if len(df['email'].dropna()) > 0 else 0
    }
    
    # 2. 验证年龄范围
    valid_ages = df['age'].between(0, 120, inclusive='both').sum()
    validation_results['age_valid'] = {
        'valid_count': valid_ages,
        'invalid_count': len(df['age'].dropna()) - valid_ages,
        'valid_rate': valid_ages / len(df['age'].dropna()) if len(df['age'].dropna()) > 0 else 0
    }
    
    # 3. 验证salary为正数
    if 'salary_clean' in df.columns:
        valid_salaries = (df['salary_clean'] > 0).sum()
        validation_results['salary_valid'] = {
            'valid_count': valid_salaries,
            'invalid_count': len(df['salary_clean'].dropna()) - valid_salaries,
            'valid_rate': valid_salaries / len(df['salary_clean'].dropna()) if len(df['salary_clean'].dropna()) > 0 else 0
        }
    
    # 4. 验证部门值
    valid_departments = df['department'].isin(['Sales', 'Engineering', 'Marketing']).sum()
    validation_results['department_valid'] = {
        'valid_count': valid_departments,
        'invalid_count': len(df['department'].dropna()) - valid_departments,
        'valid_rate': valid_departments / len(df['department'].dropna()) if len(df['department'].dropna()) > 0 else 0
    }
    
    # 打印验证结果
    print("\n数据质量验证结果:")
    print("=" * 50)
    for check, result in validation_results.items():
        print(f"\n{check}:")
        print(f"  有效数量: {result['valid_count']}")
        print(f"  无效数量: {result['invalid_count']}")
        print(f"  有效率: {result['valid_rate']:.2%}")
    
    return validation_results

validation_results = validate_data_quality(df_numeric_cleaned)

8. 构建完整的数据清洗管道

8.1 创建可重用的清洗函数

class DataCleaner:
    """
    数据清洗器类,封装所有清洗步骤
    """
    
    def __init__(self, df):
        self.original_df = df.copy()
        self.cleaned_df = df.copy()
        self.cleaning_log = []
        self.validation_results = {}
    
    def log_step(self, step_name, description):
        """记录清洗步骤"""
        self.cleaning_log.append({
            'step': step_name,
            'description': description,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })
    
    def remove_duplicates(self, subset=None, keep='first'):
        """删除重复数据"""
        if subset is None:
            subset = self.cleaned_df.columns.tolist()
        
        before = len(self.cleaned_df)
        self.cleaned_df = self.cleaned_df.drop_duplicates(subset=subset, keep=keep)
        after = len(self.cleaned_df)
        
        self.log_step('remove_duplicates', 
                     f"Removed {before - after} duplicate rows based on {subset}")
        return self
    
    def handle_missing_values(self, strategy_dict):
        """
        处理缺失值
        strategy_dict: {column: strategy} 
        strategy: 'drop', 'mean', 'median', 'mode', 'ffill', 'bfill', or specific value
        """
        for column, strategy in strategy_dict.items():
            if column not in self.cleaned_df.columns:
                continue
            
            missing_count = self.cleaned_df[column].isnull().sum()
            if missing_count == 0:
                continue
            
            if strategy == 'drop':
                self.cleaned_df = self.cleaned_df.dropna(subset=[column])
            elif strategy == 'mean' and self.cleaned_df[column].dtype in ['int64', 'float64']:
                self.cleaned_df[column] = self.cleaned_df[column].fillna(self.cleaned_df[column].mean())
            elif strategy == 'median' and self.cleaned_df[column].dtype in ['int64', 'float64']:
                self.cleaned_df[column] = self.cleaned_df[column].fillna(self.cleaned_df[column].median())
            elif strategy == 'mode':
                self.cleaned_df[column] = self.cleaned_df[column].fillna(self.cleaned_df[column].mode()[0])
            elif strategy == 'ffill':
                self.cleaned_df[column] = self.cleaned_df[column].ffill()
            elif strategy == 'bfill':
                self.cleaned_df[column] = self.cleaned_df[column].bfill()
            else:
                self.cleaned_df[column] = self.cleaned_df[column].fillna(strategy)
            
            self.log_step('handle_missing', 
                         f"Filled {missing_count} missing values in {column} using {strategy}")
        
        return self
    
    def clean_text_columns(self, columns):
        """清理文本列"""
        for col in columns:
            if col in self.cleaned_df.columns:
                self.cleaned_df[col] = self.cleaned_df[col].astype(str).str.strip()
                self.cleaned_df[col] = self.cleaned_df[col].str.replace(r'\s+', ' ', regex=True)
                self.log_step('clean_text', f"Cleaned text in {col}")
        return self
    
    def standardize_format(self, column, format_type):
        """标准化特定格式"""
        if column not in self.cleaned_df.columns:
            return self
        
        if format_type == 'email':
            self.cleaned_df[column] = self.cleaned_df[column].str.lower().str.strip()
        elif format_type == 'phone':
            def format_phone(p):
                if pd.isna(p) or p == '':
                    return np.nan
                digits = re.sub(r'\D', '', str(p))
                if len(digits) == 10:
                    return f"{digits[:3]}-{digits[3:6]}-{digits[6:]}"
                return np.nan
            self.cleaned_df[column] = self.cleaned_df[column].apply(format_phone)
        elif format_type == 'currency':
            self.cleaned_df[column] = (self.cleaned_df[column]
                                     .str.replace('$', '', regex=False)
                                     .str.replace(',', '', regex=False)
                                     .astype(float))
        
        self.log_step('standardize', f"Standardized {column} to {format_type} format")
        return self
    
    def remove_outliers(self, column, method='cap'):
        """移除或处理异常值"""
        if column not in self.cleaned_df.columns:
            return self
        
        if self.cleaned_df[column].dtype not in ['int64', 'float64']:
            return self
        
        before = len(self.cleaned_df)
        
        if method == 'cap':
            Q1 = self.cleaned_df[column].quantile(0.25)
            Q3 = self.cleaned_df[column].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            self.cleaned_df[column] = self.cleaned_df[column].clip(lower=lower_bound, upper=upper_bound)
        elif method == 'remove':
            Q1 = self.cleaned_df[column].quantile(0.25)
            Q3 = self.cleaned_df[column].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            self.cleaned_df = self.cleaned_df[(self.cleaned_df[column] >= lower_bound) & 
                                            (self.cleaned_df[column] <= upper_bound)]
        
        after = len(self.cleaned_df)
        self.log_step('remove_outliers', 
                     f"Processed outliers in {column} using {method} method ({before - after} rows removed)")
        return self
    
    def validate_data(self, validation_rules):
        """验证数据质量"""
        for column, rule in validation_rules.items():
            if column not in self.cleaned_df.columns:
                continue
            
            if rule == 'positive':
                valid = (self.cleaned_df[column] > 0).sum()
            elif rule == 'email':
                pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
                valid = self.cleaned_df[column].dropna().str.match(pattern).sum()
            elif rule == 'age_range':
                valid = self.cleaned_df[column].between(0, 120).sum()
            else:
                continue
            
            total = len(self.cleaned_df[column].dropna())
            self.validation_results[column] = {
                'rule': rule,
                'valid': valid,
                'total': total,
                'pass_rate': valid / total if total > 0 else 0
            }
        
        return self
    
    def get_cleaned_data(self):
        """获取清洗后的数据"""
        return self.cleaned_df
    
    def get_cleaning_report(self):
        """生成清洗报告"""
        report = {
            'original_shape': self.original_df.shape,
            'cleaned_shape': self.cleaned_df.shape,
            'rows_removed': self.original_df.shape[0] - self.cleaned_df.shape[0],
            'columns_removed': self.original_df.shape[1] - self.cleaned_df.shape[1],
            'cleaning_steps': len(self.cleaning_log),
            'validation_results': self.validation_results,
            'cleaning_log': self.cleaning_log
        }
        return report
    
    def print_report(self):
        """打印清洗报告"""
        report = self.get_cleaning_report()
        print("\n" + "="*60)
        print("数据清洗报告")
        print("="*60)
        print(f"原始数据形状: {report['original_shape']}")
        print(f"清洗后形状: {report['cleaned_shape']}")
        print(f"删除的行数: {report['rows_removed']}")
        print(f"删除的列数: {report['columns_removed']}")
        print(f"清洗步骤数: {report['cleaning_steps']}")
        
        print("\n清洗日志:")
        for i, step in enumerate(report['cleaning_log'], 1):
            print(f"  {i}. [{step['timestamp']}] {step['step']}: {step['description']}")
        
        if report['validation_results']:
            print("\n验证结果:")
            for col, result in report['validation_results'].items():
                print(f"  {col} ({result['rule']}): {result['valid']}/{result['total']} ({result['pass_rate']:.2%})")

8.2 使用清洗管道

# 创建清洗器实例
cleaner = DataCleaner(df)

# 执行完整的清洗流程
cleaner.remove_duplicates(subset=['customer_id'])  # 去重
cleaner.handle_missing_values({
    'email': 'unknown@email.com',  # 固定值
    'age': 'median',              # 中位数
    'phone': '000-000-0000'       # 固定值
})  # 处理缺失值
cleaner.clean_text_columns(['name', 'department', 'status'])  # 清理文本
cleaner.standardize_format('email', 'email')  # 标准化邮箱
cleaner.standardize_format('phone', 'phone')  # 标准化电话
cleaner.standardize_format('salary', 'currency')  # 标准化薪资
cleaner.remove_outliers('age', 'cap')  # 处理异常值

# 验证数据
validation_rules = {
    'age': 'age_range',
    'salary': 'positive',   # standardize_format 已将 salary 列转换为数值
    'email': 'email'
}
cleaner.validate_data(validation_rules)

# 获取结果
cleaned_df = cleaner.get_cleaned_data()
cleaner.print_report()

print("\n清洗后的数据预览:")
print(cleaned_df.head())
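
最后,可以把清洗结果和清洗报告落盘保存,便于复查与追溯(文件名仅为示例):

import json

cleaned_df.to_csv('cleaned_customers.csv', index=False)
with open('cleaning_report.json', 'w', encoding='utf-8') as f:
    json.dump(cleaner.get_cleaning_report(), f, ensure_ascii=False, indent=2, default=str)
print("清洗结果与报告已保存")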

9. 性能优化和最佳实践

9.1 处理大数据集的技巧

def process_large_dataset(file_path, chunk_size=10000):
    """
    处理大型数据集的分块处理方法
    """
    cleaned_chunks = []
    
    # 使用迭代器分块读取
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        # 对每个块应用清洗逻辑
        chunk_cleaned = (
            chunk
            .drop_duplicates()
            .dropna(subset=['essential_column'])
            .fillna({'column1': 'default', 'column2': 0})
        )
        cleaned_chunks.append(chunk_cleaned)
    
    # 合并所有清洗后的块
    final_df = pd.concat(cleaned_chunks, ignore_index=True)
    return final_df

# 内存优化技巧
def optimize_memory_usage(df):
    """
    优化内存使用的多种方法
    """
    # 1. 只读取需要的列
    # df = pd.read_csv('file.csv', usecols=['col1', 'col2'])
    
    # 2. 指定数据类型
    # dtype = {'col1': 'int32', 'col2': 'category'}
    # df = pd.read_csv('file.csv', dtype=dtype)
    
    # 3. 处理日期时指定格式
    # parse_dates = ['date_col']
    # df = pd.read_csv('file.csv', parse_dates=parse_dates)
    
    # 4. 使用category类型
    for col in df.select_dtypes(include=['object']).columns:
        if df[col].nunique() / len(df) < 0.5:
            df[col] = df[col].astype('category')
    
    # 5. 降级数值类型
    for col in df.select_dtypes(include=['int']).columns:
        df[col] = pd.to_numeric(df[col], downcast='integer')
    
    for col in df.select_dtypes(include=['float']).columns:
        df[col] = pd.to_numeric(df[col], downcast='float')
    
    return df

9.2 并行处理

from concurrent.futures import ProcessPoolExecutor
import multiprocessing

def parallel_cleaning(df, cleaning_func, n_workers=None):
    """
    使用多进程并行处理大数据清洗
    """
    if n_workers is None:
        n_workers = multiprocessing.cpu_count()
    
    # 将数据分割为多个部分
    df_split = np.array_split(df, n_workers)
    
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        results = list(executor.map(cleaning_func, df_split))
    
    # 合并结果
    return pd.concat(results, ignore_index=True)

# 示例清洗函数
def clean_chunk(chunk):
    """清洗单个数据块"""
    return (
        chunk
        .drop_duplicates()
        .ffill()
        .reset_index(drop=True)
    )

# 使用并行处理
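# (注意:在 Windows/macOS 的 spawn 启动方式下,多进程调用需放在 if __name__ == '__main__': 保护块中)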
# cleaned_df = parallel_cleaning(large_df, clean_chunk)

10. 总结和最佳实践

10.1 数据清洗检查清单

def data_cleaning_checklist():
    """
    数据清洗检查清单
    """
    checklist = {
        "数据探索": [
            "✓ 检查数据形状和类型",
            "✓ 识别缺失值分布",
            "✓ 检查重复数据",
            "✓ 分析数值分布",
            "✓ 可视化数据问题"
        ],
        "数据清洗": [
            "✓ 删除完全重复的行",
            "✓ 处理缺失值(删除/填充)",
            "✓ 识别和处理异常值",
            "✓ 标准化文本格式",
            "✓ 清理特殊字符",
            "✓ 验证数据范围"
        ],
        "数据转换": [
            "✓ 转换日期格式",
            "✓ 标准化数值格式",
            "✓ 优化数据类型",
            "✓ 创建衍生特征"
        ],
        "验证和文档": [
            "✓ 验证数据质量",
            "✓ 记录清洗步骤",
            "✓ 保存清洗后的数据",
            "✓ 创建清洗报告"
        ]
    }
    
    print("\n数据清洗检查清单:")
    print("=" * 50)
    for category, items in checklist.items():
        print(f"\n{category}:")
        for item in items:
            print(f"  {item}")

data_cleaning_checklist()

10.2 常见陷阱和解决方案

"""
数据清洗常见陷阱及解决方案:

1. 过度清洗
   - 陷阱:删除过多数据,导致样本偏差
   - 解决方案:记录删除原因,确保删除比例合理

2. 忽略数据上下文
   - 陷阱:不了解业务含义,错误处理数据
   - 解决方案:与领域专家沟通,理解数据含义

3. 硬编码阈值
   - 陷阱:使用固定的异常值阈值
   - 解决方案:根据数据分布动态调整(见本节末尾的示例)

4. 不保存中间步骤
   - 陷阱:无法回溯和调试
   - 解决方案:记录所有清洗步骤和参数

5. 忽略数据类型
   - 陷阱:内存浪费和性能问题
   - 解决方案:优化数据类型,使用category等

6. 不验证结果
   - 陷阱:清洗引入新错误
   - 解决方案:清洗后进行验证和测试
"""

结论

数据清洗是数据分析和机器学习项目成功的关键步骤。通过本文介绍的技术和方法,您可以:

  1. 系统化地处理数据质量问题:从重复数据、缺失值到异常值
  2. 提高数据质量:通过标准化和验证确保数据可靠性
  3. 优化性能:使用高效的方法处理大数据集
  4. 建立可重用的流程:创建标准化的清洗管道

记住,好的数据清洗不仅仅是技术问题,更需要理解数据的业务含义。始终记录您的清洗步骤,保持数据的可追溯性,并在清洗后进行充分的验证。

通过实践这些方法,您将能够处理各种复杂的数据清洗任务,为后续的数据分析和建模打下坚实的基础。