如何实现一个支持 anext 的异步迭代器

10次阅读

直接 return async def 不行,因为 aiter 必须返回自身(同步返回),且对象需实现 async def anext 并显式 raise StopAsyncIteration。

如何实现一个支持 anext 的异步迭代器

为什么直接 return async def 不行

很多人写异步迭代器时,第一反应是用 async def __iter__ 直接返回一个协程对象,但这样会报错:TypeError: 'async_generator' Object is not an iterator。因为 python 要求迭代器必须实现同步的 __iter____next__,而 anext() 是用来驱动 **异步迭代器协议**(即实现了 __aiter____anext__ 的对象)的——它不关心你 __iter__ 干了什么,只认 __aiter__

必须实现 __aiter____anext__

支持 anext() 的对象得是「异步迭代器」,不是「异步可迭代对象」。区别在于:

  • __aiter__ 必须返回 自身(不能返回协程),且该对象要带 __anext__
  • __anext__ 必须是 async def,返回下一个值或抛出 StopAsyncIteration
  • 不能在 __anext__ 里用 return 终止,得显式 raise StopAsyncIteration

示例:

class AsyncCounter:     def __init__(self, stop):         self.stop = stop         self.i = 0 
def __aiter__(self):     return self  # 必须返回自身  async def __anext__(self):     if self.i >= self.stop:         raise StopAsyncIteration     await asyncio.sleep(0.1)  # 模拟异步操作     value = self.i     self.i += 1     return value

之后就能用:anext(AsyncCounter(3)),或 async for 驱动。

anext() 的 timeout 和默认值怎么处理

anext() 本身不支持 timeoutdefault 参数(不像 next())。如果想加默认值或超时,得自己包一层:

  • 加默认值:用 try/except StopAsyncIteration 捕获
  • 加超时:用 asyncio.wait_for(anext(it), timeout=...)
  • 注意:两次调用 anext() 会推进同一个迭代器状态,别重复用同一个 anext(...) 结果

例如安全取第一个值:

try:     first = await anext(AsyncCounter(5)) except StopAsyncIteration:     first = None

常见踩坑点

写完发现 anext()TypeError: object ... is not an async iterator?检查这几处:

  • __aiter__ 返回的是协程(比如写了 return self.__aiter__())→ 应该直接 return self
  • 类里漏了 __anext__,只写了 __aiter__ → 两者必须成对出现
  • 把异步生成器函数(async def + yield)当作了异步迭代器 → 它是 async_generator 类型,有 __aiter__ 但没 __anext__;它本身支持 anext(),但不能被当成“类实例”那样手动控制生命周期
  • __anext__ 里用了 return 而非 raise StopAsyncIteration → 这会导致返回 None,后续 anext() 永远不会停

真正需要精细控制(如重置、暂停、共享状态)时,还是得手写类;单纯流式产出数据,用 async def + yield 更轻量,但它的 __anext__ 是解释器自动生成的,不可覆盖。

text=ZqhQzanResources