引言:为什么选择豆瓣电影作为爬虫练习对象

豆瓣电影作为中国最大的电影社区和评分网站,拥有海量的电影数据资源,包括电影基本信息、评分、评论、演员表等。对于爬虫学习者来说,豆瓣电影是一个理想的练习对象,原因如下:

  1. 数据结构清晰:豆瓣电影页面布局规范,信息组织有序,便于解析和提取
  2. 反爬策略典型:豆瓣具有完善的反爬机制,包括User-Agent检测、频率限制、IP封禁、验证码等,能全面锻炼爬虫应对能力
  3. 数据价值高:电影数据可用于数据分析、推荐系统、可视化等多种应用场景
  4. 法律风险相对较低:相比其他商业网站,豆瓣对爬虫的容忍度相对较高(但仍需遵守规则)

本教程将从零开始,带你一步步构建一个功能完善的豆瓣电影爬虫,重点解决反爬问题,让你在实践中掌握网络爬虫的核心技术。

环境准备与基础配置

必要工具与库安装

在开始编写爬虫之前,我们需要准备以下Python环境和库:

# 核心爬虫库
pip install requests==2.31.0
# HTML解析库
pip install beautifulsoup4==4.12.2
# 高级解析库(可选,但推荐)
pip install lxml==4.9.3
# 数据处理库
pip install pandas==2.0.3
# 用于IP代理和用户代理
pip install fake-useragent==1.4.0
# 用于处理验证码(OCR)
pip install pytesseract==0.3.10
# 用于IP代理池
pip install requests-html==0.10.0
# 用于异步请求(可选)
pip install aiohttp==3.8.5

项目结构设计

一个良好的项目结构有助于代码维护和扩展:

douban_movie_scraper/
├── config/                  # 配置文件目录
│   ├── __init__.py
│   ├── settings.py          # 全局设置
│   └── user_agents.py       # User-Agent池
├── utils/                   # 工具函数目录
│   ├── __init__.py
│   ├── proxy.py             # 代理管理
│   ├── captcha.py           # 验证码处理
│   ├── logger.py            # 日志记录
│   └── storage.py           # 数据存储
├── spiders/                 # 爬虫核心目录
│   ├── __init__.py
│   ├── movie_spider.py      # 电影爬虫主类
│   └── anti_spider.py       # 反爬策略处理
├── data/                    # 数据存储目录
│   └── movies.csv           # 爬取结果
└── main.py                  # 主程序入口

基础配置文件

创建 config/settings.py

# config/settings.py
import os

# 基础配置
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
DATA_DIR = os.path.join(BASE_DIR, 'data')

# 请求配置
REQUEST_TIMEOUT = 10  # 请求超时时间(秒)
REQUEST_RETRY = 3     # 失败重试次数
REQUEST_INTERVAL = 2  # 请求间隔(秒)

# 代理配置
PROXY_ENABLED = True  # 是否启用代理
PROXY_POOL_SIZE = 5   # 代理池大小

# 验证码配置
CAPTCHA_ENABLED = True  # 是否处理验证码
CAPTCHA_API_KEY = None  # 打码平台API密钥(如有)

# 日志配置
LOG_LEVEL = 'INFO'
LOG_FILE = os.path.join(BASE_DIR, 'scraper.log')

# 数据存储配置
STORAGE_FORMAT = 'csv'  # 支持 csv, json, excel

豆瓣电影页面结构分析

电影列表页结构分析

豆瓣电影列表页(如Top250)的HTML结构具有以下特点:

  1. 电影列表容器<div class="grid_view"><ol class="grid_view">
  2. 单个电影项<li> 标签,包含电影序号、标题、详情链接等信息
  3. 电影标题<span class="title"> 标签,包含电影名称
  4. 详情链接<a> 标签的 href 属性,指向电影详情页

电影详情页结构分析

电影详情页包含更丰富的信息:

  1. 基本信息:电影标题、导演、编剧、演员等
  2. 评分信息:豆瓣评分、评分人数
  3. 电影简介:剧情简介
  4. 标签:电影类型标签
  5. 剧照:电影图片链接

分析页面结构的工具推荐

  1. 浏览器开发者工具(F12):查看HTML结构和网络请求
  2. XPath Helper插件:测试XPath表达式
  3. BeautifulSoup的prettify()方法:格式化HTML查看结构

基础爬虫实现:抓取电影列表

创建电影爬虫类

首先创建一个基础的爬虫类,处理请求和解析:

# spiders/movie_spider.py
import requests
from bs4 import BeautifulSoup
import time
import random
from config.settings import REQUEST_TIMEOUT, REQUEST_RETRY, REQUEST_INTERVAL
from config.user_agents import get_random_user_agent
from utils.logger import get_logger

