如何在Golang中实现发布订阅模式_Golang发布订阅模式实现与应用示例

2次阅读

用 sync.map 实现线程安全订阅者管理需外层用 sync.map 存 topic→*sublist 映射,内层 sublist 用 rwmutex 保护 callback 切片,避免并发读写 panic;回调须自行处理超时或取消,防止 goroutine 泄漏。

如何在Golang中实现发布订阅模式_Golang发布订阅模式实现与应用示例

sync.Map 实现线程安全的订阅者管理

Go 标准库没有内置的发布订阅(Pub/Sub)结构,但核心难点在于:多个 goroutine 同时 SubscribeUnsubscribePublish 时,必须保证订阅者列表的读写安全。直接用 map[String][]func(Interface{}) 会引发 panic:”concurrent map read and map write“。

推荐用 sync.Map 存储 topic → []callback 映射,但注意:sync.Map 不支持原子性地对 value 做切片操作(比如并发 append),所以需额外加锁保护 callback 列表本身:

type PubSub struct {     subs sync.Map // topic string → *subList } <p>type subList struct { mu sync.RWMutex fns []func(interface{}) }</p><p>func (s *subList) add(f func(interface{})) { s.mu.Lock() defer s.mu.Unlock() s.fns = append(s.fns, f) }</p><p>func (s *subList) call(data interface{}) { s.mu.RLock() defer s.mu.RUnlock() for _, f := range s.fns { f(data) } }

常见错误是只锁外层 sync.Map,却忽略对每个 topic 下 callback 切片的并发保护——这会导致漏调用或 panic。

避免 goroutine 泄漏:订阅者回调必须可取消或带超时

如果某个 func(interface{}) 订阅者内部阻塞(如等待 http 请求、数据库查询),而发布方持续调用 Publish,就会积大量等待中的 goroutine,最终 OOM。

立即学习go语言免费学习笔记(深入)”;

实操建议:

  • 回调函数内自行控制上下文(ctx.Done())和超时,不要依赖发布方
  • 不推荐在 Publish 中统一加 timeout —— 这会中断正常回调,且无法区分哪些已执行、哪些被丢弃
  • 若需强制解耦,可用 select { case ch 非阻塞投递,配合缓冲 channel 控制积压量

典型场景:微服务中监听配置变更事件,回调里调用 http.Get 却没设 timeout,一次网络抖动就卡住整个 topic 的后续分发。

chan interface{} 替代闭包回调,更适合跨服务边界

闭包方式(Subscribe("log", func(v interface{}) { ... }))适合进程内轻量通信;但若要对接消息队列(如 NATS、Redis Pub/Sub)、或需要序列化/反序列化(如 JSON over HTTP),闭包不可传递,必须换通道模型。

改造要点:

  • 每个订阅者获取专属 chan interface{},由 PubSub 内部 goroutine 转发事件
  • Subscribe 返回 chan interface{}unsubscribe func(),调用后者关闭通道并清理
  • 发布时用 select + default 避免阻塞发送(接收方消费慢时自动丢弃旧消息)

示例片段:

func (p *PubSub) Subscribe(topic string) (<code>chan interface{}</code>, func()) {     ch := make(chan interface{}, 16)     p.subs.LoadOrStore(topic, &subChan{ch: ch})     return ch, func() {         close(ch)         p.subs.Delete(topic) // 简化处理,实际需更细粒度     } }

这种模式天然适配 range ch 循环,也方便做背压(例如用 time.AfterFunc 清理空闲 channel)。

小心 interface{} 类型擦除带来的运行时 panic

Pub/Sub 通常用 interface{} 作消息载体,但下游回调若强制断言为具体类型(如 v.(MyEvent)),上游传入 string 就直接 panic。

两种务实做法:

  • 约定消息结构体统一实现某个接口(如 Event 接口含 Type() stringTimestamp() time.Time),回调用类型开关而非断言
  • 发布前用 json.Marshal 序列化,订阅端再 json.Unmarshal 到目标结构——虽有开销,但类型安全、跨语言友好

最容易被忽略的是日志和监控场景:你写了 log.Printf("event: %+v", v),结果某次传入 nil 或含 sync.Mutex 字段的结构体,直接 crash。

text=ZqhQzanResources