找回密码
 立即注册
首页 业界区 业界 Python 异步编程

Python 异步编程

褥师此 2025-9-25 21:07:55
Python 异步编程是一种基于非阻塞 IO 模型的并发编程范式,核心目标是在处理 IO 密集型任务(如网络请求、文件读写、数据库交互)时,通过高效的任务调度减少等待时间,最大化 CPU 利用率。
异步编程通过事件循环实现任务调度:当一个任务因 IO 操作需要等待时,事件循环会暂停该任务,切换到其他就绪任务;当 IO 操作完成(如响应到达),事件循环再恢复原任务的执行。
核心优势

  • 单线程内实现并发,避免多线程的上下文切换开销。
  • 针对 IO 密集型任务,性能提升显著(通常是同步方式的数倍到数十倍)。
1、核心组件

1.1 事件循环

事件循环是异步编程的 “心脏”,负责任务调度、IO 事件监听、状态管理
  1. # 伪代码
  2. 任务列表 = [ 任务1,任务2,任务3,...]
  3. while True:
  4.     可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行'和'已完成'的任务返回
  5.    
  6.         for 就绪任务 in 已准备就绪的任务列表:
  7.                 执行已就绪的任务
  8.       
  9.         for 已完成的任务 in 已完成的任务列表:
  10.                 在任务列表中移除 已完成的任务
  11.       
  12.         如果 任务列表 中的任务都已完成,则终止循环
复制代码
关键细节

  • 事件循环在单线程中运行,所有任务都在这个线程内切换执行。
  • 仅当任务遇到 await(表示需要等待 IO)时才会切换,纯计算任务不会触发切换。
1.2 协程

协程是异步任务的基本单元,是一种用户态的上下文切换技术,其实就是通过一个线程实现代码块相互切换执行,本质是可暂停 / 恢复的函数,通过 async def 定义。与普通函数的区别在于:

  • 调用协程不会立即执行,而是返回一个协程对象
  • 必须通过事件循环调度(如 await、create_task)才能执行。
  1. # 协程的定义与状态
  2. import asyncio
  3. async def my_coroutine():
  4.     print("协程开始")
  5.     await asyncio.sleep(1)  # 暂停点:释放CPU,允许切换
  6.     print("协程结束")
  7.     return "结果"
  8. # 协程对象(未执行)
  9. coro = my_coroutine()
  10. print(type(coro))  # <class 'coroutine'>
  11. # 必须通过事件循环执行
  12. async def main():
  13.     result = await coro  # 调度执行,等待结果
  14.     print(result)  # 输出:结果
  15. asyncio.run(main())
复制代码
协程的生命周期

  • 创建:coro = my_coroutine() → 未执行状态。
  • 运行:await coro 或 create_task(coro) → 进入事件循环。
  • 暂停:执行到 await 语句 → 等待 IO 时挂起。
  • 恢复:IO 完成 → 从暂停点继续执行。
  • 完成:执行到函数末尾 → 返回结果或抛出异常。
1.3 任务

任务是协程的包装器,由事件循环直接调度,用于实现并发。任务会将协程注册到事件循环,并跟踪其状态(运行中 / 已完成 / 已取消)。
  1. async def task_func(name, delay):
  2.     print(f"任务 name={name}, delay={delay} === 111")
  3.     await asyncio.sleep(delay)
  4.     print(f"任务 name={name}, delay={delay} === 222")
  5.     return f"任务 {name} 完成"
  6. async def main():
  7.     # 创建任务(立即加入事件循环,开始调度)
  8.     task1 = asyncio.create_task(task_func("A", 1))
  9.     task2 = asyncio.create_task(task_func("B", 2))
  10.    
  11.     print("任务状态:", task1.done())  # False(未完成)
  12.    
  13.     # 等待任务完成并获取结果
  14.     result1 = await task1
  15.     result2 = await task2
  16.    
  17.     print("结果:", result1, result2)  # 任务 A 完成 任务 B 完成
  18.     print("任务状态:", task1.done())  # True(已完成)
  19. asyncio.run(main())
