引言:数据清洗的重要性与挑战

在数据科学和机器学习项目中,数据清洗是整个流程中最关键且最耗时的环节之一。根据业界统计,数据科学家通常花费60-80%的时间在数据准备和清洗上。原始数据往往包含各种问题:缺失值、异常值、重复记录、格式不一致、数据类型错误等。这些问题如果不得到妥善处理,会直接影响后续分析的准确性和模型的性能。

Python作为数据科学领域的主流语言,提供了强大的工具生态系统来应对这些挑战。本文将详细介绍如何使用Python进行高效的数据清洗,涵盖从基础操作到高级技巧的完整流程,并通过实际代码示例展示每个步骤的具体实现。

1. 数据加载与初步探索

1.1 使用Pandas加载数据

数据清洗的第一步是将数据加载到Python环境中。Pandas是Python数据分析的核心库,提供了灵活的DataFrame结构来处理表格数据。

import pandas as pd
import numpy as np

# 从CSV文件加载数据
df = pd.read_csv('data.csv')

# 从Excel文件加载数据
df = pd.read_excel('data.xlsx')

# 从JSON文件加载数据
df = pd.read_json('data.json')

# 查看数据的基本信息
print(df.info())
print(df.head())
print(df.describe())

1.2 初步数据探索

在开始清洗之前,我们需要了解数据的基本情况:

# 查看数据维度
print(f"数据集包含 {df.shape[0]} 行和 {df.shape[1]} 列")

# 查看列名和数据类型
print(df.dtypes)

# 查看数值型列的统计摘要
print(df.describe())

# 查看分类变量的分布
print(df['category_column'].value_counts())

# 检查缺失值情况
missing_values = df.isnull().sum()
print("缺失值统计:")
print(missing_values[missing_values > 0])

2. 处理缺失值

缺失值是数据中最常见的问题之一。处理缺失值的方法需要根据数据的特性和业务场景来决定。

2.1 识别缺失值

# 检查每列的缺失值数量
missing_count = df.isnull().sum()

# 计算缺失值比例
missing_percentage = (df.isnull().sum() / len(df)) * 100

# 可视化缺失值分布
import seaborn as sns
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 6))
sns.heatmap(df.isnull(), cbar=False, cmap='viridis')
plt.title('缺失值分布热力图')
plt.show()

2.2 删除缺失值

当缺失值比例很小或者数据量很大时,可以直接删除包含缺失值的行或列:

# 删除包含任何缺失值的行
df_cleaned = df.dropna()

# 删除包含任何缺失值的列
df_cleaned = df.dropna(axis=1)

# 删除缺失值超过阈值的列(例如50%)
threshold = 0.5
df_cleaned = df.loc[:, df.isnull().mean() < threshold]

# 删除缺失值超过阈值的行
threshold = 0.3
df_cleaned = df.loc[df.isnull().mean(axis=1) < threshold]

2.3 填充缺失值

填充缺失值是更常用的方法,需要根据数据类型选择合适的策略:

# 用0填充数值型列
df['numeric_column'] = df['numeric_column'].fillna(0)

# 用均值填充数值型列
df['numeric_column'] = df['numeric_column'].fillna(df['numeric_column'].mean())

# 用中位数填充数值型列(对异常值更鲁棒)
df['numeric_column'] = df['numeric_column'].fillna(df['numeric_column'].median())

# 用众数填充分类变量
df['category_column'] = df['category_column'].fillna(df['category_column'].mode()[0])

# 前向填充(适用于时间序列数据)
df['time_series_column'] = df['time_series_column'].fillna(method='ffill')

# 后向填充
df['time_series_column'] = df['time_series_column'].fillna(method='bfill')

# 使用插值法填充
df['numeric_column'] = df['numeric_column'].interpolate(method='linear')

# 使用分组均值填充(基于其他列的分组)
df['value'] = df.groupby('category')['value'].transform(lambda x: x.fillna(x.mean()))

2.4 高级缺失值处理

对于更复杂的场景,可以使用机器学习方法预测缺失值:

from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

def impute_with_ml(df, target_column):
    """
    使用随机森林预测缺失值
    """
    # 分离有缺失值和无缺失值的数据
    known = df[df[target_column].notnull()]
    unknown = df[df[target_column].isnull()]
    
    if len(unknown) == 0:
        return df
    
    # 准备特征和目标变量
    features = [col for col in df.columns if col != target_column]
    X = known[features]
    y = known[target_column]
    
    # 训练模型
    model = RandomForestRegressor(n_estimators=100, random_state=42)
    model.fit(X, y)
    
    # 预测缺失值
    predicted = model.predict(unknown[features])
    df.loc[df[target_column].isnull(), target_column] = predicted
    
    return df

