基于Golang的Fan-in与Fan-out模式_高并发数据流处理

1次阅读

fan-out 数据漏掉因未等待goroutine完成就关闭channel,需用sync.waitgroup协调;fan-in cpu高因select忙等,应避免无限轮询并合理管理channel关闭。

基于Golang的Fan-in与Fan-out模式_高并发数据流处理

Go 里 Fan-out 用 for range 启多个 goroutine,为什么数据总漏掉?

因为没等所有 goroutine 完成就关闭了汇聚 channel。Fan-out 的本质是“分发”,但分发后必须协调完成信号,否则主 goroutine 可能提前退出。

  • 别直接 close(out) 在启动 goroutine 后立刻调用——此时它们大概率还没读完源数据
  • sync.WaitGroup 计数:每启一个 goroutine 就 wg.Add(1),结束前 wg.Done()
  • wg.Wait() 之后再 close(out),确保所有分支已退出
  • 如果源 channel 是无缓冲的,且接收端不及时消费,for range in 可能被阻塞,导致部分 goroutine 根本没启起来

Fan-in 合并多个 channel,用 select 轮询为啥 CPU 占用高?

select 或没加 default 的轮询会变成忙等;更常见的是 channel 数量多但数据稀疏,select 每次都遍历全部 case 却多数无就绪。

  • 避免手写无限 for { select { ... } } —— 改用 range 配合 reflect.Select(仅当 channel 数动态可变时)
  • 静态数量(比如固定 3 个输入 channel)直接写死 select case,性能最好
  • 如果某 channel 可能永远不发数据,考虑加超时或用 context.WithTimeout 控制整体等待
  • 别把 nil channel 塞进 select,它会永久忽略;要停用某个输入,把它设为 nil 是安全的,但得确认逻辑意图

fanIn 函数返回的 channel 为啥一读就 panic: “send on closed channel”?

典型是多个 goroutine 往同一个输出 channel 写,但没人负责关——或者关早了,还有 goroutine 在往里 send。

  • Fan-in 的输出 channel 必须由**唯一 goroutine** 管理关闭,通常是汇聚逻辑所在 goroutine
  • 每个输入分支 goroutine 应该只读、不关 channel;关的操作留给汇聚者,在所有输入都关闭且读完后执行
  • for { select { case v, ok := 这类模式,避免对已关闭 channel 做 <code>case v :=
  • 如果输入 channel 本身带错误(如 io.ReadCloser 关闭引发的 io.EOF),别忽略,要传给下游或记录,否则看似正常却丢数据

处理 HTTP 流式响应时套 Fan-out/Fan-in,为什么连接卡住不释放?

根本原因是 context 没透传,或 goroutine 泄露导致底层 TCP 连接无法被 GC 回收。

立即学习go语言免费学习笔记(深入)”;

  • 每个 Fan-out goroutine 必须接收 ctx context.Context 参数,并在 select 中加入 case
  • HTTP client 要设 Timeout 或用 context.WithTimeout,否则远端 hang 住,你的 goroutine 就一直挂着
  • 别在 Fan-in 汇聚层做耗时操作(比如 JSON 解析、DB 写入),这会让整个 pipeline 堵在输出端,上游 goroutine 积压
  • runtime.NumGoroutine() 在日志里打点,上线后发现数字持续上涨,基本就是 goroutine 没退出

真正难的不是写对那几行 go func() { ... }(),而是想清楚谁关 channel、谁管超时、谁对错误负责——这些边界一旦模糊,问题就藏在并发毛细血管里,查起来比单协程慢十倍。

text=ZqhQzanResources