引言:异步编程的基本概念
异步编程是现代软件开发中的重要概念,它允许程序在等待某些操作完成时继续执行其他任务,从而提高整体效率。在Python中,asyncio库是实现异步编程的核心工具。
为什么需要异步编程?
想象你在餐厅点餐:同步方式是你点完餐后站在柜台前等待食物准备好;而异步方式是你点完餐后可以回到座位上玩手机,等食物准备好服务员会通知你。这就是同步与异步的区别。
在编程中,传统的同步代码会阻塞程序执行,直到当前操作完成。例如:
import time


def download_file(filename, delay=2):
    """Simulate a blocking download and return the filename.

    Args:
        filename: Name of the file to "download".
        delay: Seconds the simulated download takes (default 2).

    Returns:
        The filename, once the simulated download finishes.
    """
    print(f"开始下载 {filename}")
    time.sleep(delay)  # simulate the blocking download
    print(f"{filename} 下载完成")
    return filename


# Guard the demo so importing this module causes no side effects.
if __name__ == "__main__":
    # Synchronous execution: the three downloads run one after another.
    start = time.time()
    download_file("file1.txt")
    download_file("file2.txt")
    download_file("file3.txt")
    print(f"总耗时: {time.time() - start:.2f}秒")
这段代码会顺序执行,总耗时约6秒。而使用异步编程,我们可以同时下载多个文件:
import asyncio
import time


async def download_file_async(filename, delay=2):
    """Simulate a non-blocking download and return the filename.

    Args:
        filename: Name of the file to "download".
        delay: Seconds the simulated download takes (default 2).
    """
    print(f"开始下载 {filename}")
    await asyncio.sleep(delay)  # yields to the event loop while "downloading"
    print(f"{filename} 下载完成")
    return filename


async def main():
    """Download three files concurrently; total time ~2s, not 6s."""
    tasks = [
        download_file_async("file1.txt"),
        download_file_async("file2.txt"),
        download_file_async("file3.txt"),
    ]
    await asyncio.gather(*tasks)


# Guard so importing this module does not start the event loop.
if __name__ == "__main__":
    start = time.time()
    asyncio.run(main())
    print(f"总耗时: {time.time() - start:.2f}秒")
这段异步代码总耗时约2秒,因为三个下载任务是并发执行的。
asyncio核心概念
1. 协程(Coroutine)
协程是asyncio的基础,它是可以暂停和恢复的函数。使用async def定义协程:
async def my_coroutine():
    """Demo coroutine: announce start, pause one second, return a result."""
    print("开始执行协程")
    await asyncio.sleep(1)
    print("协程执行完毕")
    outcome = "结果"
    return outcome
2. 事件循环(Event Loop)
事件循环是asyncio的核心,它负责调度和执行协程。可以把它想象成一个任务调度器:
import asyncio


async def task1():
    """First demo task: print, pause one second, print again."""
    print("任务1开始")
    await asyncio.sleep(1)
    print("任务1完成")


async def task2():
    """Second demo task, identical in shape to task1."""
    print("任务2开始")
    await asyncio.sleep(1)
    print("任务2完成")


async def main():
    """Schedule both tasks on the event loop and wait for them."""
    # create_task submits each coroutine to the event loop immediately,
    # so both run concurrently.
    task1_obj = asyncio.create_task(task1())
    task2_obj = asyncio.create_task(task2())
    # Await both; total time is ~1s, not 2s, because the sleeps overlap.
    await task1_obj
    await task2_obj


# Guard so importing this module does not run the demo.
if __name__ == "__main__":
    asyncio.run(main())
3. Future对象
Future代表一个尚未完成的异步操作的结果。调用异步函数会返回一个coroutine对象,它本身并不是Future;但可以用asyncio.ensure_future()把它包装成Task(Future的子类),交由事件循环调度:
import asyncio


async def operation1():
    """Return a result after one second."""
    await asyncio.sleep(1)
    return "操作1结果"


async def operation2():
    """Return a result after two seconds."""
    await asyncio.sleep(2)
    return "操作2结果"


async def main():
    """Wrap both coroutines as Futures and gather their results."""
    # ensure_future wraps each coroutine in a Task (a Future subclass)
    # and schedules it on the running event loop.
    future1 = asyncio.ensure_future(operation1())
    future2 = asyncio.ensure_future(operation2())
    # gather waits for both and returns results in submission order.
    results = await asyncio.gather(future1, future2)
    print("所有操作完成:", results)


# Guard so importing this module does not run the demo.
if __name__ == "__main__":
    asyncio.run(main())
asyncio常用函数和方法
1. asyncio.run()
运行一个协程,这是程序的主入口点:
import asyncio


async def main():
    """Print two words separated by a one-second pause."""
    print("Hello")
    await asyncio.sleep(1)
    print("World")


# asyncio.run() creates the event loop, runs main() to completion,
# and closes the loop — call it once, at the program entry point.
if __name__ == "__main__":
    asyncio.run(main())
2. asyncio.create_task()
将协程包装成Task并加入事件循环:
import asyncio