# 使用示例
# df = impute_with_ml(df, 'target_column')

3. 处理异常值

异常值可能是数据录入错误、测量误差或真实的极端值。识别和处理异常值需要谨慎。

3.1 异常值检测方法

# 3σ原则(适用于正态分布)
def detect_outliers_zscore(df, column, threshold=3):
    z_scores = np.abs((df[column] - df[column].mean()) / df[column].std())
    return df[z_scores > threshold]

# IQR方法(适用于偏态分布)
def detect_outliers_iqr(df, column):
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    return df[(df[column] < lower_bound) | (df[column] > upper_bound)]

# 可视化检测
def plot_outliers(df, column):
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
    
    # 箱线图
    ax1.boxplot(df[column])
    ax1.set_title(f'Boxplot of {column}')
    
    # 直方图
    ax2.hist(df[column], bins=30)
    ax2.axvline(df[column].mean(), color='red', linestyle='--', label='Mean')
    ax2.axvline(df[column].median(), color='green', linestyle='--', label='Median')
    ax2.set_title(f'Histogram of {column}')
    ax2.legend()
    
    plt.show()

# 使用示例
# outliers = detect_outliers_iqr(df, 'price')
# plot_outliers(df, 'price')

3.2 异常值处理策略

# 1. 删除异常值
def remove_outliers(df, column, method='iqr'):
    if method == 'zscore':
        z_scores = np.abs((df[column] - df[column].mean()) / df[column].std())
        return df[z_scores <= 3]
    elif method == 'iqr':
        Q1 = df[column].quantile(0.25)
        Q3 = df[column].column.quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        return df[(df[column] >= lower_bound) & (df[column] <= upper_bound)]

# 2. 截断异常值(Winsorization)
def cap_outliers(df, column, method='iqr'):
    if method == 'iqr':
        Q1 = df[column].quantile(0.05)  # 使用5%和95%分位数
        Q3 = df[column].quantile(0.95)
        df[column] = np.where(df[column] < Q1, Q1, df[column])
        df[column] = np.where(df[column] > Q3, Q3, df[column])
    return df

# 3. 转换处理(对数转换)
def transform_outliers(df, column):
    # 对数转换可以减少极端值的影响
    df[column] = np.log1p(df[column])  # log(1+x) 避免log(0)
    return df

# 4. 分箱处理
def bin_outliers(df, column, bins=10):
    df[column] = pd.cut(df[column], bins=bins, labels=False)
    return df

# 5. 标记异常值
def flag_outliers(df, column, method='iqr'):
    if method == 'iqr':
        Q1 = df[column].quantile(0.25)
        Q3 = df[column].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        df[f'{column}_outlier'] = ((df[column] < lower_bound) | (df[column] > upper_bound)).astype(int)
    return df

4. 数据格式标准化

4.1 文本数据清洗

# 基础文本清洗
def clean_text(text):
    """
    清洗文本数据的通用函数
    """
    if pd.isna(text):
        return ""
    
    # 转换为小写
    text = text.lower()
    
    # 移除特殊字符(保留字母、数字和空格)
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    
    # 移除多余空格
    text = ' '.join(text.split())
    
    # 去除首尾空格
    text = text.strip()
    
    return text

# 应用到DataFrame
df['cleaned_text'] = df['text_column'].apply(clean_text)

# 处理电话号码
def clean_phone_number(phone):
    """
    标准化电话号码格式
    """
    if pd.isna(phone):
        return ""
    
    # 移除所有非数字字符
    digits = re.sub(r'\D', '', str(phone))
    
    # 根据长度判断格式
    if len(digits) == 10:
        return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
    elif len(digits) == 11:
        return f"+{digits[0]} ({digits[1:4]}) {digits[4:7]}-{digits[7:]}"
    else:
        return digits

df['cleaned_phone'] = df['phone_column'].apply(clean_phone_number)

