Skip to content

Async Runtime Internals Expert

Building blocks của Tokio — từ scratch

1. Kiến trúc Tổng quan

                     ASYNC RUNTIME ARCHITECTURE
┌─────────────────────────────────────────────────────────────────┐
│                                                                 │
│   ┌─────────────┐    poll()    ┌─────────────┐                  │
│   │   EXECUTOR  │◀────────────▶│   FUTURES   │                  │
│   │ (runs tasks)│              │ (user code) │                  │
│   └──────┬──────┘              └─────────────┘                  │
│          │                                                      │
│          │ wake()                                               │
│          ▼                                                      │
│   ┌─────────────┐              ┌─────────────┐                  │
│   │    WAKER    │◀─────────────│   REACTOR   │                  │
│   │(notifies)   │    register  │ (I/O events)│                  │
│   └─────────────┘              └──────┬──────┘                  │
│                                       │                         │
│                                       ▼                         │
│                              ┌─────────────┐                    │
│                              │ epoll/kqueue│                    │
│                              │    (OS)     │                    │
│                              └─────────────┘                    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

2. The Waker

Cấu trúc Waker

rust
use std::task::{RawWaker, RawWakerVTable, Waker, Context};
use std::sync::Arc;

/// A task that can be woken up.
struct Task {
    /// Numeric id of the task.
    id: usize,
    /// The task's future: pinned on the heap, `Send`, and resolving to `()`.
    future: std::pin::Pin<
        Box<dyn std::future::Future<Output = ()> + Send>,
    >,
}

/// Creates a [`Waker`] for a task.
///
/// Waking the returned waker sends `task_id` through `sender`. A failed send
/// (receiver gone) is ignored. Every waker, including each clone, owns its own
/// heap-allocated `(task_id, sender)` pair, which is freed when that waker is
/// consumed by `wake` or dropped.
fn create_waker(task_id: usize, sender: std::sync::mpsc::Sender<usize>) -> Waker {
    /// Heap payload hidden behind the raw waker's data pointer.
    type Payload = (usize, std::sync::mpsc::Sender<usize>);

    /// `clone`: allocate an independent payload for the new waker.
    unsafe fn clone_raw(ptr: *const ()) -> RawWaker {
        // SAFETY: `ptr` was produced by `Box::into_raw` on a `Payload` and is
        // still owned by the waker being cloned.
        let payload = unsafe { &*(ptr as *const Payload) };
        let copy: Box<Payload> = Box::new((payload.0, payload.1.clone()));
        RawWaker::new(Box::into_raw(copy) as *const (), &VTABLE)
    }

    /// `wake`: consumes the waker, so take the payload back and free it after sending.
    unsafe fn wake_raw(ptr: *const ()) {
        // SAFETY: `ptr` came from `Box::into_raw` and ownership is handed to us here.
        let owned = unsafe { Box::from_raw(ptr as *mut Payload) };
        let _ = owned.1.send(owned.0);
    }

    /// `wake_by_ref`: send the id without giving up the payload.
    unsafe fn wake_by_ref_raw(ptr: *const ()) {
        // SAFETY: `ptr` points to a live `Payload` owned by the waker.
        let payload = unsafe { &*(ptr as *const Payload) };
        let _ = payload.1.send(payload.0);
    }

    /// `drop`: release the payload of a waker that was never consumed by `wake`.
    unsafe fn drop_raw(ptr: *const ()) {
        // SAFETY: `ptr` came from `Box::into_raw` and is dropped exactly once.
        drop(unsafe { Box::from_raw(ptr as *mut Payload) });
    }

    // Table of function pointers shared by every waker built here.
    static VTABLE: RawWakerVTable =
        RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw);

    let payload: Box<Payload> = Box::new((task_id, sender));
    let raw = RawWaker::new(Box::into_raw(payload) as *const (), &VTABLE);

    // SAFETY: the vtable functions above uphold the `RawWaker` contract for a
    // data pointer obtained from `Box::into_raw` on a `Payload`.
    unsafe { Waker::from_raw(raw) }
}

3. The Executor

Minimal Single-Threaded Executor

rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::task::{Context, Poll};

/// Minimal single-threaded executor.
///
/// Tasks live in a map keyed by id; ids of tasks that should be polled arrive
/// over an mpsc channel whose sending half is cloned into every waker.
pub struct MiniExecutor {
    /// Unfinished tasks, keyed by task id.
    tasks: HashMap<usize, Pin<Box<dyn Future<Output = ()> + Send>>>,
    /// Receiving end of the channel carrying ids of tasks to poll.
    ready_queue: Receiver<usize>,
    /// Sending end of the same channel; cloned for each waker.
    sender: Sender<usize>,
    /// Id handed to the next spawned task.
    next_id: usize,
}

