引言:为什么需要异步编程
在当今的软件开发世界中,I/O密集型应用变得越来越普遍。想象一下你正在开发一个网络爬虫,需要同时请求1000个网页;或者构建一个Web服务器,需要处理成千上万的并发连接。传统的同步编程方式就像一个服务员一次只能服务一桌客人,效率低下。而异步编程则像一个经验丰富的服务员,能够同时照看多张桌子,在客人点菜的间隙去服务其他客人。
Python的asyncio库正是为解决这类问题而生。它通过事件循环(Event Loop)机制,让程序在等待I/O操作(如网络请求;文件读写则需要借助aiofiles等第三方库或线程池)时不会阻塞整个程序,而是切换到其他任务继续执行。这种机制使得单线程也能实现高并发,大大提高了I/O密集型程序的执行效率。
基础概念解析
协程(Coroutine)
协程是asyncio的核心概念。与普通函数不同,协程可以在执行过程中暂停和恢复。在Python中,使用async def定义的函数就是协程函数:
async def hello_world():
    """Print "Hello", wait one second, then print "World"."""
    first, second = "Hello", "World"
    print(first)
    # asyncio.sleep stands in for a real I/O wait and yields to the event loop.
    await asyncio.sleep(1)
    print(second)
事件循环(Event Loop)
事件循环是asyncio的发动机。它负责:
- 跟踪所有注册的协程
- 当协程需要等待I/O时,将其挂起
- 当I/O就绪时,恢复协程执行
import asyncio
async def main():
    """Print a start message, wait one second, then print a completion message."""
    banner_start, banner_done = '开始执行', '执行完成'
    print(banner_start)
    await asyncio.sleep(1)
    print(banner_done)
# Create an event loop explicitly and drive main() to completion on it.
# asyncio.get_event_loop() is deprecated when no loop is running, so a fresh
# loop is created instead, and it is always closed afterwards.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()
Future和Task
Future代表一个未来完成的操作。Task是Future的子类,专门用于包装协程:
async def compute(x, y):
    """Return ``x + y`` after a one-second simulated delay."""
    await asyncio.sleep(1)
    total = x + y
    return total
async def main():
    """Wrap compute() in a Task, wait for it, and print its result."""
    # create_task() and await both need a running event loop, so they must
    # live inside a coroutine rather than at module level.
    task = asyncio.create_task(compute(1, 2))
    # Alternatively, use ensure_future:
    # task = asyncio.ensure_future(compute(1, 2))
    # Wait for the task to finish
    result = await task
    print(f"结果是: {result}")


asyncio.run(main())
基本使用方法
1. 运行协程
有三种主要方式运行协程:
方式一:直接运行
async def my_coroutine():
    """Print a single greeting line."""
    greeting = "Hello, asyncio!"
    print(greeting)
# Python 3.7+
asyncio.run(my_coroutine())
方式二:在事件循环中运行
async def my_coroutine():
    """Emit the greeting used by the explicit event-loop example."""
    message = "Hello, asyncio!"
    print(message)
    return None
# asyncio.get_event_loop() is deprecated when no loop is running; create the
# loop explicitly and make sure it is closed even if the coroutine raises.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(my_coroutine())
finally:
    loop.close()
方式三:并发运行多个协程
async def task1():
    """Wait one second and return the completion message for task 1."""
    outcome = "任务1完成"
    await asyncio.sleep(1)
    return outcome
async def task2():
    """Wait two seconds and return the completion message for task 2."""
    outcome = "任务2完成"
    await asyncio.sleep(2)
    return outcome
async def main():
    """Run task1() and task2() concurrently and print their results."""
    coros = (task1(), task2())
    # gather returns results in argument order, not completion order.
    results = await asyncio.gather(*coros)
    print(results)  # ['任务1完成', '任务2完成']
asyncio.run(main())
2. 基本同步原语
asyncio提供了多种同步原语,接口类似于threading模块中的同名对象;但它们不是线程安全的,只能用于同一事件循环内协程/任务之间的协调:
# Lock shared by every caller of critical_section().
lock = asyncio.Lock()


async def critical_section():
    """Run the protected section while holding the module-level lock."""
    await lock.acquire()
    try:
        # Critical-section work
        await asyncio.sleep(0.1)
        print("临界区执行完成")
    finally:
        lock.release()
# Semaphore that admits at most 3 holders at a time.
semaphore = asyncio.Semaphore(3)