logger = get_logger(__name__)

class MovieSpider:
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': get_random_user_agent(),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        })
    
    def get_page(self, url, retry_count=0):
        """获取页面内容,带重试机制"""
        if retry_count >= REQUEST_RETRY:
            logger.error(f"请求失败,已重试{REQUEST_RETRY}次: {url}")
            return None
        
        try:
            # 添加随机延迟,避免请求过快
            time.sleep(random.uniform(REQUEST_INTERVAL, REQUEST_INTERVAL + 1))
            
            response = self.session.get(url, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
            
            # 检查是否被反爬
            if self._is_blocked(response):
                logger.warning("检测到反爬措施,可能需要验证码或IP被限制")
                return self._handle_blocking(response, url, retry_count)
            
            logger.info(f"成功获取页面: {url}")
            return response.text
            
        except requests.exceptions.RequestException as e:
            logger.error(f"请求异常: {e}")
            return self.get_page(url, retry_count + 1)
    
    def _is_blocked(self, response):
        """检测是否被反爬"""
        # 检查状态码
        if response.status_code == 403:
            return True
        
        # 检查响应内容是否包含反爬提示
        blocked_indicators = [
            '访问过于频繁',
            '验证码',
            '您的IP',
            '403 Forbidden',
            'blocked'
        ]
        
        content = response.text
        for indicator in blocked_indicators:
            if indicator in content:
                return True
        
        return False
    
    def _handle_blocking(self, response, url, retry_count):
        """处理反爬情况"""
        # 这里可以添加验证码处理、IP切换等逻辑
        logger.warning("触发反爬,等待更长时间后重试...")
        time.sleep(10)  # 长时间等待
        return self.get_page(url, retry_count + 1)

抓取Top250电影列表

# spiders/movie_spider.py (续)
class MovieSpider:
    # ... 前面的代码 ...
    
    def get_top250_movies(self, start=0, limit=250):
        """抓取Top250电影列表"""
        base_url = "https://movie.douban.com/top250"
        movies = []
        
        # 分批抓取,每次25条
        for offset in range(0, limit, 25):
            url = f"{base_url}?start={offset}"
            logger.info(f"正在抓取第 {offset//25 + 1} 页...")
            
            html = self.get_page(url)
            if not html:
                continue
            
            movies.extend(self._parse_movie_list(html))
        
        return movies
    
    def _parse_movie_list(self, html):
        """解析电影列表页"""
        soup = BeautifulSoup(html, 'lxml')
        movie_list = []
        
        # 查找所有电影项
        items = soup.find_all('div', class_='item')
        
        for item in items:
            try:
                # 电影序号
                serial_number = item.find('em').get_text()
                
                # 电影标题
                title = item.find('span', class_='title').get_text()
                
                # 详情链接
                detail_url = item.find('a')['href']
                
                # 电影评分
                rating = item.find('span', class_='rating_num').get_text()
                
                # 评分人数
                rating_count = item.find('div', class_='star').find_all('span')[-1].get_text().replace('人评价', '')
                
                # 简介(如果有)
                quote_tag = item.find('span', class_='inq')
                quote = quote_tag.get_text() if quote_tag else ''
                
                movie_info = {
                    'serial_number': serial_number,
                    'title': title,
                    'detail_url': detail_url,
                    'rating': rating,
                    'rating_count': rating_count,
                    'quote': quote
                }
                
                movie_list.append(movie_info)
                
            except Exception as e:
                logger.error(f"解析电影项失败: {e}")
                continue
        
        return movie_list

电影详情页数据抓取

解析详情页信息

# spiders/movie_spider.py (续)
class MovieSpider:
    # ... 前面的代码 ...
    
    def get_movie_detail(self, detail_url):
        """获取电影详情页信息"""
        html = self.get_page(detail_url)
        if not html:
            return None
        
        return self._parse_movie_detail(html, detail_url)
    
    def _parse_movie_detail(self, html, detail_url):
        """解析电影详情页"""
        soup = BeautifulSoup(html, 'lxml')
        detail_info = {}
        
        try:
            # 电影标题(可能有多个,如中英文)
            title_tags = soup.find_all('span', property='v:itemreviewed')
            if title_tags:
                detail_info['title'] = title_tags[0].get_text()
            
            # 导演、编剧、演员信息
            info = soup.find('div', id='info')
            if info:
                # 提取所有信息行
                info_text = info.get_text()
                # 简单的键值对解析
                if '导演:' in info_text:
                    detail_info['director'] = info_text.split('导演:')[1].split('\n')[0].strip()
                if '编剧:' in info_text:
                    detail_info['writer'] = info_text.split('编剧:')[1].split('\n')[0].strip()
                if '主演:' in info_text:
                    detail_info['actors'] = info_text.split('主演:')[1].split('\n')[0].strip()
                if '类型:' in info_text:
                    detail_info['genre'] = info_text.split('类型:')[1].split('\n')[0].strip()
                if '制片国家/地区:' in info_text:
                    detail_info['country'] = info_text.split('制片国家/地区:')[1].split('\n')[0].strip()
                if '语言:' in info_text:
                    detail_info['language'] = info_text.split('语言:')[1].split('\n')[0].strip()
                if '上映日期:' in info_text:
                    detail_info['release_date'] = info_text.split('上映日期:')[1].split('\n')[0].strip()
                if '片长:' in info_text:
                    detail_info['duration'] = info_text.split('片长:')[1].split('\n')[0].strip()
            
            # 评分和评价人数
            rating_tag = soup.find('strong', class_='ll rating_num')
            if rating_tag:
                detail_info['rating'] = rating_tag.get_text()
            
            rating_count_tag = soup.find('a', href=lambda x: x and 'reviews' in x)
            if rating_count_tag:
                detail_info['rating_count'] = rating_count_tag.get_text().replace('人评价', '')
            
            # 剧情简介
            intro_tag = soup.find('span', property='v:summary')
            if intro_tag:
                detail_info['intro'] = intro_tag.get_text().strip()
            
            # 标签
            tags = soup.find_all('span', property='v:genre')
            if tags:
                detail_info['tags'] = [tag.get_text() for tag in tags]
            
            # 剧照链接(取第一张)
            pic_tag = soup.find('a', class_='nbgnbg')
            if pic_tag:
                img_tag = pic_tag.find('img')
                if img_tag:
                    detail_info['cover_url'] = img_tag.get('src')
            
            # 详情页URL
            detail_info['detail_url'] = detail_url
            
        except Exception as e:
            logger.error(f"解析详情页失败: {e}")
            return None
        
        return detail_info

批量抓取详情数据

# spiders/movie_spider.py (续)
class MovieSpider:
    # ... 前面的代码 ...
    
    def get_movies_details(self, movie_list):
        """批量获取电影详情"""
        detailed_movies = []
        
        for i, movie in enumerate(movie_list):
            logger.info(f"正在抓取详情: {i+1}/{len(movie_list)} - {movie['title']}")
            
            detail = self.get_movie_detail(movie['detail_url'])
            if detail:
                # 合并基本信息和详情信息
                merged_movie = {**movie, **detail}
                detailed_movies.append(merged_movie)
            
            # 每抓取10个详情,稍作休息
            if (i + 1) % 10 == 0:
                logger.info("休息一下,避免请求过快...")
                time.sleep(random.uniform(3, 5))
        
        return detailed_movies

反爬策略与应对方法

1. User-Agent轮换

# config/user_agents.py
from fake_useragent import UserAgent

def get_random_user_agent():
    """获取随机User-Agent"""
    try:
        ua = UserAgent()
        return ua.random
    except:
        # 备用User-Agent列表
        fallback_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/116.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Safari/605.1.15"
        ]
        return random.choice(fallback_agents)

