Skip to content

Async Runtimes

"Async Rust: Zero-cost abstraction cho I/O-bound concurrency."

1. Future Trait

Định nghĩa Future

rust
pub trait Future {
    type Output;
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),      // Future hoàn thành
    Pending,       // Chưa sẵn sàng, sẽ đánh thức khi sẵn sàng
}

Cách Polling hoạt động

                    Future State Machine
┌──────────────────────────────────────────────────────────┐
│                                                          │
│   ┌─────────┐    poll()    ┌─────────┐                   │
│   │         │─────────────▶│         │                   │
│   │ Created │              │ Running │                   │
│   │         │◀─────────────│         │                   │
│   └─────────┘   Pending    └────┬────┘                   │
│                                 │                        │
│                            Ready(T)                      │
│                                 │                        │
│                                 ▼                        │
│                           ┌─────────┐                    │
│                           │Completed│                    │
│                           └─────────┘                    │
│                                                          │
└──────────────────────────────────────────────────────────┘

Triển khai Future thủ công

rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct CountDown {
    count: u32,
}

impl Future for CountDown {
    type Output = String;
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.count == 0 {
            Poll::Ready("Phóng!".to_string())
        } else {
            self.count -= 1;
            // Lên lịch để được poll lại
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

2. Cơ chế Waker

Kiến trúc Waker

                       Hệ thống Waker
┌────────────────────────────────────────────────────────┐
│                                                        │
│    Executor                    I/O Driver              │
│   ┌────────┐                  ┌────────┐               │
│   │        │                  │        │               │
│   │  Task  │◀────Waker────────│  epoll │               │
│   │  Queue │                  │  kqueue│               │
│   │        │                  │        │               │
│   └────────┘                  └────────┘               │
│       │                            ▲                   │
│       │ poll()                     │                   │
│       ▼                            │                   │
│   ┌────────┐                       │                   │
│   │ Future │───────Đăng ký─────────┘                   │
│   │  (I/O) │       interest                            │
│   └────────┘                                           │
│                                                        │
└────────────────────────────────────────────────────────┘

Cấu trúc nội bộ Waker

rust
// Cấu trúc Waker đơn giản hóa
pub struct Waker {
    // Con trỏ đến hàm wake của executor
    wake_fn: fn(*const ()),
    // Task ID hoặc tham chiếu
    data: *const (),
}

impl Waker {
    pub fn wake(self) {
        (self.wake_fn)(self.data);
    }
    
    pub fn wake_by_ref(&self) {
        (self.wake_fn)(self.data);
    }
}

// Cách executor tạo waker
fn create_waker(task_id: usize) -> Waker {
    fn wake_task(data: *const ()) {
        let task_id = data as usize;
        // Đưa task vào hàng đợi lại để poll
        EXECUTOR.schedule(task_id);
    }
    
    Waker {
        wake_fn: wake_task,
        data: task_id as *const (),
    }
}

3. Kiến trúc Tokio Runtime

Bộ lập lịch đa luồng

                    Kiến trúc Tokio Runtime
┌──────────────────────────────────────────────────────────────┐
│                                                              │
│   ┌────────────────────────────────────────────────────┐     │
│   │                    Reactor (I/O Driver)            │     │
│   │   ┌─────────┐   ┌─────────┐   ┌─────────┐          │     │
│   │   │  epoll  │   │  Timer  │   │ Signal  │          │     │
│   │   │ (Linux) │   │  Wheel  │   │ Handler │          │     │
│   │   └─────────┘   └─────────┘   └─────────┘          │     │
│   └────────────────────────────────────────────────────┘     │
│                            │                                 │
│                            ▼                                 │
│   ┌────────────────────────────────────────────────────┐     │
│   │              Scheduler (Work-stealing)             │     │
│   │                                                    │     │
│   │   Worker 1          Worker 2          Worker N     │     │
│   │  ┌────────┐        ┌────────┐        ┌────────┐    │     │
│   │  │Local Q │◀──────▶│Local Q │◀──────▶│Local Q │    │     │
│   │  │[Task]  │  steal │[Task]  │  steal │[Task]  │    │     │
│   │  │[Task]  │        │[    ]  │        │[Task]  │    │     │
│   │  └────────┘        └────────┘        └────────┘    │     │
│   │       │                 │                 │        │     │
│   │       └─────────────────┼─────────────────┘        │     │
│   │                         ▼                          │     │
│   │                  ┌────────────┐                    │     │
│   │                  │ Global Q   │                    │     │
│   │                  │ (overflow) │                    │     │
│   │                  └────────────┘                    │     │
│   └────────────────────────────────────────────────────┘     │
│                                                              │
└──────────────────────────────────────────────────────────────┘

Thuật toán Work Stealing

rust
// Pseudocode cho work-stealing
fn worker_loop(worker: &Worker) {
    loop {
        // 1. Thử local queue (LIFO cho cache locality)
        if let Some(task) = worker.local_queue.pop() {
            task.poll();
            continue;
        }
        
        // 2. Thử global queue
        if let Some(task) = worker.runtime.global_queue.pop() {
            task.poll();
            continue;
        }
        
        // 3. Thử "ăn cắp" từ workers khác
        for other in worker.runtime.workers() {
            if let Some(task) = other.local_queue.steal() {
                task.poll();
                break;
            }
        }
        
        // 4. Không có việc: park thread cho đến khi được đánh thức
        worker.park();
    }
}

Sử dụng Tokio

rust
use tokio;

#[tokio::main]
async fn main() {
    // Khởi tạo multi-threaded runtime
    // Mặc định: số lượng CPU threads
    
    let handle = tokio::spawn(async {
        // Task này chạy trên worker thread pool
        expensive_io_operation().await
    });
    
    handle.await.unwrap();
}

// Bên dưới, #[tokio::main] mở rộng thành:
fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(num_cpus::get())
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            // Code async main của bạn
        })
}

