asyncio.Semaphore 如何与限流装饰器结合使用

11次阅读

限流装饰器不能直接套 asyncio.Semaphore,因为其 acquire() 是协程需 await,而同步装饰器无法等待;正确做法是用异步装饰器封装 async with semaphore: 逻辑,确保复用同一信号量实例并自动释放。

asyncio.Semaphore 如何与限流装饰器结合使用

限流装饰器为什么不能直接套 asyncio.Semaphore

因为 asyncio.Semaphoreacquire() 是协程函数,必须用 await 调用;而普通装饰器在定义时是同步执行的,无法 await 一个协程对象。直接写 @semaphore.acquire() 会报 RuntimeWarning: coroutine 'Semaphore.acquire' was never awaited,甚至导致死锁。

正确做法:用异步装饰器 + async with 包裹

核心是把信号量控制逻辑封装进一个真正的异步装饰器里,并确保每次调用都走 async with semaphore: 流程。示例如下:

import asyncio from functools import wraps 

def rate_limit(limit: int): semaphore = asyncio.Semaphore(limit) def decorator(func): @wraps(func) async def wrapper(*args, *kwargs): async with semaphore: return await func(args, **kwargs) return wrapper return decorator

@rate_limit(3) async def fetch_data(url: str): print(f"GET {url}") await asyncio.sleep(1) # 模拟请求 return f"done: {url}"

这个模式的关键点:

  • semaphore 在装饰器工厂函数中创建一次,复用同一个实例(不是每次调用都新建)
  • async with 确保自动获取/释放,即使 func 抛异常也不会漏掉 release
  • 装饰器返回的是 async def wrapper,能被 await 正确调度

常见踩坑场景与修复

实际用的时候容易掉进这几个坑:

  • 多个装饰器顺序错乱:比如同时用 @retry@rate_limit,要把 @rate_limit 放在最外层,否则重试会绕过限流
  • 信号量作用域错误:在 fastapi 路由里误把 semaphore = asyncio.Semaphore(3) 写在 @app.get 函数内部 —— 每次请求都新建一个,完全失效
  • 跨协程共享失败:在不同模块或类方法里各自初始化 asyncio.Semaphore(3),等于建了多个独立池子,总并发数变成 3×N
  • 忘记 await 装饰后函数:调用 fetch_data("https://...") 却没加 await,结果拿到一个 coroutine 对象而非结果

进阶:按用户/路径维度做差异化限流

如果需要对不同 API 路径、不同用户 ID 或不同目标域名分别限流,就不能只用一个全局 semaphore。推荐用字典缓存 + 键隔离:

from collections import defaultdict import asyncio 

_semaphores = defaultdict(lambda: asyncio.Semaphore(3))

def per_domain_rate_limit(domain: str): semaphore = _semaphores[domain] def decorator(func): @wraps(func) async def wrapper(*args, *kwargs): async with semaphore: return await func(args, **kwargs) return wrapper return decorator

@per_domain_rate_limit("httpbin.org") async def fetch_httpbin(): ...

注意:_semaphores 字典本身不需要加锁 —— asyncio.Semaphore线程/协程安全的,但字典读写在高并发下可能有竞态,生产环境建议用 asyncio.Lock 包一层或改用 weakref.WeakValueDictionary 防内存泄漏。

真正难的不是写对语法,而是想清楚「谁和谁共用一个信号量」—— 同一资源池里的所有协程,必须共享同一个 semaphore 实例,且生命周期要覆盖整个应用运行期。

text=ZqhQzanResources