async def background_task():
    """Print a heartbeat message every five seconds, forever."""
    while True:
        print("后台任务运行中...")
        await asyncio.sleep(5)


async def main():
    """Run the background task for ten seconds, then cancel it."""
    task = asyncio.create_task(background_task())
    print("主程序继续运行")
    await asyncio.sleep(10)
    # Request cancellation, then await the task so the CancelledError
    # is actually delivered and the cancellation is confirmed.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


# Guard so importing this module does not run the 10-second demo.
if __name__ == "__main__":
    asyncio.run(main())
3. asyncio.gather()
并发运行多个协程:
import asyncio


async def fetch_data(url):
    """Simulate fetching *url* and return a result string."""
    print(f"开始获取 {url}")
    await asyncio.sleep(1)  # stands in for the network round-trip
    print(f"完成获取 {url}")
    return f"来自 {url} 的数据"


async def main():
    """Fetch three URLs concurrently with asyncio.gather."""
    urls = ["http://example.com/1", "http://example.com/2", "http://example.com/3"]
    # gather preserves input order in its results list.
    results = await asyncio.gather(*[fetch_data(url) for url in urls])
    print("所有结果:", results)


# Guard so importing this module does not run the demo.
if __name__ == "__main__":
    asyncio.run(main())
4. asyncio.wait_for()
设置协程执行的超时时间:
import asyncio


async def long_running_task():
    """Finish after five seconds — longer than the timeout below."""
    await asyncio.sleep(5)
    return "任务完成"


async def main():
    """Demonstrate asyncio.wait_for timing out a slow coroutine."""
    try:
        # wait_for cancels the wrapped task and raises TimeoutError
        # if it has not finished within 3 seconds.
        result = await asyncio.wait_for(long_running_task(), timeout=3)
        print(result)
    except asyncio.TimeoutError:
        print("任务超时!")


# Guard so importing this module does not run the demo.
if __name__ == "__main__":
    asyncio.run(main())
5. asyncio.wait()
等待多个任务完成,可以设置返回条件:
import asyncio


async def task_with_result(n):
    """Sleep *n* seconds, then return a completion message."""
    await asyncio.sleep(n)
    return f"任务{n}完成"


async def main():
    """Wait for several tasks and inspect the done/pending sets."""
    # asyncio.wait requires Task objects: passing bare coroutines was
    # deprecated in Python 3.8 and raises TypeError since 3.11.
    tasks = [asyncio.create_task(task_with_result(i)) for i in range(1, 4)]
    # ALL_COMPLETED (the default) returns only when every task is done.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    print("已完成的任务:", len(done))
    print("未完成的任务:", len(pending))
    for task in done:
        print(task.result())


# Guard so importing this module does not run the demo.
if __name__ == "__main__":
    asyncio.run(main())
asyncio中的锁和同步
虽然异步代码是并发的,但有时我们仍需要同步机制:
1. asyncio.Lock()
import asyncio


class SharedResource:
    """A counter whose updates are serialized with an asyncio.Lock."""

    def __init__(self):
        self.lock = asyncio.Lock()  # protects self.value
        self.value = 0

    async def update(self, increment):
        """Atomically add *increment* to the counter.

        Args:
            increment: Amount added to ``self.value``.
        """
        async with self.lock:  # only one coroutine may enter at a time
            print(f"当前值: {self.value}, 准备增加 {increment}")
            # Simulate slow work while holding the lock; without the lock,
            # concurrent read-modify-write updates could interleave here.
            await asyncio.sleep(0.1)
            self.value += increment
            print(f"更新后值: {self.value}")


async def main():
    """Run five concurrent updates against one shared counter."""
    resource = SharedResource()
    tasks = [resource.update(1) for _ in range(5)]
    await asyncio.gather(*tasks)


# Guard so importing this module does not run the demo.
if __name__ == "__main__":
    asyncio.run(main())
2. asyncio.Semaphore()
限制同时访问资源的协程数量:
import asyncio


async def limited_resource(semaphore, name):
    """Hold one semaphore slot for a second, printing on entry and exit."""
    async with semaphore:
        print(f"{name} 获取了资源")
        await asyncio.sleep(1)
        print(f"{name} 释放了资源")


async def main():
    """Run ten coroutines with at most three inside the semaphore at once."""
    semaphore = asyncio.Semaphore(3)
    tasks = [limited_resource(semaphore, f"任务{i}") for i in range(10)]
    await asyncio.gather(*tasks)


# Guard so importing this module does not run the demo.
if __name__ == "__main__":
    asyncio.run(main())