# 在爬虫中动态更新User-Agent
class MovieSpider:
    def __init__(self):
        # ... 其他初始化 ...
        self.update_user_agent()
    
    def update_user_agent(self):
        """更新请求头中的User-Agent"""
        self.session.headers['User-Agent'] = get_random_user_agent()
        logger.info(f"User-Agent已更新: {self.session.headers['User-Agent'][:50]}...")

2. 请求频率控制

# utils/rate_limiter.py
import time
import random
from collections import deque
from threading import Lock

class RateLimiter:
    """令牌桶算法实现频率限制"""
    def __init__(self, max_requests=10, time_window=60):
        self.max_requests = max_requests  # 时间窗口内最大请求数
        self.time_window = time_window    # 时间窗口(秒)
        self.requests = deque()           # 存储请求时间戳
        self.lock = Lock()
    
    def acquire(self):
        """获取请求许可"""
        with self.lock:
            now = time.time()
            
            # 清理过期的请求记录
            while self.requests and self.requests[0] < now - self.time_window:
                self.requests.popleft()
            
            # 检查是否超过限制
            if len(self.requests) >= self.max_requests:
                # 计算需要等待的时间
                oldest_request = self.requests[0]
                wait_time = self.time_window - (now - oldest_request)
                logger.warning(f"请求频率过高,需要等待 {wait_time:.2f} 秒")
                time.sleep(wait_time + random.uniform(0.5, 1.5))
                return self.acquire()  # 递归调用,重新检查
            
            # 记录当前请求
            self.requests.append(now)
            
            # 添加随机延迟
            delay = random.uniform(1, 2)
            time.sleep(delay)

