# Concurrency Patterns
Production Go code uses a handful of concurrency patterns repeatedly. Master these and you can handle most concurrent workloads.
## Worker Pool
Process N items concurrently with a fixed number of workers:
```go
func workerPool(jobs []string, numWorkers int) []Result {
    jobCh := make(chan string, len(jobs))
    resultCh := make(chan Result, len(jobs))

    // Start workers
    var wg sync.WaitGroup
    for range numWorkers {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobCh {
                resultCh <- process(job)
            }
        }()
    }

    // Send jobs
    for _, job := range jobs {
        jobCh <- job
    }
    close(jobCh)

    // Wait and collect
    go func() {
        wg.Wait()
        close(resultCh)
    }()

    var results []Result
    for r := range resultCh {
        results = append(results, r)
    }
    return results
}
```
Tip: Set `numWorkers` to `runtime.NumCPU()` for CPU-bound work. For I/O-bound work (HTTP calls, DB queries), use higher values like 20-50.
## Fan-Out / Fan-In
Fan-out: multiple goroutines read from the same channel. Fan-in: multiple channels merge into one.
```go
func fanOut(ctx context.Context, input <-chan URL, workers int) <-chan Result {
    resultCh := make(chan Result)
    var wg sync.WaitGroup

    // Fan-out: N workers read from the same input channel
    for range workers {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for url := range input {
                select {
                case <-ctx.Done():
                    return
                case resultCh <- fetch(ctx, url):
                }
            }
        }()
    }

    // Close result channel when all workers finish
    go func() {
        wg.Wait()
        close(resultCh)
    }()
    return resultCh
}

// Fan-in: merge multiple channels into one
func fanIn(channels ...<-chan Result) <-chan Result {
    merged := make(chan Result)
    var wg sync.WaitGroup
    for _, ch := range channels {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for v := range ch {
                merged <- v
            }
        }()
    }
    go func() {
        wg.Wait()
        close(merged)
    }()
    return merged
}
```
## Pipeline
Chain processing stages. Each stage reads from an input channel and writes to an output channel:
```go
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func filter(in <-chan int, pred func(int) bool) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            if pred(n) {
                out <- n
            }
        }
        close(out)
    }()
    return out
}

// Chain them:
nums := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squared := square(nums)
even := filter(squared, func(n int) bool { return n%2 == 0 })
for v := range even {
    fmt.Println(v) // 4, 16, 36, 64, 100
}
```
## Context-Based Cancellation
Propagate cancellation through goroutine trees:
```go
func longRunning(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err() // context.Canceled or context.DeadlineExceeded
        default:
            // do a chunk of work
            if err := doChunk(); err != nil {
                return err
            }
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    g, ctx := errgroup.WithContext(ctx)
    g.Go(func() error { return longRunning(ctx) })
    g.Go(func() error { return longRunning(ctx) })
    if err := g.Wait(); err != nil {
        log.Fatal(err)
    }
}
```
Tip: `errgroup` from `golang.org/x/sync/errgroup` is the standard way to run goroutines that can fail. It cancels the group's context on the first error.
## Rate Limiting
Control how fast work is processed:
```go
func rateLimited(ctx context.Context, items []string, rps int) {
    ticker := time.NewTicker(time.Second / time.Duration(rps))
    defer ticker.Stop()
    for _, item := range items {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            go process(item)
        }
    }
}
```
The ticker caps the *rate* of starts but not the number in flight. To cap concurrency instead, use a buffered channel as a token bucket (for true burst-then-throttle rate limiting, see `golang.org/x/time/rate`):
```go
type RateLimiter struct {
    tokens chan struct{}
}

func NewRateLimiter(maxConcurrent int) *RateLimiter {
    rl := &RateLimiter{tokens: make(chan struct{}, maxConcurrent)}
    for range maxConcurrent {
        rl.tokens <- struct{}{}
    }
    return rl
}

func (rl *RateLimiter) Do(f func()) {
    <-rl.tokens                                // acquire
    defer func() { rl.tokens <- struct{}{} }() // release
    f()
}
```
## Sync Primitives
When channels are overkill, use sync primitives:
```go
// sync.Once: run initialization exactly once
var (
    instance *DB
    once     sync.Once
)

func GetDB() *DB {
    once.Do(func() {
        instance = connectToDB()
    })
    return instance
}

// sync.RWMutex: multiple readers, single writer
type Cache struct {
    mu   sync.RWMutex
    data map[string]string
}

func (c *Cache) Get(key string) (string, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    v, ok := c.data[key]
    return v, ok
}

func (c *Cache) Set(key, value string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.data[key] = value
}

// sync.Map: concurrent map (use when keys are stable or disjoint per goroutine)
var m sync.Map
m.Store("key", "value")
v, ok := m.Load("key")
m.Range(func(key, value any) bool {
    fmt.Println(key, value)
    return true // continue iteration
})
```
Gotcha: `sync.Map` is rarely what you want. It is optimized for two specific cases: (1) keys are mostly written once and read many times, or (2) goroutines access disjoint key sets. For everything else, use `sync.RWMutex` with a regular map.
## Goroutine Leak Detection
Leaked goroutines keep growing and eventually crash your program. Common causes:
- Channel send with no receiver: the goroutine blocks forever
- Missing context cancellation: background work never stops
- Forgetting to close channels: `range` over the channel blocks forever
Detect in tests with goleak:
```go
import "go.uber.org/goleak"

func TestMain(m *testing.M) {
    goleak.VerifyTestMain(m)
}
```
Detect at runtime by monitoring goroutine count:
```go
func monitorGoroutines(ctx context.Context) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            slog.Info("goroutines", "count", runtime.NumGoroutine())
        }
    }
}
```
## Pattern Summary
| Pattern | Use when | Key building blocks |
|---|---|---|
| Worker pool | Fixed concurrency, batch processing | Buffered channel + WaitGroup |
| Fan-out/fan-in | Parallelize then merge | Multiple goroutines on one channel |
| Pipeline | Sequential transformations | Channel chains |
| Rate limiter | Throttle external calls | Ticker or token channel |
| errgroup | Goroutines that can fail | golang.org/x/sync/errgroup |
## Exercises
1. Bounded downloader: Download 100 URLs concurrently with at most 10 in flight at a time. Collect results and errors.
2. Pipeline with cancellation: Build a pipeline that generates numbers, filters primes, and squares them. Support context cancellation at each stage.
3. Pub/sub: Implement a topic-based pub/sub system where publishers send to a topic channel and multiple subscribers receive copies.