Giao diện
Async Runtime Internals Expert
Building blocks của Tokio — từ scratch
1. Kiến trúc Tổng quan
ASYNC RUNTIME ARCHITECTURE
┌─────────────────────────────────────────────────────────────────┐
│ │
│ ┌─────────────┐ poll() ┌─────────────┐ │
│ │ EXECUTOR │◀────────────▶│ FUTURES │ │
│ │ (runs tasks)│ │ (user code) │ │
│ └──────┬──────┘ └─────────────┘ │
│ │ │
│ │ wake() │
│ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ WAKER │◀─────────────│ REACTOR │ │
│ │(notifies) │ register │ (I/O events)│ │
│ └─────────────┘ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ epoll/kqueue│ │
│ │ (OS) │ │
│ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘

2. The Waker
Cấu trúc Waker
rust
use std::task::{RawWaker, RawWakerVTable, Waker, Context};
use std::sync::Arc;
/// A unit of work that can be woken up and polled again.
struct Task {
    /// Identifier for this task.
    id: usize,
    /// The pinned, heap-allocated future this task drives; it produces no
    /// value and is `Send`.
    future: std::pin::Pin<
        Box<dyn std::future::Future<Output = ()> + Send>,
    >,
}
/// Tạo Waker cho Task
fn create_waker(task_id: usize, sender: std::sync::mpsc::Sender<usize>) -> Waker {
// Data cho waker
let data = Box::new((task_id, sender));
let data_ptr = Box::into_raw(data) as *const ();
// Vtable of function pointers backing the waker. Every function receives the
// raw pointer to a heap-allocated `(task_id, Sender<usize>)` payload.
type WakerPayload = (usize, std::sync::mpsc::Sender<usize>);

/// `clone`: allocates an independent payload copy and wraps it in a new
/// `RawWaker` that shares this vtable.
unsafe fn clone_payload(data: *const ()) -> RawWaker {
    // SAFETY: the caller must pass a pointer produced by `Box::into_raw` on a
    // `WakerPayload` that has not been freed yet; it is only borrowed here.
    let payload = unsafe { &*(data as *const WakerPayload) };
    let copy = Box::new((payload.0, payload.1.clone()));
    RawWaker::new(Box::into_raw(copy) as *const (), &VTABLE)
}

/// `wake`: consumes the payload, sending the task id before freeing it.
unsafe fn wake_owned(data: *const ()) {
    // SAFETY: same pointer contract as above; ownership is taken back here, so
    // the allocation is released when `payload` goes out of scope.
    let payload = unsafe { Box::from_raw(data as *mut WakerPayload) };
    // A send error (receiver gone) is deliberately ignored.
    let _ = payload.1.send(payload.0);
}

/// `wake_by_ref`: sends the task id without giving up the payload.
unsafe fn wake_borrowed(data: *const ()) {
    // SAFETY: same pointer contract as above; the payload is only borrowed.
    let payload = unsafe { &*(data as *const WakerPayload) };
    // A send error (receiver gone) is deliberately ignored.
    let _ = payload.1.send(payload.0);
}

/// `drop`: frees the payload without sending anything.
unsafe fn drop_payload(data: *const ()) {
    // SAFETY: same pointer contract as above; ownership is reclaimed and the
    // box is dropped immediately.
    unsafe { drop(Box::from_raw(data as *mut WakerPayload)) };
}

static VTABLE: RawWakerVTable =
    RawWakerVTable::new(clone_payload, wake_owned, wake_borrowed, drop_payload);
unsafe { Waker::from_raw(RawWaker::new(data_ptr, &VTABLE)) }
}