# 在爬虫中使用
class MovieSpider:
    def __init__(self):
        # ... 其他初始化 ...
        self.rate_limiter = RateLimiter(max_requests=8, time_window=60)
    
    def get_page(self, url, retry_count=0):
        """改进的get_page,加入频率限制"""
        self.rate_limiter.acquire()  # 获取请求许可
        
        # ... 原有请求逻辑 ...

3. IP代理池实现

# utils/proxy.py
import random
import requests
from threading import Lock
from config.settings import PROXY_ENABLED, PROXY_POOL_SIZE

class ProxyManager:
    """IP代理池管理器"""
    def __init__(self):
        self.proxies = []
        self.lock = Lock()
        self.current_proxy = None
        self.proxy_fail_count = {}  # 记录代理失败次数
    
    def add_proxy(self, proxy):
        """添加代理到池中"""
        with self.lock:
            if proxy not in self.proxies:
                self.proxies.append(proxy)
                self.proxy_fail_count[proxy] = 0
    
    def get_proxy(self):
        """获取一个可用代理"""
        with self.lock:
            if not self.proxies:
                return None
            
            # 优先选择失败次数少的代理
            available_proxies = [p for p in self.proxies if self.proxy_fail_count[p] < 3]
            if not available_proxies:
                return None
            
            proxy = random.choice(available_proxies)
            self.current_proxy = proxy
            return proxy
    
    def mark_proxy_failed(self, proxy):
        """标记代理失败"""
        with self.lock:
            if proxy in self.proxy_fail_count:
                self.proxy_fail_count[proxy] += 1
                # 如果失败次数过多,从池中移除
                if self.proxy_fail_count[proxy] >= 5:
                    self.proxies.remove(proxy)
                    del self.proxy_fail_count[proxy]
                    logger.warning(f"代理 {proxy} 失败次数过多,已从池中移除")
    
    def mark_proxy_success(self, proxy):
        """标记代理成功"""
        with self.lock:
            if proxy in self.proxy_fail_count:
                # 成功一次减少一次失败计数
                if self.proxy_fail_count[proxy] > 0:
                    self.proxy_fail_count[proxy] -= 1

# 代理验证函数
def validate_proxy(proxy):
    """验证代理是否可用"""
    test_url = "https://movie.douban.com/"
    try:
        response = requests.get(
            test_url,
            proxies={'http': proxy, 'https': proxy},
            timeout=5
        )
        return response.status_code == 200
    except:
        return False

# 在爬虫中集成代理
class MovieSpider:
    def __init__(self, use_proxy=False):
        # ... 其他初始化 ...
        self.use_proxy = use_proxy
        self.proxy_manager = ProxyManager()
        
        # 如果启用代理,可以预先添加一些代理
        if use_proxy:
            self._init_proxies()
    
    def _init_proxies(self):
        """初始化代理池"""
        # 这里可以添加从免费代理网站获取代理的逻辑
        # 或者从付费代理API获取
        # 示例:
        # proxies = ["http://123.45.67.89:8080", "http://98.76.54.32:3128"]
        # for proxy in proxies:
        #     if validate_proxy(proxy):
        #         self.proxy_manager.add_proxy(proxy)
        pass
    
    def get_page(self, url, retry_count=0):
        """改进的get_page,支持代理"""
        if retry_count >= REQUEST_RETRY:
            logger.error(f"请求失败,已重试{REQUEST_RETRY}次: {url}")
            return None
        
        try:
            self.rate_limiter.acquire()
            
            # 准备请求参数
            kwargs = {
                'timeout': REQUEST_TIMEOUT,
                'headers': self.session.headers
            }
            
            # 如果启用代理,设置代理
            if self.use_proxy:
                proxy = self.proxy_manager.get_proxy()
                if proxy:
                    kwargs['proxies'] = {'http': proxy, 'https': proxy}
                    logger.debug(f"使用代理: {proxy}")
            
            response = self.session.get(url, **kwargs)
            response.raise_for_status()
            
            # 检查是否被反爬
            if self._is_blocked(response):
                # 标记当前代理失败(如果使用了代理)
                if self.use_proxy and self.proxy_manager.current_proxy:
                    self.proxy_manager.mark_proxy_failed(self.proxy_manager.current_proxy)
                
                logger.warning("检测到反爬措施,触发处理逻辑")
                return self._handle_blocking(response, url, retry_count)
            
            # 标记代理成功
            if self.use_proxy and self.proxy_manager.current_proxy:
                self.proxy_manager.mark_proxy_success(self.proxy_manager.current_proxy)
            
            logger.info(f"成功获取页面: {url}")
            return response.text
            
        except requests.exceptions.ProxyError as e:
            logger.error(f"代理错误: {e}")
            if self.use_proxy and self.proxy_manager.current_proxy:
                self.proxy_manager.mark_proxy_failed(self.proxy_manager.current_proxy)
            return self.get_page(url, retry_count + 1)
            
        except requests.exceptions.RequestException as e:
            logger.error(f"请求异常: {e}")
            return self.get_page(url, retry_count + 1)

