🏊 Thread Pool — Reusing Threads Efficiently

A Thread Pool is a pattern that pre-creates a group of threads and reuses them across many tasks, avoiding the overhead of repeatedly creating and destroying threads.

Tại sao Thread Pool?

Thread Creation Overhead

┌─────────────────────────────────────────────────────────────────┐
│                 THREAD CREATION COST                             │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Every std::thread creation:                                    │
│  ────────────────────────────                                   │
│  • Allocate stack memory (~1-8 MB per thread)                   │
│  • OS kernel call (expensive!)                                  │
│  • Context initialization                                       │
│  • Thread local storage setup                                   │
│                                                                 │
│  ⏱️ Time: ~100μs - 1ms per thread creation                      │
│                                                                 │
│  Với 10,000 requests:                                           │
│  ─────────────────────                                          │
│  ❌ Create new thread each: 10,000 × 100μs = 1 second overhead! │
│  ✅ Thread pool (8 workers): Reuse existing threads             │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Analogy: Taxi Stand vs Ordering a Ride

┌─────────────────────────────────────────────────────────────────┐
│                    TAXI STAND ANALOGY                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ❌ WITHOUT POOL (Uber-style):                                  │
│  ──────────────────────────────                                 │
│  Customer arrives → Call a new driver → Wait for driver         │
│  Customer arrives → Call a new driver → Wait for driver         │
│  Customer arrives → Call a new driver → Wait for driver         │
│                                                                 │
│  🚗 You wait for the driver to come from home every time!       │
│                                                                 │
│  ────────────────────────────────────────────────────────────   │
│                                                                 │
│  ✅ WITH POOL (Taxi Stand):                                     │
│  ───────────────────────────                                    │
│  ┌─────────────────────────────────────────┐                    │
│  │           🚖 TAXI STAND 🚖               │                    │
│  │  🚕 🚕 🚕 🚕 🚕 (5 taxis waiting)        │                    │
│  └─────────────────────────────────────────┘                    │
│                     │                                           │
│  Customer arrives ──┼──► Taxi immediately available!            │
│  Customer arrives ──┼──► Taxi immediately available!            │
│  Customer arrives ──┘                                           │
│                                                                 │
│  🚗 Taxis are ready and waiting, no delay!                      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Thread Pool Architecture

┌─────────────────────────────────────────────────────────────────┐
│                   THREAD POOL ARCHITECTURE                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   MAIN THREAD / CLIENTS                                         │
│   ─────────────────────                                         │
│   submit(task1)  submit(task2)  submit(task3)  submit(task4)    │
│        │              │              │              │           │
│        └──────────────┴──────────────┴──────────────┘           │
│                            │                                    │
│                            ▼                                    │
│              ┌─────────────────────────────┐                    │
│              │       TASK QUEUE            │                    │
│              │   ┌────┬────┬────┬────┐     │                    │
│              │   │ T1 │ T2 │ T3 │ T4 │ ... │ ← Thread-safe     │
│              │   └────┴────┴────┴────┘     │                    │
│              └─────────────────────────────┘                    │
│                            │                                    │
│           ┌────────────────┼────────────────┐                   │
│           │                │                │                   │
│           ▼                ▼                ▼                   │
│     ┌──────────┐     ┌──────────┐     ┌──────────┐              │
│     │ Worker 1 │     │ Worker 2 │     │ Worker 3 │              │
│     │  Thread  │     │  Thread  │     │  Thread  │              │
│     └──────────┘     └──────────┘     └──────────┘              │
│           │                │                │                   │
│           └────────────────┴────────────────┘                   │
│                            │                                    │
│                 pop() → execute → wait for next                 │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Simple Thread Pool Implementation

cpp
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <stdexcept>

class ThreadPool {
public:
    explicit ThreadPool(size_t numThreads = std::thread::hardware_concurrency())
        : stop_(false)
    {
        if (numThreads == 0) numThreads = 1;  // hardware_concurrency() may return 0
        for (size_t i = 0; i < numThreads; ++i) {
            workers_.emplace_back([this]() {
                workerLoop();
            });
        }
    }
    
    ~ThreadPool() {
        shutdown();
    }
    
    // Submit a task and get a future for the result
    template<typename F, typename... Args>
    auto submit(F&& f, Args&&... args) 
        -> std::future<decltype(f(args...))> 
    {
        using ReturnType = decltype(f(args...));
        
        auto task = std::make_shared<std::packaged_task<ReturnType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
        std::future<ReturnType> result = task->get_future();
        
        {
            std::lock_guard<std::mutex> lock(mutex_);
            
            if (stop_) {
                throw std::runtime_error("submit on stopped ThreadPool");
            }
            
            tasks_.emplace([task]() { (*task)(); });
        }
        
        cv_.notify_one();
        return result;
    }
    
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        
        cv_.notify_all();
        
        for (std::thread& worker : workers_) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }
    
    size_t size() const { return workers_.size(); }
    
