应使用信号量限流而非无限制启goroutine,避免OOM;批量写入需拼接多行sql而非单条执行;优先切片分片而非channel传数据;错误需计数、重试、记录,不可忽略或fatal。

并发控制用 semaphore 而不是无限制启 goroutine
直接在循环里写 go process(item) 看似简单,但数据量一大就会触发 runtime: out of memory 或系统级资源耗尽。Go 的 goroutine 虽轻量,但每个仍占 2KB+ 栈空间,上万并发瞬间吃光内存。
正确做法是用信号量(如 golang.org/x/sync/semaphore)限流:
sem := semaphore.NewWeighted(int64(maxConcurrency)) var wg sync.WaitGroup for _, item := range items { wg.Add(1) go func(i Item) { defer wg.Done() if err := sem.Acquire(context.Background(), 1); err != nil { log.Printf("acquire failed: %v", err) return } defer sem.Release(1) process(i) }(item) } wg.Wait()
-
maxConcurrency建议设为 CPU 核心数 × 2~5,具体看process是 CPU 密集还是 IO 密集 - 别漏掉
defer sem.Release(1),否则后续 goroutine 永远拿不到许可 -
sem.Acquire可能阻塞或超时,生产环境建议传带超时的context
批量写入数据库要避免逐条 INSERT
用 go 并发处理数据后直接调 db.Exec("INSERT ...", item.ID, item.Name),本质是把单条 SQL 放到多个连接里跑——不仅没提升吞吐,还放大了连接池压力和事务开销。
真正高效的批量写法是:先分组,再拼 INSERT INTO ... VALUES (...), (...), (...),一次提交多行:
立即学习“go语言免费学习笔记(深入)”;
const batchSize = 100 for i := 0; i < len(items); i += batchSize { batch := items[i:min(i+batchSize, len(items))] values := make([]interface{}, 0, len(batch)*2) placeholders := make([]string, 0, len(batch)) for _, b := range batch { placeholders = append(placeholders, "(?, ?)") values = append(values, b.ID, b.Name) } query := "INSERT INTO users (id, name) VALUES " + strings.Join(placeholders, ", ") _, err := db.Exec(query, values...) if err != nil { log.Printf("batch insert failed: %v", err) } }
- mysql / sqlite 支持单语句多值插入;postgresql 需用
UNNEST或pgx.Batch - 注意
values参数顺序必须和placeholders严格对应 - 过大的
batchSize(如 > 1000)可能触发 MySQL 的max_allowed_packet限制
channel 传数据不如切片传得快,别为了“看起来并发”硬套
常见反模式:itemsCh := make(chan Item, 1000),然后一个 goroutine 往里塞,多个 goroutine 从 channel 读。这引入了额外的同步开销和内存拷贝,实测比直接切片分段慢 15%~30%。
除非需要动态负载均衡(比如任务耗时差异极大),否则优先用预分片 + 启固定 goroutine:
func splitSlice[T any](s []T, n int) [][]T { var chunks [][]T for i := 0; i < len(s); i += n { end := i + n if end > len(s) { end = len(s) } chunks = append(chunks, s[i:end]) } return chunks } chunks := splitSlice(items, len(items)/runtime.NumCPU()) var wg sync.WaitGroup for _, chunk := range chunks { wg.Add(1) go func(c []Item) { defer wg.Done() for _, item := range c { process(item) } }(chunk) } wg.Wait()
- channel 适合做事件通知、结果聚合、或跨 goroutine 协作,不是通用数据搬运工
- 切片分片后传参,零拷贝(只传指针和长度),cache 局部性更好
- 如果下游要合并结果,用
sync.Map或预先分配好结果切片更可控
错误处理不能只靠 log.Fatal 或忽略 err
并发场景下,一个 goroutine panic 会终止整个程序;而静默忽略 err 会导致数据丢失却毫无感知。
推荐组合策略:失败计数 + 可恢复错误重试 + 不可恢复错误记录后跳过:
var failed atomic.Int64 var mu sync.RWMutex var errors []error for _, item := range items { go func(i Item) { defer func() { if r := recover(); r != nil { mu.Lock() errors = append(errors, fmt.Errorf("panic on %v: %v", i.ID, r)) mu.Unlock() failed.Add(1) } }() if err := processWithRetry(i, 3); err != nil { mu.Lock() errors = append(errors, fmt.Errorf("fail on %v: %w", i.ID, err)) mu.Unlock() failed.Add(1) return } }(item) } // 等待结束后检查 failed.Load() > 0 再决定是否告警
- 别在 goroutine 里用
log.Fatal,它会杀掉 main goroutine - 网络类错误(如 DB timeout)适合指数退避重试;校验失败类错误(如字段为空)应直接跳过
- 所有错误必须进日志或监控,至少包含
item关键标识(如 ID、批次号)
实际跑通的关键不在“并发多”,而在“控得住、写得稳、错得明”。很多线上事故,都是漏了信号量释放、批大小越界、或 channel 缓冲区填满后死锁——这些点比语法细节更容易卡住上线节奏。