
🔄 Channels & Concurrency Patterns

Channels are "pipes" that connect goroutines.
Data flows like water: the sender pushes values in, the receiver takes them out.

📡 Channels Anatomy

Unbuffered vs Buffered

go
// Unbuffered - BLOCKING (synchronous)
unbuffered := make(chan int)
// Sender blocks until a receiver is ready
// Receiver blocks until a sender sends

// Buffered - NON-BLOCKING until full
buffered := make(chan int, 10)  // Buffer size 10
// Sender blocks only when the buffer is FULL
// Receiver blocks only when the buffer is EMPTY
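A minimal runnable sketch of both behaviors (the channel and variable names are illustrative):

```go
package main

import "fmt"

func main() {
	// Buffered: sends succeed with no receiver until the buffer fills
	buf := make(chan int, 2)
	buf <- 1
	buf <- 2 // a third send here would block forever
	fmt.Println(len(buf), cap(buf)) // 2 2

	// Unbuffered: the send blocks until a receiver is ready
	unbuf := make(chan int)
	done := make(chan struct{})
	go func() {
		fmt.Println("received:", <-unbuf)
		close(done)
	}()
	unbuf <- 42 // handshake: completes only when the goroutine receives
	<-done
	fmt.Println("send completed")
}
```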

Visual Comparison

┌─────────────────────────────────────────────────────────────────────┐
│                 Unbuffered vs Buffered Channels                    │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   UNBUFFERED (make(chan int)):                                     │
│   ┌─────────┐          ┌─────────┐                                 │
│   │ Sender  │──────────│Receiver │                                 │
│   │ BLOCKS  │ ◄──────► │ BLOCKS  │                                 │
│   └─────────┘          └─────────┘                                 │
│   → Handshake: Both must be ready simultaneously                   │
│                                                                     │
│   ─────────────────────────────────────────────────────────────     │
│                                                                     │
│   BUFFERED (make(chan int, 3)):                                    │
│   ┌─────────┐   ┌───┬───┬───┐   ┌─────────┐                        │
│   │ Sender  │──►│ 1 │ 2 │   │──►│Receiver │                        │
│   │         │   └───┴───┴───┘   │         │                        │
│   └─────────┘     Buffer (3)    └─────────┘                        │
│   → Sender blocks only when buffer FULL                            │
│   → Receiver blocks only when buffer EMPTY                         │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

The Pipe Analogy

┌─────────────────────────────────────────────────────────────────────┐
│                    Channel as a Water Pipe                         │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   Faucet (Sender)        Pipe (Channel)         Bucket (Receiver)  │
│   ┌─────┐           ════════════════════           ┌─────┐         │
│   │ 💧  │ ────────► ════════════════════ ────────► │     │         │
│   │ 💧  │           ════════════════════           │ 💧  │         │
│   └─────┘           ════════════════════           └─────┘         │
│                                                                     │
│   • Data flows ONE direction                                        │
│   • Pipe can hold some water (buffered)                            │
│   • If bucket full, faucet waits                                    │
│   • If pipe empty, bucket waits                                     │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Channel Ownership

📌 HPN Standard: Sender Closes Channel

go
// ✅ CORRECT: Sender closes
func producer(ch chan<- int) {
    for i := 0; i < 10; i++ {
        ch <- i
    }
    close(ch)  // Sender closes when done
}

func consumer(ch <-chan int) {
    for v := range ch {  // Range exits when channel closed
        process(v)
    }
}

// ❌ WRONG: Receiver closes
// → The sender will panic on its next send to the closed channel!

Rule: Whoever CREATES data, CLOSES the channel.
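The consumer can also detect closure explicitly with the comma-ok idiom; a small runnable sketch:

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	ch <- 1
	close(ch)

	// Receiving from a closed channel drains any buffered values
	// first, then yields the zero value with ok == false.
	v, ok := <-ch
	fmt.Println(v, ok) // 1 true
	v, ok = <-ch
	fmt.Println(v, ok) // 0 false
}
```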


🔀 Control Flow: select

Handling Multiple Channels

go
// select = "switch case" for channels
// Note: a receive from a CLOSED channel succeeds immediately with the
// zero value, so this loop assumes all three channels stay open; set a
// channel variable to nil to disable its case once it closes.
func multiplexer(ch1, ch2, ch3 <-chan Event) {
    for {
        select {
        case e := <-ch1:
            handleEvent1(e)
        case e := <-ch2:
            handleEvent2(e)
        case e := <-ch3:
            handleEvent3(e)
        }
    }
}

Timeout Pattern

go
func fetchWithTimeout(url string, timeout time.Duration) ([]byte, error) {
    resultCh := make(chan []byte, 1)
    errCh := make(chan error, 1)
    
    go func() {
        data, err := fetch(url)
        if err != nil {
            errCh <- err
            return
        }
        resultCh <- data
    }()
    
    // Wait for result OR timeout
    select {
    case data := <-resultCh:
        return data, nil
    case err := <-errCh:
        return nil, err
    case <-time.After(timeout):
        return nil, fmt.Errorf("timeout after %v", timeout)
    }
}
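Modern code often pushes the deadline into a context instead of a select at each call site. A sketch using context.WithTimeout, with slowOp as a hypothetical stand-in for fetch:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowOp stands in for fetch: it honors cancellation while "working".
func slowOp(ctx context.Context) (string, error) {
	select {
	case <-time.After(100 * time.Millisecond): // simulated work
		return "done", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	// The deadline travels with the context rather than living in a
	// select at the call site.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	_, err := slowOp(ctx)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```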

Non-Blocking Operations

go
// Try to send, but don't block
select {
case ch <- value:
    fmt.Println("Sent!")
default:
    fmt.Println("Channel full, skipping...")
}

// Try to receive, but don't block
select {
case v := <-ch:
    fmt.Println("Received:", v)
default:
    fmt.Println("Channel empty")
}
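A common use of the non-blocking send is a try-acquire semaphore built on a buffered channel: callers are rejected instead of queued when all slots are taken. A sketch:

```go
package main

import "fmt"

func main() {
	sem := make(chan struct{}, 2) // at most 2 concurrent holders

	// Non-blocking send: fails fast instead of waiting for a slot
	tryAcquire := func() bool {
		select {
		case sem <- struct{}{}:
			return true
		default:
			return false
		}
	}
	release := func() { <-sem }

	fmt.Println(tryAcquire()) // true
	fmt.Println(tryAcquire()) // true
	fmt.Println(tryAcquire()) // false - semaphore full
	release()
	fmt.Println(tryAcquire()) // true
}
```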

🏭 Enterprise Patterns

Worker Pool: Limiting Concurrency

🎓 Professor Tom's Deep Dive: Why Limit Workers?

Problem: 1000 jobs, each hitting the database → 1000 concurrent DB connections = 💥 the DB crashes

Solution: Worker Pool with 5 workers → at most 5 concurrent DB connections at any time

go
func WorkerPool(jobs <-chan Job, numWorkers int) <-chan Result {
    // Note: don't size this buffer with len(jobs) - that only counts
    // jobs currently queued. Unbuffered is fine: the caller drains
    // results concurrently.
    results := make(chan Result)
    
    var wg sync.WaitGroup
    
    // Start fixed number of workers
    for w := 0; w < numWorkers; w++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for job := range jobs {
                result := process(job)
                results <- result
            }
        }(w)
    }
    
    // Close results when all workers done
    go func() {
        wg.Wait()
        close(results)
    }()
    
    return results
}

// Usage
func main() {
    jobs := make(chan Job, 1000)
    
    // Fill jobs
    for i := 0; i < 1000; i++ {
        jobs <- Job{ID: i}
    }
    close(jobs)
    
    // Process with 5 workers (protect DB!)
    results := WorkerPool(jobs, 5)
    
    for result := range results {
        fmt.Println(result)
    }
}
┌─────────────────────────────────────────────────────────────────────┐
│                      WORKER POOL PATTERN                           │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   Jobs Queue (1000 jobs)                                           │
│   ┌─────────────────────────────────────────────────────────────┐  │
│   │ J1 │ J2 │ J3 │ J4 │ J5 │ J6 │ ... │ J1000                  │  │
│   └───────────────────────────┬─────────────────────────────────┘  │
│                               │                                     │
│                               ▼                                     │
│   ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐                          │
│   │ W1  │ │ W2  │ │ W3  │ │ W4  │ │ W5  │  ← 5 Workers only       │
│   └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘                          │
│      │       │       │       │       │                              │
│      └───────┴───────┴───────┴───────┘                              │
│                      │                                              │
│                      ▼                                              │
│   ┌─────────────────────────────────────────────────────────────┐  │
│   │ R1 │ R2 │ R3 │ ... │                              Results   │  │
│   └─────────────────────────────────────────────────────────────┘  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
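The same pattern shrunk to a self-contained runnable form, with Job/Result replaced by int and process by a doubling step (both stand-ins):

```go
package main

import (
	"fmt"
	"sync"
)

// workerPool mirrors WorkerPool above: a fixed number of goroutines
// share one jobs channel, so at most numWorkers jobs run at once.
func workerPool(jobs <-chan int, numWorkers int) <-chan int {
	results := make(chan int, numWorkers)
	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- j * 2 // "process": double the job
			}
		}()
	}
	go func() {
		wg.Wait()
		close(results)
	}()
	return results
}

func main() {
	jobs := make(chan int, 100)
	for i := 0; i < 100; i++ {
		jobs <- i
	}
	close(jobs)

	sum := 0
	for r := range workerPool(jobs, 5) { // only 5 goroutines run jobs
		sum += r
	}
	fmt.Println(sum) // 9900 = 2 * (0 + 1 + ... + 99)
}
```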

Fan-Out / Fan-In

go
// Fan-Out: One source → Multiple workers
// (worker here is a generator, as in the Generator pattern below: it
//  consumes the shared source and returns its own output channel)
func FanOut(source <-chan int, numWorkers int) []<-chan int {
    outputs := make([]<-chan int, numWorkers)
    
    for i := 0; i < numWorkers; i++ {
        outputs[i] = worker(source)
    }
    
    return outputs
}

// Fan-In: Multiple sources → One destination
func FanIn(inputs ...<-chan int) <-chan int {
    output := make(chan int)
    var wg sync.WaitGroup
    
    for _, in := range inputs {
        wg.Add(1)
        go func(ch <-chan int) {
            defer wg.Done()
            for v := range ch {
                output <- v
            }
        }(in)
    }
    
    go func() {
        wg.Wait()
        close(output)
    }()
    
    return output
}
┌─────────────────────────────────────────────────────────────────────┐
│                      FAN-OUT / FAN-IN PATTERN                       │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│                      ┌─────────┐                                    │
│                 ┌───►│ Worker1 │───┐                                │
│   ┌────────┐    │    ├─────────┤   │    ┌────────┐                  │
│   │ Source │────┼───►│ Worker2 │───┼───►│ FanIn  │                  │
│   └────────┘    │    ├─────────┤   │    └────────┘                  │
│                 └───►│ Worker3 │───┘                                │
│                      └─────────┘                                    │
│                                                                     │
│   FAN-OUT: the Source distributes work across the workers           │
│   FAN-IN:  FanIn merges the workers' outputs into one channel       │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
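Wiring the two together end to end, with a hypothetical worker that squares its input; the order of results is not guaranteed, but the sum is:

```go
package main

import (
	"fmt"
	"sync"
)

// worker: a generator that squares everything it pulls from source.
func worker(source <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range source {
			out <- v * v
		}
	}()
	return out
}

// fanOut: start n workers sharing one source.
func fanOut(source <-chan int, n int) []<-chan int {
	outputs := make([]<-chan int, n)
	for i := range outputs {
		outputs[i] = worker(source)
	}
	return outputs
}

// fanIn: merge many channels into one, closing it when all drain.
func fanIn(inputs ...<-chan int) <-chan int {
	output := make(chan int)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(ch <-chan int) {
			defer wg.Done()
			for v := range ch {
				output <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(output)
	}()
	return output
}

func main() {
	source := make(chan int)
	go func() {
		defer close(source)
		for i := 1; i <= 10; i++ {
			source <- i
		}
	}()

	sum := 0
	for v := range fanIn(fanOut(source, 3)...) {
		sum += v
	}
	fmt.Println(sum) // 385 = 1² + 2² + ... + 10²
}
```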

Generator Pattern

go
// Generator: Function that returns a channel
func GenerateNumbers(start, end int) <-chan int {
    out := make(chan int)
    
    go func() {
        defer close(out)
        for i := start; i <= end; i++ {
            out <- i
        }
    }()
    
    return out  // Return channel immediately
}

// Usage - very clean!
for n := range GenerateNumbers(1, 100) {
    fmt.Println(n)
}
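Because each generator both consumes and returns a channel, generators compose into pipelines. A sketch adding a hypothetical square stage:

```go
package main

import "fmt"

// generate emits start..end, then closes its output.
func generate(start, end int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := start; i <= end; i++ {
			out <- i
		}
	}()
	return out
}

// square is a second stage: it consumes one channel and returns
// another, so stages chain naturally.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}

func main() {
	for n := range square(generate(1, 5)) {
		fmt.Print(n, " ") // 1 4 9 16 25
	}
	fmt.Println()
}
```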

🛑 Graceful Shutdown

Using context.Context

go
func Worker(ctx context.Context, jobs <-chan Job) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Shutting down gracefully...")
            return  // Exit cleanly
        case job, ok := <-jobs:
            if !ok {
                return  // jobs channel closed and drained
            }
            process(job)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    jobs := make(chan Job, 100)
    
    // Start workers
    for i := 0; i < 5; i++ {
        go Worker(ctx, jobs)
    }
    
    // Handle SIGINT/SIGTERM
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    
    <-sigCh           // Wait for signal
    cancel()          // Cancel context → workers exit
    time.Sleep(time.Second)  // Give workers time to cleanup
}
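The time.Sleep at the end of main only hopes the workers finish; a sync.WaitGroup makes the shutdown deterministic. A runnable sketch (workers are simplified to wait only on cancellation):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-ctx.Done() // block until cancelled
			fmt.Println("worker", id, "stopped:", ctx.Err())
		}(i)
	}

	cancel()  // broadcast: every <-ctx.Done() unblocks at once
	wg.Wait() // deterministic, unlike time.Sleep
	fmt.Println("all workers stopped")
}
```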

Using Done Channel

go
func Worker(done <-chan struct{}, jobs <-chan Job) {
    for {
        select {
        case <-done:
            return  // Exit when done is closed
        case job, ok := <-jobs:
            if !ok {
                return  // jobs channel closed and drained
            }
            process(job)
        }
    }
}

func main() {
    done := make(chan struct{})
    jobs := make(chan Job, 100)
    
    // Start workers with done channel
    go Worker(done, jobs)
    
    // Shutdown: close done channel
    close(done)  // All workers will exit
}

🎮 Architecture Challenge

🧩 Design Challenge: Image Processing Pipeline

Requirement: Download 100 images, resize them (using 4 workers), save to disk.

Constraints:

  • Max 10 concurrent downloads (network limit)
  • Max 4 concurrent resize operations (CPU intensive)
  • No image should be lost if system crashes mid-process
💡 Architecture Solution
go
package main

import (
    "context"
    "fmt"
    "sync"
)

type Image struct {
    URL      string
    Data     []byte
    Resized  []byte
    Filename string
}

// Stage 1: Download (10 workers)
func Downloader(ctx context.Context, urls <-chan string) <-chan Image {
    out := make(chan Image)
    var wg sync.WaitGroup
    
    for i := 0; i < 10; i++ {  // 10 concurrent downloads
        wg.Add(1)
        go func() {
            defer wg.Done()
            for url := range urls {
                data := download(url)
                // Cancellation-aware send: a `default` branch would
                // only check ctx between jobs, and a plain send could
                // block forever if downstream has already exited.
                select {
                case <-ctx.Done():
                    return
                case out <- Image{URL: url, Data: data}:
                }
            }
        }()
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

// Stage 2: Resize (4 workers - CPU intensive)
func Resizer(ctx context.Context, images <-chan Image) <-chan Image {
    out := make(chan Image)
    var wg sync.WaitGroup
    
    for i := 0; i < 4; i++ {  // 4 CPU workers
        wg.Add(1)
        go func() {
            defer wg.Done()
            for img := range images {
                img.Resized = resize(img.Data)
                select {
                case <-ctx.Done():
                    return
                case out <- img:  // cancellation-aware send
                }
            }
        }()
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

// Stage 3: Save (sequential for disk I/O)
func Saver(ctx context.Context, images <-chan Image) <-chan string {
    out := make(chan string)
    
    go func() {
        defer close(out)
        for img := range images {
            filename := save(img.Resized)
            select {
            case <-ctx.Done():
                return
            case out <- filename:  // cancellation-aware send
            }
        }
    }()
    
    return out
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    // Create URL source
    urls := make(chan string, 100)
    for i := 0; i < 100; i++ {
        urls <- fmt.Sprintf("https://example.com/image%d.jpg", i)
    }
    close(urls)
    
    // Pipeline: URLs → Download → Resize → Save
    downloaded := Downloader(ctx, urls)
    resized := Resizer(ctx, downloaded)
    saved := Saver(ctx, resized)
    
    // Collect results
    for filename := range saved {
        fmt.Println("Saved:", filename)
    }
}

Pipeline visualization:

┌─────────────────────────────────────────────────────────────────────┐
│                    IMAGE PROCESSING PIPELINE                       │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   URLs (100)                                                        │
│   ┌─────────────────────────────────────────────────────────────┐  │
│   │ url1 │ url2 │ url3 │ ... │ url100                          │  │
│   └───────────────────────────┬─────────────────────────────────┘  │
│                               ▼                                     │
│   ┌─────────────────────────────────────────────────────────────┐  │
│   │              DOWNLOAD STAGE (10 workers)                    │  │
│   │  D1 D2 D3 D4 D5 D6 D7 D8 D9 D10                            │  │
│   └───────────────────────────┬─────────────────────────────────┘  │
│                               ▼                                     │
│   ┌─────────────────────────────────────────────────────────────┐  │
│   │              RESIZE STAGE (4 workers)                       │  │
│   │  R1   R2   R3   R4                                         │  │
│   └───────────────────────────┬─────────────────────────────────┘  │
│                               ▼                                     │
│   ┌─────────────────────────────────────────────────────────────┐  │
│   │              SAVE STAGE (sequential)                        │  │
│   │  S1                                                         │  │
│   └───────────────────────────┬─────────────────────────────────┘  │
│                               ▼                                     │
│   ┌─────────────────────────────────────────────────────────────┐  │
│   │ file1.jpg │ file2.jpg │ ... │ file100.jpg                  │  │
│   └─────────────────────────────────────────────────────────────┘  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

📊 Summary

Pattern                 Use Case
──────────────────────  ──────────────────────────────────
Unbuffered Channel      Synchronization, handshake
Buffered Channel        Decouple producer/consumer speed
select                  Handle multiple channels, timeouts
Worker Pool             Limit concurrent operations
Fan-Out/Fan-In          Distribute work, collect results
Generator               Lazy sequence generation
Context Cancellation    Graceful shutdown

➡️ Next

Patterns mastered! Head back to: Concurrency Foundations - CSP philosophy, GMP scheduler, and sync primitives.