Skip to content

🔄 Advanced Concurrency Patterns

"Concurrency is not parallelism." — Rob Pike
Advanced patterns cho những system phức tạp nhất.

🎯 Pattern Overview

Basic Patterns (đã học)     →    Advanced Patterns (chương này)
├── Worker Pool             │    ├── Or-Done Channel
├── Fan-Out/Fan-In          │    ├── Tee Channel
├── Pipeline                │    ├── Bridge Channel
└── Context Cancellation    │    ├── Rate Limiting
                            │    ├── Backpressure
                            │    └── Graceful Degradation

🛡️ Or-Done Channel

Problem

Khi bạn đọc từ channel nhưng cũng cần respect context cancellation:

go
// ❌ Clunky: every read site has to repeat this select to honor cancellation.
// The loop ends when done is signaled or when c is closed.
for {
    select {
    case <-done:
        // Cancellation was requested: stop reading.
        return
    case v, ok := <-c:
        if !ok {
            // c was closed: there is nothing more to read.
            return
        }
        // Process v
    }
}

Solution: Or-Done Pattern

go
// orDone relays values from c onto a new unbuffered channel until either c is
// closed or ctx is cancelled. The returned channel is closed when the relay
// goroutine exits, so callers can simply range over it.
func orDone[T any](ctx context.Context, c <-chan T) <-chan T {
    relay := make(chan T)
    go func() {
        defer close(relay)
        cancelled := ctx.Done()
        for {
            var (
                item T
                open bool
            )
            // Wait for the next value, giving up if ctx is cancelled first.
            select {
            case <-cancelled:
                return
            case item, open = <-c:
            }
            if !open {
                return
            }
            // Hand the value over, again giving up on cancellation so the
            // goroutine never stays blocked on a consumer that has gone away.
            select {
            case <-cancelled:
                return
            case relay <- item:
            }
        }
    }()
    return relay
}

// Usage: process prints every value received from data and returns once data
// is closed or ctx is cancelled (both are handled inside orDone).
func process(ctx context.Context, data <-chan int) {
    values := orDone(ctx, data)
    for {
        v, ok := <-values
        if !ok {
            return
        }
        fmt.Println(v)
    }
}

🔀 Tee Channel

Problem

Cần gửi cùng data đến nhiều consumers:

go
// ❌ BAD: each value is delivered to only one consumer
for v := range input {
    consumer1 <- v  // consumer2 never receives it!
}

Solution: Tee Pattern

go
// tee duplicates every value read from in onto two unbuffered output
// channels. A value must be accepted by both outputs before the next one is
// read, so the slower consumer sets the pace. Both outputs are closed when in
// is closed or ctx is cancelled.
func tee[T any](ctx context.Context, in <-chan T) (<-chan T, <-chan T) {
    left := make(chan T)
    right := make(chan T)

    go func() {
        defer close(left)
        defer close(right)

        for v := range orDone(ctx, in) {
            // A send on a nil channel blocks forever, so setting a pending
            // channel to nil removes its case from the select once that
            // side has received v.
            pendingLeft, pendingRight := left, right
            for pendingLeft != nil || pendingRight != nil {
                select {
                case <-ctx.Done():
                    return
                case pendingLeft <- v:
                    pendingLeft = nil
                case pendingRight <- v:
                    pendingRight = nil
                }
            }
        }
    }()

    return left, right
}

// Usage
// Fans one stream of numbers out to a logger and a processor.
// generateNumbers, logNumbers and processNumbers are not shown in this snippet.
func main() {
    ctx := context.Background()
    
    input := generateNumbers(ctx, 10)
    
    // Split into two streams
    forLogger, forProcessor := tee(ctx, input)
    
    // Both receive same data
    // The logger runs in its own goroutine: tee's outputs are unbuffered, so
    // both consumers have to be receiving for the stream to advance.
    go logNumbers(forLogger)
    processNumbers(forProcessor)
}

🌉 Bridge Channel

Problem

Có channel of channels, cần flatten thành single channel:

go
// Input: <-chan <-chan int (channel of channels)
// Want:  <-chan int (flattened)

Solution: Bridge Pattern

