Python 滑动窗口限流的 python 实现

1次阅读

滑动窗口限流不能只靠time.time()算时间差,因为需维护多个滑动时间桶而非单一起始时间;正确做法是用deque存(time_bucket, count)元组,每次请求先剔除过期桶再累加,并依部署模型选threading.lock或asyncio.lock,生产环境应优先用redis实现分布式原子限流。

Python 滑动窗口限流的 python 实现

滑动窗口限流为什么不能只靠 time.time() 算时间差

因为窗口是“滑动”的,不是固定切片;单纯用当前时间减去窗口起始时间,会漏掉跨多个小窗口的请求累积。比如 1 秒内限 10 次,但实际要支持每 100ms 滑动一次——你得存最近 10 个 100ms 的计数,而不是只记“这秒开始时间”。

  • 典型错误:用一个全局 last_reset 时间 + 单一计数器,导致窗口边界僵硬、突增流量被误放行
  • 正确思路:维护一个有序的时间戳队列(或环形缓冲区),每次请求进来时,先剔除超时的旧记录,再累加新请求
  • python 中推荐用 collections.deque(timestamp, count) 元组,maxlen 可设为窗口分片数,避免无限增长

deque 实现滑动窗口的核心逻辑怎么写

关键不在“加”,而在“删旧”——每次请求都得先清理过期桶,再决定是否允许通过。不清理就等于计数永远只增不减。

  • 窗口总长设为 window_size_ms = 1000,分片粒度 step_ms = 100,则最多存 10 个桶
  • 每个桶是 (int(time.time() * 1000) // step_ms, count),用整数时间戳做 key,避免浮点误差
  • 插入前遍历 deque 左端,弹出所有 timestamp 的项
  • 示例片段:
    bucket_id = int(time.time() * 1000) // 100<br>while dq and dq[0][0] < bucket_id - 10:<br>    dq.popleft()<br>if len(dq) == 0 or dq[-1][0] != bucket_id:<br>    dq.append([bucket_id, 0])<br>dq[-1][1] += 1<br>allowed = sum(cnt for _, cnt in dq) <= 10

threading.Lockasyncio.Lock 怎么选

取决于你的服务模型。Web 框架如 flask/fastapi 默认同步,用 threading.Lock 就够;若用 uvicorn --workers 1 --loop uvloop 配合 async 路由,则必须用 asyncio.Lock,否则 await 会卡死。

  • 常见错误:在 async 函数里用 threading.Lock.acquire() —— 这不是协程,会阻塞整个 Event loop
  • 性能影响:锁粒度越细越好,别把整个滑动窗口结构包在一个大锁里;可考虑对每个 bucket_id 做分段锁,但 Python GIL 下收益有限,通常单锁更稳
  • 如果用 Redis 后端,反而不用本地锁,直接靠 INCR + EXPIRE 组合实现原子滑动窗口(例如用 redis-cell 或自建 lua 脚本)

为什么生产环境慎用纯内存滑动窗口

多进程部署时,每个 worker 有独立内存,限流状态不共享——用户连续请求打到不同进程,就等于绕过限制。

立即学习Python免费学习笔记(深入)”;

  • FastAPI + Uvicorn 多 worker 模式下,deque 限流完全失效,除非你上 Redismemcached
  • 单进程 + 线程安全,但扩容能力差;单进程 + async 安全,但扛不住突发连接数
  • 真正落地时,90% 的场景该直接用 redis-py 配合 Lua 脚本:用 ZSET 存时间戳+请求 ID,ZREMRANGEBYSCORE 清旧,ZCARD 计数,天然分布式、原子、无锁

滑动窗口看着简单,难点从来不在算法,而在状态一致性。本地内存只是开发验证用,上线前记得确认部署模型和存储边界。

text=ZqhQzanResources