private:
    void workerLoop() {
        while (true) {
            std::function<void()> task;
            
            {
                std::unique_lock<std::mutex> lock(mutex_);
                
                cv_.wait(lock, [this]() {
                    return stop_ || !tasks_.empty();
                });
                
                if (stop_ && tasks_.empty()) {
                    return;  // Exit worker
                }
                
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            
            task();  // Execute task outside the lock
        }
    }
    
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool stop_;
};

Usage Examples

Basic Usage

cpp
#include <iostream>
#include <chrono>

int main() {
    ThreadPool pool(4);  // 4 worker threads
    
    // Submit tasks
    auto future1 = pool.submit([]() {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        return 42;
    });
    
    auto future2 = pool.submit([](int x, int y) {
        return x + y;
    }, 10, 20);
    
    // Get results
    std::cout << "Result 1: " << future1.get() << std::endl;  // 42
    std::cout << "Result 2: " << future2.get() << std::endl;  // 30
    
    return 0;
}

Parallel Processing

cpp
#include <iostream>
#include <numeric>
#include <thread>
#include <vector>

int main() {
    ThreadPool pool(std::thread::hardware_concurrency());
    
    std::vector<int> data(1000000);
    std::iota(data.begin(), data.end(), 0);  // 0, 1, 2, ..., 999999
    
    // Split into chunks
    size_t chunkSize = data.size() / pool.size();
    std::vector<std::future<long long>> futures;
    
    for (size_t i = 0; i < pool.size(); ++i) {
        size_t start = i * chunkSize;
        size_t end = (i == pool.size() - 1) ? data.size() : (i + 1) * chunkSize;
        
        futures.push_back(pool.submit([&data, start, end]() {
            return std::accumulate(
                data.begin() + start,
                data.begin() + end,
                0LL
            );
        }));
    }
    
    // Aggregate results
    long long total = 0;
    for (auto& f : futures) {
        total += f.get();
    }
    
    std::cout << "Total: " << total << std::endl;
    // Expected: 499999500000
    
    return 0;
}

Web Server Simulation

cpp
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <string>
#include <vector>

struct HttpRequest {
    int id;
    std::string path;
};

struct HttpResponse {
    int statusCode;
    std::string body;
};

HttpResponse handleRequest(HttpRequest req) {
    // Simulate I/O latency
    std::this_thread::sleep_for(std::chrono::milliseconds(50 + rand() % 100));
    
    return HttpResponse{
        200,
        "Response for " + req.path + " (request #" + std::to_string(req.id) + ")"
    };
}

int main() {
    ThreadPool pool(8);  // 8 workers for handling requests
    
    std::vector<std::future<HttpResponse>> responses;
    
    // Simulate 100 incoming requests
    for (int i = 0; i < 100; ++i) {
        HttpRequest req{i, "/api/data/" + std::to_string(i)};
        responses.push_back(pool.submit(handleRequest, req));
    }
    
    // Process responses as they complete
    for (size_t i = 0; i < responses.size(); ++i) {
        auto response = responses[i].get();
        std::cout << "Request " << i << ": " 
                  << response.statusCode << " - " 
                  << response.body << std::endl;
    }
    
    return 0;
}

Thread Pool Sizing

Guidelines

┌─────────────────────────────────────────────────────────────────┐
│                  THREAD POOL SIZING                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  CPU-BOUND TASKS (calculations, compression):                   │
│  ─────────────────────────────────────────────                  │
│  Pool size = Number of CPU cores                                │
│            = std::thread::hardware_concurrency()                │
│                                                                 │
│  More threads → Context switching overhead                      │
│                                                                 │
│  ────────────────────────────────────────────────────────────   │
│                                                                 │
│  I/O-BOUND TASKS (network, disk, database):                     │
│  ──────────────────────────────────────────                     │
│  Pool size = Number of cores × (1 + Wait time / Compute time)   │
│                                                                 │
│  Example: 8 cores, tasks wait 90% of time                       │
│           8 × (1 + 0.9/0.1) = 8 × 10 = 80 threads               │
│                                                                 │
│  ────────────────────────────────────────────────────────────   │
│                                                                 │
│  MIXED WORKLOAD:                                                │
│  ───────────────                                                │
│  Consider separate pools for CPU and I/O tasks                  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Dynamic Sizing (Advanced)

cpp
// Some implementations support dynamic sizing:
// - Spawn extra threads when the queue grows too long
// - Retire idle threads after a timeout
// - Min/max bounds

// Example concept (not full implementation):
class DynamicThreadPool {
    size_t minThreads_;
    size_t maxThreads_;
    std::chrono::seconds idleTimeout_;
    
    void maybeSpawnWorker() {
        if (tasks_.size() > workers_.size() * 2 
            && workers_.size() < maxThreads_) {
            spawnWorker();
        }
    }
    
    void workerLoop() {
        while (true) {
            // wait_for with a predicate returns false if the timeout expired
            bool gotTask = cv_.wait_for(lock, idleTimeout_, predicate);
            if (!gotTask && workers_.size() > minThreads_) {
                // Retire this worker
                return;
            }
            // ... execute task ...
        }
    }
};

