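Introduction: Why Asynchronous Programming Matters
In modern software development, asynchronous programming has become a key technique for handling I/O-bound work and improving application throughput. Python, a widely used language, supports it through the asyncio library and the async/await syntax. The core advantage is that a program can keep doing other work while it waits for an operation such as a network request, a file read or write, or a database query to finish, instead of blocking the whole program. This non-blocking execution model is a good fit for highly concurrent web servers, web crawlers, and data-processing pipelines.
Compared with traditional multithreading or multiprocessing, asynchronous code usually consumes fewer resources per concurrent operation, and because a coroutine gives up control only at an explicit await, shared state is often easier to reason about. The async/await syntax, introduced in Python 3.5, makes asynchronous code more direct to write and easier to read. This article starts with the basic concepts and works up to more advanced applications.
Basic Concepts: The Difference Between Synchronous and Asynchronous Code
To understand asynchronous programming, first be clear about how it differs from synchronous programming. In synchronous code, statements run in order, and each operation must wait for the previous one to finish before it can start. Consider the following synchronous code: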
import time

def fetch_data():
    print("Fetching data...")
    time.sleep(2)  # simulate an I/O wait
    print("Data fetched")
    return "mock data"

def process_data(data):
    print("Processing data...")
    time.sleep(1)  # simulate processing time
    print("Data processed")
    return data.upper()

# Synchronous execution
data = fetch_data()
result = process_data(data)
print(f"Final result: {result}")
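This code runs in a strictly linear fashion: fetch_data() waits 2 seconds, then process_data() waits another second, for a total of about 3 seconds. While it waits, the program cannot do anything else.
Asynchronous programming achieves non-blocking execution through an event loop and coroutines. Here is the equivalent asynchronous version: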
import asyncio
import time

async def fetch_data():
    print("Fetching data...")
    await asyncio.sleep(2)  # non-blocking wait
    print("Data fetched")
    return "mock data"

async def process_data(data):
    print("Processing data...")
    await asyncio.sleep(1)  # non-blocking wait
    print("Data processed")
    return data.upper()

async def main():
    start_time = time.time()
    data = await fetch_data()
    result = await process_data(data)
    print(f"Final result: {result}")
    print(f"Total time: {time.time() - start_time:.2f}s")

asyncio.run(main())
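This code still runs its two steps one after the other, so it also takes about 3 seconds. The difference is that await asyncio.sleep(2) suspends only the current coroutine and hands control back to the event loop instead of blocking the thread. When several coroutines are scheduled together, the loop can run the others during that wait.
Coroutines and the Event Loop
Coroutines are the core building block of asynchronous programming. In Python, a coroutine is a special function defined with async def. Unlike a regular function, calling a coroutine function does not run its body immediately; it returns a coroutine object. To run it, the coroutine must be driven by the event loop, for example by awaiting it, wrapping it in a task, or passing it to asyncio.run().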
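The point that calling a coroutine function only creates a coroutine object can be checked directly. The following is a minimal sketch; greet, calls, and demo are illustrative names, not part of any library.

```python
import asyncio
import inspect

calls = []  # records when the function body actually runs

async def greet(name: str) -> str:
    calls.append(name)
    await asyncio.sleep(0)  # yield to the event loop once
    return f"hello, {name}"

def demo() -> str:
    calls.clear()
    coro = greet("world")             # only creates a coroutine object
    assert inspect.iscoroutine(coro)
    assert calls == []                # the body has not started yet
    result = asyncio.run(coro)        # the event loop drives it to completion
    assert calls == ["world"]
    return result

if __name__ == "__main__":
    print(demo())
```

If the coroutine object were never awaited or scheduled, Python would emit a "coroutine ... was never awaited" RuntimeWarning when it is garbage collected.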
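The event loop is the scheduler of asynchronous programming: it manages the execution of all coroutines. The following, more involved example shows several coroutines running concurrently: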
import asyncio
import time

async def task(name, duration):
    print(f"Task {name} started, expected to take {duration}s")
    await asyncio.sleep(duration)
    print(f"Task {name} finished")
    return f"result_{name}"

async def main():
    start_time = time.time()
    # Create several coroutines
    tasks = [
        task("A", 2),
        task("B", 3),
        task("C", 1),
    ]
    # Run them all concurrently
    results = await asyncio.gather(*tasks)
    print(f"All tasks finished: {results}")
    print(f"Total time: {time.time() - start_time:.2f}s")

asyncio.run(main())
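In this example the three tasks are started together. Although each takes a different amount of time, the total is roughly that of the longest task (3 seconds) rather than the sum of all three (6 seconds), because whenever a task is suspended at an await, the event loop switches to another task that is ready to run.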
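The switching behaviour can be made visible by recording the order in which tasks start and finish. This is a small sketch with short delays; worker and run_workers are illustrative names.

```python
import asyncio

async def worker(name: str, delay: float, log: list) -> None:
    log.append(f"{name} start")
    await asyncio.sleep(delay)  # suspension point: the loop may run other tasks
    log.append(f"{name} end")

async def run_workers() -> list:
    log = []
    await asyncio.gather(
        worker("A", 0.2, log),
        worker("B", 0.1, log),
    )
    return log

if __name__ == "__main__":
    print(asyncio.run(run_workers()))
```

Both workers start before either finishes, and B, which has the shorter delay, finishes first even though A was started first.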
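Asynchronous Context Managers and Resource Management
Asynchronous code often has to manage resources such as network connections or file handles. Python provides asynchronous context managers for this; a class becomes one by implementing the __aenter__ and __aexit__ methods.
The following is an example implementation of a (simulated) asynchronous connection pool: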
import asyncio
from typing import List

class AsyncConnectionPool:
    def __init__(self, max_connections: int = 5):
        self.max_connections = max_connections
        self.semaphore = asyncio.Semaphore(max_connections)
        self.connections: List[str] = []

    async def __aenter__(self):
        print("Initializing connection pool...")
        await self._create_connections()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing connection pool...")
        self.connections.clear()

    async def _create_connections(self):
        # Simulate creating connections asynchronously
        for i in range(self.max_connections):
            await asyncio.sleep(0.1)  # simulated connection setup delay
            self.connections.append(f"Connection-{i}")

    async def get_connection(self):
        # Hold the semaphore until release_connection() is called, so callers
        # wait for a free connection instead of finding the pool empty.
        await self.semaphore.acquire()
        conn = self.connections.pop()
        print(f"Acquired connection: {conn}")
        return conn

    async def release_connection(self, conn: str):
        print(f"Released connection: {conn}")
        self.connections.append(conn)
        self.semaphore.release()

async def use_connection(pool: AsyncConnectionPool, task_id: int):
    conn = await pool.get_connection()
    try:
        print(f"Task {task_id} is using {conn}")
        await asyncio.sleep(1)  # simulate using the connection
    finally:
        await pool.release_connection(conn)

async def main():
    async with AsyncConnectionPool(max_connections=3) as pool:
        tasks = [use_connection(pool, i) for i in range(8)]
        await asyncio.gather(*tasks)

asyncio.run(main())
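This example shows how an asynchronous context manager can manage a connection pool. The semaphore ensures that only the configured number of tasks hold a connection at the same time, which prevents the pool from being exhausted.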
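For simple cases, the standard library's contextlib.asynccontextmanager decorator offers a shorter way to get the same __aenter__/__aexit__ behaviour from a single async generator. The sketch below uses an illustrative open_resource function and an events list purely to show the order of setup, use, and cleanup.

```python
import asyncio
from contextlib import asynccontextmanager

events = []  # records the order of setup, use, and teardown

@asynccontextmanager
async def open_resource(name: str):
    events.append(f"open {name}")
    await asyncio.sleep(0)          # stand-in for asynchronous setup
    try:
        yield name.upper()          # value bound by "async with ... as"
    finally:
        await asyncio.sleep(0)      # stand-in for asynchronous cleanup
        events.append(f"close {name}")

async def use_resource() -> list:
    events.clear()
    async with open_resource("db") as handle:
        events.append(f"use {handle}")
    return list(events)

if __name__ == "__main__":
    print(asyncio.run(use_resource()))
```

The try/finally around the yield plays the role of __aexit__: the cleanup runs even if the body of the async with block raises.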
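Asynchronous HTTP Requests in Practice: Using aiohttp
One of the most common real-world uses of asynchronous programming is network requests. The aiohttp library is an asynchronous HTTP client/server framework for Python. The following is a complete example of a small asynchronous crawler: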
import asyncio
import time
from typing import Dict, List

import aiohttp

async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
    """Fetch the content of a single URL asynchronously."""
    print(f"Fetching: {url}")
    try:
        timeout = aiohttp.ClientTimeout(total=10)
        async with session.get(url, timeout=timeout) as response:
            text = await response.text()
            return {
                "url": url,
                "status": response.status,
                "content_length": len(text),
                "success": True,
            }
    except Exception as e:
        print(f"Failed to fetch {url}: {e}")
        return {
            "url": url,
            "error": str(e),
            "success": False,
        }

async def batch_fetch(urls: List[str], max_concurrent: int = 5) -> List[Dict]:
    """Fetch several URLs concurrently."""
    # Create a connector that limits the number of concurrent connections
    connector = aiohttp.TCPConnector(limit=max_concurrent)
    async with aiohttp.ClientSession(connector=connector) as session:
        # Build the list of coroutines
        tasks = [fetch_url(session, url) for url in urls]
        # gather runs them concurrently and returns results in input order
        results = await asyncio.gather(*tasks, return_exceptions=False)
        return results

async def main():
    # Test URLs
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404",
        "https://httpbin.org/bytes/1024",
        "https://httpbin.org/json",
        "https://httpbin.org/xml",
        "https://httpbin.org/html",
    ]
    start_time = time.time()
    print(f"Starting batch fetch of {len(urls)} URLs...")
    results = await batch_fetch(urls, max_concurrent=3)
    print(f"\nDone! Total time: {time.time() - start_time:.2f}s")
    print("\nSummary:")
    for result in results:
        if result["success"]:
            # content_length is the length of the decoded text, in characters
            print(f"✓ {result['url']} - status: {result['status']}, size: {result['content_length']} characters")
        else:
            print(f"✗ {result['url']} - error: {result['error']}")

if __name__ == "__main__":
    asyncio.run(main())
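This crawler example demonstrates several key asynchronous patterns:
- Managing HTTP connections with aiohttp.ClientSession
- Limiting the number of concurrent connections with TCPConnector
- Running many tasks concurrently with asyncio.gather
- Error handling and timeouts
- Automatic resource cleanup (async with)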
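An alternative to catching errors inside each fetch is to let asyncio.gather collect them with return_exceptions=True. The sketch below simulates the network call with asyncio.sleep so that it runs without any third-party library; fetch and fetch_all are hypothetical helpers, not aiohttp functions.

```python
import asyncio

async def fetch(item: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for a network call
    if item.startswith("bad"):
        raise ValueError(f"cannot fetch {item}")
    return f"payload for {item}"

async def fetch_all(items: list) -> tuple:
    results = await asyncio.gather(
        *(fetch(item) for item in items),
        return_exceptions=True,  # exceptions come back as values
    )
    ok = [r for r in results if not isinstance(r, BaseException)]
    failed = [str(r) for r in results if isinstance(r, BaseException)]
    return ok, failed

if __name__ == "__main__":
    print(asyncio.run(fetch_all(["a", "bad-1", "b"])))
```

With return_exceptions=False (the default), the first exception would propagate out of gather while the remaining coroutines keep running in the background, so collecting exceptions as values is often the easier choice for batch jobs.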
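Asynchronous Database Operations
Asynchronous programming is also useful for database access, especially in highly concurrent web applications. The following example uses SQLAlchemy's asyncio extension with the asyncpg driver: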
import asyncio
from typing import List

from sqlalchemy import Column, Integer, String, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    email = Column(String(100))

# Asynchronous database engine
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/testdb"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def init_db():
    """Create the database tables."""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

async def create_user(name: str, email: str) -> int:
    """Create a user and return its ID."""
    async with async_session() as session:
        user = User(name=name, email=email)
        session.add(user)
        await session.commit()
        return user.id

async def get_user_by_id(user_id: int) -> User:
    """Fetch a user by ID."""
    async with async_session() as session:
        result = await session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one()

async def get_users_batch(user_ids: List[int]) -> List[User]:
    """Fetch several users in one query."""
    async with async_session() as session:
        result = await session.execute(
            select(User).where(User.id.in_(user_ids))
        )
        return result.scalars().all()

async def main():
    # Initialize the database
    await init_db()
    # Create users concurrently
    create_tasks = [
        create_user(f"User{i}", f"user{i}@example.com")
        for i in range(5)
    ]
    user_ids = await asyncio.gather(*create_tasks)
    print(f"Created user IDs: {user_ids}")
    # Query the users in one batch
    users = await get_users_batch(user_ids)
    for user in users:
        print(f"User: {user.name}, email: {user.email}")

if __name__ == "__main__":
    asyncio.run(main())
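This example shows how to use SQLAlchemy for database operations in an asynchronous environment. The key points are:
- Creating an asynchronous engine with create_async_engine
- Creating an asynchronous session factory with sessionmaker(engine, class_=AsyncSession); SQLAlchemy 2.0 also provides async_sessionmaker for the same purpose
- Executing queries with await session.execute()
- Managing database connections inside asynchronous contexts
Asynchronous Task Scheduling and Timed Tasks
Real applications often need to schedule asynchronous tasks or run them on a timer, and asyncio offers several ways to do this. The following is a complete example of a task scheduler: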
import asyncio
import heapq
import time
from datetime import datetime
from typing import Callable, List

class AsyncTaskScheduler:
    """Asynchronous task scheduler."""

    def __init__(self):
        # Heap entries: (run time, task ID, coroutine function, args, kwargs)
        self._tasks: List[tuple] = []
        self._running = False
        self._task_counter = 0
        self._active = set()  # strong references to tasks that are running

    def schedule(self, delay: float, coro_func: Callable, *args, **kwargs) -> int:
        """Schedule a task to run after the given delay."""
        self._task_counter += 1
        execute_time = time.time() + delay
        task_id = self._task_counter
        # The heap keeps entries ordered by run time
        heapq.heappush(self._tasks, (execute_time, task_id, coro_func, args, kwargs))
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Task {task_id} scheduled to run in {delay:.1f}s")
        return task_id

    def schedule_interval(self, interval: float, coro_func: Callable, *args, **kwargs) -> int:
        """Schedule a periodic task."""
        async def repeated_task():
            while self._running:
                try:
                    await coro_func(*args, **kwargs)
                except Exception as e:
                    print(f"Periodic task error: {e}")
                await asyncio.sleep(interval)
        return self.schedule(0, repeated_task)

    async def run(self):
        """Start the scheduler."""
        self._running = True
        print("Scheduler started")
        try:
            while self._running or self._tasks:
                if not self._tasks:
                    await asyncio.sleep(0.1)
                    continue
                execute_time, task_id, coro_func, args, kwargs = self._tasks[0]
                now = time.time()
                if now >= execute_time:
                    # Run the task in the background
                    heapq.heappop(self._tasks)
                    print(f"[{datetime.now().strftime('%H:%M:%S')}] Running task {task_id}")
                    task = asyncio.create_task(self._execute_task(task_id, coro_func, args, kwargs))
                    # Keep a reference so the task is not garbage collected mid-run
                    self._active.add(task)
                    task.add_done_callback(self._active.discard)
                else:
                    # Wait for the next task
                    await asyncio.sleep(min(0.1, execute_time - now))
        finally:
            print("Scheduler stopped")

    async def _execute_task(self, task_id: int, coro_func: Callable, args, kwargs):
        """Run a single task."""
        try:
            result = await coro_func(*args, **kwargs)
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Task {task_id} finished, result: {result}")
        except Exception as e:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Task {task_id} failed: {e}")

    def stop(self):
        """Stop the scheduler."""
        self._running = False

# Usage example
async def sample_task(name: str, duration: float):
    print(f"  Task '{name}' started")
    await asyncio.sleep(duration)
    return f"'{name}' finished"

async def periodic_task():
    print(f"  Periodic task ran at {datetime.now().strftime('%H:%M:%S')}")

async def main():
    scheduler = AsyncTaskScheduler()
    # Schedule one-off tasks
    scheduler.schedule(1.0, sample_task, "quick task", 0.5)
    scheduler.schedule(2.0, sample_task, "medium task", 1.0)
    scheduler.schedule(3.5, sample_task, "long task", 2.0)
    # Schedule a periodic task (every 3 seconds)
    scheduler.schedule_interval(3.0, periodic_task)
    # Schedule several tasks that overlap in time
    for i in range(3):
        scheduler.schedule(0.5 + i * 0.3, sample_task, f"concurrent task {i}", 0.2)

    # Run the scheduler for 5 seconds, then stop it.
    # Tasks still running when asyncio.run() returns are cancelled.
    async def stop_after_delay():
        await asyncio.sleep(5)
        print("\n5 seconds elapsed, stopping the scheduler...")
        scheduler.stop()

    await asyncio.gather(
        scheduler.run(),
        stop_after_delay()
    )

if __name__ == "__main__":
    asyncio.run(main())
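This task scheduler shows how to:
- Manage timed tasks with a heap (heapq)
- Implement periodic task scheduling
- Handle task execution and errors
- Start and stop the scheduler gracefully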
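For simple one-off delays, the event loop's built-in timers may be enough and avoid the polling loop used above. The sketch below uses loop.call_later, which schedules a plain callback (not a coroutine) and returns a handle that can be cancelled; run_later_demo is an illustrative name.

```python
import asyncio

async def run_later_demo() -> list:
    loop = asyncio.get_running_loop()
    fired = []
    done = asyncio.Event()

    # Timers fire in order of their delay, regardless of registration order.
    loop.call_later(0.05, fired.append, "second")
    loop.call_later(0.01, fired.append, "first")

    # A TimerHandle can be cancelled before it fires.
    handle = loop.call_later(0.02, fired.append, "never")
    handle.cancel()

    loop.call_later(0.1, done.set)
    await done.wait()
    return fired

if __name__ == "__main__":
    print(asyncio.run(run_later_demo()))
```

To start a coroutine from such a callback, the callback can call asyncio.create_task; a custom heap-based scheduler remains useful when tasks need IDs, statistics, or persistence.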
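Best Practices for Asynchronous Programming
1. Avoid Blocking Operations
In asynchronous code, any blocking call stalls the entire event loop, so no other task can make progress until it returns. Make sure all I/O is asynchronous: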
import asyncio
import time

# Wrong: a blocking call
async def bad_example():
    time.sleep(1)  # blocks the entire event loop!
    return "done"

# Right: an asynchronous call
async def good_example():
    await asyncio.sleep(1)  # non-blocking
    return "done"
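When a blocking call cannot be replaced with an asynchronous equivalent, one common workaround is to run it in a worker thread. The sketch below assumes Python 3.9 or later for asyncio.to_thread; blocking_io is a stand-in for any blocking function.

```python
import asyncio
import time

def blocking_io(seconds: float) -> str:
    time.sleep(seconds)  # a blocking call that cannot be rewritten
    return "done"

async def run_blocking_concurrently() -> float:
    start = time.perf_counter()
    # Each call runs in a worker thread, so the event loop stays responsive
    # and the two sleeps overlap.
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.2),
        asyncio.to_thread(blocking_io, 0.2),
    )
    assert results == ["done", "done"]
    return time.perf_counter() - start

if __name__ == "__main__":
    print(f"elapsed: {asyncio.run(run_blocking_concurrently()):.2f}s")
```

On older Python versions, loop.run_in_executor(None, blocking_io, 0.2) serves the same purpose.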
2. Use Task Groups Sensibly
Use asyncio.gather() or asyncio.TaskGroup to manage multiple tasks:
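# Using TaskGroup (Python 3.11+)
# some_coroutine() and another_coroutine() are placeholders for your own coroutines
async def task_group_example():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coroutine())
        task2 = tg.create_task(another_coroutine())
    # Execution continues here only after every task has finished
    return task1.result(), task2.result()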
3. Error Handling
Error handling in asynchronous code needs particular care:
# some_async_call() is a placeholder; asyncio.timeout() requires Python 3.11+
async def robust_async_operation():
    try:
        async with asyncio.timeout(10):  # set a timeout
            result = await some_async_call()
            return result
    except asyncio.TimeoutError:
        print("Operation timed out")
        return None
    except Exception as e:
        print(f"Operation failed: {e}")
        return None
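On Python versions before 3.11, where asyncio.timeout() is not available, asyncio.wait_for offers similar timeout behaviour. This is a minimal sketch; slow_call is a hypothetical stand-in for a real operation.

```python
import asyncio

async def slow_call() -> str:
    await asyncio.sleep(0.2)  # stand-in for a slow operation
    return "finished"

async def call_with_timeout(timeout: float):
    try:
        # wait_for cancels slow_call() if the timeout expires
        return await asyncio.wait_for(slow_call(), timeout=timeout)
    except asyncio.TimeoutError:  # alias of the built-in TimeoutError on 3.11+
        return None

if __name__ == "__main__":
    print(asyncio.run(call_with_timeout(0.05)))
    print(asyncio.run(call_with_timeout(1.0)))
```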
4. Resource Limits
Use a semaphore to limit the number of tasks that run concurrently:
async def limited_concurrency():
    semaphore = asyncio.Semaphore(5)  # at most 5 at a time

    async def limited_task(n):
        async with semaphore:
            print(f"Task {n} started")
            await asyncio.sleep(1)
            print(f"Task {n} finished")

    tasks = [limited_task(i) for i in range(20)]
    await asyncio.gather(*tasks)
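To confirm that a semaphore really caps concurrency, the number of jobs inside the guarded section can be counted. The following sketch tracks the peak; measure_peak is an illustrative helper.

```python
import asyncio

async def measure_peak(limit: int, jobs: int) -> int:
    semaphore = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def job() -> None:
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)  # highest number seen inside the section
            await asyncio.sleep(0.01)
            running -= 1

    await asyncio.gather(*(job() for _ in range(jobs)))
    return peak

if __name__ == "__main__":
    print(asyncio.run(measure_peak(limit=3, jobs=10)))
```

No lock is needed around the counters because the code between two awaits runs without interruption on the single event-loop thread.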
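Advanced Topic: Async Generators and Async Iteration
Python also supports asynchronous generators and asynchronous iteration, which are useful for processing streaming data: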
import asyncio
from typing import AsyncIterator, List

async def async_number_generator(n: int) -> AsyncIterator[int]:
    """Async generator example."""
    for i in range(n):
        await asyncio.sleep(0.1)  # simulate an asynchronous operation
        yield i

async def process_async_stream():
    """Consume the async generator."""
    async for number in async_number_generator(5):
        print(f"Processing number: {number}")
        await asyncio.sleep(0.05)  # simulate processing time

# Async iterator class
class AsyncDataStream:
    def __init__(self, data: List[int]):
        self.data = data
        self.index = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.index >= len(self.data):
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # simulate fetching data asynchronously
        value = self.data[self.index]
        self.index += 1
        return value

async def use_async_iterator():
    stream = AsyncDataStream([10, 20, 30, 40, 50])
    async for item in stream:
        print(f"Received: {item}")

async def main():
    await process_async_stream()
    await use_async_iterator()

if __name__ == "__main__":
    asyncio.run(main())
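Async generators also work inside comprehensions, which can be a compact way to filter or collect a stream. A small sketch, with countdown and collect_even as illustrative names:

```python
import asyncio

async def countdown(n: int):
    while n > 0:
        await asyncio.sleep(0)  # stand-in for waiting on the next item
        yield n
        n -= 1

async def collect_even() -> list:
    # "async for" inside a comprehension consumes the async generator
    return [x async for x in countdown(6) if x % 2 == 0]

if __name__ == "__main__":
    print(asyncio.run(collect_even()))
```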
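Performance Optimization and Monitoring
1. Profiling
Use asyncio's debug mode to identify performance problems. Among other checks, it logs callbacks that take longer than loop.slow_callback_duration (0.1 seconds by default) and warns about coroutines that were never awaited: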
import asyncio
import time

async def slow_operation():
    await asyncio.sleep(1)

async def main():
    # Enable debug mode
    loop = asyncio.get_running_loop()
    loop.set_debug(True)
    # Record the start time
    start = time.time()
    # Run concurrently
    await asyncio.gather(
        slow_operation(),
        slow_operation(),
        slow_operation()
    )
    print(f"Total time: {time.time() - start:.2f}s")

# Debug mode can also be enabled with the environment variable
# PYTHONASYNCIODEBUG=1 or with asyncio.run(main(), debug=True)
asyncio.run(main())
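2. Monitoring the Event Loop
A simple monitor task can invoke registered callbacks once per second while other work proceeds: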
import asyncio
import time

class LoopMonitor:
    def __init__(self):
        self.callbacks = []
        self.start_time = time.time()

    def register_callback(self, callback):
        self.callbacks.append(callback)

    async def monitor(self):
        while True:
            await asyncio.sleep(1)
            current_time = time.time()
            elapsed = current_time - self.start_time
            # Invoke the registered callbacks with the elapsed time
            for callback in self.callbacks:
                try:
                    callback(elapsed)
                except Exception as e:
                    print(f"Monitor callback error: {e}")

async def monitored_operation():
    await asyncio.sleep(2)
    return "done"

async def main():
    monitor = LoopMonitor()

    def log_performance(elapsed):
        print(f"[{elapsed:.1f}s] Event loop is running normally")

    monitor.register_callback(log_performance)
    # Start monitoring (call the monitor() method; the object itself is not callable)
    monitor_task = asyncio.create_task(monitor.monitor())
    # Do the work
    result = await monitored_operation()
    print(result)
    # Stop monitoring
    monitor_task.cancel()
    try:
        await monitor_task
    except asyncio.CancelledError:
        pass

asyncio.run(main())
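One concrete metric such a monitor could report is event-loop lag: how much later than requested a short sleep actually wakes up. The sketch below is one possible way to measure it, not an asyncio facility; measure_lag and lag_demo are illustrative names, and the time.sleep call deliberately blocks the loop to produce a visible lag.

```python
import asyncio
import time

async def measure_lag(interval: float, samples: int) -> float:
    """Return the worst observed delay beyond `interval`, in seconds."""
    worst = 0.0
    for _ in range(samples):
        start = time.perf_counter()
        await asyncio.sleep(interval)
        lag = time.perf_counter() - start - interval
        worst = max(worst, lag)
    return worst

async def lag_demo() -> float:
    probe = asyncio.create_task(measure_lag(0.01, 5))
    await asyncio.sleep(0.015)  # let the probe start sampling
    time.sleep(0.2)             # deliberately block the loop (bad practice)
    return await probe

if __name__ == "__main__":
    print(f"worst lag: {asyncio.run(lag_demo()):.3f}s")
```

A consistently high lag suggests that some coroutine is doing blocking or CPU-heavy work between awaits.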
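Conclusion
Python's asynchronous programming support provides powerful tools for building high-performance, highly concurrent applications. By mastering coroutines, the event loop, asynchronous context managers, and the various asynchronous libraries, developers can noticeably improve the throughput and responsiveness of I/O-bound applications.
Key takeaways:
- Understand the core concepts: coroutines, the event loop, and await are the foundation of asynchronous programming
- Choose suitable libraries: aiohttp for HTTP requests, asyncpg for PostgreSQL, aiomysql for MySQL, and so on
- Follow best practices: avoid blocking operations, manage resources carefully, and handle errors correctly
- Monitor performance: use debug mode and monitoring tools to tune your code
- Migrate gradually: start by making part of the functionality asynchronous and extend it step by step to the whole application
Asynchronous programming has a learning curve, but once mastered it lets you build responsive, resource-efficient Python applications. As the Python ecosystem matures, support for asynchronous programming keeps improving, which makes it an important skill in modern Python development.
# The Complete Guide to Python Asynchronous Programming: From Beginner to Advanced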
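Introduction: Why Asynchronous Programming Is Needed
Today's applications, such as web servers, API services, and web crawlers, often have to handle a large number of concurrent connections. The traditional synchronous model hits a performance bottleneck in these scenarios because every operation blocks the program until it completes.
Asynchronous programming lets a program continue with other tasks while it waits for an operation (such as I/O) to finish, which improves concurrency and resource utilization for I/O-bound workloads. Python offers this through the asyncio library and the async/await syntax.
Basic Concepts: Synchronous vs. Asynchronous
Synchronous Example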
import time

def sync_task(name, duration):
    print(f"Task {name} started")
    time.sleep(duration)  # blocking call
    print(f"Task {name} finished")
    return f"result_{name}"

def main_sync():
    start = time.time()
    # Run one after another
    result1 = sync_task("A", 2)
    result2 = sync_task("B", 1)
    result3 = sync_task("C", 3)
    end = time.time()
    print(f"Total time: {end - start:.2f}s")
    print(f"Results: {result1}, {result2}, {result3}")

if __name__ == "__main__":
    main_sync()
Output (timings are approximate):
Task A started
Task A finished
Task B started
Task B finished
Task C started
Task C finished
Total time: 6.00s
Results: result_A, result_B, result_C
Asynchronous Example
import asyncio
import time

async def async_task(name, duration):
    print(f"Task {name} started")
    await asyncio.sleep(duration)  # non-blocking wait
    print(f"Task {name} finished")
    return f"result_{name}"

async def main_async():
    start = time.time()
    # Run concurrently
    task1 = async_task("A", 2)
    task2 = async_task("B", 1)
    task3 = async_task("C", 3)
    results = await asyncio.gather(task1, task2, task3)
    end = time.time()
    print(f"Total time: {end - start:.2f}s")
    print(f"Results: {', '.join(results)}")

if __name__ == "__main__":
    asyncio.run(main_async())
Output (timings are approximate):
Task A started
Task B started
Task C started
Task B finished
Task A finished
Task C finished
Total time: 3.00s
Results: result_A, result_B, result_C
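Coroutines and the Event Loop
Coroutine Basics
Coroutines are the core concept of asynchronous programming in Python. They are special functions whose execution can be suspended and resumed.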
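import asyncio

async def my_coroutine():
    print("Coroutine started")
    await asyncio.sleep(1)
    print("Coroutine resumed")
    return "done"

# Create and run coroutines
async def main():
    # Option 1: await the coroutine directly
    result = await my_coroutine()
    print(f"Result: {result}")
    # Option 2: wrap it in a task, which is scheduled to run right away
    task = asyncio.create_task(my_coroutine())
    result = await task
    print(f"Task result: {result}")

# asyncio.run() starts the event loop and runs the top-level coroutine
asyncio.run(main())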
The Event Loop in Detail
The event loop is the scheduler of asynchronous programming; it manages the execution of all coroutines.
import asyncio
import time

async def worker(name, sleep_time):
    print(f"[{time.strftime('%H:%M:%S')}] Worker {name} started")
    await asyncio.sleep(sleep_time)
    print(f"[{time.strftime('%H:%M:%S')}] Worker {name} finished")
    return f"{name}_result"

async def event_loop_demo():
    # Get the currently running event loop
    loop = asyncio.get_running_loop()
    print("=== Event loop info ===")
    print(f"Event loop type: {type(loop)}")
    print(f"Current time: {time.strftime('%H:%M:%S')}")
    # Create several coroutines
    tasks = [
        worker("A", 2),
        worker("B", 1),
        worker("C", 3),
        worker("D", 1.5),
    ]
    # Run them concurrently with gather
    results = await asyncio.gather(*tasks)
    print("\n=== All tasks finished ===")
    for result in results:
        print(f"  {result}")

if __name__ == "__main__":
    asyncio.run(event_loop_demo())
Asynchronous Context Managers
Asynchronous context managers manage resources that need asynchronous setup and cleanup.
import asyncio
from typing import Optional

class AsyncDatabaseConnection:
    """Asynchronous database connection manager (simulated)."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.connection: Optional[str] = None

    async def __aenter__(self):
        """Enter the context asynchronously."""
        print(f"Connecting to: {self.connection_string}")
        await asyncio.sleep(0.5)  # simulate connection latency
        self.connection = f"Connection_to_{self.connection_string}"
        print(f"Connected: {self.connection}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit the context asynchronously."""
        print(f"Closing connection: {self.connection}")
        await asyncio.sleep(0.2)  # simulate close latency
        self.connection = None
        print("Connection closed")
        # An exception can be inspected here; returning None lets it propagate
        if exc_type:
            print(f"Exception observed: {exc_type}")

    async def execute_query(self, query: str):
        """Run a query."""
        if not self.connection:
            raise RuntimeError("Connection not established")
        print(f"Running query: {query}")
        await asyncio.sleep(0.3)  # simulate query latency
        return f"result_{query}"

async def use_database():
    """Use the asynchronous context manager."""
    async with AsyncDatabaseConnection("postgres://localhost/mydb") as db:
        result1 = await db.execute_query("SELECT * FROM users")
        result2 = await db.execute_query("SELECT * FROM products")
        print(f"Query results: {result1}, {result2}")
    # The connection is closed automatically at this point

async def main():
    await use_database()

if __name__ == "__main__":
    asyncio.run(main())
In Practice: Asynchronous HTTP Requests
Making HTTP Requests with aiohttp
import asyncio
import time
from typing import Dict, List

import aiohttp

class AsyncHttpClient:
    """Asynchronous HTTP client."""

    def __init__(self, max_concurrent: int = 5):
        self.max_concurrent = max_concurrent

    def _new_session(self) -> aiohttp.ClientSession:
        # A session closes its connector when it exits, so build a fresh
        # connector per session instead of sharing one across calls.
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        return aiohttp.ClientSession(connector=connector)

    async def fetch_single(self, session: aiohttp.ClientSession, url: str) -> Dict:
        """Fetch a single URL."""
        try:
            timeout = aiohttp.ClientTimeout(total=10)
            async with session.get(url, timeout=timeout) as response:
                text = await response.text()
                return {
                    "url": url,
                    "status": response.status,
                    "content_length": len(text),
                    "success": True,
                    "content": text[:100] + "..." if len(text) > 100 else text,
                }
        except Exception as e:
            return {
                "url": url,
                "error": str(e),
                "success": False,
            }

    async def fetch_batch(self, urls: List[str]) -> List[Dict]:
        """Fetch a batch of URLs."""
        async with self._new_session() as session:
            tasks = [self.fetch_single(session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=False)
            return results

    async def fetch_with_retry(self, url: str, max_retries: int = 3) -> Dict:
        """Fetch with retries."""
        result = {"url": url, "error": "Max retries exceeded", "success": False}
        async with self._new_session() as session:
            for attempt in range(max_retries):
                # fetch_single() never raises; failures are reported in the dict
                result = await self.fetch_single(session, url)
                if result["success"]:
                    return result
                if attempt < max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # exponential backoff
        return result

async def demo_http_client():
    """Demonstrate the HTTP client."""
    client = AsyncHttpClient(max_concurrent=3)
    # Test URLs
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404",
        "https://httpbin.org/json",
        "https://httpbin.org/html",
        "https://httpbin.org/xml",
        "https://httpbin.org/bytes/1024",
    ]
    print(f"Starting batch fetch of {len(urls)} URLs...")
    start_time = time.time()
    results = await client.fetch_batch(urls)
    end_time = time.time()
    print(f"\nDone! Total time: {end_time - start_time:.2f}s\n")
    # Print a summary
    success_count = sum(1 for r in results if r["success"])
    print(f"Succeeded: {success_count}/{len(urls)}")
    for result in results:
        if result["success"]:
            status = result["status"]
            size = result["content_length"]  # length of the decoded text
            print(f"✓ {result['url']} - status: {status}, size: {size} characters")
        else:
            print(f"✗ {result['url']} - error: {result['error']}")

if __name__ == "__main__":
    asyncio.run(demo_http_client())
Asynchronous Database Operations
Using Async SQLAlchemy
import asyncio
from typing import Optional

from sqlalchemy import Column, Integer, String, select, delete
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)
    email = Column(String(100), unique=True)
    age = Column(Integer)

class AsyncDatabaseManager:
    """Asynchronous database manager."""

    def __init__(self, database_url: str, **engine_kwargs):
        # Pool options such as pool_size and max_overflow depend on the backend
        # (an in-memory SQLite engine does not use a queue pool), so they are
        # passed through only when the caller supplies them.
        self.engine = create_async_engine(database_url, echo=False, **engine_kwargs)
        self.async_session = sessionmaker(
            self.engine,
            class_=AsyncSession,
            expire_on_commit=False
        )

    async def initialize(self):
        """Create the database tables."""
        async with self.engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

    async def create_user(self, name: str, email: str, age: int) -> int:
        """Create a user."""
        async with self.async_session() as session:
            user = User(name=name, email=email, age=age)
            session.add(user)
            await session.commit()
            await session.refresh(user)
            return user.id

    async def get_user_by_id(self, user_id: int) -> Optional[User]:
        """Fetch a user by ID, or None if it does not exist."""
        async with self.async_session() as session:
            result = await session.execute(
                select(User).where(User.id == user_id)
            )
            return result.scalar_one_or_none()

    async def get_users_by_age_range(self, min_age: int, max_age: int) -> list[User]:
        """Fetch users within an age range."""
        async with self.async_session() as session:
            result = await session.execute(
                select(User).where(
                    User.age >= min_age,
                    User.age <= max_age
                ).order_by(User.age)
            )
            return list(result.scalars().all())

    async def bulk_create_users(self, users_data: list[dict]) -> list[int]:
        """Create several users at once."""
        async with self.async_session() as session:
            users = [
                User(name=data["name"], email=data["email"], age=data["age"])
                for data in users_data
            ]
            session.add_all(users)
            await session.commit()
            # Return the IDs of the created users
            return [user.id for user in users]

    async def update_user_email(self, user_id: int, new_email: str) -> bool:
        """Update a user's email address."""
        async with self.async_session() as session:
            result = await session.execute(
                select(User).where(User.id == user_id)
            )
            user = result.scalar_one_or_none()
            if user:
                user.email = new_email
                await session.commit()
                return True
            return False

    async def delete_user(self, user_id: int) -> bool:
        """Delete a user."""
        async with self.async_session() as session:
            result = await session.execute(
                delete(User).where(User.id == user_id)
            )
            await session.commit()
            return result.rowcount > 0

    async def close(self):
        """Close the database connections."""
        await self.engine.dispose()

async def demo_database_operations():
    """Demonstrate the database operations."""
    # Use an in-memory database for the demo
    db = AsyncDatabaseManager("sqlite+aiosqlite:///:memory:")
    try:
        # Initialize
        await db.initialize()
        print("Database initialized")
        # Create a single user
        user_id = await db.create_user("Zhang San", "zhangsan@example.com", 25)
        print(f"Created user, ID: {user_id}")
        # Create users in bulk
        users_data = [
            {"name": "Li Si", "email": "lisi@example.com", "age": 30},
            {"name": "Wang Wu", "email": "wangwu@example.com", "age": 22},
            {"name": "Zhao Liu", "email": "zhaoliu@example.com", "age": 35},
            {"name": "Qian Qi", "email": "qianqi@example.com", "age": 28},
        ]
        ids = await db.bulk_create_users(users_data)
        print(f"Bulk-created users, IDs: {ids}")
        # Query a user
        user = await db.get_user_by_id(user_id)
        if user:
            print(f"Found user: {user.name}, email: {user.email}, age: {user.age}")
        # Query by age range
        users_25_30 = await db.get_users_by_age_range(25, 30)
        print(f"Users aged 25-30: {[u.name for u in users_25_30]}")
        # Update the email address
        updated = await db.update_user_email(user_id, "zhangsan_new@example.com")
        print(f"Email updated: {updated}")
        # Delete the user
        deleted = await db.delete_user(user_id)
        print(f"User deleted: {deleted}")
    finally:
        await db.close()

if __name__ == "__main__":
    asyncio.run(demo_database_operations())
Asynchronous Task Scheduler
import asyncio
import heapq
import time
from datetime import datetime
from typing import Callable, Dict, List

class AsyncTaskScheduler:
    """A more complete asynchronous task scheduler."""

    def __init__(self):
        # Heap entries: (run time, task ID, coroutine function, args, kwargs)
        self._tasks: List[tuple] = []
        self._running = False
        self._task_counter = 0
        self._completed_tasks: Dict[int, dict] = {}
        self._active = set()  # strong references to tasks that are running

    def schedule_once(self, delay: float, coro_func: Callable, *args, **kwargs) -> int:
        """Schedule a one-off task."""
        self._task_counter += 1
        execute_time = time.time() + delay
        task_id = self._task_counter
        heapq.heappush(self._tasks, (execute_time, task_id, coro_func, args, kwargs))
        print(f"[{self._timestamp()}] Scheduled task {task_id}: {coro_func.__name__} to run in {delay:.1f}s")
        return task_id

    def schedule_interval(self, interval: float, coro_func: Callable, *args, **kwargs) -> int:
        """Schedule a periodic task."""
        async def repeated_task():
            while self._running:
                try:
                    await coro_func(*args, **kwargs)
                except Exception as e:
                    print(f"[{self._timestamp()}] Periodic task error: {e}")
                await asyncio.sleep(interval)
        return self.schedule_once(0, repeated_task)

    def schedule_cron(self, cron_expr: str, coro_func: Callable, *args, **kwargs):
        """Schedule a cron-style task (simplified)."""
        # A real implementation would parse cron_expr; this simplified version
        # ignores it and runs the task at second 0 of every minute.
        async def cron_checker():
            while self._running:
                now = datetime.now()
                if now.second == 0:
                    try:
                        await coro_func(*args, **kwargs)
                    except Exception as e:
                        print(f"[{self._timestamp()}] Cron task error: {e}")
                await asyncio.sleep(1)
        return self.schedule_once(0, cron_checker)

    async def run(self):
        """Start the scheduler."""
        self._running = True
        print(f"[{self._timestamp()}] Scheduler started")
        try:
            while self._running or self._tasks:
                if not self._tasks:
                    await asyncio.sleep(0.1)
                    continue
                execute_time, task_id, coro_func, args, kwargs = self._tasks[0]
                now = time.time()
                if now >= execute_time:
                    # Run the task in the background
                    heapq.heappop(self._tasks)
                    task = asyncio.create_task(self._execute_task(task_id, coro_func, args, kwargs))
                    # Keep a reference so the task is not garbage collected mid-run
                    self._active.add(task)
                    task.add_done_callback(self._active.discard)
                else:
                    # Wait for the next task
                    sleep_time = min(0.1, execute_time - now)
                    await asyncio.sleep(sleep_time)
        finally:
            print(f"[{self._timestamp()}] Scheduler stopped")

    async def _execute_task(self, task_id: int, coro_func: Callable, args, kwargs):
        """Run a single task and record its outcome."""
        start_time = time.time()
        try:
            result = await coro_func(*args, **kwargs)
            duration = time.time() - start_time
            self._completed_tasks[task_id] = {
                "status": "success",
                "result": result,
                "duration": duration
            }
            print(f"[{self._timestamp()}] Task {task_id} finished ({duration:.2f}s): {result}")
        except Exception as e:
            duration = time.time() - start_time
            self._completed_tasks[task_id] = {
                "status": "failed",
                "error": str(e),
                "duration": duration
            }
            print(f"[{self._timestamp()}] Task {task_id} failed ({duration:.2f}s): {e}")

    def stop(self):
        """Stop the scheduler."""
        self._running = False

    def get_stats(self) -> Dict:
        """Return execution statistics."""
        total = len(self._completed_tasks)
        success = sum(1 for t in self._completed_tasks.values() if t["status"] == "success")
        failed = total - success
        avg_duration = 0
        if total > 0:
            durations = [t["duration"] for t in self._completed_tasks.values()]
            avg_duration = sum(durations) / len(durations)
        return {
            "total_tasks": total,
            "successful": success,
            "failed": failed,
            "avg_duration": avg_duration,
            "pending_tasks": len(self._tasks)
        }

    def _timestamp(self) -> str:
        """Return the current time as a string."""
        return datetime.now().strftime("%H:%M:%S")

# Usage example
async def sample_task(name: str, duration: float):
    """Sample task."""
    print(f"  Task '{name}' started")
    await asyncio.sleep(duration)
    return f"'{name}' finished"

async def periodic_report(scheduler: AsyncTaskScheduler):
    """Periodic status report."""
    stats = scheduler.get_stats()
    print(f"\n[{scheduler._timestamp()}] Status report: {stats}")

async def demo_scheduler():
    """Demonstrate the scheduler."""
    scheduler = AsyncTaskScheduler()
    # Schedule one-off tasks
    scheduler.schedule_once(1.0, sample_task, "quick task", 0.5)
    scheduler.schedule_once(2.0, sample_task, "medium task", 1.0)
    scheduler.schedule_once(3.5, sample_task, "long task", 2.0)
    # Schedule a periodic task (every 3 seconds)
    scheduler.schedule_interval(3.0, periodic_report, scheduler)
    # Schedule several tasks that overlap in time
    for i in range(3):
        scheduler.schedule_once(0.5 + i * 0.3, sample_task, f"concurrent task {i}", 0.2)
    # Schedule a task further in the future
    scheduler.schedule_once(5.0, sample_task, "future task", 0.3)

    # Stop after 5 seconds; tasks still running when asyncio.run() returns are cancelled
    async def stop_after_delay():
        await asyncio.sleep(5)
        print("\n" + "=" * 50)
        print("5 seconds elapsed, stopping the scheduler...")
        scheduler.stop()
        # Print the final statistics
        final_stats = scheduler.get_stats()
        print("\nFinal statistics:")
        for key, value in final_stats.items():
            print(f"  {key}: {value}")

    await asyncio.gather(
        scheduler.run(),
        stop_after_delay()
    )

if __name__ == "__main__":
    asyncio.run(demo_scheduler())
Async Generators and Stream Processing
import asyncio
from typing import AsyncIterator, List

class AsyncDataStream:
    """Asynchronous data stream."""

    def __init__(self, data: List[int]):
        self.data = data
        self.index = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.index >= len(self.data):
            raise StopAsyncIteration
        # Simulate asynchronous processing
        await asyncio.sleep(0.1)
        value = self.data[self.index]
        self.index += 1
        return value

async def async_number_generator(n: int) -> AsyncIterator[int]:
    """Async generator example."""
    for i in range(n):
        await asyncio.sleep(0.05)  # simulate an asynchronous operation
        yield i * 2  # yield the doubled value

async def process_stream():
    """Process asynchronous data streams."""
    print("=== Async generator demo ===")
    async for number in async_number_generator(5):
        print(f"Processing number: {number}")
        await asyncio.sleep(0.02)  # simulate processing time
    print("\n=== Async iterator demo ===")
    stream = AsyncDataStream([10, 20, 30, 40, 50])
    async for item in stream:
        print(f"Received: {item}")

# Advanced stream processing: the pipeline pattern
async def producer(queue: asyncio.Queue, count: int):
    """Producer."""
    for i in range(count):
        await asyncio.sleep(0.1)
        item = f"data_{i}"
        await queue.put(item)
        print(f"[producer] produced: {item}")
    await queue.put(None)  # end-of-stream signal

async def processor(queue: asyncio.Queue, result_queue: asyncio.Queue):
    """Processor."""
    while True:
        item = await queue.get()
        if item is None:
            # Forward the end-of-stream signal so the consumer can stop too
            await result_queue.put(None)
            queue.task_done()
            break
        # Simulate processing
        await asyncio.sleep(0.15)
        processed = f"processed_{item}"
        await result_queue.put(processed)
        print(f"[processor] processed: {item} -> {processed}")
        queue.task_done()

async def consumer(result_queue: asyncio.Queue):
    """Consumer."""
    while True:
        result = await result_queue.get()
        if result is None:
            result_queue.task_done()
            break
        # Simulate saving or printing the result
        await asyncio.sleep(0.05)
        print(f"[consumer] received: {result}")
        result_queue.task_done()

async def pipeline_demo():
    """Demonstrate the pipeline pattern."""
    print("\n=== Pipeline demo ===")
    queue1 = asyncio.Queue()
    queue2 = asyncio.Queue()
    # Start the pipeline stages
    await asyncio.gather(
        producer(queue1, 5),
        processor(queue1, queue2),
        consumer(queue2)
    )
    print("Pipeline finished")

async def main():
    await process_stream()
    await pipeline_demo()

if __name__ == "__main__":
    asyncio.run(main())
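An alternative to passing a None sentinel through every stage is to let a fixed pool of workers drain the queue and wait on queue.join(). The sketch below shows that variant under simple assumptions; worker and square_all are illustrative names, and squaring a number stands in for real work.

```python
import asyncio

async def worker(queue: asyncio.Queue, results: list) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stand-in for real work
            results.append(item * item)
        finally:
            queue.task_done()          # lets queue.join() make progress

async def square_all(items: list, n_workers: int = 3) -> list:
    queue = asyncio.Queue()
    results = []
    for item in items:
        queue.put_nowait(item)
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(n_workers)]
    await queue.join()                 # wait until every item is processed
    for w in workers:
        w.cancel()                     # workers loop forever, so stop them
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)

if __name__ == "__main__":
    print(asyncio.run(square_all([1, 2, 3, 4, 5])))
```

The results are sorted before returning because the order in which workers finish is not guaranteed to match the input order.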
Performance Optimization and Best Practices
1. Avoid Blocking Operations
import asyncio
import time

import aiofiles

# Wrong: a blocking call
async def bad_practice():
    # time.sleep(1)  # this would block the entire event loop!
    # Use this instead:
    await asyncio.sleep(1)

# Right: asynchronous file operations
async def async_file_operations():
    # Asynchronous file write
    async with aiofiles.open('async_file.txt', 'w') as f:
        await f.write('content written asynchronously\n')
    # Asynchronous file read
    async with aiofiles.open('async_file.txt', 'r') as f:
        content = await f.read()
        print(f"Read content: {content}")
2. Concurrency Control
import asyncio

async def limited_concurrency():
    """Control concurrency with a semaphore."""
    semaphore = asyncio.Semaphore(3)  # at most 3 at a time

    async def limited_task(n):
        async with semaphore:
            print(f"Task {n} started")
            await asyncio.sleep(1)
            print(f"Task {n} finished")
            return f"result_{n}"

    # Create 20 tasks, of which only 3 can run at the same time
    tasks = [limited_task(i) for i in range(20)]
    results = await asyncio.gather(*tasks)
    print(f"Completed {len(results)} tasks")

# Using a connection pool
async def connection_pool_demo():
    """Simulated connection pool."""
    class ConnectionPool:
        def __init__(self, size: int):
            self.semaphore = asyncio.Semaphore(size)

        async def get_connection(self):
            # Hold the semaphore until release_connection() is called
            await self.semaphore.acquire()
            await asyncio.sleep(0.1)  # simulate acquiring a connection
            return f"Connection_{id(self)}"

        def release_connection(self, conn):
            self.semaphore.release()

    pool = ConnectionPool(5)

    async def use_connection(task_id):
        conn = await pool.get_connection()
        try:
            print(f"Task {task_id} is using {conn}")
            await asyncio.sleep(0.2)
        finally:
            print(f"Task {task_id} released {conn}")
            pool.release_connection(conn)

    tasks = [use_connection(i) for i in range(10)]
    await asyncio.gather(*tasks)
3. Error Handling and Timeouts
import asyncio

# Note: asyncio.timeout() requires Python 3.11+
async def robust_async_operation():
    """A robust asynchronous operation."""
    # Timeout control
    try:
        async with asyncio.timeout(5):  # 5-second timeout
            await asyncio.sleep(10)  # this will time out
    except TimeoutError:
        print("Operation timed out")

    # Retry mechanism
    async def fetch_with_retry(max_retries=3):
        for attempt in range(max_retries):
            try:
                # Simulate an operation that may fail
                if attempt < 2:
                    raise ConnectionError("connection failed")
                return "success"
            except Exception as e:
                if attempt == max_retries - 1:
                    raise
                wait_time = 2 ** attempt
                print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)

    try:
        result = await fetch_with_retry()
        print(f"Final result: {result}")
    except Exception as e:
        print(f"Failed after all retries: {e}")

    # Grouping tasks and handling their errors
    async def task_with_timeout(name, duration, timeout):
        try:
            async with asyncio.timeout(timeout):
                await asyncio.sleep(duration)
                return f"{name}_success"
        except TimeoutError:
            return f"{name}_timeout"
        except Exception as e:
            return f"{name}_error_{e}"

    # Run concurrently; some tasks may time out
    tasks = [
        task_with_timeout("task1", 1, 2),
        task_with_timeout("task2", 3, 2),  # will time out
        task_with_timeout("task3", 1, 2),
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    print("\nResults of tasks with timeouts:")
    for result in results:
        print(f"  {result}")
4. Performance Monitoring and Debugging
import asyncio
import time
from typing import List

class AsyncPerformanceMonitor:
    """Asynchronous performance monitor."""

    def __init__(self):
        self.start_time = None
        self.task_times: List[float] = []

    async def monitor_coroutine(self, coro, name: str):
        """Measure how long a coroutine takes to run."""
        if self.start_time is None:
            self.start_time = time.time()
        task_start = time.time()
        try:
            result = await coro
            task_duration = time.time() - task_start
            self.task_times.append(task_duration)
            print(f"[{task_duration:.3f}s] {name} finished")
            return result
        except Exception as e:
            task_duration = time.time() - task_start
            print(f"[{task_duration:.3f}s] {name} failed: {e}")
            raise

    def get_stats(self):
        """Return performance statistics."""
        if not self.task_times:
            return {}
        total_time = time.time() - self.start_time if self.start_time else 0
        return {
            "total_tasks": len(self.task_times),
            "total_time": total_time,
            "avg_task_time": sum(self.task_times) / len(self.task_times),
            "min_task_time": min(self.task_times),
            "max_task_time": max(self.task_times),
            "throughput": len(self.task_times) / total_time if total_time > 0 else 0
        }

    def print_stats(self):
        """Print performance statistics."""
        stats = self.get_stats()
        print("\n=== Performance statistics ===")
        for key, value in stats.items():
            print(f"  {key}: {value:.2f}" if isinstance(value, float) else f"  {key}: {value}")

async def performance_demo():
    """Demonstrate performance monitoring."""
    monitor = AsyncPerformanceMonitor()
    # Simulate tasks of different durations
    tasks = [
        (asyncio.sleep(0.1), "quick task"),
        (asyncio.sleep(0.3), "medium task"),
        (asyncio.sleep(0.2), "short task"),
        (asyncio.sleep(0.5), "long task"),
    ]
    # Run concurrently under the monitor
    coroutines = [monitor.monitor_coroutine(coro, name) for coro, name in tasks]
    await asyncio.gather(*coroutines)
    # Print the statistics
    monitor.print_stats()

if __name__ == "__main__":
    asyncio.run(performance_demo())
