Giao diện
🔄 Advanced Concurrency Patterns
"Concurrency is not parallelism." — Rob Pike
Advanced patterns cho những system phức tạp nhất.
🎯 Pattern Overview
Basic Patterns (đã học) → Advanced Patterns (chương này)
├── Worker Pool │ ├── Or-Done Channel
├── Fan-Out/Fan-In │ ├── Tee Channel
├── Pipeline │ ├── Bridge Channel
└── Context Cancellation │ ├── Rate Limiting
│ ├── Backpressure
│ └── Graceful Degradation🛡️ Or-Done Channel
Problem
Khi bạn đọc từ channel nhưng cũng cần respect context cancellation:
go
// ❌ Clunky: Check done trong select mỗi lần
for {
select {
case <-done:
return
case v, ok := <-c:
if !ok {
return
}
// Process v
}
}

Solution: Or-Done Pattern
go
// orDone wraps a channel to respect context cancellation
func orDone[T any](ctx context.Context, c <-chan T) <-chan T {
out := make(chan T)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case v, ok := <-c:
if !ok {
return
}
select {
case out <- v:
case <-ctx.Done():
return
}
}
}
}()
return out
}
// Usage - clean, readable
// process prints every value received from data. Draining via orDone
// means the range loop exits cleanly on either channel close or
// context cancellation — no manual select needed in the consumer.
func process(ctx context.Context, data <-chan int) {
	for v := range orDone(ctx, data) {
		fmt.Println(v)
	}
}

🔀 Tee Channel
Problem
Cần gửi cùng data đến nhiều consumers:
go
// ❌ BAD: Data chỉ đến một consumer
for v := range input {
consumer1 <- v // Consumer 2 không nhận được!
}

Solution: Tee Pattern
go
// tee splits one channel into two
// tee duplicates every value read from in onto the two returned
// channels. Each value is delivered exactly once to each side, and the
// goroutine does not move on to the next value until both sends have
// completed — so a slow reader on one output throttles the other.
// Cancelling ctx closes both outputs.
func tee[T any](ctx context.Context, in <-chan T) (<-chan T, <-chan T) {
	left := make(chan T)
	right := make(chan T)
	go func() {
		defer close(left)
		defer close(right)
		for {
			// Inlined or-done read: honor cancellation while pulling
			// the next value from the source.
			var v T
			select {
			case <-ctx.Done():
				return
			case val, open := <-in:
				if !open {
					return
				}
				v = val
			}
			// Send to both sides in whichever order they become
			// ready; each case is disabled (set to nil) once its send
			// succeeds so the value goes to each output exactly once.
			l, r := left, right
			for sends := 0; sends < 2; sends++ {
				select {
				case <-ctx.Done():
					return
				case l <- v:
					l = nil
				case r <- v:
					r = nil
				}
			}
		}
	}()
	return left, right
}
// Usage
// Usage: split one generated stream into two identical streams so the
// logger and the processor each observe every value.
func main() {
	ctx := context.Background()
	input := generateNumbers(ctx, 10)
	// Split into two streams
	forLogger, forProcessor := tee(ctx, input)
	// Both receive same data; the logger runs concurrently while the
	// processor drains on the main goroutine.
	go logNumbers(forLogger)
	processNumbers(forProcessor)
}

🌉 Bridge Channel
Problem
Có channel of channels, cần flatten thành single channel:
go
// Input: <-chan <-chan int (channel of channels)
// Want: <-chan int (flattened)