3. The Executor
Minimal Single-Threaded Executor
rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::task::{Context, Poll};
/// Boxed, pinned, `Send` future with no output, as stored by the executor.
type BoxedTask = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Minimal single-threaded executor driven by a channel of ready task ids.
pub struct MiniExecutor {
    /// Live (not yet completed) tasks, keyed by task id.
    tasks: HashMap<usize, BoxedTask>,
    /// Receiving end of the channel carrying ids of tasks that should be polled.
    ready_queue: Receiver<usize>,
    /// Sending end of the same channel; cloned into each task's waker.
    sender: Sender<usize>,
    /// Id that will be handed to the next spawned task.
    next_id: usize,
}
impl MiniExecutor {
/// Creates an executor with no tasks and a fresh ready-queue channel.
pub fn new() -> Self {
    let (tx, rx) = channel();
    Self {
        tasks: HashMap::new(),
        ready_queue: rx,
        sender: tx,
        next_id: 0,
    }
}
/// Registers `future` as a new task and queues it for its first poll.
///
/// # Panics
///
/// Panics if the ready-queue send fails.
pub fn spawn<F>(&mut self, future: F)
where
    F: Future<Output = ()> + Send + 'static,
{
    // Allocate the next id for this task.
    let task_id = self.next_id;
    self.next_id = task_id + 1;

    let boxed = Box::pin(future);
    self.tasks.insert(task_id, boxed);

    // A freshly spawned task is immediately marked ready so `run` polls it.
    self.sender.send(task_id).unwrap();
}
/// Drives every spawned task until the task map is empty.
///
/// Blocks on the ready queue between polls. Ids that no longer map to a
/// live task are skipped.
pub fn run(&mut self) {
    loop {
        if self.tasks.is_empty() {
            return;
        }

        // Block until some task has been marked ready.
        let ready_id = match self.ready_queue.recv() {
            Ok(id) => id,
            Err(_) => return,
        };

        let finished = match self.tasks.get_mut(&ready_id) {
            // No task with this id is stored (e.g. it already completed).
            None => continue,
            Some(task) => {
                // Build a waker that re-queues this task id when woken.
                let waker = create_waker(ready_id, self.sender.clone());
                let mut cx = Context::from_waker(&waker);
                match task.as_mut().poll(&mut cx) {
                    Poll::Ready(()) => true,
                    // The task stays stored until its waker re-queues it.
                    Poll::Pending => false,
                }
            }
        };

        if finished {
            self.tasks.remove(&ready_id);
        }
    }
}
}

4. The Reactor
I/O Event Loop
rust
use std::collections::HashMap;
use std::os::unix::io::RawFd;
use std::task::Waker;
pub struct MiniReactor {
epoll_fd: RawFd,
wakers: HashMap<RawFd, Waker>,
}
impl MiniReactor {
/// Creates a reactor backed by a new epoll instance.
///
/// The descriptor is opened with `EPOLL_CLOEXEC` so it is not inherited by
/// processes started via `exec`.
///
/// # Errors
///
/// Returns the OS error if `epoll_create1` fails.
pub fn new() -> std::io::Result<Self> {
    // SAFETY: `epoll_create1` takes only an integer flag argument.
    let epoll_fd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
    if epoll_fd < 0 {
        return Err(std::io::Error::last_os_error());
    }
    Ok(Self {
        epoll_fd,
        wakers: HashMap::new(),
    })
}
/// Registers edge-triggered read interest in `fd` and stores `waker` to be
/// notified when an event for it is reported.
///
/// Calling this again for a descriptor that is already registered keeps the
/// existing epoll registration and only replaces the stored waker.
///
/// # Errors
///
/// Returns the OS error if `epoll_ctl` fails for any reason other than the
/// descriptor already being registered; in that case no waker is stored.
pub fn register(&mut self, fd: RawFd, waker: Waker) -> std::io::Result<()> {
    let mut event = libc::epoll_event {
        events: (libc::EPOLLIN | libc::EPOLLET) as u32,
        // The fd is stashed in the user data so `poll` can find the waker.
        u64: fd as u64,
    };
    // SAFETY: `event` is a valid, initialized `epoll_event` that outlives the call.
    let rc = unsafe { libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_ADD, fd, &mut event) };
    if rc < 0 {
        let err = std::io::Error::last_os_error();
        // EEXIST means the fd is already in the interest list; treat that as
        // a waker refresh rather than a failure.
        if err.raw_os_error() != Some(libc::EEXIST) {
            return Err(err);
        }
    }
    self.wakers.insert(fd, waker);
    Ok(())
}
/// Waits up to 100 ms for I/O events and wakes the waker stored for each
/// descriptor that reported one.
///
/// At most 64 events are processed per call. A wait interrupted by a signal
/// (`EINTR`) is treated as "no events" rather than an error.
///
/// # Errors
///
/// Returns the OS error if `epoll_wait` fails for any reason other than
/// being interrupted.
pub fn poll(&mut self) -> std::io::Result<()> {
    const MAX_EVENTS: usize = 64;
    const TIMEOUT_MS: libc::c_int = 100;

    let mut events = [libc::epoll_event { events: 0, u64: 0 }; MAX_EVENTS];
    // SAFETY: `events` is a valid buffer of exactly `MAX_EVENTS` entries, which
    // is the capacity passed to the kernel.
    let n = unsafe {
        libc::epoll_wait(
            self.epoll_fd,
            events.as_mut_ptr(),
            MAX_EVENTS as libc::c_int,
            TIMEOUT_MS,
        )
    };
    if n < 0 {
        let err = std::io::Error::last_os_error();
        if err.kind() == std::io::ErrorKind::Interrupted {
            // A signal arrived before any event; the caller can poll again.
            return Ok(());
        }
        return Err(err);
    }

    // Only the first `n` entries were filled in by the kernel.
    for event in &events[..n as usize] {
        // `register` stored the fd in the event's user data.
        let fd = event.u64 as RawFd;
        if let Some(waker) = self.wakers.get(&fd) {
            waker.wake_by_ref();
        }
    }
    Ok(())
}
}

