引言:为什么需要异步编程?
在现代软件开发中,I/O密集型任务(如网络请求、文件读写、数据库查询)往往成为性能瓶颈。传统的同步编程模式会导致程序在等待I/O操作完成时阻塞,在此期间无法处理任何其他工作,CPU大部分时间处于空闲状态。异步编程通过非阻塞的方式处理这些操作,让程序在等待时可以执行其他任务,从而显著提高吞吐量和响应速度。
想象一个餐厅的场景:同步模式就像一个服务员必须等一道菜完全做好后才能服务下一桌;而异步模式就像一个服务员同时照看多桌,哪桌需要就去处理,效率自然更高。
一、异步编程的核心概念
1.1 协程(Coroutine)
协程是异步编程的基本构建块。与线程不同,协程是用户态的轻量级执行单元,由事件循环进行协作式调度:协程只会在await处主动让出控制权,而不是被操作系统随时抢占,因此切换开销极小。
# 传统函数 vs 协程函数
def sync_function():
    """Return the greeting synchronously; calling it runs the body right away."""
    greeting = "Hello"
    return greeting
async def async_function():
    """Coroutine version of the greeting.

    Calling it only builds a coroutine object; the body (which yields
    ``"Hello"``) runs once the coroutine is awaited or driven by an event loop.
    """
    greeting = "Hello"
    return greeting
# The two are called differently:
sync_result = sync_function()  # runs the body immediately and returns "Hello"
async_result = async_function()  # only creates a coroutine object; the body has not run yet
# This snippet never awaits the coroutine, so close it explicitly; otherwise the
# interpreter warns "coroutine 'async_function' was never awaited" when the
# object is garbage-collected. In real code you would `await async_result`.
async_result.close()
1.2 Event Loop(事件循环)
事件循环是异步编程的心脏,它负责:
- 监听和分发事件
- 执行协程任务
- 处理I/O多路复用
import asyncio
async def main():
    """Print "Start", pause one second without blocking the loop, print "End"."""
    print("Start")
    await asyncio.sleep(1)  # non-blocking wait: the loop can run other tasks meanwhile
    print("End")


# Python 3.7+ (recommended): asyncio.run() creates a new event loop, runs
# main() to completion and closes the loop afterwards.
asyncio.run(main())

# Older, manual equivalent. Do not use asyncio.get_event_loop() here: calling
# it with no running loop is deprecated since Python 3.10, and the old snippet
# never closed the loop it obtained.
# loop = asyncio.new_event_loop()
# try:
#     loop.run_until_complete(main())
# finally:
#     loop.close()
1.3 Future 和 Task
Future代表一个未来可能完成的结果,Task是Future的子类,专门用于包装协程,并将其调度到事件循环上与其他任务并发执行。
async def fetch_data():
    """Simulate a one-second I/O call and return a small payload dict."""
    await asyncio.sleep(1)
    payload = {"data": 123}
    return payload
async def main():
    """Wrap fetch_data() in a Task, then wait for it and print its result."""
    # create_task() schedules the coroutine on the running loop; it starts
    # executing the next time main() yields control (at the await below).
    pending = asyncio.create_task(fetch_data())
    print("Task created")
    # Suspend main() until the task finishes, then take its return value.
    outcome = await pending
    print(f"Result: {outcome}")


asyncio.run(main())
二、Python异步编程实战
2.1 基础异步HTTP请求
使用aiohttp库进行异步HTTP请求:
import aiohttp
import asyncio
import time
async def fetch_url(session, url):
    """Fetch *url* through *session* and return the response body as text.

    Any ``Exception`` raised while requesting or reading (the request uses a
    10-second timeout) is converted into an ``"Error: ..."`` string instead of
    propagating, so one failing URL does not abort a surrounding gather().
    """
    try:
        async with session.get(url, timeout=10) as resp:
            body = await resp.text()
    except Exception as exc:
        return f"Error: {exc}"
    return body
