Giao diện
🏊 Thread Pool — Reusing Threads Efficiently
Thread Pool là pattern tạo sẵn một nhóm threads và tái sử dụng chúng cho nhiều tasks — tránh overhead tạo/hủy thread liên tục.
Tại sao Thread Pool?
Thread Creation Overhead
┌─────────────────────────────────────────────────────────────────┐
│ THREAD CREATION COST │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Mỗi lần tạo std::thread: │
│ ────────────────────────── │
│ • Allocate stack memory (~1-8 MB per thread) │
│ • OS kernel call (expensive!) │
│ • Context initialization │
│ • Thread local storage setup │
│ │
│ ⏱️ Thời gian: ~100μs - 1ms per thread creation │
│ │
│ Với 10,000 requests: │
│ ───────────────────── │
│ ❌ Create new thread each: 10,000 × 100μs = 1 second overhead! │
│ ✅ Thread pool (8 workers): Reuse existing threads │
│ │
└─────────────────────────────────────────────────────────────────┘
Analogy: Taxi Stand vs Uber Order
┌─────────────────────────────────────────────────────────────────┐
│ TAXI STAND ANALOGY │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ❌ WITHOUT POOL (Uber-style): │
│ ────────────────────────────── │
│ Customer arrives → Call a new driver → Wait for driver │
│ Customer arrives → Call a new driver → Wait for driver │
│ Customer arrives → Call a new driver → Wait for driver │
│ │
│ 🚗 Phải đợi driver đến từ nhà mỗi lần! │
│ │
│ ──────────────────────────────────────────────────────────── │
│ │
│ ✅ WITH POOL (Taxi Stand): │
│ ─────────────────────────── │
│ ┌─────────────────────────────────────────┐ │
│ │ 🚖 TAXI STAND 🚖 │ │
│ │ 🚕 🚕 🚕 🚕 🚕 (5 taxis waiting) │ │
│ └─────────────────────────────────────────┘ │
│ │ │
│ Customer arrives ──┼──► Taxi immediately available! │
│ Customer arrives ──┼──► Taxi immediately available! │
│ Customer arrives ──┘ │
│ │
│ 🚗 Taxi đã sẵn sàng, không cần chờ! │
│ │
└─────────────────────────────────────────────────────────────────┘
Thread Pool Architecture
┌─────────────────────────────────────────────────────────────────┐
│ THREAD POOL ARCHITECTURE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ MAIN THREAD / CLIENTS │
│ ───────────────────── │
│ submit(task1) submit(task2) submit(task3) submit(task4) │
│ │ │ │ │ │
│ └──────────────┴──────────────┴──────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────┐ │
│ │ TASK QUEUE │ │
│ │ ┌────┬────┬────┬────┐ │ │
│ │ │ T1 │ T2 │ T3 │ T4 │ ... │ ← Thread-safe │
│ │ └────┴────┴────┴────┘ │ │
│ └─────────────────────────────┘ │
│ │ │
│ ┌────────────────┼────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Worker 1 │ │ Worker 2 │ │ Worker 3 │ │
│ │ Thread │ │ Thread │ │ Thread │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │ │ │ │
│ └────────────────┴────────────────┘ │
│ │ │
│ pop() → execute → wait for next │
│ │
└─────────────────────────────────────────────────────────────────┘
Simple Thread Pool Implementation
cpp
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <type_traits>
#include <vector>
// Fixed-size pool of worker threads consuming tasks from a shared FIFO queue.
// Thread-safe: submit() may be called concurrently from any number of threads.
class ThreadPool {
public:
    // Spawns `numThreads` workers. hardware_concurrency() is allowed to
    // return 0 on some platforms; a zero-worker pool would never execute
    // anything and every future would block forever, so fall back to 1.
    explicit ThreadPool(size_t numThreads = std::thread::hardware_concurrency())
        : stop_(false)
    {
        if (numThreads == 0) {
            numThreads = 1;  // guarantee at least one worker so tasks make progress
        }
        workers_.reserve(numThreads);
        for (size_t i = 0; i < numThreads; ++i) {
            workers_.emplace_back([this]() {
                workerLoop();
            });
        }
    }

    ~ThreadPool() {
        shutdown();  // finish queued tasks, then join all workers
    }

