如何在 for 循环中异步发送消息而不阻塞执行

8次阅读

如何在 for 循环中异步发送消息而不阻塞执行

本文介绍如何在保持发送顺序的前提下,将原本同步阻塞的 api 调用(如 send_to_space())改造为非阻塞异步执行,从而显著提升循环吞吐量,适用于 python 3.9+ 环境。

在实际开发中,我们常遇到这样一类场景:需按序向远程服务(如某协作空间 API)逐条提交数据,但每次调用都因网络延迟而耗时数十毫秒,且函数本身不可修改(同步阻塞、含 await 或 requests 等 I/O 操作)。若直接使用普通 for 循环,整个流程将线性阻塞——第 n+1 条消息必须等待第 n 条响应返回后才开始发送,导致整体耗时随列表长度线性增长。

此时,asyncio 是最恰当的选择——它不依赖线程(避免 GIL 限制与线程安全问题),也不需要修改 send_to_space() 的内部实现,仅通过协程调度即可实现“逻辑上顺序发送、物理上无等待重叠”。关键在于:我们不 await 整个任务完成,而是立即 await 一个已启动的 asyncio.create_task(),使其在事件循环中“即刻出发”,同时确保前序任务已调度完毕后再调度下一个。这既维持了发送顺序(无并发竞争),又消除了空等时间。

以下是推荐的重构方案:

import asyncio  async def send_items(items_list):     for item in items_list:         sub_item = item['sub_item']         # 创建并立即调度任务;await 此 task 表示“等待它被成功提交到事件循环”,         # 而非等待其全部完成(实际 await create_task() 仅保证调度,不等待执行结束)         await asyncio.create_task(send_to_space(sub_item))  # 注意:send_to_space 必须是协程函数(即定义为 async def) # 若原函数是同步的(如基于 requests),需先用 asyncio.to_thread 封装(见下方说明)

⚠️ 重要前提与注意事项

  • send_to_space() 必须是原生协程函数(async def),否则 await 会报错。若它本质是同步函数(例如使用 requests.get()),请改用 asyncio.to_thread() 包装:
    async def send_items(items_list):     for item in items_list:         sub_item = item['sub_item']         await asyncio.to_thread(send_to_space, sub_item)  # 安全调用同步函数
  • asyncio.create_task() 在此处的作用是“立即提交任务并返回 Task 对象”,而 await 它则确保该任务已进入事件循环调度队列,从而保障下一条消息不会“超前”发送——这是维持逻辑顺序的核心机制。
  • 不要误用 asyncio.gather() 或 asyncio.create_task() 后不 await:前者会并发执行(破坏顺序),后者不 await 则无法保证调度时机,可能导致未定义行为。
  • 主程序入口必须使用 asyncio.run() 启动事件循环,且不能在同步上下文中直接调用异步函数。

最终,在 python 3.9+ 中,这种模式兼具简洁性、可维护性与性能优势:既规避了线程管理复杂度,又比纯同步快数倍(尤其当 send_to_space() 平均耗时 20–50ms、列表含数百项时),是 I/O 密集型顺序调用的标准解法。

text=ZqhQzanResources