4. 验证码识别与处理

# utils/captcha.py
import pytesseract
from PIL import Image
import io
import requests
import time

class CaptchaSolver:
    """验证码处理器"""
    def __init__(self, api_key=None):
        self.api_key = api_key  # 打码平台API密钥
    
    def solve_image_captcha(self, image_url):
        """识别图片验证码"""
        try:
            # 下载验证码图片
            response = requests.get(image_url, timeout=10)
            image = Image.open(io.BytesIO(response.content))
            
            # 图片预处理(提高识别率)
            image = self._preprocess_image(image)
            
            # 使用Tesseract OCR识别
            text = pytesseract.image_to_string(image, lang='chi_sim+eng')
            
            # 清理识别结果
            captcha_text = self._clean_text(text)
            
            logger.info(f"验证码识别结果: {captcha_text}")
            return captcha_text
            
        except Exception as e:
            logger.error(f"验证码识别失败: {e}")
            return None
    
    def _preprocess_image(self, image):
        """图片预处理"""
        # 转换为灰度图
        if image.mode != 'L':
            image = image.convert('L')
        
        # 二值化处理
        threshold = 128
        table = []
        for i in range(256):
            if i < threshold:
                table.append(0)
            else:
                table.append(1)
        image = image.point(table, '1')
        
        return image
    
    def _clean_text(self, text):
        """清理识别结果"""
        # 移除换行符和空格
        text = text.replace('\n', '').replace(' ', '')
        # 只保留字母和数字
        import re
        text = re.sub(r'[^a-zA-Z0-9]', '', text)
        return text
    
    def solve_by_hand(self, image_url):
        """手动解决验证码(用于调试)"""
        print(f"请访问以下URL查看验证码并输入: {image_url}")
        captcha_text = input("请输入验证码: ")
        return captcha_text.strip()

# 在爬虫中集成验证码处理
class MovieSpider:
    def __init__(self, use_proxy=False, auto_captcha=False):
        # ... 其他初始化 ...
        self.auto_captcha = auto_captcha
        self.captcha_solver = CaptchaSolver()
    
    def _handle_blocking(self, response, url, retry_count):
        """处理反爬情况(增强版)"""
        content = response.text
        
        # 检查是否需要验证码
        if '验证码' in content or 'captcha' in content:
            logger.warning("检测到验证码要求")
            
            # 尝试提取验证码图片URL
            captcha_url = self._extract_captcha_url(response)
            if captcha_url:
                if self.auto_captcha:
                    # 自动识别
                    captcha_text = self.captcha_solver.solve_image_captcha(captcha_url)
                    if captcha_text:
                        # 提交验证码并重试请求
                        return self._submit_captcha_and_retry(captcha_text, url, retry_count)
                else:
                    # 手动输入
                    captcha_text = self.captcha_solver.solve_by_hand(captcha_url)
                    if captcha_text:
                        return self._submit_captcha_and_retry(captcha_text, url, retry_count)
        
        # IP限制,切换代理或长时间等待
        if 'IP' in content or '403' in str(response.status_code):
            logger.warning("IP被限制,切换代理或等待...")
            
            # 如果有代理,切换代理
            if self.use_proxy:
                if self.proxy_manager.current_proxy:
                    self.proxy_manager.mark_proxy_failed(self.proxy_manager.current_proxy)
                # 等待后重试
                time.sleep(30)
                return self.get_page(url, retry_count + 1)
            else:
                # 长时间等待
                wait_time = random.uniform(60, 120)
                logger.info(f"等待 {wait_time:.2f} 秒后重试...")
                time.sleep(wait_time)
                return self.get_page(url, retry_count + 1)
        
        # 其他情况,长时间等待后重试
        wait_time = random.uniform(20, 40)
        logger.info(f"触发反爬,等待 {wait_time:.2f} 秒后重试...")
        time.sleep(wait_time)
        return self.get_page(url, retry_count + 1)
    
    def _extract_captcha_url(self, response):
        """从响应中提取验证码图片URL"""
        # 这里需要根据实际页面结构提取
        # 示例:查找包含captcha的img标签
        soup = BeautifulSoup(response.text, 'lxml')
        captcha_img = soup.find('img', id=lambda x: x and 'captcha' in str(x))
        if captcha_img and captcha_img.get('src'):
            captcha_url = captcha_img.get('src')
            # 如果是相对URL,转换为绝对URL
            if captcha_url.startswith('/'):
                return f"https://movie.douban.com{captcha_url}"
            return captcha_url
        return None
    
    def _submit_captcha_and_retry(self, captcha_text, original_url, retry_count):
        """提交验证码并重试请求"""
        # 这里需要根据实际页面的验证码提交逻辑实现
        # 通常验证码会作为参数附加到请求中
        logger.info(f"提交验证码: {captcha_text}")
        # 实际实现需要根据具体页面结构调整
        # 例如:添加验证码参数到session或URL
        return self.get_page(original_url, retry_count + 1)