    // Submit a task and get a future for the result.
    // Throws std::runtime_error if the pool has been shut down.
    template<typename F, typename... Args>
    auto submit(F&& f, Args&&... args)
        -> std::future<std::invoke_result_t<F, Args...>>
    {
        // invoke_result_t is robust where decltype(f(args...)) is not
        // (member-function pointers, callables only invocable via INVOKE).
        using ReturnType = std::invoke_result_t<F, Args...>;

        // packaged_task is move-only, but std::function requires a copyable
        // target — wrap the task in shared_ptr so the queue can hold it.
        auto task = std::make_shared<std::packaged_task<ReturnType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        std::future<ReturnType> result = task->get_future();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (stop_) {
                throw std::runtime_error("submit on stopped ThreadPool");
            }
            tasks_.emplace([task]() { (*task)(); });
        }
        cv_.notify_one();  // wake one idle worker (outside the lock)
        return result;
    }

    // Stop accepting new work, drain queued tasks, and join all workers.
    // Safe to call more than once (the joinable() check guards double-join).
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_all();
        for (std::thread& worker : workers_) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }

    // Number of worker threads (fixed after construction).
    size_t size() const { return workers_.size(); }

private:
    // Each worker: wait for a task (or stop), run it outside the lock, repeat.
    void workerLoop() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this]() {
                    return stop_ || !tasks_.empty();
                });
                // Drain the queue even after stop so already-submitted
                // futures still complete before the worker exits.
                if (stop_ && tasks_.empty()) {
                    return; // Exit worker
                }
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task(); // Execute task outside the lock
        }
    }

    std::vector<std::thread> workers_;         // worker threads
    std::queue<std::function<void()>> tasks_;  // pending tasks (guarded by mutex_)
    std::mutex mutex_;                         // protects tasks_ and stop_
    std::condition_variable cv_;               // signals "task available or stopping"
    bool stop_;                                // guarded by mutex_
};
Usage Examples
Basic Usage
cpp
#include <iostream>
#include <chrono>
// Basic usage: submit two tasks and block on their futures.
int main() {
    // Pool of four reusable worker threads.
    ThreadPool pool(4);

    // Task 1: simulated slow computation returning a constant.
    auto answer = pool.submit([]() {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        return 42;
    });

    // Task 2: a binary function with its arguments bound at submit time.
    auto sum = pool.submit([](int x, int y) {
        return x + y;
    }, 10, 20);

    // get() blocks until each result is ready.
    std::cout << "Result 1: " << answer.get() << std::endl; // 42
    std::cout << "Result 2: " << sum.get() << std::endl; // 30
    return 0;
}
Parallel Processing
cpp
#include <vector>
#include <numeric>
// Parallel chunked sum of 0..999999 across all pool workers.
int main() {
    ThreadPool pool(std::thread::hardware_concurrency());

    std::vector<int> data(1000000);
    std::iota(data.begin(), data.end(), 0); // 0, 1, 2, ..., 999999

    // hardware_concurrency() may report 0, which would make pool.size() == 0
    // and the division below undefined behavior — guard with a minimum of 1.
    const size_t numChunks = pool.size() > 0 ? pool.size() : 1;
    const size_t chunkSize = data.size() / numChunks;

    std::vector<std::future<long long>> futures;
    futures.reserve(numChunks);
    for (size_t i = 0; i < numChunks; ++i) {
        size_t start = i * chunkSize;
        // The last chunk absorbs the remainder when the size isn't divisible.
        size_t end = (i == numChunks - 1) ? data.size() : (i + 1) * chunkSize;
        futures.push_back(pool.submit([&data, start, end]() {
            // 0LL keeps the accumulator 64-bit (the sum overflows 32 bits).
            return std::accumulate(
                data.begin() + start,
                data.begin() + end,
                0LL
            );
        }));
    }

    // Aggregate partial sums; get() blocks until each chunk finishes.
    long long total = 0;
    for (auto& f : futures) {
        total += f.get();
    }
    std::cout << "Total: " << total << std::endl;
    // Expected: 499999500000
    return 0;
}
Web Server Simulation
cpp
#include <iostream>
#include <chrono>
#include <random>
// Minimal request/response models for the simulation.
struct HttpRequest {
    int id;           // sequence number of the request
    std::string path; // requested URL path
};

struct HttpResponse {
    int statusCode;   // HTTP status (always 200 in this simulation)
    std::string body; // rendered payload
};

