Golang worker pool并发模型实现思路

12次阅读

worker pool 的核心价值是可控并发,通过固定数量 worker 从共享通道取任务实现限流与复用;需用 sync.WaitGroup 和 done 通道实现优雅退出,避免任务丢失。

Golang worker pool并发模型实现思路

为什么不用 go f() 直接起 goroutine

直接用 go f() 启动大量任务,容易导致 goroutine 泛滥,内存暴涨甚至 OOM。尤其当任务来自网络请求、文件读取或数据库查询时,数量不可控。worker pool 的核心价值不是“并发”,而是“可控并发”——把任务排队、限流、复用 goroutine。

  • 典型错误:每来一个 http 请求就 go handle(req),QPS 上千时可能创建上万个 goroutine
  • 正确思路:固定 N 个长期运行的 worker,从共享 chan *Task 拿任务,处理完继续取下一个
  • 注意:worker 数量不等于 CPU 核心数;IO 密集型任务(如 HTTP 调用)可设为 10–100,CPU 密集型建议 ≤ runtime.NumCPU()

如何设计任务通道与退出机制

通道类型和关闭时机决定 worker 是否能干净退出。别用 chan Task(值拷贝开销大),优先用 chan *Taskchan func();退出不能靠 panic 或 os.Exit,必须支持 graceful shutdown。

  • 任务通道用 chan Interface{} 灵活但 lose type safety,推荐 chan Job(定义具体接口结构体
  • 不要在主 goroutine 关闭通道后立刻 return —— worker 可能还在读,要等所有 worker 空转退出
  • 标准做法:用 sync.WaitGroup 计数活跃 worker,配合 done chan Struct{} 通知停止取新任务
  • 示例中常见坑:close(jobChan) 后没等 worker 处理完剩余任务就退出,导致任务丢失

workerPool.Run() 的典型实现要点

一个健壮的 Run() 方法要处理启动、任务分发、错误传播、超时控制和回收。它不该阻塞主流程,但要提供同步等待入口(如 Wait())。

func (p *WorkerPool) Run() {     for i := 0; i < p.workers; i++ {         go func() {             defer p.wg.Done()             for {                 select {                 case job, ok := <-p.jobs:                     if !ok {                         return // 通道关闭,退出                     }                     job.Do()                 case <-p.done:                     return // 强制退出                 }             }         }()     } }
  • jobs 通道建议带缓冲(如 make(chan Job, 1000)),避免生产者阻塞
  • 每个 worker 必须用 defer p.wg.Done(),否则 Wait() 永远不返回
  • 不要在 job.Do() 中 recover panic —— 应由上层统一处理,否则错误静默丢失
  • 如果任务需返回结果,别用全局 map 存,改用 job.ResultChan 或回调函数

实际项目中容易被忽略的边界情况

真实场景下,worker pool 很少只跑“理想任务”。超时、重试、优先级、任务取消这些需求一加,逻辑复杂度指数上升。

立即学习go语言免费学习笔记(深入)”;

  • 任务带 context?必须把 ctx 传进 job 结构体,并在 Do() 中 select 判断 ctx.Done()
  • 需要动态扩缩容?别在运行时改 worker 数量,改用两级队列:先入内存队列,再由调度器按负载分发到不同 pool
  • panic 发生在 job.Do() 中?外层要加 defer func(){ if r := recover(); r != nil { p.errCh
  • 日志打点缺失?每个 worker 应有唯一 ID(如 worker-3),否则并发日志无法归因

最麻烦的从来不是启动一 goroutine,而是让它们在出错、中断、扩容、监控全链路里都保持可观察、可终止、不丢任务。写完 Run() 只是开始,压测时看 goroutine 数是否稳定、pprof 查 block 链路、日志查 timeout 分布,这些才决定 pool 是否真可用。

text=ZqhQzanResources