复制代码
任务的核心方法

  • task.done():判断任务是否完成。
  • task.result():获取任务返回值(任务未完成时调用会报错)。
  • task.cancel():取消任务(触发 CancelledError)。
  • task.add_done_callback(func):注册回调函数(任务完成后执行)。
更常用写法:
  1. async def task_func(name, delay):
  2.     print(f"任务 name={name}, delay={delay} === 111")
  3.     await asyncio.sleep(delay)
  4.     print(f"任务 name={name}, delay={delay} === 222")
  5.     return f"任务 {name} 完成"
  6. async def main():
  7.     # 创建任务(立即加入事件循环,开始调度)
  8.     task_list = [
  9.         asyncio.create_task(task_func("A", 1), name="task_A"),
  10.         asyncio.create_task(task_func("B", 2), name="task_B"),
  11.     ]
  12.     done, pending = await asyncio.wait(task_list, timeout=None)
  13.     # 等待任务完成并获取结果
  14.     print(done)
  15. asyncio.run(main())
复制代码
1.4 Future 对象

Future 是异步操作结果的容器,表示 “未来可能完成的操作”。任务(Task)是 Future 的子类,因此具备 Future 的所有特性,task对象内部await结果的处理是基于future的:

  • 存储异步操作的状态(PENDING/FINISHED/CANCELLED)。
  • 提供结果设置(set_result())和异常设置(set_exception())方法。
  • 支持通过 await 获取结果,或注册回调函数。
  1. async def main():
  2.     # 创建一个空的Future对象
  3.     future = asyncio.Future()
  4.    
  5.     # 定义一个设置Future结果的协程
  6.     async def set_future_result():
  7.         await asyncio.sleep(1)
  8.         future.set_result("Future 结果")  # 设置结果,标记为完成
  9.    
  10.     # 并发执行:设置结果的协程 + 等待结果的操作
  11.     asyncio.create_task(set_future_result())
  12.     result = await future  # 等待Future完成
  13.     print(result)  # 输出:Future 结果
  14. asyncio.run(main())
复制代码
Task 与 Future 的关系

  • Task 继承自 Future,是 “可执行的 Future”(绑定了协程)。
  • Future 更底层,通常用于手动管理异步操作结果(如包装回调式 API)。
2、基础语法和核心API

2.1 async/await 语法

async/await 是 Python 3.5+ 引入的异步语法糖,用于定义协程和暂停执行:

  • async def:定义协程函数(返回协程对象)。
  • await:暂停协程,等待另一个协程 / Future/Task 完成,只能在协程内部使用。
  1. async def nested():
  2.     return 42
  3. async def main():
  4.     # 直接调用协程不会执行,必须用await
  5.     result = await nested()  # 等待nested完成,获取结果
  6.     print(result)  # 42
  7. asyncio.run(main())
复制代码
注意

  • 普通函数中不能使用 await(会报 SyntaxError)。
  • await 后面必须是 “可等待对象”(协程、Task、Future)
2.2 事件循环的启动与管理

Python 3.7+ 推荐用 asyncio.run() 启动事件循环(自动创建、运行、关闭循环),低版本需手动管理:
  1. # Python 3.7+ 推荐方式
  2. async def main():
  3.     await asyncio.sleep(1)
  4.     print("完成")
  5. asyncio.run(main())  # 自动处理事件循环的生命周期
  6. # 低版本手动管理方式(3.6及以下)
  7. loop = asyncio.get_event_loop()  # 获取事件循环
  8. try:
  9.     loop.run_until_complete(main())  # 运行直到协程完成
  10. finally:
  11.     loop.close()  # 关闭循环
复制代码
2.3 并发任务管理

2.3.1 asyncio.gather()