# 处理地址
def clean_address(address):
    """
    标准化地址格式
    """
    if pd.isna(address):
        return ""
    
    # 定义缩写映射
    abbreviations = {
        'street': 'st',
        'avenue': 'ave',
        'boulevard': 'blvd',
        'road': 'rd',
        'drive': 'dr',
        'lane': 'ln',
        'court': 'ct'
    }
    
    address = address.lower()
    
    # 替换缩写
    for full, abbr in abbreviations.items():
        address = re.sub(r'\b' + full + r'\b', abbr, address)
    
    # 标准化方向词
    direction_map = {'north': 'N', 'south': 'S', 'east': 'E', 'west': 'W'}
    for word, direction in direction_map.items():
        address = re.sub(r'\b' + word + r'\b', direction, address)
    
    return address.title()

df['cleaned_address'] = df['address_column'].apply(clean_address)

4.2 日期时间标准化

# 统一日期格式
def standardize_date(date_str):
    """
    将各种日期格式统一为 YYYY-MM-DD
    """
    if pd.isna(date_str):
        return None
    
    # 尝试多种格式
    formats = [
        '%Y-%m-%d',
        '%m/%d/%Y',
        '%d/%m/%Y',
        '%Y/%m/%d',
        '%m-%d-%Y',
        '%d-%m-%Y',
        '%b %d, %Y',
        '%B %d, %Y'
    ]
    
    for fmt in formats:
        try:
            date_obj = datetime.strptime(str(date_str), fmt)
            return date_obj.strftime('%Y-%m-%d')
        except ValueError:
            continue
    
    return None

# 使用pandas to_datetime
df['date_column'] = pd.to_datetime(df['date_column'], errors='coerce')

# 提取日期特征
df['year'] = df['date_column'].dt.year
df['month'] = df['date_column'].dt.month
df['day'] = df['date_column'].dt.day
df['day_of_week'] = df['date_column'].dt.dayofweek
df['is_weekend'] = df['date_column'].dt.weekday >= 5

# 处理时间戳
df['timestamp'] = pd.to_datetime(df['timestamp_column'], unit='s')

4.3 数值格式标准化

# 处理货币格式
def clean_currency(value):
    """
    清理货币字符串并转换为浮点数
    """
    if pd.isna(value):
        return np.nan
    
    # 移除货币符号和逗号
    cleaned = re.sub(r'[^\d.-]', '', str(value))
    
    try:
        return float(cleaned)
    except ValueError:
        return np.nan

df['price'] = df['price_column'].apply(clean_currency)

# 处理百分比
def clean_percentage(value):
    """
    将百分比字符串转换为小数
    """
    if pd.isna(value):
        return np.nan
    
    # 移除百分号
    cleaned = str(value).replace('%', '')
    
    try:
        return float(cleaned) / 100
    except ValueError:
        return np.nan

df['rate'] = df['rate_column'].apply(clean_percentage)

# 标准化数值范围
def normalize_column(df, column, method='minmax'):
    """
    标准化数值列到指定范围
    """
    if method == 'minmax':
        min_val = df[column].min()
        max_val = df[column].max()
        df[f'{column}_normalized'] = (df[column] - min_val) / (max_val - min_val)
    elif method == 'zscore':
        df[f'{column}_normalized'] = (df[column] - df[column].mean()) / df[column].std()
    return df

df = normalize_column(df, 'price', method='minmax')

5. 处理重复数据

5.1 识别重复值

# 查找完全重复的行
duplicates = df.duplicated()
print(f"发现 {duplicates.sum()} 个重复行")

# 查找特定列的重复值
duplicates_subset = df.duplicated(subset=['id', 'date'])
print(f"基于id和date的重复行: {duplicates_subset.sum()}")

# 查看重复行的详细信息
duplicate_rows = df[df.duplicated(keep=False)]
print(duplicate_rows.sort_values(by=['id', 'date']))

# 统计每组重复值的数量
duplicate_counts = df.groupby(['id', 'date']).size().reset_index(name='count')
duplicate_counts = duplicate_counts[duplicate_counts['count'] > 1]
print(duplicate_counts)

5.2 处理重复值

# 删除完全重复的行
df_unique = df.drop_duplicates()

# 删除特定列的重复行(保留第一条)
df_unique = df.drop_duplicates(subset=['id', 'date'], keep='first')

# 删除特定列的重复行(保留最后一条)
df_unique = df.drop_duplicates(subset=['id', 'date'], keep='last')

# 删除重复行但保留最新记录(基于时间戳)
df_sorted = df.sort_values('timestamp', ascending=False)
df_unique = df_sorted.drop_duplicates(subset=['id'], keep='first')

