Go Concurrency Patterns: A Practical Walkthrough
Motivation
After years of conducting technical interviews and mentoring developers, I’ve noticed a peculiar pattern. Experienced Python, Ruby, and PHP developers who’ve learned Go as their second language often stumble on seemingly simple concurrency questions. “How do you broadcast a message to many goroutines?” I ask, and watch as senior developers with years of experience suddenly look uncertain.
Here’s the thing: for developers coming from interpreted languages, Go has become the default choice for building high-throughput services (Rust is brilliant, but it demands a different way of thinking). Major tech companies now use Go as their main backend stack for both business logic and infrastructure code. Yet I keep hearing failure stories at meetups and conferences - teams unable to deliver production-ready concurrent solutions in reasonable time.
Why? Because reliable high-throughput services require multithreaded patterns that are completely unfamiliar to developers used to framework-based, single-threaded execution models: in-memory caches with proper synchronization, rate limiters, exponential backoff, circuit breakers, batching, and pipelining. These aren’t nice-to-haves - they’re essential.
This walkthrough fills those gaps. Treat it as a field manual during solution design and a study aid while learning high-load patterns. If you’re coming from PHP/Python/Ruby, or you’re a mid-level Go developer who gets nervous when concurrency comes up in interviews - this is for you.
Concurrency, Huh?
Let’s start with the basics, but not the boring kind you’ll forget tomorrow.
A goroutine is a unit of work that runs inside your application process and CAN run at the same time on different threads and processor cores. Think of it as a super-lightweight thread that Go manages for you.
// This is all it takes to start concurrent work
go doSomething()
The Go runtime’s scheduler is like a smart manager that decides how many OS threads your application uses and which goroutines run on them. The beautiful part? You don’t have to think about it most of the time.
The key insight: work CAN happen in parallel, but Go decides when it actually does based on available cores and current workload.
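Want to peek under the hood? The runtime package exposes a few read-only knobs. A tiny sketch, purely illustrative:

package main

import (
	"fmt"
	"runtime"
)

func main() {
	fmt.Println("CPU cores:", runtime.NumCPU())        // cores visible to the process
	fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))  // max OS threads running Go code; 0 just queries
	fmt.Println("goroutines:", runtime.NumGoroutine()) // goroutines currently alive
}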
How Not to Lose Updates in Values
Thread-Safe Counter with Mutex
Let’s say you have multiple goroutines incrementing a counter. Without protection, you’ll lose updates:
package main
import (
"sync"
)
// BadCounter loses updates when used concurrently
type BadCounter struct {
value int
}
func (c *BadCounter) Inc() {
c.value++ // DANGER: Read-Modify-Write is not atomic!
}
// SafeCounter protects the value with a mutex
type SafeCounter struct {
mu sync.Mutex
value int
}
func (c *SafeCounter) Inc() {
c.mu.Lock() // Only one goroutine can pass this line
defer c.mu.Unlock() // Always unlock, even if we panic
c.value++
}
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
Why Mutex? It serializes access - only one goroutine can hold the lock at a time. Others wait in line.
Common Pitfall: Forgetting to unlock causes deadlock. Always use defer right after locking!
Performance Note: While waiting for unlock, goroutines can be moved off their OS thread (parked), freeing resources for other work.
Thread-Safe Counter with Atomic
For simple operations, atomic is faster:
package main
import (
"sync/atomic"
)
// AtomicCounter uses CPU-level atomic operations
type AtomicCounter struct {
value int64 // Must be 64-bit aligned
}
func (c *AtomicCounter) Inc() {
atomic.AddInt64(&c.value, 1) // Single CPU instruction, no locks!
}
func (c *AtomicCounter) Value() int64 {
return atomic.LoadInt64(&c.value)
}
Why Atomic? Uses special CPU instructions that guarantee the operation completes without interruption. Much faster than mutex for simple operations.
Common Pitfall: Only works for simple types (int32, int64, etc.) and simple operations. Can’t protect complex logic.
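One more thing worth knowing: since Go 1.19, sync/atomic also ships typed atomics (atomic.Int64 and friends) that handle alignment for you. The same counter, sketched with the typed API:

package main

import "sync/atomic"

// TypedCounter uses atomic.Int64 (Go 1.19+), which is always
// correctly aligned - no manual 64-bit alignment concerns
type TypedCounter struct {
	value atomic.Int64
}

func (c *TypedCounter) Inc() {
	c.value.Add(1)
}

func (c *TypedCounter) Value() int64 {
	return c.value.Load()
}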
How to Pass Messages
Channels - The Go Way
Channels are typed conduits for passing messages between goroutines:
package main
// Channels are typed pipes for communication
func basicChannel() {
ch := make(chan string) // Unbuffered channel
go func() {
ch <- "Hello" // Send blocks until someone receives
}()
msg := <-ch // Receive blocks until someone sends
println(msg)
}
// Buffered channels don't block immediately
func bufferedChannel() {
ch := make(chan int, 3) // Can hold 3 values
ch <- 1 // Doesn't block
ch <- 2 // Doesn't block
ch <- 3 // Doesn't block
// ch <- 4 // Would block! Buffer is full
}
Important: Channels do NOT broadcast values! Each value is received by exactly one goroutine.
Common Pitfall: Sending on a closed channel panics. Always ensure proper channel lifecycle management.
Select - Handling Multiple Channels
Select lets you work with multiple channels without blocking:
package main
import "time"
func selectExample() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(100 * time.Millisecond)
ch1 <- "from ch1"
}()
go func() {
time.Sleep(200 * time.Millisecond)
ch2 <- "from ch2"
}()
// Select waits on multiple channels
select {
case msg1 := <-ch1:
println("Got:", msg1)
case msg2 := <-ch2:
println("Got:", msg2)
case <-time.After(50 * time.Millisecond):
println("Timeout!")
}
}
Why Select? Prevents goroutine starvation and enables timeout handling.
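Select also pairs with a default case for non-blocking sends and receives - a small sketch:

package main

func nonBlocking() {
	ch := make(chan int, 1)

	// Non-blocking send: if the buffer is full, don't wait
	select {
	case ch <- 42:
		println("sent")
	default:
		println("buffer full, value dropped")
	}

	// Non-blocking receive: if nothing is ready, don't wait
	select {
	case v := <-ch:
		println("got", v)
	default:
		println("nothing ready")
	}
}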
How to Cancel Work
Cancel by Signal Channel
package main
import "time"
// Worker that stops when signaled
type Worker struct {
stop chan struct{} // Empty struct takes no memory
}
func (w *Worker) Start() {
w.stop = make(chan struct{})
go w.run()
}
func (w *Worker) Stop() {
close(w.stop) // Closing broadcasts to all receivers!
}
func (w *Worker) run() {
for {
select {
case <-w.stop:
println("Stopping worker")
return
default:
// Do work
println("Working...")
// Don't forget to sleep or you'll burn CPU!
time.Sleep(100 * time.Millisecond)
}
}
}
Key Insight: Reading from a closed channel returns immediately with zero value. This makes close() a perfect broadcast signal!
Cancel by Timeout
package main
import "time"
func timeoutWork() {
done := make(chan bool)
go func() {
// Simulate work
time.Sleep(2 * time.Second)
done <- true
}()
select {
case <-done:
println("Work completed")
case <-time.After(1 * time.Second):
println("Work timed out")
// Note: goroutine is still running! Need proper cleanup
}
}
Common Pitfall: time.After creates a new timer each time. In loops, use time.NewTimer and reset it.
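Here's a sketch of the reuse pattern (before Go 1.23 you must stop and drain a fired timer before Reset; since Go 1.23 the runtime handles that for you):

package main

import "time"

// pollWithTimeout reuses one timer across iterations instead of
// allocating a fresh one via time.After on every pass of the loop
func pollWithTimeout(work <-chan int) {
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	for {
		select {
		case v, ok := <-work:
			if !ok {
				return
			}
			println("got", v)
			// Before reusing the timer, stop it; if it already
			// fired, drain the pending value (pre-Go 1.23 rule)
			if !timer.Stop() {
				<-timer.C
			}
		case <-timer.C:
			println("no work for a whole second")
		}
		timer.Reset(time.Second)
	}
}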
Cancel with Context
Context is the standard way to propagate cancellation through function calls:
package main
import (
"context"
"time"
)
type ContextWorker struct{}
func (w *ContextWorker) DoWork(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err() // Returns context.Canceled or context.DeadlineExceeded
default:
// Do actual work
println("Working with context...")
time.Sleep(100 * time.Millisecond)
}
}
}
func useContextWorker() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() // Always call cancel to free resources
worker := &ContextWorker{}
err := worker.DoWork(ctx)
if err != nil {
println("Worker stopped:", err.Error())
}
}
Why Context? It’s composable - you can chain cancellations through your entire call stack.
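A quick sketch of that composability - a child context gets cancelled when its parent is, while also enforcing its own deadline:

package main

import (
	"context"
	"time"
)

func chainedContexts() {
	// Parent cancellation propagates to every derived context
	root, cancelRoot := context.WithCancel(context.Background())

	// Child adds its own, tighter deadline on top of the parent
	child, cancelChild := context.WithTimeout(root, 500*time.Millisecond)
	defer cancelChild()

	go func() {
		<-child.Done() // fires on parent cancel OR child timeout, whichever is first
		println("child done:", child.Err().Error())
	}()

	cancelRoot() // cancelling the root cancels the child too
	time.Sleep(50 * time.Millisecond)
}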
Applying: Per-User Request Counters
Let’s build a concurrent counter system that tracks requests per user:
package main
import (
"sync"
"sync/atomic"
)
// UserCounters manages per-user request counting
type UserCounters struct {
mu sync.RWMutex
counters map[string]*int64
}
func NewUserCounters() *UserCounters {
return &UserCounters{
counters: make(map[string]*int64),
}
}
func (uc *UserCounters) Inc(userID string) {
// Check if counter exists (read lock is enough)
uc.mu.RLock()
counter, exists := uc.counters[userID]
uc.mu.RUnlock()
if exists {
atomic.AddInt64(counter, 1)
return
}
// Need to create counter (write lock required)
uc.mu.Lock()
defer uc.mu.Unlock()
// Double-check after acquiring write lock (another goroutine might have created it)
if counter, exists := uc.counters[userID]; exists {
atomic.AddInt64(counter, 1)
return
}
// Create new counter
var newCounter int64 = 1
uc.counters[userID] = &newCounter
}
func (uc *UserCounters) Get(userID string) int64 {
uc.mu.RLock()
defer uc.mu.RUnlock()
if counter, exists := uc.counters[userID]; exists {
return atomic.LoadInt64(counter)
}
return 0
}
func (uc *UserCounters) GetAll() map[string]int64 {
uc.mu.RLock()
defer uc.mu.RUnlock()
result := make(map[string]int64, len(uc.counters))
for userID, counter := range uc.counters {
result[userID] = atomic.LoadInt64(counter)
}
return result
}
Design Choice: We use RWMutex for the map and atomic for counters - best of both worlds! Map structure is protected, but increments are lock-free.
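If the double-checked locking above feels fiddly, sync.Map is a reasonable alternative for this access pattern (lots of reads, each key written once). A sketch, not a drop-in replacement:

package main

import (
	"sync"
	"sync/atomic"
)

// SyncMapCounters is an alternative built on sync.Map, which is
// optimized for mostly-read maps with stable key sets
type SyncMapCounters struct {
	counters sync.Map // userID -> *int64
}

func (sc *SyncMapCounters) Inc(userID string) {
	// LoadOrStore atomically fetches an existing counter or
	// registers a new one - no double-checked locking needed
	v, _ := sc.counters.LoadOrStore(userID, new(int64))
	atomic.AddInt64(v.(*int64), 1)
}

func (sc *SyncMapCounters) Get(userID string) int64 {
	if v, ok := sc.counters.Load(userID); ok {
		return atomic.LoadInt64(v.(*int64))
	}
	return 0
}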
Applying: Dump Counters Worker
Now let’s periodically dump our counters while handling cancellation:
package main
import (
"context"
"fmt"
"time"
)
// CounterDumper periodically outputs counter values
type CounterDumper struct {
counters *UserCounters
interval time.Duration
}
func NewCounterDumper(counters *UserCounters, interval time.Duration) *CounterDumper {
return &CounterDumper{
counters: counters,
interval: interval,
}
}
func (cd *CounterDumper) Start(ctx context.Context) {
ticker := time.NewTicker(cd.interval)
defer ticker.Stop() // Clean up ticker
for {
select {
case <-ctx.Done():
fmt.Println("Dumper stopped:", ctx.Err())
return
case <-ticker.C:
cd.dump()
}
}
}
func (cd *CounterDumper) dump() {
snapshot := cd.counters.GetAll()
if len(snapshot) == 0 {
fmt.Println("No counters to dump")
return
}
fmt.Println("=== Counter Dump ===")
for userID, count := range snapshot {
fmt.Printf("User %s: %d requests\n", userID, count)
}
fmt.Println("==================")
}
Pattern: The ticker-select combo is idiomatic Go for periodic tasks with cancellation.
Non-blocking Counter Worker (Resource Keeper Pattern)
Sometimes you need a dedicated goroutine managing a resource:
package main
import "time"
// ResourceKeeper manages a resource in a dedicated goroutine
type ResourceKeeper struct {
requests chan request
stop chan struct{}
}
type request struct {
op string // "inc", "get", "getall"
userID string
response chan interface{}
}
func NewResourceKeeper() *ResourceKeeper {
rk := &ResourceKeeper{
requests: make(chan request),
stop: make(chan struct{}),
}
go rk.run()
return rk
}
func (rk *ResourceKeeper) run() {
counters := make(map[string]int64)
for {
select {
case <-rk.stop:
return
case req := <-rk.requests:
switch req.op {
case "inc":
counters[req.userID]++
req.response <- nil
case "get":
req.response <- counters[req.userID]
case "getall":
snapshot := make(map[string]int64)
for k, v := range counters {
snapshot[k] = v
}
req.response <- snapshot
}
}
}
}
func (rk *ResourceKeeper) Inc(userID string) {
resp := make(chan interface{})
select {
case rk.requests <- request{op: "inc", userID: userID, response: resp}:
<-resp
case <-time.After(100 * time.Millisecond):
// Handle timeout - the keeper may be busy or already stopped
println("Inc request timed out")
}
}
func (rk *ResourceKeeper) Stop() {
close(rk.stop)
}
Why This Pattern? Single goroutine owns the data - no locks needed! All access is serialized through channels.
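The snippet above only exposes Inc; a Get going through the same request/response protocol might look like this (a sketch - the op string must match what run() handles):

// Get fetches a user's count through the keeper goroutine
func (rk *ResourceKeeper) Get(userID string) int64 {
	resp := make(chan interface{})
	select {
	case rk.requests <- request{op: "get", userID: userID, response: resp}:
		return (<-resp).(int64)
	case <-time.After(100 * time.Millisecond):
		println("Get request timed out")
		return 0
	}
}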
How to Broadcast
Broadcasting a Signal
Closing a channel broadcasts a signal to all listeners:
package main
import "sync"
// Broadcaster sends signals to multiple listeners
type SignalBroadcaster struct {
listeners []chan struct{}
mu sync.Mutex
}
func (sb *SignalBroadcaster) Subscribe() <-chan struct{} {
sb.mu.Lock()
defer sb.mu.Unlock()
ch := make(chan struct{})
sb.listeners = append(sb.listeners, ch)
return ch
}
func (sb *SignalBroadcaster) Broadcast() {
sb.mu.Lock()
defer sb.mu.Unlock()
for _, ch := range sb.listeners {
close(ch)
}
sb.listeners = nil // Reset for next broadcast
}
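
A quick usage sketch: each listener blocks on its own subscription channel, and a single Broadcast releases all of them at once:

func signalExample() {
	sb := &SignalBroadcaster{}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		ready := sb.Subscribe()
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-ready // blocks until Broadcast closes the channel
			println("listener", id, "released")
		}(i)
	}
	sb.Broadcast() // one close wakes every listener
	wg.Wait()
}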
Broadcasting Messages (Pub-Sub)
For actual values, we need a different approach:
package main
import "sync"
// MessageBroadcaster sends messages to multiple subscribers
type MessageBroadcaster struct {
subscribers map[int]chan string
nextID int
mu sync.RWMutex
}
func NewMessageBroadcaster() *MessageBroadcaster {
return &MessageBroadcaster{
subscribers: make(map[int]chan string),
}
}
func (mb *MessageBroadcaster) Subscribe() (int, <-chan string) {
mb.mu.Lock()
defer mb.mu.Unlock()
id := mb.nextID
mb.nextID++
ch := make(chan string, 10) // Buffered to prevent slow readers from blocking
mb.subscribers[id] = ch
return id, ch
}
func (mb *MessageBroadcaster) Unsubscribe(id int) {
mb.mu.Lock()
defer mb.mu.Unlock()
if ch, ok := mb.subscribers[id]; ok {
close(ch)
delete(mb.subscribers, id)
}
}
func (mb *MessageBroadcaster) Broadcast(msg string) {
mb.mu.RLock()
defer mb.mu.RUnlock()
for _, ch := range mb.subscribers {
select {
case ch <- msg:
// Sent successfully
default:
// Channel is full, skip this subscriber
// In production, you might want to handle this differently
}
}
}
Key Pattern: Non-blocking send with select/default prevents slow subscribers from blocking the broadcaster.
Applying: Real-time Messaging Chat
Let’s build a simple chat system using our broadcast pattern:
package main
import (
"fmt"
"sync"
)
// Chat manages a real-time messaging system
type Chat struct {
messages *MessageBroadcaster
users map[string]int // username -> subscriber ID
mu sync.RWMutex
}
func NewChat() *Chat {
return &Chat{
messages: NewMessageBroadcaster(),
users: make(map[string]int),
}
}
func (c *Chat) Join(username string) (<-chan string, error) {
c.mu.Lock()
defer c.mu.Unlock()
if _, exists := c.users[username]; exists {
return nil, fmt.Errorf("user %s already in chat", username)
}
id, ch := c.messages.Subscribe()
c.users[username] = id
// Announce join
go c.messages.Broadcast(fmt.Sprintf("%s joined the chat", username))
return ch, nil
}
func (c *Chat) Leave(username string) {
c.mu.Lock()
defer c.mu.Unlock()
if id, exists := c.users[username]; exists {
c.messages.Unsubscribe(id)
delete(c.users, username)
// Announce leave
go c.messages.Broadcast(fmt.Sprintf("%s left the chat", username))
}
}
func (c *Chat) Post(username, message string) error {
c.mu.RLock()
_, exists := c.users[username]
c.mu.RUnlock()
if !exists {
return fmt.Errorf("user %s not in chat", username)
}
formatted := fmt.Sprintf("%s: %s", username, message)
c.messages.Broadcast(formatted)
return nil
}
// Client represents a chat user
type ChatClient struct {
username string
chat *Chat
messages <-chan string
stop chan struct{}
}
func (c *Chat) NewClient(username string) (*ChatClient, error) {
messages, err := c.Join(username)
if err != nil {
return nil, err
}
client := &ChatClient{
username: username,
chat: c,
messages: messages,
stop: make(chan struct{}),
}
go client.listen()
return client, nil
}
func (cc *ChatClient) listen() {
for {
select {
case msg, ok := <-cc.messages:
if !ok {
return // Channel closed, we've been disconnected
}
fmt.Printf("[%s received]: %s\n", cc.username, msg)
case <-cc.stop:
return
}
}
}
func (cc *ChatClient) Send(message string) {
cc.chat.Post(cc.username, message)
}
func (cc *ChatClient) Leave() {
close(cc.stop)
cc.chat.Leave(cc.username)
}
Design Note: Each client gets their own channel and goroutine. This isolates slow clients and makes the system resilient.
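Putting it together - a usage sketch (it assumes "time" is added to the imports above, since message delivery is asynchronous):

func chatDemo() {
	chat := NewChat()

	alice, _ := chat.NewClient("alice")
	bob, _ := chat.NewClient("bob")

	alice.Send("Hello, Bob!")
	bob.Send("Hi, Alice!")

	// Delivery happens on the listener goroutines; give them a beat
	time.Sleep(100 * time.Millisecond)

	alice.Leave()
	bob.Leave()
}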
Aggregating Results from Parallel Work
When you need to wait for multiple goroutines to complete:
package main
import (
"fmt"
"sync"
"time"
)
// ParallelProcessor processes items concurrently and aggregates results
type ParallelProcessor struct {
workers int
}
func NewParallelProcessor(workers int) *ParallelProcessor {
return &ParallelProcessor{workers: workers}
}
func (pp *ParallelProcessor) Process(items []string) []string {
var wg sync.WaitGroup
results := make([]string, len(items))
// Create a channel for work distribution
work := make(chan int, len(items))
for i := range items {
work <- i
}
close(work)
// Start workers
wg.Add(pp.workers)
for w := 0; w < pp.workers; w++ {
go func() {
defer wg.Done()
for i := range work {
// Simulate processing
time.Sleep(10 * time.Millisecond)
results[i] = fmt.Sprintf("processed: %s", items[i])
}
}()
}
wg.Wait()
return results
}
// Alternative: Using channels for results
func (pp *ParallelProcessor) ProcessWithChannels(items []string) []string {
type result struct {
index int
value string
}
resultChan := make(chan result, len(items))
var wg sync.WaitGroup
wg.Add(len(items))
for i, item := range items {
go func(idx int, val string) {
defer wg.Done()
// Simulate processing
time.Sleep(10 * time.Millisecond)
resultChan <- result{
index: idx,
value: fmt.Sprintf("processed: %s", val),
}
}(i, item)
}
wg.Wait()
close(resultChan)
// Collect results
results := make([]string, len(items))
for r := range resultChan {
results[r.index] = r.value
}
return results
}
Common Pitfall: Before Go 1.22, loop variables were shared across iterations, so pass them to goroutines as parameters or you’ll have a race condition - see the sketch below. (Go 1.22 gives each iteration its own variables, but the habit still protects code built with older toolchains.)
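To make the pitfall concrete, here's a self-contained sketch of the broken and fixed versions:

package main

import "sync"

func loopCaptureDemo() {
	var wg sync.WaitGroup

	// BUG before Go 1.22: the closure captures the shared variable i,
	// so this often prints "3 3 3" (and is a data race)
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			println(i)
		}()
	}
	wg.Wait()

	// FIX: pass the value as a parameter (Go 1.22+ also fixes this
	// by giving each iteration a fresh variable)
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			println(n)
		}(i)
	}
	wg.Wait()
}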
Applying: Worker Pool with Backpressure and Graceful Shutdown
Here’s a production-ready worker pool with all the good stuff:
package main
import (
"context"
"errors"
"fmt"
"sync"
"time"
)
var ErrQueueFull = errors.New("queue is full")
var ErrShuttingDown = errors.New("worker pool is shutting down")
// Job represents a unit of work
type Job struct {
ID string
Payload interface{}
}
// WorkerPool manages a pool of workers with backpressure handling
type WorkerPool struct {
workers int
queue chan Job
results chan Result
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
shutdownMu sync.RWMutex
isShutdown bool
}
type Result struct {
JobID string
Data interface{}
Error error
}
func NewWorkerPool(workers, queueSize int) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background())
wp := &WorkerPool{
workers: workers,
queue: make(chan Job, queueSize),
results: make(chan Result, queueSize),
ctx: ctx,
cancel: cancel,
}
wp.start()
return wp
}
func (wp *WorkerPool) start() {
wp.wg.Add(wp.workers)
for i := 0; i < wp.workers; i++ {
go wp.worker(i)
}
}
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done()
for {
select {
case <-wp.ctx.Done():
fmt.Printf("Worker %d shutting down\n", id)
return
case job, ok := <-wp.queue:
if !ok {
fmt.Printf("Worker %d: queue closed\n", id)
return
}
// Process job with simulated work
result := wp.processJob(job)
// Try to send result, but don't block on shutdown
select {
case wp.results <- result:
case <-wp.ctx.Done():
return
}
}
}
}
func (wp *WorkerPool) processJob(job Job) Result {
// Simulate processing with possible failure
time.Sleep(50 * time.Millisecond)
// Simulate 10% failure rate
if time.Now().UnixNano()%10 == 0 {
return Result{
JobID: job.ID,
Error: fmt.Errorf("job %s failed", job.ID),
}
}
return Result{
JobID: job.ID,
Data: fmt.Sprintf("Processed: %v", job.Payload),
}
}
// Submit adds a job with backpressure handling
func (wp *WorkerPool) Submit(job Job) error {
// Hold the read lock across the send: Shutdown can't close the
// queue while a Submit is in flight (send on a closed channel panics)
wp.shutdownMu.RLock()
defer wp.shutdownMu.RUnlock()
if wp.isShutdown {
return ErrShuttingDown
}
select {
case wp.queue <- job:
return nil
default:
// Queue is full - backpressure!
return ErrQueueFull
}
}
// SubmitWithRetry attempts to submit with exponential backoff
func (wp *WorkerPool) SubmitWithRetry(job Job, maxRetries int) error {
backoff := 10 * time.Millisecond
for i := 0; i < maxRetries; i++ {
err := wp.Submit(job)
if err == nil {
return nil
}
if err == ErrShuttingDown {
return err
}
// Exponential backoff for queue full
if err == ErrQueueFull {
time.Sleep(backoff)
backoff *= 2
if backoff > time.Second {
backoff = time.Second
}
continue
}
}
return ErrQueueFull
}
// Results returns the results channel for processing
func (wp *WorkerPool) Results() <-chan Result {
return wp.results
}
// Shutdown gracefully stops the worker pool
func (wp *WorkerPool) Shutdown(timeout time.Duration) error {
wp.shutdownMu.Lock()
wp.isShutdown = true
// Stop accepting new jobs: close under the write lock so no
// Submit (which holds the read lock) can race with the close
close(wp.queue)
wp.shutdownMu.Unlock()
// Create timeout context
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// Wait for workers to finish
done := make(chan struct{})
go func() {
wp.wg.Wait()
close(done)
}()
select {
case <-done:
// All workers finished
wp.cancel()
close(wp.results)
return nil
case <-ctx.Done():
// Timeout - force shutdown
wp.cancel()
return fmt.Errorf("shutdown timeout after %v", timeout)
}
}
// Usage example
func ExampleWorkerPool() {
pool := NewWorkerPool(5, 100)
// Start result processor
go func() {
for result := range pool.Results() {
if result.Error != nil {
fmt.Printf("Job %s failed: %v\n", result.JobID, result.Error)
} else {
fmt.Printf("Job %s succeeded: %v\n", result.JobID, result.Data)
}
}
}()
// Submit jobs
for i := 0; i < 20; i++ {
job := Job{
ID: fmt.Sprintf("job-%d", i),
Payload: fmt.Sprintf("data-%d", i),
}
err := pool.SubmitWithRetry(job, 3)
if err != nil {
fmt.Printf("Failed to submit job %s: %v\n", job.ID, err)
}
}
// Graceful shutdown
if err := pool.Shutdown(5 * time.Second); err != nil {
fmt.Printf("Shutdown error: %v\n", err)
}
}
Production Features:
- Backpressure: Returns error when queue is full instead of blocking
- Exponential Backoff: Retry logic with increasing delays
- Graceful Shutdown: Waits for in-flight work to complete
- Timeout Protection: Forces shutdown after timeout
- Error Handling: Propagates job processing errors
Key Patterns Used:
- Non-blocking channel writes with select/default
- Context for cancellation propagation
- WaitGroup for synchronization
- RWMutex for shutdown state protection
Conclusion
You’ve just learned the concurrency patterns that separate “I know Go syntax” from “I can build production systems in Go.” These aren’t academic exercises - they’re battle-tested patterns I’ve seen succeed (and fail) in real production systems.
Remember:
- Start simple (mutex/atomic for shared state)
- Channels for communication, not sharing memory
- Context for cancellation propagation
- Always handle the unhappy path (timeouts, backpressure, shutdown)
- Always test with go test -race
The next time someone asks you “How do you broadcast a message to many goroutines?” you won’t just know the answer - you’ll have several solutions and know when to use each one.
Now go forth and write concurrent code that doesn’t keep you up at night!