asyncio.timeout() 在 3.11+ 如何与 gather/wait 配合使用

10次阅读

asyncio.timeout() 是上下文管理器而非超时参数,须用 async with 包裹 gather() 或 wait();超时触发时自动取消未完成任务,但要求协程支持取消,否则无法强制终止。

asyncio.timeout() 在 3.11+ 如何与 gather/wait 配合使用

asyncio.timeout() 不能直接传给 gather() 或 wait()

它不是超时参数,而是上下文管理器或异步生成器。常见错误是写成 asyncio.gather(..., timeout=asyncio.timeout(5)) —— 这会报 TypeError: gather() got an unexpected keyword argument 'timeout'。正确做法是把 gather()wait() 整个包进 asyncio.timeout()作用域里。

用 async with asyncio.timeout() 包裹 gather()

这是最直观、推荐的组合方式。超时触发时,所有正在运行的子协程会被取消(前提是它们能响应取消):

async def fetch_user(user_id):     await asyncio.sleep(2)     return f"user_{user_id}" 

async def main(): try: async with asyncio.timeout(3): results = await asyncio.gather( fetch_user(1), fetch_user(2), fetch_user(3) ) print(results) except asyncio.TimeoutError: print("至少一个任务超时了,整个 gather 被中断")

  • asyncio.timeout() 在超时时刻抛出 asyncio.TimeoutError,并自动调用 cancel() 所有未完成的 Task
  • gather() 包裹的协程必须支持取消(比如不阻塞在纯同步 I/O 或没做 shield()
  • 如果某个子协程内部已处理了 CancelledError 且没重新抛出,gather() 可能不会立即失败,但超时上下文仍会退出

wait() 配合 timeout() 时要注意 done/pending 分离

asyncio.wait() 返回 (done, pending),而 asyncio.timeout() 不改变这个行为。你得手动检查 pending 是否非空:

tasks = [asyncio.create_task(fetch_user(i)) for i in range(3)] try:     async with asyncio.timeout(2.5):         done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)     # 此时 pending 可能还有未完成的任务     for t in pending:         t.cancel() except asyncio.TimeoutError:     # 所有剩余 task 已被 cancel,但需 await 它们以清理状态     await asyncio.gather(*pending, return_exceptions=True)
  • wait()return_when 参数决定何时返回,timeout() 只控制整体等待上限
  • pending 中的 task 不会自动 await,必须显式 cancel() + await gather(..., return_exceptions=True) 避免 “task was destroyed but it is pending!” 警告
  • 若用 return_when=asyncio.ALL_COMPLETEDpending 为空,但超时仍会提前跳出

嵌套 timeout() 或与 shield() 冲突要小心

如果你在子协程里也用了 asyncio.timeout(),外层超时会优先触发并取消内层;而 asyncio.shield() 会让被保护的协程忽略外层取消 —— 这可能导致 gather() 卡住或超时不生效:

  • 避免对单个子任务用 shield() 后再丢进带 timeout()gather()
  • 想给不同任务设不同超时?别嵌套 timeout(),改用 wait_for(task, timeout=...) 单独包装每个任务,再 gather() 这些 wait_for() 调用
  • wait_for()timeout() 不兼容混用:前者返回值,后者是上下文;混用容易漏掉取消传播

真正难处理的是那些不响应取消的协程(比如调用了阻塞的 time.sleep() 或没加 await 的 CPU 密集循环),这种情况下 timeout() 无法强制终止,只能等它自己结束。

text=ZqhQzanResources