引言
在现代软件开发中,异步编程已成为处理高并发、I/O密集型任务的关键技术。Python作为一门广泛使用的编程语言,通过asyncio库提供了强大的异步编程支持。本文将详细介绍Python异步编程的核心概念、语法和实践应用,帮助您掌握这一重要技能。
异步编程基础
什么是异步编程?
异步编程是一种编程范式,它允许程序在等待某些操作(如I/O操作)完成时继续执行其他任务,而不是阻塞等待。这在处理网络请求、文件读写等耗时操作时特别有用。
为什么需要异步编程?
- 提高程序性能:避免在I/O操作上浪费CPU时间
- 改善用户体验:在GUI应用中保持界面响应
- 处理高并发:用较少的资源处理大量并发连接
Python中的async/await语法
基本语法
Python 3.5+引入了async和await关键字,使异步编程更加直观:
import asyncio
async def fetch_data():
print("开始获取数据...")
await asyncio.sleep(1) # 模拟I/O操作
print("数据获取完成!")
return {"data": [1, 2, 3]}
async def main():
result = await fetch_data()
print(f"结果: {result}")
# 运行异步程序
asyncio.run(main())
关键概念
- 协程(Coroutine):使用
async def定义的函数 - Awaitable:可以被
await的对象,包括协程、Task和Future - Event Loop:事件循环,负责调度和执行异步任务
实际应用示例
网络请求示例
使用aiohttp进行异步HTTP请求:
import aiohttp
import asyncio
async def fetch_url(session, url):
try:
async with session.get(url) as response:
return await response.text()
except Exception as e:
return f"Error: {str(e)}"
async def main():
urls = [
'https://example.com',
'https://httpbin.org/json',
'https://api.github.com'
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, content in zip(urls, results):
print(f"URL: {url}\n内容长度: {len(content)}\n")
if __name__ == "__main__":
asyncio.run(main())
数据库操作示例
使用asyncpg进行异步PostgreSQL操作:
import asyncio
import asyncpg
async def run_query():
# 连接数据库
conn = await asyncpg.connect(
host='localhost',
database='testdb',
user='postgres',
password='password'
)
try:
# 执行查询
row = await conn.fetchrow(
"SELECT * FROM users WHERE id = $1", 100
)
if row:
print(f"用户: {row['name']}, 邮箱: {row['email']}")
# 批量插入
data = [
('Alice', 'alice@example.com'),
('Bob', 'bob@example.com')
]
await conn.executemany(
"INSERT INTO users(name, email) VALUES($1, $2)", data
)
finally:
await conn.close()
if __name__ == "__main__":
asyncio.run(run_query())
高级技巧与最佳实践
1. 任务管理
async def main():
# 创建多个任务
task1 = asyncio.create_task(fetch_data())
task2 = asyncio.create_task(fetch_data())
# 等待所有任务完成
results = await asyncio.gather(task1, task2)
# 带超时的等待
try:
result = await asyncio.wait_for(fetch_data(), timeout=0.5)
except asyncio.TimeoutError:
print("操作超时!")
# 取消任务
task = asyncio.create_task(fetch_data())
await asyncio.sleep(0.1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("任务被取消")
2. 同步代码集成
import asyncio
from concurrent.futures import ThreadPoolExecutor
def blocking_io():
# 模拟阻塞I/O操作
import time
time.sleep(1)
return "完成阻塞操作"
async def async_wrapper():
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_io)
return result
async def main():
result = await async_wrapper()
print(result)
3. 锁与同步
import asyncio
class SharedResource:
def __init__(self):
self.value = 0
self.lock = asyncio.Lock()
async def increment(self):
async with self.lock:
# 模拟处理时间
await asyncio.sleep(0.01)
self.value += 1
return self.value
async def worker(resource, name):
for i in range(5):
val = await resource.increment()
print(f"Worker {name}: {val}")
async def main():
resource = SharedResource()
await asyncio.gather(
worker(resource, "A"),
worker(resource, "B")
)
if __name__ == "__main__":
asyncio.run(main())
性能优化建议
- 合理设置并发数:使用
asyncio.Semaphore限制并发连接数 - 避免阻塞调用:确保所有I/O操作都是异步的
- 监控事件循环:使用
asyncio的调试模式检测慢操作 - 资源清理:确保正确关闭连接和释放资源
结论
Python的异步编程通过asyncio库提供了强大而灵活的工具来处理并发任务。掌握这些概念和技巧可以显著提高应用程序的性能和响应能力。从简单的协程到复杂的任务管理,异步编程为现代Python开发提供了必要的工具集。
通过本文的示例和解释,您应该能够开始在自己的项目中应用异步编程技术。记住,异步编程最适合I/O密集型任务,对于CPU密集型任务,可能需要结合多进程使用。# Python异步编程完全指南:从入门到精通
1. 异步编程基础概念
1.1 什么是异步编程
异步编程是一种编程范式,它允许程序在等待某些操作(如I/O操作)完成时继续执行其他任务,而不是阻塞等待。这在处理网络请求、文件读写等耗时操作时特别有用。
# 同步编程示例 - 会阻塞程序执行
import time
def sync_task():
print("开始任务")
time.sleep(2) # 模拟耗时操作,程序会在这里阻塞
print("任务完成")
# 异步编程示例 - 不会阻塞
import asyncio
async def async_task():
print("开始任务")
await asyncio.sleep(2) # 模拟耗时操作,程序可以执行其他任务
print("任务完成")
1.2 为什么需要异步编程
- 提高程序性能:避免在I/O操作上浪费CPU时间
- 改善用户体验:在GUI应用中保持界面响应
- 处理高并发:用较少的资源处理大量并发连接
- 资源利用率:减少线程数量,降低上下文切换开销
2. Python异步编程核心语法
2.1 async/await关键字
Python 3.5+引入了async和await关键字,使异步编程更加直观:
import asyncio
# 定义协程函数
async def fetch_data():
print("开始获取数据...")
await asyncio.sleep(1) # 模拟I/O操作
print("数据获取完成!")
return {"data": [1, 2, 3]}
# 定义主协程
async def main():
# await等待协程执行完成
result = await fetch_data()
print(f"结果: {result}")
# 运行异步程序
if __name__ == "__main__":
asyncio.run(main())
2.2 核心概念详解
协程 (Coroutine)
使用async def定义的函数就是协程:
async def my_coroutine():
print("这是一个协程")
return 42
# 调用协程不会立即执行,而是返回一个协程对象
coro = my_coroutine()
print(type(coro)) # <class 'coroutine'>
# 需要在事件循环中运行
result = asyncio.run(coro)
事件循环 (Event Loop)
事件循环是异步编程的核心,负责调度和执行协程:
import asyncio
async def task1():
print("任务1开始")
await asyncio.sleep(1)
print("任务1结束")
async def task2():
print("任务2开始")
await asyncio.sleep(1)
print("任务2结束")
async def main():
# 并发执行多个任务
await asyncio.gather(task1(), task2())
# 获取当前事件循环并运行
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Task对象
Task用于并发执行协程:
import asyncio
async def slow_operation(n):
print(f"操作 {n} 开始")
await asyncio.sleep(2)
print(f"操作 {n} 结束")
return n * 2
async def main():
# 创建任务
task1 = asyncio.create_task(slow_operation(1))
task2 = asyncio.create_task(slow_operation(2))
# 等待任务完成
result1 = await task1
result2 = await task2
print(f"结果: {result1}, {result2}")
asyncio.run(main())
3. 实际应用示例
3.1 网络请求示例
使用aiohttp进行异步HTTP请求:
import aiohttp
import asyncio
import time
async def fetch_url(session, url):
"""获取单个URL的内容"""
try:
print(f"开始请求: {url}")
async with session.get(url, timeout=10) as response:
content = await response.text()
print(f"完成请求: {url} (长度: {len(content)})")
return {"url": url, "content": content, "status": response.status}
except Exception as e:
print(f"请求 {url} 失败: {e}")
return {"url": url, "error": str(e)}
async def fetch_multiple_urls(urls):
"""并发获取多个URL"""
async with aiohttp.ClientSession() as session:
# 创建任务列表
tasks = [fetch_url(session, url) for url in urls]
# 并发执行所有任务
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def main():
urls = [
'https://httpbin.org/json',
'https://httpbin.org/html',
'https://httpbin.org/xml',
'https://httpbin.org/robots.txt',
'https://httpbin.org/uuid'
]
print("开始并发请求...")
start_time = time.time()
results = await fetch_multiple_urls(urls)
end_time = time.time()
print(f"\n总共耗时: {end_time - start_time:.2f}秒")
# 处理结果
for result in results:
if isinstance(result, dict):
if 'error' in result:
print(f"❌ {result['url']}: {result['error']}")
else:
print(f"✅ {result['url']}: 状态 {result['status']}")
if __name__ == "__main__":
asyncio.run(main())
3.2 数据库操作示例
使用asyncpg进行异步PostgreSQL操作:
import asyncio
import asyncpg
from datetime import datetime
# 数据库连接配置
DB_CONFIG = {
'host': 'localhost',
'database': 'testdb',
'user': 'postgres',
'password': 'password'
}
async def create_connection_pool():
"""创建连接池"""
return await asyncpg.create_pool(
min_size=5,
max_size=20,
**DB_CONFIG
)
async def init_database(pool):
"""初始化数据库表"""
async with pool.acquire() as conn:
await conn.execute('''
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100),
created_at TIMESTAMP
)
''')
print("数据库表创建成功")
async def insert_user(pool, name, email):
"""插入用户"""
async with pool.acquire() as conn:
result = await conn.execute(
'INSERT INTO users(name, email, created_at) VALUES($1, $2, $3)',
name, email, datetime.now()
)
return result
async def get_users(pool):
"""获取所有用户"""
async with pool.acquire() as conn:
rows = await conn.fetch('SELECT * FROM users ORDER BY id')
return [dict(row) for row in rows]
async def batch_insert_users(pool, users_data):
"""批量插入用户"""
async with pool.acquire() as conn:
await conn.executemany(
'INSERT INTO users(name, email, created_at) VALUES($1, $2, $3)',
[(name, email, datetime.now()) for name, email in users_data]
)
async def main():
# 创建连接池
pool = await create_connection_pool()
try:
# 初始化数据库
await init_database(pool)
# 插入单个用户
await insert_user(pool, "Alice", "alice@example.com")
# 批量插入用户
users = [
("Bob", "bob@example.com"),
("Charlie", "charlie@example.com"),
("David", "david@example.com"),
("Eve", "eve@example.com")
]
await batch_insert_users(pool, users)
# 查询用户
all_users = await get_users(pool)
print(f"\n数据库中的用户 ({len(all_users)}):")
for user in all_users:
print(f" {user['id']}: {user['name']} ({user['email']})")
finally:
# 关闭连接池
await pool.close()
if __name__ == "__main__":
asyncio.run(main())
3.3 文件系统操作
使用aiofiles进行异步文件操作:
import asyncio
import aiofiles
import json
import os
async def read_file_async(filepath):
"""异步读取文件"""
try:
async with aiofiles.open(filepath, 'r', encoding='utf-8') as f:
content = await f.read()
return {"file": filepath, "content": content, "success": True}
except Exception as e:
return {"file": filepath, "error": str(e), "success": False}
async def write_file_async(filepath, content):
"""异步写入文件"""
try:
# 确保目录存在
os.makedirs(os.path.dirname(filepath), exist_ok=True)
async with aiofiles.open(filepath, 'w', encoding='utf-8') as f:
await f.write(content)
return {"file": filepath, "success": True}
except Exception as e:
return {"file": filepath, "error": str(e), "success": False}
async def process_files_concurrently():
"""并发处理多个文件"""
# 准备数据
files_data = {
"data/file1.txt": "这是文件1的内容\n包含多行文本",
"data/file2.txt": "这是文件2的内容\n另一段文本",
"data/file3.txt": "这是文件3的内容\n更多内容",
"data/config.json": json.dumps({"version": "1.0", "debug": True}, indent=2)
}
# 并发写入文件
write_tasks = [
write_file_async(filepath, content)
for filepath, content in files_data.items()
]
write_results = await asyncio.gather(*write_tasks)
print("写入结果:")
for result in write_results:
status = "✅" if result["success"] else "❌"
print(f" {status} {result['file']}")
# 并发读取文件
read_tasks = [
read_file_async(filepath)
for filepath in files_data.keys()
]
read_results = await asyncio.gather(*read_tasks)
print("\n读取结果:")
for result in read_results:
if result["success"]:
print(f" ✅ {result['file']}: {len(result['content'])} 字符")
else:
print(f" ❌ {result['file']}: {result['error']}")
async def main():
await process_files_concurrently()
if __name__ == "__main__":
asyncio.run(main())
4. 高级技巧与最佳实践
4.1 任务管理与控制
import asyncio
class TaskManager:
"""任务管理器示例"""
def __init__(self, max_concurrent=5):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.tasks = []
async def limited_task(self, task_id):
"""受限制的并发任务"""
async with self.semaphore:
print(f"任务 {task_id} 开始执行")
await asyncio.sleep(1) # 模拟工作
print(f"任务 {task_id} 完成")
return task_id * 2
async def run_all(self, task_count):
"""运行所有任务"""
# 创建任务列表
self.tasks = [
asyncio.create_task(self.limited_task(i))
for i in range(task_count)
]
# 等待所有任务完成
results = await asyncio.gather(*self.tasks)
return results
def cancel_all(self):
"""取消所有任务"""
for task in self.tasks:
if not task.done():
task.cancel()
async def task_with_timeout():
"""带超时的任务"""
try:
# 设置超时时间
result = await asyncio.wait_for(
asyncio.sleep(5), # 模拟耗时5秒的操作
timeout=2 # 但最多等待2秒
)
return result
except asyncio.TimeoutError:
print("任务超时!")
return None
async def task_with_retry(max_retries=3):
"""带重试机制的任务"""
for attempt in range(max_retries):
try:
# 模拟可能失败的操作
if attempt < 2:
raise ConnectionError("模拟连接失败")
print(f"第 {attempt + 1} 次尝试成功")
return "成功"
except Exception as e:
print(f"第 {attempt + 1} 次尝试失败: {e}")
if attempt == max_retries - 1:
raise
# 等待后重试
await asyncio.sleep(1)
async def main():
# 1. 限制并发数
print("=== 限制并发数 ===")
manager = TaskManager(max_concurrent=3)
results = await manager.run_all(10)
print(f"结果: {results}\n")
# 2. 超时控制
print("=== 超时控制 ===")
await task_with_timeout()
print()
# 3. 重试机制
print("=== 重试机制 ===")
try:
result = await task_with_retry()
print(f"最终结果: {result}")
except Exception as e:
print(f"最终失败: {e}")
if __name__ == "__main__":
asyncio.run(main())
4.2 同步代码集成
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
def blocking_io_operation(duration):
"""模拟阻塞的I/O操作"""
print(f"开始阻塞操作,持续 {duration} 秒")
time.sleep(duration)
return f"完成阻塞操作,持续 {duration} 秒"
def cpu_intensive_calculation(n):
"""模拟CPU密集型计算"""
print(f"开始CPU计算,计算 {n} 的平方")
result = sum(i * i for i in range(n))
return f"计算结果: {result}"
async def async_wrapper():
"""将同步代码包装为异步"""
loop = asyncio.get_event_loop()
# 使用线程池执行阻塞操作
with ThreadPoolExecutor() as pool:
# 在单独的线程中运行阻塞I/O
io_result = await loop.run_in_executor(
pool, blocking_io_operation, 2
)
print(io_result)
# 在单独的线程中运行CPU密集型任务
cpu_result = await loop.run_in_executor(
pool, cpu_intensive_calculation, 1000000
)
print(cpu_result)
async def mixed_operations():
"""混合异步和同步操作"""
async def async_task():
print("异步任务开始")
await asyncio.sleep(1)
print("异步任务结束")
return "async_result"
# 同时运行异步任务和同步包装任务
results = await asyncio.gather(
async_task(),
async_wrapper()
)
print(f"最终结果: {results}")
if __name__ == "__main__":
asyncio.run(mixed_operations())
4.3 锁与同步机制
import asyncio
class AsyncCounter:
"""线程安全的异步计数器"""
def __init__(self):
self.value = 0
self.lock = asyncio.Lock()
async def increment(self):
"""原子性递增"""
async with self.lock:
# 模拟处理时间
await asyncio.sleep(0.01)
self.value += 1
return self.value
class SharedResource:
"""共享资源管理"""
def __init__(self):
self.data = {}
self.read_lock = asyncio.Lock()
self.write_lock = asyncio.Lock()
self.readers = 0
async def read(self, key):
"""读取数据(允许多个读者)"""
async with self.read_lock:
self.readers += 1
if self.readers == 1:
# 第一个读者需要获取写锁
await self.write_lock.acquire()
try:
# 读取数据
await asyncio.sleep(0.01) # 模拟读取时间
return self.data.get(key)
finally:
async with self.read_lock:
self.readers -= 1
if self.readers == 0:
# 最后一个读者释放写锁
self.write_lock.release()
async def write(self, key, value):
"""写入数据(独占访问)"""
async with self.write_lock:
await asyncio.sleep(0.02) # 模拟写入时间
self.data[key] = value
async def worker(counter, resource, worker_id):
"""工作协程"""
# 使用计数器
count = await counter.increment()
print(f"Worker {worker_id}: 计数 {count}")
# 读写共享资源
await resource.write(f"key_{worker_id}", f"value_{worker_id}")
value = await resource.read(f"key_{worker_id}")
print(f"Worker {worker_id}: 读取 {value}")
async def main():
counter = AsyncCounter()
resource = SharedResource()
# 并发执行多个工作协程
workers = [worker(counter, resource, i) for i in range(10)]
await asyncio.gather(*workers)
print(f"\n最终计数: {counter.value}")
print(f"最终数据: {resource.data}")
if __name__ == "__main__":
asyncio.run(main())
5. 性能优化与调试
5.1 性能监控
import asyncio
import time
from functools import wraps
def async_timed(func):
"""异步函数执行时间装饰器"""
@wraps(func)
async def wrapper(*args, **kwargs):
start = time.time()
result = await func(*args, **kwargs)
end = time.time()
print(f"{func.__name__} 执行时间: {end - start:.3f}秒")
return result
return wrapper
@async_timed
async def monitored_task(duration):
"""被监控的任务"""
await asyncio.sleep(duration)
return f"完成 {duration}秒任务"
async def performance_test():
"""性能测试"""
# 测试不同并发策略
print("=== 顺序执行 ===")
start = time.time()
await monitored_task(1)
await monitored_task(1)
print(f"总时间: {time.time() - start:.3f}秒\n")
print("=== 并发执行 ===")
start = time.time()
await asyncio.gather(
monitored_task(1),
monitored_task(1)
)
print(f"总时间: {time.time() - start:.3f}秒")
if __name__ == "__main__":
asyncio.run(performance_test())
5.2 调试技巧
import asyncio
import logging
# 启用调试模式
logging.basicConfig(level=logging.DEBUG)
async def debug_example():
"""调试示例"""
# 检测慢操作
loop = asyncio.get_event_loop()
loop.set_debug(True)
# 模拟慢操作
async def slow_task():
logging.info("开始慢任务")
await asyncio.sleep(2)
logging.info("慢任务完成")
# 设置超时检测
try:
await asyncio.wait_for(slow_task(), timeout=1)
except asyncio.TimeoutError:
logging.error("任务超时!")
# 检查未完成的任务
pending = asyncio.all_tasks()
logging.info(f"未完成的任务数量: {len(pending)}")
if __name__ == "__main__":
asyncio.run(debug_example())
6. 实际项目中的应用模式
6.1 生产者-消费者模式
import asyncio
import random
class AsyncQueue:
"""异步队列实现"""
def __init__(self, maxsize=0):
self.maxsize = maxsize
self.queue = []
self.getters = []
self.putters = []
async def put(self, item):
"""放入项目"""
if self.maxsize > 0 and len(self.queue) >= self.maxsize:
# 队列满,等待
waiter = asyncio.Future()
self.putters.append(waiter)
await waiter
self.queue.append(item)
# 唤醒等待的获取者
if self.getters:
self.getters.pop(0).set_result(True)
async def get(self):
"""获取项目"""
if not self.queue:
# 队列空,等待
waiter = asyncio.Future()
self.getters.append(waiter)
await waiter
item = self.queue.pop(0)
# 唤醒等待的放入者
if self.putters:
self.putters.pop(0).set_result(True)
return item
async def producer(queue, producer_id, count):
"""生产者"""
for i in range(count):
item = f"生产者{producer_id}-项目{i}"
await queue.put(item)
print(f"生产者{producer_id}: 放入 {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
async def consumer(queue, consumer_id):
"""消费者"""
while True:
item = await queue.get()
if item is None: # 结束信号
break
print(f"消费者{consumer_id}: 处理 {item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
async def producer_consumer_demo():
"""生产者-消费者演示"""
queue = AsyncQueue(maxsize=5)
# 创建生产者和消费者
producers = [
producer(queue, i, 5)
for i in range(2)
]
consumers = [
consumer(queue, i)
for i in range(3)
]
# 运行所有任务
await asyncio.gather(*producers, *consumers)
# 发送结束信号
for _ in range(len(consumers)):
await queue.put(None)
if __name__ == "__main__":
asyncio.run(producer_consumer_demo())
6.2 批处理与流处理
import asyncio
from collections import defaultdict
class BatchProcessor:
"""批处理器"""
def __init__(self, batch_size=10, timeout=1.0):
self.batch_size = batch_size
self.timeout = timeout
self.buffer = []
self.timer = None
async def add(self, item):
"""添加项目到批处理"""
self.buffer.append(item)
# 启动计时器(如果未启动)
if self.timer is None:
self.timer = asyncio.create_task(self._flush_after_timeout())
# 如果达到批处理大小,立即处理
if len(self.buffer) >= self.batch_size:
await self._flush()
async def _flush_after_timeout(self):
"""超时后刷新批处理"""
try:
await asyncio.sleep(self.timeout)
await self._flush()
except asyncio.CancelledError:
pass
finally:
self.timer = None
async def _flush(self):
"""刷新批处理"""
if not self.buffer:
return
# 取消计时器
if self.timer:
self.timer.cancel()
self.timer = None
# 处理当前批处理
batch = self.buffer[:]
self.buffer = []
# 模拟批处理操作
print(f"处理批处理: {len(batch)} 个项目 - {batch[:3]}...")
await asyncio.sleep(0.1) # 模拟处理时间
return batch
async def batch_demo():
"""批处理演示"""
processor = BatchProcessor(batch_size=5, timeout=0.5)
# 模拟数据流
for i in range(20):
await processor.add(f"item_{i}")
await asyncio.sleep(0.1) # 模拟数据到达间隔
# 确保处理剩余数据
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(batch_demo())
7. 常见问题与解决方案
7.1 死锁预防
import asyncio
async def deadlock_example():
"""死锁示例"""
lock1 = asyncio.Lock()
lock2 = asyncio.Lock()
async def task1():
async with lock1:
await asyncio.sleep(0.1)
async with lock2: # 可能死锁
print("任务1完成")
async def task2():
async with lock2:
await asyncio.sleep(0.1)
async with lock1: # 可能死锁
print("任务2完成")
# 使用asyncio.gather可能导致死锁
# await asyncio.gather(task1(), task2())
# 正确做法:使用锁的超时或固定顺序获取
async def safe_task1():
# 固定顺序获取锁
async with lock1:
await asyncio.sleep(0.1)
async with lock2:
print("安全任务1完成")
async def safe_task2():
# 固定顺序获取锁
async with lock1:
await asyncio.sleep(0.1)
async with lock2:
print("安全任务2完成")
await asyncio.gather(safe_task1(), safe_task2())
if __name__ == "__main__":
asyncio.run(deadlock_example())
7.2 资源清理
import asyncio
class AsyncResource:
"""需要清理的资源"""
def __init__(self, name):
self.name = name
self.closed = False
async def close(self):
"""清理资源"""
if not self.closed:
print(f"清理资源: {self.name}")
await asyncio.sleep(0.1) # 模拟清理时间
self.closed = True
async def __aenter__(self):
"""异步上下文管理器入口"""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口"""
await self.close()
return False
async def proper_cleanup():
"""正确的资源清理演示"""
# 方法1:使用async with
async with AsyncResource("资源A") as resource:
print(f"使用资源: {resource.name}")
# 在这里使用资源
await asyncio.sleep(0.5)
# 自动调用close()
# 方法2:try-finally
resource = None
try:
resource = AsyncResource("资源B")
print(f"使用资源: {resource.name}")
await asyncio.sleep(0.5)
finally:
if resource:
await resource.close()
# 方法3:使用asyncio.run确保清理
async def main():
async with AsyncResource("资源C") as resource:
print(f"使用资源: {resource.name}")
await asyncio.sleep(0.5)
await main()
if __name__ == "__main__":
asyncio.run(proper_cleanup())
8. 总结与最佳实践
8.1 关键要点
- 理解事件循环:它是异步编程的核心
- 正确使用await:只在协程和可等待对象上使用
- 避免阻塞操作:确保所有I/O都是异步的
- 合理控制并发:使用Semaphore限制并发数
- 正确处理异常:使用try-except和try-finally
- 资源清理:确保正确关闭连接和释放资源
8.2 性能优化建议
- 批量处理:减少上下文切换
- 连接池:复用数据库/HTTP连接
- 适当超时:防止无限等待
- 监控与调试:使用调试模式检测问题
- 选择合适的工具:CPU密集型考虑多进程
8.3 适用场景
- Web爬虫:并发请求多个网页
- API服务器:处理高并发请求
- 数据处理:并发读写文件/数据库
- 实时通信:WebSocket应用
- 微服务:并发调用多个服务
通过掌握这些概念和技巧,您可以有效地使用Python的异步编程来构建高性能的应用程序。记住,异步编程最适合I/O密集型任务,对于CPU密集型任务,可能需要结合多进程使用。