实际应用案例:异步Web爬虫
下面是一个完整的异步Web爬虫示例,展示如何在实际项目中应用asyncio:
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
class AsyncCrawler:
    """Breadth-first web crawler capped at *max_concurrent* simultaneous fetches."""

    def __init__(self, max_concurrent=5):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)  # caps in-flight requests
        self.session = None  # aiohttp.ClientSession, created by crawl()

    async def fetch(self, url):
        """Fetch *url* and return its HTML text, or None on any failure."""
        async with self.semaphore:
            try:
                async with self.session.get(url, timeout=10) as response:
                    if response.status == 200:
                        html = await response.text()
                        return html
                    else:
                        print(f"错误状态码: {response.status} - {url}")
                        return None
            except Exception as e:
                # Best-effort crawl: log and skip URLs that fail.
                print(f"获取 {url} 失败: {e}")
                return None

    async def parse_links(self, html, base_url):
        """Extract links from *html*; root-relative paths are joined to *base_url*."""
        if not html:
            return []
        soup = BeautifulSoup(html, 'html.parser')
        links = []
        for link in soup.find_all('a', href=True):
            href = link['href']
            if href.startswith('http'):
                links.append(href)
            elif href.startswith('/'):
                links.append(base_url + href)
        return links

    async def crawl(self, start_url, max_pages=10):
        """Crawl up to *max_pages* pages starting from *start_url*.

        Returns:
            A list of ``{"url": ..., "title": ...}`` dicts, one per page.
        """
        self.session = aiohttp.ClientSession()
        try:
            visited = set()
            to_visit = [start_url]
            results = []
            while to_visit and len(visited) < max_pages:
                current_tasks = []
                # Take up to max_concurrent unvisited URLs per round.
                for _ in range(min(len(to_visit), self.max_concurrent)):
                    if to_visit:
                        url = to_visit.pop(0)
                        if url not in visited:
                            visited.add(url)
                            current_tasks.append(self.process_page(url, to_visit, results))
                if current_tasks:
                    await asyncio.gather(*current_tasks)
            return results
        finally:
            # Close the session even if a fetch round raises,
            # so the connector is not leaked.
            await self.session.close()

    async def process_page(self, url, to_visit, results):
        """Fetch *url*, record its title in *results*, and queue new links."""
        print(f"正在爬取: {url}")
        html = await self.fetch(url)
        if html:
            # Parse the page content (simplified to extracting the title).
            soup = BeautifulSoup(html, 'html.parser')
            title = soup.title.string if soup.title else "无标题"
            results.append({"url": url, "title": title})
            # Queue each new link once; crawl() skips already-visited URLs.
            # (The original also tested `link not in results`, but results
            # holds dicts while link is a str, so that check never matched.)
            links = await self.parse_links(html, url)
            for link in links:
                if link not in to_visit:
                    to_visit.append(link)
async def main():
    """Crawl a handful of pages and print a summary of what was found."""
    spider = AsyncCrawler(max_concurrent=3)
    started = time.time()
    pages = await spider.crawl("https://example.com", max_pages=5)
    elapsed = time.time() - started
    print(f"\n爬取完成! 耗时: {elapsed:.2f}秒")
    print(f"共爬取 {len(pages)} 个页面:")
    for page in pages:
        print(f" - {page['title']} ({page['url']})")


if __name__ == "__main__":
    asyncio.run(main())
asyncio最佳实践
- 避免阻塞操作:在异步代码中不要使用 time.sleep(),而应该使用 asyncio.sleep()。同样,避免使用同步的文件I/O或网络请求。
- 正确处理异常:
async def safe_operation():
    """Run some_async_operation, returning None instead of raising on failure."""
    try:
        return await some_async_operation()
    except Exception as e:
        print(f"操作失败: {e}")
        return None
- 合理使用任务组:
async def main():
    """Show two equivalent ways to run a group of coroutines together."""
    # Preferred on Python 3.11+: TaskGroup awaits all tasks on exit
    # and cancels the siblings if one fails.
    async with asyncio.TaskGroup() as group:
        first = group.create_task(some_coroutine())
        second = group.create_task(another_coroutine())
    # Alternative: gather; return_exceptions=True collects exceptions
    # into the results list instead of raising them.
    results = await asyncio.gather(
        some_coroutine(), another_coroutine(), return_exceptions=True
    )
- 资源清理:
async def main():
    """Fetch a page, letting async-with close both response and session."""
    async with aiohttp.ClientSession() as session, session.get('http://example.com') as resp:
        data = await resp.text()
- 性能监控:
import time
async def monitored_operation():
    """Run some_operation and report how long it took."""
    t0 = time.perf_counter()
    outcome = await some_operation()
    # perf_counter is monotonic, so the difference is a reliable duration.
    elapsed = time.perf_counter() - t0
    print(f"操作耗时: {elapsed:.3f}秒")
    return outcome
总结
Python的asyncio库为编写高效并发程序提供了强大的工具。通过理解协程、事件循环和各种同步原语,你可以构建出高性能的网络应用、爬虫、数据处理系统等。记住以下几点:
- 使用 async/await 语法定义和调用异步操作
- 事件循环是asyncio的核心,负责任务调度
- 合理使用 asyncio.gather()、create_task() 等工具管理并发
- 注意资源管理和异常处理
- 避免在异步代码中使用阻塞操作
随着Python异步生态的不断发展,越来越多的库开始支持asyncio,掌握异步编程将成为Python开发者的重要技能。