数据存储与管理

数据存储模块

# utils/storage.py
import pandas as pd
import json
import os
from config.settings import STORAGE_FORMAT, DATA_DIR

class DataStorage:
    """数据存储管理器"""
    def __init__(self, format_type=STORAGE_FORMAT):
        self.format_type = format_type
        if not os.path.exists(DATA_DIR):
            os.makedirs(DATA_DIR)
    
    def save(self, data, filename):
        """保存数据"""
        if not data:
            logger.warning("没有数据需要保存")
            return
        
        filepath = os.path.join(DATA_DIR, filename)
        
        if self.format_type == 'csv':
            self._save_csv(data, filepath)
        elif self.format_type == 'json':
            self._save_json(data, filepath)
        elif self.format_type == 'excel':
            self._save_excel(data, filepath)
        else:
            raise ValueError(f"不支持的存储格式: {self.format_type}")
        
        logger.info(f"数据已保存到: {filepath}")
    
    def _save_csv(self, data, filepath):
        """保存为CSV格式"""
        df = pd.DataFrame(data)
        df.to_csv(filepath, index=False, encoding='utf-8-sig')
    
    def _save_json(self, data, filepath):
        """保存为JSON格式"""
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
    
    def _save_excel(self, data, filepath):
        """保存为Excel格式"""
        df = pd.DataFrame(data)
        df.to_excel(filepath, index=False)

    def load(self, filename):
        """加载数据"""
        filepath = os.path.join(DATA_DIR, filename)
        if not os.path.exists(filepath):
            return None
        
        if self.format_type == 'csv':
            return pd.read_csv(filepath).to_dict('records')
        elif self.format_type == 'json':
            with open(filepath, 'r', encoding='utf-8') as f:
                return json.load(f)
        elif self.format_type == 'excel':
            return pd.read_excel(filepath).to_dict('records')

在爬虫中集成数据存储

# spiders/movie_spider.py (续)
class MovieSpider:
    def __init__(self, use_proxy=False, auto_captcha=False):
        # ... 其他初始化 ...
        self.storage = DataStorage()
    
    def run(self, mode='top250'):
        """运行爬虫主函数"""
        logger.info(f"开始运行豆瓣电影爬虫,模式: {mode}")
        
        if mode == 'top250':
            # 1. 获取电影列表
            movie_list = self.get_top250_movies()
            logger.info(f"获取到 {len(movie_list)} 部电影基本信息")
            
            # 2. 获取电影详情
            detailed_movies = self.get_movies_details(movie_list)
            logger.info(f"获取到 {len(detailed_movies)} 部电影详情")
            
            # 3. 保存数据
            if detailed_movies:
                self.storage.save(detailed_movies, 'douban_top250_movies.csv')
                logger.info("数据抓取完成!")
            else:
                logger.error("未获取到有效数据")
        
        elif mode == 'search':
            # 可以扩展搜索模式
            pass

完整代码整合与主程序

主程序入口

# main.py
import sys
import argparse
from spiders.movie_spider import MovieSpider
from config.settings import PROXY_ENABLED, CAPTCHA_ENABLED
from utils.logger import get_logger

logger = get_logger(__name__)

def main():
    """主函数"""
    parser = argparse.ArgumentParser(description='豆瓣电影爬虫')
    parser.add_argument('--mode', type=str, default='top250', 
                       choices=['top250', 'search'],
                       help='爬取模式: top250 (Top250电影), search (搜索)')
    parser.add_argument('--proxy', action='store_true', 
                       default=PROXY_ENABLED,
                       help='启用代理')
    parser.add_argument('--auto-captcha', action='store_true',
                       default=CAPTCHA_ENABLED,
                       help='自动处理验证码')
    parser.add_argument('--limit', type=int, default=250,
                       help='抓取数量限制(仅top250模式)')
    
    args = parser.parse_args()
    
    try:
        # 创建爬虫实例
        spider = MovieSpider(
            use_proxy=args.proxy,
            auto_captcha=args.auto_captcha
        )
        
        # 运行爬虫
        if args.mode == 'top250':
            # 获取电影列表
            movie_list = spider.get_top250_movies(limit=args.limit)
            logger.info(f"获取到 {len(movie_list)} 部电影基本信息")
            
            # 获取电影详情
            detailed_movies = spider.get_movies_details(movie_list)
            logger.info(f"获取到 {len(detailed_movies)} 部电影详情")
            
            # 保存数据
            if detailed_movies:
                spider.storage.save(detailed_movies, f'douban_top{args.limit}_movies.csv')
                logger.info("数据抓取完成!")
            else:
                logger.error("未获取到有效数据")
        
        elif args.mode == 'search':
            logger.error("搜索模式暂未实现")
            sys.exit(1)
            
    except KeyboardInterrupt:
        logger.info("用户中断操作")
        sys.exit(0)
    except Exception as e:
        logger.error(f"程序运行出错: {e}")
        sys.exit(1)

