Go语言并发批量处理数据_Golang数据处理实战

10次阅读

应使用信号量限流而非无限制启goroutine,避免OOM;批量写入需拼接多行sql而非单条执行;优先切片分片而非channel传数据;错误需计数、重试、记录,不可忽略或fatal。

Go语言并发批量处理数据_Golang数据处理实战

并发控制用 semaphore 而不是无限制启 goroutine

直接在循环里写 go process(item) 看似简单,但数据量一大就会触发 runtime: out of memory 或系统级资源耗尽。Go 的 goroutine 虽轻量,但每个仍占 2KB+ 空间,上万并发瞬间吃光内存。

正确做法是用信号量(如 golang.org/x/sync/semaphore)限流:

sem := semaphore.NewWeighted(int64(maxConcurrency)) var wg sync.WaitGroup for _, item := range items {     wg.Add(1)     go func(i Item) {         defer wg.Done()         if err := sem.Acquire(context.Background(), 1); err != nil {             log.Printf("acquire failed: %v", err)             return         }         defer sem.Release(1)         process(i)     }(item) } wg.Wait()
  • maxConcurrency 建议设为 CPU 核心数 × 2~5,具体看 process 是 CPU 密集还是 IO 密集
  • 别漏掉 defer sem.Release(1),否则后续 goroutine 永远拿不到许可
  • sem.Acquire 可能阻塞或超时,生产环境建议传带超时的 context

批量写入数据库要避免逐条 INSERT

go 并发处理数据后直接调 db.Exec("INSERT ...", item.ID, item.Name),本质是把单条 SQL 放到多个连接里跑——不仅没提升吞吐,还放大了连接池压力和事务开销。

真正高效的批量写法是:先分组,再拼 INSERT INTO ... VALUES (...), (...), (...),一次提交多行:

立即学习go语言免费学习笔记(深入)”;

const batchSize = 100 for i := 0; i < len(items); i += batchSize {     batch := items[i:min(i+batchSize, len(items))]     values := make([]interface{}, 0, len(batch)*2)     placeholders := make([]string, 0, len(batch))     for _, b := range batch {         placeholders = append(placeholders, "(?, ?)")         values = append(values, b.ID, b.Name)     }     query := "INSERT INTO users (id, name) VALUES " + strings.Join(placeholders, ", ")     _, err := db.Exec(query, values...)     if err != nil {         log.Printf("batch insert failed: %v", err)     } }
  • mysql / sqlite 支持单语句多值插入;postgresql 需用 UNNESTpgx.Batch
  • 注意 values 参数顺序必须和 placeholders 严格对应
  • 过大的 batchSize(如 > 1000)可能触发 MySQL 的 max_allowed_packet 限制

channel 传数据不如切片传得快,别为了“看起来并发”硬套

常见反模式:itemsCh := make(chan Item, 1000),然后一个 goroutine 往里塞,多个 goroutine 从 channel 读。这引入了额外的同步开销和内存拷贝,实测比直接切片分段慢 15%~30%。

除非需要动态负载均衡(比如任务耗时差异极大),否则优先用预分片 + 启固定 goroutine:

func splitSlice[T any](s []T, n int) [][]T {     var chunks [][]T     for i := 0; i < len(s); i += n {         end := i + n         if end > len(s) {             end = len(s)         }         chunks = append(chunks, s[i:end])     }     return chunks }  chunks := splitSlice(items, len(items)/runtime.NumCPU()) var wg sync.WaitGroup for _, chunk := range chunks {     wg.Add(1)     go func(c []Item) {         defer wg.Done()         for _, item := range c {             process(item)         }     }(chunk) } wg.Wait()
  • channel 适合做事件通知、结果聚合、或跨 goroutine 协作,不是通用数据搬运工
  • 切片分片后传参,零拷贝(只传指针和长度),cache 局部性更好
  • 如果下游要合并结果,用 sync.Map 或预先分配好结果切片更可控

错误处理不能只靠 log.Fatal 或忽略 err

并发场景下,一个 goroutine panic 会终止整个程序;而静默忽略 err 会导致数据丢失却毫无感知。

推荐组合策略:失败计数 + 可恢复错误重试 + 不可恢复错误记录后跳过:

var failed atomic.Int64 var mu sync.RWMutex var errors []error  for _, item := range items {     go func(i Item) {         defer func() {             if r := recover(); r != nil {                 mu.Lock()                 errors = append(errors, fmt.Errorf("panic on %v: %v", i.ID, r))                 mu.Unlock()                 failed.Add(1)             }         }()         if err := processWithRetry(i, 3); err != nil {             mu.Lock()             errors = append(errors, fmt.Errorf("fail on %v: %w", i.ID, err))             mu.Unlock()             failed.Add(1)             return         }     }(item) } // 等待结束后检查 failed.Load() > 0 再决定是否告警
  • 别在 goroutine 里用 log.Fatal,它会杀掉 main goroutine
  • 网络类错误(如 DB timeout)适合指数退避重试;校验失败类错误(如字段为空)应直接跳过
  • 所有错误必须进日志或监控,至少包含 item 关键标识(如 ID、批次号)

实际跑通的关键不在“并发多”,而在“控得住、写得稳、错得明”。很多线上事故,都是漏了信号量释放、批大小越界、或 channel 缓冲区填满后死锁——这些点比语法细节更容易卡住上线节奏。

text=ZqhQzanResources