Solution: Bridge Pattern
go
// bridge flattens a channel of channels
// bridge serializes a sequence of channels into one: each inner
// channel received from chanStream is drained completely before the
// next is consumed, and its values are forwarded on the returned
// channel. The output closes when chanStream closes or ctx is done.
func bridge[T any](ctx context.Context, chanStream <-chan <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for {
			// Wait for the next inner channel, honoring cancellation.
			var current <-chan T
			select {
			case <-ctx.Done():
				return
			case next, open := <-chanStream:
				if !open {
					return
				}
				current = next
			}
			// Forward everything from the current inner channel
			// (inlined or-done read plus a cancellable send).
		drain:
			for {
				select {
				case <-ctx.Done():
					return
				case v, open := <-current:
					if !open {
						break drain
					}
					select {
					case out <- v:
					case <-ctx.Done():
						return
					}
				}
			}
		}
	}()
	return out
}
// Usage
// Usage: three batch channels are produced one after another and
// bridge flattens them into a single ordered stream of 30 values.
func main() {
	ctx := context.Background()
	// Generate sequence of channels
	chanStream := make(chan (<-chan int))
	go func() {
		defer close(chanStream)
		for i := 0; i < 3; i++ {
			chanStream <- generateBatch(ctx, i*10, 10)
		}
	}()
	// Flatten into single stream
	for v := range bridge(ctx, chanStream) {
		fmt.Println(v)
	}
	// Prints: 0,1,2...9, 10,11...19, 20,21...29
}

⏱️ Rate Limiting
Token Bucket Pattern
go
import "golang.org/x/time/rate"
// rate.Limiter implements token bucket algorithm
// rateLimitedProcessor drains items, processing each one only after
// the token bucket grants permission: a sustained 10 items/second with
// bursts of up to 20. It returns ctx's error if the context ends while
// waiting for a token; otherwise it returns nil once items closes.
func rateLimitedProcessor(ctx context.Context, items <-chan Item) error {
	// Sustained 10/s with burst capacity 20 (token bucket).
	bucket := rate.NewLimiter(rate.Limit(10), 20)
	for item := range items {
		// Wait blocks until a token is available or ctx is done.
		if err := bucket.Wait(ctx); err != nil {
			return err
		}
		process(item)
	}
	return nil
}
// Per-key rate limiting
// PerKeyLimiter maintains an independent token-bucket limiter per
// string key (e.g. per user ID), all sharing the same rate and burst
// configuration. Limiters are created lazily on first use and never
// evicted, so memory grows with the number of distinct keys.
type PerKeyLimiter struct {
	mu       sync.Mutex
	limiters map[string]*rate.Limiter
	rate     rate.Limit
	burst    int
}

// Allow reports whether a request for key may proceed right now,
// consuming a token from that key's bucket when it does.
func (l *PerKeyLimiter) Allow(key string) bool {
	l.mu.Lock()
	lim, seen := l.limiters[key]
	if !seen {
		// First request for this key: create its bucket lazily.
		lim = rate.NewLimiter(l.rate, l.burst)
		l.limiters[key] = lim
	}
	l.mu.Unlock()
	// Token consumption happens outside the map lock.
	return lim.Allow()
}
// Usage
limiter := &PerKeyLimiter{
limiters: make(map[string]*rate.Limiter),
rate: rate.Limit(1), // 1 per second per key
burst: 5,
}
if !limiter.Allow(userID) {
return ErrRateLimited
}

Sliding Window
go
// SlidingWindowLimiter allows at most maxRequests within any rolling
// window of windowSize by remembering the timestamp of every accepted
// request. Memory is bounded by maxRequests; safe for concurrent use.
type SlidingWindowLimiter struct {
	mu          sync.Mutex
	windowSize  time.Duration
	maxRequests int
	timestamps  []time.Time // accepted-request times still inside the window
}

// Allow reports whether a request may proceed now, recording its
// timestamp when it is admitted.
func (l *SlidingWindowLimiter) Allow() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	now := time.Now()
	windowStart := now.Add(-l.windowSize)
	// Remove expired timestamps. Filtering into timestamps[:0] reuses
	// the backing array; reads stay ahead of writes, so compacting
	// in place is safe.
	valid := l.timestamps[:0]
	for _, ts := range l.timestamps {
		if ts.After(windowStart) {
			valid = append(valid, ts)
		}
	}
	l.timestamps = valid
	// Reject when the window is already at capacity.
	if len(l.timestamps) >= l.maxRequests {
		return false
	}
	l.timestamps = append(l.timestamps, now)
	return true
}

