Skip to content

Fearless Concurrency

"Hầu hết các ngôn ngữ đánh đổi safety lấy performance. Rust cho bạn cả hai."

1. OS Threads vs Green Threads

Lịch sử Threading Model của Rust

Rust từng có Green Threads (như Go) nhưng đã loại bỏ trong Rust 1.0.

Rust 0.x (2010-2014):
├── Green threads runtime (mô hình M:N)
├── Nhẹ, được lập lịch bởi runtime
└── Vấn đề: Overhead FFI, độ trễ không dự đoán được

Rust 1.0+ (2015-nay):
├── Chỉ OS threads (mô hình 1:1)
├── Zero-cost abstraction
└── Giải pháp: Async/await cho lightweight concurrency

Tại sao loại bỏ Green Threads?

rust
// Green threads cần runtime
// Mỗi FFI call phải "switch" context
extern "C" fn call_external_lib() {
    // 1. Tạm dừng green thread
    // 2. Chuyển sang OS thread
    // 3. Gọi hàm C
    // 4. Chuyển về lại
    // → Overhead không thể dự đoán
}

// OS threads: Không có overhead FFI
extern "C" fn call_external_lib() {
    // Gọi trực tiếp, không switch context
    some_c_function();
}

Tạo Thread

rust
use std::thread;

fn main() {
    // Tạo OS thread
    let handle = thread::spawn(|| {
        println!("Xin chào từ thread!");
    });
    
    // Chờ hoàn thành
    handle.join().unwrap();
}

2. Message Passing: Cơ chế MPSC Channel

Kiến trúc Channel

                    MPSC Channel
┌─────────────────────────────────────────────┐
│                                             │
│  Producer 1 ──┐                             │
│               │     ┌──────────────┐        │
│  Producer 2 ──┼────▶│  Ring Buffer │───────▶│ Consumer
│               │     │  (Lock-free) │        │
│  Producer N ──┘     └──────────────┘        │
│                                             │
│  Nhiều Producer ──▶ Một Consumer            │
└─────────────────────────────────────────────┘

Cấu trúc nội bộ

rust
// Cấu trúc MPSC đơn giản hóa
pub struct Sender<T> {
    inner: Arc<Inner<T>>,
}

pub struct Receiver<T> {
    inner: Arc<Inner<T>>,
}

struct Inner<T> {
    // Hàng đợi lock-free
    buffer: UnsafeCell<VecDeque<T>>,
    // Đồng bộ hóa
    mutex: Mutex<()>,
    condvar: Condvar,
    // Điều khiển
    sender_count: AtomicUsize,
    disconnected: AtomicBool,
}

Patterns sử dụng Channel

rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // Tạo unbounded channel
    let (tx, rx) = mpsc::channel();
    
    // Clone sender cho nhiều producers
    let tx2 = tx.clone();
    
    thread::spawn(move || {
        tx.send("từ thread 1").unwrap();
    });
    
    thread::spawn(move || {
        tx2.send("từ thread 2").unwrap();
    });
    
    // Nhận (blocking)
    for msg in rx {
        println!("{}", msg);
    }
}

Bounded vs Unbounded Channels

rust
// Unbounded: Có thể tăng vô hạn (rủi ro memory)
let (tx, rx) = mpsc::channel::<i32>();

// Bounded: sync_channel với capacity
let (tx, rx) = mpsc::sync_channel::<i32>(10);
// Blocks khi buffer đầy (backpressure)

3. Shared State: Arc<Mutex<T>>RwLock

Cấu trúc Mutex

                    Cấu trúc Mutex<T>
┌─────────────────────────────────────────────┐
│                                             │
│  ┌─────────────┐    ┌───────────────────┐   │
│  │   AtomicU8  │    │    Data: T        │   │
│  │   (locked)  │    │                   │   │
│  │    0=free   │    │  [Được bảo vệ     │   │
│  │    1=held   │    │   bởi lock]       │   │
│  └─────────────┘    └───────────────────┘   │
│         │                    │              │
│         └────────────────────┘              │
│          Lock bảo vệ truy cập               │
│                                             │
└─────────────────────────────────────────────┘

Triển khai Lock

rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Arc: Atomic Reference Counting (Rc thread-safe)
    // Mutex: Khóa loại trừ lẫn nhau
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            // lock() trả về MutexGuard
            // Tự động unlock khi dropped (RAII)
            let mut num = counter.lock().unwrap();
            *num += 1;
            // MutexGuard dropped ở đây → unlock
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("Kết quả: {}", *counter.lock().unwrap());
}

RwLock: Nhiều Reader, Một Writer

rust
use std::sync::RwLock;

let lock = RwLock::new(5);

// Cho phép nhiều readers
{
    let r1 = lock.read().unwrap();
    let r2 = lock.read().unwrap();
    // Cả hai có thể đọc đồng thời
    println!("{}, {}", *r1, *r2);
}

// Một writer, truy cập độc quyền
{
    let mut w = lock.write().unwrap();
    *w += 1;
}

Mutex Poisoning

rust
use std::sync::{Arc, Mutex};
use std::thread;

let lock = Arc::new(Mutex::new(0));
let lock2 = Arc::clone(&lock);

// Thread panic khi đang giữ lock
let _ = thread::spawn(move || {
    let _guard = lock2.lock().unwrap();
    panic!("Thread panic!");
}).join();