批量并发与结果聚合
gather() 用于同时运行多个可等待对象,按输入顺序返回结果,适合需要统一收集结果的场景。
  1. async def task(i):
  2.     await asyncio.sleep(i)
  3.     return i
  4. async def main():
  5.     # 并发执行3个任务
  6.     results = await asyncio.gather(
  7.         task(1),
  8.         task(2),
  9.         task(0.5)
  10.     )
  11.     print(results)  # [1, 2, 0.5](按输入顺序,而非完成顺序)
  12. asyncio.run(main())
复制代码
高级参数

  • return_exceptions=True:将异常作为结果返回,不中断其他任务。
    1. async def faulty_task():
    2.     raise ValueError("出错了")
    3. async def main():
    4.     results = await asyncio.gather(
    5.         faulty_task(),
    6.         task(1),
    7.         return_exceptions=True  # 异常会被包装到结果中
    8.     )
    9.     print(results)  # [ValueError('出错了'), 1]
    复制代码
2.3.2 asyncio.wait()

灵活控制任务完成条件
wait() 比 gather() 更灵活,支持按 “第一个完成”“所有完成” 等条件返回,返回值是已完成和未完成的任务集合。
  1. async def main():
  2.     tasks = [task(1), task(2), task(0.5)]
  3.     # 等待所有任务完成(默认)
  4.     done, pending = await asyncio.wait(tasks)
  5.     print("已完成任务数:", len(done))  # 3
  6.     print("未完成任务数:", len(pending))  # 0
  7.     # 等待第一个任务完成
  8.     done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
  9.     print("第一个完成的任务结果:", [t.result() for t in done])  # [0.5]
复制代码
return_when 可选值:

  • FIRST_COMPLETED:第一个任务完成时返回。
  • FIRST_EXCEPTION:第一个任务抛出异常时返回(无异常则等所有完成)。
  • ALL_COMPLETED:所有任务完成时返回(默认)。
3、同步代码的异步化:兼容旧库

实际开发中常需在异步程序中调用同步阻塞库(如 requests、pymysql),直接调用会阻塞事件循环,需通过线程池异步执行。
3.1 核心方法:loop.run_in_executor()

该方法将同步函数提交到线程池执行,返回 Future 对象,可通过 await 获取结果。
  1. import asyncio
  2. import requests  # 同步阻塞库
  3. # 同步函数(阻塞)
  4. def sync_get(url):
  5.     return requests.get(url).status_code
  6. async def async_get(url):
  7.     # 获取事件循环
  8.     loop = asyncio.get_event_loop()
  9.     # 提交到线程池执行(None 表示使用默认线程池)
  10.     future = loop.run_in_executor(
  11.         None,  # 线程池执行器(可选自定义)
  12.         sync_get,  # 同步函数
  13.         url  # 函数参数
  14.     )
  15.     return await future  # 等待线程池结果
  16. async def main():
  17.     urls = ["https://www.baidu.com", "https://www.github.com"]
  18.     # 并发执行同步函数的异步包装
  19.     results = await asyncio.gather(*[async_get(url) for url in urls])
  20.     print("结果:", results)  # [200, 200]
  21. asyncio.run(main())
复制代码
3.2 自定义线程池

默认线程池大小有限(通常为 CPU 核心数 * 5),高并发场景可自定义线程池:
  1. from concurrent.futures import ThreadPoolExecutor
  2. async def main():
  3.     # 自定义线程池(最大10个线程)
  4.     executor = ThreadPoolExecutor(max_workers=10)
  5.     loop = asyncio.get_event_loop()
  6.    
  7.     # 使用自定义线程池
  8.     future = loop.run_in_executor(executor, sync_get, "https://www.baidu.com")
  9.     print(await future)  # 200
复制代码
4、异步 IO 实战

4.1  异步网络请求(aiohttp)