Work Stealing (Advanced Concept)

┌─────────────────────────────────────────────────────────────────┐
│                    WORK STEALING                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  STANDARD POOL:              WORK STEALING POOL:                │
│  ───────────────             ──────────────────                 │
│                                                                 │
│  ┌─────────────┐             ┌─────────────────────────────┐    │
│  │ GLOBAL      │             │  Each worker has LOCAL queue│    │
│  │ QUEUE       │             │                             │    │
│  │ (contended) │             │  Worker1: [T1][T2][T3]      │    │
│  └─────────────┘             │  Worker2: [T4]              │    │
│        │                     │  Worker3: [T5][T6]          │    │
│   All workers                │  Worker4: [] ← IDLE         │    │
│   compete for                │             │               │    │
│   the same lock              │             ▼               │    │
│                              │  Worker4 STEALS from Worker1│    │
│                              │  [T1][T2] ← steal [T3]      │    │
│                              └─────────────────────────────┘    │
│                                                                 │
│  ✅ Work stealing reduces lock contention                       │
│  ✅ Better cache locality (tasks stay on same worker)           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Real-world Libraries

C++ Libraries

Library          Features
───────────────  ─────────────────────────────
BS::thread_pool  Header-only, modern C++17
Intel TBB        Work stealing, task graphs
HPX              Distributed computing support
Boost.Asio       I/O-focused, strand support

BS::thread_pool Example

cpp
#include "BS_thread_pool.hpp"  // Header-only library

int main() {
    BS::thread_pool pool;  // Auto-detect hardware concurrency
    
    // Submit and forget
    pool.push_task([]() {
        std::cout << "Task running\n";
    });
    
    // Submit with future
    auto result = pool.submit([]() { return 42; });
    std::cout << result.get() << std::endl;
    
    // Parallel loop
    pool.parallelize_loop(0, 1000, [](int start, int end) {
        for (int i = start; i < end; ++i) {
            // Process item i
        }
    });
    
    pool.wait_for_tasks();  // Wait for all tasks
    
    return 0;
}

C++17 Parallel Algorithms

cpp
#include <algorithm>
#include <execution>
#include <vector>

int main() {
    std::vector<int> data(1000000);
    
    // Sequential
    std::sort(data.begin(), data.end());
    
    // Parallel (uses internal thread pool)
    std::sort(std::execution::par, data.begin(), data.end());
    
    // Parallel + vectorized
    std::sort(std::execution::par_unseq, data.begin(), data.end());
    
    return 0;
}

Common Pitfalls

1. Task Queue Overflow

cpp
// ❌ No backpressure
while (moreDataAvailable()) {
    pool.submit(processData);  // Queue grows unbounded!
}

// ✅ Bounded queue with backpressure
class BoundedThreadPool {
    size_t maxQueueSize_;
    std::condition_variable cvFull_;  // workers must notify this after popping a task
    
    void submit(Task task) {
        std::unique_lock<std::mutex> lock(mutex_);
        cvFull_.wait(lock, [this]() { 
            return tasks_.size() < maxQueueSize_; 
        });
        tasks_.push(std::move(task));
        cv_.notify_one();
    }
};

2. Exception Handling

cpp
// Tasks can throw; the exception is stored in the future — handle it properly
auto future = pool.submit([]() {
    throw std::runtime_error("Oops!");
});

try {
    future.get();  // Exception rethrown here
} catch (const std::exception& e) {
    std::cerr << "Task failed: " << e.what() << std::endl;
}

3. Deadlock from Nested Submits

cpp
// ❌ DEADLOCK: Task submits another task and waits
pool.submit([&pool]() {
    auto inner = pool.submit([]() { return 42; });
    return inner.get();  // If all workers are waiting like this...
});

// ✅ Avoid blocking waits inside tasks, or use separate pools

📚 Summary

Component           Role
──────────────────  ───────────────────────────────────
Worker Threads      Pre-created, reusable executors
Task Queue          Thread-safe buffer for pending work
Condition Variable  Efficient worker notification
Future              Handle to async result

Sizing Strategy     Pool Size Formula
──────────────────  ───────────────────────────────────
CPU-bound           hardware_concurrency()
I/O-bound           cores × (1 + wait/compute)
Mixed               Separate pools or dynamic

🎯 Module 5 Complete!

Congratulations! You have completed Module 5: Concurrency in C++:

Part 1: Threads & Mutexes

  • std::thread, join(), detach()
  • Race Conditions & Data Corruption
  • Synchronization: mutex, lock_guard, unique_lock
  • Deadlocks & Dining Philosophers

Part 2: Advanced

  • async/future/promise — Higher abstractions
  • Atomics — Lock-free programming
  • Condition Variables — Producer-Consumer
  • Thread Pool — Reusing threads

"Concurrency is not about parallelism. Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once." — Rob Pike