如何在 Go 中使用 WebSocket 向所有客户端广播消息

3次阅读

如何在 Go 中使用 WebSocket 向所有客户端广播消息

本文详解如何基于 gorilla websocket 构建支持全局广播的服务器,通过连接池(hub)管理客户端,并实现“一发即达全体”的实时通信能力,附完整可运行示例与关键注意事项。

本文详解如何基于 gorilla websocket 构建支持全局广播的服务器,通过连接池(hub)管理客户端,并实现“一发即达全体”的实时通信能力,附完整可运行示例与关键注意事项。

在 Go Websocket 开发中,单点回显(如 conn.WriteMessage() 仅回复发送者)是默认行为;若需实现类似聊天室的“全员通知”——即任一客户端发消息后,所有已连接客户端(包括发送方自身)均实时收到该消息——必须引入中心化连接管理机制:连接池(Connection Pool)+ 广播通道(Broadcast channel

核心思路是:不再让每个 handler 独立处理连接,而是将所有活跃连接注册到一个共享的 hub 结构体中,由 hub 统一接收广播指令并分发至各客户端的发送通道。

以下是一个精简、可直接运行的生产级广播服务示例(基于 gorilla/websocket v1.5+):

package main  import (     "log"     "net/http"     "sync"      "github.com/gorilla/websocket" )  var upgrader = websocket.Upgrader{     CheckOrigin: func(r *http.Request) bool { return true }, // 生产环境请严格校验 Origin }  // connection 表示一个 WebSocket 客户端连接 type connection struct {     ws   *websocket.Conn     send chan []byte // 缓冲发送通道,解耦读写     h    *hub }  func (c *connection) writer() {     defer c.ws.Close()     for message := range c.send {         if err := c.ws.WriteMessage(websocket.TextMessage, message); err != nil {             break         }     } }  // hub 管理所有连接和广播逻辑 type hub struct {     connections map[*connection]bool     broadcast   chan []byte     register    chan *connection     unregister  chan *connection     mu          sync.RWMutex }  func newHub() *hub {     return &hub{         connections: make(map[*connection]bool),         broadcast:   make(chan []byte, 128),         register:    make(chan *connection, 128),         unregister:  make(chan *connection, 128),     } }  func (h *hub) run() {     for {         select {         case c := <-h.register:             h.mu.Lock()             h.connections[c] = true             h.mu.Unlock()         case c := <-h.unregister:             h.mu.Lock()             if _, ok := h.connections[c]; ok {                 delete(h.connections, c)                 close(c.send)             }             h.mu.Unlock()         case message := <-h.broadcast:             h.mu.RLock()             for c := range h.connections {                 select {                 case c.send <- message:                 default: // 发送失败(如客户端断连、send channel 已满),清理连接                     delete(h.connections, c)                     close(c.send)                 }             }             h.mu.RUnlock()         }     } }  var h = newHub()  func serveWs(w http.ResponseWriter, r *http.Request) {     conn, err := upgrader.Upgrade(w, r, nil)     if err != nil {         log.Println("Upgrade error:", err)         return     }      c := &connection{ws: conn, send: make(chan []byte, 256), h: h}     h.register <- c      // 启动写协程(异步推送)     go c.writer()      // 主读循环:接收消息并广播     for {         _, message, err := conn.ReadMessage()         if err != nil {             if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {                 log.Printf("Read error: %v", err)             }             break         }         log.Printf("Received: %s", message)         h.broadcast <- message // 关键:推入广播通道,由 hub 统一分发     }      // 连接关闭时注销     h.unregister <- c     close(c.send) }  func main() {     http.HandleFunc("/ws", serveWs)     log.Println("Server started on :8080")     log.Fatal(http.ListenAndServe(":8080", nil)) }

关键设计说明:

  • hub 是线程安全的中央调度器,broadcast 通道接收待广播的原始字节流;
  • 每个 connection 持有独立 send 通道,writer() 协程负责从该通道取数据并调用 ws.WriteMessage;
  • select + default 的写法确保:若某客户端 send 通道已满或阻塞(如网络中断),立即清理该连接,避免内存泄漏;
  • 使用 sync.RWMutex 保护连接映射读写,RWMutex 在高并发读(广播)场景下性能优于普通 Mutex。

⚠️ 注意事项:

  • 永远不要在 handler 中直接调用 conn.WriteMessage() 广播 —— 这会导致竞态与连接泄漏;
  • send 通道需设合理缓冲(如 256),过小易触发 default 分支误删连接,过大则增加内存压力;
  • 生产环境务必替换 CheckOrigin 实现,防止跨站 WebSocket 劫持;
  • 建议为 connection 增加心跳检测(SetPingHandler/SetPongHandler)与超时控制(SetReadDeadline),提升健壮性。

通过此架构,你获得的不再是一个“回声服务器”,而是一个可横向扩展的实时广播中枢——无论是聊天、协同编辑、实时通知还是 iot 设备状态同步,均可在此基础上快速构建。

text=ZqhQzanResources