5. Complete Mini Runtime
rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
/// Future that completes once a fixed point in time has been reached.
pub struct Timer {
    /// Instant at (or after) which the timer reports completion.
    deadline: Instant,
}

impl Timer {
    /// Builds a timer whose deadline is `duration` from now.
    pub fn after(duration: Duration) -> Self {
        let deadline = Instant::now() + duration;
        Self { deadline }
    }
}

impl Future for Timer {
    type Output = ();

    /// Returns `Ready` once the deadline has passed; otherwise requests an
    /// immediate re-poll and returns `Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let expired = self.deadline <= Instant::now();
        if expired {
            return Poll::Ready(());
        }
        // Simplified scheduling: wake right away, so the executor re-polls
        // this future until the deadline is reached (busy-polling).
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
// Usage: spawn two timer-driven tasks and run them to completion.
fn main() {
    let mut executor = MiniExecutor::new();

    // Futures are lazy: nothing below runs until the executor polls it.
    let slow_task = async {
        println!("Task 1: Starting");
        Timer::after(Duration::from_millis(100)).await;
        println!("Task 1: Done");
    };
    let quick_task = async {
        println!("Task 2: Starting");
        Timer::after(Duration::from_millis(50)).await;
        println!("Task 2: Done");
    };

    executor.spawn(slow_task);
    executor.spawn(quick_task);
    executor.run();
}
// Output:
// Task 1: Starting
// Task 2: Starting
// Task 2: Done (after 50ms)
// Task 1: Done (after 100ms)

6. How Tokio Improves This
| Aspect | Mini Runtime | Tokio |
|---|---|---|
| Threads | Single | Multi-threaded work-stealing |
| Scheduling | FIFO queue | Local + global queues |
| I/O | Manual epoll | mio abstraction |
| Timers | Busy-poll | Timer wheel |
| Task storage | HashMap | Slab allocator |
Tokio's Work-Stealing
┌────────────────────────────────────────────────────────────┐
│ Worker 1 Worker 2 Worker 3 │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │Local Q │◀────▶│Local Q │◀────▶│Local Q │ │
│ │[T1,T2] │steal │[ ] │steal │[T5,T6] │ │
│ └────────┘ └────────┘ └────────┘ │
│ │ │ │ │
│ └───────────────┼───────────────┘ │
│ ▼ │
│ ┌────────────┐ │
│ │ Global Q │ │
│ │ [T7,T8,T9] │ │
│ └────────────┘ │
└────────────────────────────────────────────────────────────┘

🎯 Key Takeaways
- Waker — Cầu nối giữa I/O events và task scheduling
- Executor — Chạy tasks, poll futures
- Reactor — Nhận I/O events từ OS, đánh thức tasks
- Futures are lazy — Không chạy cho đến khi được poll
💡 DEBUGGING TIP
Khi async code bị stuck:
- Kiểm tra waker có được gọi không
- Kiểm tra future có return Pending mãi không
- Sử dụng tokio-console để debug Tokio tasks