async def limited_task():
    """Run a one-second job while holding one slot of the semaphore."""
    await semaphore.acquire()
    try:
        print("开始执行受限任务")
        await asyncio.sleep(1)
        print("受限任务完成")
    finally:
        semaphore.release()
# Event that waiter() blocks on until it is set.
event = asyncio.Event()


async def waiter():
    """Announce the wait, block until the event is set, then announce it fired."""
    before, after = "等待事件...", "事件已触发!"
    print(before)
    await event.wait()
    print(after)
async def setter():
    """Set the shared event after a two-second delay and report it."""
    delay_seconds = 2
    await asyncio.sleep(delay_seconds)
    event.set()
    print("事件已设置")
高级特性
1. 超时控制
async def slow_task():
    """Take five seconds to finish, then return a completion marker."""
    outcome = "完成"
    await asyncio.sleep(5)
    return outcome
async def main():
    """Show two ways to bound how long slow_task() is allowed to run."""
    try:
        # Give up after 3 seconds; on timeout wait_for cancels the awaited
        # coroutine itself, so no explicit cancel() is needed.
        await asyncio.wait_for(slow_task(), timeout=3)
    except asyncio.TimeoutError:
        print("任务超时!")

    # Alternative to wait_for: the asyncio.timeout() context manager
    # (Python 3.11+), which cancels the enclosed block when time runs out.
    try:
        async with asyncio.timeout(3):
            await slow_task()
    except asyncio.TimeoutError:
        print("任务超时!")