if __name__ == '__main__':
    main()

日志工具

# utils/logger.py
import logging
import os
from config.settings import LOG_LEVEL, LOG_FILE

def get_logger(name):
    """获取日志记录器"""
    # 创建日志目录
    log_dir = os.path.dirname(LOG_FILE)
    if log_dir and not os.path.exists(log_dir):
        os.makedirs(log_dir)
    
    # 配置日志格式
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(getattr(logging, LOG_LEVEL))
    console_handler.setFormatter(formatter)
    
    # 文件处理器
    file_handler = logging.FileHandler(LOG_FILE, encoding='utf-8')
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    
    # 创建日志器
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    
    return logger

高级技巧:应对复杂反爬

1. 动态User-Agent和Headers

# config/headers.py
def get_dynamic_headers():
    """生成动态请求头"""
    return {
        'User-Agent': get_random_user_agent(),
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Cache-Control': 'max-age=0',
        'Referer': 'https://movie.douban.com/top250',
    }

# 在爬虫中定期更新headers
class MovieSpider:
    def _update_headers_periodically(self):
        """定期更新请求头"""
        # 可以设置定时器或每N次请求更新一次
        self.session.headers.update(get_dynamic_headers())

2. 使用会话保持和Cookies

class MovieSpider:
    def __init__(self, use_proxy=False, auto_captcha=False):
        # ... 其他初始化 ...
        self.session = requests.Session()
        # 可以预先加载一些cookies,模拟真实用户
        self._init_cookies()
    
    def _init_cookies(self):
        """初始化Cookies"""
        # 可以从文件加载cookies
        # 或者手动设置一些cookies
        cookies = {
            # 示例:'bid': 'some_value',
            # 'dbcl2': 'some_value',
        }
        for k, v in cookies.items():
            self.session.cookies.set(k, v)

3. 异步爬虫(提高效率)

# spiders/async_spider.py
import aiohttp
import asyncio
from bs4 import BeautifulSoup
import time
import random

class AsyncMovieSpider:
    """异步爬虫版本"""
    def __init__(self, use_proxy=False):
        self.use_proxy = use_proxy
        self.proxy_manager = ProxyManager() if use_proxy else None
        self.rate_limiter = RateLimiter(max_requests=10, time_window=60)
    
    async def fetch(self, session, url):
        """异步获取页面"""
        await self.rate_limiter.acquire_async()  # 异步频率限制
        
        proxy = None
        if self.use_proxy and self.proxy_manager:
            proxy = self.proxy_manager.get_proxy()
        
        try:
            async with session.get(url, proxy=proxy, timeout=10) as response:
                if response.status == 200:
                    return await response.text()
                else:
                    logger.error(f"请求失败: {url}, 状态码: {response.status}")
                    return None
        except Exception as e:
            logger.error(f"请求异常: {url}, 错误: {e}")
            return None
    
    async def get_top250_movies_async(self):
        """异步抓取Top250"""
        async with aiohttp.ClientSession() as session:
            # 添加动态headers
            session.headers.update(get_dynamic_headers())
            
            # 创建任务列表
            tasks = []
            for offset in range(0, 250, 25):
                url = f"https://movie.douban.com/top250?start={offset}"
                tasks.append(self.fetch(session, url))
            
            # 并发执行
            results = await asyncio.gather(*tasks)
            
            # 解析结果
            movies = []
            for html in results:
                if html:
                    movies.extend(self._parse_movie_list(html))
            
            return movies
    
    def run_async(self):
        """运行异步爬虫"""
        loop = asyncio.get_event_loop()
        movies = loop.run_until_complete(self.get_top250_movies_async())
        return movies

法律与道德注意事项

1. 遵守robots.txt

# 检查robots.txt
def check_robots_txt():
    """检查网站的robots.txt文件"""
    try:
        response = requests.get("https://movie.douban.com/robots.txt", timeout=5)
        print("Robots.txt内容:")
        print(response.text)
    except:
        print("无法获取robots.txt")

