Giao diện
🔄 Channels & Concurrency Patterns
Channels là "pipes" kết nối goroutines.
Data flows như nước — sender đẩy vào, receiver lấy ra.
Data flows như nước — sender đẩy vào, receiver lấy ra.
📡 Channels Anatomy
Unbuffered vs Buffered
go
// Unbuffered - BLOCKING (synchronous)
ch := make(chan int)
// Sender blocks until receiver is ready
// Receiver blocks until sender sends
// Buffered - NON-BLOCKING until full
ch := make(chan int, 10) // Buffer size 10
// Sender blocks only when buffer is FULL
// Receiver blocks only when buffer is EMPTYVisual Comparison
┌─────────────────────────────────────────────────────────────────────┐
│ Unbuffered vs Buffered Channels │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ UNBUFFERED (make(chan int)): │
│ ┌─────────┐ ┌─────────┐ │
│ │ Sender │──────────│Receiver │ │
│ │ BLOCKS │ ◄──────► │ BLOCKS │ │
│ └─────────┘ └─────────┘ │
│ → Handshake: Both must be ready simultaneously │
│ │
│ ───────────────────────────────────────────────────────────── │
│ │
│ BUFFERED (make(chan int, 3)): │
│ ┌─────────┐ ┌───┬───┬───┐ ┌─────────┐ │
│ │ Sender │──►│ 1 │ 2 │ │──►│Receiver │ │
│ │ │ └───┴───┴───┘ │ │ │
│ └─────────┘ Buffer (3) └─────────┘ │
│ → Sender blocks only when buffer FULL │
│ → Receiver blocks only when buffer EMPTY │
│ │
└─────────────────────────────────────────────────────────────────────┘The Pipe Analogy
┌─────────────────────────────────────────────────────────────────────┐
│ Channel as a Water Pipe │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Faucet (Sender) Pipe (Channel) Bucket (Receiver) │
│ ┌─────┐ ════════════════════ ┌─────┐ │
│ │ 💧 │ ────────► ════════════════════ ────────► │ │ │
│ │ 💧 │ ════════════════════ │ 💧 │ │
│ └─────┘ ════════════════════ └─────┘ │
│ │
│ • Data flows ONE direction │
│ • Pipe can hold some water (buffered) │
│ • If bucket full, faucet waits │
│ • If pipe empty, bucket waits │
│ │
└─────────────────────────────────────────────────────────────────────┘Channel Ownership
📌 HPN Standard: Sender Closes Channel
go
// ✅ CORRECT: Sender closes
func producer(ch chan<- int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch) // Sender closes when done
}
func consumer(ch <-chan int) {
for v := range ch { // Range exits when channel closed
process(v)
}
}
// ❌ WRONG: Receiver closes
// → Sender will panic trying to send to closed channel!Rule: Whoever CREATES data, CLOSES the channel.
🔀 Control Flow: select
Handling Multiple Channels
go
// select = "switch case" for channels
func multiplexer(ch1, ch2, ch3 <-chan Event) {
for {
select {
case e := <-ch1:
handleEvent1(e)
case e := <-ch2:
handleEvent2(e)
case e := <-ch3:
handleEvent3(e)
}
}
}Timeout Pattern
go
func fetchWithTimeout(url string, timeout time.Duration) ([]byte, error) {
resultCh := make(chan []byte, 1)
errCh := make(chan error, 1)
go func() {
data, err := fetch(url)
if err != nil {
errCh <- err
return
}
resultCh <- data
}()
// Wait for result OR timeout
select {
case data := <-resultCh:
return data, nil
case err := <-errCh:
return nil, err
case <-time.After(timeout):
return nil, fmt.Errorf("timeout after %v", timeout)
}
}Non-Blocking Operations
go
// Try to send, but don't block
select {
case ch <- value:
fmt.Println("Sent!")
default:
fmt.Println("Channel full, skipping...")
}
// Try to receive, but don't block
select {
case v := <-ch:
fmt.Println("Received:", v)
default:
fmt.Println("Channel empty")
}🏭 Enterprise Patterns
Worker Pool: Limiting Concurrency
🎓 Professor Tom's Deep Dive: Why Limit Workers?
Problem: 1000 jobs, each hits database → 1000 concurrent DB connections = 💥 DB crashes
Solution: Worker Pool với 5 workers → Max 5 concurrent DB connections at any time
go
func WorkerPool(jobs <-chan Job, numWorkers int) <-chan Result {
results := make(chan Result, len(jobs))
var wg sync.WaitGroup
// Start fixed number of workers
for w := 0; w < numWorkers; w++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for job := range jobs {
result := process(job)
results <- result
}
}(w)
}
// Close results when all workers done
go func() {
wg.Wait()
close(results)
}()
return results
}
// Usage
func main() {
jobs := make(chan Job, 1000)
// Fill jobs
for i := 0; i < 1000; i++ {
jobs <- Job{ID: i}
}
close(jobs)
// Process with 5 workers (protect DB!)
results := WorkerPool(jobs, 5)
for result := range results {
fmt.Println(result)
}
}┌─────────────────────────────────────────────────────────────────────┐
│ WORKER POOL PATTERN │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Jobs Queue (1000 jobs) │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ J1 │ J2 │ J3 │ J4 │ J5 │ J6 │ ... │ J1000 │ │
│ └───────────────────────────┬─────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │ W1 │ │ W2 │ │ W3 │ │ W4 │ │ W5 │ ← 5 Workers only │
│ └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘ │
│ │ │ │ │ │ │
│ └───────┴───────┴───────┴───────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ R1 │ R2 │ R3 │ ... │ Results │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘Fan-Out / Fan-In
go
// Fan-Out: One source → Multiple workers
func FanOut(source <-chan int, numWorkers int) []<-chan int {
outputs := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
outputs[i] = worker(source)
}
return outputs
}
// Fan-In: Multiple sources → One destination
func FanIn(inputs ...<-chan int) <-chan int {
output := make(chan int)
var wg sync.WaitGroup
for _, in := range inputs {
wg.Add(1)
go func(ch <-chan int) {
defer wg.Done()
for v := range ch {
output <- v
}
}(in)
}
go func() {
wg.Wait()
close(output)
}()
return output
}┌─────────────────────────────────────────────────────────────────────┐
│ FAN-OUT / FAN-IN PATTERN │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ FAN-OUT (distribute work): │
│ ┌─────────┐ │
│ ┌───►│ Worker1 │───┐ │
│ ┌────────┐ │ └─────────┘ │ ┌────────┐ │
│ │ Source │───┼───►│ Worker2 │───┼───►│ FanIn │ │
│ └────────┘ │ └─────────┘ │ └────────┘ │
│ └───►│ Worker3 │───┘ │
│ └─────────┘ │
│ │
│ FAN-IN (collect results): │
│ │
└─────────────────────────────────────────────────────────────────────┘Generator Pattern
go
// Generator: Function that returns a channel
func GenerateNumbers(start, end int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := start; i <= end; i++ {
out <- i
}
}()
return out // Return channel immediately
}
// Usage - very clean!
for n := range GenerateNumbers(1, 100) {
fmt.Println(n)
}🛑 Graceful Shutdown
Using context.Context
go
func Worker(ctx context.Context, jobs <-chan Job) {
for {
select {
case <-ctx.Done():
fmt.Println("Shutting down gracefully...")
return // Exit cleanly
case job := <-jobs:
process(job)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
jobs := make(chan Job, 100)
// Start workers
for i := 0; i < 5; i++ {
go Worker(ctx, jobs)
}
// Handle SIGINT/SIGTERM
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh // Wait for signal
cancel() // Cancel context → workers exit
time.Sleep(time.Second) // Give workers time to cleanup
}Using Done Channel
go
func Worker(done <-chan struct{}, jobs <-chan Job) {
for {
select {
case <-done:
return // Exit when done is closed
case job := <-jobs:
process(job)
}
}
}
func main() {
done := make(chan struct{})
// Start workers with done channel
go Worker(done, jobs)
// Shutdown: close done channel
close(done) // All workers will exit
}🎮 Architecture Challenge
🧩 Design Challenge: Image Processing Pipeline
Requirement: Download 100 images, resize them (using 4 workers), save to disk.
Constraints:
- Max 10 concurrent downloads (network limit)
- Max 4 concurrent resize operations (CPU intensive)
- No image should be lost if system crashes mid-process
💡 Architecture Solution
go
package main
import (
"context"
"sync"
)
type Image struct {
URL string
Data []byte
Resized []byte
Filename string
}
// Stage 1: Download (10 workers)
func Downloader(ctx context.Context, urls <-chan string) <-chan Image {
out := make(chan Image)
var wg sync.WaitGroup
for i := 0; i < 10; i++ { // 10 concurrent downloads
wg.Add(1)
go func() {
defer wg.Done()
for url := range urls {
select {
case <-ctx.Done():
return
default:
data := download(url)
out <- Image{URL: url, Data: data}
}
}
}()
}
go func() {
wg.Wait()
close(out)
}()
return out
}
// Stage 2: Resize (4 workers - CPU intensive)
func Resizer(ctx context.Context, images <-chan Image) <-chan Image {
out := make(chan Image)
var wg sync.WaitGroup
for i := 0; i < 4; i++ { // 4 CPU workers
wg.Add(1)
go func() {
defer wg.Done()
for img := range images {
select {
case <-ctx.Done():
return
default:
img.Resized = resize(img.Data)
out <- img
}
}
}()
}
go func() {
wg.Wait()
close(out)
}()
return out
}
// Stage 3: Save (sequential for disk I/O)
func Saver(ctx context.Context, images <-chan Image) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for img := range images {
select {
case <-ctx.Done():
return
default:
filename := save(img.Resized)
out <- filename
}
}
}()
return out
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create URL source
urls := make(chan string, 100)
for i := 0; i < 100; i++ {
urls <- fmt.Sprintf("https://example.com/image%d.jpg", i)
}
close(urls)
// Pipeline: URLs → Download → Resize → Save
downloaded := Downloader(ctx, urls)
resized := Resizer(ctx, downloaded)
saved := Saver(ctx, resized)
// Collect results
for filename := range saved {
fmt.Println("Saved:", filename)
}
}
)}}Pipeline visualization:
┌─────────────────────────────────────────────────────────────────────┐
│ IMAGE PROCESSING PIPELINE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ URLs (100) │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ url1 │ url2 │ url3 │ ... │ url100 │ │
│ └───────────────────────────┬─────────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ DOWNLOAD STAGE (10 workers) │ │
│ │ D1 D2 D3 D4 D5 D6 D7 D8 D9 D10 │ │
│ └───────────────────────────┬─────────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ RESIZE STAGE (4 workers) │ │
│ │ R1 R2 R3 R4 │ │
│ └───────────────────────────┬─────────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ SAVE STAGE (sequential) │ │
│ │ S1 │ │
│ └───────────────────────────┬─────────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ file1.jpg │ file2.jpg │ ... │ file100.jpg │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘📊 Summary
| Pattern | Use Case |
|---|---|
| Unbuffered Channel | Synchronization, handshake |
| Buffered Channel | Decouple producer/consumer speed |
| select | Handle multiple channels, timeouts |
| Worker Pool | Limit concurrent operations |
| Fan-Out/Fan-In | Distribute work, collect results |
| Generator | Lazy sequence generation |
| Context Cancellation | Graceful shutdown |
➡️ Tiếp theo
Patterns nắm vững rồi! Quay lại: Concurrency Foundations - CSP philosophy, GMP scheduler, và sync primitives.