Giao diện
Fearless Concurrency
"Hầu hết các ngôn ngữ đánh đổi safety lấy performance. Rust cho bạn cả hai."
1. OS Threads vs Green Threads
Lịch sử Threading Model của Rust
Rust từng có Green Threads (như Go) nhưng đã loại bỏ trong Rust 1.0.
Rust 0.x (2010-2014):
├── Green threads runtime (mô hình M:N)
├── Nhẹ, được lập lịch bởi runtime
└── Vấn đề: Overhead FFI, độ trễ không dự đoán được
Rust 1.0+ (2015-nay):
├── Chỉ OS threads (mô hình 1:1)
├── Zero-cost abstraction
└── Giải pháp: Async/await cho lightweight concurrencyTại sao loại bỏ Green Threads?
rust
// Green threads cần runtime
// Mỗi FFI call phải "switch" context
extern "C" fn call_external_lib() {
// 1. Tạm dừng green thread
// 2. Chuyển sang OS thread
// 3. Gọi hàm C
// 4. Chuyển về lại
// → Overhead không thể dự đoán
}
// OS threads: Không có overhead FFI
extern "C" fn call_external_lib() {
// Gọi trực tiếp, không switch context
some_c_function();
}Tạo Thread
rust
use std::thread;
fn main() {
// Tạo OS thread
let handle = thread::spawn(|| {
println!("Xin chào từ thread!");
});
// Chờ hoàn thành
handle.join().unwrap();
}2. Message Passing: Cơ chế MPSC Channel
Kiến trúc Channel
MPSC Channel
┌─────────────────────────────────────────────┐
│ │
│ Producer 1 ──┐ │
│ │ ┌──────────────┐ │
│ Producer 2 ──┼────▶│ Ring Buffer │───────▶│ Consumer
│ │ │ (Lock-free) │ │
│ Producer N ──┘ └──────────────┘ │
│ │
│ Nhiều Producer ──▶ Một Consumer │
└─────────────────────────────────────────────┘Cấu trúc nội bộ
rust
// Cấu trúc MPSC đơn giản hóa
pub struct Sender<T> {
inner: Arc<Inner<T>>,
}
pub struct Receiver<T> {
inner: Arc<Inner<T>>,
}
struct Inner<T> {
// Hàng đợi lock-free
buffer: UnsafeCell<VecDeque<T>>,
// Đồng bộ hóa
mutex: Mutex<()>,
condvar: Condvar,
// Điều khiển
sender_count: AtomicUsize,
disconnected: AtomicBool,
}Patterns sử dụng Channel
rust
use std::sync::mpsc;
use std::thread;
fn main() {
// Tạo unbounded channel
let (tx, rx) = mpsc::channel();
// Clone sender cho nhiều producers
let tx2 = tx.clone();
thread::spawn(move || {
tx.send("từ thread 1").unwrap();
});
thread::spawn(move || {
tx2.send("từ thread 2").unwrap();
});
// Nhận (blocking)
for msg in rx {
println!("{}", msg);
}
}Bounded vs Unbounded Channels
rust
// Unbounded: Có thể tăng vô hạn (rủi ro memory)
let (tx, rx) = mpsc::channel::<i32>();
// Bounded: sync_channel với capacity
let (tx, rx) = mpsc::sync_channel::<i32>(10);
// Blocks khi buffer đầy (backpressure)3. Shared State: Arc<Mutex<T>> và RwLock
Cấu trúc Mutex
Cấu trúc Mutex<T>
┌─────────────────────────────────────────────┐
│ │
│ ┌─────────────┐ ┌───────────────────┐ │
│ │ AtomicU8 │ │ Data: T │ │
│ │ (locked) │ │ │ │
│ │ 0=free │ │ [Được bảo vệ │ │
│ │ 1=held │ │ bởi lock] │ │
│ └─────────────┘ └───────────────────┘ │
│ │ │ │
│ └────────────────────┘ │
│ Lock bảo vệ truy cập │
│ │
└─────────────────────────────────────────────┘Triển khai Lock
rust
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// Arc: Atomic Reference Counting (Rc thread-safe)
// Mutex: Khóa loại trừ lẫn nhau
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// lock() trả về MutexGuard
// Tự động unlock khi dropped (RAII)
let mut num = counter.lock().unwrap();
*num += 1;
// MutexGuard dropped ở đây → unlock
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Kết quả: {}", *counter.lock().unwrap());
}RwLock: Nhiều Reader, Một Writer
rust
use std::sync::RwLock;
let lock = RwLock::new(5);
// Cho phép nhiều readers
{
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap();
// Cả hai có thể đọc đồng thời
println!("{}, {}", *r1, *r2);
}
// Một writer, truy cập độc quyền
{
let mut w = lock.write().unwrap();
*w += 1;
}Mutex Poisoning
rust
use std::sync::{Arc, Mutex};
use std::thread;
let lock = Arc::new(Mutex::new(0));
let lock2 = Arc::clone(&lock);
// Thread panic khi đang giữ lock
let _ = thread::spawn(move || {
let _guard = lock2.lock().unwrap();
panic!("Thread panic!");
}).join();
// Mutex giờ bị "poisoned"
match lock.lock() {
Ok(guard) => { /* bình thường */ }
Err(poisoned) => {
// Vẫn có thể truy cập data, nhưng biết có thể bị hỏng
let guard = poisoned.into_inner();
println!("Khôi phục: {}", *guard);
}
}4. Memory Ordering & Atomics
Vấn đề Memory Ordering
CPU 1 CPU 2
───── ─────
x = 1; while (y == 0) {}
y = 1; print(x);
// Có thể x vẫn = 0!
// CPU/Compiler có thể sắp xếp lại instructionsCác mức Ordering
rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
// ═══════════════════════════════════════════════════
// Relaxed: Không đảm bảo ordering, chỉ atomicity
// ═══════════════════════════════════════════════════
let counter = AtomicUsize::new(0);
counter.fetch_add(1, Ordering::Relaxed);
// Use case: Counter thống kê (không quan tâm thứ tự)
// ═══════════════════════════════════════════════════
// Release/Acquire: Cặp đồng bộ hóa
// ═══════════════════════════════════════════════════
static DATA: AtomicUsize = AtomicUsize::new(0);
static FLAG: AtomicBool = AtomicBool::new(false);
// Thread 1: Producer
fn producer() {
DATA.store(42, Ordering::Relaxed);
FLAG.store(true, Ordering::Release); // "Xuất bản" tất cả writes trước
}
// Thread 2: Consumer
fn consumer() {
while !FLAG.load(Ordering::Acquire) {} // "Thấy" tất cả writes trước Release
assert_eq!(DATA.load(Ordering::Relaxed), 42); // ✓ Đảm bảo!
}
// ═══════════════════════════════════════════════════
// SeqCst: Sequential Consistency (mạnh nhất, chậm nhất)
// ═══════════════════════════════════════════════════
let flag = AtomicBool::new(false);
flag.store(true, Ordering::SeqCst);
// Thứ tự toàn cục hiển thị cho tất cả threads
// Dùng khi: Nhiều atomics cần ordering nhất quánBiểu đồ Memory Ordering
Độ mạnh Ordering (Yếu → Mạnh)
═══════════════════════════════════════════════════
Relaxed Acquire AcqRel SeqCst
│ │ │ │
▼ ▼ ▼ ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│Không │ │Reads │ │Cả │ │Thứ tự│
│thứ tự│ │sau │ │hai │ │toàn │
│ │ │Sync │ │ │ │cục │
└──────┘ └──────┘ └──────┘ └──────┘
Hiệu năng: Nhanh ──────────────────▶ Chậm
Đảm bảo: Yếu ─────────────────────▶ MạnhVí dụ Spinlock (Dùng Atomics)
rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::cell::UnsafeCell;
pub struct SpinLock<T> {
locked: AtomicBool,
data: UnsafeCell<T>,
}
// Safety: T phải là Send
unsafe impl<T: Send> Sync for SpinLock<T> {}
impl<T> SpinLock<T> {
pub fn new(data: T) -> Self {
Self {
locked: AtomicBool::new(false),
data: UnsafeCell::new(data),
}
}
pub fn lock(&self) -> SpinLockGuard<'_, T> {
// Spin cho đến khi acquire lock
while self.locked
.compare_exchange_weak(
false, true,
Ordering::Acquire, // Nếu thành công, thấy tất cả writes trước
Ordering::Relaxed, // Nếu thất bại, chỉ cần retry
)
.is_err()
{
// Gợi ý cho CPU: đang trong spin loop
std::hint::spin_loop();
}
SpinLockGuard { lock: self }
}
}
pub struct SpinLockGuard<'a, T> {
lock: &'a SpinLock<T>,
}
impl<T> Drop for SpinLockGuard<'_, T> {
fn drop(&mut self) {
self.lock.locked.store(false, Ordering::Release);
}
}
impl<T> std::ops::Deref for SpinLockGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.lock.data.get() }
}
}
impl<T> std::ops::DerefMut for SpinLockGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.lock.data.get() }
}
}🎯 Best Practices
Khi nào dùng gì?
| Pattern | Use Case | Trade-off |
|---|---|---|
Mutex<T> | Shared mutable state chung | Có thể contention |
RwLock<T> | Workloads nhiều đọc | Writer starvation có thể xảy ra |
Atomics | Counter/flags đơn giản | Logic phức tạp khó |
mpsc | Producer-consumer | Overhead của channel |
crossbeam | Hiệu năng cao | External dependency |
Tránh Deadlocks
rust
// ❌ RỦI RO DEADLOCK
fn transfer(from: &Mutex<Account>, to: &Mutex<Account>) {
let from_guard = from.lock().unwrap();
let to_guard = to.lock().unwrap(); // Có thể deadlock nếu thread khác lock ngược
// ...
}
// ✅ THỨ TỰ NHẤT QUÁN
fn transfer(a: &Mutex<Account>, b: &Mutex<Account>) {
// Luôn lock theo thứ tự như nhau (theo địa chỉ)
let (first, second) = if std::ptr::addr_of!(*a) < std::ptr::addr_of!(*b) {
(a, b)
} else {
(b, a)
};
let _g1 = first.lock().unwrap();
let _g2 = second.lock().unwrap();
}