async def main():
    """Fetch three httpbin URLs concurrently and report sizes and total time."""
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
    ]
    # perf_counter() is monotonic and high-resolution, so the measured interval
    # cannot be skewed by wall-clock adjustments the way time.time() can.
    start_time = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        # Issue all requests concurrently: total time is roughly that of the
        # slowest request rather than the sum of all of them.
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    for i, result in enumerate(results, start=1):
        # A failed fetch yields an "Error: ..." string, so this is its length.
        print(f"URL {i}: {len(result)} characters")
    print(f"Total time: {time.perf_counter() - start_time:.2f}s")


# Run
asyncio.run(main())
2.2 异步数据库操作
使用asyncpg进行PostgreSQL异步操作:
import asyncpg
import asyncio
async def create_pool():
    """Create and return an asyncpg connection pool for the local test database.

    The pool is configured with ``min_size=5`` and ``max_size=20``. The
    credentials are hard-coded for the demo only; load them from configuration
    in real code.
    """
    settings = dict(
        host="localhost",
        port=5432,
        user="postgres",
        password="password",
        database="testdb",
        min_size=5,
        max_size=20,
    )
    return await asyncpg.create_pool(**settings)
async def fetch_users(pool):
    """Return every row of ``users`` as a list of dicts keyed id/name/email."""
    query = "SELECT id, name, email FROM users"
    # Borrow a connection from the pool; it goes back when the block exits.
    async with pool.acquire() as conn:
        records = await conn.fetch(query)
        return list(map(dict, records))
async def insert_user(pool, name, email):
    """Insert a user row and return the new row's ``id``.

    ``Connection.execute`` only returns the command status string
    (e.g. ``"INSERT 0 1"``) and drops the ``RETURNING`` value, so the id is
    read with ``fetchval``, which returns the first column of the first row.
    """
    async with pool.acquire() as conn:
        # $1/$2 are bind parameters: name/email never become part of the SQL text.
        new_id = await conn.fetchval(
            "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
            name,
            email,
        )
    return new_id
async def main():
    """Run a user query and an insert concurrently against one shared pool."""
    pool = await create_pool()
    try:
        # Both tasks are scheduled up front and run concurrently, each on its
        # own pooled connection, so the query may or may not see the new row.
        users_task = asyncio.create_task(fetch_users(pool))
        insert_task = asyncio.create_task(
            insert_user(pool, "Alice", "alice@example.com")
        )
        users, new_id = await users_task, await insert_task
        print(f"Users: {users}")
        print(f"New user ID: {new_id}")
    finally:
        # Close the pool even if either task failed.
        await pool.close()


asyncio.run(main())
2.3 异步文件操作
使用aiofiles进行异步文件读写:
import asyncio
import json
import time

import aiofiles
async def write_log(message):
    """Append *message* plus a newline to ``app.log`` using aiofiles.

    The file is opened in append mode with an explicit UTF-8 encoding, so
    non-ASCII messages are written identically regardless of platform locale.
    """
    async with aiofiles.open("app.log", mode="a", encoding="utf-8") as f:
        await f.write(f"{message}\n")
async def read_config():
    """Read ``config.json`` with aiofiles and return the parsed JSON.

    Returns:
        The decoded JSON value, or ``{}`` when ``config.json`` does not exist.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON.
    """
    try:
        async with aiofiles.open("config.json", mode="r", encoding="utf-8") as f:
            content = await f.read()
    except FileNotFoundError:
        # A missing config file is not an error: fall back to empty defaults.
        return {}
    return json.loads(content)
async def process_data(data):
    """Simulate half a second of async work, then wrap *data* with a timestamp.

    Returns ``{"processed": data, "timestamp": time.time()}``, where the
    timestamp is taken after the simulated work.
    """
    await asyncio.sleep(0.5)
    finished_at = time.time()
    return {"processed": data, "timestamp": finished_at}