# 处理重复值的高级策略
def handle_duplicates_advanced(df, id_cols, sort_cols=None, keep='first'):
    """
    高级重复值处理:可以指定排序和保留策略
    """
    if sort_cols:
        df = df.sort_values(sort_cols, ascending=False)
    
    return df.drop_duplicates(subset=id_cols, keep=keep)

# 使用示例
df = handle_duplicates_advanced(
    df, 
    id_cols=['customer_id', 'product_id'], 
    sort_cols=['transaction_date'],
    keep='first'
)

# 标记重复值而不删除
df['is_duplicate'] = df.duplicated(subset=['id'], keep=False)
df['duplicate_rank'] = df.groupby(['id']).cumcount() + 1

6. 数据类型转换与优化

6.1 数据类型检查与转换

# 检查当前数据类型
print(df.dtypes)

# 优化内存使用的数据类型转换
def optimize_memory(df):
    """
    优化DataFrame的内存使用
    """
    start_mem = df.memory_usage().sum() / 1024**2
    
    for col in df.columns:
        col_type = df[col].dtype
        
        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)
            else:
                if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
        else:
            # 对于对象类型,检查是否可以转换为category
            num_unique_values = len(df[col].unique())
            num_total_values = len(df[col])
            if num_unique_values / num_total_values < 0.5:
                df[col] = df[col].astype('category')
    
    end_mem = df.memory_usage().sum() / 1024**2
    print(f"内存使用从 {start_mem:.2f} MB 优化到 {end_mem:.2f} MB")
    return df

df = optimize_memory(df)

# 手动转换数据类型
df['id'] = df['id'].astype(int)
df['price'] = df['price'].astype(float)
df['category'] = df['category'].astype('category')
df['date'] = pd.to_datetime(df['date'])

6.2 处理编码问题

# 检测文件编码
import chardet

def detect_encoding(file_path):
    """
    检测文件编码
    """
    with open(file_path, 'rb') as f:
        result = chardet.detect(f.read(10000))
        return result['encoding']

# 处理编码问题
def read_with_encoding(file_path, encoding='utf-8'):
    """
    尝试多种编码读取文件
    """
    encodings = ['utf-8', 'latin-1', 'iso-8859-1', 'cp1252']
    
    for enc in encodings + [encoding]:
        try:
            return pd.read_csv(file_path, encoding=enc)
        except UnicodeDecodeError:
            continue
    
    raise ValueError(f"无法读取文件,尝试了多种编码: {encodings}")

# 处理DataFrame中的编码问题
def clean_encoding(series):
    """
    清理文本列的编码问题
    """
    return series.astype(str).str.encode('utf-8', errors='ignore').str.decode('utf-8', errors='ignore')

df['text_column'] = clean_encoding(df['text_column'])

7. 高级数据清洗技术

7.1 使用管道(Pipeline)进行自动化清洗

from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin

class DataCleaner(BaseEstimator, TransformerMixin):
    """
    自定义数据清洗转换器
    """
    def __init__(self, fill_method='median', outlier_method='iqr'):
        self.fill_method = fill_method
        self.outlier_method = outlier_method
        self.fill_values = {}
        self.outlier_bounds = {}
    
    def fit(self, X, y=None):
        # 计算填充值
        for col in X.select_dtypes(include=[np.number]).columns:
            if self.fill_method == 'median':
                self.fill_values[col] = X[col].median()
            elif self.fill_method == 'mean':
                self.fill_values[col] = X[col].mean()
            else:
                self.fill_values[col] = 0
        
        # 计算异常值边界
        for col in X.select_dtypes(include=[np.number]).columns:
            Q1 = X[col].quantile(0.25)
            Q3 = X[col].quantile(0.75)
            IQR = Q3 - Q1
            self.outlier_bounds[col] = (Q1 - 1.5 * IQR, Q3 + 1.5 * IQR)
        
        return self
    
    def transform(self, X):
        X = X.copy()
        
        # 填充缺失值
        for col, value in self.fill_values.items():
            X[col] = X[col].fillna(value)
        
        # 处理异常值(截断)
        for col, (lower, upper) in self.outlier_bounds.items():
            X[col] = np.clip(X[col], lower, upper)
        
        return X

# 使用管道
cleaning_pipeline = Pipeline([
    ('cleaner', DataCleaner(fill_method='median', outlier_method='iqr'))
])

