Python 如何限制 asyncio 任务并发数量(类似 semaphore)

11次阅读

asyncio.Semaphore可限制并发协程数,推荐用async with自动管理acquire/release;避免手动调用导致异常泄漏;不适用于CPU密集型任务,应配合线程/进程池。

Python 如何限制 asyncio 任务并发数量(类似 semaphore)

asyncio.Semaphore 就可以轻松限制并发任务数,它本质就是一个异步信号量,控制同时运行的协程数量。

创建并使用 Semaphore 控制并发

初始化时传入最大并发数,每个任务在执行前 acquire(),结束后 release()。推荐用 async with 自动管理:

  • ✅ 推荐写法(自动释放):
import asyncio 

sem = asyncio.Semaphore(3) # 最多 3 个任务同时运行

async def fetch_url(url): async with sem: # 自动 acquire + release print(f"开始请求 {url}") await asyncio.sleep(1) # 模拟网络耗时 print(f"完成请求 {url}")

启动 10 个任务

tasks = [fetch_url(f"https://www.php.cn/link/a09322db0d8eef5eb501a32c65bdecad}") for i in range(10)] await asyncio.gather(*tasks)

避免常见陷阱

手动调用 acquire()/release() 容易出错,尤其遇到异常时可能漏掉 release()

立即学习Python免费学习笔记(深入)”;

  • ❌ 不安全(异常时未释放):
await sem.acquire() try:     await do_something() finally:     sem.release()  # 忘写或写错位置就卡死
  • ✅ 始终用 async with 更可靠

按需动态调整并发上限

Semaphore 的计数器是内部的,不能直接修改最大值,但你可以重新创建一个新实例并替换旧的(注意正在运行的任务仍受原 semaphore 约束):

  • 需要动态限流时,建议封装成类,配合条件变量或配置重载逻辑
  • 更简单的方式:启动前根据环境变量或配置设好初始值,如 asyncio.Semaphore(int(os.getenv("MAX_CONCURRENCY", "5")))

与线程/进程池的对比理解

asyncio.Semaphore 不阻塞事件循环,只暂停协程调度;它不管理线程或子进程,和 concurrent.futures.ThreadPoolExecutor 是不同层面的工具

  • 适合 I/O 密集型协程限流(如 http 请求、数据库查询)
  • 不适合 CPU 密集型任务——应改用 loop.run_in_executor 配合线程/进程池

text=ZqhQzanResources