Laravel队列如何实现任务的速率限制? (Redis与漏斗算法)

12次阅读

不能。laravel的throttle方法面向http请求,不适用于队列任务;需用redis原子操作(如redis::throttle()或自定义lua脚本)实现漏斗/令牌桶限流,并配合release()延迟重试。

Laravel队列如何实现任务的速率限制? (Redis与漏斗算法)

Redis + Laravel 的 throttle 方法能直接限速吗?

不能。Laravel 自带的 throttle 方法(如 RateLimiter::attempt())面向 HTTP 请求,不适用于队列任务调度。队列任务在后台运行,没有请求上下文,直接复用 Web 限流逻辑会失效或误判。

真正可行的方式是:在任务执行前,用 Redis 原子操作模拟漏斗(Leaky Bucket)或令牌桶(Token Bucket),由开发者手动控制「放行」与「延迟重试」。

  • 必须显式调用 Redis::eval()Redis::throttle()(Laravel 9.2+ 提供的封装
  • 限流粒度需自行定义:按任务类型、用户 ID、租户 ID 等作为 $key 前缀
  • 失败时不能抛异常中断队列,而应调用 $this->release($delay) 延迟重试

Redis::throttle() 实现每秒最多 5 次的任务执行

Laravel 9.2+ 内置了基于 Redis Lua 脚本的漏斗限流封装,底层使用固定窗口 + 计数器,行为接近漏斗但非严格连续漏出。它适合大多数队列节流场景,且无需手写 Lua。

public function handle() {     $key = 'job:send_email:'.$this->userId; 
$result = Redis::throttle($key)     ->allow(5)           // 允许每分钟最多 5 次(注意:单位是「每分钟」,不是每秒)     ->every(60)          // 时间窗口为 60 秒     ->then(function () {         // 限流通过,执行业务逻辑         sendEmail($this->email);     }, function () {         // 被限流,延迟 2 秒后重试         $this->release(2);     });

}

⚠️ 注意:every(60) 是窗口长度,allow(5) 是该窗口内最大许可数;它不是“每秒 5 次”,而是“60 秒内最多 5 次”。若真要实现每秒级控制,需改用 every(1)->allow(1),但高并发下易被击穿,建议搭配更长窗口 + 合理配额。

自定义漏斗算法:用 Redis::eval() 实现平滑速率控制

当需要严格模拟漏斗(如“每 200ms 放行 1 个任务”),必须手写 Lua 脚本。核心思路是:记录上一次通过时间,计算当前是否已“漏出”足够令牌。

以下脚本实现「固定间隔漏出」:每 $intervalMs 毫秒允许 1 次,自动累积最多 $capacity 个令牌:

Redis::eval("     local key = KEYS[1]     local interval_ms = tonumber(ARGV[1])     local capacity = tonumber(ARGV[2])     local now = tonumber(ARGV[3]) 
local last_time, tokens = unpack(redis.call('hmget', key, 'last_time', 'tokens')) last_time = tonumber(last_time) or 0 tokens = tonumber(tokens) or capacity  -- 计算应新增令牌数(按时间推移) local elapsed = now - last_time local new_tokens = math.min(capacity, tokens + elapsed / interval_ms)  if new_tokens >= 1 then     redis.call('hset', key, 'last_time', now)     redis.call('hset', key, 'tokens', new_tokens - 1)     return 1 else     redis.call('hset', key, 'last_time', last_time)     redis.call('hset', key, 'tokens', new_tokens)     return 0 end

", 1, 'leaky:send_sms:'.$this->phone, 200, 5, round(microtime(true) * 1000));

返回 1 表示可执行,0 表示需等待。你需据此决定是否 release() 并设置合理延迟(例如再等 200 - (now - last_time) ms)。

为什么不能只靠 delay()数据库字段控制速率?

单纯在任务分发时用 dispatch()->delay(30) 只能控制「首次延后」,无法应对持续高频入队;而依赖数据库时间戳做查询判断(如 where('last_run_at', 'subSeconds(2)))存在竞态条件——多个 worker 同时读到“可执行”,然后一起写入,导致超发。

根本原因在于:限速是分布式状态协调问题,必须依赖原子性存储(Redis)和原子操作(evalthrottle 封装)。任何非原子的“读-判-写”流程,在多 worker 场景下都不可靠。

漏斗逻辑本身不难,难的是把时间精度、令牌累积、错误重试、监控埋点这几件事串成一条不丢、不重、可观测的链路——实际落地时,80% 的坑出在延迟重试的指数退避没做,或者限流 key 没带上业务维度,导致全站共用一个桶。

text=ZqhQzanResources