# 应用清洗
df_cleaned = cleaning_pipeline.fit_transform(df)

7.2 使用Dask处理大数据

import dask.dataframe as dd

def clean_large_dataset(file_path, chunksize=100000):
    """
    清洗大型数据集(内存不足时使用)
    """
    # 使用Dask读取大文件
    ddf = dd.read_csv(file_path, blocksize=chunksize)
    
    # 定义清洗函数
    def clean_chunk(df):
        # 填充缺失值
        df = df.fillna({
            'numeric_column': df['numeric_column'].mean(),
            'category_column': 'Unknown'
        })
        
        # 处理异常值
        Q1 = df['price'].quantile(0.25)
        Q3 = df['price'].quantile(0.75)
        IQR = Q3 - Q1
        df = df[(df['price'] >= Q1 - 1.5 * IQR) & (df['price'] <= Q3 + 1.5 * IQR)]
        
        return df
    
    # 应用清洗
    ddf_cleaned = ddf.map_partitions(clean_chunk)
    
    # 保存结果
    ddf_cleaned.to_csv('cleaned_data_*.csv', index=False)
    
    return ddf_cleaned

# 使用示例
# ddf = clean_large_dataset('large_file.csv')

7.3 数据质量验证

import great_expectations as ge

def validate_data_quality(df):
    """
    使用Great Expectations进行数据质量验证
    """
    # 将Pandas DataFrame转换为Great Expectations DataFrame
    ge_df = ge.from_pandas(df)
    
    # 定义期望
    expectations = [
        # 列存在性检查
        ge_df.expect_column_to_exist('id'),
        ge_df.expect_column_to_exist('price'),
        
        # 值域检查
        ge_df.expect_column_values_to_be_between('price', 0, 10000),
        
        # 唯一性检查
        ge_df.expect_column_values_to_be_unique('id'),
        
        # 缺失值检查
        ge_df.expect_column_values_to_not_be_null('customer_id'),
        ge_df.expect_column_values_to_not_be_null('transaction_date'),
        
        # 数据类型检查
        ge_df.expect_column_dtype_to_be('price', 'float64'),
        ge_df.expect_column_dtype_to_be('id', 'int64'),
        
        # 格式检查
        ge_df.expect_column_values_to_match_regex('email', r'^[\w\.-]+@[\w\.-]+\.\w+$'),
        
        # 范围检查
        ge_df.expect_column_values_to_be_between('age', 0, 120),
        
        # 类别检查
        ge_df.expect_column_values_to_be_in_set('status', ['active', 'inactive', 'pending']),
        
        # 自定义验证
        ge_df.expect_column_pair_values_A_to_be_greater_than_B(
            'start_date', 'end_date', or_equal=True
        )
    ]
    
    # 运行验证
    results = ge_df.validate()
    
    # 输出结果
    print(f"验证通过率: {results['statistics']['success_percent']:.2f}%")
    
    # 查看失败的期望
    failed_expectations = [r for r in results['results'] if not r['success']]
    for exp in failed_expectations:
        print(f"失败: {exp['expectation_config']['expectation_type']}")
        print(f"  错误: {exp.get('result', {}).get('unexpected_list', [])[:5]}...")
    
    return results

# 使用示例
# quality_report = validate_data_quality(df)

8. 实战案例:完整数据清洗流程

8.1 案例背景

假设我们有一个电商交易数据集,包含以下问题:

  • 缺失的客户信息
  • 异常的价格和数量
  • 重复的交易记录
  • 不一致的日期格式
  • 特殊字符和编码问题

8.2 完整清洗代码

import pandas as pd
import numpy as np
import re
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