async def main():
    """Run a log write, a config read and a processing step concurrently."""
    jobs = (
        write_log("Starting application"),
        read_config(),
        process_data({"key": "value"}),
    )
    # gather() returns the results in the same order as the awaitables given.
    results = await asyncio.gather(*jobs)
    print(f"Results: {results}")


asyncio.run(main())
三、高级异步模式
3.1 异步上下文管理器
class AsyncDatabaseSession:
    """Custom async context manager that simulates a database session.

    ``async with AsyncDatabaseSession(dsn) as session:`` connects on entry and
    binds the session object; on exit the session is closed and cleared, and
    any exception raised in the body propagates unchanged.
    """

    def __init__(self, connection_string):
        # Kept for a real implementation; the simulated _connect ignores it.
        self.connection_string = connection_string
        self.session = None

    async def __aenter__(self):
        """Open the (simulated) connection and return the session."""
        self.session = await self._connect()
        return self.session

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session on exit; never suppresses exceptions."""
        if self.session is not None:
            await self._disconnect(self.session)
            self.session = None
        return False

    async def _connect(self):
        """Simulate connecting to the database."""
        await asyncio.sleep(0.1)
        return {"connected": True}

    async def _disconnect(self, session):
        """Simulate closing *session*.

        The simulated session is a plain dict with no ``close()`` method, so
        closing is simulated here; with a real driver, await
        ``session.close()`` instead.
        """
        await asyncio.sleep(0)
        session["connected"] = False
# 使用示例
async def use_async_context():
    """Open a simulated session with ``async with`` and print it."""
    manager = AsyncDatabaseSession("postgresql://...")
    async with manager as session:
        print(f"Session: {session}")
        # Database operations would go here.


asyncio.run(use_async_context())
3.2 异步迭代器
class AsyncDataStreamer:
    """Async iterator over a sequence, with a 0.1 s simulated delay per item.

    Usable with ``async for``; *data_source* must support ``len()`` and
    integer indexing.
    """

    def __init__(self, data_source):
        self.data_source = data_source
        self.index = 0  # position of the next item to hand out

    def __aiter__(self):
        # The streamer is its own iterator; index is never reset, so it can be
        # consumed only once.
        return self

    async def __anext__(self):
        """Return the next item after a short simulated delay."""
        if self.index < len(self.data_source):
            await asyncio.sleep(0.1)  # stand-in for real async work
            item = self.data_source[self.index]
            self.index += 1
            return item
        raise StopAsyncIteration
# 使用示例
async def stream_data():
    """Consume an AsyncDataStreamer over [1, 2, 3, 4, 5], printing each item."""
    async for value in AsyncDataStreamer([1, 2, 3, 4, 5]):
        print(f"Processing item: {value}")


asyncio.run(stream_data())
3.3 异步锁与同步
import asyncio
class RateLimiter:
    """Sliding-window async rate limiter.

    At most ``max_requests`` calls to ``acquire()`` may return within any
    window of ``period`` seconds, measured on the running event loop's clock.
    Callers over the limit wait; an ``asyncio.Lock`` serves them one at a time.
    """

    def __init__(self, max_requests, period):
        # With fewer than one slot no request could ever be granted.
        if max_requests < 1:
            raise ValueError("max_requests must be at least 1")
        self.max_requests = max_requests
        self.period = period
        self.requests = []  # loop-clock times of granted requests, oldest first
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Wait until a request slot is free within the window, then claim it."""
        async with self.lock:
            loop = asyncio.get_running_loop()
            while True:
                now = loop.time()
                # Drop timestamps that have slid out of the current window.
                self.requests = [t for t in self.requests if now - t < self.period]
                if len(self.requests) < self.max_requests:
                    break
                # Sleep until the oldest request leaves the window, then re-check:
                # timer wake-ups are not exact, so one sleep is not guaranteed to
                # have freed a slot.
                await asyncio.sleep(self.period - (now - self.requests[0]))
            # Record when the slot was actually granted (after any wait). Using
            # the pre-wait time would make this entry expire early and let later
            # callers exceed the rate.
            self.requests.append(now)
