Introduction: Why Web Crawlers Matter and Where They Are Used

A web crawler is one of the core technologies for modern data acquisition and information gathering. In an age of information overload, being able to extract valuable data from the web automatically has become essential. Web crawlers are widely used in market research, competitor analysis, price monitoring, news aggregation, academic research, and many other fields.

Python is particularly well suited to building crawlers: it has a rich ecosystem of third-party libraries, concise syntax, and strong data-processing capabilities. Compared with most other languages, crawler code in Python is faster to write and cheaper to maintain.

Part 1: Web Crawler Fundamentals and Preparation

1.1 How a Web Crawler Works

A web crawler imitates the behavior of a person using a browser: it visits pages automatically and extracts the information you need. Its basic workflow is as follows (a minimal sketch of this loop follows the list):

  1. Send a request: issue an HTTP request to the target site
  2. Receive the response: accept the HTML returned by the server
  3. Parse the content: extract structured data from the HTML
  4. Store the data: save the extracted data to a file or database
  5. Discover links: find new links on the current page and continue crawling
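
To make this loop concrete, here is a minimal sketch in Python. It uses requests and BeautifulSoup (installed in section 1.3 and covered in Part 2); the start URL, the page limit, and the decision to store only page titles are illustrative choices, not part of any real site.

import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin

def simple_crawl(start_url, max_pages=5):
    """Minimal crawl loop: request -> response -> parse -> store -> discover links."""
    to_visit = [start_url]   # frontier of URLs waiting to be fetched
    visited = set()          # URLs already fetched
    records = []             # extracted data

    while to_visit and len(visited) < max_pages:
        url = to_visit.pop(0)
        if url in visited:
            continue
        visited.add(url)

        # Steps 1-2: send the request and receive the response
        response = requests.get(url, timeout=10)
        if response.status_code != 200:
            continue

        # Step 3: parse the HTML
        soup = BeautifulSoup(response.text, 'lxml')

        # Step 4: store data (here, just the page title)
        if soup.title:
            records.append({'url': url, 'title': soup.title.get_text(strip=True)})

        # Step 5: discover new links and add them to the frontier
        for a in soup.find_all('a', href=True):
            link = urljoin(url, a['href'])
            if link.startswith('http'):
                to_visit.append(link)

    return records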

1.2 Legal and Ethical Considerations

Before writing any crawler, make sure you understand the relevant legal and ethical rules:

  • Respect robots.txt: check the target site's robots.txt file to learn its crawling rules
  • Throttle your requests: avoid putting excessive load on the target site
  • Respect copyright: pay attention to how the data may be used
  • Protect privacy: do not collect personal or private information

1.3 Setting Up the Development Environment

First, install the required Python libraries:

pip install requests beautifulsoup4 selenium scrapy pandas lxml
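
As an optional sanity check, a small snippet like the following simply imports each library and prints a version string where one is available; if an import fails, that package is not installed correctly.

# Quick check that the installed libraries can be imported
import requests, bs4, selenium, scrapy, pandas, lxml

for module in (requests, bs4, selenium, scrapy, pandas, lxml):
    # Most of these packages expose __version__; fall back gracefully if one does not
    print(module.__name__, getattr(module, '__version__', 'version unknown'))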

Part 2: Building a Basic Crawler

2.1 Sending HTTP Requests with requests

requests is the most popular HTTP library for Python and is very easy to use. Here is a complete example:

import requests
from bs4 import BeautifulSoup
import time
import random

def basic_crawler(url, headers=None):
    """
    Basic crawler function: send a request and return the parsed BeautifulSoup object.
    
    Args:
        url (str): target URL
        headers (dict): request headers used to mimic a real browser
    
    Returns:
        BeautifulSoup: parsed HTML document
    """
    # Default headers that mimic a real browser
    default_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    }
    
    if headers:
        default_headers.update(headers)
    
    try:
        # Send a GET request with a timeout
        response = requests.get(url, headers=default_headers, timeout=10)
        
        # Check the status code
        if response.status_code == 200:
            print(f"Fetched successfully: {url}")
            # Use the lxml parser, which is faster
            soup = BeautifulSoup(response.content, 'lxml')
            return soup
        else:
            print(f"Request failed: {url}, status code: {response.status_code}")
            return None
            
    except requests.exceptions.Timeout:
        print(f"Request timed out: {url}")
        return None
    except requests.exceptions.RequestException as e:
        print(f"Request error: {url}, error: {e}")
        return None

# Usage example
if __name__ == "__main__":
    # Example: scrape headlines from a simple news site
    url = "https://example.com/news"  # replace with a real URL
    soup = basic_crawler(url)
    
    if soup:
        # Extract all news headlines
        titles = soup.find_all('h2', class_='news-title')
        for i, title in enumerate(titles, 1):
            print(f"{i}. {title.get_text(strip=True)}")

2.2 Parsing HTML with BeautifulSoup

BeautifulSoup offers several ways to locate and extract elements:

def parse_html_example(soup):
    """
    Demonstrate the main BeautifulSoup lookup methods.
    """
    # 1. Find by tag name
    all_links = soup.find_all('a')
    print(f"Found {len(all_links)} links")
    
    # 2. Find by CSS class
    articles = soup.find_all('div', class_='article')
    print(f"Found {len(articles)} articles")
    
    # 3. Find by ID
    main_content = soup.find(id='main-content')
    if main_content:
        print("Found the main content area")
    
    # 4. Combined selectors
    # Find all h2 tags that are direct children of div.post
    post_titles = soup.select('div.post > h2')
    
    # 5. Extract attributes
    for link in all_links[:5]:  # show only the first five
        href = link.get('href')
        text = link.get_text(strip=True)
        print(f"Link text: {text}, URL: {href}")
    
    # 6. Extract all text
    all_text = soup.get_text()
    print(f"Total page text length: {len(all_text)}")

# Practical example: scraping blog posts
def crawl_blog(url):
    """
    Scrape the title, author, and publication date of each blog post.
    """
    soup = basic_crawler(url)
    if not soup:
        return []
    
    articles = []
    # Assume each post lives inside an <article> tag
    for article in soup.find_all('article'):
        # Extract the title
        title_elem = article.find('h2')
        title = title_elem.get_text(strip=True) if title_elem else "Untitled"
        
        # Extract the author
        author_elem = article.find('span', class_='author')
        author = author_elem.get_text(strip=True) if author_elem else "Unknown author"
        
        # Extract the publication date
        date_elem = article.find('time')
        date = date_elem.get_text(strip=True) if date_elem else "Unknown date"
        
        # Extract the post link
        link_elem = article.find('a')
        link = link_elem.get('href') if link_elem else "#"
        
        articles.append({
            'title': title,
            'author': author,
            'date': date,
            'link': link
        })
    
    return articles

2.3 Dealing with Anti-Crawling Measures

Modern websites often deploy anti-crawling measures, which call for matching countermeasures:

import random
import time
import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent

class AntiAntiCrawler:
    """
    Utility class for working around anti-crawling measures.
    """
    
    def __init__(self):
        self.ua = UserAgent()
        self.request_count = 0
    
    def get_random_headers(self):
        """Generate randomized request headers."""
        return {
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
    
    def random_delay(self, min_delay=1, max_delay=3):
        """Sleep for a random interval to avoid hammering the server."""
        delay = random.uniform(min_delay, max_delay)
        time.sleep(delay)
        self.request_count += 1
        print(f"Requests so far: {self.request_count}, delayed {delay:.2f} s")
    
    def crawl_with_anti_detection(self, url):
        """Crawl a URL using the anti-detection strategies above."""
        headers = self.get_random_headers()
        
        try:
            response = requests.get(url, headers=headers, timeout=15)
            if response.status_code == 200:
                # Random delay before parsing
                self.random_delay(2, 5)
                return BeautifulSoup(response.content, 'lxml')
            else:
                print(f"Unexpected status code: {response.status_code}")
                return None
        except Exception as e:
            print(f"Crawl failed: {e}")
            return None

# Usage example
anti_crawler = AntiAntiCrawler()
soup = anti_crawler.crawl_with_anti_detection("https://example.com")

Part 3: Advanced Crawling Techniques

3.1 Handling JavaScript-Rendered Pages

For sites that load their content dynamically with JavaScript, use Selenium:

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
import time

class JavaScriptCrawler:
    """
    Crawler for pages rendered by JavaScript.
    """
    
    def __init__(self, headless=True):
        """
        Initialize the Chrome driver.
        
        Args:
            headless (bool): whether to run in headless mode
        """
        chrome_options = Options()
        if headless:
            chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
        
        # Initialize the driver (ChromeDriver must be installed beforehand)
        self.driver = webdriver.Chrome(options=chrome_options)
        self.wait = WebDriverWait(self.driver, 10)
    
    def crawl_dynamic_content(self, url, wait_for_selector=None):
        """
        Fetch dynamically loaded content.
        
        Args:
            url (str): target URL
            wait_for_selector (str): CSS selector to wait for
        
        Returns:
            str: page source
        """
        try:
            print(f"Loading page: {url}")
            self.driver.get(url)
            
            if wait_for_selector:
                # Wait until the given element has loaded
                self.wait.until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, wait_for_selector))
                )
                print(f"Element {wait_for_selector} has loaded")
            
            # Extra wait to make sure all dynamic content is in place
            time.sleep(2)
            
            return self.driver.page_source
            
        except Exception as e:
            print(f"Dynamic crawl failed: {e}")
            return None
    
    def extract_dynamic_data(self, url, selector):
        """
        Extract dynamically loaded data.
        
        Args:
            url (str): target URL
            selector (str): CSS selector
        
        Returns:
            list: extracted data
        """
        html = self.crawl_dynamic_content(url, selector)
        if not html:
            return []
        
        soup = BeautifulSoup(html, 'lxml')
        elements = soup.select(selector)
        
        data = [elem.get_text(strip=True) for elem in elements]
        return data
    
    def close(self):
        """Shut down the browser."""
        if self.driver:
            self.driver.quit()

# Usage example
def crawl_infinite_scroll():
    """
    Example: crawling a page with infinite scrolling.
    """
    crawler = JavaScriptCrawler(headless=True)
    
    try:
        url = "https://example.com/infinite-scroll"
        # Fetch the dynamically loaded product list
        products = crawler.extract_dynamic_data(url, '.product-item')
        
        print(f"Found {len(products)} products:")
        for i, product in enumerate(products, 1):
            print(f"{i}. {product}")
        
        # Scroll to trigger loading of more items
        for scroll in range(3):
            crawler.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(2)
            # Re-fetch and re-parse the page
            html = crawler.driver.page_source
            soup = BeautifulSoup(html, 'lxml')
            new_products = soup.select('.product-item')
            print(f"After scroll {scroll+1}, {len(new_products)} products found in total")
            
    finally:
        crawler.close()

# Crawling pages behind a login
def crawl_after_login():
    """
    Log in first, then crawl protected content.
    """
    crawler = JavaScriptCrawler(headless=False)  # show the browser so you can watch
    
    try:
        # 1. Open the login page
        crawler.driver.get("https://example.com/login")
        
        # 2. Fill in the login form
        username = crawler.driver.find_element(By.NAME, "username")
        password = crawler.driver.find_element(By.NAME, "password")
        
        username.send_keys("your_username")
        password.send_keys("your_password")
        
        # 3. Click the login button
        login_button = crawler.driver.find_element(By.CSS_SELECTOR, "button[type='submit']")
        login_button.click()
        
        # 4. Wait for the login to complete
        time.sleep(3)
        
        # 5. Crawl the content behind the login
        protected_data = crawler.extract_dynamic_data(
            "https://example.com/dashboard", 
            ".dashboard-item"
        )
        
        print("Protected content:", protected_data)
        
    finally:
        crawler.close()

3.2 The Scrapy Framework: Building Production Crawlers

Scrapy is Python's most powerful crawling framework and is well suited to large-scale scraping:

# Create the Scrapy project skeleton (run on the command line)
# scrapy startproject myproject
# cd myproject
# scrapy genspider example example.com

# spiders/blog_spider.py
import scrapy
from scrapy.http import Request
from myproject.items import BlogItem

class BlogSpider(scrapy.Spider):
    name = "blog_spider"
    allowed_domains = ["example.com"]
    start_urls = ["https://example.com/blog"]
    
    custom_settings = {
        'DOWNLOAD_DELAY': 2,  # delay between downloads
        'CONCURRENT_REQUESTS_PER_DOMAIN': 2,  # concurrent requests per domain
        'ROBOTSTXT_OBEY': True,
        'FEEDS': {
            'output/blog.json': {'format': 'json', 'overwrite': True},
        },
    }
    
    def parse(self, response):
        """Parse the blog index page."""
        # Extract all article links
        article_links = response.css('article.post h2 a::attr(href)').getall()
        
        for link in article_links:
            # Follow each article link and hand it to parse_article
            yield Request(
                response.urljoin(link),
                callback=self.parse_article,
                meta={'link': link}
            )
        
        # Handle pagination
        next_page = response.css('a.next-page::attr(href)').get()
        if next_page:
            yield Request(
                response.urljoin(next_page),
                callback=self.parse
            )
    
    def parse_article(self, response):
        """Parse a single article page."""
        item = BlogItem()
        
        item['url'] = response.url
        item['title'] = response.css('h1.article-title::text').get()
        item['author'] = response.css('span.author::text').get()
        item['publish_date'] = response.css('time::attr(datetime)').get()
        item['content'] = response.css('div.article-content').get()
        item['tags'] = response.css('span.tag::text').getall()
        
        yield item

# items.py
import scrapy

class BlogItem(scrapy.Item):
    url = scrapy.Field()
    title = scrapy.Field()
    author = scrapy.Field()
    publish_date = scrapy.Field()
    content = scrapy.Field()
    tags = scrapy.Field()

# middlewares.py - random User-Agent and proxy support
import random
from fake_useragent import UserAgent

class RandomUserAgentMiddleware:
    def __init__(self):
        self.ua = UserAgent()
    
    def process_request(self, request, spider):
        request.headers['User-Agent'] = self.ua.random

class ProxyMiddleware:
    def __init__(self, proxy_list):
        self.proxy_list = proxy_list
    
    def process_request(self, request, spider):
        if self.proxy_list:
            proxy = random.choice(self.proxy_list)
            request.meta['proxy'] = proxy

3.3 Storing and Managing Data

3.3.1 Saving to CSV and JSON

import csv
import json
import pandas as pd

class DataStorage:
    """
    Helper class for saving scraped data to files.
    """
    
    @staticmethod
    def save_to_csv(data, filename):
        """
        Save data to a CSV file.
        
        Args:
            data (list): list of dictionaries
            filename (str): output file name
        """
        if not data:
            print("Nothing to save")
            return
        
        # Collect every field that appears in any record
        fieldnames = set()
        for item in data:
            fieldnames.update(item.keys())
        fieldnames = list(fieldnames)
        
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)
        
        print(f"Data saved to {filename}")
    
    @staticmethod
    def save_to_json(data, filename, indent=2):
        """Save data to a JSON file."""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=indent)
        
        print(f"Data saved to {filename}")
    
    @staticmethod
    def save_to_excel(data, filename, sheet_name='Sheet1'):
        """Save data to an Excel file."""
        df = pd.DataFrame(data)
        df.to_excel(filename, sheet_name=sheet_name, index=False)
        print(f"Data saved to {filename}")

# Usage example
def save_example():
    sample_data = [
        {'title': 'Python Tutorial', 'author': 'Zhang San', 'views': 1000},
        {'title': 'Crawler Guide', 'author': 'Li Si', 'views': 2000},
    ]
    
    storage = DataStorage()
    storage.save_to_csv(sample_data, 'blog_data.csv')
    storage.save_to_json(sample_data, 'blog_data.json')
    storage.save_to_excel(sample_data, 'blog_data.xlsx')

3.3.2 Saving to a Database

import sqlite3
import pymysql
from pymongo import MongoClient

class DatabaseStorage:
    """
    Helper class for saving scraped data to a database.
    """
    
    def __init__(self, db_type='sqlite', **kwargs):
        """
        Initialize the database connection.
        
        Args:
            db_type (str): database type ('sqlite', 'mysql', 'mongodb')
            **kwargs: connection parameters
        """
        self.db_type = db_type
        self.conn = None
        
        if db_type == 'sqlite':
            self.conn = sqlite3.connect(kwargs.get('db_path', 'crawler.db'))
            self._init_sqlite_tables()
        elif db_type == 'mysql':
            self.conn = pymysql.connect(
                host=kwargs.get('host', 'localhost'),
                user=kwargs.get('user'),
                password=kwargs.get('password'),
                database=kwargs.get('database'),
                charset='utf8mb4'
            )
            self._init_mysql_tables()
        elif db_type == 'mongodb':
            self.client = MongoClient(kwargs.get('uri', 'mongodb://localhost:27017/'))
            self.db = self.client[kwargs.get('db_name', 'crawler_db')]
    
    def _init_sqlite_tables(self):
        """Create the SQLite schema if it does not exist."""
        cursor = self.conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS articles (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                author TEXT,
                publish_date TEXT,
                url TEXT UNIQUE,
                content TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.conn.commit()
    
    def _init_mysql_tables(self):
        """Create the MySQL schema if it does not exist."""
        cursor = self.conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS articles (
                id INT AUTO_INCREMENT PRIMARY KEY,
                title VARCHAR(500) NOT NULL,
                author VARCHAR(100),
                publish_date DATE,
                url VARCHAR(500) UNIQUE,
                content TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                INDEX idx_title (title),
                INDEX idx_author (author)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        ''')
        self.conn.commit()
    
    def save_article(self, data):
        """Save one article record."""
        if self.db_type == 'sqlite':
            cursor = self.conn.cursor()
            try:
                cursor.execute('''
                    INSERT OR REPLACE INTO articles (title, author, publish_date, url, content)
                    VALUES (?, ?, ?, ?, ?)
                ''', (data['title'], data.get('author'), data.get('publish_date'), 
                      data['url'], data.get('content')))
                self.conn.commit()
                print(f"Article '{data['title']}' saved to SQLite")
            except Exception as e:
                print(f"Save failed: {e}")
        
        elif self.db_type == 'mysql':
            cursor = self.conn.cursor()
            try:
                cursor.execute('''
                    INSERT INTO articles (title, author, publish_date, url, content)
                    VALUES (%s, %s, %s, %s, %s)
                    ON DUPLICATE KEY UPDATE 
                    title=VALUES(title), author=VALUES(author), 
                    publish_date=VALUES(publish_date), content=VALUES(content)
                ''', (data['title'], data.get('author'), data.get('publish_date'), 
                      data['url'], data.get('content')))
                self.conn.commit()
                print(f"Article '{data['title']}' saved to MySQL")
            except Exception as e:
                print(f"Save failed: {e}")
        
        elif self.db_type == 'mongodb':
            collection = self.db.articles
            result = collection.update_one(
                {'url': data['url']},
                {'$set': data},
                upsert=True
            )
            if result.upserted_id or result.modified_count:
                print(f"Article '{data['title']}' saved to MongoDB")
    
    def close(self):
        """Close the database connection."""
        if self.db_type in ['sqlite', 'mysql'] and self.conn:
            self.conn.close()
        elif self.db_type == 'mongodb' and self.client:
            self.client.close()

# Usage example
def database_example():
    # SQLite example
    sqlite_db = DatabaseStorage('sqlite')
    article = {
        'title': 'Python Crawler Tutorial',
        'author': 'Wang Wu',
        'publish_date': '2024-01-15',
        'url': 'https://example.com/python-crawler',
        'content': 'The full text of the article goes here...'
    }
    sqlite_db.save_article(article)
    sqlite_db.close()
    
    # MongoDB example
    mongo_db = DatabaseStorage('mongodb', db_name='crawler_db')
    mongo_db.save_article(article)
    mongo_db.close()

Part 4: A Complete Project Example

4.1 Project Layout

crawler_project/
├── main.py                 # program entry point
├── config.py               # configuration
├── crawler/
│   ├── __init__.py
│   ├── base_crawler.py     # basic crawler class
│   ├── dynamic_crawler.py  # dynamic (Selenium) crawler
│   └── scrapy_crawler.py   # Scrapy crawler
├── utils/
│   ├── __init__.py
│   ├── storage.py          # storage helpers
│   ├── anti_detection.py   # anti-detection helpers
│   └── logger.py           # logging helpers
├── data/
│   ├── raw/                # raw data
│   └── processed/          # processed data
└── requirements.txt        # dependency list (a sketch follows)
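
A possible requirements.txt for this layout might list the packages used throughout this guide (version pins are omitted; add them to match your environment):

# requirements.txt
requests
beautifulsoup4
lxml
selenium
scrapy
pandas
openpyxl        # needed by pandas.DataFrame.to_excel for .xlsx output
fake-useragent
pymysql
pymongo
aiohttp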

4.2 A Complete News Crawler

# config.py
class Config:
    # Target site
    TARGET_URL = "https://example.com/news"
    
    # Crawl settings
    MAX_PAGES = 10
    DOWNLOAD_DELAY = 2
    TIMEOUT = 15
    
    # Storage settings
    STORAGE_TYPE = "sqlite"  # "sqlite", "mysql", "mongodb", "csv", "json"
    DB_CONFIG = {
        "db_path": "news.db",
        "host": "localhost",
        "user": "root",
        "password": "password",
        "database": "crawler_db",
        "uri": "mongodb://localhost:27017/"
    }
    
    # Anti-crawling settings
    USE_PROXY = False
    PROXY_LIST = [
        "http://proxy1.example.com:8080",
        "http://proxy2.example.com:8080",
    ]

# utils/logger.py
import logging
import sys
from datetime import datetime

def setup_logger(name="crawler"):
    """Configure logging."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    
    # Remove any existing handlers
    logger.handlers.clear()
    
    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    
    # File handler
    file_handler = logging.FileHandler(f'crawler_{datetime.now().strftime("%Y%m%d")}.log')
    file_handler.setLevel(logging.DEBUG)
    
    # Formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    console_handler.setFormatter(formatter)
    file_handler.setFormatter(formatter)
    
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    
    return logger

# crawler/base_crawler.py
import requests
from bs4 import BeautifulSoup
import time
import random
from utils.anti_detection import AntiAntiCrawler
from utils.logger import setup_logger

class BaseCrawler:
    """Basic crawler class."""
    
    def __init__(self, config):
        self.config = config
        self.logger = setup_logger()
        self.anti_crawler = AntiAntiCrawler()
        self.session = requests.Session()
        self.session.headers.update(self.anti_crawler.get_random_headers())
        self.current_url = None  # set by the caller before extract_article() is used
    
    def safe_request(self, url, max_retries=3):
        """Request with retries and backoff."""
        for attempt in range(max_retries):
            try:
                self.anti_crawler.random_delay(
                    self.config.DOWNLOAD_DELAY * 0.5,
                    self.config.DOWNLOAD_DELAY * 1.5
                )
                
                response = self.session.get(
                    url, 
                    timeout=self.config.TIMEOUT,
                    headers=self.anti_crawler.get_random_headers()
                )
                
                if response.status_code == 200:
                    self.logger.info(f"Fetched: {url}")
                    return BeautifulSoup(response.content, 'lxml')
                else:
                    self.logger.warning(f"Status code {response.status_code}: {url}")
                    
            except Exception as e:
                self.logger.error(f"Request failed (attempt {attempt+1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # exponential backoff
                else:
                    return None
        
        return None
    
    def extract_article(self, soup):
        """Extract article fields from a parsed page."""
        if not soup:
            return None
        
        article = {
            'title': soup.find('h1').get_text(strip=True) if soup.find('h1') else '',
            'author': soup.find('span', class_='author').get_text(strip=True) if soup.find('span', class_='author') else '',
            'publish_date': soup.find('time').get_text(strip=True) if soup.find('time') else '',
            'content': soup.find('div', class_='article-content').get_text(strip=True) if soup.find('div', class_='article-content') else '',
            'url': self.current_url,
        }
        
        return article

# main.py
from config import Config
from crawler.base_crawler import BaseCrawler
from utils.storage import DataStorage, DatabaseStorage

def main():
    config = Config()
    crawler = BaseCrawler(config)
    storage = DataStorage()
    
    # Pick the storage backend based on the configuration
    if config.STORAGE_TYPE in ['sqlite', 'mysql', 'mongodb']:
        db_storage = DatabaseStorage(config.STORAGE_TYPE, **config.DB_CONFIG)
    
    articles = []
    current_page = 1
    
    while current_page <= config.MAX_PAGES:
        url = f"{config.TARGET_URL}?page={current_page}"
        crawler.current_url = url
        
        soup = crawler.safe_request(url)
        if not soup:
            break
        
        # Parse the article list
        article_links = soup.select('article.post h2 a')
        for link in article_links:
            article_url = link.get('href')
            if not article_url:
                continue
            
            # Build the absolute URL
            if not article_url.startswith('http'):
                article_url = f"https://example.com{article_url}"
            
            # Fetch the article page
            crawler.current_url = article_url  # so extract_article records the article URL
            article_soup = crawler.safe_request(article_url)
            if article_soup:
                article_data = crawler.extract_article(article_soup)
                if article_data:
                    articles.append(article_data)
                    
                    # Store the record
                    if config.STORAGE_TYPE == 'csv':
                        storage.save_to_csv([article_data], 'articles.csv')
                    elif config.STORAGE_TYPE == 'json':
                        storage.save_to_json([article_data], 'articles.json')
                    elif config.STORAGE_TYPE in ['sqlite', 'mysql', 'mongodb']:
                        db_storage.save_article(article_data)
        
        current_page += 1
    
    # Save everything in one batch
    if config.STORAGE_TYPE == 'csv' and articles:
        storage.save_to_csv(articles, 'articles_all.csv')
    elif config.STORAGE_TYPE == 'json' and articles:
        storage.save_to_json(articles, 'articles_all.json')
    
    if config.STORAGE_TYPE in ['sqlite', 'mysql', 'mongodb']:
        db_storage.close()
    
    crawler.logger.info(f"Crawl finished: {len(articles)} articles collected")

if __name__ == "__main__":
    main()

Part 5: Performance Optimization and Best Practices

5.1 Concurrent Crawling

import asyncio
import aiohttp
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
import threading

class ConcurrentCrawler:
    """Concurrent crawler."""
    
    def __init__(self, max_workers=5):
        self.max_workers = max_workers
        self.lock = threading.Lock()
    
    async def fetch_async(self, session, url):
        """Fetch a single URL asynchronously."""
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                if response.status == 200:
                    html = await response.text()
                    return html
                else:
                    print(f"Error: {url} - {response.status}")
                    return None
        except Exception as e:
            print(f"Exception: {url} - {e}")
            return None
    
    async def crawl_multiple_async(self, urls):
        """Fetch multiple URLs asynchronously."""
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_async(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
            return results
    
    def crawl_multiple_sync(self, urls):
        """Fetch multiple URLs with a thread pool."""
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            results = list(executor.map(self.safe_request_sync, urls))
        return results
    
    def safe_request_sync(self, url):
        """Thread-safe blocking request."""
        try:
            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
            response = requests.get(url, headers=headers, timeout=10)
            if response.status_code == 200:
                return BeautifulSoup(response.content, 'lxml')
        except Exception as e:
            print(f"Threaded request failed: {url} - {e}")
        return None

# Usage example
async def async_example():
    crawler = ConcurrentCrawler()
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3",
    ]
    
    results = await crawler.crawl_multiple_async(urls)
    for url, html in zip(urls, results):
        if html:
            soup = BeautifulSoup(html, 'lxml')
            print(f"{url}: {soup.title.get_text(strip=True) if soup.title else 'No title'}")

# Run the async crawler
# asyncio.run(async_example())

5.2 Caching

import hashlib
import os
import pickle
import requests
from functools import wraps

def cache_to_disk(func):
    """Decorator that caches results on disk."""
    cache_dir = "cache"
    if not os.path.exists(cache_dir):
        os.makedirs(cache_dir)
    
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Build the cache key from the call arguments
        key = hashlib.md5(str(args).encode() + str(kwargs).encode()).hexdigest()
        cache_file = os.path.join(cache_dir, f"{key}.pkl")
        
        # Check the cache first
        if os.path.exists(cache_file):
            try:
                with open(cache_file, 'rb') as f:
                    return pickle.load(f)
            except Exception:
                pass  # a corrupt cache entry falls through to a fresh fetch
        
        # Run the function and cache the result
        result = func(*args, **kwargs)
        with open(cache_file, 'wb') as f:
            pickle.dump(result, f)
        
        return result
    
    return wrapper

@cache_to_disk
def get_cached_page(url):
    """Fetch a page, with disk caching."""
    headers = {'User-Agent': 'Mozilla/5.0'}
    response = requests.get(url, headers=headers, timeout=10)
    return response.text if response.status_code == 200 else None

5.3 Error Handling and Logging

import logging
import requests
from datetime import datetime

class CrawlerErrorHandler:
    """Error handling and logging."""
    
    def __init__(self, log_file="crawler_errors.log"):
        self.logger = logging.getLogger("CrawlerErrors")
        self.logger.setLevel(logging.ERROR)
        
        handler = logging.FileHandler(log_file)
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
    
    def handle_error(self, error, context=None):
        """Central error handler."""
        error_msg = f"Error: {str(error)}"
        if context:
            error_msg += f" | Context: {context}"
        
        self.logger.error(error_msg)
        
        # Notification logic (email, Slack, etc.) can be plugged in here
        self.send_alert(error_msg)
    
    def send_alert(self, message):
        """Send an alert (placeholder)."""
        # Integrate email, Slack, DingTalk, or any other channel here
        print(f"ALERT: {message}")

# Usage example
error_handler = CrawlerErrorHandler()

try:
    # Crawling logic
    response = requests.get("https://example.com", timeout=5)
except requests.exceptions.Timeout as e:
    error_handler.handle_error(e, {"url": "https://example.com", "type": "timeout"})
except Exception as e:
    error_handler.handle_error(e, {"url": "https://example.com", "type": "unknown"})

Part 6: Case Study: Scraping Product Data from an E-Commerce Site

6.1 Requirements Analysis

Suppose we need to scrape product information from an e-commerce site, including the following fields (a sketch of the target record follows the list):

  • Product name
  • Price
  • Rating
  • Review count
  • Product link
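
Before writing any parsing code it helps to fix the shape of the record we want for each product. Here is a sketch of the target structure, with illustrative values and the same field names used by the implementation below:

# Target record for one product (values are illustrative)
product = {
    'name': 'Example Phone 12 Pro',             # product name
    'price': 699.99,                            # numeric price, currency symbol stripped
    'rating': 4.5,                              # average rating as a float
    'review_count': 1280,                       # number of reviews as an int
    'link': 'https://example.com/item/123',     # absolute product URL
    'image_url': 'https://example.com/img/123.jpg',
    'crawled_at': '2024-01-15T12:00:00',        # ISO timestamp added by the crawler
}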

6.2 Full Implementation

# ecommerce_crawler.py
import requests
from bs4 import BeautifulSoup
import re
import json
import time
import random
from datetime import datetime

class EcommerceCrawler:
    """Crawler for an e-commerce site."""
    
    def __init__(self, base_url):
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
        })
        self.products = []
    
    def extract_price(self, price_text):
        """Pull a numeric price out of a text snippet."""
        if not price_text:
            return None
        # Match digits, thousands separators, and an optional decimal part
        match = re.search(r'[\d,]+\.?\d*', price_text)
        if match:
            return float(match.group().replace(',', ''))
        return None
    
    def extract_rating(self, rating_text):
        """Pull the rating out of a text snippet."""
        if not rating_text:
            return None
        match = re.search(r'(\d+\.?\d*)', rating_text)
        if match:
            return float(match.group())
        return None
    
    def extract_review_count(self, review_text):
        """Pull the review count out of a text snippet."""
        if not review_text:
            return None
        # Strip separators and expand K/M suffixes
        review_text = review_text.replace(',', '').replace(' ', '')
        if 'K' in review_text:
            return int(float(review_text.replace('K', '')) * 1000)
        elif 'M' in review_text:
            return int(float(review_text.replace('M', '')) * 1000000)
        else:
            try:
                return int(review_text)
            except ValueError:
                return None
    
    def parse_product_page(self, soup):
        """Parse a product listing page."""
        products = []
        
        # Assume each product lives in div.product-item
        product_elements = soup.find_all('div', class_='product-item')
        
        for elem in product_elements:
            try:
                # Product name
                name_elem = elem.find('h3', class_='product-title')
                name = name_elem.get_text(strip=True) if name_elem else None
                
                # Price
                price_elem = elem.find('span', class_='price')
                price = self.extract_price(price_elem.get_text(strip=True)) if price_elem else None
                
                # Rating
                rating_elem = elem.find('span', class_='rating')
                rating = self.extract_rating(rating_elem.get_text(strip=True)) if rating_elem else None
                
                # Review count
                review_elem = elem.find('span', class_='review-count')
                review_count = self.extract_review_count(review_elem.get_text(strip=True)) if review_elem else None
                
                # Link
                link_elem = elem.find('a', class_='product-link')
                link = link_elem.get('href') if link_elem else None
                if link and not link.startswith('http'):
                    link = self.base_url + link
                
                # Image
                img_elem = elem.find('img')
                image_url = img_elem.get('src') if img_elem else None
                
                if name and price:  # name and price are the minimum we require
                    products.append({
                        'name': name,
                        'price': price,
                        'rating': rating,
                        'review_count': review_count,
                        'link': link,
                        'image_url': image_url,
                        'crawled_at': datetime.now().isoformat()
                    })
                    
            except Exception as e:
                print(f"Error while parsing a product: {e}")
                continue
        
        return products
    
    def crawl_category(self, category_url, max_pages=5):
        """Crawl an entire category."""
        all_products = []
        
        for page in range(1, max_pages + 1):
            url = f"{category_url}?page={page}"
            print(f"Crawling page {page}: {url}")
            
            try:
                response = self.session.get(url, timeout=15)
                if response.status_code != 200:
                    print(f"Cannot access: {url}, status code: {response.status_code}")
                    break
                
                soup = BeautifulSoup(response.content, 'lxml')
                products = self.parse_product_page(soup)
                
                if not products:
                    print("No products found, stopping")
                    break
                
                all_products.extend(products)
                print(f"Page {page}: found {len(products)} products")
                
                # Random delay between pages
                time.sleep(random.uniform(2, 4))
                
            except Exception as e:
                print(f"Error while crawling page {page}: {e}")
                break
        
        return all_products
    
    def save_results(self, products, filename=None):
        """Save the results."""
        if not filename:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"products_{timestamp}.json"
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(products, f, ensure_ascii=False, indent=2)
        
        print(f"Saved {len(products)} products to {filename}")
        
        # Also save a CSV copy
        csv_filename = filename.replace('.json', '.csv')
        if products:
            import csv
            fieldnames = products[0].keys()
            with open(csv_filename, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(products)
            print(f"Also saved CSV to {csv_filename}")
    
    def analyze_products(self, products):
        """Analyze the product data."""
        if not products:
            print("No product data to analyze")
            return
        
        prices = [p['price'] for p in products if p['price']]
        ratings = [p['rating'] for p in products if p['rating']]
        review_counts = [p['review_count'] for p in products if p['review_count']]
        
        print("\n=== Product Analysis Report ===")
        print(f"Total products: {len(products)}")
        
        if prices:
            print(f"Price range: ${min(prices):.2f} - ${max(prices):.2f}")
            print(f"Average price: ${sum(prices)/len(prices):.2f}")
        
        if ratings:
            print(f"Rating range: {min(ratings):.1f} - {max(ratings):.1f}")
            print(f"Average rating: {sum(ratings)/len(ratings):.2f}")
        
        if review_counts:
            print(f"Total reviews: {sum(review_counts)}")
            print(f"Average reviews per product: {sum(review_counts)/len(review_counts):.0f}")
        
        # Most expensive product
        if prices:
            most_expensive = max(products, key=lambda x: x['price'] if x['price'] else 0)
            print(f"\nMost expensive: {most_expensive['name']} - ${most_expensive['price']:.2f}")
        
        # Highest-rated product
        if ratings:
            best_rated = max(products, key=lambda x: x['rating'] if x['rating'] else 0)
            print(f"Best rated: {best_rated['name']} - {best_rated['rating']:.1f} stars")

# Usage example
def crawl_electronics():
    """Crawl electronics categories."""
    crawler = EcommerceCrawler("https://example.com")
    
    # Phones
    phones = crawler.crawl_category("https://example.com/category/phones", max_pages=3)
    
    # Laptops
    laptops = crawler.crawl_category("https://example.com/category/laptops", max_pages=2)
    
    # Merge the results
    all_products = phones + laptops
    
    # Save and analyze
    crawler.save_results(all_products)
    crawler.analyze_products(all_products)
    
    return all_products

if __name__ == "__main__":
    # Note: this is an example; replace the URLs and CSS selectors with real ones before use
    print("Note: replace the URLs and CSS selectors with those of a real e-commerce site")
    # products = crawl_electronics()

Part 7: Legal and Ethical Compliance

7.1 Respecting robots.txt

import urllib.robotparser
from urllib.parse import urlparse, urljoin

def check_robots_permission(url, user_agent='*'):
    """
    Check whether crawling a URL is allowed.
    
    Args:
        url (str): target URL
        user_agent (str): user agent string
    
    Returns:
        bool: True if crawling is allowed
    """
    rp = urllib.robotparser.RobotFileParser()
    parsed = urlparse(url)
    base_url = f"{parsed.scheme}://{parsed.netloc}"
    robots_url = urljoin(base_url, "/robots.txt")
    
    try:
        rp.set_url(robots_url)
        rp.read()
        return rp.can_fetch(user_agent, url)
    except Exception as e:
        print(f"Could not read robots.txt: {e}")
        return True  # assume allowed if robots.txt cannot be read

# Usage example
def ethical_crawl(url):
    if not check_robots_permission(url):
        print(f"robots.txt disallows crawling {url}")
        return
    
    print(f"Crawling {url} is allowed")
    # Continue with the crawling logic
7.2 Controlling the Crawl Rate

import time
import requests
from collections import defaultdict

class RateLimiter:
    """Simple per-domain rate limiter."""
    
    def __init__(self, max_requests_per_minute=10):
        self.max_requests = max_requests_per_minute
        self.request_times = defaultdict(list)
    
    def wait_if_needed(self, domain):
        """Sleep if the per-minute budget for this domain is exhausted."""
        now = time.time()
        # Drop records older than 60 seconds
        self.request_times[domain] = [
            t for t in self.request_times[domain] if now - t < 60
        ]
        
        if len(self.request_times[domain]) >= self.max_requests:
            sleep_time = 60 - (now - self.request_times[domain][0])
            if sleep_time > 0:
                print(f"Rate limit reached, waiting {sleep_time:.1f} s")
                time.sleep(sleep_time)
                # Start a fresh window after sleeping
                self.request_times[domain] = []
        
        self.request_times[domain].append(now)

# Usage example
rate_limiter = RateLimiter(max_requests_per_minute=5)

def limited_request(url):
    domain = url.split('/')[2]
    rate_limiter.wait_if_needed(domain)
    
    # Perform the request
    response = requests.get(url)
    return response

Part 8: Summary and Further Learning

8.1 Key Takeaways

  1. Basics: requests + BeautifulSoup are well suited to static pages
  2. Dynamic content: Selenium handles JavaScript-rendered pages
  3. Large-scale crawling: the Scrapy framework offers a complete solution
  4. Storage: CSV/JSON for small datasets, databases for large ones
  5. Anti-crawling countermeasures: random User-Agents, delays, proxies
  6. Performance: concurrency, caching, and solid error handling

8.2 Where to Go Next

  1. Distributed crawling: use Scrapy-Redis to run Scrapy across machines
  2. Machine learning: use ML to recognize CAPTCHAs and classify scraped data
  3. API scraping: call REST APIs directly to obtain structured data (a sketch follows this list)
  4. Browser automation: Playwright as an alternative to Selenium
  5. Cloud deployment: run your crawlers on AWS/GCP
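
To illustrate the API route, here is a minimal sketch that pulls JSON from a hypothetical REST endpoint using requests only; the URL, query parameters, and response fields are placeholders, and real APIs usually add authentication and their own rate limits.

import requests

def fetch_json(url, params=None, timeout=10):
    """Fetch a JSON payload from a (hypothetical) REST endpoint."""
    response = requests.get(url, params=params, timeout=timeout)
    response.raise_for_status()  # raise on 4xx/5xx instead of parsing HTML
    return response.json()

if __name__ == "__main__":
    # Hypothetical endpoint and query parameters
    data = fetch_json("https://example.com/api/articles", params={"page": 1})
    for item in data.get("results", []):
        print(item.get("title"))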

8.3 Recommended Resources

  • Official documentation: Scrapy, Selenium, requests
  • GitHub: awesome-web-crawlers
  • Book: Web Scraping with Python
  • Communities: Stack Overflow, Reddit r/webscraping

8.4 Final Advice

  • Always respect the target site: throttle your requests and avoid stressing its servers
  • Stay legal and compliant: know the laws and regulations that apply to you
  • Keep learning: websites evolve constantly, and crawling techniques must evolve with them
  • Back up your data: back up scraped data regularly
  • Monitor and maintain: set up monitoring so problems are caught and fixed early

With this guide, you should be able to build web crawlers ranging from simple scripts to full projects. Remember that crawling is a double-edged sword: used responsibly it creates real value, but misused it can bring legal risk. Happy crawling!