class EcommerceDataCleaner:
    """
    电商数据清洗器
    """
    def __init__(self):
        self.fill_values = {}
        self.outlier_bounds = {}
        self.duplicate_strategy = 'latest'
    
    def load_data(self, file_path):
        """加载数据"""
        try:
            self.df = pd.read_csv(file_path, encoding='utf-8')
            print(f"成功加载数据: {self.df.shape}")
            return self.df
        except:
            # 尝试其他编码
            self.df = pd.read_csv(file_path, encoding='latin-1')
            print(f"成功加载数据(使用latin-1编码): {self.df.shape}")
            return self.df
    
    def explore_data(self):
        """数据探索"""
        print("\n=== 数据概览 ===")
        print(f"数据维度: {self.df.shape}")
        print(f"\n列名: {list(self.df.columns)}")
        
        print(f"\n=== 数据类型 ===")
        print(self.df.dtypes)
        
        print(f"\n=== 缺失值统计 ===")
        missing = self.df.isnull().sum()
        print(missing[missing > 0])
        
        print(f"\n=== 数值列描述性统计 ===")
        print(self.df.describe())
        
        print(f"\n=== 分类列分布 ===")
        for col in self.df.select_dtypes(include=['object']).columns:
            if self.df[col].nunique() < 20:
                print(f"\n{col}:")
                print(self.df[col].value_counts().head())
    
    def clean_text_columns(self):
        """清洗文本列"""
        print("\n=== 清洗文本列 ===")
        
        # 清洗客户姓名
        if 'customer_name' in self.df.columns:
            self.df['customer_name'] = self.df['customer_name'].str.strip()
            self.df['customer_name'] = self.df['customer_name'].str.title()
            # 移除特殊字符
            self.df['customer_name'] = self.df['customer_name'].str.replace(r'[^a-zA-Z\s]', '', regex=True)
        
        # 清洗产品名称
        if 'product_name' in self.df.columns:
            self.df['product_name'] = self.df['product_name'].str.strip()
            self.df['product_name'] = self.df['product_name'].str.upper()
        
        # 清洗邮箱
        if 'email' in self.df.columns:
            self.df['email'] = self.df['email'].str.strip().str.lower()
        
        print("文本列清洗完成")
    
    def standardize_dates(self):
        """标准化日期列"""
        print("\n=== 标准化日期列 ===")
        
        date_columns = ['order_date', 'payment_date', 'delivery_date']
        for col in date_columns:
            if col in self.df.columns:
                # 转换为datetime
                self.df[col] = pd.to_datetime(self.df[col], errors='coerce')
                
                # 检查转换失败的记录
                failed = self.df[col].isnull().sum()
                if failed > 0:
                    print(f"  {col}: {failed} 条记录转换失败")
        
        print("日期列标准化完成")
    
    def handle_missing_values(self):
        """处理缺失值"""
        print("\n=== 处理缺失值 ===")
        
        # 数值列:用中位数填充
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if self.df[col].isnull().sum() > 0:
                median_val = self.df[col].median()
                self.df[col] = self.df[col].fillna(median_val)
                self.fill_values[col] = median_val
                print(f"  {col}: 用中位数 {median_val:.2f} 填充")
        
        # 分类列:用'Unknown'填充
        categorical_cols = self.df.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            if self.df[col].isnull().sum() > 0:
                self.df[col] = self.df[col].fillna('Unknown')
                print(f"  {col}: 用 'Unknown' 填充")
        
        print("缺失值处理完成")
    
    def detect_and_handle_outliers(self):
        """检测和处理异常值"""
        print("\n=== 处理异常值 ===")
        
        # 价格列
        if 'price' in self.df.columns:
            Q1 = self.df['price'].quantile(0.25)
            Q3 = self.df['price'].quantile(0.75)
            IQR = Q3 - Q1
            lower = Q1 - 1.5 * IQR
            upper = Q3 + 1.5 * IQR
            
            outliers = self.df[(self.df['price'] < lower) | (self.df['price'] > upper)]
            print(f"  价格异常值: {len(outliers)} 条")
            
            # 截断异常值
            self.df['price'] = np.clip(self.df['price'], lower, upper)
            self.outlier_bounds['price'] = (lower, upper)
        
        # 数量列
        if 'quantity' in self.df.columns:
            # 数量应该为正数
            invalid_qty = self.df[self.df['quantity'] <= 0]
            print(f"  无效数量: {len(invalid_qty)} 条")
            
            # 用中位数替换无效值
            median_qty = self.df[self.df['quantity'] > 0]['quantity'].median()
            self.df.loc[self.df['quantity'] <= 0, 'quantity'] = median_qty
        
        print("异常值处理完成")
    
    def handle_duplicates(self):
        """处理重复值"""
        print("\n=== 处理重复值 ===")
        
        # 查找基于订单ID的重复
        if 'order_id' in self.df.columns:
            duplicates = self.df.duplicated(subset=['order_id'], keep=False)
            dup_count = duplicates.sum()
            print(f"  基于order_id的重复记录: {dup_count} 条")
            
            if dup_count > 0:
                # 按时间排序,保留最新的
                if 'order_date' in self.df.columns:
                    self.df = self.df.sort_values('order_date', ascending=False)
                    self.df = self.df.drop_duplicates(subset=['order_id'], keep='first')
                    print(f"  保留最新记录,剩余: {len(self.df)} 条")
        
        print("重复值处理完成")
    
    def optimize_data_types(self):
        """优化数据类型"""
        print("\n=== 优化数据类型 ===")
        
        # 转换分类列为category类型
        for col in self.df.select_dtypes(include=['object']).columns:
            if self.df[col].nunique() / len(self.df) < 0.5:
                self.df[col] = self.df[col].astype('category')
                print(f"  {col}: 转换为category类型")
        
        # 优化数值类型
        for col in self.df.select_dtypes(include=[np.number]).columns:
            if self.df[col].dtype == 'float64':
                self.df[col] = self.df[col].astype('float32')
            elif self.df[col].dtype == 'int64':
                self.df[col] = self.df[col].astype('int32')
        
        print("数据类型优化完成")
    
    def validate_data(self):
        """数据质量验证"""
        print("\n=== 数据质量验证 ===")
        
        # 检查缺失值
        total_missing = self.df.isnull().sum().sum()
        print(f"剩余缺失值: {total_missing}")
        
        # 检查重复值
        total_duplicates = self.df.duplicated().sum()
        print(f"剩余重复值: {total_duplicates}")
        
        # 检查数值范围
        if 'price' in self.df.columns:
            invalid_price = ((self.df['price'] < 0) | (self.df['price'] > 100000)).sum()
            print(f"异常价格: {invalid_price}")
        
        if 'quantity' in self.df.columns:
            invalid_qty = (self.df['quantity'] <= 0).sum()
            print(f"异常数量: {invalid_qty}")
        
        # 检查日期范围
        if 'order_date' in self.df.columns:
            min_date = self.df['order_date'].min()
            max_date = self.df['order_date'].max()
            print(f"日期范围: {min_date} 到 {max_date}")
        
        print(f"\n最终数据维度: {self.df.shape}")
        
        return total_missing == 0 and total_duplicates == 0
    
    def save_data(self, output_path):
        """保存清洗后的数据"""
        self.df.to_csv(output_path, index=False)
        print(f"\n数据已保存到: {output_path}")
    
    def run_full_cleaning(self, input_path, output_path):
        """运行完整的清洗流程"""
        print("=" * 60)
        print("开始数据清洗流程")
        print("=" * 60)
        
        self.load_data(input_path)
        self.explore_data()
        self.clean_text_columns()
        self.standardize_dates()
        self.handle_missing_values()
        self.detect_and_handle_outliers()
        self.handle_duplicates()
        self.optimize_data_types()
        
        is_valid = self.validate_data()
        self.save_data(output_path)
        
        print("\n" + "=" * 60)
        if is_valid:
            print("数据清洗完成!数据质量良好。")
        else:
            print("数据清洗完成,但仍有部分问题需要手动检查。")
        print("=" * 60)
        
        return self.df

