Giao diện
Async Runtimes
"Async Rust: Zero-cost abstraction cho I/O-bound concurrency."
1. Future Trait
Định nghĩa Future
rust
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T), // Future hoàn thành
Pending, // Chưa sẵn sàng, sẽ đánh thức khi sẵn sàng
}Cách Polling hoạt động
Future State Machine
┌──────────────────────────────────────────────────────────┐
│ │
│ ┌─────────┐ poll() ┌─────────┐ │
│ │ │─────────────▶│ │ │
│ │ Created │ │ Running │ │
│ │ │◀─────────────│ │ │
│ └─────────┘ Pending └────┬────┘ │
│ │ │
│ Ready(T) │
│ │ │
│ ▼ │
│ ┌─────────┐ │
│ │Completed│ │
│ └─────────┘ │
│ │
└──────────────────────────────────────────────────────────┘Triển khai Future thủ công
rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct CountDown {
count: u32,
}
impl Future for CountDown {
type Output = String;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count == 0 {
Poll::Ready("Phóng!".to_string())
} else {
self.count -= 1;
// Lên lịch để được poll lại
cx.waker().wake_by_ref();
Poll::Pending
}
}
}2. Cơ chế Waker
Kiến trúc Waker
Hệ thống Waker
┌────────────────────────────────────────────────────────┐
│ │
│ Executor I/O Driver │
│ ┌────────┐ ┌────────┐ │
│ │ │ │ │ │
│ │ Task │◀────Waker────────│ epoll │ │
│ │ Queue │ │ kqueue│ │
│ │ │ │ │ │
│ └────────┘ └────────┘ │
│ │ ▲ │
│ │ poll() │ │
│ ▼ │ │
│ ┌────────┐ │ │
│ │ Future │───────Đăng ký─────────┘ │
│ │ (I/O) │ interest │
│ └────────┘ │
│ │
└────────────────────────────────────────────────────────┘Cấu trúc nội bộ Waker
rust
// Cấu trúc Waker đơn giản hóa
pub struct Waker {
// Con trỏ đến hàm wake của executor
wake_fn: fn(*const ()),
// Task ID hoặc tham chiếu
data: *const (),
}
impl Waker {
pub fn wake(self) {
(self.wake_fn)(self.data);
}
pub fn wake_by_ref(&self) {
(self.wake_fn)(self.data);
}
}
// Cách executor tạo waker
fn create_waker(task_id: usize) -> Waker {
fn wake_task(data: *const ()) {
let task_id = data as usize;
// Đưa task vào hàng đợi lại để poll
EXECUTOR.schedule(task_id);
}
Waker {
wake_fn: wake_task,
data: task_id as *const (),
}
}3. Kiến trúc Tokio Runtime
Bộ lập lịch đa luồng
Kiến trúc Tokio Runtime
┌──────────────────────────────────────────────────────────────┐
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Reactor (I/O Driver) │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ epoll │ │ Timer │ │ Signal │ │ │
│ │ │ (Linux) │ │ Wheel │ │ Handler │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Scheduler (Work-stealing) │ │
│ │ │ │
│ │ Worker 1 Worker 2 Worker N │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │Local Q │◀──────▶│Local Q │◀──────▶│Local Q │ │ │
│ │ │[Task] │ steal │[Task] │ steal │[Task] │ │ │
│ │ │[Task] │ │[ ] │ │[Task] │ │ │
│ │ └────────┘ └────────┘ └────────┘ │ │
│ │ │ │ │ │ │
│ │ └─────────────────┼─────────────────┘ │ │
│ │ ▼ │ │
│ │ ┌────────────┐ │ │
│ │ │ Global Q │ │ │
│ │ │ (overflow) │ │ │
│ │ └────────────┘ │ │
│ └────────────────────────────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────┘Thuật toán Work Stealing
rust
// Pseudocode cho work-stealing
fn worker_loop(worker: &Worker) {
loop {
// 1. Thử local queue (LIFO cho cache locality)
if let Some(task) = worker.local_queue.pop() {
task.poll();
continue;
}
// 2. Thử global queue
if let Some(task) = worker.runtime.global_queue.pop() {
task.poll();
continue;
}
// 3. Thử "ăn cắp" từ workers khác
for other in worker.runtime.workers() {
if let Some(task) = other.local_queue.steal() {
task.poll();
break;
}
}
// 4. Không có việc: park thread cho đến khi được đánh thức
worker.park();
}
}Sử dụng Tokio
rust
use tokio;
#[tokio::main]
async fn main() {
// Khởi tạo multi-threaded runtime
// Mặc định: số lượng CPU threads
let handle = tokio::spawn(async {
// Task này chạy trên worker thread pool
expensive_io_operation().await
});
handle.await.unwrap();
}
// Bên dưới, #[tokio::main] mở rộng thành:
fn main() {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(num_cpus::get())
.enable_all()
.build()
.unwrap()
.block_on(async {
// Code async main của bạn
})
}4. Pinning (Pin<T>)
Vấn đề: Self-Referential Structs
rust
// Struct này tham chiếu đến chính nó
struct SelfRef {
data: String,
ptr: *const String, // Trỏ đến field `data`
}
impl SelfRef {
fn new(s: String) -> Self {
let mut this = Self {
data: s,
ptr: std::ptr::null(),
};
this.ptr = &this.data; // Tự tham chiếu!
this
}
}
// VẤN ĐỀ: Nếu SelfRef di chuyển trong memory, ptr trở thành dangling!
let x = SelfRef::new("hello".into());
let y = x; // ĐÃ DI CHUYỂN! x.data giờ ở địa chỉ khác
// Nhưng y.ptr vẫn trỏ đến địa chỉ CŨ → UB!Tại sao Futures cần Pin
rust
async fn example() {
let data = vec![1, 2, 3];
some_async_operation(&data).await; // &data được mượn qua await
println!("{:?}", data);
}
// Compiler transform thành state machine:
enum ExampleFuture {
State0 {
data: Vec<i32>,
},
State1 {
data: Vec<i32>,
reference: *const Vec<i32>, // Trỏ đến data! Tự tham chiếu!
},
Done,
}Giải pháp Pin
Mô hình Memory Pin
┌────────────────────────────────────────────────────────┐
│ │
│ Không có Pin: Có Pin: │
│ ───────────── ──────── │
│ │
│ ┌─────────┐ ┌─────────┐ │
│ │ Future │ ──move──▶ ? │ Future │ ═══ PINNED │
│ │ @0x100 │ │ @0x100 │ (không │
│ └─────────┘ └─────────┘ thể move)│
│ │ │ │
│ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ │
│ │self.ptr │ │self.ptr │ │
│ │ → 0x100 │ DANGLING! │ → 0x100 │ ✓ HỢP LỆ │
│ └─────────┘ └─────────┘ │
│ │
└────────────────────────────────────────────────────────┘Pin API
rust
use std::pin::Pin;
use std::marker::PhantomPinned;
// Type không được phép move
struct Unmovable {
data: String,
self_ptr: *const String,
_pin: PhantomPinned, // Opt out của Unpin
}
impl Unmovable {
fn new(data: String) -> Pin<Box<Self>> {
let mut boxed = Box::new(Self {
data,
self_ptr: std::ptr::null(),
_pin: PhantomPinned,
});
let self_ptr: *const String = &boxed.data;
unsafe {
let mut_ref = Pin::as_mut(&mut Pin::new_unchecked(&mut *boxed));
Pin::get_unchecked_mut(mut_ref).self_ptr = self_ptr;
}
unsafe { Pin::new_unchecked(boxed) }
}
}Unpin Trait
rust
// Hầu hết types là Unpin: có thể move ngay cả khi pinned
// Unpin = "bỏ qua Pin, coi như reference bình thường"
struct Normal {
x: i32,
}
// tự động implement: impl Unpin for Normal {}
// Pin<&mut Normal> có thể convert thành &mut Normal
fn example(pinned: Pin<&mut Normal>) {
let normal_ref: &mut Normal = Pin::into_inner(pinned);
// An toàn để move Normal dù đã được pinned
}
// Types là !Unpin (opt-out):
// - Futures (thường tự tham chiếu)
// - Types với PhantomPinned
// - Explicitly impl !Unpin for T {}5. Async Patterns
Select: Racing Futures
rust
use tokio::select;
async fn example() {
let fut1 = async { "first" };
let fut2 = async { "second" };
// Race: cái nào hoàn thành trước thắng
let result = select! {
v1 = fut1 => v1,
v2 = fut2 => v2,
};
}Join: Thực thi đồng thời
rust
use tokio::join;
async fn example() {
let (a, b, c) = join!(
fetch_user(1),
fetch_user(2),
fetch_user(3),
);
// Cả ba chạy đồng thời
}Spawn vs Async Block
rust
// spawn: Task độc lập, chạy nền
let handle = tokio::spawn(async {
// Có thể sống lâu hơn function hiện tại
// Có thể chạy trên thread khác
heavy_work().await
});
// async block: Inline, một phần của task hiện tại
let result = async {
// Chạy khi được poll
// Cùng task, cùng thread
light_work().await
}.await;🎯 Best Practices
Mẹo hiệu năng
| Pattern | Tốt cho | Tránh khi |
|---|---|---|
spawn | CPU-bound + I/O | Logic tuần tự đơn giản |
spawn_blocking | Blocking I/O/CPU | Có libs async-compatible |
select! | Timeouts, cancellation | Cần tất cả kết quả |
join! | I/O đồng thời | Kết quả phụ thuộc nhau |
Lỗi thường gặp
rust
// ❌ Blocking trong async context
async fn bad() {
std::thread::sleep(Duration::from_secs(1)); // Block thread!
}
// ✅ Dùng async sleep
async fn good() {
tokio::time::sleep(Duration::from_secs(1)).await;
}
// ❌ Mutex qua await
async fn bad() {
let guard = mutex.lock().unwrap();
some_async_op().await; // Vẫn giữ lock!
// Tasks khác bị blocked
}
// ✅ Drop lock trước await
async fn good() {
let data = {
let guard = mutex.lock().unwrap();
guard.clone()
}; // Lock được drop ở đây
some_async_op().await;
}