2. 任务取消和清理
async def cancellable_task():
    """Print a heartbeat every second until cancelled, then clean up and re-raise."""

    async def _cleanup():
        # Perform whatever cleanup is required before the task goes away.
        print("任务被取消,执行清理...")
        await asyncio.sleep(0.5)
        print("清理完成")

    try:
        while True:
            print("任务运行中...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        await _cleanup()
        # Propagate the cancellation so the awaiting caller sees it.
        raise
async def main():
    """Start cancellable_task(), cancel it after three seconds, and confirm."""
    worker = asyncio.create_task(cancellable_task())
    await asyncio.sleep(3)
    worker.cancel()
    try:
        # Awaiting the cancelled task surfaces its CancelledError here.
        await worker
    except asyncio.CancelledError:
        print("主函数确认任务已取消")
3. 任务组(TaskGroup)
Python 3.11引入了TaskGroup,提供了更安全的任务管理:
async def main():
    """Run two coroutines inside a TaskGroup and report once both are done."""
    async with asyncio.TaskGroup() as group:
        first = group.create_task(some_coroutine())
        second = group.create_task(another_coroutine())
    # Execution only reaches this point after the group has been exited.
    print("所有任务完成")
4. 异步迭代器和生成器
class AsyncRange:
    """Asynchronous counterpart of ``range(start, end)`` with a short delay per item."""

    def __init__(self, start, end):
        self.start, self.end = start, end

    def __aiter__(self):
        # The object is its own iterator, so it can be consumed only once.
        return self

    async def __anext__(self):
        value = self.start
        if value < self.end:
            self.start = value + 1
            await asyncio.sleep(0.1)  # simulated asynchronous work
            return value
        raise StopAsyncIteration
async def main():
    """Print every value produced by AsyncRange(1, 5)."""
    numbers = AsyncRange(1, 5)
    async for item in numbers:
        print(item)
# Asynchronous generator
async def async_generator():
    """Yield 0 through 4, pausing 0.1 s before each value."""
    i = 0
    while i < 5:
        await asyncio.sleep(0.1)
        yield i
        i += 1
async def main():
    """Print every value yielded by async_generator()."""
    stream = async_generator()
    async for item in stream:
        print(item)
实际应用示例
示例1:并发网络请求
import aiohttp
import asyncio
import time
async def fetch_url(session, url):
    """Download ``url`` and return the size of the response body in bytes.

    Args:
        session: An open ``aiohttp.ClientSession``.
        url: The address to request.

    Returns:
        The body length in bytes, or 0 if the request failed or timed out.
    """
    # aiohttp expects a ClientTimeout object; bare numbers are legacy usage.
    timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with session.get(url, timeout=timeout) as response:
            # read() returns raw bytes, so len() really is a byte count
            # (len(text) would count decoded characters instead).
            body = await response.read()
            return len(body)
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        print(f"获取 {url} 失败: {e}")
        return 0
async def main():
    """Fetch a fixed list of URLs concurrently and print sizes and elapsed time."""
    urls = [
        "https://www.example.com",
        "https://www.google.com",
        "https://www.github.com",
        "https://www.stackoverflow.com",
        "https://www.python.org",
    ]
    # perf_counter is monotonic, so the elapsed time cannot be skewed by
    # system clock adjustments the way time.time() can.
    start_time = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        # Issue all requests concurrently over one shared session.
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    for url, length in zip(urls, results):
        print(f"{url}: {length} 字节")
    print(f"总耗时: {time.perf_counter() - start_time:.2f} 秒")
# 运行
asyncio.run(main())
示例2:异步数据库操作
import asyncpg
import asyncio
async def create_pool():
    """Create and return the asyncpg connection pool used by the examples."""
    settings = dict(
        host="localhost",
        port=5432,
        user="postgres",
        password="password",
        database="mydb",
        min_size=5,
        max_size=20,
    )
    pool = await asyncpg.create_pool(**settings)
    return pool
async def get_user(pool, user_id):
    """Fetch one user by id; return it as a dict, or None when no row matches."""
    query = "SELECT id, name, email FROM users WHERE id = $1"
    async with pool.acquire() as conn:
        row = await conn.fetchrow(query, user_id)
    if not row:
        return None
    return dict(row)
async def insert_user(pool, name, email):
    """Insert a user row and return the id reported by the database."""
    statement = "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id"
    async with pool.acquire() as conn:
        inserted = await conn.fetchrow(statement, name, email)
    new_id = inserted['id']
    return new_id
async def main():
    """Query five users concurrently, print them, insert one user, close the pool."""
    pool = await create_pool()
    try:
        # Look up several users concurrently.
        user_ids = [1, 2, 3, 4, 5]
        lookups = [get_user(pool, uid) for uid in user_ids]
        users = await asyncio.gather(*lookups)
        for user in users:
            if not user:
                continue
            print(f"用户: {user['name']}, 邮箱: {user['email']}")
        # Insert a new user.
        new_id = await insert_user(pool, "张三", "zhangsan@example.com")
        print(f"新用户ID: {new_id}")
    finally:
        # Close the pool even if a query above failed.
        await pool.close()
# 运行
asyncio.run(main())
示例3:Web服务器
from aiohttp import web
import asyncio
import json
async def handle_request(request):
    """Handle an HTTP request and reply with a small JSON description of it.

    Args:
        request: The incoming aiohttp request; its ``method`` and ``path``
            are echoed back.

    Returns:
        A JSON response built with ``web.json_response``.
    """
    # Simulate a database query.
    await asyncio.sleep(0.1)
    # Inside a coroutine, get_running_loop() is the supported way to reach the
    # loop. loop.time() is the loop's monotonic clock, not wall-clock time.
    loop = asyncio.get_running_loop()
    data = {
        "message": "Hello from async server!",
        "timestamp": loop.time(),
        "method": request.method,
        "path": request.path,
    }
    return web.json_response(data)
async def background_task():
    """Print an incrementing counter every five seconds, forever."""
    ticks = 0
    while True:
        ticks = ticks + 1
        print(f"后台任务运行中... 计数: {ticks}")
        await asyncio.sleep(5)
async def start_background_tasks(app):
    """Startup hook: launch background_task() and keep its Task on the app."""
    runner = asyncio.create_task(background_task())
    app['background_task'] = runner
async def cleanup_background_tasks(app):
    """Cleanup hook: cancel the background task and wait for it to stop.

    Args:
        app: The application mapping holding the Task under
            ``'background_task'``.
    """
    task = app['background_task']
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected: awaiting a cancelled task re-raises CancelledError.
        # Swallow it so shutdown finishes cleanly.
        pass
def create_app():
    """Build the aiohttp application with its routes and lifecycle hooks."""
    app = web.Application()
    for route in ('/', '/api/data'):
        app.router.add_get(route, handle_request)
    # Register the startup and cleanup hooks.
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    return app
if __name__ == '__main__':
app = create_app()
web.run_app(app, host='127.0.0.1', port=8080)
性能优化技巧
1. 限制并发数
async def limited_concurrent():
    """Run 20 one-second jobs with at most 10 in flight, using a Semaphore."""
    gate = asyncio.Semaphore(10)  # at most 10 concurrent holders

    async def limited_task(n):
        await gate.acquire()
        try:
            print(f"任务 {n} 开始")
            await asyncio.sleep(1)
            print(f"任务 {n} 完成")
        finally:
            gate.release()

    await asyncio.gather(*(limited_task(i) for i in range(20)))
2. 使用连接池
# 对于HTTP请求
import aiohttp
async def create_http_session():
    """Return an aiohttp ClientSession with connection limits and a 30 s timeout."""
    connector_options = dict(
        limit=100,          # cap on total connections
        limit_per_host=30,  # cap on connections per host
        ttl_dns_cache=300,  # DNS cache lifetime
    )
    connector = aiohttp.TCPConnector(**connector_options)
    timeout = aiohttp.ClientTimeout(total=30)
    session = aiohttp.ClientSession(connector=connector, timeout=timeout)
    return session
# 对于数据库
import asyncpg
async def create_db_pool():
    """Create an asyncpg pool with size limits, a command timeout and JIT off."""
    # The original note says JIT is turned off to improve performance.
    server_settings = {'jit': 'off'}
    pool = await asyncpg.create_pool(
        min_size=5,
        max_size=20,
        command_timeout=60,
        server_settings=server_settings,
    )
    return pool
3. 任务调度优化
async def priority_task_execution():
    """Run queued jobs in priority order (a lower number runs first).

    All jobs share one PriorityQueue, so the queue itself decides the order.
    Equal priorities fall back to comparing the job names.
    """
    queue = asyncio.PriorityQueue()
    # Enqueue the jobs.
    for entry in (
        (1, "高优先级任务1"),
        (1, "高优先级任务2"),
        (5, "低优先级任务"),
    ):
        await queue.put(entry)

    # Drain the queue; nothing else produces into it, so empty() is a safe
    # stop condition here.
    while not queue.empty():
        priority, task = queue.get_nowait()
        print(f"执行 {task} (优先级: {priority})")
        await asyncio.sleep(0.5)
        queue.task_done()
常见陷阱与解决方案
1. 阻塞代码问题
错误示范:
async def bad_example():
    """Anti-pattern: call a blocking sleep inside a coroutine."""
    # time.sleep() does not yield to the event loop, so every other task is
    # frozen for the whole second. The code after it still runs.
    time.sleep(1)
    print("这行代码会执行,但事件循环已被阻塞了1秒")
正确做法:
async def good_example():
    """Pause with the non-blocking asyncio.sleep, then print a confirmation."""
    pause = asyncio.sleep(1)  # asynchronous counterpart of time.sleep
    await pause
    print("这行代码会正常执行")
2. 忘记await
错误示范:
async def wrong():
    """Anti-pattern: hand back the Task without awaiting it."""
    task = asyncio.create_task(some_coroutine())
    # The await is missing, so the task may not have finished yet
    return task
正确做法:
async def correct():
    """Schedule some_coroutine() as a Task and return its awaited result."""
    pending = asyncio.create_task(some_coroutine())
    # Awaiting the task waits for it to finish and yields its result.
    return await pending
3. 在协程中调用阻塞函数
错误示范:
async def process_file():
    """Anti-pattern: read a file with blocking I/O inside a coroutine."""
    with open('large_file.txt', 'r') as f:  # blocking I/O
        content = f.read()
    return content
正确做法:
import aiofiles
async def process_file():
    """Read and return the contents of large_file.txt using aiofiles."""
    handle = aiofiles.open('large_file.txt', 'r')
    async with handle as f:  # asynchronous I/O
        return await f.read()
调试技巧
1. 使用调试模式
import asyncio
import logging
# Show DEBUG-level log records (asyncio logs through the "asyncio" logger).
# NOTE: this alone does not switch on asyncio debug mode; enable that with
# asyncio.run(main(), debug=True), PYTHONASYNCIODEBUG=1, or `python -X dev`.
logging.basicConfig(level=logging.DEBUG)
async def main():
    """Start a long-running task, report its state, and await it with a timeout."""
    # Create the long-running task.
    task = asyncio.create_task(long_running_task())
    # Inspect the task state.
    print(f"任务完成: {task.done()}")
    print(f"任务取消: {task.cancelled()}")
    try:
        await asyncio.wait_for(task, timeout=2)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the task by the time it raises.
        print("任务超时")
    # Only read the result of a task that finished without being cancelled:
    # result() on a cancelled task raises CancelledError, which is not an
    # Exception subclass and would escape the handler below.
    if task.done() and not task.cancelled():
        try:
            result = task.result()
            print(f"结果: {result}")
        except Exception as e:
            print(f"异常: {e}")


# Ways to turn on asyncio debug mode when launching the script:
#   python -X dev your_script.py
#   PYTHONASYNCIODEBUG=1 python your_script.py
# or pass debug=True to asyncio.run().
2. 任务追踪
async def tracked_task(name):
    """Log the start and end of a one-second job and return ``"<name>_result"``."""
    tag = f"[{name}]"
    print(f"{tag} 开始")
    await asyncio.sleep(1)
    print(f"{tag} 完成")
    return f"{name}_result"
async def main():
    """Start three tracked tasks and report each one as it completes."""
    tasks = [
        asyncio.create_task(tracked_task(f"Task-{i}"))
        for i in range(3)
    ]
    # Poll until nothing is pending; each wait returns after the first
    # completion or after 0.5 s, whichever comes first.
    pending = tasks
    while pending:
        done, pending = await asyncio.wait(
            pending,
            timeout=0.5,
            return_when=asyncio.FIRST_COMPLETED,
        )
        for finished in done:
            try:
                print(f"任务完成: {finished.result()}")
            except Exception as e:
                print(f"任务异常: {e}")
与其他技术的集成
1. 与线程池集成
import asyncio
from concurrent.futures import ThreadPoolExecutor
def blocking_io():
    """Simulate a blocking I/O call that takes one second."""
    from time import sleep

    sleep(1)
    outcome = "阻塞操作完成"
    return outcome
async def main():
    """Run blocking_io() in a thread pool so the event loop stays responsive."""
    # get_running_loop() is the supported way to reach the loop from inside a
    # coroutine; get_event_loop() is deprecated for this use.
    loop = asyncio.get_running_loop()
    # Off-load the blocking call to a worker thread.
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_io)
    print(result)