# 使用示例
if __name__ == "__main__":
    cleaner = EcommerceDataCleaner()
    
    # 运行完整清洗
    cleaned_df = cleaner.run_full_cleaning(
        input_path='raw_ecommerce_data.csv',
        output_path='cleaned_ecommerce_data.csv'
    )
    
    # 查看清洗结果
    print("\n清洗后的数据前5行:")
    print(cleaned_df.head())

9. 数据清洗最佳实践

9.1 清洗流程标准化

# 创建可复用的清洗配置
CLEANING_CONFIG = {
    'numeric_columns': {
        'price': {'fill_method': 'median', 'outlier_method': 'iqr', 'bounds': (0, 10000)},
        'quantity': {'fill_method': 'median', 'outlier_method': 'range', 'bounds': (1, 1000)},
        'rating': {'fill_method': 'mean', 'outlier_method': 'range', 'bounds': (1, 5)}
    },
    'categorical_columns': {
        'category': {'fill_method': 'mode', 'max_categories': 20},
        'status': {'fill_method': 'Unknown', 'allowed_values': ['active', 'inactive', 'pending']}
    },
    'date_columns': {
        'order_date': {'format': 'auto', 'min_date': '2020-01-01'},
        'delivery_date': {'format': 'auto', 'min_date': 'order_date'}
    },
    'text_columns': {
        'customer_name': {'clean': True, 'strip': True, 'title': True},
        'email': {'clean': True, 'validate': True}
    }
}