aiohttp 是异步 HTTP 客户端 / 服务器库,支持异步请求、连接池、超时控制等,是替代同步 requests 的最佳选择。
并发爬取网页(带超时与重试)
  1. import asyncio
  2. import aiohttp
  3. from aiohttp import ClientTimeout
  4. async def fetch(session, url, retry=3):
  5.     """带重试机制的异步请求"""
  6.     timeout = ClientTimeout(total=10)  # 超时控制(10秒)
  7.     try:
  8.         async with session.get(url, timeout=timeout) as response:
  9.             return {
  10.                 "url": url,
  11.                 "status": response.status,
  12.                 "length": len(await response.text())
  13.             }
  14.     except Exception as e:
  15.         if retry > 0:
  16.             print(f"请求 {url} 失败,重试 {retry-1} 次: {e}")
  17.             await asyncio.sleep(1)  # 重试前等待1秒
  18.             return await fetch(session, url, retry-1)
  19.         return {"url": url, "error": str(e)}
  20. async def main():
  21.     urls = [
  22.         "https://www.baidu.com",
  23.         "https://www.github.com",
  24.         "https://www.python.org",
  25.         "https://invalid.url"
  26.     ]
  27.    
  28.     # 创建会话(复用连接,提高效率)
  29.     async with aiohttp.ClientSession() as session:
  30.         # 生成任务列表
  31.         tasks = [fetch(session, url) for url in urls]
  32.         # 并发执行
  33.         results = await asyncio.gather(*tasks)
  34.    
  35.     # 输出结果
  36.     for res in results:
  37.         if "error" in res:
  38.             print(f"{res['url']}: {res['error']}")
  39.         else:
  40.             print(f"{res['url']} | 状态: {res['status']} | 长度: {res['length']}")
  41. asyncio.run(main())
复制代码
4.2 异步文件操作(aiofiles)

传统 open() 是同步阻塞的,aiofiles 提供异步文件读写,支持 async with 和 await 语法。
异步读写多文件
  1. import asyncio
  2. import aiofiles
  3. async def write_file(filename, content):
  4.     """异步写入文件"""
  5.     async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
  6.         await f.write(content)  # 异步写入
  7.     print(f"已写入: {filename}")
  8. async def read_file(filename):
  9.     """异步读取文件"""
  10.     async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
  11.         content = await f.read()  # 异步读取
  12.     return filename, content
  13. async def main():
  14.     # 并发写入3个文件
  15.     await asyncio.gather(
  16.         write_file("file1.txt", "异步文件1"),
  17.         write_file("file2.txt", "异步文件2"),
  18.         write_file("file3.txt", "异步文件3")
  19.     )
  20.    
  21.     # 并发读取文件
  22.     files = ["file1.txt", "file2.txt", "file3.txt"]
  23.     results = await asyncio.gather(*[read_file(f) for f in files])
  24.    
  25.     # 打印内容
  26.     for name, content in results:
  27.         print(f"{name} 内容: {content}")
  28. asyncio.run(main())
复制代码
4.3 异步数据库操作(aiomysql)

aiomysql 是 MySQL 的异步驱动,支持异步连接、查询、事务,避免同步 pymysql 的阻塞问题。
异步查询 MySQL
  1. import asyncio
  2. import aiomysql
  3. async def query_db():
  4.     # 建立异步连接
  5.     connection = await aiomysql.connect(
  6.         host='localhost',
  7.         port=3306,
  8.         user='root',
  9.         password='password',
  10.         db='test',
  11.         autocommit=True
  12.     )
  13.    
  14.     try:
  15.         # 创建游标
  16.         async with connection.cursor(aiomysql.DictCursor) as cursor:
  17.             # 异步执行查询
  18.             await cursor.execute("SELECT * FROM users LIMIT 3")
  19.             # 异步获取结果
  20.             results = await cursor.fetchall()
  21.             print("查询结果:", results)
  22.     finally:
  23.         # 关闭连接
  24.         connection.close()
  25. asyncio.run(query_db())
复制代码
5、总结

Python 异步编程通过事件循环驱动的任务切换,实现了 IO 密集型任务的高效并发。核心组件包括协程(任务单元)、事件循环(调度中心)、任务(并发单元)和 Future(结果容器)。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册