🔔 Condition Variables — Producer-Consumer Pattern

std::condition_variable lets threads wait efficiently until a condition becomes true.

The Problem: Busy Waiting

Anti-pattern: Polling Loop

cpp
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;
bool dataReady = false;
int data = 0;

void consumer() {
    while (true) {
        std::lock_guard<std::mutex> lock(mtx);
        if (dataReady) {  // Check condition
            std::cout << "Got data: " << data << std::endl;
            break;
        }
        // ❌ Unlock, sleep a bit, lock again, check again...
        // ❌ WASTES CPU! Burns cycles checking repeatedly
    }
}

Problems:

  • CPU burn: the thread keeps checking even when nothing has changed
  • Latency: if the sleep interval is long, the response is slow
  • Not scalable: 10 polling threads = disaster

Analogy: Doorbell vs. Knocking Nonstop

┌─────────────────────────────────────────────────────────────────┐
│                    DOORBELL ANALOGY                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ❌ BUSY WAITING (Polling):                                     │
│  ────────────────────────────                                   │
│                                                                 │
│  🚶 You stand outside the door, knocking nonstop:               │
│     *knock* *knock* *knock* *knock* ...                         │
│     "Anyone home? Anyone home? Anyone home?"                    │
│                                                                 │
│     → Exhausting, and annoying for the people inside            │
│                                                                 │
│  ────────────────────────────────────────────────────────────   │
│                                                                 │
│  ✅ CONDITION VARIABLE (Notification):                          │
│  ──────────────────────────────────────                         │
│                                                                 │
│  🚶 You ring the doorbell 🔔 and sit down to wait:              │
│     *ding-dong* → Sit on the bench and read a book              │
│     ...                                                         │
│     🔔 The bell rings → You go in                               │
│                                                                 │
│     → Saves energy, fast response                               │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

std::condition_variable Basics

Core API

cpp
#include <condition_variable>
#include <mutex>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

// WAITING THREAD
void waiter() {
    std::unique_lock<std::mutex> lock(mtx);  // Must use unique_lock!
    
    cv.wait(lock, []{ return ready; });  // Wait until ready == true
    
    // Proceed when notified AND ready == true
}

// NOTIFYING THREAD  
void notifier() {
    {
        std::lock_guard<std::mutex> lock(mtx);
        ready = true;
    }
    cv.notify_one();  // Wake up ONE waiting thread
    // cv.notify_all();  // Wake up ALL waiting threads
}
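
To see the two halves working together, here is a minimal sketch that runs a waiter and a notifier on separate threads. The helper name `run_handshake` and the `payload` value are assumptions made for illustration; they are not part of the API above.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: returns the value the waiter observed after waking up.
int run_handshake() {
    std::mutex mtx;
    std::condition_variable cv;
    bool ready = false;
    int payload = 0;    // written by the notifier, read by the waiter
    int observed = -1;

    std::thread waiter([&] {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [&] { return ready; });  // releases mtx while blocked
        observed = payload;                    // mtx is held again here
    });

    std::thread notifier([&] {
        {
            std::lock_guard<std::mutex> lock(mtx);
            payload = 42;
            ready = true;   // change the state under the mutex
        }
        cv.notify_one();    // then notify after releasing it
    });

    waiter.join();
    notifier.join();
    return observed;
}
```

The result does not depend on which thread starts first: if the notifier runs before the waiter reaches `wait`, the predicate is already true and the waiter never blocks.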

Wait Variants

cpp
// Wait forever until notified AND predicate is true
cv.wait(lock, predicate);

// Wait with timeout (returns cv_status)
auto status = cv.wait_for(lock, std::chrono::seconds(5));
if (status == std::cv_status::timeout) {
    // Timed out
}

// Wait with timeout AND predicate (returns bool)
bool success = cv.wait_for(lock, std::chrono::seconds(5), predicate);
if (!success) {
    // Timed out AND predicate still false
}

// Wait until specific time point
cv.wait_until(lock, timePoint, predicate);
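
As a rough illustration of the predicate form of `wait_for`, the sketch below waits a bounded time for a flag that another thread sets after a delay. The helper `wait_for_flag` and its parameters are hypothetical names chosen for this example.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: waits up to `timeout` for a flag that a second thread
// sets after `delay`. Returns true if the flag was seen before the timeout.
bool wait_for_flag(std::chrono::milliseconds delay,
                   std::chrono::milliseconds timeout) {
    std::mutex mtx;
    std::condition_variable cv;
    bool flag = false;

    std::thread setter([&] {
        std::this_thread::sleep_for(delay);
        {
            std::lock_guard<std::mutex> lock(mtx);
            flag = true;
        }
        cv.notify_one();
    });

    bool seen;
    {
        std::unique_lock<std::mutex> lock(mtx);
        // Returns the predicate's value when the wait ends:
        // true if flag was set in time, false if the timeout expired first.
        seen = cv.wait_for(lock, timeout, [&] { return flag; });
    }

    setter.join();
    return seen;
}
```

With a short delay and a generous timeout the call should return true; with the two swapped it should return false, because the predicate is still false when the timeout expires.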

Spurious Wakeups

⚠️ CRITICAL CONCEPT

wait() can return even when no notify was sent! This is called a spurious wakeup.

cpp
// ❌ WRONG: No predicate check
cv.wait(lock);  // May wake up randomly!
doWork();       // Condition might not be true!

// ✅ CORRECT: Always use predicate
cv.wait(lock, []{ return dataReady; });
// OR
while (!dataReady) {
    cv.wait(lock);
}

Why do spurious wakeups happen?

  • OS scheduling: the kernel may wake a thread for internal reasons
  • Signal handling: an interrupt can cause a wakeup
  • Performance: permitting them lets implementations be more efficient
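
Whatever the cause, the defense is the same: re-check the condition after every wakeup. The sketch below spells out the loop that the predicate overload of `wait` is equivalent to. `wait_until_true` and `demo_recheck` are hypothetical names, and the extra `notify_one()` only stands in for a spurious wakeup, which cannot be triggered on demand.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Sketch of what cv.wait(lock, pred) does: re-check the predicate after
// every wakeup, whether it came from a notify or was spurious.
template <typename Predicate>
void wait_until_true(std::condition_variable& cv,
                     std::unique_lock<std::mutex>& lock,
                     Predicate pred) {
    while (!pred()) {
        cv.wait(lock);
    }
}

// Hypothetical demo: the notifier first notifies while the condition is
// still false, then sets the condition and notifies again.
int demo_recheck() {
    std::mutex mtx;
    std::condition_variable cv;
    int value = 0;

    std::thread notifier([&] {
        cv.notify_one();  // nothing changed: a waiter must go back to sleep
        {
            std::lock_guard<std::mutex> lock(mtx);
            value = 7;
        }
        cv.notify_one();  // now the condition holds
    });

    int seen;
    {
        std::unique_lock<std::mutex> lock(mtx);
        wait_until_true(cv, lock, [&] { return value != 0; });
        seen = value;     // guaranteed non-zero here
    }
    notifier.join();
    return seen;
}
```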

Producer-Consumer Pattern

Classic Implementation

cpp
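#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

std::queue<int> buffer;
const std::size_t MAX_SIZE = 10;  // size_t matches buffer.size() (no signed/unsigned compare)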

std::mutex mtx;
std::condition_variable cvProducer;  // Notify producer when space available
std::condition_variable cvConsumer;  // Notify consumer when data available

void producer(int id) {
    for (int i = 0; i < 20; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        
        // Wait until buffer has space
        cvProducer.wait(lock, []{ return buffer.size() < MAX_SIZE; });
        
        // Produce
        int item = id * 100 + i;
        buffer.push(item);
        std::cout << "Producer " << id << " produced: " << item 
                  << " (size: " << buffer.size() << ")\n";
        
        lock.unlock();
        cvConsumer.notify_one();  // Notify consumer
        
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

void consumer(int id) {
    for (int i = 0; i < 20; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        
        // Wait until buffer has data
        cvConsumer.wait(lock, []{ return !buffer.empty(); });
        
        // Consume
        int item = buffer.front();
        buffer.pop();
        std::cout << "Consumer " << id << " consumed: " << item 
                  << " (size: " << buffer.size() << ")\n";
        
        lock.unlock();
        cvProducer.notify_one();  // Notify producer
        
        std::this_thread::sleep_for(std::chrono::milliseconds(80));
    }
}

int main() {
    std::thread p1(producer, 1);
    std::thread p2(producer, 2);
    std::thread c1(consumer, 1);
    std::thread c2(consumer, 2);
    
    p1.join();
    p2.join();
    c1.join();
    c2.join();
    
    return 0;
}

Visualization

┌─────────────────────────────────────────────────────────────────┐
│                  PRODUCER-CONSUMER PATTERN                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  PRODUCERS                    CONSUMERS                         │
│  ─────────                    ─────────                         │
│  ┌─────────┐                  ┌─────────┐                       │
│  │Producer1│──┐            ┌──│Consumer1│                       │
│  └─────────┘  │            │  └─────────┘                       │
│               ▼            │                                    │
│  ┌─────────┐  │  ┌──────────────────┐  │  ┌─────────┐          │
│  │Producer2│──┼─►│  BOUNDED BUFFER  │──┼──│Consumer2│          │
│  └─────────┘  │  │   (Queue)        │  │  └─────────┘          │
│               │  │                  │  │                        │
│  ┌─────────┐  │  │  [1][2][3][4]    │  │  ┌─────────┐          │
│  │Producer3│──┘  └──────────────────┘  └──│Consumer3│          │
│  └─────────┘           │    │             └─────────┘          │
│                        │    │                                   │
│                 notify_one  wait                                │
│                                                                 │
│  📥 Producers wait if the buffer is FULL                        │
│  📤 Consumers wait if the buffer is EMPTY                       │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
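
One possible way to package the pattern is to move the queue, the mutex, and both condition variables out of global scope and into a class. This is only a sketch: the names `BoundedBuffer`, `put`, `take`, and `sum_through_buffer` are assumptions for illustration.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

template <typename T>
class BoundedBuffer {
    std::queue<T> items_;
    const std::size_t capacity_;
    std::mutex mutex_;
    std::condition_variable notFull_;   // producers wait on this
    std::condition_variable notEmpty_;  // consumers wait on this

public:
    explicit BoundedBuffer(std::size_t capacity) : capacity_(capacity) {}

    void put(T value) {
        std::unique_lock<std::mutex> lock(mutex_);
        notFull_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push(std::move(value));
        lock.unlock();
        notEmpty_.notify_one();  // one item added, so wake one consumer
    }

    T take() {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return !items_.empty(); });
        T value = std::move(items_.front());
        items_.pop();
        lock.unlock();
        notFull_.notify_one();   // one slot freed, so wake one producer
        return value;
    }
};

// Hypothetical demo: one producer pushes 1..n through a 2-slot buffer
// while the calling thread consumes and sums the items.
long sum_through_buffer(int n) {
    BoundedBuffer<int> buffer(2);
    std::thread producer([&] {
        for (int i = 1; i <= n; ++i) buffer.put(i);
    });
    long sum = 0;
    for (int i = 0; i < n; ++i) sum += buffer.take();
    producer.join();
    return sum;
}
```

Using two condition variables means a producer's notification never wakes another producer, and a consumer's never wakes another consumer.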

Thread-safe Queue Class

cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

template<typename T>
class ThreadSafeQueue {
    std::queue<T> queue_;
    mutable std::mutex mutex_;
    std::condition_variable cv_;
    
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(value));
        }
        cv_.notify_one();
    }
    
    // Blocking pop
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this]{ return !queue_.empty(); });
        
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }
    
    // Non-blocking try_pop
    std::optional<T> try_pop() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return std::nullopt;
        }
        
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }
    
    // Pop with timeout
    std::optional<T> pop_for(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        
        if (!cv_.wait_for(lock, timeout, [this]{ return !queue_.empty(); })) {
            return std::nullopt;  // Timeout
        }
        
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }
    
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }
    
    size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }
};

Usage

cpp
ThreadSafeQueue<int> taskQueue;

// Producer thread
taskQueue.push(42);
taskQueue.push(100);

// Consumer thread
int task = taskQueue.pop();  // Blocks if empty

// With timeout
auto maybeTask = taskQueue.pop_for(std::chrono::seconds(5));
if (maybeTask) {
    process(*maybeTask);
} else {
    std::cout << "Timeout waiting for task\n";
}

notify_one vs notify_all

┌─────────────────────────────────────────────────────────────────┐
│                NOTIFY_ONE vs NOTIFY_ALL                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  notify_one():                                                  │
│  ─────────────                                                  │
│  • Wake up ONE waiting thread                                   │
│  • Which one? Unspecified (OS decides)                          │
│  • Use when: Only 1 thread can proceed                          │
│    (e.g., single item added to queue)                           │
│                                                                 │
│  notify_all():                                                  │
│  ──────────────                                                 │
│  • Wake up ALL waiting threads                                  │
│  • Each thread re-checks predicate                              │
│  • Use when: Multiple threads might proceed                     │
│    (e.g., shutdown signal, batch data ready)                    │
│                                                                 │
│  ⚠️ THUNDERING HERD:                                           │
│  notify_all() with many waiters can cause a performance spike   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
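
A small sketch of the broadcast case: several threads block on the same flag, and a single `notify_all()` releases them all. The helper name `release_all` is an assumption for this example.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical demo: `workers` threads block on one shared flag; a single
// notify_all() releases all of them. Returns how many threads got through.
int release_all(int workers) {
    std::mutex mtx;
    std::condition_variable cv;
    bool go = false;
    int passed = 0;

    std::vector<std::thread> threads;
    for (int i = 0; i < workers; ++i) {
        threads.emplace_back([&] {
            std::unique_lock<std::mutex> lock(mtx);
            cv.wait(lock, [&] { return go; });
            ++passed;  // still holding mtx, so the increment is safe
        });
    }

    {
        std::lock_guard<std::mutex> lock(mtx);
        go = true;
    }
    cv.notify_all();  // notify_one() here could leave other waiters blocked

    for (auto& t : threads) t.join();
    return passed;
}
```

The woken threads pass through one at a time, because each must reacquire the mutex before `wait` returns. That serialization is where the thundering-herd cost comes from.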

Shutdown Pattern

cpp
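#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

class Worker {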
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    bool shutdown_ = false;
    
public:
    void addTask(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();  // One task → one worker
    }
    
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            shutdown_ = true;
        }
        cv_.notify_all();  // Wake ALL workers to check shutdown
    }
    
    void run() {
        while (true) {
            std::function<void()> task;
            
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this]{ 
                    return shutdown_ || !tasks_.empty(); 
                });
                
                if (shutdown_ && tasks_.empty()) {
                    return;  // Exit
                }
                
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            
            task();  // Execute outside lock
        }
    }
};
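
A usage sketch for this pattern follows. It repeats a compact copy of the `Worker` class so that it compiles on its own; the driver `run_and_drain` and its parameters are hypothetical. Because `run()` only exits when `shutdown_` is set and the queue is empty, tasks queued before `stop()` should still be executed.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Compact copy of the Worker class above so the sketch is self-contained.
class Worker {
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    bool shutdown_ = false;

public:
    void addTask(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            shutdown_ = true;
        }
        cv_.notify_all();
    }

    void run() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return shutdown_ || !tasks_.empty(); });
                if (shutdown_ && tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // execute outside the lock
        }
    }
};

// Hypothetical driver: runs `threads` threads over one Worker, queues
// `tasks` increments, then stops. Returns how many tasks actually ran.
int run_and_drain(int threads, int tasks) {
    Worker worker;
    std::atomic<int> done{0};

    std::vector<std::thread> pool;
    for (int i = 0; i < threads; ++i) {
        pool.emplace_back([&worker] { worker.run(); });
    }
    for (int i = 0; i < tasks; ++i) {
        worker.addTask([&done] { ++done; });
    }

    worker.stop();  // queued tasks are still drained before threads exit
    for (auto& t : pool) t.join();
    return done.load();
}
```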

std::condition_variable_any

Works with any lockable type, not only std::unique_lock<std::mutex>:

cpp
#include <condition_variable>
#include <shared_mutex>

std::shared_mutex sharedMtx;
std::condition_variable_any cvAny;

void reader() {
    std::shared_lock<std::shared_mutex> lock(sharedMtx);
    cvAny.wait(lock, predicate);  // ✅ Works with shared_lock
}

📌 Performance Note

std::condition_variable (with std::mutex) is usually faster than std::condition_variable_any. Use the _any variant only when you need a lock type other than std::unique_lock<std::mutex>, such as a custom lockable type.
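
A fuller sketch of the shared-lock case: a reader waits under a `std::shared_lock` until a writer publishes a value under an exclusive lock. The helper name `read_when_published` and the convention that `0` means "not published yet" are assumptions for this example.

```cpp
#include <condition_variable>
#include <mutex>
#include <shared_mutex>
#include <thread>

// Hypothetical demo: a reader waits under a shared lock until a writer
// publishes a value under an exclusive lock.
int read_when_published() {
    std::shared_mutex sharedMtx;
    std::condition_variable_any cvAny;
    int config = 0;  // 0 means "not published yet"
    int seen = 0;

    std::thread reader([&] {
        std::shared_lock<std::shared_mutex> lock(sharedMtx);
        cvAny.wait(lock, [&] { return config != 0; });
        seen = config;  // read-only access to config under the shared lock
    });

    {
        std::unique_lock<std::shared_mutex> lock(sharedMtx);  // exclusive
        config = 5;
    }
    cvAny.notify_all();  // wake every reader waiting for the value

    reader.join();
    return seen;
}
```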


Common Pitfalls

1. Forgetting to hold lock during wait

cpp
// ❌ WRONG
mutex_.unlock();
cv_.wait(...);  // Must hold lock!

// ✅ CORRECT
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, predicate);

2. Notify without modifying condition

cpp
// ❌ USELESS
cv_.notify_one();  // No state changed, waiter will go back to sleep

// ✅ CORRECT
{
    std::lock_guard<std::mutex> lock(mutex_);
    dataReady = true;  // Change state first!
}
cv_.notify_one();

3. Lost Wakeup

cpp
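// ❌ POTENTIAL BUG: one-shot check, and unsafe if the notifier
//    changes `ready` without holding the mutex
if (!ready) {
    // A notifier that skips the mutex can set ready=true and notify HERE
    cv.wait(lock);  // Notification lost! May wait forever!
}

// ✅ CORRECT: Use the predicate version (and modify `ready` under the mutex)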
cv.wait(lock, []{ return ready; });

📚 Summary

Method                          | Description
--------------------------------|------------------------------
wait(lock, pred)                | Block until predicate is true
wait_for(lock, duration, pred)  | Block with timeout
notify_one()                    | Wake one waiting thread
notify_all()                    | Wake all waiting threads

Use Case                  | notify Type
--------------------------|-----------------------------------
Single resource added     | notify_one
Multiple resources added  | notify_all or multiple notify_one
Shutdown/broadcast        | notify_all

➡️ Next

You have learned how threads communicate efficiently. But creating a new thread for every task is expensive. The solution?

Thread Pool → Reusing threads efficiently.