go
// bridge flattens a channel of channels into a single channel. Inner channels
// are consumed one at a time, in the order they arrive on chanStream, and each
// one is drained completely before the next is taken. The output is closed
// when chanStream is closed (after the current inner channel is drained) or
// when ctx is cancelled.
func bridge[T any](ctx context.Context, chanStream <-chan <-chan T) <-chan T {
    flat := make(chan T)

    go func() {
        defer close(flat)

        for {
            // Fetch the next inner channel, or stop.
            var current <-chan T
            select {
            case <-ctx.Done():
                return
            case next, open := <-chanStream:
                if !open {
                    return
                }
                current = next
            }

            // Forward everything from the current inner channel.
            values := orDone(ctx, current)
            for {
                v, more := <-values
                if !more {
                    break
                }
                select {
                case <-ctx.Done():
                    return
                case flat <- v:
                }
            }
        }
    }()

    return flat
}

// Usage
// Produces three batch channels and prints their values as one flat stream.
// generateBatch is not shown in this snippet.
func main() {
    ctx := context.Background()
    
    // Generate sequence of channels
    chanStream := make(chan (<-chan int))
    go func() {
        // Closing chanStream is what lets bridge finish and close its output.
        defer close(chanStream)
        for i := 0; i < 3; i++ {
            chanStream <- generateBatch(ctx, i*10, 10)
        }
    }()
    
    // Flatten into single stream
    for v := range bridge(ctx, chanStream) {
        fmt.Println(v)
    }
    // Prints: 0,1,2...9, 10,11...19, 20,21...29
}

⏱️ Rate Limiting

Token Bucket Pattern

go
import "golang.org/x/time/rate"

// rateLimitedProcessor processes items at a limited rate using rate.Limiter,
// which implements the token bucket algorithm.
//
// It returns nil once items is closed. It returns ctx.Err() if ctx is
// cancelled while waiting for the next item, and the limiter's error if
// waiting for a token fails.
func rateLimitedProcessor(ctx context.Context, items <-chan Item) error {
    // 10 requests per second, burst of 20
    limiter := rate.NewLimiter(rate.Limit(10), 20)

    for {
        select {
        case <-ctx.Done():
            // Without this case an idle producer would keep the loop blocked
            // on the receive long after the caller cancelled.
            return ctx.Err()
        case item, ok := <-items:
            if !ok {
                return nil
            }

            // Block until allowed
            if err := limiter.Wait(ctx); err != nil {
                return err
            }

            process(item)
        }
    }
}

// Per-key rate limiting
// PerKeyLimiter keeps one rate.Limiter per key, creating it on first use with
// the shared rate and burst settings.
type PerKeyLimiter struct {
    mu       sync.Mutex               // guards limiters
    limiters map[string]*rate.Limiter // one limiter per key seen so far
    rate     rate.Limit               // rate given to every newly created limiter
    burst    int                      // burst given to every newly created limiter
}

// Allow reports whether an event for key may happen now. The limiter for key
// is created on first use with the configured rate and burst. The map itself
// is created lazily, so a PerKeyLimiter built without the limiters field does
// not panic on its first call.
func (l *PerKeyLimiter) Allow(key string) bool {
    l.mu.Lock()
    if l.limiters == nil {
        // Writing to a nil map panics; initialize it on demand.
        l.limiters = make(map[string]*rate.Limiter)
    }
    limiter, ok := l.limiters[key]
    if !ok {
        limiter = rate.NewLimiter(l.rate, l.burst)
        l.limiters[key] = limiter
    }
    l.mu.Unlock()

    // The lock only protects the map; the limiter is consulted after it has
    // been released.
    return limiter.Allow()
}

// Usage
// Every distinct key gets its own limiter configured with the rate and burst below.
limiter := &PerKeyLimiter{
    limiters: make(map[string]*rate.Limiter),
    rate:     rate.Limit(1),  // 1 per second per key
    burst:    5,
}

// Reject the request when the limiter for this user does not allow it.
if !limiter.Allow(userID) {
    return ErrRateLimited
}

Sliding Window

go
// SlidingWindowLimiter admits at most maxRequests calls within any trailing
// window of length windowSize.
type SlidingWindowLimiter struct {
    mu          sync.Mutex    // guards timestamps
    windowSize  time.Duration // length of the trailing window
    maxRequests int           // admitted calls allowed inside one window
    timestamps  []time.Time   // times of admitted calls still inside the window
}