🌊 Backpressure Handling
Problem
Producer faster than consumer → memory exhaustion
Solution: Bounded Channels + Dropping
go
// Strategy 1: Block producer (bounded channel)
func blockingPipeline(ctx context.Context) {
// Producer blocks when buffer full
ch := make(chan Event, 1000)
go produce(ctx, ch)
go consume(ctx, ch)
}
// Strategy 2: Drop oldest (ring buffer)
// Strategy 2: Drop oldest (ring buffer).
// RingBuffer is a fixed-capacity FIFO that overwrites its oldest
// element when full. The zero value is not usable for storage: data
// must be a non-nil slice whose length is the capacity. Safe for
// concurrent use.
type RingBuffer[T any] struct {
	mu    sync.Mutex
	data  []T // backing storage; len(data) is the fixed capacity
	head  int // index where the next Push writes
	tail  int // index of the oldest retained element
	count int // number of live elements (<= len(data))
}

// Push stores v, silently overwriting the oldest element when the
// buffer is already full. A zero-capacity buffer drops v — previously
// this panicked with an integer divide by zero in the % operations.
func (r *RingBuffer[T]) Push(v T) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if len(r.data) == 0 {
		return // no capacity: nothing can be stored
	}
	r.data[r.head] = v
	r.head = (r.head + 1) % len(r.data)
	if r.count == len(r.data) {
		// Full: the write above clobbered the oldest slot, so advance
		// tail to the new oldest element.
		r.tail = (r.tail + 1) % len(r.data)
	} else {
		r.count++
	}
}
// Strategy 3: Drop newest (non-blocking send)
func tryEnqueue[T any](ch chan<- T, v T) bool {
select {
case ch <- v:
return true
default:
return false // Drop if full
}
}
// Strategy 4: Sample (process every Nth item)
// Strategy 4: Sample — forward only every sampleRate-th value from in
// and discard the rest. The output closes when in closes or ctx is
// cancelled.
// NOTE(review): the send `out <- v` does not select on ctx.Done(), so
// a consumer that stops reading without cancelling ctx can block this
// goroutine forever — confirm callers always cancel ctx.
func samplePipeline[T any](ctx context.Context, in <-chan T, sampleRate int) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		count := 0
		for v := range orDone(ctx, in) {
			count++
			// Keep every Nth value, drop the rest.
			if count%sampleRate == 0 {
				out <- v
			}
		}
	}()
	return out
}

Adaptive Backpressure
go
// AdaptiveProcessor scales its worker count between minWorkers and
// maxWorkers to keep the observed P95 latency near targetLatency.
// latencies holds a sliding sample of recent measurements (capped at
// 100 by recordLatency).
type AdaptiveProcessor struct {
	targetLatency time.Duration
	minWorkers    int
	maxWorkers    int
	workers       int // current desired worker count
	latencies     []time.Duration
	mu            sync.Mutex
}
// recordLatency appends one latency sample, retaining only the most
// recent 100 so adjust operates on a bounded sliding window.
func (p *AdaptiveProcessor) recordLatency(d time.Duration) {
	p.mu.Lock()
	defer p.mu.Unlock()
	samples := append(p.latencies, d)
	if len(samples) > 100 {
		samples = samples[1:] // drop the oldest sample
	}
	p.latencies = samples
}
// adjust recomputes the desired worker count from the P95 of recent
// latency samples: scale up while P95 exceeds the target, scale down
// once P95 drops below half the target (the gap between the two
// thresholds provides hysteresis). With fewer than 10 samples it does
// nothing.
// NOTE(review): this only changes the workers counter — actually
// spawning or stopping worker goroutines is left to the caller, per
// the inline comments.
func (p *AdaptiveProcessor) adjust() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.latencies) < 10 {
		return
	}
	// Calculate P95 on a sorted copy so the sliding-window order of
	// p.latencies is preserved.
	sorted := make([]time.Duration, len(p.latencies))
	copy(sorted, p.latencies)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	p95 := sorted[int(float64(len(sorted))*0.95)]
	// Scale workers based on P95
	if p95 > p.targetLatency && p.workers < p.maxWorkers {
		p.workers++
		// Spawn new worker
	} else if p95 < p.targetLatency/2 && p.workers > p.minWorkers {
		p.workers--
		// Signal worker to stop
	}
}