4. Pinning (Pin<T>)

Vấn đề: Self-Referential Structs

rust
// Struct này tham chiếu đến chính nó
struct SelfRef {
    data: String,
    ptr: *const String,  // Trỏ đến field `data`
}

impl SelfRef {
    fn new(s: String) -> Self {
        let mut this = Self {
            data: s,
            ptr: std::ptr::null(),
        };
        this.ptr = &this.data;  // Tự tham chiếu!
        this
    }
}

// VẤN ĐỀ: Nếu SelfRef di chuyển trong memory, ptr trở thành dangling!
let x = SelfRef::new("hello".into());
let y = x;  // ĐÃ DI CHUYỂN! x.data giờ ở địa chỉ khác
            // Nhưng y.ptr vẫn trỏ đến địa chỉ CŨ → UB!

Tại sao Futures cần Pin

rust
async fn example() {
    let data = vec![1, 2, 3];
    some_async_operation(&data).await;  // &data được mượn qua await
    println!("{:?}", data);
}

// Compiler transform thành state machine:
enum ExampleFuture {
    State0 {
        data: Vec<i32>,
    },
    State1 {
        data: Vec<i32>,
        reference: *const Vec<i32>,  // Trỏ đến data! Tự tham chiếu!
    },
    Done,
}

Giải pháp Pin

                    Mô hình Memory Pin
┌────────────────────────────────────────────────────────┐
│                                                        │
│    Không có Pin:              Có Pin:                  │
│    ─────────────              ────────                 │
│                                                        │
│    ┌─────────┐                ┌─────────┐              │
│    │ Future  │ ──move──▶ ?    │ Future  │ ═══ PINNED   │
│    │  @0x100 │                │  @0x100 │    (không    │
│    └─────────┘                └─────────┘     thể move)│
│         │                          │                   │
│         ▼                          ▼                   │
│    ┌─────────┐                ┌─────────┐              │
│    │self.ptr │                │self.ptr │              │
│    │ → 0x100 │  DANGLING!     │ → 0x100 │  ✓ HỢP LỆ   │
│    └─────────┘                └─────────┘              │
│                                                        │
└────────────────────────────────────────────────────────┘

Pin API

rust
use std::pin::Pin;
use std::marker::PhantomPinned;

// Type không được phép move
struct Unmovable {
    data: String,
    self_ptr: *const String,
    _pin: PhantomPinned,  // Opt out của Unpin
}

impl Unmovable {
    fn new(data: String) -> Pin<Box<Self>> {
        let mut boxed = Box::new(Self {
            data,
            self_ptr: std::ptr::null(),
            _pin: PhantomPinned,
        });
        
        let self_ptr: *const String = &boxed.data;
        unsafe {
            let mut_ref = Pin::as_mut(&mut Pin::new_unchecked(&mut *boxed));
            Pin::get_unchecked_mut(mut_ref).self_ptr = self_ptr;
        }
        
        unsafe { Pin::new_unchecked(boxed) }
    }
}

Unpin Trait

rust
// Hầu hết types là Unpin: có thể move ngay cả khi pinned
// Unpin = "bỏ qua Pin, coi như reference bình thường"

struct Normal {
    x: i32,
}

// tự động implement: impl Unpin for Normal {}

// Pin<&mut Normal> có thể convert thành &mut Normal
fn example(pinned: Pin<&mut Normal>) {
    let normal_ref: &mut Normal = Pin::into_inner(pinned);
    // An toàn để move Normal dù đã được pinned
}

// Types là !Unpin (opt-out):
// - Futures (thường tự tham chiếu)
// - Types với PhantomPinned
// - Explicitly impl !Unpin for T {}

5. Async Patterns

Select: Racing Futures

rust
use tokio::select;

async fn example() {
    let fut1 = async { "first" };
    let fut2 = async { "second" };
    
    // Race: cái nào hoàn thành trước thắng
    let result = select! {
        v1 = fut1 => v1,
        v2 = fut2 => v2,
    };
}

Join: Thực thi đồng thời

rust
use tokio::join;

async fn example() {
    let (a, b, c) = join!(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    );
    // Cả ba chạy đồng thời
}

Spawn vs Async Block

rust
// spawn: Task độc lập, chạy nền
let handle = tokio::spawn(async {
    // Có thể sống lâu hơn function hiện tại
    // Có thể chạy trên thread khác
    heavy_work().await
});

// async block: Inline, một phần của task hiện tại
let result = async {
    // Chạy khi được poll
    // Cùng task, cùng thread
    light_work().await
}.await;

🎯 Best Practices

Mẹo hiệu năng

PatternTốt choTránh khi
spawnCPU-bound + I/OLogic tuần tự đơn giản
spawn_blockingBlocking I/O/CPUCó libs async-compatible
select!Timeouts, cancellationCần tất cả kết quả
join!I/O đồng thờiKết quả phụ thuộc nhau

Lỗi thường gặp

rust
// ❌ Blocking trong async context
async fn bad() {
    std::thread::sleep(Duration::from_secs(1));  // Block thread!
}

// ✅ Dùng async sleep
async fn good() {
    tokio::time::sleep(Duration::from_secs(1)).await;
}

// ❌ Mutex qua await
async fn bad() {
    let guard = mutex.lock().unwrap();
    some_async_op().await;  // Vẫn giữ lock!
    // Tasks khác bị blocked
}

// ✅ Drop lock trước await
async fn good() {
    let data = {
        let guard = mutex.lock().unwrap();
        guard.clone()
    };  // Lock được drop ở đây
    
    some_async_op().await;
}