在数字音乐时代,我们拥有了前所未有的音乐访问便利性,但同时也面临着新的挑战:Spotify、Apple Music、Amazon Music、网易云音乐、QQ音乐等多个平台的订阅,加上早年购买的iTunes专辑、Bandcamp下载、以及各种数字版赠送的音乐,导致我们的音乐收藏变得支离破碎。更糟糕的是,由于缺乏统一的管理,我们常常在不同平台重复购买同一张专辑,或者收藏夹里充斥着大量从未听过的曲目,真正想要的音乐反而难以找到。

本文将为您提供一套完整的解决方案,从诊断当前收藏状况开始,到建立统一的音乐数据库,再到自动化整理工具的使用,最终帮助您创建一个有序、可检索、充满个人情感记忆的专属音乐记忆库。

第一步:诊断你的音乐收藏现状

在开始整理之前,我们需要先了解自己到底拥有哪些音乐,以及它们分布在何处。这就像医生诊断病情一样,只有明确了问题所在,才能开出正确的药方。

1.1 列出所有音乐来源平台

首先,创建一个清单,列出你所有使用过的音乐平台和来源。这包括:

  • 流媒体订阅服务:Spotify、Apple Music、Tidal、Amazon Music、YouTube Music、网易云音乐、QQ音乐、Spotify中国(喜马拉雅)等
  • 数字音乐商店:iTunes Store、Bandcamp、Amazon MP3、7digital、HDtracks、Qobuz、Beatport等
  • 实体专辑数字版:购买CD/Vinyl时附赠的数字下载码
  • 独立音乐人平台:Patreon、Ko-fi、独立音乐人网站直接购买
  • 赠送/二手获得:朋友赠送的数字音乐、二手市场购买的兑换码

示例清单

1. Spotify(美国区)- 2015年至今
2. Apple Music(中国区)- 2018年至今
3. iTunes Store - 2008-2015年购买记录
4. Bandcamp - 2019年至今,购买过23张专辑
5. 网易云音乐 - 2016年至今,购买过15张数字专辑
6. QQ音乐 - 2017年至今,购买过8张数字专辑
7. 朋友赠送的iTunes兑换码(Taylor Swift专辑)
8. 购买CD时附带的数字下载码(已过期?)

1.2 导出各平台收藏数据

大多数平台都提供导出功能,或者可以通过第三方工具获取你的收藏数据:

Spotify

Apple Music/iTunes

  • 在Mac的音乐App中,选择”文件” > “库” > “导出库”
  • 在Windows的iTunes中,选择”文件” > “库” > “导出库”
  • 这将导出一个XML文件,包含所有元数据

网易云音乐

QQ音乐

  • 目前官方没有直接导出功能,需要手动记录或使用第三方工具如 QQMusicDownloader

Bandcamp

示例:导出Spotify数据

# 使用SpotMyBackup的步骤:
1. 访问 https://spotmybackup.com/
2. 使用Spotify账号登录
3. 选择要导出的数据类型(播放列表、收藏歌曲、专辑等)
4. 点击"Export"按钮
5. 下载CSV或JSON格式的文件

# 导出的CSV文件内容示例:
Track Name,Artist,Album,Album Artist,BPM,Time,Added,Popularity,Track ID,Album ID
"Blinding Lights","The Weeknd","After Hours","The Weeknd",171,200,2020-03-20,95,4Olu4e4u6qQeR7kZ4V6z0L,3dpt...
"Save Your Tears","The Weeknd","After Hours","The Weeknd",118,215,2020-03-23,93,5QO70khFlwaC0e0Z0gU4lR,3dpt...

1.3 识别重复购买

将所有导出的数据汇总后,我们需要识别重复购买。重复购买通常表现为:

  • 同一专辑在不同平台购买:例如在iTunes和Bandcamp都买了同一张专辑
  • 同一专辑的不同版本:标准版、豪华版、周年纪念版等
  • 单曲与整张专辑:购买了单曲后又购买了包含该单曲的专辑

示例:识别重复的Python脚本

import pandas as pd

# 假设我们已经将所有平台的购买记录整理成CSV文件
# 格式:Artist, Album, Platform, PurchaseDate, Price

# 读取数据
df = pd.read_csv('all_music_purchases.csv')

# 按艺术家和专辑分组,统计购买次数
duplicates = df.groupby(['Artist', 'Album']).size().reset_index(name='PurchaseCount')

# 筛选出重复购买的记录
duplicates = duplicates[duplicates['PurchaseCount'] > 1]

print("发现重复购买:")
for _, row in duplicates.iterrows():
    print(f"艺术家: {row['Artist']}, 专辑: {row['Album']}, 购买次数: {row['PurchaseCount']}")
    # 显示详细购买记录
    detail = df[(df['Artist'] == row['Artist']) & (df['Album'] == row['Album'])]
    print(detail[['Platform', 'PurchaseDate', 'Price']])
    print("-" * 50)

# 输出示例:
# 发现重复购买:
# 艺术家: Taylor Swift, 专辑: 1989, 购买次数: 2
#      Platform PurchaseDate  Price
# 0   iTunes Store   2014-10-27  13.99
# 1       Bandcamp   2020-05-15  10.00
# --------------------------------------------------

1.4 评估收藏混乱程度

除了重复购买,我们还需要评估收藏的混乱程度:

  • 未分类音乐比例:有多少音乐没有归类到特定专辑或播放列表
  • 低质量音频:是否有128kbps等低码率文件
  • 元数据缺失:缺少艺术家、专辑、年份等信息的歌曲比例
  • 从未播放的音乐:根据播放历史,识别从未听过的歌曲

示例:评估脚本

import os
from mutagen import File

def assess_music_library(folder_path):
    stats = {
        'total_files': 0,
        'missing_metadata': 0,
        'low_quality': 0,
        'various_artists': 0,
        'no_album': 0
    }
    
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            if file.lower().endswith(('.mp3', '.flac', '.m4a', '.wav')):
                stats['total_files'] += 1
                try:
                    audio = File(os.path.join(root, file))
                    if audio is None:
                        continue
                    
                    # 检查元数据
                    if 'TPE1' not in audio or 'TALB' not in audio:
                        stats['missing_metadata'] += 1
                    
                    # 检查质量(示例:MP3码率)
                    if hasattr(audio.info, 'bitrate'):
                        if audio.info.bitrate < 192000:  # 低于192kbps
                            stats['low_quality'] += 1
                    
                    # 检查艺术家
                    if 'TPE1' in audio:
                        artist = str(audio['TPE1'])
                        if 'Various' in artist or 'Various Artists' in artist:
                            stats['various_artists'] += 1
                    
                    # 检查专辑
                    if 'TALB' not in audio:
                        stats['no_album'] += 1
                        
                except Exception as e:
                    print(f"Error processing {file}: {e}")
    
    return stats

# 使用示例
stats = assess_music_library('/path/to/your/music')
print(f"音乐库评估结果:")
print(f"总文件数: {stats['total_files']}")
print(f"缺少元数据: {stats['missing_metadata']}")
print(f"低质量音频: {stats['low_quality']}")
print(f"Various Artists: {stats['various_artists']}")
print(f"无专辑信息: {stats['no_album']}")