🛡️ Graceful Degradation
Circuit Breaker Pattern
go
// CircuitBreaker implements the classic three-state breaker:
// "closed" (requests flow, failures are counted), "open" (requests
// rejected until timeout elapses after lastFailure), and "half-open"
// (up to halfOpenMax trial requests probe recovery). threshold is the
// cumulative failure count that trips the breaker — note failures are
// not reset by successes while closed (see recordResult).
type CircuitBreaker struct {
	mu          sync.Mutex
	failures    int
	successes   int       // successful probes while half-open
	state       string    // "closed", "open", "half-open"
	lastFailure time.Time // when the breaker last observed an error
	threshold   int
	timeout     time.Duration
	halfOpenMax int
}
// Execute runs fn under the breaker's protection: if the circuit is
// open the call is rejected immediately with ErrCircuitOpen; otherwise
// fn runs and its outcome is recorded to drive state transitions.
// fn's own error (possibly nil) is returned unchanged.
func (cb *CircuitBreaker) Execute(fn func() error) error {
	if cb.allowRequest() {
		err := fn()
		cb.recordResult(err)
		return err
	}
	return ErrCircuitOpen
}
// allowRequest reports whether a call may proceed, performing the
// open -> half-open transition once timeout has elapsed since the
// last failure.
func (cb *CircuitBreaker) allowRequest() bool {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	switch cb.state {
	case "closed":
		return true
	case "open":
		// Cool-down elapsed: begin probing with a fresh success count.
		if time.Since(cb.lastFailure) > cb.timeout {
			cb.state = "half-open"
			cb.successes = 0
			return true
		}
		return false
	case "half-open":
		// Admit only a bounded number of trial requests.
		return cb.successes < cb.halfOpenMax
	}
	// Unknown state: fail safe by rejecting.
	return false
}
// recordResult updates breaker state from one call's outcome: a
// failure increments the cumulative counter and trips the breaker to
// "open" once it reaches threshold (the counter is not decremented by
// successes while closed), while halfOpenMax successful probes in
// half-open close the circuit again and clear the failure count.
func (cb *CircuitBreaker) recordResult(err error) {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	if err != nil {
		cb.failures++
		cb.lastFailure = time.Now()
		if cb.failures >= cb.threshold {
			cb.state = "open"
		}
	} else {
		// Successes only matter while probing in half-open.
		if cb.state == "half-open" {
			cb.successes++
			if cb.successes >= cb.halfOpenMax {
				cb.state = "closed"
				cb.failures = 0
			}
		}
	}
}
// Usage
cb := &CircuitBreaker{
threshold: 5,
timeout: 30 * time.Second,
halfOpenMax: 3,
state: "closed",
}
err := cb.Execute(func() error {
return callExternalService()
})
if errors.Is(err, ErrCircuitOpen) {
// Use fallback
return cachedResult()
}

