如何用 asyncio.Semaphore 限制并发任务数量

9次阅读

asyncio.Semaphore通过acquire/release机制控制并发数,需在协程外统一创建并共享;推荐用async with自动管理;BoundedSemaphore可防止非法release导致计数错误。

如何用 asyncio.Semaphore 限制并发任务数量

asyncio.Semaphore 限制并发任务数量,核心是让每个协程在执行前先“申请许可”,拿到信号量才运行,完成后释放,从而控制同时运行的任务数。

创建并使用信号量控制并发数

初始化一个 Semaphore,传入最大允许的并发数量(比如 3),然后在协程中用 async with semaphore: 语句自动获取和释放。

  • 信号量初始值即为最大并发数,每次 acquire() 成功就减 1,release() 加 1
  • async with semaphore: 是推荐写法,确保异常时也能释放,避免死锁
  • 所有需要限流的协程都必须共用同一个 Semaphore 实例

典型用法示例

假设要并发请求 10 个 URL,但最多只允许 3 个 http 请求同时发出:

import asyncio import aiohttp 

async def fetch(session, url, semaphore): async with semaphore: # 这里会阻塞直到有空闲许可 async with session.get(url) as response: return await response.text()

async def main(): semaphore = asyncio.Semaphore(3) # 最多 3 个并发 urls = [f"https://www.php.cn/link/5f69e19efaba426d62faeab93c308f5c"] * 10

async with aiohttp.ClientSession() as session:     tasks = [fetch(session, url, semaphore) for url in urls]     results = await asyncio.gather(*tasks) return results

注意共享作用域和生命周期

信号量必须在所有目标协程可见的作用域内创建,且不能在循环内部重复新建:

  • ❌ 错误:在 for 循环里每次 new 一个 Semaphore(3) → 完全失去限流效果
  • ✅ 正确:在 main() 或更高层创建一次,作为参数传入或通过闭包共享
  • 如果协程分布在不同模块或类中,可通过依赖注入、全局变量(不推荐)或上下文传递

与 asyncio.BoundedSemaphore 的区别

普通 Semaphore 允许 release() 次数超过初始值,而 BoundedSemaphore 会检查是否超出上限,防止误调用导致计数错乱:

  • 日常限流用 Semaphore 足够,只要确保每次 acquire() 都配对 async with 或手动 release()
  • 逻辑复杂、可能多次释放时,用 BoundedSemaphore 更安全,它会在非法 release() 时抛出 ValueError

text=ZqhQzanResources