asyncio 如何在 gather 中只取消超时任务但保留其他结果

8次阅读

应使用 asyncio.wait + done/pending 分离处理:启动 task 后调用 asyncio.wait(tasks, timeout=5.0),从 done 提取结果,对 pending 显式 cancel;封装为 timeout_gather 函数可复用,注意协程内捕获 CancelledError 做清理。

asyncio 如何在 gather 中只取消超时任务但保留其他结果

asyncio.gather 超时后如何避免取消全部任务

直接用 asyncio.wait_for 包裹整个 asyncio.gather,会导致超时后所有子任务一并被 cancel——这通常不是你想要的。真正需要的是:只中断超时的那几个,其余已完成或正在运行的任务结果照常返回。

用 asyncio.wait + done/pending 分离处理

asyncio.wait 不会主动 cancel 未完成任务,它只帮你区分哪些已结束、哪些还在跑。结合 return_when=asyncio.FIRST_COMPLETED 或手动控制超时,就能实现“只放弃超时项,保留已有结果”。

  • 启动所有协程为 task:tasks = [asyncio.create_task(coro) for coro in coros]
  • asyncio.wait(tasks, timeout=5.0) 等待指定时间
  • done 集合中用 task.result() 提取成功结果;对 pending 中的 task 显式调用 task.cancel()(仅清理,不影响已 done 的)
  • 注意:pending 里的 task 若后续自行完成,其结果不会自动加入返回值——你得决定是否 await 它们(通常不等,否则就失去超时意义)

封装成可复用的 timeout_gather 函数

下面这个轻量封装能替代原生 gather,行为更可控:

async def timeout_gather(*coros, timeout=None):     if timeout is None:         return await asyncio.gather(*coros)     tasks = [asyncio.create_task(c) for c in coros]     done, pending = await asyncio.wait(tasks, timeout=timeout)     results = []     for t in done:         try:             results.append(t.result())         except Exception as e:             results.append(e)  # 保留异常,而非抛出     for t in pending:         t.cancel()     return results

调用方式和 gather 一致:results = await timeout_gather(coro_a(), coro_b(), timeout=3.0)。每个位置的结果对应原始顺序,失败/超时项以异常对象None 占位(可根据需要调整)。

容易忽略的细节:task.cancel() 后的异常捕获与资源清理

被 cancel 的 task 如果在 await 某个 IO 操作(如 asyncio.sleephttp 请求),会抛出 CancelledError。若没在协程内捕获,可能污染日志或导致未关闭连接。

  • 推荐在每个被 gather 的协程里加 try/except CancelledError 块,做必要清理(如关闭 client session
  • asyncio.shield() 不能用于防 cancel——它只防外层取消,对 wait 中的 pending 无作用
  • 如果某个协程本身含长时间 CPU 密集操作,asyncio 无法中断它,timeout 只能作用于 await 点,这点和线程不同

超时逻辑本质是“放弃等待”,不是“强制终止执行”。真要中止不可中断的计算,得靠协程自己轮询 asyncio.current_task().cancelled() 并主动退出。

text=ZqhQzanResources