asyncio.run(main())
2. 与进程池集成
import asyncio
from concurrent.futures import ProcessPoolExecutor
def cpu_intensive(x, y):
    """Stand-in for a CPU-heavy computation: wait one second, return ``x + y``."""
    from time import sleep

    sleep(1)  # simulated computation
    total = x + y
    return total
async def main():
    """Run cpu_intensive(10, 20) in a separate process and print the result."""
    # get_running_loop() is the supported way to reach the loop from inside a
    # coroutine; get_event_loop() is deprecated for this use.
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Run the CPU-bound work in a separate process.
        result = await loop.run_in_executor(pool, cpu_intensive, 10, 20)
    print(f"计算结果: {result}")
# With the "spawn" start method, worker processes re-import this module, so
# the entry point must be guarded to keep workers from re-running it.
if __name__ == "__main__":
    asyncio.run(main())
3. 与FastAPI集成
from fastapi import FastAPI
import asyncio
import asyncpg
from contextlib import asynccontextmanager
# 全局数据库连接池
db_pool = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the global asyncpg pool before the yield and close it after."""
    global db_pool
    # Before the yield: create the connection pool.
    connection_settings = dict(
        host="localhost",
        user="postgres",
        password="password",
        database="mydb",
    )
    db_pool = await asyncpg.create_pool(**connection_settings)
    yield
    # After the yield: release the pool if one was created.
    if db_pool:
        await db_pool.close()
app = FastAPI(lifespan=lifespan)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """Return the user row with the given id as a dict, or None if absent."""
    query = "SELECT * FROM users WHERE id = $1"
    async with db_pool.acquire() as conn:
        row = await conn.fetchrow(query, user_id)
    if not row:
        return None
    return dict(row)
@app.get("/concurrent")
async def concurrent_requests():
    """Look up users 1, 2 and 3 concurrently and return them under "results"."""
    lookups = (get_user(uid) for uid in (1, 2, 3))
    # gather returns the results as a list, in the order the lookups were given.
    results = await asyncio.gather(*lookups)
    return {"results": results}
总结
Python的asyncio为现代软件开发提供了强大的异步编程能力。通过本指南,我们从基础概念到高级应用,全面了解了如何使用asyncio构建高性能的并发应用。
关键要点:
- 理解核心概念:协程、事件循环、Future和Task是asyncio的基石
- 正确使用await:调用协程函数只会创建协程对象,必须对其使用await(或交给create_task/gather等调度)才会真正执行
- 避免阻塞代码:使用异步版本的库,或在必要时使用线程/进程池
- 合理控制并发:使用Semaphore等工具防止资源耗尽
- 注意错误处理:妥善处理取消和超时情况
asyncio特别适合I/O密集型应用,如Web服务器、网络爬虫、数据库操作等。对于CPU密集型任务,建议结合多进程使用。随着Python版本的更新,asyncio的功能也在不断增强,如Python 3.11的TaskGroup让并发任务管理更加安全便捷。
掌握asyncio将使你的Python编程技能提升到新的高度,能够构建出更高效、更现代的应用程序。