💻 Engineering Example: Resilient Pipeline
go
package pipeline
import (
"context"
"time"
"golang.org/x/time/rate"
)
// ResilientPipeline combines this chapter's patterns: a bounded input
// queue (backpressure via Submit), a shared token-bucket rate limiter,
// and a circuit breaker guarding the per-event processing performed by
// a fixed pool of workers.
type ResilientPipeline struct {
	workers     int
	rateLimiter *rate.Limiter
	circuit     *CircuitBreaker
	input       chan Event  // bounded; Submit fails fast when full
	output      chan Result // bounded; results for downstream consumers
}
// NewResilientPipeline assembles a pipeline from cfg: worker count,
// token-bucket rate limiting, a circuit breaker for downstream calls
// (starting closed, probing with up to 3 half-open requests), and
// bounded input/output buffers for backpressure.
func NewResilientPipeline(cfg Config) *ResilientPipeline {
	breaker := &CircuitBreaker{
		threshold:   cfg.CBThreshold,
		timeout:     cfg.CBTimeout,
		halfOpenMax: 3,
		state:       "closed",
	}
	return &ResilientPipeline{
		workers:     cfg.Workers,
		rateLimiter: rate.NewLimiter(rate.Limit(cfg.RateLimit), cfg.BurstSize),
		circuit:     breaker,
		input:       make(chan Event, cfg.BufferSize),
		output:      make(chan Result, cfg.BufferSize),
	}
}
// Start launches the configured number of worker goroutines; each runs
// until ctx is cancelled. Start itself returns immediately.
func (p *ResilientPipeline) Start(ctx context.Context) {
	for n := p.workers; n > 0; n-- {
		go p.worker(ctx)
	}
}
// worker is the per-goroutine event loop: it pulls events from the
// shared input queue until ctx is cancelled.
// NOTE(review): the receive does not use the `v, ok` form, so if
// p.input were ever closed this loop would spin processing zero-value
// Events — confirm the channel is never closed while workers run.
func (p *ResilientPipeline) worker(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case event := <-p.input:
			p.processWithResilience(ctx, event)
		}
	}
}
// processWithResilience applies the protective layers to one event:
// first the shared rate limiter, then the circuit breaker around the
// downstream call, which itself runs under a 5-second timeout.
// Failures fall through to handleError (log/metric/fallback).
func (p *ResilientPipeline) processWithResilience(ctx context.Context, event Event) {
	// Rate limiting: block until a token is available or ctx ends.
	if err := p.rateLimiter.Wait(ctx); err != nil {
		return // cancelled while waiting; the event is dropped
	}
	// Circuit breaker wraps the downstream call.
	err := p.circuit.Execute(func() error {
		// Per-event deadline; deliberately shadows the outer ctx.
		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()
		result, err := process(ctx, event)
		if err != nil {
			return err
		}
		// Deliver the result, still honoring the per-event deadline.
		select {
		case p.output <- result:
		case <-ctx.Done():
			return ctx.Err()
		}
		return nil
	})
	if err != nil {
		// Log, metric, fallback...
		handleError(event, err)
	}
}
// Submit enqueues event without ever blocking the caller: when the
// bounded input buffer is full it returns ErrBufferFull immediately as
// a backpressure signal.
// NOTE(review): because a default case is present this select never
// blocks, so the ctx.Done() branch only fires when cancellation is
// already observable at call time — confirm that is the intent.
func (p *ResilientPipeline) Submit(ctx context.Context, event Event) error {
	select {
	case p.input <- event:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	default:
		return ErrBufferFull // Backpressure signal
	}
}

✅ Ship-to-Prod Checklist
Patterns Applied
- [ ] Or-Done for clean cancellation propagation
- [ ] Rate Limiting to protect downstream services
- [ ] Backpressure strategy chosen (block/drop/sample)
- [ ] Circuit Breaker for external dependencies
Resilience
- [ ] Timeouts on all external calls
- [ ] Fallbacks when circuit open
- [ ] Graceful degradation plan documented
- [ ] Dead letter queue for failed items
Monitoring
- [ ] Queue depths tracked
- [ ] Rate limit rejections counted
- [ ] Circuit state exposed as metric
- [ ] Latency percentiles (P50, P95, P99)
📊 Summary
| Pattern | Use Case |
|---|---|
| Or-Done | Clean cancellation in channel reads |
| Tee | Split stream to multiple consumers |
| Bridge | Flatten channel of channels |
| Rate Limit | Protect resources |
| Backpressure | Handle overload |
| Circuit Breaker | Fail fast on dependency issues |
🎉 Advanced Section Complete!
Bạn đã master Go Advanced Topics:
- ✅ Generics
- ✅ Reflection
- ✅ CGO & FFI
- ✅ Performance Tuning
- ✅ Advanced Concurrency
You are now ready to build production-grade Go systems!