第二步:建立统一的音乐数据库

有了诊断数据后,我们需要建立一个中央数据库来管理所有音乐信息。这个数据库将成为我们音乐记忆库的核心。

2.1 选择数据库方案

对于个人音乐管理,我们不需要复杂的商业数据库,以下方案都是不错的选择:

方案A:SQLite(推荐)

  • 轻量级,单文件数据库
  • 无需安装服务器
  • 支持SQL查询,灵活性高
  • 可以用Python、R等语言操作

方案B:Airtable/Notion

  • 可视化界面,适合非技术用户
  • 支持附件、链接、标签等丰富字段
  • 可以跨平台访问

方案C:Excel/Google Sheets

  • 简单易用
  • 适合小型收藏
  • 公式和筛选功能强大

我们重点介绍SQLite方案,因为它最灵活且可扩展。

2.2 设计数据库结构

一个完整的音乐数据库应该包含以下表:

  • Artists:艺术家信息
  • Albums:专辑信息
  • Tracks:歌曲信息
  • Purchases:购买记录
  • Platforms:音乐平台信息
  • Playlists:播放列表
  • Tags:标签系统(情绪、风格、场景等)
  • Reviews:个人评价和回忆

SQL表结构定义

-- 艺术家表
CREATE TABLE Artists (
    artist_id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    country TEXT,
    genre TEXT,
    formed_year INTEGER,
    biography TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 专辑表
CREATE TABLE Albums (
    album_id INTEGER PRIMARY KEY AUTOINCREMENT,
    title TEXT NOT NULL,
    artist_id INTEGER,
    release_year INTEGER,
    genre TEXT,
    label TEXT,
    total_tracks INTEGER,
    cover_art_url TEXT,
    discogs_url TEXT,
    allmusic_url TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (artist_id) REFERENCES Artists(artist_id)
);

-- 歌曲表
CREATE TABLE Tracks (
    track_id INTEGER PRIMARY KEY AUTOINCREMENT,
    title TEXT NOT NULL,
    album_id INTEGER,
    artist_id INTEGER,
    track_number INTEGER,
    duration INTEGER,  -- 秒
    bitrate INTEGER,
    file_format TEXT,  -- MP3, FLAC, ALAC, etc.
    file_path TEXT,
    isrc TEXT,  -- 国际标准录音代码
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (album_id) REFERENCES Albums(album_id),
    FOREIGN KEY (artist_id) REFERENCES Artists(artist_id)
);

-- 购买记录表
CREATE TABLE Purchases (
    purchase_id INTEGER PRIMARY KEY AUTOINCREMENT,
    album_id INTEGER,
    platform_id INTEGER,
    purchase_date DATE,
    price REAL,
    currency TEXT,
    order_number TEXT,
    download_url TEXT,
    is_gift BOOLEAN DEFAULT FALSE,
    notes TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (album_id) REFERENCES Albums(album_id),
    FOREIGN KEY (platform_id) REFERENCES Platforms(platform_id)
);

-- 平台表
CREATE TABLE Platforms (
    platform_id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    type TEXT,  -- streaming, store, bandcamp, etc.
    url TEXT,
    account_email TEXT
);

-- 播放列表表
CREATE TABLE Playlists (
    playlist_id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    platform_id INTEGER,
    platform_playlist_id TEXT,
    description TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (platform_id) REFERENCES Platforms(platform_id)
);

-- 播放列表歌曲关联表
CREATE TABLE PlaylistTracks (
    playlist_id INTEGER,
    track_id INTEGER,
    track_order INTEGER,
    added_date DATE,
    PRIMARY KEY (playlist_id, track_id),
    FOREIGN KEY (playlist_id) REFERENCES Playlists(playlist_id),
    FOREIGN KEY (track_id) REFERENCES Tracks(track_id)
);

-- 标签表
CREATE TABLE Tags (
    tag_id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    type TEXT,  -- mood, genre, scene, etc.
    color TEXT  -- 用于UI显示
);

-- 歌曲标签关联表
CREATE TABLE TrackTags (
    track_id INTEGER,
    tag_id INTEGER,
    PRIMARY KEY (track_id, tag_id),
    FOREIGN KEY (track_id) REFERENCES Tracks(track_id),
    FOREIGN KEY (tag_id) REFERENCES Tags(tag_id)
);

-- 个人评价表
CREATE TABLE Reviews (
    review_id INTEGER PRIMARY KEY AUTOINCREMENT,
    track_id INTEGER,
    album_id INTEGER,
    rating INTEGER,  -- 1-5星
    review_text TEXT,
    memories TEXT,  -- 与这首歌相关的回忆
    listened_count INTEGER DEFAULT 0,
    last_listened DATE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (track_id) REFERENCES Tracks(track_id),
    FOREIGN KEY (album_id) REFERENCES Albums(album_id)
);

2.3 填充数据库

现在我们需要编写脚本,将之前导出的数据填充到数据库中。这里以Python为例:

import sqlite3
import pandas as pd
from datetime import datetime

def create_database(db_path='music_library.db'):
    """创建数据库和表结构"""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    # 执行上面的SQL创建表语句
    # 这里简化,实际使用时需要执行完整的SQL
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS Artists (
            artist_id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL UNIQUE,
            country TEXT,
            genre TEXT
        )
    ''')
    
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS Albums (
            album_id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            artist_id INTEGER,
            release_year INTEGER,
            FOREIGN KEY (artist_id) REFERENCES Artists(artist_id)
        )
    ''')
    
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS Purchases (
            purchase_id INTEGER PRIMARY KEY AUTOINCREMENT,
            album_id INTEGER,
            platform TEXT,
            purchase_date DATE,
            price REAL,
            FOREIGN KEY (album_id) REFERENCES Albums(album_id)
        )
    ''')
    
    conn.commit()
    return conn

def import_spotify_data(conn, spotify_csv_path):
    """导入Spotify导出的数据"""
    df = pd.read_csv(spotify_csv_path)
    
    cursor = conn.cursor()
    
    for _, row in df.iterrows():
        # 插入艺术家
        cursor.execute(
            "INSERT OR IGNORE INTO Artists (name) VALUES (?)",
            (row['Artist'],)
        )
        
        # 获取艺术家ID
        cursor.execute("SELECT artist_id FROM Artists WHERE name = ?", (row['Artist'],))
        artist_id = cursor.fetchone()[0]
        
        # 插入专辑
        cursor.execute(
            "INSERT OR IGNORE INTO Albums (title, artist_id) VALUES (?, ?)",
            (row['Album'], artist_id)
        )
        
        # 获取专辑ID
        cursor.execute(
            "SELECT album_id FROM Albums WHERE title = ? AND artist_id = ?",
            (row['Album'], artist_id)
        )
        album_id = cursor.fetchone()[0]
        
        # 记录这是来自Spotify的收藏(非购买)
        # 可以在Purchases表中用特殊标记,或创建单独的Streaming表
    
    conn.commit()
    print(f"导入了 {len(df)} 条Spotify记录")

def import_bandcamp_data(conn, bandcamp_csv_path):
    """导入Bandcamp购买记录"""
    df = pd.read_csv(bandcamp_csv_path)
    
    cursor = conn.cursor()
    
    for _, row in df.iterrows():
        # Bandcamp数据通常包含:Artist, Album, PurchaseDate, Price, Currency
        
        # 插入艺术家
        cursor.execute(
            "INSERT OR IGNORE INTO Artists (name) VALUES (?)",
            (row['Artist'],)
        )
        
        cursor.execute("SELECT artist_id FROM Artists WHERE name = ?", (row['Artist'],))
        artist_id = cursor.fetchone()[0]
        
        # 插入专辑
        cursor.execute(
            "INSERT INTO Albums (title, artist_id) VALUES (?, ?)",
            (row['Album'], artist_id)
        )
        album_id = cursor.lastrowid
        
        # 插入购买记录
        cursor.execute(
            "INSERT INTO Purchases (album_id, platform, purchase_date, price, currency) VALUES (?, ?, ?, ?, ?)",
            (album_id, 'Bandcamp', row['PurchaseDate'], row['Price'], row.get('Currency', 'USD'))
        )
    
    conn.commit()
    print(f"导入了 {len(df)} 条Bandcamp购买记录")

def detect_duplicates(conn):
    """检测重复购买"""
    cursor = conn.cursor()
    
    query = """
    SELECT a.title, ar.name, COUNT(p.purchase_id) as purchase_count,
           GROUP_CONCAT(p.platform) as platforms
    FROM Purchases p
    JOIN Albums a ON p.album_id = a.album_id
    JOIN Artists ar ON a.artist_id = ar.artist_id
    GROUP BY a.album_id
    HAVING purchase_count > 1
    """
    
    cursor.execute(query)
    duplicates = cursor.fetchall()
    
    print("发现重复购买:")
    for album, artist, count, platforms in duplicates:
        print(f"  {artist} - {album}: {count}次 ({platforms})")
    
    return duplicates

# 使用示例
conn = create_database()

# 导入各平台数据
import_spotify_data(conn, 'spotify_export.csv')
import_bandcamp_data(conn, 'bandcamp_purchases.csv')
# 类似地导入其他平台数据...

# 检测重复
duplicates = detect_duplicates(conn)

# 关闭连接
conn.close()

2.4 使用MusicBrainz API丰富数据

手动输入元数据非常耗时,我们可以利用MusicBrainz这个开放的音乐数据库来自动填充详细信息。

import requests
import time

def search_musicbrainz(artist_name, album_title=None):
    """搜索MusicBrainz获取详细信息"""
    headers = {
        'User-Agent': 'MyMusicLibrary/1.0 (your-email@example.com)'
    }
    
    # 搜索艺术家
    artist_url = "https://musicbrainz.org/ws/2/artist/"
    params = {
        'query': f'artist:"{artist_name}"',
        'fmt': 'json'
    }
    
    response = requests.get(artist_url, params=params, headers=headers)
    time.sleep(1)  # 遵守API速率限制
    
    if response.status_code == 200:
        data = response.json()
        if data['artists']:
            artist = data['artists'][0]
            return {
                'mbid': artist['id'],
                'country': artist.get('country', ''),
                'type': artist['type'],
                'life_span': artist.get('life-span', {}),
                'genres': [g['name'] for g in artist.get('genres', [])]
            }
    
    return None

def get_album_details(artist_mbid, album_title):
    """获取专辑详细信息"""
    headers = {
        'User-Agent': 'MyMusicLibrary/1.0 (your-email@example.com)'
    }
    
    release_url = "https://musicbrainz.org/ws/2/release/"
    params = {
        'query': f'artist:"{artist_mbid}" AND release:"{album_title}"',
        'fmt': 'json'
    }
    
    response = requests.get(release_url, params=params, headers=headers)
    time.sleep(1)
    
    if response.status_code == 200:
        data = response.json()
        if data['releases']:
            release = data['releases'][0]
            return {
                'mbid': release['id'],
                'date': release.get('date', ''),
                'country': release.get('country', ''),
                'tracks': release.get('track-count', 0),
                'label': release.get('label-info', [{}])[0].get('label', {}).get('name', ''),
                'format': release.get('media', [{}])[0].get('format', '')
            }
    
    return None

# 使用示例
artist_info = search_musicbrainz("Taylor Swift")
print(f"Taylor Swift的MusicBrainz信息: {artist_info}")

album_info = get_album_details(artist_info['mbid'], "1989")
print(f"1989专辑信息: {album_info}")

第三步:解决重复购买问题

识别出重复购买后,我们需要制定策略来处理它们。

3.1 评估不同版本的价值

对于重复购买,我们需要评估每个版本的价值:

  • 音质差异:FLAC vs MP3,高解析度 vs 标准CD质量
  • 附加内容:是否有独家曲目、幕后花絮、数字 booklet
  • DRM限制:是否受DRM保护,能否自由转换
  • 收藏价值:是否是限量版、签名版、特殊包装

决策矩阵示例

def evaluate_purchase_version(purchase_record):
    """评估购买版本的价值"""
    score = 0
    
    # 音质权重
    if purchase_record['format'] == 'FLAC':
        score += 10
    elif purchase_record['format'] == 'ALAC':
        score += 9
    elif purchase_record['format'] == 'MP3 320kbps':
        score += 7
    elif purchase_record['format'] == 'MP3 128kbps':
        score += 3
    
    # 附加内容
    if purchase_record.get('has_bonus_tracks', False):
        score += 5
    if purchase_record.get('has_digital_booklet', False):
        score += 3
    
    # DRM自由度
    if not purchase_record.get('has_drm', True):
        score += 8
    
    # 价格合理性
    if purchase_record['price'] < 10:
        score += 2
    
    # 个人情感价值(手动标记)
    if purchase_record.get('sentimental_value', False):
        score += 10
    
    return score

# 示例:比较同一专辑的不同版本
versions = [
    {'platform': 'iTunes', 'format': 'MP3 256kbps', 'price': 12.99, 'has_drm': False, 'has_bonus_tracks': False},
    {'platform': 'Bandcamp', 'format': 'FLAC', 'price': 10.00, 'has_drm': False, 'has_bonus_tracks': True},
    {'platform': 'Qobuz', 'format': 'FLAC 24bit', 'price': 24.99, 'has_drm': False, 'has_bonus_tracks': False}
]

for v in versions:
    print(f"{v['platform']} - {v['format']}: 价值分数 {evaluate_purchase_version(v)}")

3.2 处理重复购买的策略

根据评估结果,可以采用以下策略:

策略1:保留最佳版本

  • 保留音质最好、附加内容最多的版本
  • 其他版本可以:
    • 从账户中移除(如果平台允许)
    • 标记为”已拥有但不使用”
    • 转换为统一格式后存档

策略2:按用途分类

  • 日常聆听:使用流媒体版本(方便)
  • 收藏/存档:保留购买的高音质版本
  • DJ/制作:保留无损版本

策略3:出售或转让

  • 对于有实体价值的数字版(如NFT专辑),可以考虑出售
  • 注意平台的转让政策

自动化处理脚本

def process_duplicates(conn):
    """自动处理重复购买"""
    cursor = conn.cursor()
    
    # 获取重复购买
    duplicates = detect_duplicates(conn)
    
    for album_title, artist_name, count, platforms in duplicates:
        print(f"\n处理: {artist_name} - {album_title}")
        
        # 获取所有版本详情
        cursor.execute("""
            SELECT p.purchase_id, p.platform, p.price, p.purchase_date,
                   a.title, ar.name
            FROM Purchases p
            JOIN Albums a ON p.album_id = a.album_id
            JOIN Artists ar ON a.artist_id = ar.artist_id
            WHERE a.title = ? AND ar.name = ?
            ORDER BY p.purchase_date
        """, (album_title, artist_name))
        
        versions = cursor.fetchall()
        
        # 简单策略:保留最新购买的版本,标记其他为重复
        # 实际应用中应根据音质、价格等综合判断
        
        keep_id = versions[-1][0]  # 保留最新
        duplicate_ids = [v[0] for v in versions[:-1]]
        
        # 标记为重复
        for dup_id in duplicate_ids:
            cursor.execute("""
                UPDATE Purchases SET notes = 'Duplicate purchase, kept version on platform: ?'
                WHERE purchase_id = ?
            """, (versions[-1][1], dup_id))
        
        print(f"  保留: {versions[-1][1]} (ID: {keep_id})")
        print(f"  标记重复: {duplicate_ids}")
    
    conn.commit()

# 使用
conn = sqlite3.connect('music_library.db')
process_duplicates(conn)
conn.close()

第四步:建立标签和分类系统

一个强大的标签系统是音乐记忆库的灵魂,它能让你按情绪、风格、场景等多维度快速找到想要的音乐。

4.1 设计标签体系

建议采用分层标签系统:

一级标签(类别)

  • Genre:流派(摇滚、爵士、电子、古典等)
  • Mood:情绪(快乐、悲伤、放松、兴奋等)
  • Scene:场景(工作、学习、运动、驾驶、睡前等)
  • Era:年代(80s, 90s, 2000s, 2010s, 2020s)
  • Language:语言(中文、英文、日文、韩文等)
  • Energy:能量等级(1-5星)
  • Vocal:人声类型(男声、女声、纯音乐、合唱等)

二级标签(子标签)

  • 在一级标签下细分,如:
    • Genre: Rock → Indie Rock, Alternative Rock, Classic Rock
    • Mood: Happy → Euphoric, Cheerful, Upbeat

4.2 自动打标签策略

我们可以基于现有数据自动生成部分标签:

def auto_tag_tracks(conn):
    """基于元数据自动打标签"""
    cursor = conn.cursor()
    
    # 基于流派标签
    genre_mapping = {
        'Rock': ['Rock', 'Rock & Roll', 'Alternative Rock', 'Indie Rock'],
        'Electronic': ['Electronic', 'Dance', 'House', 'Techno', 'Ambient'],
        'Jazz': ['Jazz', 'Smooth Jazz', 'Bebop', 'Cool Jazz'],
        'Classical': ['Classical', 'Orchestral', 'Chamber Music'],
        'Hip Hop': ['Hip Hop', 'Rap', 'Trap'],
        'Pop': ['Pop', 'Pop Rock', 'Synthpop']
    }
    
    # 获取所有专辑
    cursor.execute("SELECT album_id, genre FROM Albums WHERE genre IS NOT NULL")
    albums = cursor.fetchall()
    
    for album_id, genre in albums:
        # 查找匹配的流派标签
        for main_genre, sub_genres in genre_mapping.items():
            if genre in sub_genres or any(g in genre for g in sub_genres):
                # 获取该专辑的所有歌曲
                cursor.execute("SELECT track_id FROM Tracks WHERE album_id = ?", (album_id,))
                tracks = cursor.fetchall()
                
                # 为每首歌添加流派标签
                for (track_id,) in tracks:
                    # 获取或创建标签ID
                    cursor.execute("SELECT tag_id FROM Tags WHERE name = ?", (main_genre,))
                    tag = cursor.fetchone()
                    if not tag:
                        cursor.execute("INSERT INTO Tags (name, type) VALUES (?, 'genre')", (main_genre,))
                        tag_id = cursor.lastrowid
                    else:
                        tag_id = tag[0]
                    
                    # 关联标签
                    cursor.execute(
                        "INSERT OR IGNORE INTO TrackTags (track_id, tag_id) VALUES (?, ?)",
                        (track_id, tag_id)
                    )
    
    conn.commit()
    print("自动流派标签完成")

# 基于年份的时代标签
def tag_by_era(conn):
    cursor = conn.cursor()
    
    cursor.execute("SELECT album_id, release_year FROM Albums WHERE release_year IS NOT NULL")
    albums = cursor.fetchall()
    
    era_tags = {
        '80s': (1980, 1989),
        '90s': (1990, 1999),
        '2000s': (2000, 2009),
        '2010s': (2010, 2019),
        '2020s': (2020, 2029)
    }
    
    for album_id, year in albums:
        for era, (start, end) in era_tags.items():
            if start <= year <= end:
                # 添加时代标签
                cursor.execute("SELECT tag_id FROM Tags WHERE name = ?", (era,))
                tag = cursor.fetchone()
                if not tag:
                    cursor.execute("INSERT INTO Tags (name, type) VALUES (?, 'era')", (era,))
                    tag_id = cursor.lastrowid
                else:
                    tag_id = tag[0]
                
                # 为专辑下所有歌曲添加标签
                cursor.execute("SELECT track_id FROM Tracks WHERE album_id = ?", (album_id,))
                tracks = cursor.fetchall()
                for (track_id,) in tracks:
                    cursor.execute(
                        "INSERT OR IGNORE INTO TrackTags (track_id, tag_id) VALUES (?, ?)",
                        (track_id, tag_id)
                    )
    
    conn.commit()
    print("时代标签完成")

4.3 手动打标签工具

对于需要人工判断的标签(如情绪、场景),可以创建一个简单的命令行工具:

def manual_tagging(conn):
    """交互式手动打标签"""
    cursor = conn.cursor()
    
    # 获取需要打标签的歌曲(例如没有情绪标签的)
    cursor.execute("""
        SELECT t.track_id, t.title, ar.name, a.title
        FROM Tracks t
        JOIN Albums a ON t.album_id = a.album_id
        JOIN Artists ar ON t.artist_id = ar.artist_id
        WHERE t.track_id NOT IN (
            SELECT track_id FROM TrackTags tt
            JOIN Tags ON tt.tag_id = Tags.tag_id
            WHERE Tags.type = 'mood'
        )
        LIMIT 10
    """)
    
    tracks = cursor.fetchall()
    
    mood_options = ['Happy', 'Sad', 'Relaxed', 'Energetic', 'Angry', 'Romantic', 'Nostalgic']
    
    for track_id, title, artist, album in tracks:
        print(f"\n歌曲: {title}")
        print(f"艺术家: {artist}")
        print(f"专辑: {album}")
        print("\n选择情绪标签(输入数字,多个用逗号分隔):")
        for i, mood in enumerate(mood_options, 1):
            print(f"  {i}. {mood}")
        print("  s. 跳过")
        print("  q. 退出")
        
        choice = input("你的选择: ")
        
        if choice.lower() == 'q':
            break
        elif choice.lower() == 's':
            continue
        
        try:
            selected_indices = [int(i.strip()) - 1 for i in choice.split(',')]
            for idx in selected_indices:
                if 0 <= idx < len(mood_options):
                    mood = mood_options[idx]
                    
                    # 获取或创建标签
                    cursor.execute("SELECT tag_id FROM Tags WHERE name = ?", (mood,))
                    tag = cursor.fetchone()
                    if not tag:
                        cursor.execute("INSERT INTO Tags (name, type) VALUES (?, 'mood')", (mood,))
                        tag_id = cursor.lastrowid
                    else:
                        tag_id = tag[0]
                    
                    # 关联
                    cursor.execute(
                        "INSERT OR IGNORE INTO TrackTags (track_id, tag_id) VALUES (?, ?)",
                        (track_id, tag_id)
                    )
                    print(f"  已添加标签: {mood}")
            
            conn.commit()
            
        except ValueError:
            print("无效输入")
            continue

# 使用示例
# conn = sqlite3.connect('music_library.db')
# manual_tagging(conn)

第五步:自动化整理工具

手动整理非常耗时,我们需要自动化工具来持续维护音乐库。

5.1 文件重命名和组织

根据数据库信息,自动重命名和组织音乐文件:

import os
import shutil
from pathlib import Path

def organize_music_files(db_path, music_root, backup_root):
    """
    根据数据库信息自动组织音乐文件
    格式: Artist/Album/TrackNumber - Title.ext
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    # 获取所有带文件路径的歌曲
    cursor.execute("""
        SELECT t.track_id, t.title, t.file_path, t.file_format,
               ar.name as artist_name, a.title as album_title,
               t.track_number
        FROM Tracks t
        JOIN Albums a ON t.album_id = a.album_id
        JOIN Artists ar ON t.artist_id = ar.artist_id
        WHERE t.file_path IS NOT NULL
    """)
    
    tracks = cursor.fetchall()
    
    for track_id, title, old_path, file_format, artist, album, track_num in tracks:
        if not old_path or not os.path.exists(old_path):
            continue
        
        # 清理文件名中的非法字符
        def clean_filename(name):
            for char in ['/', '\\', ':', '*', '?', '"', '<', '>', '|']:
                name = name.replace(char, '_')
            return name.strip()
        
        artist_dir = clean_filename(artist)
        album_dir = clean_filename(album)
        
        # 创建目录结构
        target_dir = os.path.join(music_root, artist_dir, album_dir)
        os.makedirs(target_dir, exist_ok=True)
        
        # 生成新文件名
        if track_num:
            new_filename = f"{track_num:02d} - {clean_filename(title)}.{file_format.lower()}"
        else:
            new_filename = f"{clean_filename(title)}.{file_format.lower()}"
        
        new_path = os.path.join(target_dir, new_filename)
        
        # 如果文件已存在,添加后缀
        if os.path.exists(new_path):
            base, ext = os.path.splitext(new_path)
            new_path = f"{base}_{track_id}{ext}"
        
        # 备份原文件(可选)
        if backup_root:
            backup_path = os.path.join(backup_root, os.path.basename(old_path))
            shutil.copy2(old_path, backup_path)
        
        # 移动文件
        try:
            shutil.move(old_path, new_path)
            
            # 更新数据库中的文件路径
            cursor.execute(
                "UPDATE Tracks SET file_path = ? WHERE track_id = ?",
                (new_path, track_id)
            )
            
            print(f"已整理: {artist} - {album} - {title}")
            
        except Exception as e:
            print(f"错误处理 {old_path}: {e}")
    
    conn.commit()
    conn.close()
    print("文件整理完成")

# 使用示例
# organize_music_files(
#     db_path='music_library.db',
#     music_root='/path/to/organized/music',
#     backup_root='/path/to/backup'
# )

5.2 元数据修复和标准化

使用第三方工具如beets或MusicBrainz Picard,或者编写自定义脚本:

from mutagen import File
from mutagen.id3 import ID3, TIT2, TPE1, TALB, TRCK, TDRC

def fix_metadata(db_path, music_root):
    """修复和标准化元数据"""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    cursor.execute("""
        SELECT t.track_id, t.title, t.file_path,
               ar.name as artist_name, a.title as album_title,
               t.track_number, t.release_year
        FROM Tracks t
        JOIN Albums a ON t.album_id = a.album_id
        JOIN Artists ar ON t.artist_id = ar.artist_id
        WHERE t.file_path IS NOT NULL
    """)
    
    tracks = cursor.fetchall()
    
    for track_id, title, file_path, artist, album, track_num, year in tracks:
        if not file_path or not os.path.exists(file_path):
            continue
        
        try:
            # 对于MP3文件
            if file_path.lower().endswith('.mp3'):
                audio = ID3(file_path)
                
                # 更新ID3标签
                audio['TIT2'] = TIT2(encoding=3, text=title)
                audio['TPE1'] = TPE1(encoding=3, text=artist)
                audio['TALB'] = TALB(encoding=3, text=album)
                
                if track_num:
                    audio['TRCK'] = TRCK(encoding=3, text=str(track_num))
                
                if year:
                    audio['TDRC'] = TDRC(encoding=3, text=str(year))
                
                audio.save()
            
            # 对于FLAC文件
            elif file_path.lower().endswith('.flac'):
                audio = File(file_path)
                audio['title'] = title
                audio['artist'] = artist
                audio['album'] = album
                if track_num:
                    audio['tracknumber'] = str(track_num)
                if year:
                    audio['date'] = str(year)
                audio.save()
            
            print(f"已修复元数据: {artist} - {title}")
            
        except Exception as e:
            print(f"修复失败 {file_path}: {e}")
    
    conn.close()
    print("元数据修复完成")

5.3 持续同步和监控

设置定期任务来监控新添加的音乐:

import schedule
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class MusicFileHandler(FileSystemEventHandler):
    def __init__(self, db_path, watch_path):
        self.db_path = db_path
        self.watch_path = watch_path
    
    def on_created(self, event):
        if event.is_directory:
            return
        
        if event.src_path.lower().endswith(('.mp3', '.flac', '.m4a', '.wav')):
            print(f"检测到新文件: {event.src_path}")
            # 等待文件复制完成
            time.sleep(2)
            self.process_new_file(event.src_path)
    
    def process_new_file(self, file_path):
        """处理新添加的音乐文件"""
        # 提取元数据
        try:
            audio = File(file_path)
            if audio is None:
                return
            
            # 获取信息
            title = audio.get('TIT2', [os.path.splitext(os.path.basename(file_path))[0]])[0]
            artist = audio.get('TPE1', ['Unknown Artist'])[0]
            album = audio.get('TALB', ['Unknown Album'])[0]
            
            # 插入数据库
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            # 插入艺术家
            cursor.execute("INSERT OR IGNORE INTO Artists (name) VALUES (?)", (artist,))
            cursor.execute("SELECT artist_id FROM Artists WHERE name = ?", (artist,))
            artist_id = cursor.fetchone()[0]
            
            # 插入专辑
            cursor.execute(
                "INSERT OR IGNORE INTO Albums (title, artist_id) VALUES (?, ?)",
                (album, artist_id)
            )
            cursor.execute(
                "SELECT album_id FROM Albums WHERE title = ? AND artist_id = ?",
                (album, artist_id)
            )
            album_id = cursor.fetchone()[0]
            
            # 插入歌曲
            cursor.execute("""
                INSERT INTO Tracks (title, album_id, artist_id, file_path, file_format)
                VALUES (?, ?, ?, ?, ?)
            """, (title, album_id, artist_id, file_path, file_path.split('.')[-1].upper()))
            
            conn.commit()
            conn.close()
            
            print(f"已添加到数据库: {artist} - {album} - {title}")
            
        except Exception as e:
            print(f"处理新文件失败: {e}")

def start_monitoring(db_path, watch_path):
    """启动文件监控"""
    event_handler = MusicFileHandler(db_path, watch_path)
    observer = Observer()
    observer.schedule(event_handler, watch_path, recursive=True)
    observer.start()
    
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    
    observer.join()

# 使用示例
# start_monitoring('music_library.db', '/path/to/new/music')

第六步:创建个人音乐记忆库

现在我们有了整理好的音乐库,是时候添加个人回忆和情感价值了。

6.1 记录听歌回忆

为每首歌或专辑添加个人回忆:

def add_memory(conn, track_id=None, album_id=None, memory_text=""):
    """添加听歌回忆"""
    cursor = conn.cursor()
    
    if track_id:
        cursor.execute("""
            INSERT INTO Reviews (track_id, review_text, memories)
            VALUES (?, ?, ?)
        """, (track_id, memory_text, memory_text))
    elif album_id:
        cursor.execute("""
            INSERT INTO Reviews (album_id, review_text, memories)
            VALUES (?, ?, ?)
        """, (album_id, memory_text, memory_text))
    
    conn.commit()
    print("回忆已保存")

# 示例:添加回忆
# conn = sqlite3.connect('music_library.db')
# add_memory(conn, track_id=123, memory_text="这首歌陪伴我度过了2020年的隔离期,每天早晨都听")
# add_memory(conn, album_id=45, memory_text="2015年去日本旅行时买的,当时在涩谷的Tower Records")

6.2 创建智能播放列表

基于标签、回忆和评分创建动态播放列表:

def create_smart_playlist(conn, playlist_name, criteria):
    """
    创建智能播放列表
    criteria: {
        'moods': ['Happy', 'Relaxed'],
        'min_rating': 4,
        'years': [2010, 2020],
        'exclude_listened': True  # 排除最近听过的
    }
    """
    cursor = conn.cursor()
    
    query = """
        SELECT DISTINCT t.track_id, t.title, ar.name, a.title
        FROM Tracks t
        JOIN Albums a ON t.album_id = a.album_id
        JOIN Artists ar ON t.artist_id = ar.artist_id
        JOIN TrackTags tt ON t.track_id = tt.track_id
        JOIN Tags ON tt.tag_id = Tags.tag_id
        LEFT JOIN Reviews r ON t.track_id = r.track_id
        WHERE 1=1
    """
    
    params = []
    
    # 情绪筛选
    if 'moods' in criteria:
        query += " AND Tags.name IN ({})".format(','.join('?' * len(criteria['moods'])))
        params.extend(criteria['moods'])
    
    # 评分筛选
    if 'min_rating' in criteria:
        query += " AND r.rating >= ?"
        params.append(criteria['min_rating'])
    
    # 年份筛选
    if 'years' in criteria:
        query += " AND a.release_year BETWEEN ? AND ?"
        params.extend(criteria['years'])
    
    # 排除最近听过的
    if criteria.get('exclude_listened'):
        query += " AND (r.last_listened IS NULL OR r.last_listened < date('now', '-30 days'))"
    
    cursor.execute(query, params)
    tracks = cursor.fetchall()
    
    # 创建播放列表
    cursor.execute(
        "INSERT INTO Playlists (name, description) VALUES (?, ?)",
        (playlist_name, f"智能播放列表: {criteria}")
    )
    playlist_id = cursor.lastrowid
    
    # 添加歌曲
    for i, (track_id, title, artist, album) in enumerate(tracks, 1):
        cursor.execute(
            "INSERT INTO PlaylistTracks (playlist_id, track_id, track_order) VALUES (?, ?, ?)",
            (playlist_id, track_id, i)
        )
    
    conn.commit()
    print(f"创建智能播放列表 '{playlist_name}',包含 {len(tracks)} 首歌曲")
    return playlist_id

# 示例:创建"2010年代快乐工作"播放列表
# conn = sqlite3.connect('music_library.db')
# create_smart_playlist(conn, "2010年代快乐工作", {
#     'moods': ['Happy', 'Energetic'],
#     'years': [2010, 2019],
#     'min_rating': 3
# })

6.3 导出和分享

将你的音乐记忆库导出为各种格式:

def export_playlist(conn, playlist_id, format='m3u'):
    """导出播放列表"""
    cursor = conn.cursor()
    
    cursor.execute("""
        SELECT t.file_path, t.title, ar.name, a.title
        FROM PlaylistTracks pt
        JOIN Tracks t ON pt.track_id = t.track_id
        JOIN Albums a ON t.album_id = a.album_id
        JOIN Artists ar ON t.artist_id = ar.artist_id
        WHERE pt.playlist_id = ?
        ORDER BY pt.track_order
    """, (playlist_id,))
    
    tracks = cursor.fetchall()
    
    if format == 'm3u':
        # M3U播放列表格式
        cursor.execute("SELECT name FROM Playlists WHERE playlist_id = ?", (playlist_id,))
        playlist_name = cursor.fetchone()[0]
        
        m3u_content = f"#EXTM3U\n#PLAYLIST:{playlist_name}\n"
        
        for file_path, title, artist, album in tracks:
            if file_path and os.path.exists(file_path):
                m3u_content += f"#EXTINF:-1,{artist} - {title}\n"
                m3u_content += f"{file_path}\n"
        
        filename = f"{playlist_name}.m3u"
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(m3u_content)
        
        print(f"已导出M3U播放列表: {filename}")
    
    elif format == 'json':
        # JSON格式(用于备份或分享)
        playlist_data = {
            'tracks': [
                {
                    'title': title,
                    'artist': artist,
                    'album': album,
                    'file_path': file_path
                }
                for file_path, title, artist, album in tracks
            ]
        }
        
        import json
        filename = f"playlist_{playlist_id}.json"
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(playlist_data, f, ensure_ascii=False, indent=2)
        
        print(f"已导出JSON: {filename}")

# 示例
# conn = sqlite3.connect('music_library.db')
# export_playlist(conn, 1, 'm3u')
# export_playlist(conn, 1, 'json')

第七步:维护和更新

音乐库的维护是一个持续的过程,需要建立定期维护机制。

7.1 定期备份策略

import sqlite3
import shutil
from datetime import datetime

def backup_music_library(db_path, backup_dir):
    """备份数据库和重要数据"""
    if not os.path.exists(backup_dir):
        os.makedirs(backup_dir)
    
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    
    # 备份数据库
    db_backup = os.path.join(backup_dir, f"music_library_{timestamp}.db")
    shutil.copy2(db_path, db_backup)
    
    # 备份播放列表(JSON)
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    cursor.execute("SELECT playlist_id, name FROM Playlists")
    playlists = cursor.fetchall()
    
    for playlist_id, name in playlists:
        export_playlist(conn, playlist_id, 'json')
        json_file = f"playlist_{playlist_id}.json"
        if os.path.exists(json_file):
            shutil.move(json_file, backup_dir)
    
    # 备份标签和统计数据
    cursor.execute("SELECT * FROM Tags")
    tags = cursor.fetchall()
    
    import json
    with open(os.path.join(backup_dir, f"tags_{timestamp}.json"), 'w') as f:
        json.dump([dict(row) for row in tags], f, default=str)
    
    conn.close()
    
    print(f"备份完成: {backup_dir}")

# 设置定期备份
def schedule_backup():
    """每周自动备份"""
    schedule.every().sunday.at("02:00").do(backup_music_library, 
                                           db_path='music_library.db',
                                           backup_dir='/path/to/backups')
    
    while True:
        schedule.run_pending()
        time.sleep(3600)  # 每小时检查一次

# 使用
# schedule_backup()

7.2 处理新音乐添加流程

建立标准流程来处理新获得的音乐:

def process_new_music_pipeline(file_path, db_path):
    """
    新音乐处理流程:
    1. 提取元数据
    2. 检查是否已存在
    3. 获取详细信息(MusicBrainz)
    4. 自动打标签
    5. 组织文件
    6. 添加到数据库
    """
    print(f"开始处理新音乐: {file_path}")
    
    # 步骤1: 提取基本信息
    audio = File(file_path)
    if not audio:
        print("无法读取文件")
        return
    
    title = audio.get('TIT2', [os.path.splitext(os.path.basename(file_path))[0]])[0]
    artist = audio.get('TPE1', ['Unknown Artist'])[0]
    album = audio.get('TALB', ['Unknown Album'])[0]
    
    # 步骤2: 检查是否已存在
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    cursor.execute("""
        SELECT t.track_id FROM Tracks t
        JOIN Albums a ON t.album_id = a.album_id
        JOIN Artists ar ON t.artist_id = ar.artist_id
        WHERE t.title = ? AND ar.name = ? AND a.title = ?
    """, (title, artist, album))
    
    if cursor.fetchone():
        print("该歌曲已存在于数据库中,跳过")
        conn.close()
        return
    
    # 步骤3: 获取MusicBrainz详细信息
    try:
        artist_info = search_musicbrainz(artist)
        if artist_info:
            # 更新艺术家信息
            cursor.execute("""
                UPDATE Artists SET country = ?, genre = ?
                WHERE name = ?
            """, (artist_info.get('country'), ','.join(artist_info.get('genres', [])), artist))
        
        album_info = get_album_details(artist_info['mbid'], album) if artist_info else None
    except Exception as e:
        print(f"MusicBrainz查询失败: {e}")
        album_info = None
    
    # 步骤4: 插入数据库
    # 插入艺术家
    cursor.execute("INSERT OR IGNORE INTO Artists (name) VALUES (?)", (artist,))
    cursor.execute("SELECT artist_id FROM Artists WHERE name = ?", (artist,))
    artist_id = cursor.fetchone()[0]
    
    # 插入专辑
    cursor.execute(
        "INSERT INTO Albums (title, artist_id, release_year, label) VALUES (?, ?, ?, ?)",
        (album, artist_id, 
         album_info.get('date', '')[:4] if album_info else None,
         album_info.get('label', '') if album_info else '')
    )
    album_id = cursor.lastrowid
    
    # 插入歌曲
    file_format = os.path.splitext(file_path)[1][1:].upper()
    cursor.execute("""
        INSERT INTO Tracks (title, album_id, artist_id, file_path, file_format)
        VALUES (?, ?, ?, ?, ?)
    """, (title, album_id, artist_id, file_path, file_format))
    track_id = cursor.lastrowid
    
    # 步骤5: 自动打标签
    # 流派标签
    if album_info and album_info.get('label'):
        # 简单的流派推断
        if 'Jazz' in album_info.get('label', ''):
            cursor.execute("INSERT OR IGNORE INTO Tags (name, type) VALUES (?, 'genre')", ('Jazz',))
            cursor.execute("SELECT tag_id FROM Tags WHERE name = 'Jazz'")
            tag_id = cursor.fetchone()[0]
            cursor.execute("INSERT INTO TrackTags (track_id, tag_id) VALUES (?, ?)", (track_id, tag_id))
    
    # 时代标签
    if album_info and album_info.get('date'):
        year = int(album_info['date'][:4])
        if 2010 <= year <= 2019:
            cursor.execute("INSERT OR IGNORE INTO Tags (name, type) VALUES (?, 'era')", ('2010s',))
            cursor.execute("SELECT tag_id FROM Tags WHERE name = '2010s'")
            tag_id = cursor.fetchone()[0]
            cursor.execute("INSERT INTO TrackTags (track_id, tag_id) VALUES (?, ?)", (track_id, tag_id))
    
    conn.commit()
    conn.close()
    
    print(f"处理完成: {artist} - {album} - {title}")
    
    # 步骤6: 组织文件(可选)
    # organize_music_files(db_path, '/path/to/music', '/path/to/backup')

# 使用示例
# process_new_music_pipeline('/path/to/new/song.mp3', 'music_library.db')

7.3 定期清理和优化

def cleanup_and_optimize(db_path):
    """清理无效记录和优化数据库"""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    # 查找没有关联歌曲的标签
    cursor.execute("""
        DELETE FROM Tags
        WHERE tag_id NOT IN (SELECT DISTINCT tag_id FROM TrackTags)
    """)
    
    # 查找没有关联歌曲的专辑
    cursor.execute("""
        DELETE FROM Albums
        WHERE album_id NOT IN (SELECT DISTINCT album_id FROM Tracks)
    """)
    
    # 查找没有关联专辑的艺术家
    cursor.execute("""
        DELETE FROM Artists
        WHERE artist_id NOT IN (SELECT DISTINCT artist_id FROM Albums)
    """)
    
    # 优化数据库
    cursor.execute("VACUUM")
    
    conn.commit()
    conn.close()
    
    print("数据库清理和优化完成")

# 每月运行一次
# schedule.every().month.do(cleanup_and_optimize, 'music_library.db')

第八步:可视化与展示

让你的音乐记忆库以更直观的方式呈现。

8.1 生成音乐统计报告

import matplotlib.pyplot as plt
import pandas as pd

def generate_stats_report(db_path):
    """生成音乐库统计报告"""
    conn = sqlite3.connect(db_path)
    
    # 艺术家分布
    artist_stats = pd.read_sql("""
        SELECT ar.name, COUNT(DISTINCT t.track_id) as track_count
        FROM Artists ar
        JOIN Tracks t ON ar.artist_id = t.artist_id
        GROUP BY ar.name
        ORDER BY track_count DESC
        LIMIT 10
    """, conn)
    
    # 年代分布
    era_stats = pd.read_sql("""
        SELECT Tags.name as era, COUNT(DISTINCT t.track_id) as track_count
        FROM TrackTags tt
        JOIN Tags ON tt.tag_id = Tags.tag_id
        JOIN Tracks t ON tt.track_id = t.track_id
        WHERE Tags.type = 'era'
        GROUP BY Tags.name
    """, conn)
    
    # 情绪分布
    mood_stats = pd.read_sql("""
        SELECT Tags.name as mood, COUNT(DISTINCT t.track_id) as track_count
        FROM TrackTags tt
        JOIN Tags ON tt.tag_id = Tags.tag_id
        JOIN Tracks t ON tt.track_id = t.track_id
        WHERE Tags.type = 'mood'
        GROUP BY Tags.name
    """, conn)
    
    # 创建图表
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle('音乐库统计报告', fontsize=16)
    
    # 艺术家Top10
    if not artist_stats.empty:
        axes[0, 0].barh(artist_stats['name'], artist_stats['track_count'])
        axes[0, 0].set_title('歌曲数量最多的艺术家')
        axes[0, 0].invert_yaxis()
    
    # 年代分布
    if not era_stats.empty:
        axes[0, 1].pie(era_stats['track_count'], labels=era_stats['era'], autopct='%1.1f%%')
        axes[0, 1].set_title('年代分布')
    
    # 情绪分布
    if not mood_stats.empty:
        axes[1, 0].bar(mood_stats['mood'], mood_stats['track_count'])
        axes[1, 0].set_title('情绪分布')
        axes[1, 0].tick_params(axis='x', rotation=45)
    
    # 购买统计
    purchase_stats = pd.read_sql("""
        SELECT platform, COUNT(*) as purchase_count, SUM(price) as total_spent
        FROM Purchases
        GROUP BY platform
    """, conn)
    
    if not purchase_stats.empty:
        axes[1, 1].bar(purchase_stats['platform'], purchase_stats['total_spent'])
        axes[1, 1].set_title('各平台花费(美元)')
        axes[1, 1].tick_params(axis='x', rotation=45)
    
    plt.tight_layout()
    plt.savefig('music_library_report.png', dpi=300, bbox_inches='tight')
    plt.show()
    
    conn.close()
    print("统计报告已生成: music_library_report.png")

# 使用
# generate_stats_report('music_library.db')

8.2 创建Web界面(可选)

使用Flask创建简单的Web界面来浏览音乐库:

from flask import Flask, render_template_string, request
import sqlite3

app = Flask(__name__)
DB_PATH = 'music_library.db'

HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
    <title>我的音乐记忆库</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .search-box { margin-bottom: 20px; }
        .track { border: 1px solid #ddd; padding: 10px; margin: 5px 0; }
        .artist { color: #666; font-size: 0.9em; }
        .memory { background: #f0f8ff; padding: 5px; margin-top: 5px; }
        .tag { display: inline-block; background: #e0e0e0; padding: 2px 6px; border-radius: 3px; font-size: 0.8em; margin-right: 5px; }
    </style>
</head>
<body>
    <h1>🎵 我的音乐记忆库</h1>
    
    <div class="search-box">
        <form method="GET">
            <input type="text" name="q" placeholder="搜索歌曲、艺术家、专辑..." value="{{ query }}">
            <input type="submit" value="搜索">
        </form>
    </div>
    
    <div>共找到 {{ tracks|length }} 首歌曲</div>
    
    {% for track in tracks %}
    <div class="track">
        <strong>{{ track.title }}</strong>
        <div class="artist">{{ track.artist }} - {{ track.album }}</div>
        
        {% if track.tags %}
        <div>
            {% for tag in track.tags.split(',') %}
            <span class="tag">{{ tag }}</span>
            {% endfor %}
        </div>
        {% endif %}
        
        {% if track.memories %}
        <div class="memory">💡 {{ track.memories }}</div>
        {% endif %}
        
        {% if track.rating %}
        <div>⭐ {{ track.rating }}/5</div>
        {% endif %}
    </div>
    {% endfor %}
</body>
</html>
"""

@app.route('/')
def index():
    query = request.args.get('q', '')
    
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    
    if query:
        # 搜索功能
        cursor.execute("""
            SELECT DISTINCT t.title, ar.name as artist, a.title as album,
                   GROUP_CONCAT(DISTINCT Tags.name) as tags,
                   r.memories, r.rating
            FROM Tracks t
            JOIN Albums a ON t.album_id = a.album_id
            JOIN Artists ar ON t.artist_id = ar.artist_id
            LEFT JOIN TrackTags tt ON t.track_id = tt.track_id
            LEFT JOIN Tags ON tt.tag_id = Tags.tag_id
            LEFT JOIN Reviews r ON t.track_id = r.track_id
            WHERE t.title LIKE ? OR ar.name LIKE ? OR a.title LIKE ?
            GROUP BY t.track_id
            LIMIT 50
        """, (f'%{query}%', f'%{query}%', f'%{query}%'))
    else:
        # 显示最近添加的
        cursor.execute("""
            SELECT DISTINCT t.title, ar.name as artist, a.title as album,
                   GROUP_CONCAT(DISTINCT Tags.name) as tags,
                   r.memories, r.rating
            FROM Tracks t
            JOIN Albums a ON t.album_id = a.album_id
            JOIN Artists ar ON t.artist_id = ar.artist_id
            LEFT JOIN TrackTags tt ON t.track_id = tt.track_id
            LEFT JOIN Tags ON tt.tag_id = Tags.tag_id
            LEFT JOIN Reviews r ON t.track_id = r.track_id
            GROUP BY t.track_id
            ORDER BY t.track_id DESC
            LIMIT 50
        """)
    
    tracks = cursor.fetchall()
    conn.close()
    
    # 转换为字典列表
    track_list = []
    for row in tracks:
        track_list.append({
            'title': row[0],
            'artist': row[1],
            'album': row[2],
            'tags': row[3],
            'memories': row[4],
            'rating': row[5]
        })
    
    return render_template_string(HTML_TEMPLATE, tracks=track_list, query=query)

if __name__ == '__main__':
    # 运行前确保有数据
    # app.run(debug=True, host='127.0.0.1', port=5000)
    print("Web界面已准备就绪,取消注释即可运行")

总结

通过以上八个步骤,你已经建立了一个完整的个人音乐记忆库系统。这个系统不仅能解决重复购买和收藏混乱的问题,还能让你的音乐收藏变得有序、可检索、充满个人情感价值。

核心要点回顾

  1. 诊断现状:全面了解你的音乐分布在哪些平台,识别重复购买
  2. 建立数据库:使用SQLite创建统一的音乐信息中心
  3. 处理重复:评估不同版本价值,制定保留策略
  4. 标签系统:建立多维度标签体系,便于检索
  5. 自动化工具:使用脚本自动整理文件、修复元数据
  6. 添加回忆:为音乐注入个人情感和记忆
  7. 持续维护:建立定期备份和更新机制
  8. 可视化展示:通过图表和Web界面展示你的音乐库

下一步行动建议

  • 从第一步开始,逐步实施每个步骤
  • 根据你的技术能力选择合适的工具(代码 vs 图形界面)
  • 不要追求完美,先建立基础框架,再逐步完善
  • 定期回顾和调整你的整理策略

记住,音乐整理是一个持续的过程,而不是一次性任务。最重要的是享受这个过程,让每一首歌都成为你人生故事的一部分。