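Introduction: Why Asynchronous Programming Matters

In modern software development, asynchronous programming has become a key technique for handling I/O-bound work and improving application performance. Python, a widely used language, gives developers strong support for it through the asyncio library and the async/await syntax. The core advantage is that a program can keep doing other work while it waits for an operation such as a network request, a file read or write, or a database query to finish, instead of blocking the whole program. This non-blocking execution model is especially well suited to high-concurrency web servers, web crawlers, and data processing pipelines.

Compared with traditional multithreaded or multiprocess models, asynchronous programming usually consumes fewer resources per concurrent task, and concurrency is simpler to control because a coroutine gives up control only at explicit await points. The async/await syntax, introduced in Python 3.5, makes asynchronous code more intuitive to write and easier to follow. This article starts from the basic concepts and works up to advanced applications to help readers get a thorough grasp of asynchronous programming in Python.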

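Basic Concepts: Synchronous vs. Asynchronous

To understand asynchronous programming, first be clear about the fundamental difference between synchronous and asynchronous execution. In synchronous programming, code runs in order, and each operation must wait for the previous one to finish before it can start. Consider the following synchronous code: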

import time

def fetch_data():
    print("开始获取数据...")
    time.sleep(2)  # 模拟I/O等待
    print("数据获取完成")
    return "模拟数据"

def process_data(data):
    print("开始处理数据...")
    time.sleep(1)  # 模拟处理时间
    print("数据处理完成")
    return data.upper()

# 同步执行
data = fetch_data()
result = process_data(data)
print(f"最终结果: {result}")

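Execution here is entirely linear: fetch_data() waits 2 seconds, then process_data() waits another second, for a total of 3 seconds. While it waits, the program cannot do anything else.

Asynchronous programming instead achieves non-blocking execution with an event loop and coroutines. Here is the equivalent asynchronous version: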

import asyncio
import time

async def fetch_data():
    print("开始获取数据...")
    await asyncio.sleep(2)  # 非阻塞等待
    print("数据获取完成")
    return "模拟数据"

async def process_data(data):
    print("开始处理数据...")
    await asyncio.sleep(1)  # 非阻塞等待
    print("数据处理完成")
    return data.upper()

async def main():
    start_time = time.time()
    data = await fetch_data()
    result = await process_data(data)
    print(f"最终结果: {result}")
    print(f"总耗时: {time.time() - start_time:.2f}秒")

asyncio.run(main())

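This code still runs its two steps in order and still takes about 3 seconds, because process_data() needs the result of fetch_data(). The key difference is that await asyncio.sleep(2) suspends only the current coroutine rather than blocking the whole program. In a real application with several asynchronous tasks, the others can run concurrently during those waits.

Coroutines and the Event Loop

Coroutines are the core building block of asynchronous programming. In Python, a coroutine is a special function defined with async def. Unlike an ordinary function, calling a coroutine function does not run its body immediately; it returns a coroutine object. To run it, hand it to the event loop, for example by awaiting it, wrapping it in a task, or passing it to asyncio.run().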
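
The difference between calling a coroutine function and running it can be seen in a minimal sketch. The names greet and demo are hypothetical and used only for illustration:

```python
import asyncio
import inspect


async def greet(name: str) -> str:
    await asyncio.sleep(0)  # yield to the event loop once
    return f"hello, {name}"


def demo() -> str:
    coro = greet("world")  # the body has not run yet
    assert inspect.iscoroutine(coro)  # we only hold a coroutine object
    return asyncio.run(coro)  # the event loop drives it to completion


if __name__ == "__main__":
    print(demo())
```

Forgetting to await or schedule a coroutine object means its body never runs, and Python emits a "coroutine ... was never awaited" RuntimeWarning.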

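The event loop is the scheduler of asynchronous programming: it manages the execution of all coroutines. The following, more involved example shows several coroutines running concurrently: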

import asyncio
import time

async def task(name, duration):
    print(f"任务 {name} 开始,预计耗时 {duration}秒")
    await asyncio.sleep(duration)
    print(f"任务 {name} 完成")
    return f"结果_{name}"

async def main():
    start_time = time.time()
    
    # 创建多个任务
    tasks = [
        task("A", 2),
        task("B", 3),
        task("C", 1)
    ]
    
    # 并发执行所有任务
    results = await asyncio.gather(*tasks)
    
    print(f"所有任务完成: {results}")
    print(f"总耗时: {time.time() - start_time:.2f}秒")

asyncio.run(main())

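In this example the three tasks start together. Although each takes a different amount of time, the total is roughly the duration of the longest task (3 seconds) rather than the sum of all of them (6 seconds). This is because whenever one task is waiting at an await, the event loop switches to another task that is ready to run.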
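
One detail worth illustrating is that asyncio.gather() returns results in the order the awaitables were passed in, not in the order they finished. The sketch below records both orders; the finished list and the shortened durations are assumptions made to keep the demo quick:

```python
import asyncio


async def task(name: str, duration: float, finished: list) -> str:
    await asyncio.sleep(duration)
    finished.append(name)  # record the real completion order
    return f"result_{name}"


async def run_demo():
    finished = []
    results = await asyncio.gather(
        task("A", 0.2, finished),
        task("B", 0.3, finished),
        task("C", 0.1, finished),
    )
    return results, finished


if __name__ == "__main__":
    results, finished = asyncio.run(run_demo())
    print("gather order:    ", results)
    print("completion order:", finished)
```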

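Async Context Managers and Resource Management

Asynchronous code often has to manage resources such as network connections or file handles. Python provides async context managers to handle them cleanly. An async context manager is defined by implementing the __aenter__ and __aexit__ methods.

The following is an example implementation of an asynchronous network connection pool: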

import asyncio
from typing import List

class AsyncConnectionPool:
    def __init__(self, max_connections: int = 5):
        self.max_connections = max_connections
        self.semaphore = asyncio.Semaphore(max_connections)
        self.connections: List[str] = []
    
    async def __aenter__(self):
        print("初始化连接池...")
        await self._create_connections()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("关闭连接池...")
        self.connections.clear()
    
    async def _create_connections(self):
        # 模拟创建连接的异步操作
        for i in range(self.max_connections):
            await asyncio.sleep(0.1)  # 模拟连接创建延迟
            self.connections.append(f"Connection-{i}")
    
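    async def get_connection(self):
        # Hold a semaphore permit until release_connection() so that at most
        # max_connections tasks own a connection; the others wait here
        await self.semaphore.acquire()
        conn = self.connections.pop()
        print(f"Acquired connection: {conn}")
        return conn

    async def release_connection(self, conn: str):
        print(f"Released connection: {conn}")
        self.connections.append(conn)
        self.semaphore.release()  # wake one waiting task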

async def use_connection(pool: AsyncConnectionPool, task_id: int):
    conn = await pool.get_connection()
    try:
        print(f"任务{task_id}使用连接{conn}")
        await asyncio.sleep(1)  # 模拟使用连接
    finally:
        await pool.release_connection(conn)

async def main():
    async with AsyncConnectionPool(max_connections=3) as pool:
        tasks = [use_connection(pool, i) for i in range(8)]
        await asyncio.gather(*tasks)

asyncio.run(main())

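This example shows how to manage a connection pool with an async context manager. The semaphore caps how many tasks can hold a connection at once, which prevents resource exhaustion; for the cap to work, a permit must be held from get_connection() until release_connection() hands the connection back.

Asynchronous HTTP Requests in Practice: Using aiohttp

In real applications, one of the most common uses of asynchronous programming is network requests. The aiohttp library is a capable asynchronous HTTP client/server framework for Python. Here is a complete asynchronous web crawler example: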

import asyncio
import aiohttp
from typing import List, Dict
import time

async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
    """异步获取单个URL的内容"""
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            print(f"正在获取: {url}")
            text = await response.text()
            return {
                "url": url,
                "status": response.status,
                "content_length": len(text),
                "success": True
            }
    except Exception as e:
        print(f"获取 {url} 失败: {e}")
        return {
            "url": url,
            "error": str(e),
            "success": False
        }

async def batch_fetch(urls: List[str], max_concurrent: int = 5) -> List[Dict]:
    """并发获取多个URL"""
    # 创建连接器,限制并发连接数
    connector = aiohttp.TCPConnector(limit=max_concurrent)
    
    async with aiohttp.ClientSession(connector=connector) as session:
        # 创建任务列表
        tasks = [fetch_url(session, url) for url in urls]
        
        # 使用gather并发执行,保持顺序
        results = await asyncio.gather(*tasks, return_exceptions=False)
        
        return results

async def main():
    # 测试URL列表
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404",
        "https://httpbin.org/bytes/1024",
        "https://httpbin.org/json",
        "https://httpbin.org/xml",
        "https://httpbin.org/html"
    ]
    
    start_time = time.time()
    print(f"开始批量获取 {len(urls)} 个URL...")
    
    results = await batch_fetch(urls, max_concurrent=3)
    
    print(f"\n完成! 总耗时: {time.time() - start_time:.2f}秒")
    print("\n结果摘要:")
    for result in results:
        if result["success"]:
            print(f"✓ {result['url']} - 状态: {result['status']}, 大小: {result['content_length']}字节")
        else:
            print(f"✗ {result['url']} - 错误: {result['error']}")

if __name__ == "__main__":
    asyncio.run(main())

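This crawler example demonstrates several key asynchronous programming patterns:

  1. Managing HTTP connections with aiohttp.ClientSession
  2. Limiting the number of concurrent connections with TCPConnector
  3. Running multiple tasks concurrently with asyncio.gather
  4. Error handling and timeout management
  5. Automatic resource cleanup (async with)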
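
The crawler catches exceptions inside each task. An alternative, sketched here under the assumption that you would rather collect failures centrally, is to pass return_exceptions=True to asyncio.gather(), so a failing task shows up as an exception object in the result list instead of aborting the whole batch. The function may_fail is hypothetical:

```python
import asyncio


async def may_fail(n: int) -> int:
    await asyncio.sleep(0.01)
    if n % 2:
        raise ValueError(f"odd input: {n}")
    return n * 10


async def collect():
    # Exceptions are returned in place, in the same position as their task
    outcomes = await asyncio.gather(
        *(may_fail(n) for n in range(4)), return_exceptions=True
    )
    ok = [o for o in outcomes if not isinstance(o, Exception)]
    failed = [o for o in outcomes if isinstance(o, Exception)]
    return ok, failed


if __name__ == "__main__":
    ok, failed = asyncio.run(collect())
    print("succeeded:", ok)
    print("failed:", failed)
```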

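Asynchronous Database Operations

Asynchronous programming is also very useful for database access, especially in high-concurrency web applications. The following example uses SQLAlchemy's async support together with asyncpg: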

import asyncio
from typing import List  # used by get_users_batch's type hints

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String, select

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    email = Column(String(100))

# 异步数据库引擎
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/testdb"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def init_db():
    """初始化数据库表"""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

async def create_user(name: str, email: str) -> int:
    """创建用户"""
    async with async_session() as session:
        user = User(name=name, email=email)
        session.add(user)
        await session.commit()
        return user.id

async def get_user_by_id(user_id: int) -> User:
    """根据ID获取用户"""
    async with async_session() as session:
        result = await session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one()

async def get_users_batch(user_ids: List[int]) -> List[User]:
    """批量获取用户"""
    async with async_session() as session:
        result = await session.execute(
            select(User).where(User.id.in_(user_ids))
        )
        return result.scalars().all()

async def main():
    # 初始化数据库
    await init_db()
    
    # 并发创建用户
    create_tasks = [
        create_user(f"User{i}", f"user{i}@example.com")
        for i in range(5)
    ]
    user_ids = await asyncio.gather(*create_tasks)
    print(f"创建的用户ID: {user_ids}")
    
    # 批量查询用户
    users = await get_users_batch(user_ids)
    for user in users:
        print(f"用户: {user.name}, 邮箱: {user.email}")

if __name__ == "__main__":
    asyncio.run(main())

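This example shows how to use SQLAlchemy for database operations in an asynchronous environment. The key points are:

  1. Create the async engine with create_async_engine
  2. Create the async session factory with sessionmaker and class_=AsyncSession (SQLAlchemy 2.0 also provides async_sessionmaker for this purpose)
  3. Run queries with await session.execute()
  4. Manage database connections inside async contexts

Asynchronous Task Scheduling and Timed Tasks

Real applications often need to schedule asynchronous tasks or run them on a timer. asyncio offers several ways to do this. The following is a complete task scheduler example: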

import asyncio
import time
from datetime import datetime, timedelta
from typing import Callable, List, Dict
import heapq

class AsyncTaskScheduler:
    """异步任务调度器"""
    
    def __init__(self):
        self._tasks: List[tuple] = []  # (执行时间, 任务ID, 任务函数)
        self._running = False
        self._task_counter = 0
    
    def schedule(self, delay: float, coro_func: Callable, *args, **kwargs) -> int:
        """调度一个任务在指定延迟后执行"""
        self._task_counter += 1
        execute_time = time.time() + delay
        task_id = self._task_counter
        
        # 使用堆确保按时间顺序执行
        heapq.heappush(self._tasks, (execute_time, task_id, coro_func, args, kwargs))
        print(f"[{datetime.now().strftime('%H:%M:%S')}] 任务 {task_id} 已调度,将在 {delay:.1f}秒后执行")
        
        return task_id
    
    def schedule_interval(self, interval: float, coro_func: Callable, *args, **kwargs) -> int:
        """调度一个周期性任务"""
        async def repeated_task():
            while self._running:
                try:
                    await coro_func(*args, **kwargs)
                except Exception as e:
                    print(f"周期任务执行错误: {e}")
                await asyncio.sleep(interval)
        
        return self.schedule(0, repeated_task)
    
    async def run(self):
        """启动调度器"""
        self._running = True
        print("任务调度器已启动")
        
        try:
            while self._running or self._tasks:
                if not self._tasks:
                    await asyncio.sleep(0.1)
                    continue
                
                execute_time, task_id, coro_func, args, kwargs = self._tasks[0]
                now = time.time()
                
                if now >= execute_time:
                    # 执行任务
                    heapq.heappop(self._tasks)
                    print(f"[{datetime.now().strftime('%H:%M:%S')}] 执行任务 {task_id}")
                    asyncio.create_task(self._execute_task(task_id, coro_func, args, kwargs))
                else:
                    # 等待下一个任务
                    await asyncio.sleep(min(0.1, execute_time - now))
        finally:
            print("任务调度器已停止")
    
    async def _execute_task(self, task_id: int, coro_func: Callable, args, kwargs):
        """执行单个任务"""
        try:
            result = await coro_func(*args, **kwargs)
            print(f"[{datetime.now().strftime('%H:%M:%S')}] 任务 {task_id} 完成,结果: {result}")
        except Exception as e:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] 任务 {task_id} 失败: {e}")
    
    def stop(self):
        """停止调度器"""
        self._running = False

# 使用示例
async def sample_task(name: str, duration: float):
    print(f"  任务 '{name}' 开始执行")
    await asyncio.sleep(duration)
    return f"'{name}' 完成"

async def periodic_task():
    print(f"  周期任务执行于 {datetime.now().strftime('%H:%M:%S')}")

async def main():
    scheduler = AsyncTaskScheduler()
    
    # 调度一次性任务
    scheduler.schedule(1.0, sample_task, "快速任务", 0.5)
    scheduler.schedule(2.0, sample_task, "中等任务", 1.0)
    scheduler.schedule(3.5, sample_task, "长任务", 2.0)
    
    # 调度周期性任务(每3秒执行一次)
    scheduler.schedule_interval(3.0, periodic_task)
    
    # 调度多个并发任务
    for i in range(3):
        scheduler.schedule(0.5 + i * 0.3, sample_task, f"并发任务{i}", 0.2)
    
    # 运行调度器5秒后停止
    async def stop_after_delay():
        await asyncio.sleep(5)
        print("\n5秒已到,停止调度器...")
        scheduler.stop()
    
    await asyncio.gather(
        scheduler.run(),
        stop_after_delay()
    )

if __name__ == "__main__":
    asyncio.run(main())

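This task scheduler shows how to:

  1. Manage timed tasks with a heap (heapq)
  2. Schedule periodic tasks
  3. Run tasks and handle their errors
  4. Start and stop the scheduler

Note that stop() only ends the scheduling loop. A task that is still running at that point, such as the one scheduled at 3.5 seconds with a 2-second duration, is cancelled when asyncio.run() shuts down rather than being allowed to finish.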
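
The heap ordering the scheduler relies on can be sketched in isolation. This is a simplified illustration, not the scheduler itself: push and pop_due are hypothetical helpers, and the sequence counter plays the role of the task ID as a tie-breaker so that two entries with the same time never fall through to comparing their payloads.

```python
import heapq
import itertools

_sequence = itertools.count()  # tie-breaker for entries with equal run times


def push(heap: list, run_at: float, payload) -> None:
    heapq.heappush(heap, (run_at, next(_sequence), payload))


def pop_due(heap: list, now: float) -> list:
    """Remove and return every payload whose run time is <= now, earliest first."""
    due = []
    while heap and heap[0][0] <= now:
        _, _, payload = heapq.heappop(heap)
        due.append(payload)
    return due


if __name__ == "__main__":
    heap = []
    push(heap, 3.5, "long")
    push(heap, 1.0, "quick")
    push(heap, 1.0, "also quick")
    push(heap, 2.0, "medium")
    print(pop_due(heap, now=2.0))
    print("still pending:", len(heap))
```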

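Best Practices for Asynchronous Programming

1. Avoid Blocking Operations

In asynchronous code, any blocking call stalls the entire event loop, and every other task along with it. Make sure all I/O goes through asynchronous APIs: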

# 错误示例:阻塞操作
async def bad_example():
    time.sleep(1)  # 阻塞整个事件循环!
    return "done"

# 正确示例:异步操作
async def good_example():
    await asyncio.sleep(1)  # 非阻塞
    return "done"
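
When a blocking call cannot be avoided, for example a library with no async API, one option is to move it to a worker thread. The sketch below uses asyncio.to_thread(), which requires Python 3.9 or later; blocking_io and ticker are hypothetical stand-ins:

```python
import asyncio
import time


def blocking_io(seconds: float) -> str:
    time.sleep(seconds)  # stands in for a blocking library call
    return "done"


async def ticker(ticks: list) -> None:
    # Keeps running only if the event loop is not blocked
    for i in range(3):
        await asyncio.sleep(0.05)
        ticks.append(i)


async def main():
    ticks = []
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.2),  # runs in a worker thread
        ticker(ticks),
    )
    return result, ticks


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The ticker keeps advancing while the blocking call sleeps, which shows that the event loop stays responsive.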

2. Use Task Groups Sensibly

Use asyncio.gather() or asyncio.TaskGroup() to manage multiple tasks:

# 使用TaskGroup(Python 3.11+)
async def task_group_example():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coroutine())
        task2 = tg.create_task(another_coroutine())
    
    # 所有任务完成后才会继续执行
    return task1.result(), task2.result()

3. Error Handling

Error handling in asynchronous code deserves particular care:

async def robust_async_operation():
    try:
        async with asyncio.timeout(10):  # 10-second timeout; asyncio.timeout() requires Python 3.11+
            result = await some_async_call()
            return result
    except asyncio.TimeoutError:
        print("操作超时")
        return None
    except Exception as e:
        print(f"操作失败: {e}")
        return None
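
On Python versions older than 3.11, asyncio.wait_for() offers a similar timeout. A minimal sketch, with slow_call as a hypothetical operation and deliberately short durations:

```python
import asyncio


async def slow_call() -> str:
    await asyncio.sleep(0.2)
    return "finished"


async def call_with_timeout(seconds: float):
    try:
        # wait_for cancels slow_call() if it exceeds the timeout
        return await asyncio.wait_for(slow_call(), timeout=seconds)
    except asyncio.TimeoutError:
        return None


if __name__ == "__main__":
    print(asyncio.run(call_with_timeout(0.05)))
    print(asyncio.run(call_with_timeout(0.5)))
```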

4. Resource Limits

Use a semaphore to limit the number of concurrent operations:

async def limited_concurrency():
    semaphore = asyncio.Semaphore(5)  # 最多5个并发
    
    async def limited_task(n):
        async with semaphore:
            print(f"任务 {n} 开始")
            await asyncio.sleep(1)
            print(f"任务 {n} 结束")
    
    tasks = [limited_task(i) for i in range(20)]
    await asyncio.gather(*tasks)
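
To check that a semaphore really bounds concurrency, one approach is to count how many workers are inside the guarded section at once. The sketch below is a test-style illustration; run_limited and its counters are hypothetical:

```python
import asyncio


async def run_limited(total: int, limit: int) -> int:
    """Run `total` workers under a semaphore and return the peak concurrency."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def worker() -> None:
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulated work
            active -= 1

    await asyncio.gather(*(worker() for _ in range(total)))
    return peak


if __name__ == "__main__":
    print("peak concurrency:", asyncio.run(run_limited(total=20, limit=5)))
```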

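Advanced Topic: Async Generators and Async Iteration

Python also supports asynchronous generators and asynchronous iteration, which are very useful for processing streaming data: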

import asyncio
from typing import AsyncIterator, List

async def async_number_generator(n: int) -> AsyncIterator[int]:
    """异步生成器示例"""
    for i in range(n):
        await asyncio.sleep(0.1)  # 模拟异步操作
        yield i

async def process_async_stream():
    """使用异步生成器"""
    async for number in async_number_generator(5):
        print(f"处理数字: {number}")
        await asyncio.sleep(0.05)  # 模拟处理时间

# 异步迭代器类
class AsyncDataStream:
    def __init__(self, data: List[int]):
        self.data = data
        self.index = 0
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.index >= len(self.data):
            raise StopAsyncIteration
        
        await asyncio.sleep(0.1)  # 模拟异步获取数据
        value = self.data[self.index]
        self.index += 1
        return value

async def use_async_iterator():
    stream = AsyncDataStream([10, 20, 30, 40, 50])
    async for item in stream:
        print(f"收到数据: {item}")

async def main():
    await process_async_stream()
    await use_async_iterator()

if __name__ == "__main__":
    asyncio.run(main())
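
Async generators can also be consumed with asynchronous comprehensions, which is often more compact than an explicit async for loop. A minimal sketch with a hypothetical countdown generator:

```python
import asyncio


async def countdown(n: int):
    while n > 0:
        await asyncio.sleep(0)  # simulate an asynchronous step
        yield n
        n -= 1


async def collect_even(n: int) -> list:
    # "async for" inside a comprehension drives the async generator
    return [x async for x in countdown(n) if x % 2 == 0]


if __name__ == "__main__":
    print(asyncio.run(collect_even(6)))
```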

Performance Optimization and Monitoring

1. Profiling

Use asyncio's debug mode to identify performance problems:

import asyncio
import time

async def slow_operation():
    await asyncio.sleep(1)

async def main():
    # 启用调试模式
    loop = asyncio.get_running_loop()
    loop.set_debug(True)
    
    # 记录开始时间
    start = time.time()
    
    # 并发执行
    await asyncio.gather(
        slow_operation(),
        slow_operation(),
        slow_operation()
    )
    
    print(f"总耗时: {time.time() - start:.2f}秒")

# 运行时设置环境变量PYTHONASYNCIODEBUG=1
asyncio.run(main())
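
One thing debug mode does is log a warning through the "asyncio" logger when a single step of a task holds the event loop longer than loop.slow_callback_duration (0.1 seconds by default). The sketch below captures those warnings with a hypothetical ListHandler to show the mechanism; in ordinary use you would just read the log output.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects formatted log messages in a list (illustrative helper)."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


async def blocks_the_loop():
    time.sleep(0.2)  # deliberately blocking call inside a coroutine


def run_with_debug() -> list:
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        asyncio.run(blocks_the_loop(), debug=True)  # enable debug mode
    finally:
        logger.removeHandler(handler)
    return handler.messages


if __name__ == "__main__":
    for message in run_with_debug():
        print(message)
```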

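2. Monitoring the Event Loop

A simple monitor can run alongside your tasks and report periodically that the event loop is still being serviced: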

import asyncio
import time

class LoopMonitor:
    def __init__(self):
        self.callbacks = []
        self.start_time = time.time()
    
    def register_callback(self, callback):
        self.callbacks.append(callback)
    
    async def monitor(self):
        while True:
            await asyncio.sleep(1)
            current_time = time.time()
            elapsed = current_time - self.start_time
            
            # 检查回调执行时间
            for callback in self.callbacks:
                try:
                    callback(elapsed)
                except Exception as e:
                    print(f"监控回调错误: {e}")

async def monitored_operation():
    await asyncio.sleep(2)
    return "完成"

async def main():
    monitor = LoopMonitor()
    
    def log_performance(elapsed):
        print(f"[{elapsed:.1f}s] 事件循环运行正常")
    
    monitor.register_callback(log_performance)
    
    # 启动监控
    monitor_task = asyncio.create_task(monitor.monitor())
    
    # 执行操作
    result = await monitored_operation()
    print(result)
    
    # 停止监控
    monitor_task.cancel()
    try:
        await monitor_task
    except asyncio.CancelledError:
        pass

asyncio.run(main())
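
A more quantitative signal is event loop lag: how much later than requested a short sleep actually wakes up. The sketch below is one simple way to estimate it, not a standard API; measure_lag and hog are hypothetical, and hog blocks the loop on purpose to make the lag visible.

```python
import asyncio
import time


async def measure_lag(interval: float, samples: int) -> list:
    """Return how late each sleep(interval) woke up, in seconds."""
    lags = []
    for _ in range(samples):
        start = time.perf_counter()
        await asyncio.sleep(interval)
        lags.append(time.perf_counter() - start - interval)
    return lags


async def hog() -> None:
    await asyncio.sleep(0.05)
    time.sleep(0.2)  # blocks the loop, delaying every other task


async def main() -> float:
    lags, _ = await asyncio.gather(measure_lag(0.1, 3), hog())
    return max(lags)


if __name__ == "__main__":
    print(f"worst lag: {asyncio.run(main()):.3f}s")
```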

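Conclusion

Asynchronous programming in Python provides powerful tools for building high-performance, high-concurrency applications. By mastering coroutines, the event loop, async context managers, and the various async libraries, developers can significantly improve application performance and responsiveness.

Key takeaways:

  1. Understand the core concepts: coroutines, the event loop, and await are the foundation of asynchronous programming
  2. Choose suitable libraries: aiohttp for HTTP requests, asyncpg for PostgreSQL, aiomysql for MySQL, and so on
  3. Follow best practices: avoid blocking operations, manage resources sensibly, and handle errors correctly
  4. Monitor performance: use debug mode and monitoring tools to optimize your code
  5. Migrate gradually: start by making part of the functionality asynchronous and extend it step by step to the whole application

Asynchronous programming has a learning curve, but once mastered it lets you build responsive, resource-efficient Python applications. As the Python ecosystem matures, support for asynchronous programming keeps improving, which makes it an indispensable skill in modern Python development.

The Complete Guide to Python Asynchronous Programming: From Beginner to Expert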

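Introduction: Why Asynchronous Programming Is Needed

Today's applications often have to handle large numbers of concurrent connections: web servers, API services, web crawlers, and so on. The traditional synchronous model runs into performance bottlenecks in these scenarios, because each operation blocks the program until it completes.

Asynchronous programming lets a program continue with other tasks while it waits for operations such as I/O to finish, which significantly improves concurrency and resource utilization. Python offers an elegant solution through the asyncio library and the async/await syntax.

Basic Concepts: Synchronous vs. Asynchronous

Synchronous Example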

import time

def sync_task(name, duration):
    print(f"任务 {name} 开始")
    time.sleep(duration)  # 阻塞操作
    print(f"任务 {name} 结束")
    return f"结果_{name}"

def main_sync():
    start = time.time()
    
    # 顺序执行
    result1 = sync_task("A", 2)
    result2 = sync_task("B", 1)
    result3 = sync_task("C", 3)
    
    end = time.time()
    print(f"总耗时: {end - start:.2f}秒")
    print(f"结果: {result1}, {result2}, {result3}")

if __name__ == "__main__":
    main_sync()

Output:

任务 A 开始
任务 A 结束
任务 B 开始
任务 B 结束
任务 C 开始
任务 C 结束
总耗时: 6.00秒
结果: 结果_A, 结果_B, 结果_C

Asynchronous Example

import asyncio
import time

async def async_task(name, duration):
    print(f"任务 {name} 开始")
    await asyncio.sleep(duration)  # 非阻塞等待
    print(f"任务 {name} 结束")
    return f"结果_{name}"

async def main_async():
    start = time.time()
    
    # 并发执行
    task1 = async_task("A", 2)
    task2 = async_task("B", 1)
    task3 = async_task("C", 3)
    
    results = await asyncio.gather(task1, task2, task3)
    
    end = time.time()
    print(f"总耗时: {end - start:.2f}秒")
    print(f"结果: {', '.join(results)}")

if __name__ == "__main__":
    asyncio.run(main_async())

Output:

任务 A 开始
任务 B 开始
任务 C 开始
任务 B 结束
任务 A 结束
任务 C 结束
总耗时: 3.00秒
结果: 结果_A, 结果_B, 结果_C

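Coroutines and the Event Loop

Coroutine Basics

Coroutines are the core concept of asynchronous programming in Python. They are special functions whose execution can be suspended and resumed.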

import asyncio

async def my_coroutine():
    print("协程开始")
    await asyncio.sleep(1)
    print("协程继续")
    return "完成"

# 创建并运行协程
async def main():
    # Method 1: await the coroutine directly
    result = await my_coroutine()
    print(f"结果: {result}")
    
    # 方法2: 创建任务
    task = asyncio.create_task(my_coroutine())
    result = await task
    print(f"任务结果: {result}")

asyncio.run(main())

The Event Loop in Detail

The event loop is the scheduler of asynchronous programming; it manages the execution of all coroutines.

import asyncio
import time

async def worker(name, sleep_time):
    print(f"[{time.strftime('%H:%M:%S')}] 工人 {name} 开始工作")
    await asyncio.sleep(sleep_time)
    print(f"[{time.strftime('%H:%M:%S')}] 工人 {name} 完成工作")
    return f"{name}_result"

async def event_loop_demo():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    
    print("=== 事件循环信息 ===")
    print(f"事件循环类型: {type(loop)}")
    print(f"当前时间: {time.strftime('%H:%M:%S')}")
    
    # 创建多个任务
    tasks = [
        worker("A", 2),
        worker("B", 1),
        worker("C", 3),
        worker("D", 1.5)
    ]
    
    # 使用gather并发执行
    results = await asyncio.gather(*tasks)
    
    print("\n=== 所有任务完成 ===")
    for result in results:
        print(f"  {result}")

if __name__ == "__main__":
    asyncio.run(event_loop_demo())

Async Context Managers

Async context managers manage resources that need asynchronous setup and cleanup.

import asyncio
from typing import Optional

class AsyncDatabaseConnection:
    """异步数据库连接管理器"""
    
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.connection: Optional[str] = None
    
    async def __aenter__(self):
        """异步进入上下文"""
        print(f"正在连接到: {self.connection_string}")
        await asyncio.sleep(0.5)  # 模拟连接延迟
        self.connection = f"Connection_to_{self.connection_string}"
        print(f"连接成功: {self.connection}")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """异步退出上下文"""
        print(f"正在关闭连接: {self.connection}")
        await asyncio.sleep(0.2)  # 模拟关闭延迟
        self.connection = None
        print("连接已关闭")
        
        # 如果有异常,可以在这里处理
        if exc_type:
            print(f"异常被捕获: {exc_type}")
    
    async def execute_query(self, query: str):
        """执行查询"""
        if not self.connection:
            raise RuntimeError("连接未建立")
        
        print(f"执行查询: {query}")
        await asyncio.sleep(0.3)  # 模拟查询延迟
        return f"结果_{query}"

async def use_database():
    """使用异步上下文管理器"""
    async with AsyncDatabaseConnection("postgres://localhost/mydb") as db:
        result1 = await db.execute_query("SELECT * FROM users")
        result2 = await db.execute_query("SELECT * FROM products")
        print(f"查询结果: {result1}, {result2}")
    
    # 这里连接会自动关闭

async def main():
    await use_database()

if __name__ == "__main__":
    asyncio.run(main())
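
For simple cases, the same setup and cleanup pattern can be written as a generator with contextlib.asynccontextmanager instead of a class. The sketch below is an alternative to the class above, not a replacement for it; open_connection and the log list are hypothetical.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def open_connection(name: str, log: list):
    log.append(f"open {name}")
    await asyncio.sleep(0)  # simulated async setup
    try:
        yield f"conn:{name}"
    finally:
        # Runs on normal exit and when the body raises
        await asyncio.sleep(0)  # simulated async cleanup
        log.append(f"close {name}")


async def main() -> list:
    log = []
    try:
        async with open_connection("db", log) as conn:
            log.append(f"use {conn}")
            raise RuntimeError("query failed")
    except RuntimeError:
        log.append("error handled")
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```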

Hands-On: Asynchronous HTTP Requests

Making HTTP Requests with aiohttp

import asyncio
import aiohttp
from typing import List, Dict
import time

class AsyncHttpClient:
    """异步HTTP客户端"""
    
    def __init__(self, max_concurrent: int = 5):
        self.max_concurrent = max_concurrent
        self.connector = aiohttp.TCPConnector(limit=max_concurrent)
    
    async def fetch_single(self, session: aiohttp.ClientSession, url: str) -> Dict:
        """获取单个URL"""
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                text = await response.text()
                return {
                    "url": url,
                    "status": response.status,
                    "content_length": len(text),
                    "success": True,
                    "content": text[:100] + "..." if len(text) > 100 else text
                }
        except Exception as e:
            return {
                "url": url,
                "error": str(e),
                "success": False
            }
    
    async def fetch_batch(self, urls: List[str]) -> List[Dict]:
        """批量获取URL"""
        async with aiohttp.ClientSession(connector=self.connector) as session:
            tasks = [self.fetch_single(session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=False)
            return results
    
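    async def fetch_with_retry(self, url: str, max_retries: int = 3) -> Dict:
        """Fetch a URL, retrying failed attempts with exponential backoff"""
        # Use a dedicated connector: a session closes the connector it owns on exit,
        # so self.connector cannot be reused once another session has closed it
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        async with aiohttp.ClientSession(connector=connector) as session:
            result: Dict = {"url": url, "error": "Max retries exceeded", "success": False}
            for attempt in range(max_retries):
                # fetch_single never raises; it reports failure via result["success"]
                result = await self.fetch_single(session, url)
                if result["success"]:
                    return result
                if attempt < max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, ...
            return result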

async def demo_http_client():
    """演示HTTP客户端"""
    client = AsyncHttpClient(max_concurrent=3)
    
    # 测试URL列表
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2", 
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404",
        "https://httpbin.org/json",
        "https://httpbin.org/html",
        "https://httpbin.org/xml",
        "https://httpbin.org/bytes/1024"
    ]
    
    print(f"开始批量获取 {len(urls)} 个URL...")
    start_time = time.time()
    
    results = await client.fetch_batch(urls)
    
    end_time = time.time()
    print(f"\n完成! 总耗时: {end_time - start_time:.2f}秒\n")
    
    # 输出结果摘要
    success_count = sum(1 for r in results if r["success"])
    print(f"成功: {success_count}/{len(urls)}")
    
    for result in results:
        if result["success"]:
            status = result["status"]
            size = result["content_length"]
            print(f"✓ {result['url']} - 状态: {status}, 大小: {size}字节")
        else:
            print(f"✗ {result['url']} - 错误: {result['error']}")

if __name__ == "__main__":
    asyncio.run(demo_http_client())
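
The retry-with-backoff idea can be tried without any network access. The sketch below is a generic helper written under the assumption that the wrapped operation signals failure by raising; retry and make_flaky are hypothetical names, and the delays are shortened for the demo.

```python
import asyncio


async def retry(operation, max_retries: int = 3, base_delay: float = 0.01):
    """Await operation() up to max_retries times, doubling the delay each time."""
    for attempt in range(max_retries):
        try:
            return await operation()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** attempt)


def make_flaky(failures: int):
    """Build an operation that fails `failures` times, then returns its call count."""
    calls = {"count": 0}

    async def operation() -> int:
        calls["count"] += 1
        if calls["count"] <= failures:
            raise ConnectionError("temporary failure")
        return calls["count"]

    return operation


if __name__ == "__main__":
    print("succeeded on call", asyncio.run(retry(make_flaky(2))))
```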

Asynchronous Database Operations

Using Async SQLAlchemy

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String, select, delete
from sqlalchemy.exc import SQLAlchemyError

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)
    email = Column(String(100), unique=True)
    age = Column(Integer)

class AsyncDatabaseManager:
    """异步数据库管理器"""
    
    def __init__(self, database_url: str):
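        # Pool options such as pool_size and max_overflow depend on the driver's pool class;
        # the pool used for an in-memory SQLite database does not accept them, so they are omitted
        self.engine = create_async_engine(
            database_url,
            echo=False
        )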
        self.async_session = sessionmaker(
            self.engine,
            class_=AsyncSession,
            expire_on_commit=False
        )
    
    async def initialize(self):
        """初始化数据库表"""
        async with self.engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
    
    async def create_user(self, name: str, email: str, age: int) -> int:
        """创建用户"""
        async with self.async_session() as session:
            user = User(name=name, email=email, age=age)
            session.add(user)
            await session.commit()
            await session.refresh(user)
            return user.id
    
    async def get_user_by_id(self, user_id: int) -> User:
        """根据ID获取用户"""
        async with self.async_session() as session:
            result = await session.execute(
                select(User).where(User.id == user_id)
            )
            return result.scalar_one_or_none()
    
    async def get_users_by_age_range(self, min_age: int, max_age: int) -> list[User]:
        """获取指定年龄范围的用户"""
        async with self.async_session() as session:
            result = await session.execute(
                select(User).where(
                    User.age >= min_age,
                    User.age <= max_age
                ).order_by(User.age)
            )
            return list(result.scalars().all())
    
    async def bulk_create_users(self, users_data: list[dict]) -> list[int]:
        """批量创建用户"""
        async with self.async_session() as session:
            users = [
                User(name=data["name"], email=data["email"], age=data["age"])
                for data in users_data
            ]
            session.add_all(users)
            await session.commit()
            
            # 返回创建的用户ID
            return [user.id for user in users]
    
    async def update_user_email(self, user_id: int, new_email: str) -> bool:
        """更新用户邮箱"""
        async with self.async_session() as session:
            result = await session.execute(
                select(User).where(User.id == user_id)
            )
            user = result.scalar_one_or_none()
            
            if user:
                user.email = new_email
                await session.commit()
                return True
            return False
    
    async def delete_user(self, user_id: int) -> bool:
        """删除用户"""
        async with self.async_session() as session:
            result = await session.execute(
                delete(User).where(User.id == user_id)
            )
            await session.commit()
            return result.rowcount > 0
    
    async def close(self):
        """关闭数据库连接"""
        await self.engine.dispose()

async def demo_database_operations():
    """演示数据库操作"""
    # 使用内存数据库进行演示
    db = AsyncDatabaseManager("sqlite+aiosqlite:///:memory:")
    
    try:
        # 初始化
        await db.initialize()
        print("数据库初始化完成")
        
        # 创建单个用户
        user_id = await db.create_user("张三", "zhangsan@example.com", 25)
        print(f"创建用户,ID: {user_id}")
        
        # 批量创建用户
        users_data = [
            {"name": "李四", "email": "lisi@example.com", "age": 30},
            {"name": "王五", "email": "wangwu@example.com", "age": 22},
            {"name": "赵六", "email": "zhaoliu@example.com", "age": 35},
            {"name": "钱七", "email": "qianqi@example.com", "age": 28},
        ]
        ids = await db.bulk_create_users(users_data)
        print(f"批量创建用户,IDs: {ids}")
        
        # 查询用户
        user = await db.get_user_by_id(user_id)
        if user:
            print(f"查询用户: {user.name}, 邮箱: {user.email}, 年龄: {user.age}")
        
        # 年龄范围查询
        users_25_30 = await db.get_users_by_age_range(25, 30)
        print(f"年龄25-30的用户: {[u.name for u in users_25_30]}")
        
        # 更新邮箱
        updated = await db.update_user_email(user_id, "zhangsan_new@example.com")
        print(f"邮箱更新: {updated}")
        
        # 删除用户
        deleted = await db.delete_user(user_id)
        print(f"删除用户: {deleted}")
        
    finally:
        await db.close()

if __name__ == "__main__":
    asyncio.run(demo_database_operations())

Asynchronous Task Scheduler

import asyncio
import time
from datetime import datetime
from typing import Callable, List, Dict
import heapq

class AsyncTaskScheduler:
    """高级异步任务调度器"""
    
    def __init__(self):
        self._tasks: List[tuple] = []  # (执行时间, 任务ID, 任务函数, 参数)
        self._running = False
        self._task_counter = 0
        self._completed_tasks: Dict[int, dict] = {}
    
    def schedule_once(self, delay: float, coro_func: Callable, *args, **kwargs) -> int:
        """调度一次性任务"""
        self._task_counter += 1
        execute_time = time.time() + delay
        task_id = self._task_counter
        
        heapq.heappush(self._tasks, (execute_time, task_id, coro_func, args, kwargs))
        print(f"[{self._timestamp()}] 调度任务 {task_id}: {coro_func.__name__} 在 {delay:.1f}s后执行")
        
        return task_id
    
    def schedule_interval(self, interval: float, coro_func: Callable, *args, **kwargs) -> int:
        """调度周期性任务"""
        async def repeated_task():
            while self._running:
                try:
                    await coro_func(*args, **kwargs)
                except Exception as e:
                    print(f"[{self._timestamp()}] 周期任务错误: {e}")
                await asyncio.sleep(interval)
        
        return self.schedule_once(0, repeated_task)
    
    def schedule_cron(self, cron_expr: str, coro_func: Callable, *args, **kwargs):
        """调度cron任务(简化版)"""
        # 这里可以实现更复杂的cron解析
        # 简单实现:每分钟检查一次
        async def cron_checker():
            while self._running:
                now = datetime.now()
                # 简化的cron检查逻辑
                if now.second == 0:  # 每分钟的0秒执行
                    try:
                        await coro_func(*args, **kwargs)
                    except Exception as e:
                        print(f"[{self._timestamp()}] Cron任务错误: {e}")
                await asyncio.sleep(1)
        
        return self.schedule_once(0, cron_checker)
    
    async def run(self):
        """启动调度器"""
        self._running = True
        print(f"[{self._timestamp()}] 调度器启动")
        
        try:
            while self._running or self._tasks:
                if not self._tasks:
                    await asyncio.sleep(0.1)
                    continue
                
                execute_time, task_id, coro_func, args, kwargs = self._tasks[0]
                now = time.time()
                
                if now >= execute_time:
                    # 执行任务
                    heapq.heappop(self._tasks)
                    asyncio.create_task(self._execute_task(task_id, coro_func, args, kwargs))
                else:
                    # 等待下一个任务
                    sleep_time = min(0.1, execute_time - now)
                    await asyncio.sleep(sleep_time)
        finally:
            print(f"[{self._timestamp()}] 调度器停止")
    
    async def _execute_task(self, task_id: int, coro_func: Callable, args, kwargs):
        """执行单个任务"""
        start_time = time.time()
        try:
            result = await coro_func(*args, **kwargs)
            duration = time.time() - start_time
            self._completed_tasks[task_id] = {
                "status": "success",
                "result": result,
                "duration": duration
            }
            print(f"[{self._timestamp()}] 任务 {task_id} 完成 ({duration:.2f}s): {result}")
        except Exception as e:
            duration = time.time() - start_time
            self._completed_tasks[task_id] = {
                "status": "failed",
                "error": str(e),
                "duration": duration
            }
            print(f"[{self._timestamp()}] 任务 {task_id} 失败 ({duration:.2f}s): {e}")
    
    def stop(self):
        """停止调度器"""
        self._running = False
    
    def get_stats(self) -> Dict:
        """获取统计信息"""
        total = len(self._completed_tasks)
        success = sum(1 for t in self._completed_tasks.values() if t["status"] == "success")
        failed = total - success
        
        avg_duration = 0
        if total > 0:
            durations = [t["duration"] for t in self._completed_tasks.values()]
            avg_duration = sum(durations) / len(durations)
        
        return {
            "total_tasks": total,
            "successful": success,
            "failed": failed,
            "avg_duration": avg_duration,
            "pending_tasks": len(self._tasks)
        }
    
    def _timestamp(self) -> str:
        """获取时间戳字符串"""
        return datetime.now().strftime("%H:%M:%S")

# 使用示例
async def sample_task(name: str, duration: float):
    """示例任务"""
    print(f"  任务 '{name}' 开始执行")
    await asyncio.sleep(duration)
    return f"'{name}' 完成"

async def periodic_report(scheduler: AsyncTaskScheduler):
    """周期性报告任务"""
    stats = scheduler.get_stats()
    print(f"\n[{scheduler._timestamp()}] 状态报告: {stats}")

async def demo_scheduler():
    """演示调度器"""
    scheduler = AsyncTaskScheduler()
    
    # 调度一次性任务
    scheduler.schedule_once(1.0, sample_task, "快速任务", 0.5)
    scheduler.schedule_once(2.0, sample_task, "中等任务", 1.0)
    scheduler.schedule_once(3.5, sample_task, "长任务", 2.0)
    
    # 调度周期性任务(每3秒)
    scheduler.schedule_interval(3.0, periodic_report, scheduler)
    
    # 调度多个并发任务
    for i in range(3):
        scheduler.schedule_once(0.5 + i * 0.3, sample_task, f"并发任务{i}", 0.2)
    
    # 调度未来任务
    scheduler.schedule_once(5.0, sample_task, "未来任务", 0.3)
    
    # 运行5秒后停止
    async def stop_after_delay():
        await asyncio.sleep(5)
        print("\n" + "="*50)
        print("5秒已到,停止调度器...")
        scheduler.stop()
        
        # 打印最终统计
        final_stats = scheduler.get_stats()
        print("\n最终统计:")
        for key, value in final_stats.items():
            print(f"  {key}: {value}")
    
    await asyncio.gather(
        scheduler.run(),
        stop_after_delay()
    )

if __name__ == "__main__":
    asyncio.run(demo_scheduler())
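
The `schedule_interval` loop sleeps for `interval` seconds after each run completes, so a slow task stretches the period. When a fixed rate matters, one option is to compute every deadline from the event loop's monotonic clock. The following is a minimal sketch under that assumption. `run_at_fixed_rate` and `tick` are hypothetical names used only for illustration and are not part of `AsyncTaskScheduler`.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_at_fixed_rate(
    interval: float,
    coro_func: Callable[[], Awaitable[None]],
    runs: int,
) -> List[float]:
    """Call coro_func `runs` times, aiming at start + k * interval.

    Returns the start offset (in seconds) of every run.
    """
    loop = asyncio.get_running_loop()
    start = loop.time()  # monotonic, unaffected by wall-clock changes
    offsets: List[float] = []
    for k in range(runs):
        # Sleep until the k-th deadline; skip the sleep if we are already late
        delay = start + k * interval - loop.time()
        if delay > 0:
            await asyncio.sleep(delay)
        offsets.append(loop.time() - start)
        await coro_func()
    return offsets


async def tick() -> None:
    """Hypothetical task: simulated work that is shorter than the interval."""
    await asyncio.sleep(0.03)


if __name__ == "__main__":
    for offset in asyncio.run(run_at_fixed_rate(0.1, tick, 4)):
        print(f"run started at +{offset:.2f}s")
```

Because each deadline is derived from the original start time, the duration of one run does not push back the next one, as long as runs stay shorter than the interval.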

Async Generators and Stream Processing

import asyncio
from typing import AsyncIterator, List

class AsyncDataStream:
    """异步数据流处理器"""
    
    def __init__(self, data: List[int]):
        self.data = data
        self.index = 0
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.index >= len(self.data):
            raise StopAsyncIteration
        
        # 模拟异步处理
        await asyncio.sleep(0.1)
        value = self.data[self.index]
        self.index += 1
        return value

async def async_number_generator(n: int) -> AsyncIterator[int]:
    """异步生成器示例"""
    for i in range(n):
        await asyncio.sleep(0.05)  # 模拟异步操作
        yield i * 2  # 生成双倍数值

async def process_stream():
    """处理异步数据流"""
    print("=== 异步生成器演示 ===")
    async for number in async_number_generator(5):
        print(f"处理数字: {number}")
        await asyncio.sleep(0.02)  # 模拟处理时间
    
    print("\n=== 异步迭代器演示 ===")
    stream = AsyncDataStream([10, 20, 30, 40, 50])
    async for item in stream:
        print(f"收到数据: {item}")

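# Advanced stream processing: the pipeline pattern
async def producer(queue: asyncio.Queue, count: int):
    """Producer: emits items, then a None sentinel to mark the end."""
    for i in range(count):
        await asyncio.sleep(0.1)
        item = f"data_{i}"
        await queue.put(item)
        print(f"[producer] produced: {item}")
    await queue.put(None)  # end-of-stream sentinel

async def processor(queue: asyncio.Queue, result_queue: asyncio.Queue):
    """Processor: transforms items and forwards them downstream."""
    while True:
        item = await queue.get()
        if item is None:
            # Forward the sentinel; otherwise the consumer waits forever
            # and the gather() in pipeline_demo() never returns
            queue.task_done()
            await result_queue.put(None)
            break
        
        # Simulate processing
        await asyncio.sleep(0.15)
        processed = f"processed_{item}"
        await result_queue.put(processed)
        print(f"[processor] processed: {item} -> {processed}")
        queue.task_done()

async def consumer(result_queue: asyncio.Queue):
    """Consumer: receives processed items until the sentinel arrives."""
    while True:
        result = await result_queue.get()
        if result is None:
            result_queue.task_done()
            break
        
        # Simulate saving or printing the result
        await asyncio.sleep(0.05)
        print(f"[consumer] received: {result}")
        result_queue.task_done()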

async def pipeline_demo():
    """管道模式演示"""
    print("\n=== 管道模式演示 ===")
    
    queue1 = asyncio.Queue()
    queue2 = asyncio.Queue()
    
    # 启动管道
    await asyncio.gather(
        producer(queue1, 5),
        processor(queue1, queue2),
        consumer(queue2)
    )
    
    print("管道处理完成")

async def main():
    await process_stream()
    await pipeline_demo()

if __name__ == "__main__":
    asyncio.run(main())
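
The pipeline calls `task_done()` on its queues, which only pays off when something waits on `queue.join()`. The sketch below shows one possible variant: several workers share a bounded queue, and the coordinator uses `join()` rather than a sentinel to detect completion. `worker` and `run_workers` are illustrative names, and the squaring step stands in for real processing.

```python
import asyncio
from typing import List


async def worker(queue: asyncio.Queue, results: List[int]) -> None:
    """Square numbers taken from the queue until cancelled."""
    while True:
        n = await queue.get()
        try:
            await asyncio.sleep(0.01)  # simulated processing
            results.append(n * n)
        finally:
            queue.task_done()  # exactly one task_done() per get()


async def run_workers(numbers: List[int], worker_count: int = 3) -> List[int]:
    # A small maxsize applies backpressure: put() waits while the queue is full
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    results: List[int] = []
    workers = [
        asyncio.create_task(worker(queue, results)) for _ in range(worker_count)
    ]

    for n in numbers:
        await queue.put(n)

    await queue.join()  # returns once every item has been marked done
    for w in workers:
        w.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)


if __name__ == "__main__":
    print(asyncio.run(run_workers(list(range(6)))))
```

Results arrive in completion order, which is why the sketch sorts them before returning.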

Performance Optimization and Best Practices

1. Avoid Blocking Operations

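import asyncio
import time

import aiofiles  # third-party package: pip install aiofiles

# Wrong: a blocking call stalls the entire event loop
async def bad_practice():
    time.sleep(1)  # no other coroutine can run during this second

# Right: an awaitable sleep hands control back to the event loop
async def good_practice():
    await asyncio.sleep(1)

# Right: asynchronous file operations
async def async_file_operations():
    # Asynchronous file write
    async with aiofiles.open('async_file.txt', 'w') as f:
        await f.write('content written asynchronously\n')
    
    # Asynchronous file read
    async with aiofiles.open('async_file.txt', 'r') as f:
        content = await f.read()
        print(f"Content read: {content}")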
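
When a blocking call has no asynchronous equivalent, one common approach is to push it onto a worker thread with `asyncio.to_thread` (available from Python 3.9). The sketch below assumes a blocking helper named `blocking_io`, a hypothetical stand-in for something like a legacy driver call.

```python
import asyncio
import time


def blocking_io(seconds: float) -> str:
    """Hypothetical stand-in for a blocking call with no async equivalent."""
    time.sleep(seconds)
    return f"slept {seconds}s"


async def run_blocking_concurrently() -> list:
    # Each call runs in a thread of the default executor, so the event loop
    # stays free to run other coroutines while the threads sleep
    return await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.2),
        asyncio.to_thread(blocking_io, 0.2),
    )


if __name__ == "__main__":
    started = time.perf_counter()
    print(asyncio.run(run_blocking_concurrently()))
    print(f"elapsed: {time.perf_counter() - started:.2f}s")
```

The two calls overlap, so the total time should be close to one sleep rather than two. This helps with blocking I/O; CPU-bound work is still limited by the GIL and is usually better suited to a process pool.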

2. Concurrency Control

import asyncio

async def limited_concurrency():
    """使用信号量控制并发"""
    semaphore = asyncio.Semaphore(3)  # 最多3个并发
    
    async def limited_task(n):
        async with semaphore:
            print(f"任务 {n} 开始")
            await asyncio.sleep(1)
            print(f"任务 {n} 结束")
            return f"结果_{n}"
    
    # 创建20个任务,但只有3个能同时执行
    tasks = [limited_task(i) for i in range(20)]
    results = await asyncio.gather(*tasks)
    print(f"完成 {len(results)} 个任务")

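# Using a connection pool
async def connection_pool_demo():
    """Simulate a connection pool that caps concurrent connections."""
    class ConnectionPool:
        def __init__(self, size: int):
            self.semaphore = asyncio.Semaphore(size)
            self._next_id = 0
        
        async def get_connection(self) -> str:
            # Hold the permit until release(); giving it back here would let
            # more than `size` connections be in use at the same time
            await self.semaphore.acquire()
            try:
                await asyncio.sleep(0.1)  # simulate connection setup
            except BaseException:
                self.semaphore.release()  # do not leak the permit on cancel
                raise
            self._next_id += 1
            return f"Connection_{self._next_id}"
        
        def release(self, conn: str) -> None:
            self.semaphore.release()
    
    pool = ConnectionPool(5)
    
    async def use_connection(task_id):
        conn = await pool.get_connection()
        try:
            print(f"Task {task_id} using {conn}")
            await asyncio.sleep(0.2)
        finally:
            print(f"Task {task_id} releasing {conn}")
            pool.release(conn)
    
    tasks = [use_connection(i) for i in range(10)]
    await asyncio.gather(*tasks)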
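
A quick way to check that a semaphore really caps concurrency is to count how many jobs are inside the guarded section at once. The sketch below is one possible check; `measure_peak` is an illustrative helper, not an asyncio API.

```python
import asyncio


async def measure_peak(limit: int, task_count: int) -> int:
    """Run task_count jobs under a semaphore and return the peak overlap."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def job() -> None:
        nonlocal active, peak
        async with semaphore:
            # No lock is needed: coroutines only switch at await points
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)
            active -= 1

    await asyncio.gather(*(job() for _ in range(task_count)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(measure_peak(limit=3, task_count=20)))
```

With a limit of 3 and 20 jobs, the peak should equal the limit, because the remaining jobs wait at `async with semaphore` until a permit is returned.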

3. Error Handling and Timeout Control

import asyncio

async def robust_async_operation():
    """健壮的异步操作"""
    
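    # Timeout control. asyncio.timeout() requires Python 3.11+; on older
    # versions use asyncio.wait_for() and catch asyncio.TimeoutError instead
    try:
        async with asyncio.timeout(5):  # 5-second timeout
            await asyncio.sleep(10)  # exceeds the limit, so it is cancelled
    except TimeoutError:
        print("Operation timed out")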
    
    # 重试机制
    async def fetch_with_retry(max_retries=3):
        for attempt in range(max_retries):
            try:
                # 模拟可能失败的操作
                if attempt < 2:
                    raise ConnectionError("连接失败")
                return "成功"
            except ConnectionError:  # retry only the transient failure simulated above
                if attempt == max_retries - 1:
                    raise
                wait_time = 2 ** attempt
                print(f"尝试 {attempt + 1} 失败,{wait_time}秒后重试...")
                await asyncio.sleep(wait_time)
    
    try:
        result = await fetch_with_retry()
        print(f"最终结果: {result}")
    except Exception as e:
        print(f"最终失败: {e}")
    
    # 任务分组与错误处理
    async def task_with_timeout(name, duration, timeout):
        try:
            async with asyncio.timeout(timeout):
                await asyncio.sleep(duration)
                return f"{name}_成功"
        except TimeoutError:
            return f"{name}_超时"
        except Exception as e:
            return f"{name}_错误_{e}"
    
    # 并发执行,部分可能超时
    tasks = [
        task_with_timeout("任务1", 1, 2),
        task_with_timeout("任务2", 3, 2),  # 会超时
        task_with_timeout("任务3", 1, 2),
    ]
    
    results = await asyncio.gather(*tasks, return_exceptions=True)
    print("\n带超时的任务结果:")
    for result in results:
        print(f"  {result}")
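
In the example above every task catches its own exceptions, so `return_exceptions=True` never has anything to report. The sketch below shows what the flag does when tasks are allowed to raise: exceptions come back as ordinary items in the result list, in input order, and can be separated from the successful values. `may_fail` and `gather_partitioned` are hypothetical names for illustration.

```python
import asyncio
from typing import Any, List, Tuple


async def may_fail(n: int) -> int:
    """Hypothetical task that fails on odd input."""
    await asyncio.sleep(0.01)
    if n % 2:
        raise ValueError(f"odd input: {n}")
    return n * 10


async def gather_partitioned(
    inputs: List[int],
) -> Tuple[List[Any], List[BaseException]]:
    # With return_exceptions=True, gather() returns raised exceptions as
    # results instead of propagating the first one to the caller
    outcomes = await asyncio.gather(
        *(may_fail(n) for n in inputs), return_exceptions=True
    )
    successes = [o for o in outcomes if not isinstance(o, BaseException)]
    failures = [o for o in outcomes if isinstance(o, BaseException)]
    return successes, failures


if __name__ == "__main__":
    ok, failed = asyncio.run(gather_partitioned([0, 1, 2, 3]))
    print("succeeded:", ok)
    print("failed:", [str(e) for e in failed])
```

Without the flag, the first exception propagates out of `gather()` while the other awaitables keep running in the background, which is often harder to reason about.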

4. Performance Monitoring and Debugging

import asyncio
import time
from typing import List

class AsyncPerformanceMonitor:
    """异步性能监控器"""
    
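    def __init__(self):
        self.start_time = None
        self.task_times: List[float] = []
        # No loop reference is stored: it was never used, and calling
        # asyncio.get_running_loop() here raises RuntimeError whenever the
        # monitor is created outside a running event loop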
    
    async def monitor_coroutine(self, coro, name: str):
        """监控协程执行时间"""
        if self.start_time is None:
            self.start_time = time.time()
        
        task_start = time.time()
        try:
            result = await coro
            task_duration = time.time() - task_start
            self.task_times.append(task_duration)
            print(f"[{task_duration:.3f}s] {name} 完成")
            return result
        except Exception as e:
            task_duration = time.time() - task_start
            print(f"[{task_duration:.3f}s] {name} 失败: {e}")
            raise
    
    def get_stats(self):
        """获取性能统计"""
        if not self.task_times:
            return {}
        
        total_time = time.time() - self.start_time if self.start_time else 0
        return {
            "total_tasks": len(self.task_times),
            "total_time": total_time,
            "avg_task_time": sum(self.task_times) / len(self.task_times),
            "min_task_time": min(self.task_times),
            "max_task_time": max(self.task_times),
            "throughput": len(self.task_times) / total_time if total_time > 0 else 0
        }
    
    def print_stats(self):
        """打印性能统计"""
        stats = self.get_stats()
        print("\n=== 性能统计 ===")
        for key, value in stats.items():
            print(f"  {key}: {value:.2f}" if isinstance(value, float) else f"  {key}: {value}")

async def performance_demo():
    """性能监控演示"""
    monitor = AsyncPerformanceMonitor()
    
    # 模拟不同耗时的任务
    tasks = [
        (asyncio.sleep(0.1), "快速任务"),
        (asyncio.sleep(0.3), "中等任务"),
        (asyncio.sleep(0.2), "短任务"),
        (asyncio.sleep(0.5), "长任务"),
    ]
    
    # 并发执行并监控
    coroutines = [monitor.monitor_coroutine(coro, name) for coro, name in tasks]
    await asyncio.gather(*coroutines)
    
    # 打印统计
    monitor.print_stats()

if __name__ == "__main__":
    asyncio.run(performance_demo())
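
Besides hand-written timing, asyncio ships a debug mode that logs a warning when a single step of a coroutine holds the event loop for longer than `loop.slow_callback_duration` (0.1 seconds by default). The sketch below captures those warnings from the `asyncio` logger. `ListHandler`, `blocks_the_loop`, and `find_slow_callbacks` are illustrative names, and the filter on the word "took" relies on the current wording of the warning message, which is an assumption rather than a documented guarantee.

```python
import asyncio
import logging
import time
from typing import List


class ListHandler(logging.Handler):
    """Collect formatted log messages in a list (illustrative helper)."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def blocks_the_loop() -> None:
    time.sleep(0.2)  # deliberate mistake: blocking call inside a coroutine


def find_slow_callbacks() -> List[str]:
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        # debug=True makes the loop time each callback and warn about slow ones
        asyncio.run(blocks_the_loop(), debug=True)
    finally:
        logger.removeHandler(handler)
    # Assumption: slow-callback warnings contain the word "took"
    return [m for m in handler.messages if "took" in m]


if __name__ == "__main__":
    for message in find_slow_callbacks():
        print(message)
```

Debug mode can also be enabled without code changes by setting the `PYTHONASYNCIODEBUG=1` environment variable, which is convenient for a one-off investigation.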

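Conclusion

Asynchronous programming in Python provides powerful tools for building high-performance, highly concurrent applications. By mastering coroutines, the event loop, asynchronous context managers, and the ecosystem of async libraries, developers can substantially improve an application's throughput and responsiveness, particularly for I/O-bound workloads.

Key takeaways:

  1. Understand the core concepts: coroutines, the event loop, and await are the foundation of asynchronous programming
  2. Choose suitable libraries: aiohttp for HTTP requests, asyncpg for PostgreSQL, aiomysql for MySQL, and so on
  3. Follow best practices: avoid blocking operations, manage resources carefully, and handle errors correctly
  4. Monitor performance: use asyncio's debug mode and monitoring tools to find bottlenecks
  5. Migrate incrementally: start by making a few features asynchronous, then extend the approach to the rest of the application

Asynchronous programming has a learning curve, but once mastered it makes it possible to build responsive, resource-efficient Python applications. As the Python ecosystem matures, support for asynchronous programming keeps improving, which makes it an essential skill in modern Python development.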