引言:为什么选择豆瓣电影作为爬虫练习对象
豆瓣电影作为中国最大的电影社区和评分网站,拥有海量的电影数据资源,包括电影基本信息、评分、评论、演员表等。对于爬虫学习者来说,豆瓣电影是一个理想的练习对象,原因如下:
- 数据结构清晰:豆瓣电影页面布局规范,信息组织有序,便于解析和提取
- 反爬策略典型:豆瓣具有完善的反爬机制,包括User-Agent检测、频率限制、IP封禁、验证码等,能全面锻炼爬虫应对能力
- 数据价值高:电影数据可用于数据分析、推荐系统、可视化等多种应用场景
- 法律风险相对较低:相比其他商业网站,豆瓣对爬虫的容忍度相对较高(但仍需遵守规则)
本教程将从零开始,带你一步步构建一个功能完善的豆瓣电影爬虫,重点解决反爬问题,让你在实践中掌握网络爬虫的核心技术。
环境准备与基础配置
必要工具与库安装
在开始编写爬虫之前,我们需要准备以下Python环境和库:
# 核心爬虫库
pip install requests==2.31.0
# HTML解析库
pip install beautifulsoup4==4.12.2
# 高级解析库(可选,但推荐)
pip install lxml==4.9.3
# 数据处理库
pip install pandas==2.0.3
# 用于IP代理和用户代理
pip install fake-useragent==1.4.0
# 用于处理验证码(OCR)
pip install pytesseract==0.3.10
# 用于IP代理池
pip install requests-html==0.10.0
# 用于异步请求(可选)
pip install aiohttp==3.8.5
项目结构设计
一个良好的项目结构有助于代码维护和扩展:
douban_movie_scraper/
├── config/ # 配置文件目录
│ ├── __init__.py
│ ├── settings.py # 全局设置
│ └── user_agents.py # User-Agent池
├── utils/ # 工具函数目录
│ ├── __init__.py
│ ├── proxy.py # 代理管理
│ ├── captcha.py # 验证码处理
│ ├── logger.py # 日志记录
│ └── storage.py # 数据存储
├── spiders/ # 爬虫核心目录
│ ├── __init__.py
│ ├── movie_spider.py # 电影爬虫主类
│ └── anti_spider.py # 反爬策略处理
├── data/ # 数据存储目录
│ └── movies.csv # 爬取结果
└── main.py # 主程序入口
基础配置文件
创建 config/settings.py:
# config/settings.py
import os
# 基础配置
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
DATA_DIR = os.path.join(BASE_DIR, 'data')
# 请求配置
REQUEST_TIMEOUT = 10 # 请求超时时间(秒)
REQUEST_RETRY = 3 # 失败重试次数
REQUEST_INTERVAL = 2 # 请求间隔(秒)
# 代理配置
PROXY_ENABLED = True # 是否启用代理
PROXY_POOL_SIZE = 5 # 代理池大小
# 验证码配置
CAPTCHA_ENABLED = True # 是否处理验证码
CAPTCHA_API_KEY = None # 打码平台API密钥(如有)
# 日志配置
LOG_LEVEL = 'INFO'
LOG_FILE = os.path.join(BASE_DIR, 'scraper.log')
# 数据存储配置
STORAGE_FORMAT = 'csv' # 支持 csv, json, excel
豆瓣电影页面结构分析
电影列表页结构分析
豆瓣电影列表页(如Top250)的HTML结构具有以下特点:
- 电影列表容器:
<div class="grid_view">或<ol class="grid_view"> - 单个电影项:
<li>标签,包含电影序号、标题、详情链接等信息 - 电影标题:
<span class="title">标签,包含电影名称 - 详情链接:
<a>标签的href属性,指向电影详情页
电影详情页结构分析
电影详情页包含更丰富的信息:
- 基本信息:电影标题、导演、编剧、演员等
- 评分信息:豆瓣评分、评分人数
- 电影简介:剧情简介
- 标签:电影类型标签
- 剧照:电影图片链接
分析页面结构的工具推荐
- 浏览器开发者工具(F12):查看HTML结构和网络请求
- XPath Helper插件:测试XPath表达式
- BeautifulSoup的prettify()方法:格式化HTML查看结构
基础爬虫实现:抓取电影列表
创建电影爬虫类
首先创建一个基础的爬虫类,处理请求和解析:
# spiders/movie_spider.py
import requests
from bs4 import BeautifulSoup
import time
import random
from config.settings import REQUEST_TIMEOUT, REQUEST_RETRY, REQUEST_INTERVAL
from config.user_agents import get_random_user_agent
from utils.logger import get_logger
logger = get_logger(__name__)
class MovieSpider:
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': get_random_user_agent(),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
})
def get_page(self, url, retry_count=0):
"""获取页面内容,带重试机制"""
if retry_count >= REQUEST_RETRY:
logger.error(f"请求失败,已重试{REQUEST_RETRY}次: {url}")
return None
try:
# 添加随机延迟,避免请求过快
time.sleep(random.uniform(REQUEST_INTERVAL, REQUEST_INTERVAL + 1))
response = self.session.get(url, timeout=REQUEST_TIMEOUT)
response.raise_for_status()
# 检查是否被反爬
if self._is_blocked(response):
logger.warning("检测到反爬措施,可能需要验证码或IP被限制")
return self._handle_blocking(response, url, retry_count)
logger.info(f"成功获取页面: {url}")
return response.text
except requests.exceptions.RequestException as e:
logger.error(f"请求异常: {e}")
return self.get_page(url, retry_count + 1)
def _is_blocked(self, response):
"""检测是否被反爬"""
# 检查状态码
if response.status_code == 403:
return True
# 检查响应内容是否包含反爬提示
blocked_indicators = [
'访问过于频繁',
'验证码',
'您的IP',
'403 Forbidden',
'blocked'
]
content = response.text
for indicator in blocked_indicators:
if indicator in content:
return True
return False
def _handle_blocking(self, response, url, retry_count):
"""处理反爬情况"""
# 这里可以添加验证码处理、IP切换等逻辑
logger.warning("触发反爬,等待更长时间后重试...")
time.sleep(10) # 长时间等待
return self.get_page(url, retry_count + 1)
抓取Top250电影列表
# spiders/movie_spider.py (续)
class MovieSpider:
# ... 前面的代码 ...
def get_top250_movies(self, start=0, limit=250):
"""抓取Top250电影列表"""
base_url = "https://movie.douban.com/top250"
movies = []
# 分批抓取,每次25条
for offset in range(0, limit, 25):
url = f"{base_url}?start={offset}"
logger.info(f"正在抓取第 {offset//25 + 1} 页...")
html = self.get_page(url)
if not html:
continue
movies.extend(self._parse_movie_list(html))
return movies
def _parse_movie_list(self, html):
"""解析电影列表页"""
soup = BeautifulSoup(html, 'lxml')
movie_list = []
# 查找所有电影项
items = soup.find_all('div', class_='item')
for item in items:
try:
# 电影序号
serial_number = item.find('em').get_text()
# 电影标题
title = item.find('span', class_='title').get_text()
# 详情链接
detail_url = item.find('a')['href']
# 电影评分
rating = item.find('span', class_='rating_num').get_text()
# 评分人数
rating_count = item.find('div', class_='star').find_all('span')[-1].get_text().replace('人评价', '')
# 简介(如果有)
quote_tag = item.find('span', class_='inq')
quote = quote_tag.get_text() if quote_tag else ''
movie_info = {
'serial_number': serial_number,
'title': title,
'detail_url': detail_url,
'rating': rating,
'rating_count': rating_count,
'quote': quote
}
movie_list.append(movie_info)
except Exception as e:
logger.error(f"解析电影项失败: {e}")
continue
return movie_list
电影详情页数据抓取
解析详情页信息
# spiders/movie_spider.py (续)
class MovieSpider:
# ... 前面的代码 ...
def get_movie_detail(self, detail_url):
"""获取电影详情页信息"""
html = self.get_page(detail_url)
if not html:
return None
return self._parse_movie_detail(html, detail_url)
def _parse_movie_detail(self, html, detail_url):
"""解析电影详情页"""
soup = BeautifulSoup(html, 'lxml')
detail_info = {}
try:
# 电影标题(可能有多个,如中英文)
title_tags = soup.find_all('span', property='v:itemreviewed')
if title_tags:
detail_info['title'] = title_tags[0].get_text()
# 导演、编剧、演员信息
info = soup.find('div', id='info')
if info:
# 提取所有信息行
info_text = info.get_text()
# 简单的键值对解析
if '导演:' in info_text:
detail_info['director'] = info_text.split('导演:')[1].split('\n')[0].strip()
if '编剧:' in info_text:
detail_info['writer'] = info_text.split('编剧:')[1].split('\n')[0].strip()
if '主演:' in info_text:
detail_info['actors'] = info_text.split('主演:')[1].split('\n')[0].strip()
if '类型:' in info_text:
detail_info['genre'] = info_text.split('类型:')[1].split('\n')[0].strip()
if '制片国家/地区:' in info_text:
detail_info['country'] = info_text.split('制片国家/地区:')[1].split('\n')[0].strip()
if '语言:' in info_text:
detail_info['language'] = info_text.split('语言:')[1].split('\n')[0].strip()
if '上映日期:' in info_text:
detail_info['release_date'] = info_text.split('上映日期:')[1].split('\n')[0].strip()
if '片长:' in info_text:
detail_info['duration'] = info_text.split('片长:')[1].split('\n')[0].strip()
# 评分和评价人数
rating_tag = soup.find('strong', class_='ll rating_num')
if rating_tag:
detail_info['rating'] = rating_tag.get_text()
rating_count_tag = soup.find('a', href=lambda x: x and 'reviews' in x)
if rating_count_tag:
detail_info['rating_count'] = rating_count_tag.get_text().replace('人评价', '')
# 剧情简介
intro_tag = soup.find('span', property='v:summary')
if intro_tag:
detail_info['intro'] = intro_tag.get_text().strip()
# 标签
tags = soup.find_all('span', property='v:genre')
if tags:
detail_info['tags'] = [tag.get_text() for tag in tags]
# 剧照链接(取第一张)
pic_tag = soup.find('a', class_='nbgnbg')
if pic_tag:
img_tag = pic_tag.find('img')
if img_tag:
detail_info['cover_url'] = img_tag.get('src')
# 详情页URL
detail_info['detail_url'] = detail_url
except Exception as e:
logger.error(f"解析详情页失败: {e}")
return None
return detail_info
批量抓取详情数据
# spiders/movie_spider.py (续)
class MovieSpider:
# ... 前面的代码 ...
def get_movies_details(self, movie_list):
"""批量获取电影详情"""
detailed_movies = []
for i, movie in enumerate(movie_list):
logger.info(f"正在抓取详情: {i+1}/{len(movie_list)} - {movie['title']}")
detail = self.get_movie_detail(movie['detail_url'])
if detail:
# 合并基本信息和详情信息
merged_movie = {**movie, **detail}
detailed_movies.append(merged_movie)
# 每抓取10个详情,稍作休息
if (i + 1) % 10 == 0:
logger.info("休息一下,避免请求过快...")
time.sleep(random.uniform(3, 5))
return detailed_movies
反爬策略与应对方法
1. User-Agent轮换
# config/user_agents.py
from fake_useragent import UserAgent
def get_random_user_agent():
"""获取随机User-Agent"""
try:
ua = UserAgent()
return ua.random
except:
# 备用User-Agent列表
fallback_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/116.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Safari/605.1.15"
]
return random.choice(fallback_agents)
# 在爬虫中动态更新User-Agent
class MovieSpider:
def __init__(self):
# ... 其他初始化 ...
self.update_user_agent()
def update_user_agent(self):
"""更新请求头中的User-Agent"""
self.session.headers['User-Agent'] = get_random_user_agent()
logger.info(f"User-Agent已更新: {self.session.headers['User-Agent'][:50]}...")
2. 请求频率控制
# utils/rate_limiter.py
import time
import random
from collections import deque
from threading import Lock
class RateLimiter:
"""令牌桶算法实现频率限制"""
def __init__(self, max_requests=10, time_window=60):
self.max_requests = max_requests # 时间窗口内最大请求数
self.time_window = time_window # 时间窗口(秒)
self.requests = deque() # 存储请求时间戳
self.lock = Lock()
def acquire(self):
"""获取请求许可"""
with self.lock:
now = time.time()
# 清理过期的请求记录
while self.requests and self.requests[0] < now - self.time_window:
self.requests.popleft()
# 检查是否超过限制
if len(self.requests) >= self.max_requests:
# 计算需要等待的时间
oldest_request = self.requests[0]
wait_time = self.time_window - (now - oldest_request)
logger.warning(f"请求频率过高,需要等待 {wait_time:.2f} 秒")
time.sleep(wait_time + random.uniform(0.5, 1.5))
return self.acquire() # 递归调用,重新检查
# 记录当前请求
self.requests.append(now)
# 添加随机延迟
delay = random.uniform(1, 2)
time.sleep(delay)
# 在爬虫中使用
class MovieSpider:
def __init__(self):
# ... 其他初始化 ...
self.rate_limiter = RateLimiter(max_requests=8, time_window=60)
def get_page(self, url, retry_count=0):
"""改进的get_page,加入频率限制"""
self.rate_limiter.acquire() # 获取请求许可
# ... 原有请求逻辑 ...
3. IP代理池实现
# utils/proxy.py
import random
import requests
from threading import Lock
from config.settings import PROXY_ENABLED, PROXY_POOL_SIZE
class ProxyManager:
"""IP代理池管理器"""
def __init__(self):
self.proxies = []
self.lock = Lock()
self.current_proxy = None
self.proxy_fail_count = {} # 记录代理失败次数
def add_proxy(self, proxy):
"""添加代理到池中"""
with self.lock:
if proxy not in self.proxies:
self.proxies.append(proxy)
self.proxy_fail_count[proxy] = 0
def get_proxy(self):
"""获取一个可用代理"""
with self.lock:
if not self.proxies:
return None
# 优先选择失败次数少的代理
available_proxies = [p for p in self.proxies if self.proxy_fail_count[p] < 3]
if not available_proxies:
return None
proxy = random.choice(available_proxies)
self.current_proxy = proxy
return proxy
def mark_proxy_failed(self, proxy):
"""标记代理失败"""
with self.lock:
if proxy in self.proxy_fail_count:
self.proxy_fail_count[proxy] += 1
# 如果失败次数过多,从池中移除
if self.proxy_fail_count[proxy] >= 5:
self.proxies.remove(proxy)
del self.proxy_fail_count[proxy]
logger.warning(f"代理 {proxy} 失败次数过多,已从池中移除")
def mark_proxy_success(self, proxy):
"""标记代理成功"""
with self.lock:
if proxy in self.proxy_fail_count:
# 成功一次减少一次失败计数
if self.proxy_fail_count[proxy] > 0:
self.proxy_fail_count[proxy] -= 1
# 代理验证函数
def validate_proxy(proxy):
"""验证代理是否可用"""
test_url = "https://movie.douban.com/"
try:
response = requests.get(
test_url,
proxies={'http': proxy, 'https': proxy},
timeout=5
)
return response.status_code == 200
except:
return False
# 在爬虫中集成代理
class MovieSpider:
def __init__(self, use_proxy=False):
# ... 其他初始化 ...
self.use_proxy = use_proxy
self.proxy_manager = ProxyManager()
# 如果启用代理,可以预先添加一些代理
if use_proxy:
self._init_proxies()
def _init_proxies(self):
"""初始化代理池"""
# 这里可以添加从免费代理网站获取代理的逻辑
# 或者从付费代理API获取
# 示例:
# proxies = ["http://123.45.67.89:8080", "http://98.76.54.32:3128"]
# for proxy in proxies:
# if validate_proxy(proxy):
# self.proxy_manager.add_proxy(proxy)
pass
def get_page(self, url, retry_count=0):
"""改进的get_page,支持代理"""
if retry_count >= REQUEST_RETRY:
logger.error(f"请求失败,已重试{REQUEST_RETRY}次: {url}")
return None
try:
self.rate_limiter.acquire()
# 准备请求参数
kwargs = {
'timeout': REQUEST_TIMEOUT,
'headers': self.session.headers
}
# 如果启用代理,设置代理
if self.use_proxy:
proxy = self.proxy_manager.get_proxy()
if proxy:
kwargs['proxies'] = {'http': proxy, 'https': proxy}
logger.debug(f"使用代理: {proxy}")
response = self.session.get(url, **kwargs)
response.raise_for_status()
# 检查是否被反爬
if self._is_blocked(response):
# 标记当前代理失败(如果使用了代理)
if self.use_proxy and self.proxy_manager.current_proxy:
self.proxy_manager.mark_proxy_failed(self.proxy_manager.current_proxy)
logger.warning("检测到反爬措施,触发处理逻辑")
return self._handle_blocking(response, url, retry_count)
# 标记代理成功
if self.use_proxy and self.proxy_manager.current_proxy:
self.proxy_manager.mark_proxy_success(self.proxy_manager.current_proxy)
logger.info(f"成功获取页面: {url}")
return response.text
except requests.exceptions.ProxyError as e:
logger.error(f"代理错误: {e}")
if self.use_proxy and self.proxy_manager.current_proxy:
self.proxy_manager.mark_proxy_failed(self.proxy_manager.current_proxy)
return self.get_page(url, retry_count + 1)
except requests.exceptions.RequestException as e:
logger.error(f"请求异常: {e}")
return self.get_page(url, retry_count + 1)
4. 验证码识别与处理
# utils/captcha.py
import pytesseract
from PIL import Image
import io
import requests
import time
class CaptchaSolver:
"""验证码处理器"""
def __init__(self, api_key=None):
self.api_key = api_key # 打码平台API密钥
def solve_image_captcha(self, image_url):
"""识别图片验证码"""
try:
# 下载验证码图片
response = requests.get(image_url, timeout=10)
image = Image.open(io.BytesIO(response.content))
# 图片预处理(提高识别率)
image = self._preprocess_image(image)
# 使用Tesseract OCR识别
text = pytesseract.image_to_string(image, lang='chi_sim+eng')
# 清理识别结果
captcha_text = self._clean_text(text)
logger.info(f"验证码识别结果: {captcha_text}")
return captcha_text
except Exception as e:
logger.error(f"验证码识别失败: {e}")
return None
def _preprocess_image(self, image):
"""图片预处理"""
# 转换为灰度图
if image.mode != 'L':
image = image.convert('L')
# 二值化处理
threshold = 128
table = []
for i in range(256):
if i < threshold:
table.append(0)
else:
table.append(1)
image = image.point(table, '1')
return image
def _clean_text(self, text):
"""清理识别结果"""
# 移除换行符和空格
text = text.replace('\n', '').replace(' ', '')
# 只保留字母和数字
import re
text = re.sub(r'[^a-zA-Z0-9]', '', text)
return text
def solve_by_hand(self, image_url):
"""手动解决验证码(用于调试)"""
print(f"请访问以下URL查看验证码并输入: {image_url}")
captcha_text = input("请输入验证码: ")
return captcha_text.strip()
# 在爬虫中集成验证码处理
class MovieSpider:
def __init__(self, use_proxy=False, auto_captcha=False):
# ... 其他初始化 ...
self.auto_captcha = auto_captcha
self.captcha_solver = CaptchaSolver()
def _handle_blocking(self, response, url, retry_count):
"""处理反爬情况(增强版)"""
content = response.text
# 检查是否需要验证码
if '验证码' in content or 'captcha' in content:
logger.warning("检测到验证码要求")
# 尝试提取验证码图片URL
captcha_url = self._extract_captcha_url(response)
if captcha_url:
if self.auto_captcha:
# 自动识别
captcha_text = self.captcha_solver.solve_image_captcha(captcha_url)
if captcha_text:
# 提交验证码并重试请求
return self._submit_captcha_and_retry(captcha_text, url, retry_count)
else:
# 手动输入
captcha_text = self.captcha_solver.solve_by_hand(captcha_url)
if captcha_text:
return self._submit_captcha_and_retry(captcha_text, url, retry_count)
# IP限制,切换代理或长时间等待
if 'IP' in content or '403' in str(response.status_code):
logger.warning("IP被限制,切换代理或等待...")
# 如果有代理,切换代理
if self.use_proxy:
if self.proxy_manager.current_proxy:
self.proxy_manager.mark_proxy_failed(self.proxy_manager.current_proxy)
# 等待后重试
time.sleep(30)
return self.get_page(url, retry_count + 1)
else:
# 长时间等待
wait_time = random.uniform(60, 120)
logger.info(f"等待 {wait_time:.2f} 秒后重试...")
time.sleep(wait_time)
return self.get_page(url, retry_count + 1)
# 其他情况,长时间等待后重试
wait_time = random.uniform(20, 40)
logger.info(f"触发反爬,等待 {wait_time:.2f} 秒后重试...")
time.sleep(wait_time)
return self.get_page(url, retry_count + 1)
def _extract_captcha_url(self, response):
"""从响应中提取验证码图片URL"""
# 这里需要根据实际页面结构提取
# 示例:查找包含captcha的img标签
soup = BeautifulSoup(response.text, 'lxml')
captcha_img = soup.find('img', id=lambda x: x and 'captcha' in str(x))
if captcha_img and captcha_img.get('src'):
captcha_url = captcha_img.get('src')
# 如果是相对URL,转换为绝对URL
if captcha_url.startswith('/'):
return f"https://movie.douban.com{captcha_url}"
return captcha_url
return None
def _submit_captcha_and_retry(self, captcha_text, original_url, retry_count):
"""提交验证码并重试请求"""
# 这里需要根据实际页面的验证码提交逻辑实现
# 通常验证码会作为参数附加到请求中
logger.info(f"提交验证码: {captcha_text}")
# 实际实现需要根据具体页面结构调整
# 例如:添加验证码参数到session或URL
return self.get_page(original_url, retry_count + 1)
数据存储与管理
数据存储模块
# utils/storage.py
import pandas as pd
import json
import os
from config.settings import STORAGE_FORMAT, DATA_DIR
class DataStorage:
"""数据存储管理器"""
def __init__(self, format_type=STORAGE_FORMAT):
self.format_type = format_type
if not os.path.exists(DATA_DIR):
os.makedirs(DATA_DIR)
def save(self, data, filename):
"""保存数据"""
if not data:
logger.warning("没有数据需要保存")
return
filepath = os.path.join(DATA_DIR, filename)
if self.format_type == 'csv':
self._save_csv(data, filepath)
elif self.format_type == 'json':
self._save_json(data, filepath)
elif self.format_type == 'excel':
self._save_excel(data, filepath)
else:
raise ValueError(f"不支持的存储格式: {self.format_type}")
logger.info(f"数据已保存到: {filepath}")
def _save_csv(self, data, filepath):
"""保存为CSV格式"""
df = pd.DataFrame(data)
df.to_csv(filepath, index=False, encoding='utf-8-sig')
def _save_json(self, data, filepath):
"""保存为JSON格式"""
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def _save_excel(self, data, filepath):
"""保存为Excel格式"""
df = pd.DataFrame(data)
df.to_excel(filepath, index=False)
def load(self, filename):
"""加载数据"""
filepath = os.path.join(DATA_DIR, filename)
if not os.path.exists(filepath):
return None
if self.format_type == 'csv':
return pd.read_csv(filepath).to_dict('records')
elif self.format_type == 'json':
with open(filepath, 'r', encoding='utf-8') as f:
return json.load(f)
elif self.format_type == 'excel':
return pd.read_excel(filepath).to_dict('records')
在爬虫中集成数据存储
# spiders/movie_spider.py (续)
class MovieSpider:
def __init__(self, use_proxy=False, auto_captcha=False):
# ... 其他初始化 ...
self.storage = DataStorage()
def run(self, mode='top250'):
"""运行爬虫主函数"""
logger.info(f"开始运行豆瓣电影爬虫,模式: {mode}")
if mode == 'top250':
# 1. 获取电影列表
movie_list = self.get_top250_movies()
logger.info(f"获取到 {len(movie_list)} 部电影基本信息")
# 2. 获取电影详情
detailed_movies = self.get_movies_details(movie_list)
logger.info(f"获取到 {len(detailed_movies)} 部电影详情")
# 3. 保存数据
if detailed_movies:
self.storage.save(detailed_movies, 'douban_top250_movies.csv')
logger.info("数据抓取完成!")
else:
logger.error("未获取到有效数据")
elif mode == 'search':
# 可以扩展搜索模式
pass
完整代码整合与主程序
主程序入口
# main.py
import sys
import argparse
from spiders.movie_spider import MovieSpider
from config.settings import PROXY_ENABLED, CAPTCHA_ENABLED
from utils.logger import get_logger
logger = get_logger(__name__)
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='豆瓣电影爬虫')
parser.add_argument('--mode', type=str, default='top250',
choices=['top250', 'search'],
help='爬取模式: top250 (Top250电影), search (搜索)')
parser.add_argument('--proxy', action='store_true',
default=PROXY_ENABLED,
help='启用代理')
parser.add_argument('--auto-captcha', action='store_true',
default=CAPTCHA_ENABLED,
help='自动处理验证码')
parser.add_argument('--limit', type=int, default=250,
help='抓取数量限制(仅top250模式)')
args = parser.parse_args()
try:
# 创建爬虫实例
spider = MovieSpider(
use_proxy=args.proxy,
auto_captcha=args.auto_captcha
)
# 运行爬虫
if args.mode == 'top250':
# 获取电影列表
movie_list = spider.get_top250_movies(limit=args.limit)
logger.info(f"获取到 {len(movie_list)} 部电影基本信息")
# 获取电影详情
detailed_movies = spider.get_movies_details(movie_list)
logger.info(f"获取到 {len(detailed_movies)} 部电影详情")
# 保存数据
if detailed_movies:
spider.storage.save(detailed_movies, f'douban_top{args.limit}_movies.csv')
logger.info("数据抓取完成!")
else:
logger.error("未获取到有效数据")
elif args.mode == 'search':
logger.error("搜索模式暂未实现")
sys.exit(1)
except KeyboardInterrupt:
logger.info("用户中断操作")
sys.exit(0)
except Exception as e:
logger.error(f"程序运行出错: {e}")
sys.exit(1)
if __name__ == '__main__':
main()
日志工具
# utils/logger.py
import logging
import os
from config.settings import LOG_LEVEL, LOG_FILE
def get_logger(name):
"""获取日志记录器"""
# 创建日志目录
log_dir = os.path.dirname(LOG_FILE)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir)
# 配置日志格式
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(getattr(logging, LOG_LEVEL))
console_handler.setFormatter(formatter)
# 文件处理器
file_handler = logging.FileHandler(LOG_FILE, encoding='utf-8')
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
# 创建日志器
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)
logger.addHandler(file_handler)
return logger
高级技巧:应对复杂反爬
1. 动态User-Agent和Headers
# config/headers.py
def get_dynamic_headers():
"""生成动态请求头"""
return {
'User-Agent': get_random_user_agent(),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Cache-Control': 'max-age=0',
'Referer': 'https://movie.douban.com/top250',
}
# 在爬虫中定期更新headers
class MovieSpider:
def _update_headers_periodically(self):
"""定期更新请求头"""
# 可以设置定时器或每N次请求更新一次
self.session.headers.update(get_dynamic_headers())
2. 使用会话保持和Cookies
class MovieSpider:
def __init__(self, use_proxy=False, auto_captcha=False):
# ... 其他初始化 ...
self.session = requests.Session()
# 可以预先加载一些cookies,模拟真实用户
self._init_cookies()
def _init_cookies(self):
"""初始化Cookies"""
# 可以从文件加载cookies
# 或者手动设置一些cookies
cookies = {
# 示例:'bid': 'some_value',
# 'dbcl2': 'some_value',
}
for k, v in cookies.items():
self.session.cookies.set(k, v)
3. 异步爬虫(提高效率)
# spiders/async_spider.py
import aiohttp
import asyncio
from bs4 import BeautifulSoup
import time
import random
class AsyncMovieSpider:
"""异步爬虫版本"""
def __init__(self, use_proxy=False):
self.use_proxy = use_proxy
self.proxy_manager = ProxyManager() if use_proxy else None
self.rate_limiter = RateLimiter(max_requests=10, time_window=60)
async def fetch(self, session, url):
"""异步获取页面"""
await self.rate_limiter.acquire_async() # 异步频率限制
proxy = None
if self.use_proxy and self.proxy_manager:
proxy = self.proxy_manager.get_proxy()
try:
async with session.get(url, proxy=proxy, timeout=10) as response:
if response.status == 200:
return await response.text()
else:
logger.error(f"请求失败: {url}, 状态码: {response.status}")
return None
except Exception as e:
logger.error(f"请求异常: {url}, 错误: {e}")
return None
async def get_top250_movies_async(self):
"""异步抓取Top250"""
async with aiohttp.ClientSession() as session:
# 添加动态headers
session.headers.update(get_dynamic_headers())
# 创建任务列表
tasks = []
for offset in range(0, 250, 25):
url = f"https://movie.douban.com/top250?start={offset}"
tasks.append(self.fetch(session, url))
# 并发执行
results = await asyncio.gather(*tasks)
# 解析结果
movies = []
for html in results:
if html:
movies.extend(self._parse_movie_list(html))
return movies
def run_async(self):
"""运行异步爬虫"""
loop = asyncio.get_event_loop()
movies = loop.run_until_complete(self.get_top250_movies_async())
return movies
法律与道德注意事项
1. 遵守robots.txt
# 检查robots.txt
def check_robots_txt():
"""检查网站的robots.txt文件"""
try:
response = requests.get("https://movie.douban.com/robots.txt", timeout=5)
print("Robots.txt内容:")
print(response.text)
except:
print("无法获取robots.txt")
# 在爬虫开始前调用
check_robots.txt()
2. 合理的请求频率
- 建议频率:每2-5秒一个请求
- 避免高峰时段:凌晨0-6点爬取
- 控制总量:每天不超过1000个页面
3. 数据使用规范
- 仅用于学习研究:不要用于商业用途
- 不干扰网站正常运行:避免高频请求
- 尊重版权:不传播爬取的数据
4. 应对法律风险
# 添加法律声明和用户代理标识
def get_legal_headers():
"""获取合法的请求头"""
return {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
'From': 'your_email@example.com', # 你的邮箱
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
}
常见问题与解决方案
1. 代理IP被封
解决方案:
- 使用高质量付费代理(如阿布云、蘑菇代理)
- 实现代理自动切换和验证
- 降低请求频率
# 代理自动切换示例
def switch_proxy_on_failure(self):
"""失败时自动切换代理"""
if self.proxy_manager.current_proxy:
self.proxy_manager.mark_proxy_failed(self.proxy_manager.current_proxy)
new_proxy = self.proxy_manager.get_proxy()
if new_proxy:
logger.info(f"切换到新代理: {new_proxy}")
self.session.proxies = {'http': new_proxy, 'https': new_proxy}
else:
logger.error("代理池已空,无法切换")
# 退回到无代理模式
self.session.proxies = {}
2. 验证码识别率低
解决方案:
- 使用付费打码平台(如超级鹰、云打码)
- 图片预处理(去噪、二值化、锐化)
- 使用深度学习模型训练验证码识别器
# 使用打码平台示例
def solve_by_platform(self, image_url):
"""使用打码平台"""
# 超级鹰示例
import requests
import base64
# 下载图片并base64编码
img_data = requests.get(image_url).content
img_base64 = base64.b64encode(img_data).decode()
# 调用打码平台API
api_url = "http://www.chaojiying.com/SubmitOrder"
data = {
'user': 'your_username',
'pass2': 'your_password',
'softkey': 'your_softkey',
'img_base64': img_base64,
'codetype': '1004', # 验证码类型
}
response = requests.post(api_url, data=data)
result = response.json()
if result['result'] == '1':
return result['pic_str']
else:
logger.error(f"打码平台错误: {result}")
return None
3. 请求被识别为爬虫
解决方案:
- 完整的浏览器指纹模拟
- 使用Selenium/Playwright模拟真实浏览器
- 添加随机延迟和鼠标移动轨迹
# 使用Selenium模拟浏览器(适用于复杂反爬)
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
class SeleniumSpider:
def __init__(self):
options = Options()
options.add_argument('--headless') # 无头模式
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_argument('--user-agent=Mozilla/5.0...')
self.driver = webdriver.Chrome(options=options)
def get_page(self, url):
"""使用Selenium获取页面"""
self.driver.get(url)
time.sleep(random.uniform(2, 4)) # 模拟人类阅读时间
return self.driver.page_source
def close(self):
self.driver.quit()
总结
本教程从零开始,详细介绍了豆瓣电影爬虫的完整实现过程,包括:
- 环境准备:安装必要的库和项目结构设计
- 页面分析:理解豆瓣电影页面的HTML结构
- 基础爬虫:实现电影列表和详情页抓取
- 反爬策略:User-Agent轮换、频率控制、IP代理、验证码处理
- 数据存储:多种格式的数据保存方法
- 高级技巧:异步爬虫、动态Headers、会话保持
- 法律道德:合规爬取的注意事项
- 问题解决:常见问题的应对方案
核心要点回顾:
- 尊重网站规则:控制请求频率,避免对服务器造成压力
- 多层次反爬应对:从简单的User-Agent到复杂的验证码识别
- 代码可维护性:模块化设计,便于扩展和维护
- 数据质量:完整的解析逻辑和错误处理
进阶方向:
- 分布式爬虫:使用Scrapy-Redis实现分布式
- 数据清洗:使用Pandas进行数据预处理
- 可视化:使用Matplotlib/Seaborn展示电影数据
- 推荐系统:基于爬取的数据构建推荐算法
通过本教程的学习,你不仅掌握了豆瓣电影爬虫的实现,更重要的是理解了网络爬虫的核心原理和反爬应对策略,这些知识可以应用到其他网站的爬取任务中。记住,爬虫技术是一把双刃剑,务必用于合法合规的用途!