impl MiniExecutor {
    /// Creates an executor with no tasks and an empty ready queue.
    pub fn new() -> Self {
        let (tx, rx) = channel();
        MiniExecutor {
            tasks: HashMap::new(),
            ready_queue: rx,
            sender: tx,
            next_id: 0,
        }
    }

    /// Stores `future` as a new task and queues it for its first poll.
    ///
    /// Nothing is polled here; the future only runs inside [`MiniExecutor::run`].
    pub fn spawn<F>(&mut self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let task_id = self.next_id;
        self.next_id += 1;

        let boxed: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(future);
        self.tasks.insert(task_id, boxed);

        // The receiving half is owned by `self`, so the channel is still open.
        self.sender.send(task_id).unwrap();
    }

    /// Polls queued tasks until the task map is empty.
    ///
    /// Blocks on the ready queue between polls. Ids that no longer map to a
    /// task (e.g. a wake-up delivered after completion) are skipped.
    pub fn run(&mut self) {
        loop {
            if self.tasks.is_empty() {
                return;
            }

            // Block until some task id is queued.
            let ready_id = match self.ready_queue.recv() {
                Ok(id) => id,
                Err(_) => return,
            };

            if let Some(task) = self.tasks.get_mut(&ready_id) {
                // Fresh waker for this poll; waking it re-queues `ready_id`.
                let waker = create_waker(ready_id, self.sender.clone());
                let mut cx = Context::from_waker(&waker);

                let outcome = task.as_mut().poll(&mut cx);
                if let Poll::Ready(()) = outcome {
                    // Finished: forget the task.
                    self.tasks.remove(&ready_id);
                }
                // On `Pending` the task stays stored until its id is queued again.
            }
        }
    }
}

4. The Reactor

I/O Event Loop

rust
use std::collections::HashMap;
use std::os::unix::io::RawFd;
use std::task::Waker;

/// Minimal epoll-based reactor: remembers one [`Waker`] per registered file
/// descriptor and wakes it when epoll reports an event for that descriptor.
///
/// Linux-only; talks to epoll through the `libc` crate.
pub struct MiniReactor {
    /// The epoll instance owned by this reactor; closed on drop.
    epoll_fd: RawFd,
    /// Waker to notify for each registered descriptor.
    wakers: HashMap<RawFd, Waker>,
}

impl MiniReactor {
    /// Creates a reactor backed by a new epoll instance.
    ///
    /// # Errors
    ///
    /// Returns the OS error if `epoll_create1` fails.
    pub fn new() -> std::io::Result<Self> {
        // SAFETY: `epoll_create1` takes no pointers; failure is reported via
        // the return value.
        let epoll_fd = unsafe { libc::epoll_create1(0) };
        if epoll_fd < 0 {
            return Err(std::io::Error::last_os_error());
        }
        Ok(Self {
            epoll_fd,
            wakers: HashMap::new(),
        })
    }

    /// Registers edge-triggered read interest in `fd` and stores `waker` for it.
    ///
    /// Registering an `fd` that is already in the epoll set updates the
    /// registration and replaces the stored waker.
    ///
    /// # Errors
    ///
    /// Returns the OS error if `epoll_ctl` fails. In that case the stored
    /// wakers are left untouched, so a failed registration can never leave a
    /// waker behind that will not be notified.
    pub fn register(&mut self, fd: RawFd, waker: Waker) -> std::io::Result<()> {
        let mut event = libc::epoll_event {
            events: (libc::EPOLLIN | libc::EPOLLET) as u32,
            u64: fd as u64,
        };

        // SAFETY: `event` is a valid, initialized `epoll_event` that outlives the call.
        let mut rc =
            unsafe { libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_ADD, fd, &mut event) };

