Skip to content

Asyncio — Lập trình bất đồng bộ trong Python

Một hệ thống API gateway cần gọi 5 microservices cho mỗi request. Với synchronous code, tổng latency là tổng latency của từng service — 50ms × 5 = 250ms. Với asyncio, tất cả 5 requests chạy đồng thời trên một thread duy nhất, latency chỉ bằng service chậm nhất — 50ms. Đây không phải trick, mà là bản chất của cooperative multitasking.

Asyncio là thư viện built-in của Python cho asynchronous I/O, cho phép xử lý hàng nghìn connections đồng thời mà không cần nhiều threads hay processes. FastAPI, aiohttp, và hầu hết async frameworks hiện đại đều xây trên nền tảng này. Khác với threading (preemptive — OS quyết định khi nào switch), asyncio là cooperative — code tự quyết khi nào "nhường" quyền chạy qua từ khóa await.

Hiểu asyncio không chỉ là học syntax async/await. Bạn cần hiểu event loop hoạt động ra sao, khi nào coroutine thực sự chạy, và tại sao blocking code trong async function là lỗi nghiêm trọng nhất mà engineers thường mắc.

Bức tranh tư duy

Hãy tưởng tượng một quầy giao dịch ngân hàng ở Hà Nội. Có một nhân viên duy nhất (event loop / single thread) phục vụ nhiều khách hàng (coroutines). Nhân viên nhận hồ sơ từ khách A, gửi đi xử lý (I/O operation), rồi không đứng chờ — quay sang phục vụ khách B. Khi hồ sơ khách A có kết quả, nhân viên quay lại phục vụ tiếp khách A.

Một nhân viên giỏi (event loop hiệu quả) có thể phục vụ hàng trăm khách trong một buổi, miễn là phần chờ (I/O) chiếm phần lớn thời gian. Nhưng nếu có khách yêu cầu nhân viên ngồi tính toán phức tạp ngay tại chỗ (CPU-bound blocking) — tất cả khách khác phải chờ, toàn bộ quầy đóng băng.

Khi analogy breakdown: Nhân viên ngân hàng có thể bị ngắt giữa chừng, nhưng coroutine trong asyncio chỉ yield tại điểm await. Giữa hai await, code chạy liên tục không bị gián đoạn — đây là điểm khác biệt cốt lõi giữa cooperative (asyncio) và preemptive (threading) scheduling.

Cốt lõi kỹ thuật

Event Loop — Trái tim của Asyncio

Event loop là scheduler quản lý tất cả coroutines. Nó liên tục kiểm tra: coroutine nào sẵn sàng chạy tiếp, I/O nào đã hoàn thành, timer nào đã hết hạn.

python
import asyncio

async def task_a():
    print("Task A: bắt đầu")
    await asyncio.sleep(2)  # Yield — event loop chạy task khác
    print("Task A: hoàn thành")

async def task_b():
    print("Task B: bắt đầu")
    await asyncio.sleep(1)  # Yield — event loop chạy task khác
    print("Task B: hoàn thành")

async def main():
    await asyncio.gather(task_a(), task_b())

asyncio.run(main())
# Output:
# Task A: bắt đầu
# Task B: bắt đầu
# Task B: hoàn thành  ← 1s
# Task A: hoàn thành  ← 2s
# Tổng: 2s (không phải 3s)

Coroutines và async/await

Coroutine là hàm khai báo bằng async def. Gọi coroutine không thực thi nó — chỉ tạo coroutine object. Phải await hoặc schedule bằng create_task để thực thi.

python
import asyncio

async def fetch_data(source: str) -> dict:
    """Coroutine — có thể pause tại mỗi await."""
    print(f"Bắt đầu fetch từ {source}")
    await asyncio.sleep(0.5)  # Pause: event loop chạy tasks khác
    print(f"Hoàn thành fetch từ {source}")
    return {"source": source, "data": "..."}

async def main():
    # SAI: gọi coroutine không await → không chạy
    fetch_data("api")  # RuntimeWarning: coroutine never awaited

    # ĐÚNG: await coroutine
    result = await fetch_data("api")
    print(result)

asyncio.run(main())

Tasks — Schedule coroutines chạy đồng thời

asyncio.create_task() schedule coroutine vào event loop, cho phép nó chạy đồng thời với code hiện tại.

python
import asyncio

