Ubuntu 下 Go 并发编程实践指南
一 环境准备与工具链
二 核心原语速览
三 实战示例 并发任务处理与 Worker Pool
package main
import (
"context"
"fmt"
"runtime"
"sync"
"time"
)
// Task and Result are the input and output records of the worker pool.
type (
	// Task is a single unit of work, identified by ID.
	Task struct {
		ID int
	}

	// Result is the outcome of one Task. TaskID echoes the ID of the task
	// that produced it; Err is nil unless an error was recorded for the task.
	Result struct {
		TaskID int
		Err    error
	}
)
// Process simulates a slow, I/O-bound task: it waits 100ms and then reports
// success, unless ctx is done first.
//
// The returned Result always carries the task's ID. The error is nil when the
// simulated work finished, and ctx.Err() when the context was cancelled or its
// deadline expired before the wait completed.
func (t Task) Process(ctx context.Context) (Result, error) {
	res := Result{TaskID: t.ID}

	// An explicit timer (instead of time.After) lets us release it as soon as
	// we return, so a cancelled task does not leave a pending timer behind.
	timer := time.NewTimer(100 * time.Millisecond) // simulated I/O latency
	defer timer.Stop()

	select {
	case <-timer.C:
		return res, nil
	case <-ctx.Done():
		return res, ctx.Err()
	}
}
// WorkerPool runs each task through Task.Process with at most concurrency
// tasks in flight at any time, and returns one Result per task that was
// started. Results are returned in completion order, not input order.
//
// Each task gets its own context derived from ctx with the given timeout. Any
// error returned by Process (including a timeout or cancellation) is recorded
// in the corresponding Result.Err rather than discarded.
//
// If ctx is cancelled while tasks are still being dispatched, no further tasks
// are started; the function waits for the tasks already running and returns
// their results. A concurrency below 1 is treated as 1. WorkerPool returns nil
// when no task produced a result.
func WorkerPool(ctx context.Context, tasks []Task, concurrency int, timeout time.Duration) []Result {
	if len(tasks) == 0 {
		return nil
	}
	// A zero-capacity semaphore would block the first acquire forever and a
	// negative capacity makes make panic, so fall back to serial execution.
	if concurrency < 1 {
		concurrency = 1
	}

	var wg sync.WaitGroup
	// Buffered for every task so that a worker's send can never block.
	results := make(chan Result, len(tasks))
	sem := make(chan struct{}, concurrency) // counting semaphore

dispatch:
	for _, task := range tasks {
		// Check cancellation first so that a cancelled ctx deterministically
		// wins over a free semaphore slot.
		if ctx.Err() != nil {
			break
		}
		select {
		case <-ctx.Done():
			break dispatch
		case sem <- struct{}{}: // acquire a slot
		}

		wg.Add(1)
		go func(t Task) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot

			taskCtx, cancel := context.WithTimeout(ctx, timeout)
			defer cancel()

			res, err := t.Process(taskCtx)
			if err != nil {
				res.Err = err
			}
			// Unconditional send: racing it against taskCtx.Done() would
			// randomly drop the result of a task that timed out.
			results <- res
		}(task)
	}

	// Close only after every sender has finished; closing earlier could panic
	// a worker with "send on closed channel". The wait is bounded because each
	// task's context expires after timeout or when ctx is cancelled, provided
	// Process honors its context.
	wg.Wait()
	close(results)

	if len(results) == 0 {
		return nil
	}
	out := make([]Result, 0, len(results))
	for r := range results {
		out = append(out, r)
	}
	return out
}
// main builds 20 tasks, runs them through WorkerPool with a concurrency of 5
// and a 500ms per-task timeout under a 3s overall deadline, then prints how
// many results succeeded, how many failed, and the elapsed wall-clock time.
func main() {
	// Explicitly set the number of Ps to the CPU count reported by the runtime.
	// NOTE(review): the Go runtime already picks a default GOMAXPROCS on its
	// own, and an explicit call replaces a value supplied via the GOMAXPROCS
	// environment variable; confirm this override is intended.
	runtime.GOMAXPROCS(runtime.NumCPU())

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Task IDs are 1-based.
	tasks := make([]Task, 20)
	for i := range tasks {
		tasks[i].ID = i + 1
	}

	begin := time.Now()
	results := WorkerPool(ctx, tasks, 5, 500*time.Millisecond)
	elapsed := time.Since(begin)

	succeeded, failed := 0, 0
	for _, r := range results {
		if r.Err == nil {
			succeeded++
			continue
		}
		failed++
	}

	fmt.Printf("完成: %d, 失败: %d, 耗时: %v\n", succeeded, failed, elapsed)
}
四 常见陷阱与排查清单
五 性能与工程化建议