Introduction: Why Web Crawlers Matter and Where They Are Used
A web crawler is one of the core technologies of modern data acquisition and information gathering. In an age of information overload, the ability to automatically extract valuable data from the internet has become essential. Web crawlers are widely used in market research, competitor analysis, price monitoring, news aggregation, academic research, and many other fields.
Implementing a crawler in Python has clear advantages: a rich ecosystem of third-party libraries, concise syntax, and strong data-processing capabilities. Compared with many other languages, crawler code written in Python tends to be quicker to develop and cheaper to maintain.
Part 1: Web Crawler Fundamentals and Preparation
1.1 How Web Crawlers Work
A web crawler simulates the behavior of a browser, automatically visiting web pages and extracting the required information. The basic workflow is as follows (a minimal sketch of the full cycle appears right after this list):
- Send a request: issue an HTTP request to the target site
- Receive the response: obtain the HTML content returned by the server
- Parse the content: extract structured data from the HTML
- Store the data: save the extracted data to a file or database
- Discover links: find new links on the current page and continue crawling
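The following is a minimal, self-contained sketch of that five-step cycle. The start URL is a placeholder, and real crawlers add the error handling, politeness delays, and storage layers described later in this guide.
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin

def mini_crawl(start_url, max_pages=3):
    """Illustrative only: request -> receive -> parse -> store -> discover links, breadth-first."""
    to_visit, seen, records = [start_url], set(), []
    while to_visit and len(seen) < max_pages:
        url = to_visit.pop(0)
        if url in seen:
            continue
        seen.add(url)
        response = requests.get(url, timeout=10)              # 1. send the request
        if response.status_code != 200:                       # 2. receive the response
            continue
        soup = BeautifulSoup(response.text, 'lxml')            # 3. parse the HTML
        title = soup.title.get_text(strip=True) if soup.title else ''
        records.append({'url': url, 'title': title})           # 4. store the data
        for a in soup.find_all('a', href=True):                # 5. discover new links
            to_visit.append(urljoin(url, a['href']))
    return records

# records = mini_crawl("https://example.com")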
1.2 Legal and Ethical Considerations
Before writing any crawler, make sure you understand the relevant legal and ethical norms:
- Respect robots.txt: check the target site's robots.txt file to learn its crawling rules
- Throttle your requests: avoid placing excessive load on the target site
- Respect copyright: pay attention to how the scraped data may legally be used
- Protect privacy: avoid collecting personal or private information
1.3 Setting Up the Development Environment
First, install the required Python libraries:
pip install requests beautifulsoup4 lxml selenium scrapy pandas openpyxl fake-useragent aiohttp pymysql pymongo
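Optionally, keep these dependencies isolated in a virtual environment; a typical setup on a Unix-like shell looks like this (on Windows, activate with venv\Scripts\activate):
python -m venv venv
source venv/bin/activate
Then run the pip install command above inside the activated environment.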
Part 2: Building a Basic Crawler
2.1 Sending HTTP Requests with the requests Library
requests is Python's most widely used HTTP library and is simple to work with. Here is a complete example:
import requests
from bs4 import BeautifulSoup
import time
import random
def basic_crawler(url, headers=None):
"""
基础爬虫函数,发送请求并返回解析后的BeautifulSoup对象
Args:
url (str): 目标URL
headers (dict): 请求头,模拟浏览器访问
Returns:
BeautifulSoup: 解析后的HTML对象
"""
# 设置默认请求头,模拟真实浏览器访问
default_headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
}
if headers:
default_headers.update(headers)
try:
# 发送GET请求,设置超时时间
response = requests.get(url, headers=default_headers, timeout=10)
# 检查状态码
if response.status_code == 200:
print(f"成功访问: {url}")
# 使用lxml解析器,速度更快
soup = BeautifulSoup(response.content, 'lxml')
return soup
else:
print(f"访问失败: {url}, 状态码: {response.status_code}")
return None
except requests.exceptions.Timeout:
print(f"请求超时: {url}")
return None
except requests.exceptions.RequestException as e:
print(f"请求异常: {url}, 错误: {e}")
return None
# 使用示例
if __name__ == "__main__":
# 示例:爬取一个简单的新闻网站标题
url = "https://example.com/news" # 替换为实际URL
soup = basic_crawler(url)
if soup:
# 提取所有新闻标题
titles = soup.find_all('h2', class_='news-title')
for i, title in enumerate(titles, 1):
print(f"{i}. {title.get_text(strip=True)}")
2.2 Parsing HTML with BeautifulSoup
BeautifulSoup offers several ways to locate and extract elements:
def parse_html_example(soup):
"""
演示BeautifulSoup的各种解析方法
"""
# 1. 通过标签名查找
all_links = soup.find_all('a')
print(f"找到 {len(all_links)} 个链接")
# 2. 通过CSS类名查找
articles = soup.find_all('div', class_='article')
print(f"找到 {len(articles)} 篇文章")
# 3. 通过ID查找
main_content = soup.find(id='main-content')
if main_content:
print("找到主内容区域")
# 4. 组合查找
# 查找class="post"的div下的所有h2标签
post_titles = soup.select('div.post > h2')
# 5. 提取属性
for link in all_links[:5]: # 只显示前5个
href = link.get('href')
text = link.get_text(strip=True)
print(f"链接文本: {text}, URL: {href}")
# 6. 提取所有文本
all_text = soup.get_text()
print(f"页面总文本长度: {len(all_text)}")
# 实际应用示例:爬取博客文章
def crawl_blog(url):
"""
爬取博客文章的标题、作者和发布时间
"""
soup = basic_crawler(url)
if not soup:
return []
articles = []
# 假设每篇文章都在<article>标签内
for article in soup.find_all('article'):
# 提取标题
title_elem = article.find('h2')
title = title_elem.get_text(strip=True) if title_elem else "无标题"
# 提取作者
author_elem = article.find('span', class_='author')
author = author_elem.get_text(strip=True) if author_elem else "未知作者"
# 提取发布时间
date_elem = article.find('time')
date = date_elem.get_text(strip=True) if date_elem else "未知时间"
# 提取文章链接
link_elem = article.find('a')
link = link_elem.get('href') if link_elem else "#"
articles.append({
'title': title,
'author': author,
'date': date,
'link': link
})
return articles
2.3 Dealing with Anti-Crawling Mechanisms
Modern websites often employ anti-crawling measures, so the crawler needs corresponding countermeasures:
import random
import time
import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
class AntiAntiCrawler:
"""
处理反爬虫机制的工具类
"""
def __init__(self):
self.ua = UserAgent()
self.request_count = 0
def get_random_headers(self):
"""生成随机请求头"""
return {
'User-Agent': self.ua.random,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
}
def random_delay(self, min_delay=1, max_delay=3):
"""随机延迟,避免请求过于频繁"""
delay = random.uniform(min_delay, max_delay)
time.sleep(delay)
self.request_count += 1
print(f"已执行 {self.request_count} 次请求,延迟 {delay:.2f} 秒")
def crawl_with_anti_detection(self, url):
"""使用反检测策略爬取"""
headers = self.get_random_headers()
try:
response = requests.get(url, headers=headers, timeout=15)
if response.status_code == 200:
# 随机延迟后再解析
self.random_delay(2, 5)
return BeautifulSoup(response.content, 'lxml')
else:
print(f"状态码异常: {response.status_code}")
return None
except Exception as e:
print(f"爬取失败: {e}")
return None
# 使用示例
anti_crawler = AntiAntiCrawler()
soup = anti_crawler.crawl_with_anti_detection("https://example.com")
Part 3: Advanced Crawling Techniques
3.1 Handling JavaScript-Rendered Pages
For sites that load their content dynamically with JavaScript, use Selenium:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
import time
class JavaScriptCrawler:
"""
处理JavaScript渲染页面的爬虫
"""
def __init__(self, headless=True):
"""
初始化Chrome驱动
Args:
headless (bool): 是否无头模式运行
"""
chrome_options = Options()
if headless:
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
# 初始化驱动(需要提前安装ChromeDriver)
self.driver = webdriver.Chrome(options=chrome_options)
self.wait = WebDriverWait(self.driver, 10)
def crawl_dynamic_content(self, url, wait_for_selector=None):
"""
爬取动态加载的内容
Args:
url (str): 目标URL
wait_for_selector (str): 等待出现的CSS选择器
Returns:
str: 页面源代码
"""
try:
print(f"正在加载页面: {url}")
self.driver.get(url)
if wait_for_selector:
# 等待特定元素加载完成
self.wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, wait_for_selector))
)
print(f"元素 {wait_for_selector} 已加载")
# 额外等待确保所有动态内容加载
time.sleep(2)
return self.driver.page_source
except Exception as e:
print(f"动态爬取失败: {e}")
return None
def extract_dynamic_data(self, url, selector):
"""
提取动态加载的数据
Args:
url (str): 目标URL
selector (str): CSS选择器
Returns:
list: 提取的数据列表
"""
html = self.crawl_dynamic_content(url, selector)
if not html:
return []
soup = BeautifulSoup(html, 'lxml')
elements = soup.select(selector)
data = [elem.get_text(strip=True) for elem in elements]
return data
def close(self):
"""关闭浏览器"""
if self.driver:
self.driver.quit()
# 使用示例
def crawl_infinite_scroll():
"""
爬取无限滚动页面的示例
"""
crawler = JavaScriptCrawler(headless=True)
try:
url = "https://example.com/infinite-scroll"
# 爬取动态加载的商品列表
products = crawler.extract_dynamic_data(url, '.product-item')
print(f"找到 {len(products)} 个商品:")
for i, product in enumerate(products, 1):
print(f"{i}. {product}")
# 模拟滚动加载更多
for scroll in range(3):
crawler.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(2)
# 重新获取并解析内容
html = crawler.driver.page_source
soup = BeautifulSoup(html, 'lxml')
new_products = soup.select('.product-item')
print(f"滚动 {scroll+1} 次后,共找到 {len(new_products)} 个商品")
finally:
crawler.close()
# 处理登录后的页面
def crawl_after_login():
"""
登录后爬取受保护内容
"""
crawler = JavaScriptCrawler(headless=False) # 显示浏览器以便观察
try:
# 1. 打开登录页面
crawler.driver.get("https://example.com/login")
# 2. 填写登录表单
username = crawler.driver.find_element(By.NAME, "username")
password = crawler.driver.find_element(By.NAME, "password")
username.send_keys("your_username")
password.send_keys("your_password")
# 3. 点击登录按钮
login_button = crawler.driver.find_element(By.CSS_SELECTOR, "button[type='submit']")
login_button.click()
# 4. 等待登录完成
time.sleep(3)
# 5. 爬取登录后的内容
protected_data = crawler.extract_dynamic_data(
"https://example.com/dashboard",
".dashboard-item"
)
print("受保护内容:", protected_data)
finally:
crawler.close()
3.2 The Scrapy Framework: Building Production-Grade Crawlers
Scrapy is Python's most widely used crawling framework and is well suited to large-scale crawling:
# 创建Scrapy项目结构(在命令行执行)
# scrapy startproject myproject
# cd myproject
# scrapy genspider example example.com
# spiders/blog_spider.py
import scrapy
from scrapy.http import Request
from myproject.items import BlogItem
class BlogSpider(scrapy.Spider):
name = "blog_spider"
allowed_domains = ["example.com"]
start_urls = ["https://example.com/blog"]
custom_settings = {
'DOWNLOAD_DELAY': 2, # 下载延迟
'CONCURRENT_REQUESTS_PER_DOMAIN': 2, # 并发请求数
'ROBOTSTXT_OBEY': True,
'FEEDS': {
'output/blog.json': {'format': 'json', 'overwrite': True},
},
}
def parse(self, response):
"""解析博客列表页"""
# 提取所有文章链接
article_links = response.css('article.post h2 a::attr(href)').getall()
for link in article_links:
# 跟随文章链接,调用parse_article方法
yield Request(
response.urljoin(link),
callback=self.parse_article,
meta={'link': link}
)
# 处理分页
next_page = response.css('a.next-page::attr(href)').get()
if next_page:
yield Request(
response.urljoin(next_page),
callback=self.parse
)
def parse_article(self, response):
"""解析单篇文章详情"""
item = BlogItem()
item['url'] = response.url
item['title'] = response.css('h1.article-title::text').get()
item['author'] = response.css('span.author::text').get()
item['publish_date'] = response.css('time::attr(datetime)').get()
item['content'] = response.css('div.article-content').get()
item['tags'] = response.css('span.tag::text').getall()
yield item
# items.py
import scrapy
class BlogItem(scrapy.Item):
url = scrapy.Field()
title = scrapy.Field()
author = scrapy.Field()
publish_date = scrapy.Field()
content = scrapy.Field()
tags = scrapy.Field()
# middlewares.py - random User-Agent and proxy support
import random
from fake_useragent import UserAgent

class RandomUserAgentMiddleware:
    def __init__(self):
        self.ua = UserAgent()
def process_request(self, request, spider):
request.headers['User-Agent'] = self.ua.random
class ProxyMiddleware:
    def __init__(self, proxy_list=None):
        self.proxy_list = proxy_list or []
    @classmethod
    def from_crawler(cls, crawler):
        # Scrapy instantiates middlewares via from_crawler, so read the proxy list from settings
        return cls(crawler.settings.getlist('PROXY_LIST'))
    def process_request(self, request, spider):
        if self.proxy_list:
            proxy = random.choice(self.proxy_list)
            request.meta['proxy'] = proxy
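These middlewares only take effect once they are enabled in the project's settings.py; a possible configuration is sketched below (the priority numbers are illustrative, and the built-in UserAgentMiddleware is disabled so it does not overwrite the random header).
# settings.py (excerpt)
DOWNLOADER_MIDDLEWARES = {
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,  # disable the built-in middleware
    'myproject.middlewares.RandomUserAgentMiddleware': 400,
    'myproject.middlewares.ProxyMiddleware': 410,
}
PROXY_LIST = [
    # 'http://proxy1.example.com:8080',
]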
3.3 Data Storage and Management
3.3.1 Saving to CSV and JSON
import csv
import json
import pandas as pd
class DataStorage:
"""
数据存储工具类
"""
@staticmethod
def save_to_csv(data, filename):
"""
保存数据到CSV文件
Args:
data (list): 数据列表,每个元素是字典
filename (str): 文件名
"""
if not data:
print("没有数据需要保存")
return
# 获取所有可能的字段
fieldnames = set()
for item in data:
fieldnames.update(item.keys())
fieldnames = list(fieldnames)
with open(filename, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
print(f"数据已保存到 {filename}")
@staticmethod
def save_to_json(data, filename, indent=2):
"""保存数据到JSON文件"""
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=indent)
print(f"数据已保存到 {filename}")
@staticmethod
def save_to_excel(data, filename, sheet_name='Sheet1'):
"""保存数据到Excel文件"""
df = pd.DataFrame(data)
df.to_excel(filename, sheet_name=sheet_name, index=False)
print(f"数据已保存到 {filename}")
# 使用示例
def save_example():
sample_data = [
{'title': 'Python教程', 'author': '张三', 'views': 1000},
{'title': '爬虫指南', 'author': '李四', 'views': 2000},
]
storage = DataStorage()
storage.save_to_csv(sample_data, 'blog_data.csv')
storage.save_to_json(sample_data, 'blog_data.json')
storage.save_to_excel(sample_data, 'blog_data.xlsx')
3.3.2 Saving to a Database
import sqlite3
import pymysql
from pymongo import MongoClient
class DatabaseStorage:
"""
数据库存储工具类
"""
def __init__(self, db_type='sqlite', **kwargs):
"""
初始化数据库连接
Args:
db_type (str): 数据库类型 ('sqlite', 'mysql', 'mongodb')
**kwargs: 连接参数
"""
self.db_type = db_type
self.conn = None
if db_type == 'sqlite':
self.conn = sqlite3.connect(kwargs.get('db_path', 'crawler.db'))
self._init_sqlite_tables()
elif db_type == 'mysql':
self.conn = pymysql.connect(
host=kwargs.get('host', 'localhost'),
user=kwargs.get('user'),
password=kwargs.get('password'),
database=kwargs.get('database'),
charset='utf8mb4'
)
self._init_mysql_tables()
elif db_type == 'mongodb':
self.client = MongoClient(kwargs.get('uri', 'mongodb://localhost:27017/'))
self.db = self.client[kwargs.get('db_name', 'crawler_db')]
def _init_sqlite_tables(self):
"""初始化SQLite表结构"""
cursor = self.conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
author TEXT,
publish_date TEXT,
url TEXT UNIQUE,
content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
self.conn.commit()
def _init_mysql_tables(self):
"""初始化MySQL表结构"""
cursor = self.conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS articles (
id INT AUTO_INCREMENT PRIMARY KEY,
title VARCHAR(500) NOT NULL,
author VARCHAR(100),
publish_date DATE,
url VARCHAR(500) UNIQUE,
content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_title (title),
INDEX idx_author (author)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
''')
self.conn.commit()
def save_article(self, data):
"""保存文章数据"""
if self.db_type == 'sqlite':
cursor = self.conn.cursor()
try:
cursor.execute('''
INSERT OR REPLACE INTO articles (title, author, publish_date, url, content)
VALUES (?, ?, ?, ?, ?)
''', (data['title'], data.get('author'), data.get('publish_date'),
data['url'], data.get('content')))
self.conn.commit()
print(f"文章 '{data['title']}' 已保存到SQLite")
except Exception as e:
print(f"保存失败: {e}")
elif self.db_type == 'mysql':
cursor = self.conn.cursor()
try:
cursor.execute('''
INSERT INTO articles (title, author, publish_date, url, content)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
title=VALUES(title), author=VALUES(author),
publish_date=VALUES(publish_date), content=VALUES(content)
''', (data['title'], data.get('author'), data.get('publish_date'),
data['url'], data.get('content')))
self.conn.commit()
print(f"文章 '{data['title']}' 已保存到MySQL")
except Exception as e:
print(f"保存失败: {e}")
elif self.db_type == 'mongodb':
collection = self.db.articles
result = collection.update_one(
{'url': data['url']},
{'$set': data},
upsert=True
)
if result.upserted_id or result.modified_count:
print(f"文章 '{data['title']}' 已保存到MongoDB")
def close(self):
"""关闭数据库连接"""
if self.db_type in ['sqlite', 'mysql'] and self.conn:
self.conn.close()
elif self.db_type == 'mongodb' and self.client:
self.client.close()
# 使用示例
def database_example():
# SQLite示例
sqlite_db = DatabaseStorage('sqlite')
article = {
'title': 'Python爬虫教程',
'author': '王五',
'publish_date': '2024-01-15',
'url': 'https://example.com/python-crawler',
'content': '这是文章的详细内容...'
}
sqlite_db.save_article(article)
sqlite_db.close()
# MongoDB示例
mongo_db = DatabaseStorage('mongodb', db_name='crawler_db')
mongo_db.save_article(article)
mongo_db.close()
Part 4: A Complete Project Example
4.1 Project Structure
crawler_project/
├── main.py                 # main entry point
├── config.py               # configuration
├── crawler/
│   ├── __init__.py
│   ├── base_crawler.py     # basic crawler class
│   ├── dynamic_crawler.py  # dynamic (JavaScript) crawler class
│   └── scrapy_crawler.py   # Scrapy spider
├── utils/
│   ├── __init__.py
│   ├── storage.py          # storage helpers
│   ├── anti_detection.py   # anti-detection helpers
│   └── logger.py           # logging helpers
├── data/
│   ├── raw/                # raw data
│   └── processed/          # processed data
└── requirements.txt
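A requirements.txt covering the libraries used throughout this guide might look like the following (versions are left unpinned here; pin them for reproducible deployments):
# requirements.txt
requests
beautifulsoup4
lxml
selenium
scrapy
pandas
openpyxl        # needed by pandas for .xlsx export
fake-useragent
aiohttp
pymysql
pymongo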
4.2 A Complete News Crawler Project
# config.py
class Config:
# 目标网站配置
TARGET_URL = "https://example.com/news"
# 爬取配置
MAX_PAGES = 10
DOWNLOAD_DELAY = 2
TIMEOUT = 15
# 存储配置
STORAGE_TYPE = "sqlite" # "sqlite", "mysql", "mongodb", "csv", "json"
DB_CONFIG = {
"db_path": "news.db",
"host": "localhost",
"user": "root",
"password": "password",
"database": "crawler_db",
"uri": "mongodb://localhost:27017/"
}
# 反爬配置
USE_PROXY = False
PROXY_LIST = [
"http://proxy1.example.com:8080",
"http://proxy2.example.com:8080",
]
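The PROXY_LIST above is not consumed anywhere else in this part, so for reference here is a hypothetical helper (not one of the project files) showing how requests can route a call through one of the configured proxies via its proxies mapping; the proxy URLs themselves are placeholders.
import random
import requests
def get_with_proxy(url, headers=None):
    """Route a GET request through a randomly chosen proxy from Config.PROXY_LIST."""
    proxies = None
    if Config.USE_PROXY and Config.PROXY_LIST:
        proxy = random.choice(Config.PROXY_LIST)
        proxies = {'http': proxy, 'https': proxy}  # requests maps scheme -> proxy URL
    return requests.get(url, headers=headers, proxies=proxies, timeout=Config.TIMEOUT)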
# utils/logger.py
import logging
import sys
from datetime import datetime
def setup_logger(name="crawler"):
"""配置日志"""
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
# 清除已有处理器
logger.handlers.clear()
# 控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
# 文件处理器
file_handler = logging.FileHandler(f'crawler_{datetime.now().strftime("%Y%m%d")}.log')
file_handler.setLevel(logging.DEBUG)
# 格式化器
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.addHandler(file_handler)
return logger
# crawler/base_crawler.py
import requests
from bs4 import BeautifulSoup
import time
import random
from utils.anti_detection import AntiAntiCrawler
from utils.logger import setup_logger
class BaseCrawler:
"""基础爬虫类"""
def __init__(self, config):
self.config = config
self.logger = setup_logger()
self.anti_crawler = AntiAntiCrawler()
self.session = requests.Session()
self.session.headers.update(self.anti_crawler.get_random_headers())
def safe_request(self, url, max_retries=3):
"""安全请求,带重试机制"""
for attempt in range(max_retries):
try:
self.anti_crawler.random_delay(
self.config.DOWNLOAD_DELAY * 0.5,
self.config.DOWNLOAD_DELAY * 1.5
)
response = self.session.get(
url,
timeout=self.config.TIMEOUT,
headers=self.anti_crawler.get_random_headers()
)
if response.status_code == 200:
self.logger.info(f"成功获取: {url}")
return BeautifulSoup(response.content, 'lxml')
else:
self.logger.warning(f"状态码 {response.status_code}: {url}")
except Exception as e:
self.logger.error(f"请求失败 (尝试 {attempt+1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # 指数退避
else:
return None
return None
def extract_article(self, soup):
"""提取文章信息"""
if not soup:
return None
article = {
'title': soup.find('h1').get_text(strip=True) if soup.find('h1') else '',
'author': soup.find('span', class_='author').get_text(strip=True) if soup.find('span', class_='author') else '',
'publish_date': soup.find('time').get_text(strip=True) if soup.find('time') else '',
'content': soup.find('div', class_='article-content').get_text(strip=True) if soup.find('div', class_='article-content') else '',
'url': self.current_url,
}
return article
# main.py
from config import Config
from crawler.base_crawler import BaseCrawler
from utils.storage import DataStorage, DatabaseStorage
def main():
config = Config()
crawler = BaseCrawler(config)
storage = DataStorage()
# 根据配置选择存储方式
if config.STORAGE_TYPE in ['sqlite', 'mysql', 'mongodb']:
db_storage = DatabaseStorage(config.STORAGE_TYPE, **config.DB_CONFIG)
articles = []
current_page = 1
while current_page <= config.MAX_PAGES:
url = f"{config.TARGET_URL}?page={current_page}"
crawler.current_url = url
soup = crawler.safe_request(url)
if not soup:
break
# 解析文章列表
article_links = soup.select('article.post h2 a')
for link in article_links:
article_url = link.get('href')
if not article_url:
continue
# 获取完整URL
if not article_url.startswith('http'):
article_url = f"https://example.com{article_url}"
# 爬取文章详情
article_soup = crawler.safe_request(article_url)
if article_soup:
article_data = crawler.extract_article(article_soup)
if article_data:
articles.append(article_data)
# 存储数据
if config.STORAGE_TYPE == 'csv':
storage.save_to_csv([article_data], 'articles.csv')
elif config.STORAGE_TYPE == 'json':
storage.save_to_json([article_data], 'articles.json')
elif config.STORAGE_TYPE in ['sqlite', 'mysql', 'mongodb']:
db_storage.save_article(article_data)
current_page += 1
# 批量保存
if config.STORAGE_TYPE == 'csv' and articles:
storage.save_to_csv(articles, 'articles_all.csv')
elif config.STORAGE_TYPE == 'json' and articles:
storage.save_to_json(articles, 'articles_all.json')
if config.STORAGE_TYPE in ['sqlite', 'mysql', 'mongodb']:
db_storage.close()
crawler.logger.info(f"爬取完成,共 {len(articles)} 篇文章")
if __name__ == "__main__":
main()
Part 5: Performance Optimization and Best Practices
5.1 Concurrent Crawling
import asyncio
import aiohttp
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
import threading
class ConcurrentCrawler:
"""并发爬虫"""
def __init__(self, max_workers=5):
self.max_workers = max_workers
self.lock = threading.Lock()
async def fetch_async(self, session, url):
"""异步获取单个URL"""
try:
async with session.get(url, timeout=10) as response:
if response.status == 200:
html = await response.text()
return html
else:
print(f"错误: {url} - {response.status}")
return None
except Exception as e:
print(f"异常: {url} - {e}")
return None
async def crawl_multiple_async(self, urls):
"""异步爬取多个URL"""
async with aiohttp.ClientSession() as session:
tasks = [self.fetch_async(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
def crawl_multiple_sync(self, urls):
"""多线程爬取"""
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
results = list(executor.map(self.safe_request_sync, urls))
return results
def safe_request_sync(self, url):
"""线程安全的请求"""
try:
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
return BeautifulSoup(response.content, 'lxml')
except Exception as e:
print(f"线程请求失败: {url} - {e}")
return None
# 使用示例
async def async_example():
crawler = ConcurrentCrawler()
urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3",
]
results = await crawler.crawl_multiple_async(urls)
for url, html in zip(urls, results):
if html:
soup = BeautifulSoup(html, 'lxml')
print(f"{url}: {soup.title.get_text(strip=True) if soup.title else 'No title'}")
# 运行异步爬虫
# asyncio.run(async_example())
5.2 Caching
import hashlib
import os
import pickle
import requests
from functools import wraps
def cache_to_disk(func):
"""磁盘缓存装饰器"""
cache_dir = "cache"
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)
@wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存键
key = hashlib.md5(str(args).encode() + str(kwargs).encode()).hexdigest()
cache_file = os.path.join(cache_dir, f"{key}.pkl")
# 检查缓存
if os.path.exists(cache_file):
try:
with open(cache_file, 'rb') as f:
return pickle.load(f)
except:
pass
# 执行函数并缓存结果
result = func(*args, **kwargs)
with open(cache_file, 'wb') as f:
pickle.dump(result, f)
return result
return wrapper
@cache_to_disk
def get_cached_page(url):
"""带缓存的页面获取"""
headers = {'User-Agent': 'Mozilla/5.0'}
response = requests.get(url, headers=headers, timeout=10)
return response.text if response.status_code == 200 else None
5.3 Error Handling and Logging
import logging
import requests
from datetime import datetime
class CrawlerErrorHandler:
"""错误处理与日志记录"""
def __init__(self, log_file="crawler_errors.log"):
self.logger = logging.getLogger("CrawlerErrors")
self.logger.setLevel(logging.ERROR)
handler = logging.FileHandler(log_file)
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def handle_error(self, error, context=None):
"""统一错误处理"""
error_msg = f"Error: {str(error)}"
if context:
error_msg += f" | Context: {context}"
self.logger.error(error_msg)
# 可以添加通知逻辑(邮件、Slack等)
self.send_alert(error_msg)
def send_alert(self, message):
"""发送警报(示例)"""
# 这里可以集成邮件、Slack、钉钉等通知
print(f"ALERT: {message}")
# 使用示例
error_handler = CrawlerErrorHandler()
try:
# 爬取逻辑
response = requests.get("https://example.com", timeout=5)
except requests.exceptions.Timeout as e:
error_handler.handle_error(e, {"url": "https://example.com", "type": "timeout"})
except Exception as e:
error_handler.handle_error(e, {"url": "https://example.com", "type": "unknown"})
Part 6: Case Study: Scraping Product Data from an E-commerce Site
6.1 Requirements
Suppose we need to scrape product information from an e-commerce site, including:
- Product name
- Price
- Rating
- Number of reviews
- Product URL
6.2 Full Implementation
# ecommerce_crawler.py
import requests
from bs4 import BeautifulSoup
import re
import json
import time
import random
from datetime import datetime
class EcommerceCrawler:
"""电商网站爬虫"""
def __init__(self, base_url):
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
})
self.products = []
def extract_price(self, price_text):
"""从文本中提取价格"""
if not price_text:
return None
# 匹配数字和小数点
match = re.search(r'[\d,]+\.?\d*', price_text)
if match:
return float(match.group().replace(',', ''))
return None
def extract_rating(self, rating_text):
"""提取评分"""
if not rating_text:
return None
match = re.search(r'(\d+\.?\d*)', rating_text)
if match:
return float(match.group())
return None
def extract_review_count(self, review_text):
"""提取评论数"""
if not review_text:
return None
# 移除单位(K, M等)并转换为数字
review_text = review_text.replace(',', '').replace(' ', '')
if 'K' in review_text:
return int(float(review_text.replace('K', '')) * 1000)
elif 'M' in review_text:
return int(float(review_text.replace('M', '')) * 1000000)
else:
try:
return int(review_text)
except:
return None
def parse_product_page(self, soup):
"""解析商品列表页"""
products = []
# 假设商品都在div.product-item中
product_elements = soup.find_all('div', class_='product-item')
for elem in product_elements:
try:
# 提取商品名称
name_elem = elem.find('h3', class_='product-title')
name = name_elem.get_text(strip=True) if name_elem else None
# 提取价格
price_elem = elem.find('span', class_='price')
price = self.extract_price(price_elem.get_text(strip=True)) if price_elem else None
# 提取评分
rating_elem = elem.find('span', class_='rating')
rating = self.extract_rating(rating_elem.get_text(strip=True)) if rating_elem else None
# 提取评论数
review_elem = elem.find('span', class_='review-count')
review_count = self.extract_review_count(review_elem.get_text(strip=True)) if review_elem else None
# 提取链接
link_elem = elem.find('a', class_='product-link')
link = link_elem.get('href') if link_elem else None
if link and not link.startswith('http'):
link = self.base_url + link
# 提取图片
img_elem = elem.find('img')
image_url = img_elem.get('src') if img_elem else None
if name and price: # 至少需要名称和价格
products.append({
'name': name,
'price': price,
'rating': rating,
'review_count': review_count,
'link': link,
'image_url': image_url,
'crawled_at': datetime.now().isoformat()
})
except Exception as e:
print(f"解析商品时出错: {e}")
continue
return products
def crawl_category(self, category_url, max_pages=5):
"""爬取整个分类"""
all_products = []
for page in range(1, max_pages + 1):
url = f"{category_url}?page={page}"
print(f"正在爬取第 {page} 页: {url}")
try:
response = self.session.get(url, timeout=15)
if response.status_code != 200:
print(f"无法访问: {url}, 状态码: {response.status_code}")
break
soup = BeautifulSoup(response.content, 'lxml')
products = self.parse_product_page(soup)
if not products:
print("未找到商品,停止爬取")
break
all_products.extend(products)
print(f"第 {page} 页找到 {len(products)} 个商品")
# 随机延迟
time.sleep(random.uniform(2, 4))
except Exception as e:
print(f"爬取第 {page} 页时出错: {e}")
break
return all_products
def save_results(self, products, filename=None):
"""保存结果"""
if not filename:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"products_{timestamp}.json"
with open(filename, 'w', encoding='utf-8') as f:
json.dump(products, f, ensure_ascii=False, indent=2)
print(f"已保存 {len(products)} 个商品到 {filename}")
# 同时保存CSV
csv_filename = filename.replace('.json', '.csv')
if products:
import csv
fieldnames = products[0].keys()
with open(csv_filename, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(products)
print(f"同时保存CSV到 {csv_filename}")
def analyze_products(self, products):
"""分析商品数据"""
if not products:
print("没有商品数据可供分析")
return
prices = [p['price'] for p in products if p['price']]
ratings = [p['rating'] for p in products if p['rating']]
review_counts = [p['review_count'] for p in products if p['review_count']]
print("\n=== 商品分析报告 ===")
print(f"商品总数: {len(products)}")
if prices:
print(f"价格范围: ${min(prices):.2f} - ${max(prices):.2f}")
print(f"平均价格: ${sum(prices)/len(prices):.2f}")
if ratings:
print(f"评分范围: {min(ratings):.1f} - {max(ratings):.1f}")
print(f"平均评分: {sum(ratings)/len(ratings):.2f}")
if review_counts:
print(f"总评论数: {sum(review_counts)}")
print(f"平均评论数: {sum(review_counts)/len(review_counts):.0f}")
# 找出最贵的商品
if prices:
most_expensive = max(products, key=lambda x: x['price'] if x['price'] else 0)
print(f"\n最贵商品: {most_expensive['name']} - ${most_expensive['price']:.2f}")
# 找出评分最高的商品
if ratings:
best_rated = max(products, key=lambda x: x['rating'] if x['rating'] else 0)
print(f"评分最高: {best_rated['name']} - {best_rated['rating']:.1f}星")
# 使用示例
def crawl_electronics():
"""爬取电子产品分类"""
crawler = EcommerceCrawler("https://example.com")
# 爬取手机分类
phones = crawler.crawl_category("https://example.com/category/phones", max_pages=3)
# 爬取笔记本分类
laptops = crawler.crawl_category("https://example.com/category/laptops", max_pages=2)
# 合并结果
all_products = phones + laptops
# 保存和分析
crawler.save_results(all_products)
crawler.analyze_products(all_products)
return all_products
if __name__ == "__main__":
# 注意:这是一个示例,实际使用时需要替换为真实的URL和选择器
print("注意:请替换为实际的电商网站URL和CSS选择器")
# products = crawl_electronics()
Part 7: Legal and Ethical Compliance
7.1 Respecting robots.txt
import urllib.robotparser
from urllib.parse import urljoin, urlparse
def check_robots_permission(url, user_agent='*'):
"""
检查是否允许爬取
Args:
url (str): 目标URL
user_agent (str): 用户代理
Returns:
bool: 是否允许
"""
    rp = urllib.robotparser.RobotFileParser()
    parsed = urlparse(url)
    robots_url = urljoin(f"{parsed.scheme}://{parsed.netloc}", "/robots.txt")
try:
rp.set_url(robots_url)
rp.read()
return rp.can_fetch(user_agent, url)
except Exception as e:
print(f"无法读取robots.txt: {e}")
return True # 如果无法读取,假设允许
# 使用示例
def ethical_crawl(url):
if not check_robots_permission(url):
print(f"根据robots.txt,不允许爬取 {url}")
return
print(f"允许爬取 {url}")
# 继续爬取逻辑
7.2 Throttling Request Rates
import time
from collections import defaultdict
class RateLimiter:
"""速率限制器"""
def __init__(self, max_requests_per_minute=10):
self.max_requests = max_requests_per_minute
self.request_times = defaultdict(list)
def wait_if_needed(self, domain):
"""检查是否需要等待"""
now = time.time()
# 清理60秒前的记录
self.request_times[domain] = [
t for t in self.request_times[domain] if now - t < 60
]
if len(self.request_times[domain]) >= self.max_requests:
sleep_time = 60 - (now - self.request_times[domain][0])
if sleep_time > 0:
print(f"速率限制,等待 {sleep_time:.1f} 秒")
time.sleep(sleep_time)
# 清空后重新记录
self.request_times[domain] = []
self.request_times[domain].append(now)
# 使用示例
rate_limiter = RateLimiter(max_requests_per_minute=5)
def limited_request(url):
domain = url.split('/')[2]
rate_limiter.wait_if_needed(domain)
# 执行请求
response = requests.get(url)
return response
Part 8: Summary and Further Learning
8.1 Key Takeaways
- Core stack: requests + BeautifulSoup works well for static pages
- Dynamic content: Selenium handles JavaScript-rendered pages
- Large-scale crawling: the Scrapy framework offers a complete solution
- Data storage: CSV/JSON for small datasets, databases for larger ones
- Anti-crawling countermeasures: random User-Agent headers, delays, proxies
- Performance: concurrency, caching, and robust error handling
8.2 Where to Go Next
- Distributed crawling: use Scrapy-Redis to run spiders across machines
- Machine learning: apply ML to CAPTCHA recognition and data classification
- API scraping: call REST APIs directly to obtain structured data
- Browser automation: Playwright as a modern alternative to Selenium (a short sketch follows this list)
- Cloud deployment: run crawlers on AWS/GCP
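As a taste of the Playwright option, here is a minimal sketch using its synchronous API; the URL is a placeholder, and you would need pip install playwright followed by playwright install to download the browser binaries.
from playwright.sync_api import sync_playwright
def fetch_rendered_html(url):
    """Return the fully rendered HTML of a JavaScript-heavy page."""
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)   # headless Chromium
        page = browser.new_page()
        page.goto(url, timeout=15000)                # timeout is in milliseconds
        page.wait_for_selector("body")               # wait until the document body exists
        html = page.content()                        # rendered page source
        browser.close()
    return html
# html = fetch_rendered_html("https://example.com")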
8.3 Recommended Resources
- Official documentation: Scrapy, Selenium, requests
- GitHub: curated lists such as awesome-web-crawlers
- Books: 《Python网络数据采集》 (Web Scraping with Python)
- Communities: Stack Overflow, Reddit r/webscraping
8.4 Final Advice
- Always respect the target site: throttle your requests and avoid stressing its servers
- Stay legal and compliant: understand the laws and regulations that apply to you
- Keep learning: websites evolve constantly, and crawling techniques must keep up
- Back up your data: back up scraped data regularly
- Monitor and maintain: set up monitoring so problems are detected and fixed early
With this guide you should be able to build web crawlers ranging from simple scripts to fairly complex systems. Remember that crawling is a double-edged sword: used responsibly it creates real value; abused, it can expose you to legal risk. Happy crawling!