async def download(url: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"Data from {url}"

async def main():
    # Tạo tasks — chạy đồng thời ngay lập tức
    task1 = asyncio.create_task(download("api/users", 2.0))
    task2 = asyncio.create_task(download("api/orders", 1.0))
    task3 = asyncio.create_task(download("api/products", 1.5))

    # Làm việc khác trong khi tasks chạy
    print("Đang tải dữ liệu...")

    # Đợi kết quả khi cần
    result1 = await task1
    result2 = await task2
    result3 = await task3
    print(f"Kết quả: {result1}, {result2}, {result3}")

asyncio.run(main())
# Tổng: ~2s (bằng task chậm nhất), không phải 4.5s

asyncio.gather() — Chạy nhiều coroutines đồng thời

python
import asyncio

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.1)
    return {"id": user_id, "name": f"User {user_id}"}

async def main():
    # gather() chạy tất cả đồng thời, trả về list kết quả theo thứ tự
    users = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )
    print(users)
    # [{"id": 1, ...}, {"id": 2, ...}, {"id": 3, ...}]

    # Với return_exceptions=True: không raise, trả exception trong list
    results = await asyncio.gather(
        fetch_user(1),
        failing_operation(),
        fetch_user(3),
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Lỗi: {r}")
        else:
            print(f"OK: {r}")

asyncio.run(main())

asyncio.Semaphore — Giới hạn concurrency

python
import asyncio

async def fetch_url(sem: asyncio.Semaphore, url: str) -> str:
    async with sem:  # Chỉ N coroutines chạy đồng thời
        print(f"Fetching {url}")
        await asyncio.sleep(0.5)
        return f"Data from {url}"

async def main():
    sem = asyncio.Semaphore(10)  # Tối đa 10 requests đồng thời
    urls = [f"https://api.example.com/item/{i}" for i in range(100)]

    tasks = [fetch_url(sem, url) for url in urls]
    results = await asyncio.gather(*tasks)
    print(f"Fetched {len(results)} URLs")

asyncio.run(main())

asyncio.timeout() — Timeout hiện đại (Python 3.11+)

python
import asyncio

async def slow_operation() -> str:
    await asyncio.sleep(10)
    return "done"

async def main():
    # Context manager — sạch hơn wait_for()
    try:
        async with asyncio.timeout(3.0):
            result = await slow_operation()
    except TimeoutError:
        print("Quá thời gian!")

    # Có thể reschedule timeout
    async with asyncio.timeout(10.0) as cm:
        await step_one()
        # Cần thêm thời gian cho step_two
        cm.reschedule(asyncio.get_running_loop().time() + 20.0)
        await step_two()

asyncio.run(main())

Thực chiến

Tình huống: HTTP Client tốc độ cao với rate limiting

Bối cảnh: Service cần gọi external API để enrich dữ liệu cho 10,000 records. API giới hạn 50 requests/giây. Timeout mỗi request: 10s. Cần retry khi gặp lỗi tạm thời.

Mục tiêu: Xử lý 10,000 records trong < 5 phút, respect rate limit, không crash external API.

python
import asyncio
import time
from dataclasses import dataclass
from typing import Any

@dataclass
class APIResult:
    record_id: int
    data: dict | None
    error: str | None

class RateLimitedClient:
    def __init__(self, max_concurrent: int = 50, rate_per_second: float = 50.0):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._rate_interval = 1.0 / rate_per_second
        self._last_request_time = 0.0
        self._rate_lock = asyncio.Lock()

    async def _wait_for_rate(self):
        async with self._rate_lock:
            now = time.monotonic()
            elapsed = now - self._last_request_time
            if elapsed < self._rate_interval:
                await asyncio.sleep(self._rate_interval - elapsed)
            self._last_request_time = time.monotonic()

    async def fetch(self, record_id: int, max_retries: int = 3) -> APIResult:
        async with self._semaphore:
            for attempt in range(max_retries):
                try:
                    await self._wait_for_rate()

                    # Giả lập API call
                    async with asyncio.timeout(10.0):
                        await asyncio.sleep(0.05)  # Giả lập latency
                        if record_id % 100 == 0:  # 1% failure rate
                            raise ConnectionError("Service unavailable")
                        return APIResult(record_id, {"enriched": True}, None)

                except TimeoutError:
                    if attempt == max_retries - 1:
                        return APIResult(record_id, None, "Timeout")
                    await asyncio.sleep(2 ** attempt)

                except ConnectionError as e:
                    if attempt == max_retries - 1:
                        return APIResult(record_id, None, str(e))
                    await asyncio.sleep(2 ** attempt)

        return APIResult(record_id, None, "Unknown error")

    async def fetch_all(self, record_ids: list[int]) -> list[APIResult]:
        tasks = [self.fetch(rid) for rid in record_ids]
        return await asyncio.gather(*tasks)

async def main():
    client = RateLimitedClient(max_concurrent=50, rate_per_second=50.0)
    record_ids = list(range(1000))

    start = time.perf_counter()
    results = await client.fetch_all(record_ids)
    elapsed = time.perf_counter() - start

    success = sum(1 for r in results if r.data is not None)
    failed = sum(1 for r in results if r.error is not None)
    print(f"1000 records: {elapsed:.1f}s ({success} OK, {failed} lỗi)")
    print(f"Throughput: {1000 / elapsed:.0f} records/s")

if __name__ == "__main__":
    asyncio.run(main())

Phân tích:

  • Semaphore(50) giới hạn 50 connections mở cùng lúc — tránh exhaustion file descriptors
  • Rate limiting qua lock + sleep — đảm bảo không vượt 50 req/s
  • Retry với exponential backoff — tăng tỷ lệ recovery cho transient errors
  • asyncio.timeout() — tránh request treo vô hạn

Tình huống: FastAPI endpoint với async dependencies

python
from dataclasses import dataclass
import asyncio

@dataclass
class UserProfile:
    user_id: int
    name: str
    orders: list
    recommendations: list

async def get_user(user_id: int) -> dict:
    await asyncio.sleep(0.05)
    return {"id": user_id, "name": f"User {user_id}"}

async def get_orders(user_id: int) -> list:
    await asyncio.sleep(0.08)
    return [{"order_id": i} for i in range(3)]

async def get_recommendations(user_id: int) -> list:
    await asyncio.sleep(0.06)
    return [{"product_id": i} for i in range(5)]

async def get_user_profile(user_id: int) -> UserProfile:
    """Gọi 3 services đồng thời — latency = max(50, 80, 60) = 80ms."""
    user, orders, recs = await asyncio.gather(
        get_user(user_id),
        get_orders(user_id),
        get_recommendations(user_id),
    )
    return UserProfile(
        user_id=user["id"],
        name=user["name"],
        orders=orders,
        recommendations=recs,
    )

# Trong FastAPI:
# @app.get("/users/{user_id}/profile")
# async def profile_endpoint(user_id: int):
#     return await get_user_profile(user_id)

Sai lầm điển hình

Sai lầm 1: Quên await — Silent bug

Vấn đề: Gọi coroutine mà không await — code tưởng chạy nhưng không thực thi.

python
# SAI: Coroutine không được await → không chạy
async def save_to_db(data: dict):
    await asyncio.sleep(0.1)
    print(f"Saved: {data}")

async def main():
    save_to_db({"key": "value"})  # RuntimeWarning, KHÔNG lưu gì cả!
    print("Done")

Tại sao sai: save_to_db() trả về coroutine object nhưng không ai await nó. Data mất mà không có error rõ ràng. Trong production, nghĩa là order không được lưu, log không được ghi.

python
# ĐÚNG: Luôn await coroutine
async def main():
    await save_to_db({"key": "value"})
    print("Done")

Sai lầm 2: Blocking code trong async function

Vấn đề: Gọi synchronous blocking function bên trong async function — đóng băng toàn bộ event loop.

python
# SAI: time.sleep() block event loop!
import time

async def process_request():
    time.sleep(5)  # CHẶN tất cả coroutines khác trong 5 giây!
    return "done"

Tại sao sai: Event loop chạy trên single thread. time.sleep(5) chặn thread đó, khiến tất cả coroutines khác (bao gồm cả HTTP responses đang pending) bị treo.

python
# ĐÚNG: Dùng asyncio.sleep hoặc run_in_executor cho blocking code
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def process_request():
    await asyncio.sleep(5)  # Non-blocking
    return "done"

# Nếu BẮT BUỘC phải gọi sync code:
async def process_with_blocking():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, blocking_function)
    return result