# 使用示例
async def limited_api_call(limiter, call_id):
    """Wait for a slot from *limiter*, log the loop time, then simulate the call."""
    await limiter.acquire()
    loop_now = asyncio.get_running_loop().time()
    print(f"API call {call_id} at {loop_now:.2f}")
    await asyncio.sleep(0.1)  # stand-in for the real API request
async def main():
    """Fire 10 calls at once through a limiter that allows 3 per second."""
    limiter = RateLimiter(max_requests=3, period=1.0)
    # All 10 coroutines start together; the limiter spaces them out.
    await asyncio.gather(*(limited_api_call(limiter, n) for n in range(10)))


asyncio.run(main())
四、异步编程最佳实践
4.1 错误处理
import asyncio
from typing import Optional
async def robust_async_call(*, should_fail=True):
    """Demonstrate layered exception handling inside a coroutine.

    Args:
        should_fail: when true (the default, matching the original demo) the
            simulated operation raises ``ValueError``; pass ``False`` to
            exercise the success path, which is otherwise unreachable.

    Returns:
        ``"Success"`` if the operation succeeds, ``"Recovered"`` if it raised
        the expected ``ValueError``.

    Raises:
        Exception: any other error is printed and re-raised unchanged.
    """
    try:
        # Simulate an async operation that may fail.
        await asyncio.sleep(0.1)
        if should_fail:
            raise ValueError("Something went wrong")
        return "Success"
    except ValueError as e:
        # Expected, recoverable failure: handle it and return a fallback value.
        print(f"Caught expected error: {e}")
        return "Recovered"
    except Exception as e:
        # Unknown failure: report it, then re-raise so callers still see it.
        print(f"Unexpected error: {e}")
        raise
    finally:
        # Runs on every exit path: success, recovery or re-raise.
        print("Cleanup performed")
async def main():
    """Run robust_async_call() and print the value it settles on."""
    outcome = await robust_async_call()
    print(f"Final result: {outcome}")


asyncio.run(main())
4.2 超时控制
async def with_timeout(coro, timeout: float, default=None):
    """Await *coro* for at most *timeout* seconds.

    Returns the coroutine's result, or *default* (after printing a notice)
    when the deadline passes first; ``wait_for`` cancels the coroutine then.
    """
    try:
        outcome = await asyncio.wait_for(coro, timeout)
    except asyncio.TimeoutError:
        print(f"Operation timed out after {timeout}s")
        outcome = default
    return outcome
async def slow_operation():
    """Take two seconds (without blocking the loop), then report completion."""
    delay_seconds = 2
    await asyncio.sleep(delay_seconds)
    return "Completed"
async def main():
    """Show with_timeout() giving up on a 2-second operation after 1 second."""
    outcome = await with_timeout(slow_operation(), timeout=1.0)
    print(f"Result: {outcome}")  # the default (None) when the timeout fires


