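Introduction

In modern software development, asynchronous programming has become a key technique for high-concurrency, I/O-bound workloads. Python, one of the most widely used languages, supports it through the asyncio library in the standard library. This article covers the core concepts, syntax, and practical applications of asynchronous programming in Python.

Asynchronous Programming Basics

What is asynchronous programming?

Asynchronous programming is a paradigm in which a program keeps running other tasks while it waits for an operation (such as I/O) to finish, instead of blocking until the operation completes. It is especially useful for slow operations such as network requests and file reads and writes.

Why use asynchronous programming?

  1. Better performance: the CPU does useful work instead of sitting idle while I/O is pending
  2. Better user experience: a GUI application stays responsive
  3. High concurrency: many concurrent connections are handled with few resources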
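
To make the high-concurrency point concrete, here is a minimal sketch. It assumes a hypothetical handle_connection coroutine that stands in for real network I/O with a 0.1 second sleep; serve_many is likewise an illustrative name, not part of any library.

```python
import asyncio
import time


async def handle_connection(conn_id: int) -> int:
    """Hypothetical handler: pretend to wait 0.1 s on network I/O."""
    await asyncio.sleep(0.1)
    return conn_id


async def serve_many(count: int):
    """Run `count` handlers concurrently and report the elapsed time."""
    start = time.perf_counter()
    results = await asyncio.gather(*(handle_connection(i) for i in range(count)))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(serve_many(1000))
    print(f"{len(results)} handlers finished in {elapsed:.2f}s on one thread")
```

Run one after another, 1000 such waits would take about 100 seconds. Because every handler yields control while it waits, a single thread can overlap all of them.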

The async/await Syntax in Python

Basic syntax

Python 3.5 introduced the async and await keywords, which make asynchronous code more intuitive:

import asyncio

async def fetch_data():
    print("Fetching data...")
    await asyncio.sleep(1)  # simulate an I/O operation
    print("Data fetched!")
    return {"data": [1, 2, 3]}

async def main():
    result = await fetch_data()
    print(f"Result: {result}")

# Run the asynchronous program
asyncio.run(main())

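Key concepts

  1. Coroutine: a function defined with async def is a coroutine function; calling it returns a coroutine object that runs only when awaited or scheduled
  2. Awaitable: an object that can be used with await, including coroutines, Tasks, and Futures
  3. Event loop: schedules and runs asynchronous tasks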
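
The three kinds of awaitable listed above can be sketched side by side. The names compute and demo_awaitables are illustrative only; the Future is completed by hand here, whereas in real code a library usually does that.

```python
import asyncio


async def compute() -> int:
    """A coroutine function; calling it returns a coroutine object."""
    await asyncio.sleep(0)
    return 42


async def demo_awaitables() -> list:
    loop = asyncio.get_running_loop()

    # 1. Coroutine object: runs only when it is awaited.
    from_coroutine = await compute()

    # 2. Task: wraps a coroutine and schedules it on the event loop.
    task = asyncio.create_task(compute())
    from_task = await task

    # 3. Future: a low-level placeholder that other code completes later.
    future = loop.create_future()
    loop.call_soon(future.set_result, 42)
    from_future = await future

    return [from_coroutine, from_task, from_future]


if __name__ == "__main__":
    print(asyncio.run(demo_awaitables()))
```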

Practical Examples

Network request example

Asynchronous HTTP requests with aiohttp:

import aiohttp
import asyncio

async def fetch_url(session, url):
    try:
        async with session.get(url) as response:
            return await response.text()
    except Exception as e:
        return f"Error: {str(e)}"

async def main():
    urls = [
        'https://example.com',
        'https://httpbin.org/json',
        'https://api.github.com'
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for url, content in zip(urls, results):
            print(f"URL: {url}\nContent length: {len(content)}\n")

if __name__ == "__main__":
    asyncio.run(main())

Database example

Asynchronous PostgreSQL access with asyncpg:

import asyncio
import asyncpg

async def run_query():
    # Connect to the database
    conn = await asyncpg.connect(
        host='localhost',
        database='testdb',
        user='postgres',
        password='password'
    )
    
    try:
        # Run a query
        row = await conn.fetchrow(
            "SELECT * FROM users WHERE id = $1", 100
        )
        if row:
            print(f"User: {row['name']}, email: {row['email']}")

        # Bulk insert
        data = [
            ('Alice', 'alice@example.com'),
            ('Bob', 'bob@example.com')
        ]
        await conn.executemany(
            "INSERT INTO users(name, email) VALUES($1, $2)", data
        )
        
    finally:
        await conn.close()

if __name__ == "__main__":
    asyncio.run(run_query())

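Advanced Techniques and Best Practices

1. Task management

# Reuses fetch_data() from the basic syntax example above
async def main():
    # Create several tasks
    task1 = asyncio.create_task(fetch_data())
    task2 = asyncio.create_task(fetch_data())

    # Wait for all tasks to finish
    results = await asyncio.gather(task1, task2)

    # Wait with a timeout (fetch_data takes 1 second, so this times out)
    try:
        result = await asyncio.wait_for(fetch_data(), timeout=0.5)
    except asyncio.TimeoutError:
        print("Operation timed out!")

    # Cancel a task
    task = asyncio.create_task(fetch_data())
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")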

2. Integrating synchronous code

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io():
    # Simulate a blocking I/O operation
    time.sleep(1)
    return "Blocking operation finished"

async def async_wrapper():
    # get_running_loop() is the recommended call inside a coroutine
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_io)
        return result

async def main():
    result = await async_wrapper()
    print(result)

3. Locks and synchronization

import asyncio

class SharedResource:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()
    
    async def increment(self):
        async with self.lock:
            # Simulate processing time
            await asyncio.sleep(0.01)
            self.value += 1
            return self.value

async def worker(resource, name):
    for i in range(5):
        val = await resource.increment()
        print(f"Worker {name}: {val}")

async def main():
    resource = SharedResource()
    await asyncio.gather(
        worker(resource, "A"),
        worker(resource, "B")
    )

if __name__ == "__main__":
    asyncio.run(main())

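Performance Tips

  1. Set a sensible concurrency level: use asyncio.Semaphore to cap the number of concurrent connections
  2. Avoid blocking calls: make sure every I/O operation that runs on the event loop is asynchronous
  3. Monitor the event loop: use asyncio's debug mode to detect slow operations
  4. Clean up resources: close connections and release resources properly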
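
The first tip can be sketched as follows. The limited_fetch coroutine and the stats dictionary are hypothetical helpers added for illustration; a short sleep stands in for a real request.

```python
import asyncio


async def limited_fetch(sem: asyncio.Semaphore, item_id: int, stats: dict) -> int:
    """Hypothetical request that may only run while holding the semaphore."""
    async with sem:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # stand-in for a real network request
        stats["active"] -= 1
        return item_id


async def run_limited(total: int = 20, limit: int = 3):
    """Start `total` requests but let at most `limit` run at the same time."""
    sem = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(limited_fetch(sem, i, stats) for i in range(total))
    )
    return results, stats["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(run_limited())
    print(f"{len(results)} requests done, peak concurrency {peak}")
```

All 20 coroutines are created up front, but the semaphore keeps the number inside the critical section at or below the limit.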

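Conclusion

Through the asyncio library, Python provides powerful and flexible tools for concurrent tasks. Mastering these concepts and techniques can noticeably improve an application's throughput and responsiveness. From simple coroutines to task management, asynchronous programming gives modern Python development an essential toolset.

With the examples and explanations in this article, you should be able to start applying asynchronous programming in your own projects. Remember that it suits I/O-bound tasks best; CPU-bound tasks may need to be combined with multiprocessing.

The Complete Guide to Python Asynchronous Programming: From Beginner to Expert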

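1. Basic Concepts of Asynchronous Programming

1.1 What is asynchronous programming

Asynchronous programming is a paradigm in which a program keeps running other tasks while it waits for an operation (such as I/O) to finish, instead of blocking until the operation completes. It is especially useful for slow operations such as network requests and file reads and writes.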

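# Synchronous example - blocks program execution
import time

def sync_task():
    print("Task started")
    time.sleep(2)  # simulate a slow operation; the program blocks here
    print("Task finished")

# Asynchronous example - does not block the event loop
import asyncio

async def async_task():
    print("Task started")
    await asyncio.sleep(2)  # simulate a slow operation; other tasks can run meanwhile
    print("Task finished")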

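1.2 Why use asynchronous programming

  1. Better performance: the CPU does useful work instead of sitting idle while I/O is pending
  2. Better user experience: a GUI application stays responsive
  3. High concurrency: many concurrent connections are handled with few resources
  4. Resource utilization: fewer threads and less context-switching overhead

2. Core Syntax of Asynchronous Programming in Python

2.1 The async/await keywords

Python 3.5 introduced the async and await keywords, which make asynchronous code more intuitive: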

import asyncio

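# Define a coroutine function
async def fetch_data():
    print("Fetching data...")
    await asyncio.sleep(1)  # simulate an I/O operation
    print("Data fetched!")
    return {"data": [1, 2, 3]}

# Define the main coroutine
async def main():
    # await suspends main() until fetch_data() finishes
    result = await fetch_data()
    print(f"Result: {result}")

# Run the asynchronous program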
if __name__ == "__main__":
    asyncio.run(main())

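2.2 Core concepts in detail

Coroutine

A function defined with async def is a coroutine function; calling it returns a coroutine object:

import asyncio

async def my_coroutine():
    print("This is a coroutine")
    return 42

# Calling a coroutine function does not run it; it returns a coroutine object
coro = my_coroutine()
print(type(coro))  # <class 'coroutine'>

# The coroutine object has to be run on an event loop
result = asyncio.run(coro)

Event Loop

The event loop is the core of asynchronous programming; it schedules and runs coroutines: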

import asyncio

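async def task1():
    print("Task 1 started")
    await asyncio.sleep(1)
    print("Task 1 finished")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")

async def main():
    # Run both coroutines concurrently
    await asyncio.gather(task1(), task2())

# asyncio.run() creates an event loop, runs main() on it, and closes the loop.
# It replaces the older get_event_loop()/run_until_complete() pattern,
# which is deprecated when no loop is running.
asyncio.run(main())

Task objects

A Task schedules a coroutine to run concurrently on the event loop: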

import asyncio

async def slow_operation(n):
    print(f"Operation {n} started")
    await asyncio.sleep(2)
    print(f"Operation {n} finished")
    return n * 2

async def main():
    # Create tasks; both are scheduled immediately
    task1 = asyncio.create_task(slow_operation(1))
    task2 = asyncio.create_task(slow_operation(2))

    # Wait for the tasks to finish
    result1 = await task1
    result2 = await task2

    print(f"Results: {result1}, {result2}")

asyncio.run(main())
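
When results should be handled as soon as each one is ready rather than in creation order, asyncio.as_completed is one option. The sketch below assumes a hypothetical timed_operation coroutine with an explicit delay parameter so that the completion order is predictable.

```python
import asyncio


async def timed_operation(n: int, delay: float) -> int:
    """Hypothetical operation that finishes after `delay` seconds."""
    await asyncio.sleep(delay)
    return n * 2


async def collect_in_completion_order() -> list:
    coros = [
        timed_operation(1, 0.3),
        timed_operation(2, 0.1),
        timed_operation(3, 0.2),
    ]
    finished = []
    # as_completed yields awaitables in the order the operations finish
    for next_done in asyncio.as_completed(coros):
        finished.append(await next_done)
    return finished


if __name__ == "__main__":
    print(asyncio.run(collect_in_completion_order()))  # [4, 6, 2]
```

The shortest delay finishes first, so the results arrive in completion order rather than in the order the coroutines were listed.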

3. Practical Examples

3.1 Network request example

Asynchronous HTTP requests with aiohttp:

import aiohttp
import asyncio
import time

async def fetch_url(session, url):
    """Fetch the content of a single URL."""
    try:
        print(f"Requesting: {url}")
        # aiohttp 3.x prefers a ClientTimeout object; a bare number is deprecated
        timeout = aiohttp.ClientTimeout(total=10)
        async with session.get(url, timeout=timeout) as response:
            content = await response.text()
            print(f"Finished: {url} (length: {len(content)})")
            return {"url": url, "content": content, "status": response.status}
    except Exception as e:
        print(f"Request to {url} failed: {e}")
        return {"url": url, "error": str(e)}

async def fetch_multiple_urls(urls):
    """Fetch several URLs concurrently."""
    async with aiohttp.ClientSession() as session:
        # Build the list of coroutines
        tasks = [fetch_url(session, url) for url in urls]

        # Run them all concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)

        return results

async def main():
    urls = [
        'https://httpbin.org/json',
        'https://httpbin.org/html',
        'https://httpbin.org/xml',
        'https://httpbin.org/robots.txt',
        'https://httpbin.org/uuid'
    ]
    
    print("Starting concurrent requests...")
    start_time = time.time()

    results = await fetch_multiple_urls(urls)

    end_time = time.time()
    print(f"\nTotal time: {end_time - start_time:.2f}s")

    # Process the results
    for result in results:
        if isinstance(result, dict):
            if 'error' in result:
                print(f"❌ {result['url']}: {result['error']}")
            else:
                print(f"✅ {result['url']}: status {result['status']}")
        else:
            # With return_exceptions=True, uncaught exceptions arrive as results
            print(f"❌ Unexpected error: {result!r}")

if __name__ == "__main__":
    asyncio.run(main())

3.2 Database example

Asynchronous PostgreSQL access with asyncpg:

import asyncio
import asyncpg
from datetime import datetime

# Database connection settings
DB_CONFIG = {
    'host': 'localhost',
    'database': 'testdb',
    'user': 'postgres',
    'password': 'password'
}

async def create_connection_pool():
    """Create a connection pool."""
    return await asyncpg.create_pool(
        min_size=5,
        max_size=20,
        **DB_CONFIG
    )

async def init_database(pool):
    """Create the users table if it does not exist yet."""
    async with pool.acquire() as conn:
        await conn.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name VARCHAR(100),
                email VARCHAR(100),
                created_at TIMESTAMP
            )
        ''')
        print("Table is ready")

async def insert_user(pool, name, email):
    """Insert a single user."""
    async with pool.acquire() as conn:
        result = await conn.execute(
            'INSERT INTO users(name, email, created_at) VALUES($1, $2, $3)',
            name, email, datetime.now()
        )
        return result

async def get_users(pool):
    """Fetch all users."""
    async with pool.acquire() as conn:
        rows = await conn.fetch('SELECT * FROM users ORDER BY id')
        return [dict(row) for row in rows]

async def batch_insert_users(pool, users_data):
    """Insert several users in one batch."""
    async with pool.acquire() as conn:
        await conn.executemany(
            'INSERT INTO users(name, email, created_at) VALUES($1, $2, $3)',
            [(name, email, datetime.now()) for name, email in users_data]
        )

async def main():
    # Create the connection pool
    pool = await create_connection_pool()

    try:
        # Initialize the database
        await init_database(pool)

        # Insert a single user
        await insert_user(pool, "Alice", "alice@example.com")

        # Insert users in a batch
        users = [
            ("Bob", "bob@example.com"),
            ("Charlie", "charlie@example.com"),
            ("David", "david@example.com"),
            ("Eve", "eve@example.com")
        ]
        await batch_insert_users(pool, users)

        # Query the users
        all_users = await get_users(pool)
        print(f"\nUsers in the database ({len(all_users)}):")
        for user in all_users:
            print(f"  {user['id']}: {user['name']} ({user['email']})")

    finally:
        # Close the connection pool
        await pool.close()

if __name__ == "__main__":
    asyncio.run(main())

3.3 File system operations

Asynchronous file operations with aiofiles:

import asyncio
import aiofiles
import json
import os

async def read_file_async(filepath):
    """Read a file asynchronously."""
    try:
        async with aiofiles.open(filepath, 'r', encoding='utf-8') as f:
            content = await f.read()
            return {"file": filepath, "content": content, "success": True}
    except Exception as e:
        return {"file": filepath, "error": str(e), "success": False}

async def write_file_async(filepath, content):
    """Write a file asynchronously."""
    try:
        # Make sure the target directory exists (skip when the path has none)
        directory = os.path.dirname(filepath)
        if directory:
            os.makedirs(directory, exist_ok=True)

        async with aiofiles.open(filepath, 'w', encoding='utf-8') as f:
            await f.write(content)
        return {"file": filepath, "success": True}
    except Exception as e:
        return {"file": filepath, "error": str(e), "success": False}

async def process_files_concurrently():
    """Process several files concurrently."""
    # Prepare the data
    files_data = {
        "data/file1.txt": "Content of file 1\nwith several lines of text",
        "data/file2.txt": "Content of file 2\nanother piece of text",
        "data/file3.txt": "Content of file 3\nmore content",
        "data/config.json": json.dumps({"version": "1.0", "debug": True}, indent=2)
    }

    # Write the files concurrently
    write_tasks = [
        write_file_async(filepath, content)
        for filepath, content in files_data.items()
    ]
    write_results = await asyncio.gather(*write_tasks)

    print("Write results:")
    for result in write_results:
        status = "✅" if result["success"] else "❌"
        print(f"  {status} {result['file']}")

    # Read the files concurrently
    read_tasks = [
        read_file_async(filepath)
        for filepath in files_data.keys()
    ]
    read_results = await asyncio.gather(*read_tasks)

    print("\nRead results:")
    for result in read_results:
        if result["success"]:
            print(f"  ✅ {result['file']}: {len(result['content'])} characters")
        else:
            print(f"  ❌ {result['file']}: {result['error']}")

async def main():
    await process_files_concurrently()

if __name__ == "__main__":
    asyncio.run(main())

4. Advanced Techniques and Best Practices

4.1 Task management and control

import asyncio

class TaskManager:
    """Example task manager."""

    def __init__(self, max_concurrent=5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.tasks = []

    async def limited_task(self, task_id):
        """A task whose concurrency is limited by the semaphore."""
        async with self.semaphore:
            print(f"Task {task_id} started")
            await asyncio.sleep(1)  # simulate work
            print(f"Task {task_id} finished")
            return task_id * 2

    async def run_all(self, task_count):
        """Run all tasks."""
        # Create the task list
        self.tasks = [
            asyncio.create_task(self.limited_task(i))
            for i in range(task_count)
        ]

        # Wait for all tasks to finish
        results = await asyncio.gather(*self.tasks)
        return results

    def cancel_all(self):
        """Cancel every task that has not finished."""
        for task in self.tasks:
            if not task.done():
                task.cancel()

async def task_with_timeout():
    """A task with a timeout."""
    try:
        # Set the timeout
        result = await asyncio.wait_for(
            asyncio.sleep(5),  # simulate an operation that takes 5 seconds
            timeout=2          # but wait at most 2 seconds
        )
        return result
    except asyncio.TimeoutError:
        print("Task timed out!")
        return None

async def task_with_retry(max_retries=3):
    """A task with a retry mechanism."""
    for attempt in range(max_retries):
        try:
            # Simulate an operation that fails on the first two attempts
            if attempt < 2:
                raise ConnectionError("simulated connection failure")

            print(f"Attempt {attempt + 1} succeeded")
            return "success"

        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                raise
            # Wait before retrying
            await asyncio.sleep(1)

async def main():
    # 1. Limit concurrency
    print("=== Limiting concurrency ===")
    manager = TaskManager(max_concurrent=3)
    results = await manager.run_all(10)
    print(f"Results: {results}\n")

    # 2. Timeout control
    print("=== Timeout control ===")
    await task_with_timeout()
    print()

    # 3. Retry mechanism
    print("=== Retry mechanism ===")
    try:
        result = await task_with_retry()
        print(f"Final result: {result}")
    except Exception as e:
        print(f"Final failure: {e}")

if __name__ == "__main__":
    asyncio.run(main())

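4.2 Integrating synchronous code

import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

def blocking_io_operation(duration):
    """Simulate a blocking I/O operation."""
    print(f"Blocking operation started, lasting {duration} s")
    time.sleep(duration)
    return f"Blocking operation finished after {duration} s"

def cpu_intensive_calculation(n):
    """Simulate a CPU-bound calculation."""
    print(f"CPU calculation started: sum of squares below {n}")
    result = sum(i * i for i in range(n))
    return f"Calculation result: {result}"

async def async_wrapper():
    """Wrap synchronous code so that it can be awaited."""
    # get_running_loop() is the recommended call inside a coroutine
    loop = asyncio.get_running_loop()

    # Run the blocking calls in a thread pool
    with ThreadPoolExecutor() as pool:
        # Blocking I/O runs in a separate thread
        io_result = await loop.run_in_executor(
            pool, blocking_io_operation, 2
        )
        print(io_result)

        # The CPU-bound task also runs in a thread here. That moves it off the
        # event loop thread, but because of the GIL it does not run in parallel
        # with other Python code; a ProcessPoolExecutor is usually the better
        # choice for heavy computation
        cpu_result = await loop.run_in_executor(
            pool, cpu_intensive_calculation, 1000000
        )
        print(cpu_result)

    return io_result, cpu_result

async def mixed_operations():
    """Mix asynchronous and synchronous operations."""
    async def async_task():
        print("Async task started")
        await asyncio.sleep(1)
        print("Async task finished")
        return "async_result"

    # Run the async task and the wrapped synchronous work at the same time
    results = await asyncio.gather(
        async_task(),
        async_wrapper()
    )

    print(f"Final results: {results}")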

if __name__ == "__main__":
    asyncio.run(mixed_operations())
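
On Python 3.9 and later, asyncio.to_thread is a shorter way to push a blocking call onto a worker thread without creating an executor by hand. A minimal sketch, assuming a hypothetical blocking_read function that stands in for any blocking library call:

```python
import asyncio
import time


def blocking_read(duration: float) -> str:
    """Hypothetical blocking call, for example a synchronous client library."""
    time.sleep(duration)
    return f"blocked for {duration}s"


async def run_blocking_calls():
    """Run two blocking calls in worker threads at the same time."""
    start = time.perf_counter()
    results = await asyncio.gather(
        asyncio.to_thread(blocking_read, 0.3),
        asyncio.to_thread(blocking_read, 0.3),
    )
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_blocking_calls())
    print(results, f"{elapsed:.2f}s")
```

Both calls sleep in separate threads, so the total time stays close to 0.3 seconds instead of 0.6. As with any thread-based approach, this helps with blocking I/O and not with CPU-bound work.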

4.3 Locks and synchronization mechanisms

import asyncio

class AsyncCounter:
    """Coroutine-safe counter (asyncio.Lock does not protect against threads)."""

    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        """Increment atomically with respect to other coroutines."""
        async with self.lock:
            # Simulate processing time
            await asyncio.sleep(0.01)
            self.value += 1
            return self.value

class SharedResource:
    """Shared resource guarded by a readers-writer scheme."""

    def __init__(self):
        self.data = {}
        self.read_lock = asyncio.Lock()
        self.write_lock = asyncio.Lock()
        self.readers = 0

    async def read(self, key):
        """Read data (several readers may read at once)."""
        async with self.read_lock:
            self.readers += 1
            if self.readers == 1:
                # The first reader takes the write lock to keep writers out
                await self.write_lock.acquire()

        try:
            # Read the data
            await asyncio.sleep(0.01)  # simulate read time
            return self.data.get(key)
        finally:
            async with self.read_lock:
                self.readers -= 1
                if self.readers == 0:
                    # The last reader releases the write lock
                    self.write_lock.release()

    async def write(self, key, value):
        """Write data (exclusive access)."""
        async with self.write_lock:
            await asyncio.sleep(0.02)  # simulate write time
            self.data[key] = value

async def worker(counter, resource, worker_id):
    """Worker coroutine."""
    # Use the counter
    count = await counter.increment()
    print(f"Worker {worker_id}: count {count}")

    # Write to and read from the shared resource
    await resource.write(f"key_{worker_id}", f"value_{worker_id}")
    value = await resource.read(f"key_{worker_id}")
    print(f"Worker {worker_id}: read {value}")

async def main():
    counter = AsyncCounter()
    resource = SharedResource()

    # Run several worker coroutines concurrently
    workers = [worker(counter, resource, i) for i in range(10)]
    await asyncio.gather(*workers)

    print(f"\nFinal count: {counter.value}")
    print(f"Final data: {resource.data}")

if __name__ == "__main__":
    asyncio.run(main())

5. Performance Optimization and Debugging

5.1 Performance monitoring

import asyncio
import time
from functools import wraps

def async_timed(func):
    """Decorator that reports how long an async function takes."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        # perf_counter() is a monotonic clock intended for measuring intervals
        start = time.perf_counter()
        result = await func(*args, **kwargs)
        end = time.perf_counter()
        print(f"{func.__name__} took {end - start:.3f}s")
        return result
    return wrapper

@async_timed
async def monitored_task(duration):
    """The task being monitored."""
    await asyncio.sleep(duration)
    return f"Finished {duration}s task"

async def performance_test():
    """Compare sequential and concurrent execution."""
    print("=== Sequential execution ===")
    start = time.perf_counter()
    await monitored_task(1)
    await monitored_task(1)
    print(f"Total time: {time.perf_counter() - start:.3f}s\n")

    print("=== Concurrent execution ===")
    start = time.perf_counter()
    await asyncio.gather(
        monitored_task(1),
        monitored_task(1)
    )
    print(f"Total time: {time.perf_counter() - start:.3f}s")

if __name__ == "__main__":
    asyncio.run(performance_test())

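5.2 Debugging techniques

import asyncio
import logging

# Show debug-level log messages, including asyncio's own warnings
logging.basicConfig(level=logging.DEBUG)

async def debug_example():
    """Debugging example."""
    # Turn on asyncio debug mode so that slow callbacks are reported
    loop = asyncio.get_running_loop()
    loop.set_debug(True)

    # Simulate a slow operation
    async def slow_task():
        logging.info("Slow task started")
        await asyncio.sleep(2)
        logging.info("Slow task finished")

    # Detect it with a timeout
    try:
        await asyncio.wait_for(slow_task(), timeout=1)
    except asyncio.TimeoutError:
        logging.error("Task timed out!")

    # Count the tasks that have not finished (this includes the current task)
    pending = asyncio.all_tasks()
    logging.info(f"Unfinished tasks: {len(pending)}")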

if __name__ == "__main__":
    asyncio.run(debug_example())

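6. Patterns Used in Real Projects

6.1 Producer-consumer pattern

import asyncio
import random

class AsyncQueue:
    """A minimal asynchronous queue (the standard library offers asyncio.Queue)."""

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self.queue = []
        self.getters = []
        self.putters = []

    async def put(self, item):
        """Add an item, waiting while the queue is full."""
        # Re-check in a loop: another task may have taken the free slot
        # between the wake-up and the moment this task resumes
        while self.maxsize > 0 and len(self.queue) >= self.maxsize:
            waiter = asyncio.get_running_loop().create_future()
            self.putters.append(waiter)
            await waiter

        self.queue.append(item)

        # Wake one waiting getter
        if self.getters:
            self.getters.pop(0).set_result(True)

    async def get(self):
        """Remove and return an item, waiting while the queue is empty."""
        # Re-check in a loop: another task may have taken the item first
        while not self.queue:
            waiter = asyncio.get_running_loop().create_future()
            self.getters.append(waiter)
            await waiter

        item = self.queue.pop(0)

        # Wake one waiting putter
        if self.putters:
            self.putters.pop(0).set_result(True)

        return item

async def producer(queue, producer_id, count):
    """Producer."""
    for i in range(count):
        item = f"producer{producer_id}-item{i}"
        await queue.put(item)
        print(f"Producer {producer_id}: put {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def consumer(queue, consumer_id):
    """Consumer."""
    while True:
        item = await queue.get()
        if item is None:  # stop signal
            break

        print(f"Consumer {consumer_id}: processing {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))

async def producer_consumer_demo():
    """Producer-consumer demo."""
    queue = AsyncQueue(maxsize=5)

    # Start the consumers as background tasks
    consumers = [
        asyncio.create_task(consumer(queue, i))
        for i in range(3)
    ]

    # Run the producers to completion
    producers = [
        producer(queue, i, 5)
        for i in range(2)
    ]
    await asyncio.gather(*producers)

    # Send one stop signal per consumer, then wait for the consumers to exit.
    # Waiting for the consumers before sending the signals would hang forever.
    for _ in range(len(consumers)):
        await queue.put(None)
    await asyncio.gather(*consumers)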

if __name__ == "__main__":
    asyncio.run(producer_consumer_demo())
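
The same pattern can also be sketched with the standard library's asyncio.Queue, which already handles waiting and wake-ups. In this variant the consumers are stopped by cancellation after queue.join() instead of by a sentinel value; the function names and the processed list are illustrative only.

```python
import asyncio


async def produce(queue: asyncio.Queue, producer_id: int, count: int) -> None:
    """Put `count` items on the queue, waiting whenever it is full."""
    for i in range(count):
        await queue.put(f"p{producer_id}-item{i}")


async def consume(queue: asyncio.Queue, processed: list) -> None:
    """Process items forever; the caller cancels this task when done."""
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stand-in for real work
            processed.append(item)
        finally:
            queue.task_done()  # lets queue.join() track completion


async def run_pipeline() -> list:
    queue = asyncio.Queue(maxsize=5)
    processed = []
    consumers = [asyncio.create_task(consume(queue, processed)) for _ in range(3)]

    await asyncio.gather(*(produce(queue, i, 5) for i in range(2)))
    await queue.join()  # returns once every item has been marked done

    for task in consumers:
        task.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    return processed


if __name__ == "__main__":
    print(len(asyncio.run(run_pipeline())), "items processed")
```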

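6.2 Batch and stream processing

import asyncio

class BatchProcessor:
    """Batch processor."""

    def __init__(self, batch_size=10, timeout=1.0):
        self.batch_size = batch_size
        self.timeout = timeout
        self.buffer = []
        self.timer = None

    async def add(self, item):
        """Add an item to the current batch."""
        self.buffer.append(item)

        # Start the timer if it is not already running
        if self.timer is None:
            self.timer = asyncio.create_task(self._flush_after_timeout())

        # Flush immediately once the batch is full
        if len(self.buffer) >= self.batch_size:
            await self._flush()

    async def _flush_after_timeout(self):
        """Flush the batch after the timeout expires."""
        try:
            await asyncio.sleep(self.timeout)
            await self._flush()
        except asyncio.CancelledError:
            pass
        finally:
            # Clear the reference only if it still points at this task
            if self.timer is asyncio.current_task():
                self.timer = None

    async def _flush(self):
        """Process the buffered items."""
        if not self.buffer:
            return

        # Cancel the timer, unless this flush is running inside the timer task
        # (cancelling the current task would abort the flush itself)
        if self.timer is not None:
            if self.timer is not asyncio.current_task():
                self.timer.cancel()
            self.timer = None

        # Take the current batch
        batch = self.buffer[:]
        self.buffer = []

        # Simulate the batch operation
        print(f"Processing batch: {len(batch)} items - {batch[:3]}...")
        await asyncio.sleep(0.1)  # simulate processing time
        return batch

async def batch_demo():
    """Batch processing demo."""
    processor = BatchProcessor(batch_size=5, timeout=0.5)

    # Simulate a data stream
    for i in range(20):
        await processor.add(f"item_{i}")
        await asyncio.sleep(0.1)  # simulate the interval between arrivals

    # Give the timer a chance to flush any remaining items
    await asyncio.sleep(1)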

if __name__ == "__main__":
    asyncio.run(batch_demo())

7. Common Problems and Solutions

7.1 Preventing deadlocks

import asyncio

async def deadlock_example():
    """Deadlock example."""
    lock1 = asyncio.Lock()
    lock2 = asyncio.Lock()

    async def task1():
        async with lock1:
            await asyncio.sleep(0.1)
            async with lock2:  # may deadlock
                print("Task 1 finished")

    async def task2():
        async with lock2:
            await asyncio.sleep(0.1)
            async with lock1:  # may deadlock
                print("Task 2 finished")

    # Running both tasks concurrently deadlocks, because they take
    # the two locks in opposite order:
    # await asyncio.gather(task1(), task2())

    # Correct approach: acquire locks with a timeout, or always in a fixed order
    async def safe_task1():
        # Acquire the locks in a fixed order
        async with lock1:
            await asyncio.sleep(0.1)
            async with lock2:
                print("Safe task 1 finished")

    async def safe_task2():
        # Acquire the locks in the same fixed order
        async with lock1:
            await asyncio.sleep(0.1)
            async with lock2:
                print("Safe task 2 finished")

    await asyncio.gather(safe_task1(), safe_task2())

if __name__ == "__main__":
    asyncio.run(deadlock_example())
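
The example above demonstrates the fixed-order approach. The other option it mentions, acquiring a lock with a timeout, could look roughly like this sketch; acquire_with_timeout is a hypothetical helper, not an asyncio API.

```python
import asyncio


async def acquire_with_timeout(lock: asyncio.Lock, timeout: float) -> bool:
    """Try to take the lock; give up and return False after `timeout` seconds."""
    try:
        await asyncio.wait_for(lock.acquire(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo_lock_timeout():
    lock = asyncio.Lock()
    first = await acquire_with_timeout(lock, 0.1)   # the lock is free
    second = await acquire_with_timeout(lock, 0.1)  # the lock is already held
    lock.release()
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo_lock_timeout()))  # (True, False)
```

A task that fails to get its second lock can release the first one and retry later, which breaks the circular wait instead of hanging.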

7.2 Resource cleanup

import asyncio

class AsyncResource:
    """A resource that needs cleanup."""

    def __init__(self, name):
        self.name = name
        self.closed = False

    async def close(self):
        """Clean up the resource."""
        if not self.closed:
            print(f"Cleaning up resource: {self.name}")
            await asyncio.sleep(0.1)  # simulate cleanup time
            self.closed = True

    async def __aenter__(self):
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.close()
        return False

async def proper_cleanup():
    """Demonstrate correct resource cleanup."""

    # Method 1: async with
    async with AsyncResource("resource A") as resource:
        print(f"Using resource: {resource.name}")
        # Use the resource here
        await asyncio.sleep(0.5)
    # close() is called automatically

    # Method 2: try-finally
    resource = None
    try:
        resource = AsyncResource("resource B")
        print(f"Using resource: {resource.name}")
        await asyncio.sleep(0.5)
    finally:
        if resource:
            await resource.close()

    # Method 3: keep the async with inside a coroutine; when that coroutine is
    # the entry point passed to asyncio.run(), cleanup finishes before the loop closes
    async def main():
        async with AsyncResource("resource C") as resource:
            print(f"Using resource: {resource.name}")
            await asyncio.sleep(0.5)

    await main()

if __name__ == "__main__":
    asyncio.run(proper_cleanup())
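
For simple cases, the same guarantee can be sketched without writing a class, using contextlib.asynccontextmanager from the standard library. The managed_resource function and its log list are hypothetical and exist only to make the order of events visible.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def managed_resource(name: str, log: list):
    """Hypothetical resource whose cleanup runs even if the body raises."""
    log.append(f"open {name}")
    try:
        yield name
    finally:
        await asyncio.sleep(0)  # stand-in for asynchronous cleanup work
        log.append(f"close {name}")


async def use_resource() -> list:
    log = []
    try:
        async with managed_resource("resource-D", log) as name:
            log.append(f"use {name}")
            raise RuntimeError("simulated failure")
    except RuntimeError:
        log.append("error handled")
    return log


if __name__ == "__main__":
    print(asyncio.run(use_resource()))
```

The log shows the resource being closed before the error reaches the outer handler, which is the ordering async with is meant to guarantee.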

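8. Summary and Best Practices

8.1 Key points

  1. Understand the event loop: it is the core of asynchronous programming
  2. Use await correctly: only on coroutines and other awaitable objects
  3. Avoid blocking operations: make sure all I/O on the event loop is asynchronous
  4. Control concurrency sensibly: use a Semaphore to limit the number of concurrent operations
  5. Handle exceptions properly: use try-except and try-finally
  6. Clean up resources: close connections and release resources properly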
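
For point 5, one common approach is to let asyncio.gather return exceptions as ordinary results, so that one failure does not hide the other outcomes. A minimal sketch, assuming a hypothetical may_fail coroutine that raises for odd inputs:

```python
import asyncio


async def may_fail(n: int) -> int:
    """Hypothetical operation that fails for odd values of n."""
    await asyncio.sleep(0.01)
    if n % 2:
        raise ValueError(f"task {n} failed")
    return n


async def gather_with_errors():
    """Collect successes and failures separately."""
    outcomes = await asyncio.gather(
        *(may_fail(n) for n in range(4)),
        return_exceptions=True,  # exceptions are returned instead of raised
    )
    successes = [o for o in outcomes if not isinstance(o, BaseException)]
    failures = [str(o) for o in outcomes if isinstance(o, BaseException)]
    return successes, failures


if __name__ == "__main__":
    print(asyncio.run(gather_with_errors()))
```

Without return_exceptions=True, the first exception propagates to the caller of gather while the remaining coroutines keep running in the background.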

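8.2 Performance tips

  1. Batch work: group small operations to cut per-operation overhead and round trips
  2. Connection pools: reuse database and HTTP connections
  3. Sensible timeouts: prevent waiting forever
  4. Monitoring and debugging: use debug mode to detect problems
  5. Choose the right tool: consider multiprocessing for CPU-bound work

8.3 Suitable use cases

  • Web crawlers: requesting many pages concurrently
  • API servers: handling many concurrent requests
  • Data processing: reading and writing files or databases concurrently
  • Real-time communication: WebSocket applications
  • Microservices: calling several services concurrently

With these concepts and techniques, you can use asynchronous programming in Python to build high-performance applications. Remember that it suits I/O-bound tasks best; CPU-bound tasks may need to be combined with multiprocessing.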