Go Concurrency Patterns: A Practical Walkthrough

Motivation

After years of conducting technical interviews and mentoring developers, I’ve noticed a peculiar pattern. Experienced Python, Ruby, and PHP developers who’ve learned Go as their second language often stumble on seemingly simple concurrency questions. “How do you broadcast a message to many goroutines?” I ask, and watch as senior developers with years of experience suddenly look uncertain.

Here’s the thing: for developers coming from interpreted languages, Go has become the default choice for building high-throughput services (Rust is brilliant, but it feels like learning to think differently). Major tech companies now use Go as their main backend stack for both business logic and infrastructure code. Yet I keep hearing failure stories at meetups and conferences - teams unable to deliver production-ready concurrent solutions in reasonable time.

Why? Because reliable high-throughput services require concurrency patterns that are completely unfamiliar to developers used to framework-based, single-threaded execution models: in-memory caches with proper synchronization, rate limiters, exponential backoff, circuit breakers, batching, and pipelining. These aren’t just nice-to-haves; they’re essential.

This walkthrough fills those gaps. It’s your action manual during solution design and your helper while learning high-load patterns. If you’re coming from PHP/Python/Ruby, or you’re a mid-level Go developer who gets nervous when concurrency comes up in interviews - this is for you.

Concurrency, Huh?

Let’s start with the basics, but not the boring kind you’ll forget tomorrow.

A goroutine is a unit of work that runs inside your application process and CAN run at the same time on different threads and processor cores. Think of it as a super-lightweight thread that Go manages for you.

// This is all it takes to start concurrent work
go doSomething()

The Go runtime’s scheduler is like a smart manager that decides how many OS threads your application uses and which goroutines run on them. The beautiful part? You don’t have to think about it most of the time.

The key insight: work CAN happen in parallel, but Go decides when it actually does based on available cores and current workload.
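
Under the hood you can inspect, and even cap, that parallelism. Here’s a tiny sketch (none of this is required in normal code; the runtime defaults are usually right):

package main

import (
    "fmt"
    "runtime"
)

func main() {
    // How many OS threads may execute Go code simultaneously.
    fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0)) // 0 queries without changing
    fmt.Println("NumCPU:", runtime.NumCPU())

    // Cap the scheduler at one core: goroutines remain concurrent,
    // but no two of them run in parallel anymore.
    runtime.GOMAXPROCS(1)
}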

How Not to Lose Updates in Values

Thread-Safe Counter with Mutex

Let’s say you have multiple goroutines incrementing a counter. Without protection, you’ll lose updates:

package main

import (
    "sync"
)

// BadCounter loses updates when used concurrently
type BadCounter struct {
    value int
}

func (c *BadCounter) Inc() {
    c.value++ // DANGER: Read-Modify-Write is not atomic!
}

// SafeCounter protects the value with a mutex
type SafeCounter struct {
    mu    sync.Mutex
    value int
}

func (c *SafeCounter) Inc() {
    c.mu.Lock()         // Only one goroutine can pass this line
    defer c.mu.Unlock() // Always unlock, even if we panic
    c.value++
}

func (c *SafeCounter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

Why Mutex? It serializes access - only one goroutine can hold the lock at a time. Others wait in line.

Common Pitfall: Forgetting to unlock causes deadlock. Always use defer right after locking!

Performance Note: While blocked waiting for a mutex, a goroutine is parked off its OS thread, freeing that thread for other work.
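
To watch lost updates happen, here’s a minimal test sketch against the SafeCounter above; rewrite it around BadCounter and go test -race will flag the data race (and the final count will usually fall short of 1000):

package main

import (
    "sync"
    "testing"
)

func TestSafeCounter(t *testing.T) {
    var c SafeCounter // swap in BadCounter to see the race
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.Inc()
        }()
    }
    wg.Wait()

    if got := c.Value(); got != 1000 {
        t.Errorf("expected 1000, got %d", got)
    }
}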

Thread-Safe Counter with Atomic

For simple operations, atomic is faster:

package main

import (
    "sync/atomic"
)

// AtomicCounter uses CPU-level atomic operations
type AtomicCounter struct {
    value int64 // Must be 64-bit aligned
}

func (c *AtomicCounter) Inc() {
    atomic.AddInt64(&c.value, 1) // Single CPU instruction, no locks!
}

func (c *AtomicCounter) Value() int64 {
    return atomic.LoadInt64(&c.value)
}

Why Atomic? Uses special CPU instructions that guarantee the operation completes without interruption. Much faster than mutex for simple operations.

Common Pitfall: Only works for simple types (int32, int64, etc.) and simple operations. Can’t protect complex logic.
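
Atomics do stretch slightly further than plain Add, though: a compare-and-swap loop can express small read-modify-write decisions. A sketch of a capped increment (CapAtMax is a made-up helper):

package main

import "sync/atomic"

// CapAtMax increments the counter atomically, but never past max.
// This read-modify-write decision is beyond what AddInt64 alone can do.
func CapAtMax(value *int64, max int64) bool {
    for {
        old := atomic.LoadInt64(value)
        if old >= max {
            return false // already at the cap
        }
        // CAS succeeds only if nobody changed value since our Load;
        // otherwise loop and retry with the fresh value.
        if atomic.CompareAndSwapInt64(value, old, old+1) {
            return true
        }
    }
}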

How to Pass Messages

Channels - The Go Way

Channels are typed conduits for passing messages between goroutines:

package main

// Channels are typed pipes for communication
func basicChannel() {
    ch := make(chan string)     // Unbuffered channel
    
    go func() {
        ch <- "Hello"           // Send blocks until someone receives
    }()
    
    msg := <-ch                 // Receive blocks until someone sends
    println(msg)
}

// Buffered channels don't block immediately
func bufferedChannel() {
    ch := make(chan int, 3)    // Can hold 3 values
    
    ch <- 1                     // Doesn't block
    ch <- 2                     // Doesn't block
    ch <- 3                     // Doesn't block
    // ch <- 4                  // Would block! Buffer is full
}

Important: Channels do NOT broadcast values! Each value is received by exactly one goroutine.

Common Pitfall: Sending on a closed channel panics. Always ensure proper channel lifecycle management.
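
The convention that keeps that lifecycle safe: the sender owns the channel and is the only one who closes it, while receivers drain it with range. A minimal sketch:

package main

func producerConsumer() {
    ch := make(chan int)

    go func() {
        defer close(ch) // the sender closes - never the receiver
        for i := 0; i < 5; i++ {
            ch <- i
        }
    }()

    for v := range ch { // loop ends once ch is closed and drained
        println(v)
    }
}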

Select - Handling Multiple Channels

Select lets you work with multiple channels without blocking:

package main

import "time"

func selectExample() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    go func() {
        time.Sleep(100 * time.Millisecond)
        ch1 <- "from ch1"
    }()
    
    go func() {
        time.Sleep(200 * time.Millisecond)
        ch2 <- "from ch2"
    }()
    
    // Select waits on multiple channels
    select {
    case msg1 := <-ch1:
        println("Got:", msg1)
    case msg2 := <-ch2:
        println("Got:", msg2)
    case <-time.After(50 * time.Millisecond): // Fires first here: both sends take longer
        println("Timeout!")
    }
}

Why Select? It lets one goroutine wait on several channels at once, enables timeouts, and (with a default case) non-blocking sends and receives.
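
The default case is what makes select non-blocking. A small sketch of both directions:

package main

// tryReceive polls a channel without stalling the caller.
func tryReceive(ch chan string) {
    select {
    case msg := <-ch:
        println("got:", msg)
    default:
        println("nothing ready, moving on")
    }
}

// trySend reports whether the send went through immediately.
func trySend(ch chan string, msg string) bool {
    select {
    case ch <- msg:
        return true
    default:
        return false // no receiver ready, or buffer full
    }
}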

How to Cancel Work

Cancel by Signal Channel

package main

import "time"

// Worker that stops when signaled
type Worker struct {
    stop chan struct{}  // Empty struct takes no memory
}

func (w *Worker) Start() {
    w.stop = make(chan struct{})
    go w.run()
}

func (w *Worker) Stop() {
    close(w.stop)       // Closing broadcasts to all receivers!
}

func (w *Worker) run() {
    for {
        select {
        case <-w.stop:
            println("Stopping worker")
            return
        default:
            // Do work
            println("Working...")
            // Don't forget to sleep or you'll burn CPU!
            time.Sleep(100 * time.Millisecond)
        }
    }
}

Key Insight: Reading from a closed channel returns immediately with zero value. This makes close() a perfect broadcast signal!
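
When zero values are also legitimate data, the comma-ok form tells you whether the channel was actually closed. A sketch:

package main

func drain(ch chan int) {
    for {
        v, ok := <-ch
        if !ok {
            println("channel closed")
            return // ok == false only after close(ch) and a drained buffer
        }
        println("received:", v)
    }
}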

Cancel by Timeout

package main

import "time"

func timeoutWork() {
    done := make(chan bool)
    
    go func() {
        // Simulate work
        time.Sleep(2 * time.Second)
        done <- true
    }()
    
    select {
    case <-done:
        println("Work completed")
    case <-time.After(1 * time.Second):
        println("Work timed out")
        // Note: goroutine is still running! Need proper cleanup
    }
}

Common Pitfall: time.After creates a new timer each time. In loops, use time.NewTimer and reset it.
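
A sketch of that advice: one timer reused across iterations. The stop-and-drain step is needed on Go versions before 1.23, where Reset on an undrained timer could misbehave (newer runtimes made this forgiving, but the pattern stays safe everywhere):

package main

import "time"

func consumeWithTimeout(ch <-chan int, timeout time.Duration) {
    timer := time.NewTimer(timeout)
    defer timer.Stop()

    for {
        select {
        case v, ok := <-ch:
            if !ok {
                return
            }
            println("got:", v)
            // Stop the timer; if it already fired, drain the stale
            // value so Reset starts from a clean state.
            if !timer.Stop() {
                <-timer.C
            }
        case <-timer.C:
            println("idle timeout")
            // The receive above already drained the timer.
        }
        timer.Reset(timeout)
    }
}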

Cancel with Context

Context is the standard way to propagate cancellation through function calls:

package main

import (
    "context"
    "time"
)

type ContextWorker struct{}

func (w *ContextWorker) DoWork(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err() // Returns context.Canceled or context.DeadlineExceeded
        default:
            // Do actual work
            println("Working with context...")
            time.Sleep(100 * time.Millisecond)
        }
    }
}

func useContextWorker() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel() // Always call cancel to free resources
    
    worker := &ContextWorker{}
    err := worker.DoWork(ctx)
    if err != nil {
        println("Worker stopped:", err.Error())
    }
}

Why Context? It’s composable - you can chain cancellations through your entire call stack.
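
Here’s a sketch of that composability: each layer derives its own context from its parent, and cancelling a parent cancels every child (queryDatabase is a hypothetical helper):

package main

import (
    "context"
    "time"
)

func handleRequest(parent context.Context) error {
    // Overall budget for the whole request.
    ctx, cancel := context.WithTimeout(parent, 5*time.Second)
    defer cancel()

    // A tighter budget for one downstream call, derived from ctx:
    // it expires at the earlier of the two deadlines.
    dbCtx, dbCancel := context.WithTimeout(ctx, 1*time.Second)
    defer dbCancel()

    return queryDatabase(dbCtx)
}

func queryDatabase(ctx context.Context) error {
    select {
    case <-time.After(500 * time.Millisecond): // simulated work
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}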

Applying: Per-User Request Counters

Let’s build a concurrent counter system that tracks requests per user:

package main

import (
    "sync"
    "sync/atomic"
)

// UserCounters manages per-user request counting
type UserCounters struct {
    mu       sync.RWMutex
    counters map[string]*int64
}

func NewUserCounters() *UserCounters {
    return &UserCounters{
        counters: make(map[string]*int64),
    }
}

func (uc *UserCounters) Inc(userID string) {
    // Check if counter exists (read lock is enough)
    uc.mu.RLock()
    counter, exists := uc.counters[userID]
    uc.mu.RUnlock()
    
    if exists {
        atomic.AddInt64(counter, 1)
        return
    }
    
    // Need to create counter (write lock required)
    uc.mu.Lock()
    defer uc.mu.Unlock()
    
    // Double-check after acquiring write lock (another goroutine might have created it)
    if counter, exists := uc.counters[userID]; exists {
        atomic.AddInt64(counter, 1)
        return
    }
    
    // Create new counter
    var newCounter int64 = 1
    uc.counters[userID] = &newCounter
}

func (uc *UserCounters) Get(userID string) int64 {
    uc.mu.RLock()
    defer uc.mu.RUnlock()
    
    if counter, exists := uc.counters[userID]; exists {
        return atomic.LoadInt64(counter)
    }
    return 0
}

func (uc *UserCounters) GetAll() map[string]int64 {
    uc.mu.RLock()
    defer uc.mu.RUnlock()
    
    result := make(map[string]int64, len(uc.counters))
    for userID, counter := range uc.counters {
        result[userID] = atomic.LoadInt64(counter)
    }
    return result
}

Design Choice: We use RWMutex for the map and atomic for counters - best of both worlds! Map structure is protected, but increments are lock-free.
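
For comparison, the standard library’s sync.Map gives you LoadOrStore, which collapses the check-then-create dance into a single call. A sketch of that alternative (at the cost of interface boxing on every access):

package main

import (
    "sync"
    "sync/atomic"
)

type SyncMapCounters struct {
    counters sync.Map // effectively map[string]*int64
}

func (sc *SyncMapCounters) Inc(userID string) {
    // LoadOrStore returns the existing pointer, or stores ours.
    // Note: new(int64) allocates even when the key already exists.
    v, _ := sc.counters.LoadOrStore(userID, new(int64))
    atomic.AddInt64(v.(*int64), 1)
}

func (sc *SyncMapCounters) Get(userID string) int64 {
    if v, ok := sc.counters.Load(userID); ok {
        return atomic.LoadInt64(v.(*int64))
    }
    return 0
}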

Applying: Dump Counters Worker

Now let’s periodically dump our counters while handling cancellation:

package main

import (
    "context"
    "fmt"
    "time"
)

// CounterDumper periodically outputs counter values
type CounterDumper struct {
    counters *UserCounters
    interval time.Duration
}

func NewCounterDumper(counters *UserCounters, interval time.Duration) *CounterDumper {
    return &CounterDumper{
        counters: counters,
        interval: interval,
    }
}

func (cd *CounterDumper) Start(ctx context.Context) {
    ticker := time.NewTicker(cd.interval)
    defer ticker.Stop() // Clean up ticker
    
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Dumper stopped:", ctx.Err())
            return
        case <-ticker.C:
            cd.dump()
        }
    }
}

func (cd *CounterDumper) dump() {
    snapshot := cd.counters.GetAll()
    
    if len(snapshot) == 0 {
        fmt.Println("No counters to dump")
        return
    }
    
    fmt.Println("=== Counter Dump ===")
    for userID, count := range snapshot {
        fmt.Printf("User %s: %d requests\n", userID, count)
    }
    fmt.Println("==================")
}

Pattern: The ticker-select combo is idiomatic Go for periodic tasks with cancellation.

Non-blocking Counter Worker (Resource Keeper Pattern)

Sometimes you need a dedicated goroutine managing a resource:

package main

import "time"

// ResourceKeeper manages a resource in a dedicated goroutine
type ResourceKeeper struct {
    requests chan request
    stop     chan struct{}
}

type request struct {
    op       string      // "inc", "get", "getall"
    userID   string
    response chan interface{}
}

func NewResourceKeeper() *ResourceKeeper {
    rk := &ResourceKeeper{
        requests: make(chan request),
        stop:     make(chan struct{}),
    }
    go rk.run()
    return rk
}

func (rk *ResourceKeeper) run() {
    counters := make(map[string]int64)
    
    for {
        select {
        case <-rk.stop:
            return
        case req := <-rk.requests:
            switch req.op {
            case "inc":
                counters[req.userID]++
                req.response <- nil
            case "get":
                req.response <- counters[req.userID]
            case "getall":
                snapshot := make(map[string]int64)
                for k, v := range counters {
                    snapshot[k] = v
                }
                req.response <- snapshot
            }
        }
    }
}

func (rk *ResourceKeeper) Inc(userID string) {
    resp := make(chan interface{})
    select {
    case rk.requests <- request{op: "inc", userID: userID, response: resp}:
        <-resp
    case <-time.After(100 * time.Millisecond):
        // Handle timeout - the keeper goroutine may be busy or already stopped
        println("Inc request timed out")
    }
}

func (rk *ResourceKeeper) Stop() {
    close(rk.stop)
}

Why This Pattern? Single goroutine owns the data - no locks needed! All access is serialized through channels.
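
A variant worth knowing: send closures instead of op strings. Each function runs inside the owning goroutine with exclusive access to the state, so a new operation needs no new message type. A sketch (FuncKeeper is a made-up name):

package main

type FuncKeeper struct {
    ops  chan func(map[string]int64)
    stop chan struct{}
}

func NewFuncKeeper() *FuncKeeper {
    fk := &FuncKeeper{
        ops:  make(chan func(map[string]int64)),
        stop: make(chan struct{}),
    }
    go func() {
        counters := make(map[string]int64)
        for {
            select {
            case <-fk.stop:
                return
            case op := <-fk.ops:
                op(counters) // exclusive access, no locks needed
            }
        }
    }()
    return fk
}

func (fk *FuncKeeper) Inc(userID string) {
    done := make(chan struct{})
    fk.ops <- func(c map[string]int64) {
        c[userID]++
        close(done)
    }
    <-done // wait until the keeper has applied the update
}

func (fk *FuncKeeper) Stop() {
    close(fk.stop)
}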

How to Broadcast

Broadcasting a Signal

Closing a channel broadcasts a signal to all listeners:

package main

import "sync"

// SignalBroadcaster sends signals to multiple listeners
type SignalBroadcaster struct {
    listeners []chan struct{}
    mu        sync.Mutex
}

func (sb *SignalBroadcaster) Subscribe() <-chan struct{} {
    sb.mu.Lock()
    defer sb.mu.Unlock()
    
    ch := make(chan struct{})
    sb.listeners = append(sb.listeners, ch)
    return ch
}

func (sb *SignalBroadcaster) Broadcast() {
    sb.mu.Lock()
    defer sb.mu.Unlock()
    
    for _, ch := range sb.listeners {
        close(ch)
    }
    sb.listeners = nil // Reset for next broadcast
}

Broadcasting Messages (Pub-Sub)

For actual values, we need a different approach:

package main

import "sync"

// MessageBroadcaster sends messages to multiple subscribers
type MessageBroadcaster struct {
    subscribers map[int]chan string
    nextID      int
    mu          sync.RWMutex
}

func NewMessageBroadcaster() *MessageBroadcaster {
    return &MessageBroadcaster{
        subscribers: make(map[int]chan string),
    }
}

func (mb *MessageBroadcaster) Subscribe() (int, <-chan string) {
    mb.mu.Lock()
    defer mb.mu.Unlock()
    
    id := mb.nextID
    mb.nextID++
    
    ch := make(chan string, 10) // Buffered to prevent slow readers from blocking
    mb.subscribers[id] = ch
    
    return id, ch
}

func (mb *MessageBroadcaster) Unsubscribe(id int) {
    mb.mu.Lock()
    defer mb.mu.Unlock()
    
    if ch, ok := mb.subscribers[id]; ok {
        close(ch)
        delete(mb.subscribers, id)
    }
}

func (mb *MessageBroadcaster) Broadcast(msg string) {
    mb.mu.RLock()
    defer mb.mu.RUnlock()
    
    for _, ch := range mb.subscribers {
        select {
        case ch <- msg:
            // Sent successfully
        default:
            // Channel is full, skip this subscriber
            // In production, you might want to handle this differently
        }
    }
}

Key Pattern: Non-blocking send with select/default prevents slow subscribers from blocking the broadcaster.

Applying: Real-time Messaging Chat

Let’s build a simple chat system using our broadcast pattern:

package main

import (
    "fmt"
    "sync"
)

// Chat manages a real-time messaging system
type Chat struct {
    messages    *MessageBroadcaster
    users       map[string]int // username -> subscriber ID
    mu          sync.RWMutex
}

func NewChat() *Chat {
    return &Chat{
        messages: NewMessageBroadcaster(),
        users:    make(map[string]int),
    }
}

func (c *Chat) Join(username string) (<-chan string, error) {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    if _, exists := c.users[username]; exists {
        return nil, fmt.Errorf("user %s already in chat", username)
    }
    
    id, ch := c.messages.Subscribe()
    c.users[username] = id
    
    // Announce join
    go c.messages.Broadcast(fmt.Sprintf("%s joined the chat", username))
    
    return ch, nil
}

func (c *Chat) Leave(username string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    if id, exists := c.users[username]; exists {
        c.messages.Unsubscribe(id)
        delete(c.users, username)
        
        // Announce leave
        go c.messages.Broadcast(fmt.Sprintf("%s left the chat", username))
    }
}

func (c *Chat) Post(username, message string) error {
    c.mu.RLock()
    _, exists := c.users[username]
    c.mu.RUnlock()
    
    if !exists {
        return fmt.Errorf("user %s not in chat", username)
    }
    
    formatted := fmt.Sprintf("%s: %s", username, message)
    c.messages.Broadcast(formatted)
    return nil
}

// Client represents a chat user
type ChatClient struct {
    username string
    chat     *Chat
    messages <-chan string
    stop     chan struct{}
}

func (c *Chat) NewClient(username string) (*ChatClient, error) {
    messages, err := c.Join(username)
    if err != nil {
        return nil, err
    }
    
    client := &ChatClient{
        username: username,
        chat:     c,
        messages: messages,
        stop:     make(chan struct{}),
    }
    
    go client.listen()
    return client, nil
}

func (cc *ChatClient) listen() {
    for {
        select {
        case msg, ok := <-cc.messages:
            if !ok {
                return // Channel closed, we've been disconnected
            }
            fmt.Printf("[%s received]: %s\n", cc.username, msg)
        case <-cc.stop:
            return
        }
    }
}

func (cc *ChatClient) Send(message string) {
    cc.chat.Post(cc.username, message)
}

func (cc *ChatClient) Leave() {
    close(cc.stop)
    cc.chat.Leave(cc.username)
}

Design Note: Each client gets their own channel and goroutine. This isolates slow clients and makes the system resilient.

Aggregating Results from Parallel Work

When you need to wait for multiple goroutines to complete:

package main

import (
    "fmt"
    "sync"
    "time"
)

// ParallelProcessor processes items concurrently and aggregates results
type ParallelProcessor struct {
    workers int
}

func NewParallelProcessor(workers int) *ParallelProcessor {
    return &ParallelProcessor{workers: workers}
}

func (pp *ParallelProcessor) Process(items []string) []string {
    var wg sync.WaitGroup
    results := make([]string, len(items))
    
    // Create a channel for work distribution
    work := make(chan int, len(items))
    for i := range items {
        work <- i
    }
    close(work)
    
    // Start workers
    wg.Add(pp.workers)
    for w := 0; w < pp.workers; w++ {
        go func() {
            defer wg.Done()
            for i := range work {
                // Simulate processing
                time.Sleep(10 * time.Millisecond)
                results[i] = fmt.Sprintf("processed: %s", items[i])
            }
        }()
    }
    
    wg.Wait()
    return results
}

// Alternative: Using channels for results
func (pp *ParallelProcessor) ProcessWithChannels(items []string) []string {
    type result struct {
        index int
        value string
    }
    
    resultChan := make(chan result, len(items))
    
    var wg sync.WaitGroup
    wg.Add(len(items))
    
    for i, item := range items {
        go func(idx int, val string) {
            defer wg.Done()
            // Simulate processing
            time.Sleep(10 * time.Millisecond)
            resultChan <- result{
                index: idx,
                value: fmt.Sprintf("processed: %s", val),
            }
        }(i, item)
    }
    
    wg.Wait()
    close(resultChan)
    
    // Collect results
    results := make([]string, len(items))
    for r := range resultChan {
        results[r.index] = r.value
    }
    
    return results
}

Common Pitfall: Before Go 1.22, the loop variable was shared across iterations, so goroutines closing over it raced on a single variable. Passing it as a parameter (as above) avoids the bug and stays clear on every Go version.
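
To make the difference concrete, here’s a minimal sketch contrasting the two styles (the buggy form only misbehaves under pre-1.22 semantics):

package main

import "sync"

func loopCapture() {
    var wg sync.WaitGroup
    items := []string{"a", "b", "c"}

    for _, item := range items {
        wg.Add(1)
        // BUG before Go 1.22: every goroutine closes over the same
        // `item` variable and may print the last element three times.
        go func() {
            defer wg.Done()
            println(item)
        }()
    }
    wg.Wait()

    for _, item := range items {
        wg.Add(1)
        go func(it string) { // a parameter copies the value - safe everywhere
            defer wg.Done()
            println(it)
        }(item)
    }
    wg.Wait()
}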

Applying: Worker Pool with Backpressure and Graceful Shutdown

Here’s a production-ready worker pool with all the good stuff:

package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "time"
)

var ErrQueueFull = errors.New("queue is full")
var ErrShuttingDown = errors.New("worker pool is shutting down")

// Job represents a unit of work
type Job struct {
    ID      string
    Payload interface{}
}

// WorkerPool manages a pool of workers with backpressure handling
type WorkerPool struct {
    workers    int
    queue      chan Job
    results    chan Result
    wg         sync.WaitGroup
    ctx        context.Context
    cancel     context.CancelFunc
    shutdownMu sync.RWMutex
    isShutdown bool
}

type Result struct {
    JobID string
    Data  interface{}
    Error error
}

func NewWorkerPool(workers, queueSize int) *WorkerPool {
    ctx, cancel := context.WithCancel(context.Background())
    
    wp := &WorkerPool{
        workers: workers,
        queue:   make(chan Job, queueSize),
        results: make(chan Result, queueSize),
        ctx:     ctx,
        cancel:  cancel,
    }
    
    wp.start()
    return wp
}

func (wp *WorkerPool) start() {
    wp.wg.Add(wp.workers)
    
    for i := 0; i < wp.workers; i++ {
        go wp.worker(i)
    }
}

func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()
    
    for {
        select {
        case <-wp.ctx.Done():
            fmt.Printf("Worker %d shutting down\n", id)
            return
            
        case job, ok := <-wp.queue:
            if !ok {
                fmt.Printf("Worker %d: queue closed\n", id)
                return
            }
            
            // Process job with simulated work
            result := wp.processJob(job)
            
            // Try to send result, but don't block on shutdown
            select {
            case wp.results <- result:
            case <-wp.ctx.Done():
                return
            }
        }
    }
}

func (wp *WorkerPool) processJob(job Job) Result {
    // Simulate processing with possible failure
    time.Sleep(50 * time.Millisecond)
    
    // Simulate 10% failure rate
    if time.Now().UnixNano()%10 == 0 {
        return Result{
            JobID: job.ID,
            Error: fmt.Errorf("job %s failed", job.ID),
        }
    }
    
    return Result{
        JobID: job.ID,
        Data:  fmt.Sprintf("Processed: %v", job.Payload),
    }
}

// Submit adds a job with backpressure handling
func (wp *WorkerPool) Submit(job Job) error {
    // Hold the read lock across the send: otherwise Shutdown could
    // close the queue between our check and the send, and sending
    // on a closed channel panics.
    wp.shutdownMu.RLock()
    defer wp.shutdownMu.RUnlock()
    
    if wp.isShutdown {
        return ErrShuttingDown
    }
    
    select {
    case wp.queue <- job:
        return nil
    default:
        // Queue is full - backpressure!
        return ErrQueueFull
    }
}

// SubmitWithRetry attempts to submit with exponential backoff
func (wp *WorkerPool) SubmitWithRetry(job Job, maxRetries int) error {
    backoff := 10 * time.Millisecond
    
    for i := 0; i < maxRetries; i++ {
        err := wp.Submit(job)
        if err == nil {
            return nil
        }
        
        if err == ErrShuttingDown {
            return err
        }
        
        // Exponential backoff for queue full
        if err == ErrQueueFull {
            time.Sleep(backoff)
            backoff *= 2
            if backoff > time.Second {
                backoff = time.Second
            }
            continue
        }
    }
    
    return ErrQueueFull
}

// Results returns the results channel for processing
func (wp *WorkerPool) Results() <-chan Result {
    return wp.results
}

// Shutdown gracefully stops the worker pool
func (wp *WorkerPool) Shutdown(timeout time.Duration) error {
    wp.shutdownMu.Lock()
    wp.isShutdown = true
    // Stop accepting new jobs. Closing under the write lock guarantees
    // no Submit is mid-send on the queue.
    close(wp.queue)
    wp.shutdownMu.Unlock()
    
    // Create timeout context
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    
    // Wait for workers to finish
    done := make(chan struct{})
    go func() {
        wp.wg.Wait()
        close(done)
    }()
    
    select {
    case <-done:
        // All workers finished
        wp.cancel()
        close(wp.results)
        return nil
    case <-ctx.Done():
        // Timeout - force shutdown
        wp.cancel()
        return fmt.Errorf("shutdown timeout after %v", timeout)
    }
}

// Usage example
func ExampleWorkerPool() {
    pool := NewWorkerPool(5, 100)
    
    // Start result processor
    go func() {
        for result := range pool.Results() {
            if result.Error != nil {
                fmt.Printf("Job %s failed: %v\n", result.JobID, result.Error)
            } else {
                fmt.Printf("Job %s succeeded: %v\n", result.JobID, result.Data)
            }
        }
    }()
    
    // Submit jobs
    for i := 0; i < 20; i++ {
        job := Job{
            ID:      fmt.Sprintf("job-%d", i),
            Payload: fmt.Sprintf("data-%d", i),
        }
        
        err := pool.SubmitWithRetry(job, 3)
        if err != nil {
            fmt.Printf("Failed to submit job %s: %v\n", job.ID, err)
        }
    }
    
    // Graceful shutdown
    if err := pool.Shutdown(5 * time.Second); err != nil {
        fmt.Printf("Shutdown error: %v\n", err)
    }
}

Production Features:

  • Backpressure: Returns error when queue is full instead of blocking
  • Exponential Backoff: Retry logic with increasing delays
  • Graceful Shutdown: Waits for in-flight work to complete
  • Timeout Protection: Forces shutdown after timeout
  • Error Handling: Propagates job processing errors

Key Patterns Used:

  • Non-blocking channel writes with select/default
  • Context for cancellation propagation
  • WaitGroup for synchronization
  • RWMutex for shutdown state protection

Conclusion

You’ve just learned the concurrency patterns that separate “I know Go syntax” from “I can build production systems in Go.” These aren’t academic exercises - they’re battle-tested patterns I’ve seen succeed (and fail) in real production systems.

Remember:

  • Start simple (mutex/atomic for shared state)
  • Channels for communication, not sharing memory
  • Context for cancellation propagation
  • Always handle the unhappy path (timeouts, backpressure, shutdown)
  • Always test with go test -race

The next time someone asks you “How do you broadcast a message to many goroutines?” you won’t just know the answer - you’ll have several solutions and know when to use each one.

Now go forth and write concurrent code that doesn’t keep you up at night!