// Allow reports whether a request may proceed now. Timestamps that are no
// longer strictly after the start of the window are discarded first; if fewer
// than maxRequests remain, the current time is recorded and the request is
// admitted.
func (l *SlidingWindowLimiter) Allow() bool {
    l.mu.Lock()
    defer l.mu.Unlock()

    now := time.Now()
    cutoff := now.Add(-l.windowSize)

    // Compact the live timestamps to the front of the same backing array.
    kept := 0
    for i := range l.timestamps {
        if !l.timestamps[i].After(cutoff) {
            continue
        }
        l.timestamps[kept] = l.timestamps[i]
        kept++
    }
    l.timestamps = l.timestamps[:kept]

    if kept < l.maxRequests {
        l.timestamps = append(l.timestamps, now)
        return true
    }
    return false
}

🌊 Backpressure Handling

Problem

Producer faster than consumer → memory exhaustion

Solution: Bounded Channels + Dropping

go
// Strategy 1: block the producer with a bounded channel.
// blockingPipeline starts produce and consume as goroutines sharing one
// channel buffered for 1000 events; a blocking send stalls once the buffer
// is full.
func blockingPipeline(ctx context.Context) {
    const capacity = 1000
    events := make(chan Event, capacity)

    go produce(ctx, events)
    go consume(ctx, events)
}

// Strategy 2: Drop oldest (ring buffer)
// RingBuffer is a fixed-capacity circular buffer; its capacity is len(data).
// When it is full, a new value overwrites the oldest one.
type RingBuffer[T any] struct {
    mu    sync.Mutex // guards all fields below
    data  []T        // backing storage; its length is the capacity
    head  int        // index where the next value is written
    tail  int        // index of the oldest stored value
    count int        // number of values currently stored
}

// Push stores v, overwriting the oldest value when the buffer is full.
// A buffer with no capacity (including the zero value) drops v instead of
// panicking on the index and modulo operations.
func (r *RingBuffer[T]) Push(v T) {
    r.mu.Lock()
    defer r.mu.Unlock()

    size := len(r.data)
    if size == 0 {
        return
    }

    r.data[r.head] = v
    r.head = (r.head + 1) % size

    if r.count == size {
        // The slot just written held the oldest value, so the oldest is now
        // the next one along.
        r.tail = (r.tail + 1) % size
        return
    }
    r.count++
}

// Strategy 3: drop the newest value with a non-blocking send.
// tryEnqueue attempts to send v on ch without blocking and reports whether
// the value was accepted; false means the send could not proceed and v was
// dropped.
func tryEnqueue[T any](ch chan<- T, v T) bool {
    accepted := true
    select {
    case ch <- v:
    default:
        // The send would have blocked: drop v.
        accepted = false
    }
    return accepted
}