Sai lầm 3: Tạo quá nhiều tasks cùng lúc

Vấn đề: Tạo 10,000 connections đồng thời — crash server hoặc bị ban.

python
# SAI: 10,000 connections mở cùng lúc
async def bad_scraper(urls: list[str]):
    tasks = [fetch(url) for url in urls]  # 10,000 tasks!
    return await asyncio.gather(*tasks)

Tại sao sai: Mỗi connection tiêu file descriptor + memory. OS giới hạn ~1024 FDs mặc định. External API sẽ rate-limit hoặc ban IP.

python
# ĐÚNG: Semaphore giới hạn concurrency
async def good_scraper(urls: list[str], max_concurrent: int = 50):
    sem = asyncio.Semaphore(max_concurrent)

    async def limited_fetch(url: str):
        async with sem:
            return await fetch(url)

    tasks = [limited_fetch(url) for url in urls]
    return await asyncio.gather(*tasks)

Sai lầm 4: Nuốt CancelledError

Vấn đề: Bắt Exception rộng, vô tình nuốt CancelledError.

python
# SAI: CancelledError bị nuốt → task không thực sự bị cancel
async def bad_task():
    try:
        await long_operation()
    except Exception:  # Bắt cả CancelledError!
        pass  # Task "sống lại" — TaskGroup không thể cancel nó