// Simulate handling one request with 50-149 ms of fake I/O latency.
// Uses a thread_local PRNG: rand() is not required to be thread-safe,
// and this handler runs concurrently on multiple pool workers.
HttpResponse handleRequest(HttpRequest req) {
    thread_local std::mt19937 rng{std::random_device{}()};
    std::uniform_int_distribution<int> jitter(0, 99);
    std::this_thread::sleep_for(std::chrono::milliseconds(50 + jitter(rng)));
    return HttpResponse{
        200,
        "Response for " + req.path + " (request #" + std::to_string(req.id) + ")"
    };
}
int main() {
ThreadPool pool(8); // 8 workers for handling requests
std::vector<std::future<HttpResponse>> responses;
// Simulate 100 incoming requests
for (int i = 0; i < 100; ++i) {
HttpRequest req{i, "/api/data/" + std::to_string(i)};
responses.push_back(pool.submit(handleRequest, req));
}
// Process responses as they complete
for (int i = 0; i < responses.size(); ++i) {
auto response = responses[i].get();
std::cout << "Request " << i << ": "
<< response.statusCode << " - "
<< response.body << std::endl;
}
return 0;
}
Thread Pool Sizing
Guidelines
┌─────────────────────────────────────────────────────────────────┐
│ THREAD POOL SIZING │
├─────────────────────────────────────────────────────────────────┤
│ │
│ CPU-BOUND TASKS (calculations, compression): │
│ ───────────────────────────────────────────── │
│ Pool size = Number of CPU cores │
│ = std::thread::hardware_concurrency() │
│ │
│ More threads → Context switching overhead │
│ │
│ ──────────────────────────────────────────────────────────── │
│ │
│ I/O-BOUND TASKS (network, disk, database): │
│ ────────────────────────────────────────── │
│ Pool size = Number of cores × (1 + Wait time / Compute time) │
│ │
│ Example: 8 cores, tasks wait 90% of time │
│ 8 × (1 + 0.9/0.1) = 8 × 10 = 80 threads │
│ │
│ ──────────────────────────────────────────────────────────── │
│ │
│ MIXED WORKLOAD: │
│ ─────────────── │
│ Consider separate pools for CPU and I/O tasks │
│ │
└─────────────────────────────────────────────────────────────────┘
Dynamic Sizing (Advanced)
cpp
// Một số implementations hỗ trợ dynamic sizing:
// - Tạo thêm threads khi queue quá dài
// - Thu hồi idle threads sau timeout
// - Min/max bounds
// Example concept (not full implementation):
// Concept sketch of a pool that grows and shrinks with load. NOT a complete
// implementation: lock, predicate, tasks_, workers_, cv_ and spawnWorker()
// are assumed to exist as in the basic ThreadPool above.
class DynamicThreadPool {
    size_t minThreads_;                // never retire below this count
    size_t maxThreads_;                // never grow above this count
    std::chrono::seconds idleTimeout_; // idle time before a worker may retire

    // Grow: spawn another worker when the queue backs up (more than two
    // pending tasks per worker) and we are still under the ceiling.
    void maybeSpawnWorker() {
        if (tasks_.size() > workers_.size() * 2
            && workers_.size() < maxThreads_) {
            spawnWorker();
        }
    }

