🔔 Condition Variables — Producer-Consumer Pattern
std::condition_variable lets threads wait efficiently until a condition is satisfied.
The Problem: Busy Waiting
❌ Anti-pattern: Polling Loop
cpp
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx;
bool dataReady = false;
int data = 0;
void consumer() {
while (true) {
std::lock_guard<std::mutex> lock(mtx);
if (dataReady) { // Check condition
std::cout << "Got data: " << data << std::endl;
break;
}
// ❌ Unlock, sleep a bit, lock again, check again...
// ❌ WASTES CPU! Burns cycles checking repeatedly
}
}
Problems:
- CPU burn: the thread keeps checking even when nothing has changed
- Latency: if it sleeps for long between checks, it responds slowly
- Not scalable: 10 polling threads is a disaster
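To make these costs concrete, here is a minimal sketch of a sleep-based polling loop that counts how many times it checks a flag before the data arrives. The helper names `pollUntilReady` and `measurePolling`, the use of `std::atomic<bool>`, and the intervals are illustrative assumptions, not part of the original example.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical helper: polls `flag` every `interval` and returns how many
// checks were needed before it became true.
int pollUntilReady(const std::atomic<bool>& flag,
                   std::chrono::milliseconds interval) {
    int checks = 0;
    while (true) {
        ++checks;
        if (flag.load()) {
            return checks;
        }
        // A longer interval means fewer checks but a slower reaction.
        std::this_thread::sleep_for(interval);
    }
}

// Hypothetical driver: a producer sets the flag after `delay` while the
// calling thread polls. Returns the number of checks performed.
int measurePolling(std::chrono::milliseconds delay,
                   std::chrono::milliseconds interval) {
    std::atomic<bool> flag{false};
    std::thread producer([&flag, delay] {
        std::this_thread::sleep_for(delay);
        flag.store(true);
    });
    int checks = pollUntilReady(flag, interval);
    producer.join();
    return checks;
}
```

Calling `measurePolling` with a short interval will usually report many checks for a single event. Lengthening the interval cuts the number of checks, but only by adding up to one full interval of latency.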
Analogy: Doorbell vs Knocking Nonstop
┌─────────────────────────────────────────────────────────────────┐
│ DOORBELL ANALOGY │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ❌ BUSY WAITING (Polling): │
│ ──────────────────────────── │
│ │
│ 🚶 You stand outside the door, knocking nonstop: │
│ *knock* *knock* *knock* *knock* ... │
│ "Anyone home? Anyone home? Anyone home?" │
│ │
│ → Exhausting, and annoying for the people inside │
│ │
│ ──────────────────────────────────────────────────────────── │
│ │
│ ✅ CONDITION VARIABLE (Notification): │
│ ────────────────────────────────────── │
│ │
│ 🚶 You ring the doorbell 🔔 and sit down to wait: │
│ *ding-dong* → Sit on the bench and read a book │
│ ... │
│ 🔔 The house chimes back → You go in │
│ │
│ → Saves energy, responds quickly │
│ │
└─────────────────────────────────────────────────────────────────┘
std::condition_variable Basics
Core API
cpp
#include <condition_variable>
#include <mutex>
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
// WAITING THREAD
void waiter() {
std::unique_lock<std::mutex> lock(mtx); // Must use unique_lock!
cv.wait(lock, []{ return ready; }); // Wait until ready == true
// Proceeds once ready == true (returns immediately if it already is)
}
// NOTIFYING THREAD
void notifier() {
{
std::lock_guard<std::mutex> lock(mtx);
ready = true;
}
cv.notify_one(); // Wake up ONE waiting thread
// cv.notify_all(); // Wake up ALL waiting threads
}
Wait Variants
cpp
// Wait forever until notified AND predicate is true
cv.wait(lock, predicate);
// Wait with timeout (returns cv_status)
auto status = cv.wait_for(lock, std::chrono::seconds(5));
if (status == std::cv_status::timeout) {
// Timed out
}
// Wait with timeout AND predicate (returns bool)
bool success = cv.wait_for(lock, std::chrono::seconds(5), predicate);
if (!success) {
// Timed out AND predicate still false
}
// Wait until specific time point
cv.wait_until(lock, timePoint, predicate);
Spurious Wakeups
⚠️ CRITICAL CONCEPT
wait() can return even when no notify was issued! This is called a spurious wakeup.
cpp
// ❌ WRONG: No predicate check
cv.wait(lock); // May wake up randomly!
doWork(); // Condition might not be true!
// ✅ CORRECT: Always use predicate
cv.wait(lock, []{ return dataReady; });
// OR
while (!dataReady) {
cv.wait(lock);
}
Why do spurious wakeups happen?
- OS scheduling: the kernel may wake a thread for internal reasons
- Signal handling: interrupts can cause a wakeup
- Performance: permitting them lets the implementation be more efficient
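Spurious wakeups cannot be triggered on demand, but a notification sent without any state change looks the same from the waiter's point of view. The sketch below uses that as a stand-in to show the predicate overload putting the thread back to sleep until the condition really holds. The names `Gate`, `strayNotify`, and `strayNotifyDemo` are hypothetical.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical gate: waiters pass only once `open_` is true.
class Gate {
    std::mutex mtx_;
    std::condition_variable cv_;
    bool open_ = false;
public:
    // Equivalent to: while (!open_) cv_.wait(lock);
    void waitUntilOpen() {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [this] { return open_; });
    }
    // Stand-in for a spurious wakeup: wakes any waiters without changing state.
    void strayNotify() { cv_.notify_all(); }
    // Real notification: change the state under the lock, then notify.
    void open() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            open_ = true;
        }
        cv_.notify_all();
    }
};

// Returns true if the waiter stayed blocked through every stray notification
// and passed the gate only after open() was called.
bool strayNotifyDemo() {
    Gate gate;
    std::atomic<bool> passed{false};
    std::thread waiter([&] {
        gate.waitUntilOpen();
        passed.store(true);
    });
    for (int i = 0; i < 100; ++i) {
        gate.strayNotify();
    }
    bool passedEarly = passed.load();  // the gate is still closed here
    gate.open();
    waiter.join();
    return !passedEarly && passed.load();
}
```

Because the predicate is re-evaluated under the lock after every wakeup, it does not matter why the thread woke up. It continues only when `open_` is actually true.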
Producer-Consumer Pattern
Classic Implementation
cpp
#include <chrono>
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
std::queue<int> buffer;
const int MAX_SIZE = 10;
std::mutex mtx;
std::condition_variable cvProducer; // Notify producer when space available
std::condition_variable cvConsumer; // Notify consumer when data available
void producer(int id) {
for (int i = 0; i < 20; ++i) {
std::unique_lock<std::mutex> lock(mtx);
// Wait until buffer has space
cvProducer.wait(lock, []{ return buffer.size() < MAX_SIZE; });
// Produce
int item = id * 100 + i;
buffer.push(item);
std::cout << "Producer " << id << " produced: " << item
<< " (size: " << buffer.size() << ")\n";
lock.unlock();
cvConsumer.notify_one(); // Notify consumer
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}
void consumer(int id) {
for (int i = 0; i < 20; ++i) {
std::unique_lock<std::mutex> lock(mtx);
// Wait until buffer has data
cvConsumer.wait(lock, []{ return !buffer.empty(); });
// Consume
int item = buffer.front();
buffer.pop();
std::cout << "Consumer " << id << " consumed: " << item
<< " (size: " << buffer.size() << ")\n";
lock.unlock();
cvProducer.notify_one(); // Notify producer
std::this_thread::sleep_for(std::chrono::milliseconds(80));
}
}
int main() {
std::thread p1(producer, 1);
std::thread p2(producer, 2);
std::thread c1(consumer, 1);
std::thread c2(consumer, 2);
p1.join();
p2.join();
c1.join();
c2.join();
return 0;
}
Visualization
┌─────────────────────────────────────────────────────────────────┐
│ PRODUCER-CONSUMER PATTERN │
├─────────────────────────────────────────────────────────────────┤
│ │
│ PRODUCERS CONSUMERS │
│ ───────── ───────── │
│ ┌─────────┐ ┌─────────┐ │
│ │Producer1│──┐ ┌──│Consumer1│ │
│ └─────────┘ │ │ └─────────┘ │
│ ▼ │ │
│ ┌─────────┐ │ ┌──────────────────┐ │ ┌─────────┐ │
│ │Producer2│──┼─►│ BOUNDED BUFFER │──┼──│Consumer2│ │
│ └─────────┘ │ │ (Queue) │ │ └─────────┘ │
│ │ │ │ │ │
│ ┌─────────┐ │ │ [1][2][3][4] │ │ ┌─────────┐ │
│ │Producer3│──┘ └──────────────────┘ └──│Consumer3│ │
│ └─────────┘ │ │ └─────────┘ │
│ │ │ │
│ notify_one wait │
│ │
│ 📥 Producers wait if the buffer is FULL │
│ 📤 Consumers wait if the buffer is EMPTY │
│ │
└─────────────────────────────────────────────────────────────────┘
Thread-safe Queue Class
cpp
#include <queue>
#include <mutex>
#include <condition_variable>
#include <optional>
#include <chrono>
#include <cstddef>
#include <utility>
template<typename T>
class ThreadSafeQueue {
std::queue<T> queue_;
mutable std::mutex mutex_;
std::condition_variable cv_;
public:
void push(T value) {
{
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(std::move(value));
}
cv_.notify_one();
}
// Blocking pop
T pop() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]{ return !queue_.empty(); });
T value = std::move(queue_.front());
queue_.pop();
return value;
}
// Non-blocking try_pop
std::optional<T> try_pop() {
std::lock_guard<std::mutex> lock(mutex_);
if (queue_.empty()) {
return std::nullopt;
}
T value = std::move(queue_.front());
queue_.pop();
return value;
}
// Pop with timeout
std::optional<T> pop_for(std::chrono::milliseconds timeout) {
std::unique_lock<std::mutex> lock(mutex_);
if (!cv_.wait_for(lock, timeout, [this]{ return !queue_.empty(); })) {
return std::nullopt; // Timeout
}
T value = std::move(queue_.front());
queue_.pop();
return value;
}
bool empty() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.empty();
}
size_t size() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.size();
}
};
Usage
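A complete round trip through the queue might look like the sketch below. To compile on its own it re-declares a trimmed copy of the class above, named `MiniQueue` here, with only `push`, `pop`, and `pop_for`. The helpers `sumViaQueue` and `popTimesOutOnEmptyQueue` and the `-1` sentinel are illustrative assumptions.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>

// Trimmed copy of the queue above so this sketch compiles on its own.
template <typename T>
class MiniQueue {
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(value));
        }
        cv_.notify_one();
    }
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }
    std::optional<T> pop_for(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!cv_.wait_for(lock, timeout, [this] { return !queue_.empty(); })) {
            return std::nullopt;  // timed out with the queue still empty
        }
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }
};

// Hypothetical demo: a producer pushes 1..n followed by a -1 sentinel;
// the calling thread pops until it sees the sentinel and returns the sum.
int sumViaQueue(int n) {
    MiniQueue<int> q;
    std::thread producer([&q, n] {
        for (int i = 1; i <= n; ++i) {
            q.push(i);
        }
        q.push(-1);  // sentinel: no more items
    });
    int sum = 0;
    for (int item = q.pop(); item != -1; item = q.pop()) {
        sum += item;
    }
    producer.join();
    return sum;
}

// Hypothetical demo: nothing is ever pushed, so pop_for must time out.
bool popTimesOutOnEmptyQueue() {
    MiniQueue<int> q;
    return !q.pop_for(std::chrono::milliseconds(20)).has_value();
}
```

The sentinel is only one way to tell the consumer that the stream has ended. A shutdown flag checked in the wait predicate is a common alternative.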
cpp
ThreadSafeQueue<int> taskQueue;
// Producer thread
taskQueue.push(42);
taskQueue.push(100);
// Consumer thread
int task = taskQueue.pop(); // Blocks if empty
// With timeout
auto maybeTask = taskQueue.pop_for(std::chrono::seconds(5));
if (maybeTask) {
process(*maybeTask);
} else {
std::cout << "Timeout waiting for task\n";
}
notify_one vs notify_all
┌─────────────────────────────────────────────────────────────────┐
│ NOTIFY_ONE vs NOTIFY_ALL │
├─────────────────────────────────────────────────────────────────┤
│ │
│ notify_one(): │
│ ───────────── │
│ • Wake up ONE waiting thread │
│ • Which one? Unspecified (the scheduler decides) │
│ • Use when: Only 1 thread can proceed │
│ (e.g., single item added to queue) │
│ │
│ notify_all(): │
│ ────────────── │
│ • Wake up ALL waiting threads │
│ • Each thread re-checks predicate │
│ • Use when: Multiple threads might proceed │
│ (e.g., shutdown signal, batch data ready) │
│ │
│ ⚠️ THUNDERING HERD: │
│ notify_all() with many waiters can cause a performance spike │
│ │
└─────────────────────────────────────────────────────────────────┘
Shutdown Pattern
cpp
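#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>
class Worker {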
std::mutex mutex_;
std::condition_variable cv_;
std::queue<std::function<void()>> tasks_;
bool shutdown_ = false;
public:
void addTask(std::function<void()> task) {
{
std::lock_guard<std::mutex> lock(mutex_);
tasks_.push(std::move(task));
}
cv_.notify_one(); // One task → one worker
}
void stop() {
{
std::lock_guard<std::mutex> lock(mutex_);
shutdown_ = true;
}
cv_.notify_all(); // Wake ALL workers to check shutdown
}
void run() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]{
return shutdown_ || !tasks_.empty();
});
if (shutdown_ && tasks_.empty()) {
return; // Exit
}
task = std::move(tasks_.front());
tasks_.pop();
}
task(); // Execute outside lock
}
}
};
std::condition_variable_any
Works with any lockable type, not just std::unique_lock<std::mutex>:
cpp
#include <condition_variable>
#include <shared_mutex>
std::shared_mutex sharedMtx;
std::condition_variable_any cvAny;
void reader() {
std::shared_lock<std::shared_mutex> lock(sharedMtx);
cvAny.wait(lock, predicate); // ✅ Works with shared_lock
}
📌 Performance Note
std::condition_variable (with std::mutex) is usually faster than std::condition_variable_any. Use the _any variant only when you need custom lockable types.
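As a fuller illustration of the shared_lock case, the sketch below has several readers wait under a shared lock until a writer publishes a value under the exclusive lock. The class `SharedConfig` and the helper `readFromAll` are hypothetical names chosen for this example.

```cpp
#include <condition_variable>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>

// Hypothetical shared value guarded by a reader/writer mutex.
class SharedConfig {
    std::shared_mutex mtx_;
    std::condition_variable_any cv_;
    bool loaded_ = false;
    int value_ = 0;
public:
    // Readers wait under a shared lock, so they do not exclude each other.
    int waitAndRead() {
        std::shared_lock<std::shared_mutex> lock(mtx_);
        cv_.wait(lock, [this] { return loaded_; });
        return value_;
    }
    // The writer takes the exclusive lock to publish the value.
    void publish(int value) {
        {
            std::unique_lock<std::shared_mutex> lock(mtx_);
            value_ = value;
            loaded_ = true;
        }
        cv_.notify_all();  // every reader may proceed
    }
};

// Starts `readers` threads, publishes `value`, and returns the sum of what
// the readers observed.
int readFromAll(int readers, int value) {
    SharedConfig config;
    std::vector<int> results(readers, 0);
    std::vector<std::thread> threads;
    for (int i = 0; i < readers; ++i) {
        threads.emplace_back([&config, &results, i] {
            results[i] = config.waitAndRead();  // each thread writes its own slot
        });
    }
    config.publish(value);
    for (auto& t : threads) {
        t.join();
    }
    int sum = 0;
    for (int r : results) {
        sum += r;
    }
    return sum;
}
```

`notify_all()` fits here because the published value lets every reader continue, not just one.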
Common Pitfalls
1. Forgetting to hold lock during wait
cpp
// ❌ WRONG
mutex_.unlock();
cv_.wait(...); // Must hold lock!
// ✅ CORRECT
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, predicate);
2. Notify without modifying condition
cpp
// ❌ USELESS
cv_.notify_one(); // No state changed, waiter will go back to sleep
// ✅ CORRECT
{
std::lock_guard<std::mutex> lock(mutex_);
dataReady = true; // Change state first!
}
cv_.notify_one();
3. Lost Wakeup
cpp
// ❌ POTENTIAL BUG: condition checked BEFORE the mutex is locked
if (!ready) {
// Producer might set ready=true and notify HERE
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock); // Notification lost! Wait forever!
}
// ✅ CORRECT: Lock first, then use the predicate version
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []{ return ready; });
📚 Summary
| Method | Description |
|---|---|
| wait(lock, pred) | Block until predicate is true |
| wait_for(lock, duration, pred) | Block with timeout |
| notify_one() | Wake one waiting thread |
| notify_all() | Wake all waiting threads |

| Use Case | notify Type |
|---|---|
| Single resource added | notify_one |
| Multiple resources added | notify_all or multiple notify_one |
| Shutdown/broadcast | notify_all |
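The choices in the table can be seen side by side in a small sketch: `notify_one` when a single task is added and `notify_all` when shutdown is broadcast to several workers. It mirrors the Worker class from the Shutdown Pattern section but is driven by multiple threads. The names `TaskQueue` and `runTasks` are hypothetical.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical shared task queue drained by several worker threads.
class TaskQueue {
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    bool shutdown_ = false;
public:
    void add(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();  // single resource added: one waiter is enough
    }
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            shutdown_ = true;
        }
        cv_.notify_all();  // broadcast: every waiter must see the flag
    }
    // Worker loop: drains the remaining tasks, then exits once stopped.
    void run() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return shutdown_ || !tasks_.empty(); });
                if (tasks_.empty()) {
                    return;  // only reachable when shutdown_ is set
                }
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // execute outside the lock
        }
    }
};

// Runs `taskCount` increments on `workerCount` threads and returns the
// number of tasks that were executed.
int runTasks(int workerCount, int taskCount) {
    TaskQueue queue;
    std::atomic<int> done{0};
    std::vector<std::thread> workers;
    for (int i = 0; i < workerCount; ++i) {
        workers.emplace_back([&queue] { queue.run(); });
    }
    for (int i = 0; i < taskCount; ++i) {
        queue.add([&done] { ++done; });
    }
    queue.stop();
    for (auto& w : workers) {
        w.join();
    }
    return done.load();
}
```

If `stop()` used `notify_one()` instead, only one sleeping worker would be woken to see the flag, and the `join()` calls on the others could block forever.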
➡️ Next
You have learned how threads communicate efficiently. But creating a new thread for every task is expensive. The solution?
Thread Pool → Reusing threads efficiently.