Tại sao sai: CancelledError là cơ chế để TaskGroup và timeout hoạt động. Nuốt nó khiến cleanup flow bị phá vỡ, resource leak.

python
# ĐÚNG: Xử lý CancelledError riêng, luôn re-raise
async def good_task():
    try:
        await long_operation()
    except asyncio.CancelledError:
        await cleanup_resources()
        raise  # BẮT BUỘC re-raise
    except Exception as e:
        logger.error(f"Error: {e}")

Sai lầm 5: Gọi asyncio.run() trong async context

Vấn đề: Tạo event loop lồng nhau.

python
# SAI: Nested event loop → RuntimeError
async def outer():
    asyncio.run(inner())  # RuntimeError: event loop already running!

# ĐÚNG:
async def outer():
    await inner()  # Đơn giản await

Under the Hood

Event Loop Internals

Event loop của asyncio dựa trên I/O multiplexing — cơ chế OS cho phép monitor nhiều file descriptors trên single thread:

  • Linux: epoll — O(1) per event, hiệu quả nhất
  • macOS: kqueue — tương tự epoll
  • Windows: IOCP (I/O Completion Ports) — proactor model

Mỗi iteration của event loop:

  1. Chạy tất cả callbacks đã ready (scheduled bởi call_soon)
  2. Poll I/O events (với timeout = thời gian đến scheduled callback tiếp theo)
  3. Chạy callbacks từ I/O events hoàn thành
  4. Lặp lại

Coroutine vs Thread — So sánh hiệu năng

MetricThreadCoroutine
Memory overhead~8MB stack/thread~1KB/coroutine
Context switch~1-10μs (OS kernel)~0.1μs (userspace)
Tạo mới~1ms~0.01ms
Số lượng thực tế~1,000-4,000~100,000+
ParallelismCó (nhưng GIL)Không (single thread)
SchedulingPreemptive (OS)Cooperative (await)

Khi nào KHÔNG dùng asyncio

Tình huốngVấn đềGiải pháp thay thế
CPU-bound thuần PythonBlock event loopmultiprocessing, ProcessPoolExecutor
Thư viện sync legacyKhông có async APIrun_in_executor với ThreadPool
Tasks ít (< 10 đồng thời)Overkill, phức tạp hơnthreading đơn giản hơn
Cần true parallelismSingle threadmultiprocessing

Trade-offs

Asyncio mạnh khi: I/O-bound workload, hàng nghìn connections đồng thời, event-driven architectures (WebSocket, streaming). Memory footprint cực thấp so với threading.

Asyncio yếu khi: Ecosystem chia rẻ sync/async (phải dùng aiohttp thay requests, asyncpg thay psycopg2). Debug khó hơn — stack traces dài hơn, timing bugs khó reproduce. Và một blocking call duy nhất có thể đóng băng toàn bộ hệ thống.

Checklist ghi nhớ

✅ Checklist triển khai

Async Fundamentals

  • [ ] Mọi coroutine phải được await hoặc schedule bằng create_task
  • [ ] Không gọi blocking code (time.sleep, requests.get) trong async function
  • [ ] Dùng asyncio.run() chỉ ở entry point — không gọi trong async context

Concurrency Control

  • [ ] Dùng Semaphore giới hạn concurrent connections (tránh FD exhaustion)
  • [ ] Set timeout cho mọi external I/O call
  • [ ] Dùng gather(return_exceptions=True) khi tasks independent
  • [ ] Dùng TaskGroup khi tasks liên quan, cần cancel cùng nhau (Python 3.11+)

Error Handling

  • [ ] Bắt asyncio.CancelledError riêng, luôn re-raise
  • [ ] Implement retry với exponential backoff cho transient errors
  • [ ] Log đầy đủ context: task_id, URL, attempt number

Production

  • [ ] Dùng run_in_executor cho CPU-bound hoặc legacy sync code
  • [ ] Monitor event loop lag (loop.slow_callback_duration)
  • [ ] Không tạo quá nhiều tasks cùng lúc — dùng bounded queue hoặc semaphore
  • [ ] Test graceful shutdown: handle SIGTERM, cancel tasks, cleanup resources