        if rc < 0 {
            let err = std::io::Error::last_os_error();
            if err.raw_os_error() != Some(libc::EEXIST) {
                return Err(err);
            }
            // Already in the epoll set: modify the existing registration
            // instead of silently failing.
            // SAFETY: same as above.
            rc = unsafe { libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_MOD, fd, &mut event) };
            if rc < 0 {
                return Err(std::io::Error::last_os_error());
            }
        }

        self.wakers.insert(fd, waker);
        Ok(())
    }

    /// Waits up to 100 ms for I/O events and wakes the waker of every
    /// descriptor that reported one (at most 64 events per call).
    ///
    /// A wait interrupted by a signal (`EINTR`) is treated as "no events".
    ///
    /// # Errors
    ///
    /// Returns the OS error if `epoll_wait` fails for any other reason.
    pub fn poll(&mut self) -> std::io::Result<()> {
        let mut events = [libc::epoll_event { events: 0, u64: 0 }; 64];

        // SAFETY: `events` is a writable buffer of exactly 64 entries, matching
        // the `maxevents` argument.
        let n = unsafe { libc::epoll_wait(self.epoll_fd, events.as_mut_ptr(), 64, 100) };

        if n < 0 {
            let err = std::io::Error::last_os_error();
            if err.kind() == std::io::ErrorKind::Interrupted {
                // A signal arrived before any event; the caller simply polls again.
                return Ok(());
            }
            return Err(err);
        }

        // Only the first `n` entries were filled in by the kernel.
        for event in events.iter().take(n as usize) {
            // The descriptor was stashed in the event's user data by `register`.
            let fd = event.u64 as RawFd;
            if let Some(waker) = self.wakers.get(&fd) {
                waker.wake_by_ref();
            }
        }

        Ok(())
    }
}

impl Drop for MiniReactor {
    /// Closes the epoll instance so the descriptor is not leaked.
    fn drop(&mut self) {
        // SAFETY: `epoll_fd` was returned by `epoll_create1`, is owned solely
        // by this reactor, and is closed exactly once. There is nothing useful
        // to do with a close error during drop.
        unsafe {
            libc::close(self.epoll_fd);
        }
    }
}

5. Complete Mini Runtime

rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

/// A basic timer future that completes once its deadline has passed.
pub struct Timer {
    /// Point in time at which the timer becomes ready.
    deadline: Instant,
}

impl Timer {
    /// Builds a timer whose deadline is `duration` from now.
    pub fn after(duration: Duration) -> Self {
        let deadline = Instant::now() + duration;
        Timer { deadline }
    }
}

impl Future for Timer {
    type Output = ();

    /// Returns `Ready` once the deadline has been reached. Before that it
    /// wakes the current task and returns `Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let expired = self.deadline <= Instant::now();
        if expired {
            return Poll::Ready(());
        }

        // Simplified scheduling: no real timer source, so request an
        // immediate re-poll by waking the task right away.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Usage
/// Demo: spawns two timer-based tasks on a `MiniExecutor` and runs them to completion.
fn main() {
    // Futures are lazy: building them here runs nothing until the executor polls them.
    let slow_task = async {
        println!("Task 1: Starting");
        Timer::after(Duration::from_millis(100)).await;
        println!("Task 1: Done");
    };
    let fast_task = async {
        println!("Task 2: Starting");
        Timer::after(Duration::from_millis(50)).await;
        println!("Task 2: Done");
    };

    let mut runtime = MiniExecutor::new();
    runtime.spawn(slow_task);
    runtime.spawn(fast_task);

    // Returns once both tasks have completed.
    runtime.run();
}

// Output:
// Task 1: Starting
// Task 2: Starting
// Task 2: Done (after 50ms)
// Task 1: Done (after 100ms)

6. How Tokio Improves This

| Aspect | Mini Runtime | Tokio |
|---|---|---|
| Threads | Single | Multi-threaded work-stealing |
| Scheduling | FIFO queue | Local + global queues |
| I/O | Manual epoll | mio abstraction |
| Timers | Busy-poll | Timer wheel |
| Task storage | HashMap | Slab allocator |

Tokio's Work-Stealing

┌────────────────────────────────────────────────────────────┐
│   Worker 1        Worker 2        Worker 3                 │
│  ┌────────┐      ┌────────┐      ┌────────┐               │
│  │Local Q │◀────▶│Local Q │◀────▶│Local Q │               │
│  │[T1,T2] │steal │[    ]  │steal │[T5,T6] │               │
│  └────────┘      └────────┘      └────────┘               │
│       │               │               │                   │
│       └───────────────┼───────────────┘                   │
│                       ▼                                   │
│                ┌────────────┐                             │
│                │ Global Q   │                             │
│                │ [T7,T8,T9] │                             │
│                └────────────┘                             │
└────────────────────────────────────────────────────────────┘

🎯 Key Takeaways

  1. Waker — Cầu nối giữa I/O events và task scheduling
  2. Executor — Chạy tasks, poll futures
  3. Reactor — Nhận I/O events từ OS, đánh thức tasks
  4. Futures are lazy — Không chạy cho đến khi được poll

💡 DEBUGGING TIP

Khi async code bị stuck:

  • Kiểm tra waker có được gọi không
  • Kiểm tra future có return Pending mãi không
  • Sử dụng tokio-console để debug Tokio tasks