// Mutex giờ bị "poisoned"
match lock.lock() {
    Ok(guard) => { /* bình thường */ }
    Err(poisoned) => {
        // Vẫn có thể truy cập data, nhưng biết có thể bị hỏng
        let guard = poisoned.into_inner();
        println!("Khôi phục: {}", *guard);
    }
}

4. Memory Ordering & Atomics

Vấn đề Memory Ordering

CPU 1                          CPU 2
─────                          ─────
x = 1;                         while (y == 0) {}
y = 1;                         print(x);

// Có thể x vẫn = 0!
// CPU/Compiler có thể sắp xếp lại instructions

Các mức Ordering

rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

// ═══════════════════════════════════════════════════
// Relaxed: Không đảm bảo ordering, chỉ atomicity
// ═══════════════════════════════════════════════════
let counter = AtomicUsize::new(0);
counter.fetch_add(1, Ordering::Relaxed);
// Use case: Counter thống kê (không quan tâm thứ tự)

// ═══════════════════════════════════════════════════
// Release/Acquire: Cặp đồng bộ hóa
// ═══════════════════════════════════════════════════
static DATA: AtomicUsize = AtomicUsize::new(0);
static FLAG: AtomicBool = AtomicBool::new(false);

// Thread 1: Producer
fn producer() {
    DATA.store(42, Ordering::Relaxed);
    FLAG.store(true, Ordering::Release);  // "Xuất bản" tất cả writes trước
}

// Thread 2: Consumer  
fn consumer() {
    while !FLAG.load(Ordering::Acquire) {} // "Thấy" tất cả writes trước Release
    assert_eq!(DATA.load(Ordering::Relaxed), 42); // ✓ Đảm bảo!
}

// ═══════════════════════════════════════════════════
// SeqCst: Sequential Consistency (mạnh nhất, chậm nhất)
// ═══════════════════════════════════════════════════
let flag = AtomicBool::new(false);
flag.store(true, Ordering::SeqCst);
// Thứ tự toàn cục hiển thị cho tất cả threads
// Dùng khi: Nhiều atomics cần ordering nhất quán

Biểu đồ Memory Ordering

Độ mạnh Ordering (Yếu → Mạnh)
═══════════════════════════════════════════════════

Relaxed     Acquire     AcqRel      SeqCst
   │           │          │           │
   ▼           ▼          ▼           ▼
┌──────┐   ┌──────┐   ┌──────┐   ┌──────┐
│Không │   │Reads │   │Cả    │   │Thứ tự│
│thứ tự│   │sau   │   │hai   │   │toàn  │
│      │   │Sync  │   │      │   │cục   │
└──────┘   └──────┘   └──────┘   └──────┘

Hiệu năng: Nhanh ──────────────────▶ Chậm
Đảm bảo:   Yếu ─────────────────────▶ Mạnh

Ví dụ Spinlock (Dùng Atomics)

rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::cell::UnsafeCell;

pub struct SpinLock<T> {
    locked: AtomicBool,
    data: UnsafeCell<T>,
}

// Safety: T phải là Send
unsafe impl<T: Send> Sync for SpinLock<T> {}

impl<T> SpinLock<T> {
    pub fn new(data: T) -> Self {
        Self {
            locked: AtomicBool::new(false),
            data: UnsafeCell::new(data),
        }
    }
    
    pub fn lock(&self) -> SpinLockGuard<'_, T> {
        // Spin cho đến khi acquire lock
        while self.locked
            .compare_exchange_weak(
                false, true,
                Ordering::Acquire,  // Nếu thành công, thấy tất cả writes trước
                Ordering::Relaxed,  // Nếu thất bại, chỉ cần retry
            )
            .is_err()
        {
            // Gợi ý cho CPU: đang trong spin loop
            std::hint::spin_loop();
        }
        SpinLockGuard { lock: self }
    }
}

pub struct SpinLockGuard<'a, T> {
    lock: &'a SpinLock<T>,
}

impl<T> Drop for SpinLockGuard<'_, T> {
    fn drop(&mut self) {
        self.lock.locked.store(false, Ordering::Release);
    }
}

impl<T> std::ops::Deref for SpinLockGuard<'_, T> {
    type Target = T;
    fn deref(&self) -> &T {
        unsafe { &*self.lock.data.get() }
    }
}

impl<T> std::ops::DerefMut for SpinLockGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        unsafe { &mut *self.lock.data.get() }
    }
}

🎯 Best Practices

Khi nào dùng gì?

PatternUse CaseTrade-off
Mutex<T>Shared mutable state chungCó thể contention
RwLock<T>Workloads nhiều đọcWriter starvation có thể xảy ra
AtomicsCounter/flags đơn giảnLogic phức tạp khó
mpscProducer-consumerOverhead của channel
crossbeamHiệu năng caoExternal dependency

Tránh Deadlocks

rust
// ❌ RỦI RO DEADLOCK
fn transfer(from: &Mutex<Account>, to: &Mutex<Account>) {
    let from_guard = from.lock().unwrap();
    let to_guard = to.lock().unwrap();  // Có thể deadlock nếu thread khác lock ngược
    // ...
}

// ✅ THỨ TỰ NHẤT QUÁN
fn transfer(a: &Mutex<Account>, b: &Mutex<Account>) {
    // Luôn lock theo thứ tự như nhau (theo địa chỉ)
    let (first, second) = if std::ptr::addr_of!(*a) < std::ptr::addr_of!(*b) {
        (a, b)
    } else {
        (b, a)
    };
    
    let _g1 = first.lock().unwrap();
    let _g2 = second.lock().unwrap();
}