如何实现一个支持 aiter 的异步可迭代类

11次阅读

直接实现__aiter__不够,因为python要求异步迭代器必须同时实现__aiter__(普通方法,返回自身或新对象)和__anext__(async方法,返回值或抛出StopAsyncIteration),仅__aiter__返回异步生成器或缺失__anext__会导致TypeError。

如何实现一个支持 aiter 的异步可迭代类

为什么直接实现 __aiter__ 不够

很多开发者以为只要在类里加个 __aiter__ 方法返回一个异步生成器或自定义对象就完事了,但 Python 会检查该方法是否返回了真正符合 AsyncIterator 协议的对象——即必须同时实现 __aiter____anext__。如果只实现 __aiter__ 并直接 return self,而没提供 __anext__,运行时会报 TypeError: 'X' Object is not an async iterator

常见错误写法:

class BadAsyncIter:     def __init__(self, items):         self.items = items     async def __aiter__(self):         for item in self.items:             yield item  # ❌ 这是 async generator,但 __aiter__ 不能直接 yield

上面代码看似合理,实则语法错误:async def __aiter__ 中不允许 yield(除非用 yield + await 配合 async for 内部机制,但那不是标准用法)。

正确实现:分离 __aiter____anext__

标准做法是让 __aiter__ 返回一个独立对象(通常是自身),该对象实现了 __anext__ 并维护迭代状态。关键点:

  • __aiter__ 必须是普通方法(非 async),返回 self 或另一个支持 __anext__ 的对象
  • __anext__ 必须是 async 方法,返回下一个值;迭代结束时抛出 StopAsyncIteration
  • 状态(如当前索引)必须保存在实例属性中,因为每次 __anext__ 调用都可能跨 await 边界

示例(内存列表模拟异步数据源):

class AsyncList:     def __init__(self, items):         self.items = items         self._index = 0 
def __aiter__(self):     return self  # ✅ 返回自身,要求自身有 __anext__  async def __anext__(self):     if self._index >= len(self.items):         raise StopAsyncIteration     item = self.items[self._index]     self._index += 1     return item

使用方式:

async for x in AsyncList([1, 2, 3]):     print(x)

如何支持带延迟或 I/O 的真实异步场景

上面例子只是同步逻辑包装成异步接口。若要模拟真实异步行为(如从网络/数据库分页拉取),__anext__ 内应包含 await 调用:

  • 每次 __annext__ 可以 await asyncio.sleep(0.1) 模拟延迟
  • 也可以 await aiohttp.Clientsession.get(...) 获取下一页数据
  • 注意:不能在 __aiter__ 中做耗时 await,它必须快速返回迭代器对象
  • 若需预加载或缓存,建议在 __aiter__ 中初始化缓冲区(如 self._buffer = []),再由 __anext__ 动态填充

示例(带延迟的计数器):

import asyncio 

class DelayedCounter: def init(self, start=0, stop=3): self.start = start self.stop = stop self._current = start

def __aiter__(self):     return self  async def __anext__(self):     if self._current >= self.stop:         raise StopAsyncIteration     await asyncio.sleep(0.01)  # ✅ 真实异步等待     value = self._current     self._current += 1     return value

兼容性与调试陷阱

Python 3.7+ 支持 async for,但部分旧版类型检查器(如 mypy ≤ 0.930)可能不识别自定义 __aiter__,需加类型提示:

  • 显式继承 typing.AsyncIterator[T](仅用于类型提示,不参与运行)
  • mypy 要求 __anext__ 注解返回 Coroutine[None, None, T]
  • 若用 asyncio.Queueaiostream.stream 等第三方库替代手写,要注意它们内部已封装好协议,但失去对迭代逻辑的细粒度控制
  • 测试时别忘了:async for 无法在普通函数中直接运行,必须包在 async def 里并用 asyncio.run()

最容易被忽略的是状态重用问题:同一个 AsyncList 实例被多次 async for 时,_index 不会自动重置。如需可重入,应在 __aiter__ 中返回新对象,而不是 return self

text=ZqhQzanResources