# 在爬虫开始前调用
check_robots.txt()

2. 合理的请求频率

  • 建议频率:每2-5秒一个请求
  • 避免高峰时段:凌晨0-6点爬取
  • 控制总量:每天不超过1000个页面

3. 数据使用规范

  • 仅用于学习研究:不要用于商业用途
  • 不干扰网站正常运行:避免高频请求
  • 尊重版权:不传播爬取的数据

4. 应对法律风险

# 添加法律声明和用户代理标识
def get_legal_headers():
    """获取合法的请求头"""
    return {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
        'From': 'your_email@example.com',  # 你的邮箱
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    }

常见问题与解决方案

1. 代理IP被封

解决方案

  • 使用高质量付费代理(如阿布云、蘑菇代理)
  • 实现代理自动切换和验证
  • 降低请求频率
# 代理自动切换示例
def switch_proxy_on_failure(self):
    """失败时自动切换代理"""
    if self.proxy_manager.current_proxy:
        self.proxy_manager.mark_proxy_failed(self.proxy_manager.current_proxy)
    
    new_proxy = self.proxy_manager.get_proxy()
    if new_proxy:
        logger.info(f"切换到新代理: {new_proxy}")
        self.session.proxies = {'http': new_proxy, 'https': new_proxy}
    else:
        logger.error("代理池已空,无法切换")
        # 退回到无代理模式
        self.session.proxies = {}

2. 验证码识别率低

解决方案

  • 使用付费打码平台(如超级鹰、云打码)
  • 图片预处理(去噪、二值化、锐化)
  • 使用深度学习模型训练验证码识别器
# 使用打码平台示例
def solve_by_platform(self, image_url):
    """使用打码平台"""
    # 超级鹰示例
    import requests
    import base64
    
    # 下载图片并base64编码
    img_data = requests.get(image_url).content
    img_base64 = base64.b64encode(img_data).decode()
    
    # 调用打码平台API
    api_url = "http://www.chaojiying.com/SubmitOrder"
    data = {
        'user': 'your_username',
        'pass2': 'your_password',
        'softkey': 'your_softkey',
        'img_base64': img_base64,
        'codetype': '1004',  # 验证码类型
    }
    
    response = requests.post(api_url, data=data)
    result = response.json()
    
    if result['result'] == '1':
        return result['pic_str']
    else:
        logger.error(f"打码平台错误: {result}")
        return None

3. 请求被识别为爬虫

解决方案

  • 完整的浏览器指纹模拟
  • 使用Selenium/Playwright模拟真实浏览器
  • 添加随机延迟和鼠标移动轨迹
# 使用Selenium模拟浏览器(适用于复杂反爬)
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options

class SeleniumSpider:
    def __init__(self):
        options = Options()
        options.add_argument('--headless')  # 无头模式
        options.add_argument('--disable-blink-features=AutomationControlled')
        options.add_argument('--user-agent=Mozilla/5.0...')
        self.driver = webdriver.Chrome(options=options)
    
    def get_page(self, url):
        """使用Selenium获取页面"""
        self.driver.get(url)
        time.sleep(random.uniform(2, 4))  # 模拟人类阅读时间
        return self.driver.page_source
    
    def close(self):
        self.driver.quit()

总结

本教程从零开始,详细介绍了豆瓣电影爬虫的完整实现过程,包括:

  1. 环境准备:安装必要的库和项目结构设计
  2. 页面分析:理解豆瓣电影页面的HTML结构
  3. 基础爬虫:实现电影列表和详情页抓取
  4. 反爬策略:User-Agent轮换、频率控制、IP代理、验证码处理
  5. 数据存储:多种格式的数据保存方法
  6. 高级技巧:异步爬虫、动态Headers、会话保持
  7. 法律道德:合规爬取的注意事项
  8. 问题解决:常见问题的应对方案

核心要点回顾

  • 尊重网站规则:控制请求频率,避免对服务器造成压力
  • 多层次反爬应对:从简单的User-Agent到复杂的验证码识别
  • 代码可维护性:模块化设计,便于扩展和维护
  • 数据质量:完整的解析逻辑和错误处理

进阶方向

  1. 分布式爬虫:使用Scrapy-Redis实现分布式
  2. 数据清洗:使用Pandas进行数据预处理
  3. 可视化:使用Matplotlib/Seaborn展示电影数据
  4. 推荐系统:基于爬取的数据构建推荐算法

通过本教程的学习,你不仅掌握了豆瓣电影爬虫的实现,更重要的是理解了网络爬虫的核心原理和反爬应对策略,这些知识可以应用到其他网站的爬取任务中。记住,爬虫技术是一把双刃剑,务必用于合法合规的用途!