asyncio.run(main())
4.3 任务取消与清理
import asyncio
class GracefulTaskManager:
    """Track background tasks and shut them down cleanly.

    Each task is held in ``self.tasks`` (a strong reference, as the asyncio
    docs recommend for fire-and-forget tasks) until it finishes, at which
    point its done-callback removes it.
    """

    def __init__(self):
        self.tasks = set()
        # Set by shutdown(); cooperative workers can poll it to leave their loops.
        self.shutdown_event = asyncio.Event()

    def create_task(self, coro):
        """Schedule *coro* as a task, track it until done, and return it."""
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self._on_task_done)
        return task

    def _on_task_done(self, task):
        """Done-callback: forget *task* and report it if it failed."""
        self.tasks.discard(task)
        # task.exception() raises CancelledError for a cancelled task, which
        # would surface as an "Exception in callback" error for every task
        # cancelled by shutdown(). Cancellation is expected here, so skip it.
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            print(f"Task failed: {exc}")

    async def shutdown(self, timeout=5.0):
        """Signal, cancel and wait (up to *timeout* seconds) for all tasks."""
        print(f"Shutting down {len(self.tasks)} tasks...")
        # Let cooperative workers observe the request as well.
        self.shutdown_event.set()
        # Snapshot: done-callbacks remove entries from self.tasks as tasks end.
        pending = list(self.tasks)
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.wait(
                pending,
                timeout=timeout,
                return_when=asyncio.ALL_COMPLETED,
            )
        # Whatever is still tracked and running outlasted the timeout.
        for task in list(self.tasks):
            if not task.done():
                print(f"Task did not complete in time: {task}")
        self.tasks.clear()
async def worker(manager, worker_id):
    """Print a heartbeat every second until shutdown is signalled.

    If the task is cancelled, it prints a notice and re-raises the
    CancelledError so the task still ends in the cancelled state.
    """
    try:
        while True:
            if manager.shutdown_event.is_set():
                break
            print(f"Worker {worker_id} working...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print(f"Worker {worker_id} cancelled")
        raise
async def main():
    """Start three workers, let them run for about 3 s, then shut them down."""
    manager = GracefulTaskManager()
    for worker_id in range(3):
        manager.create_task(worker(manager, worker_id))
    await asyncio.sleep(3)  # let the workers run for a while
    await manager.shutdown()  # graceful shutdown of everything still running


asyncio.run(main())
五、性能优化与调试
5.1 性能监控
import asyncio
import time
from functools import wraps
def async_timed(func):
    """Decorate a coroutine function so each call prints its duration.

    Timing uses ``time.perf_counter``. The duration is printed only when the
    call returns normally; if *func* raises, the exception propagates and
    nothing is printed.
    """

    @wraps(func)
    async def timed(*args, **kwargs):
        began = time.perf_counter()
        value = await func(*args, **kwargs)
        elapsed = time.perf_counter() - began
        print(f"{func.__name__} took {elapsed:.2f}s")
        return value

    return timed
@async_timed
async def monitored_operation():
    """Sleep for one second (timed by the decorator) and return "Done"."""
    pause = 1
    await asyncio.sleep(pause)
    return "Done"
async def main():
    """Call the timed operation once; the decorator prints how long it took."""
    await monitored_operation()


asyncio.run(main())
5.2 使用asyncio的调试模式
# 启用调试模式的方法:
# 1. 设置环境变量:PYTHONASYNCIODEBUG=1
# 2. 或者在代码中:
import asyncio
async def debug_example():
    """Turn on asyncio debug mode from inside a running coroutine.

    In debug mode the loop reports any callback or task step that runs longer
    than ``slow_callback_duration`` seconds (0.1 s here).
    """
    loop = asyncio.get_running_loop()
    loop.set_debug(True)
    loop.slow_callback_duration = 0.1  # flag steps slower than 100 ms
    # asyncio.sleep() yields to the loop, so this step is never reported as
    # slow; a blocking call such as time.sleep(0.2) in its place would be.
    await asyncio.sleep(0.05)


asyncio.run(debug_example())
六、总结
异步编程是现代Python开发的重要技能,特别适用于I/O密集型应用。通过掌握协程、事件循环和各种异步模式,你可以构建高性能、可扩展的应用程序。记住以下关键点:
- 理解核心概念:协程、事件循环、Future/Task
- 选择合适的库:aiohttp、asyncpg、aiofiles等
- 遵循最佳实践:正确处理错误、超时控制、优雅关闭
- 持续优化:监控性能、使用调试工具
异步编程的学习曲线可能较陡,但一旦掌握,将极大提升你的编程能力和应用性能。开始时从小项目入手,逐步构建更复杂的异步系统。