    // Shrink: a worker that times out waiting for work retires itself,
    // as long as the pool stays above minThreads_.
    void workerLoop() {
        while (true) {
            // NOTE: the predicate overload of wait_for returns bool (the
            // predicate result), not std::cv_status — the original compared
            // against cv_status::timeout, which is the wrong overload's API.
            bool gotWork = cv_.wait_for(lock, idleTimeout_, predicate);
            if (!gotWork && workers_.size() > minThreads_) {
                // Retire this worker
                return;
            }
            // ... execute task ...
        }
    }
};
Work Stealing (Advanced Concept)
┌─────────────────────────────────────────────────────────────────┐
│ WORK STEALING │
├─────────────────────────────────────────────────────────────────┤
│ │
│ STANDARD POOL: WORK STEALING POOL: │
│ ─────────────── ────────────────── │
│ │
│ ┌─────────────┐ ┌─────────────────────────────┐ │
│ │ GLOBAL │ │ Each worker has LOCAL queue│ │
│ │ QUEUE │ │ │ │
│ │ (contended) │ │ Worker1: [T1][T2][T3] │ │
│ └─────────────┘ │ Worker2: [T4] │ │
│ │ │ Worker3: [T5][T6] │ │
│ All workers │ Worker4: [] ← IDLE │ │
│ compete for │ │ │ │
│ the same lock │ ▼ │ │
│ │ Worker4 STEALS from Worker1│ │
│ │ [T1][T2] ← steal [T3] │ │
│ └─────────────────────────────┘ │
│ │
│ ✅ Work stealing giảm lock contention │
│ ✅ Better cache locality (tasks stay on same worker) │
│ │
└─────────────────────────────────────────────────────────────────┘
Real-world Libraries
C++ Libraries
| Library | Features |
|---|---|
| BS::thread_pool | Header-only, modern C++17 |
| Intel TBB | Work stealing, task graphs |
| HPX | Distributed computing support |
| Boost.Asio | I/O-focused, strand support |
BS::thread_pool Example
cpp
#include "BS_thread_pool.hpp" // Header-only library
// Example using the header-only BS::thread_pool library.
// NOTE(review): push_task/parallelize_loop/wait_for_tasks match the v3.x API;
// v4 renamed these (detach_task / detach_loop / wait) — confirm the version.
int main() {
    BS::thread_pool pool; // Auto-detect hardware concurrency

    // Submit and forget: no future is returned for this task.
    pool.push_task([]() {
        std::cout << "Task running\n";
    });

    // Submit with future: blocks on get() until the task completes.
    auto result = pool.submit([]() { return 42; });
    std::cout << result.get() << std::endl;

    // Parallel loop: the library splits [0, 1000) into per-worker sub-ranges.
    pool.parallelize_loop(0, 1000, [](int start, int end) {
        for (int i = start; i < end; ++i) {
            // Process item i
        }
    });

    pool.wait_for_tasks(); // Wait for all tasks
    return 0;
}
C++17 Parallel Algorithms
cpp
#include <algorithm>
#include <execution>
#include <vector>
// std::sort with C++17 execution policies — the standard library manages
// its own internal thread pool for the parallel variants.
int main() {
    std::vector<int> numbers(1000000);

    // Classic single-threaded sort.
    std::sort(numbers.begin(), numbers.end());

    // Parallel execution policy (uses internal thread pool).
    std::sort(std::execution::par, numbers.begin(), numbers.end());

    // Parallel and vectorized (SIMD) — the most relaxed ordering guarantee.
    std::sort(std::execution::par_unseq, numbers.begin(), numbers.end());

    return 0;
}
Common Pitfalls
1. Task Queue Overflow
cpp
// ❌ No backpressure
while (moreDataAvailable()) {
pool.submit(processData); // Queue grows unbounded!
}
// ✅ Bounded queue with backpressure
// Sketch of backpressure: submit() blocks the PRODUCER until the queue has
// room. Fragment only — mutex_, tasks_, cv_ and the Task type are assumed
// from the basic ThreadPool; workers would also need to notify cvFull_
// after popping a task (omitted here).
class BoundedThreadPool {
    size_t maxQueueSize_;             // hard cap on pending tasks
    std::condition_variable cvFull_;  // producers wait here while the queue is full

    void submit(Task task) {
        std::unique_lock<std::mutex> lock(mutex_);
        // Block the caller until a worker drains the queue below the cap.
        cvFull_.wait(lock, [this]() {
            return tasks_.size() < maxQueueSize_;
        });
        tasks_.push(std::move(task));
        cv_.notify_one();  // wake a worker for the newly queued task
    }
};
2. Exception Handling
cpp
// Tasks có thể throw, handle properly
auto future = pool.submit([]() {
throw std::runtime_error("Oops!");
});
try {
future.get(); // Exception rethrown here
} catch (const std::exception& e) {
std::cerr << "Task failed: " << e.what() << std::endl;
}3. Deadlock from Nested Submits
cpp
// ❌ DEADLOCK: Task submits another task and waits
pool.submit([&pool]() {
auto inner = pool.submit([]() { return 42; });
return inner.get(); // If all workers are waiting like this...
});
// ✅ Avoid blocking waits inside tasks, or use separate pools
📚 Tổng kết
| Component | Role |
|---|---|
| Worker Threads | Pre-created, reusable executors |
| Task Queue | Thread-safe buffer for pending work |
| Condition Variable | Efficient worker notification |
| Future | Handle to async result |
| Sizing Strategy | Pool Size Formula |
|---|---|
| CPU-bound | hardware_concurrency() |
| I/O-bound | cores × (1 + wait/compute) |
| Mixed | Separate pools or dynamic |
🎯 Module 5 Complete!
Chúc mừng! Bạn đã hoàn thành Module 5: Concurrency trong C++:
Part 1: Threads & Mutexes
- std::thread, join(), detach()
- Race Conditions & Data Corruption
- Synchronization: mutex, lock_guard, unique_lock
- Deadlocks & Dining Philosophers
Part 2: Advanced
- async/future/promise — Higher abstractions
- Atomics — Lock-free programming
- Condition Variables — Producer-Consumer
- Thread Pool — Reusing threads
"Concurrency is not about parallelism. Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once." — Rob Pike