def apply_cleaning_config(df, config):
    """
    应用标准化的清洗配置
    """
    # 处理数值列
    for col, settings in config.get('numeric_columns', {}).items():
        if col in df.columns:
            # 填充缺失值
            if settings['fill_method'] == 'median':
                fill_val = df[col].median()
            elif settings['fill_method'] == 'mean':
                fill_val = df[col].mean()
            else:
                fill_val = 0
            df[col] = df[col].fillna(fill_val)
            
            # 处理异常值
            if settings['outlier_method'] == 'iqr':
                Q1 = df[col].quantile(0.25)
                Q3 = df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower = Q1 - 1.5 * IQR
                upper = Q3 + 1.5 * IQR
            elif settings['outlier_method'] == 'range':
                lower, upper = settings['bounds']
            
            df[col] = np.clip(df[col], lower, upper)
    
    # 处理分类列
    for col, settings in config.get('categorical_columns', {}).items():
        if col in df.columns:
            if settings['fill_method'] == 'mode':
                fill_val = df[col].mode()[0] if not df[col].mode().empty else 'Unknown'
            else:
                fill_val = settings['fill_method']
            df[col] = df[col].fillna(fill_val)
            
            # 限制类别数量
            if 'max_categories' in settings:
                value_counts = df[col].value_counts()
                if len(value_counts) > settings['max_categories']:
                    other_categories = value_counts.iloc[settings['max_categories']:].index
                    df[col] = df[col].replace(other_categories, 'Other')
    
    # 处理日期列
    for col, settings in config.get('date_columns', {}).items():
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors='coerce')
            if 'min_date' in settings:
                min_date = pd.to_datetime(settings['min_date'])
                df.loc[df[col] < min_date, col] = np.nan
    
    # 处理文本列
    for col, settings in config.get('text_columns', {}).items():
        if col in df.columns:
            if settings.get('clean', False):
                df[col] = df[col].astype(str).str.strip()
                if settings.get('title', False):
                    df[col] = df[col].str.title()
    
    return df

# 使用示例
# df_cleaned = apply_cleaning_config(df, CLEANING_CONFIG)

9.2 清洗过程文档化

def create_cleaning_report(df, original_df, cleaning_steps):
    """
    生成数据清洗报告
    """
    report = {
        '原始数据维度': original_df.shape,
        '清洗后维度': df.shape,
        '删除行数': original_df.shape[0] - df.shape[0],
        '删除列数': original_df.shape[1] - df.shape[1],
        '清洗步骤': cleaning_steps,
        '数据质量指标': {
            '缺失值减少': original_df.isnull().sum().sum() - df.isnull().sum().sum(),
            '重复值减少': original_df.duplicated().sum() - df.duplicated().sum(),
            '内存优化': f"{original_df.memory_usage().sum() / 1024**2:.2f}MB -> {df.memory_usage().sum() / 1024**2:.2f}MB"
        }
    }
    
    return report

# 记录清洗步骤
cleaning_steps = []
cleaning_steps.append("1. 加载数据并探索")
cleaning_steps.append("2. 清洗文本列")
cleaning_steps.append("3. 标准化日期")
cleaning_steps.append("4. 处理缺失值")
cleaning_steps.append("5. 处理异常值")
cleaning_steps.append("6. 处理重复值")
cleaning_steps.append("7. 优化数据类型")

# 生成报告
# report = create_cleaning_report(df_cleaned, df_original, cleaning_steps)
# print(json.dumps(report, indent=2, default=str))

10. 总结

数据清洗是数据科学项目成功的关键。通过本文介绍的方法和代码示例,您可以:

  1. 系统化处理缺失值:根据数据特性选择合适的填充或删除策略
  2. 有效识别和处理异常值:使用统计方法和可视化工具
  3. 标准化数据格式:统一文本、日期、数值的格式
  4. 处理重复数据:智能识别和去重
  5. 优化数据存储:减少内存使用,提高处理效率
  6. 验证数据质量:确保清洗后的数据符合业务要求

记住,数据清洗不是一次性的任务,而是一个迭代的过程。随着数据的变化和业务需求的发展,您可能需要调整清洗策略。建议将清洗流程自动化,建立可复用的清洗管道,并始终保持对数据质量的关注。

最后,数据清洗的最终目标是为后续的分析和建模提供高质量、可靠的数据基础。只有打好这个基础,才能确保数据科学项目的价值最大化。