// Strategy 4: Sample (process every Nth item)
// samplePipeline forwards every sampleRate-th value read from in onto the
// returned channel, which is closed when in is closed or ctx is cancelled.
// A sampleRate of 0 is treated as 1 (forward everything) rather than
// panicking with a division by zero inside the goroutine.
func samplePipeline[T any](ctx context.Context, in <-chan T, sampleRate int) <-chan T {
    if sampleRate == 0 {
        sampleRate = 1
    }

    out := make(chan T)
    go func() {
        defer close(out)
        count := 0
        for v := range orDone(ctx, in) {
            count++
            if count%sampleRate != 0 {
                continue
            }
            // Give up on the send when ctx is cancelled; otherwise this
            // goroutine would stay blocked forever once the consumer stops
            // reading.
            select {
            case out <- v:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

Adaptive Backpressure

go
// AdaptiveProcessor tracks recent latencies and moves its worker count up or
// down, within [minWorkers, maxWorkers], based on their 95th percentile.
type AdaptiveProcessor struct {
    targetLatency time.Duration // P95 above this adds a worker
    minWorkers    int           // lower bound for workers
    maxWorkers    int           // upper bound for workers

    workers   int             // current worker count
    latencies []time.Duration // most recent samples, oldest first
    mu        sync.Mutex      // guards workers and latencies
}

// recordLatency appends d to the sample window and drops the oldest sample
// once the window holds more than 100 entries.
func (p *AdaptiveProcessor) recordLatency(d time.Duration) {
    p.mu.Lock()
    defer p.mu.Unlock()

    p.latencies = append(p.latencies, d)
    if len(p.latencies) <= 100 {
        return
    }
    p.latencies = p.latencies[1:]
}

// adjust changes the worker count by at most one per call. It does nothing
// until at least 10 samples have been recorded. A P95 above targetLatency
// adds a worker; a P95 below half of targetLatency removes one.
func (p *AdaptiveProcessor) adjust() {
    p.mu.Lock()
    defer p.mu.Unlock()

    n := len(p.latencies)
    if n < 10 {
        return
    }

    // Sort a copy so the recorded order of samples is left untouched.
    ordered := append([]time.Duration(nil), p.latencies...)
    sort.Slice(ordered, func(a, b int) bool { return ordered[a] < ordered[b] })
    p95 := ordered[int(float64(n)*0.95)]

    switch {
    case p95 > p.targetLatency && p.workers < p.maxWorkers:
        p.workers++
        // Spawn new worker
    case p95 < p.targetLatency/2 && p.workers > p.minWorkers:
        p.workers--
        // Signal worker to stop
    }
}

🛡️ Graceful Degradation

Circuit Breaker Pattern

go
// CircuitBreaker stops calling a failing dependency for a while.
//
// States:
//   - "closed": calls pass through; threshold consecutive failures open it.
//   - "open": calls are rejected until timeout has elapsed since the last
//     failure, then the breaker becomes "half-open".
//   - "half-open": calls pass through until halfOpenMax successes have been
//     recorded, which closes the breaker; any failure re-opens it.
//
// An empty state (the zero value) is treated as "closed".
type CircuitBreaker struct {
    mu sync.Mutex // guards every field below

    failures    int       // consecutive failures since the last reset
    successes   int       // successes recorded while half-open
    state       string    // "closed", "open", "half-open"
    lastFailure time.Time // time of the most recent failure

    threshold   int           // failures needed to open the breaker
    timeout     time.Duration // how long the breaker stays open
    halfOpenMax int           // successes needed to close from half-open
}

// Execute runs fn if the breaker allows it and records the outcome.
// It returns ErrCircuitOpen without calling fn when the breaker rejects the
// call; otherwise it returns whatever fn returned.
func (cb *CircuitBreaker) Execute(fn func() error) error {
    if !cb.allowRequest() {
        return ErrCircuitOpen
    }

    err := fn()

    cb.recordResult(err)
    return err
}

// allowRequest reports whether a call may proceed, moving the breaker from
// "open" to "half-open" once timeout has elapsed since the last failure.
func (cb *CircuitBreaker) allowRequest() bool {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    switch cb.state {
    case "closed", "":
        return true
    case "open":
        if time.Since(cb.lastFailure) <= cb.timeout {
            return false
        }
        cb.state = "half-open"
        cb.successes = 0
        return true
    case "half-open":
        // Only completed successes are counted here; calls still in flight
        // are not tracked.
        return cb.successes < cb.halfOpenMax
    default:
        return false
    }
}

// recordResult updates the counters and state from the outcome of one call.
func (cb *CircuitBreaker) recordResult(err error) {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()

        // A failed probe re-opens the breaker immediately.
        if cb.state == "half-open" || cb.failures >= cb.threshold {
            cb.state = "open"
        }
        return
    }

    switch cb.state {
    case "closed", "":
        // A success ends the failure streak, so isolated failures spread over
        // a long time never add up to the threshold.
        cb.failures = 0
    case "half-open":
        cb.successes++
        if cb.successes >= cb.halfOpenMax {
            cb.state = "closed"
            cb.failures = 0
        }
    }
}

// Usage
cb := &CircuitBreaker{
    threshold:   5,                // failures needed to open the circuit
    timeout:     30 * time.Second, // time the circuit stays open before a probe is allowed
    halfOpenMax: 3,                // successes needed in half-open to close it again
    state:       "closed",
}

// Execute returns ErrCircuitOpen without invoking the function when the
// breaker rejects the call.
err := cb.Execute(func() error {
    return callExternalService()
})
if errors.Is(err, ErrCircuitOpen) {
    // Use fallback
    return cachedResult()
}

💻 Engineering Example: Resilient Pipeline

go
package pipeline

import (
    "context"
    "time"
    
    "golang.org/x/time/rate"
)

// ResilientPipeline combines a pool of workers, a rate limiter and a circuit
// breaker around buffered input and output channels.
type ResilientPipeline struct {
    workers     int             // number of worker goroutines launched by Start
    rateLimiter *rate.Limiter   // waited on before each event is processed
    circuit     *CircuitBreaker // wraps the processing of each event
    input       chan Event      // events accepted by Submit
    output      chan Result     // results produced by the workers
}

// NewResilientPipeline builds a pipeline from cfg. The circuit breaker starts
// closed and needs 3 successes to close again from half-open; both channels
// are buffered with cfg.BufferSize. Workers are not started until Start is
// called.
func NewResilientPipeline(cfg Config) *ResilientPipeline {
    limiter := rate.NewLimiter(rate.Limit(cfg.RateLimit), cfg.BurstSize)

    breaker := &CircuitBreaker{
        state:       "closed",
        halfOpenMax: 3,
        threshold:   cfg.CBThreshold,
        timeout:     cfg.CBTimeout,
    }

    p := new(ResilientPipeline)
    p.workers = cfg.Workers
    p.rateLimiter = limiter
    p.circuit = breaker
    p.input = make(chan Event, cfg.BufferSize)
    p.output = make(chan Result, cfg.BufferSize)
    return p
}

// Start launches p.workers worker goroutines and returns immediately. Each
// worker runs until ctx is cancelled.
func (p *ResilientPipeline) Start(ctx context.Context) {
    for remaining := p.workers; remaining > 0; remaining-- {
        go p.worker(ctx)
    }
}

// worker processes events from the input channel until ctx is cancelled or
// the input channel is closed.
func (p *ResilientPipeline) worker(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case event, ok := <-p.input:
            if !ok {
                // A closed channel yields zero values forever; without this
                // check the worker would spin on empty events.
                return
            }
            p.processWithResilience(ctx, event)
        }
    }
}

// processWithResilience handles one event: it waits for the rate limiter,
// runs the processing step through the circuit breaker with a 5 second
// timeout, and delivers the result to the output channel. Every failure,
// including a failed rate-limiter wait, is reported through handleError.
func (p *ResilientPipeline) processWithResilience(ctx context.Context, event Event) {
    // Rate limiting
    if err := p.rateLimiter.Wait(ctx); err != nil {
        // Report the event instead of dropping it silently.
        handleError(event, err)
        return
    }

    // Circuit breaker
    err := p.circuit.Execute(func() error {
        // The timeout bounds only the processing call. It gets its own name
        // so the outer ctx stays available for the send below.
        callCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
        defer cancel()

        result, err := process(callCtx, event)
        if err != nil {
            return err
        }

        // Deliver under the pipeline's ctx rather than the call timeout, so a
        // slow consumer applies backpressure instead of being recorded as a
        // failure of the dependency.
        select {
        case p.output <- result:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    })

    if err != nil {
        // Log, metric, fallback...
        handleError(event, err)
    }
}

// Submit enqueues event without blocking. It returns ctx.Err() if ctx is
// already cancelled, ErrBufferFull if the input buffer has no free slot, and
// nil once the event has been queued.
func (p *ResilientPipeline) Submit(ctx context.Context, event Event) error {
    // Check cancellation first: select picks at random among ready cases, so
    // a cancelled ctx could otherwise still enqueue when the buffer has room.
    if err := ctx.Err(); err != nil {
        return err
    }

    select {
    case p.input <- event:
        return nil
    default:
        return ErrBufferFull // Backpressure signal
    }
}

Ship-to-Prod Checklist

Patterns Applied

  • [ ] Or-Done for clean cancellation propagation
  • [ ] Rate Limiting to protect downstream services
  • [ ] Backpressure strategy chosen (block/drop/sample)
  • [ ] Circuit Breaker for external dependencies

Resilience

  • [ ] Timeouts on all external calls
  • [ ] Fallbacks when circuit open
  • [ ] Graceful degradation plan documented
  • [ ] Dead letter queue for failed items

Monitoring

  • [ ] Queue depths tracked
  • [ ] Rate limit rejections counted
  • [ ] Circuit state exposed as metric
  • [ ] Latency percentiles (P50, P95, P99)

📊 Summary

| Pattern | Use Case |
|---|---|
| Or-Done | Clean cancellation in channel reads |
| Tee | Split stream to multiple consumers |
| Bridge | Flatten channel of channels |
| Rate Limit | Protect resources |
| Backpressure | Handle overload |
| Circuit Breaker | Fail fast on dependency issues |

🎉 Advanced Section Complete!

Bạn đã master Go Advanced Topics:

  • ✅ Generics
  • ✅ Reflection
  • ✅ CGO & FFI
  • ✅ Performance Tuning
  • ✅ Advanced Concurrency

You are now ready to build production-grade Go systems!