Bài tập luyện tập

Bài 1: Async URL Fetcher — Intermediate

Đề bài: Viết hàm fetch_all_urls(urls, max_concurrent) dùng asyncio + Semaphore. Mỗi URL giả lập bằng asyncio.sleep(random). Trả về dict {url: result}. Xử lý timeout 5s cho mỗi URL.

🧠 Quiz

Câu hỏi: Trong asyncio, create_task()await coroutine() khác nhau thế nào?

  • [ ] A. Không khác — cả hai đều chạy coroutine ngay lập tức
  • [x] B. create_task() schedule coroutine chạy đồng thời; await chạy tuần tự tại điểm gọi
  • [ ] C. create_task() tạo thread mới; await chạy trên event loop
  • [ ] D. create_task() chỉ dùng được trong TaskGroup Giải thích: await coroutine() chạy coroutine và đợi kết quả — code sau await chỉ chạy khi coroutine hoàn thành. create_task() schedule coroutine vào event loop và trả về Task object ngay — coroutine chạy đồng thời. A sai vì await là tuần tự. C sai vì không có thread mới. D sai vì create_task() dùng được mọi nơi trong async context.
💡 Gợi ý
  • Tạo asyncio.Semaphore(max_concurrent) để giới hạn
  • Bọc mỗi fetch trong async with semasyncio.timeout(5.0)
  • Dùng asyncio.gather() để chạy tất cả
✅ Lời giải
python
import asyncio
import random

async def fetch_url(url: str) -> str:
    delay = random.uniform(0.1, 2.0)
    await asyncio.sleep(delay)
    return f"Data from {url} ({delay:.1f}s)"

async def fetch_all_urls(
    urls: list[str], max_concurrent: int = 10
) -> dict[str, str | None]:
    sem = asyncio.Semaphore(max_concurrent)
    results: dict[str, str | None] = {}

    async def limited_fetch(url: str):
        async with sem:
            try:
                async with asyncio.timeout(5.0):
                    result = await fetch_url(url)
                    results[url] = result
            except TimeoutError:
                results[url] = None

    tasks = [limited_fetch(url) for url in urls]
    await asyncio.gather(*tasks)
    return results

async def main():
    urls = [f"https://api.example.com/{i}" for i in range(50)]
    results = await fetch_all_urls(urls, max_concurrent=10)
    success = sum(1 for v in results.values() if v is not None)
    print(f"{success}/{len(urls)} URLs fetched successfully")

asyncio.run(main())

Phân tích: Semaphore giới hạn 10 concurrent fetches. Timeout 5s cho mỗi URL. gather() chạy tất cả đồng thời nhưng chỉ 10 active tại một thời điểm.

Bài 2: Producer-Consumer Async — Intermediate

Đề bài: Implement async producer-consumer với asyncio.Queue. Producer tạo 100 items, 3 consumers xử lý. Dùng sentinel None để signal completion.

💡 Gợi ý
  • Producer: await queue.put(item), cuối cùng put None cho mỗi consumer
  • Consumer: while True, lấy item, break khi gặp None
  • Dùng asyncio.TaskGroup() để quản lý lifecycle
✅ Lời giải
python
import asyncio

async def producer(queue: asyncio.Queue, num_items: int, num_consumers: int):
    for i in range(num_items):
        await queue.put(f"item-{i}")
        await asyncio.sleep(0.01)

    for _ in range(num_consumers):
        await queue.put(None)

async def consumer(queue: asyncio.Queue, consumer_id: int) -> int:
    count = 0
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.02)  # Giả lập xử lý
        count += 1
        queue.task_done()
    return count

async def main():
    queue: asyncio.Queue[str | None] = asyncio.Queue(maxsize=20)
    num_consumers = 3

    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue, 100, num_consumers))
        consumer_tasks = [
            tg.create_task(consumer(queue, i))
            for i in range(num_consumers)
        ]

    for i, task in enumerate(consumer_tasks):
        print(f"Consumer {i}: xử lý {task.result()} items")

asyncio.run(main())

Phân tích: Bounded queue (maxsize=20) tạo backpressure — producer chậm lại khi consumers không kịp. Sentinel None cho mỗi consumer đảm bảo tất cả nhận signal dừng.

Liên kết học tiếp

Từ khóa glossary: asyncio, event loop, coroutine, async/await, cooperative multitasking, Semaphore, gather, TaskGroup

Tìm kiếm liên quan: asyncio python tutorial, event loop là gì, async await python, fastapi async