Concurrency Patterns

Production Go code uses a handful of concurrency patterns repeatedly. Master these and you can handle most concurrent workloads.

Worker Pool

Process N items concurrently with a fixed number of workers:

func workerPool(jobs []string, numWorkers int) []Result {
    jobCh := make(chan string, len(jobs))
    resultCh := make(chan Result, len(jobs))

    // Start workers
    var wg sync.WaitGroup
    for range numWorkers {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobCh {
                resultCh <- process(job)
            }
        }()
    }

    // Send jobs
    for _, job := range jobs {
        jobCh <- job
    }
    close(jobCh)

    // Wait and collect
    go func() {
        wg.Wait()
        close(resultCh)
    }()

    var results []Result
    for r := range resultCh {
        results = append(results, r)
    }
    return results
}

Tip: Set numWorkers to runtime.NumCPU() for CPU-bound work. For I/O-bound work (HTTP calls, DB queries), use higher values like 20-50.

Fan-Out / Fan-In

Fan-out: multiple goroutines read from the same channel. Fan-in: multiple channels merge into one.

func fanOut(ctx context.Context, input <-chan URL, workers int) <-chan Result {
    resultCh := make(chan Result)
    var wg sync.WaitGroup

    // Fan-out: N workers read from the same input channel
    for range workers {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for url := range input {
                select {
                case <-ctx.Done():
                    return
                case resultCh <- fetch(ctx, url):
                }
            }
        }()
    }

    // Close result channel when all workers finish
    go func() {
        wg.Wait()
        close(resultCh)
    }()

    return resultCh
}

// Fan-in: merge multiple channels into one
func fanIn(channels ...<-chan Result) <-chan Result {
    merged := make(chan Result)
    var wg sync.WaitGroup

    for _, ch := range channels {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for v := range ch {
                merged <- v
            }
        }()
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

Pipeline

Chain processing stages. Each stage reads from an input channel and writes to an output channel:

func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func filter(in <-chan int, pred func(int) bool) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            if pred(n) {
                out <- n
            }
        }
        close(out)
    }()
    return out
}

// Chain them:
nums := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squared := square(nums)
even := filter(squared, func(n int) bool { return n%2 == 0 })

for v := range even {
    fmt.Println(v) // 4, 16, 36, 64, 100
}

Context-Based Cancellation

Propagate cancellation through goroutine trees:

func longRunning(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err() // context.Canceled or context.DeadlineExceeded
        default:
            // do a chunk of work
            if err := doChunk(); err != nil {
                return err
            }
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    g, ctx := errgroup.WithContext(ctx)

    g.Go(func() error { return longRunning(ctx) })
    g.Go(func() error { return longRunning(ctx) })

    if err := g.Wait(); err != nil {
        log.Fatal(err)
    }
}

Tip: errgroup from golang.org/x/sync/errgroup is the standard way to run goroutines that can fail. It cancels the group context on first error.

Rate Limiting

Control how fast work is processed:

func rateLimited(ctx context.Context, items []string, rps int) {
    ticker := time.NewTicker(time.Second / time.Duration(rps))
    defer ticker.Stop()

    for _, item := range items {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            go process(item)
        }
    }
}

For burst-then-throttle, use a buffered channel as a token bucket:

type RateLimiter struct {
    tokens chan struct{}
}

func NewRateLimiter(maxConcurrent int) *RateLimiter {
    rl := &RateLimiter{tokens: make(chan struct{}, maxConcurrent)}
    for range maxConcurrent {
        rl.tokens <- struct{}{}
    }
    return rl
}

func (rl *RateLimiter) Do(f func()) {
    <-rl.tokens       // acquire
    defer func() { rl.tokens <- struct{}{} }() // release
    f()
}

Sync Primitives

When channels are overkill, use sync primitives:

// sync.Once: run initialization exactly once
var (
    instance *DB
    once     sync.Once
)

func GetDB() *DB {
    once.Do(func() {
        instance = connectToDB()
    })
    return instance
}

// sync.RWMutex: multiple readers, single writer
type Cache struct {
    mu   sync.RWMutex
    data map[string]string
}

func (c *Cache) Get(key string) (string, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    v, ok := c.data[key]
    return v, ok
}

func (c *Cache) Set(key, value string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.data[key] = value
}

// sync.Map: concurrent map (use when keys are stable or disjoint per goroutine)
var m sync.Map
m.Store("key", "value")
v, ok := m.Load("key")
m.Range(func(key, value any) bool {
    fmt.Println(key, value)
    return true // continue iteration
})

Gotcha: sync.Map is rarely what you want. It is optimized for two specific cases: (1) keys are mostly written once and read many times, or (2) goroutines access disjoint key sets. For everything else, use sync.RWMutex with a regular map.

Goroutine Leak Detection

Leaked goroutines keep growing and eventually crash your program. Common causes:

  1. Channel send with no receiver - goroutine blocks forever
  2. Missing context cancellation - background work never stops
  3. Forgetting to close channels - range over channel blocks forever

Detect in tests with goleak:

import "go.uber.org/goleak"

func TestMain(m *testing.M) {
    goleak.VerifyTestMain(m)
}

Detect at runtime by monitoring goroutine count:

func monitorGoroutines(ctx context.Context) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            slog.Info("goroutines", "count", runtime.NumGoroutine())
        }
    }
}

Pattern Summary

PatternUse whenKey building blocks
Worker poolFixed concurrency, batch processingBuffered channel + WaitGroup
Fan-out/fan-inParallelize then mergeMultiple goroutines on one channel
PipelineSequential transformationsChannel chains
Rate limiterThrottle external callsTicker or token channel
errgroupGoroutines that can failgolang.org/x/sync/errgroup

Exercises

  1. Bounded downloader: Download 100 URLs concurrently with at most 10 in-flight at a time. Collect results and errors.

  2. Pipeline with cancellation: Build a pipeline that generates numbers, filters primes, and squares them. Support context cancellation at each stage.

  3. Pub/sub: Implement a topic-based pub/sub system where publishers send to a topic channel and multiple subscribers receive copies.

Next: Concurrency | Testing