Giao diện
Asyncio — Lập trình bất đồng bộ trong Python
Một hệ thống API gateway cần gọi 5 microservices cho mỗi request. Với synchronous code, tổng latency là tổng latency của từng service — 50ms × 5 = 250ms. Với asyncio, tất cả 5 requests chạy đồng thời trên một thread duy nhất, latency chỉ bằng service chậm nhất — 50ms. Đây không phải trick, mà là bản chất của cooperative multitasking.
Asyncio là thư viện built-in của Python cho asynchronous I/O, cho phép xử lý hàng nghìn connections đồng thời mà không cần nhiều threads hay processes. FastAPI, aiohttp, và hầu hết async frameworks hiện đại đều xây trên nền tảng này. Khác với threading (preemptive — OS quyết định khi nào switch), asyncio là cooperative — code tự quyết khi nào "nhường" quyền chạy qua từ khóa await.
Hiểu asyncio không chỉ là học syntax async/await. Bạn cần hiểu event loop hoạt động ra sao, khi nào coroutine thực sự chạy, và tại sao blocking code trong async function là lỗi nghiêm trọng nhất mà engineers thường mắc.
Bức tranh tư duy
Hãy tưởng tượng một quầy giao dịch ngân hàng ở Hà Nội. Có một nhân viên duy nhất (event loop / single thread) phục vụ nhiều khách hàng (coroutines). Nhân viên nhận hồ sơ từ khách A, gửi đi xử lý (I/O operation), rồi không đứng chờ — quay sang phục vụ khách B. Khi hồ sơ khách A có kết quả, nhân viên quay lại phục vụ tiếp khách A.
Một nhân viên giỏi (event loop hiệu quả) có thể phục vụ hàng trăm khách trong một buổi, miễn là phần chờ (I/O) chiếm phần lớn thời gian. Nhưng nếu có khách yêu cầu nhân viên ngồi tính toán phức tạp ngay tại chỗ (CPU-bound blocking) — tất cả khách khác phải chờ, toàn bộ quầy đóng băng.
Khi analogy breakdown: Nhân viên ngân hàng có thể bị ngắt giữa chừng, nhưng coroutine trong asyncio chỉ yield tại điểm await. Giữa hai await, code chạy liên tục không bị gián đoạn — đây là điểm khác biệt cốt lõi giữa cooperative (asyncio) và preemptive (threading) scheduling.
Cốt lõi kỹ thuật
Event Loop — Trái tim của Asyncio
Event loop là scheduler quản lý tất cả coroutines. Nó liên tục kiểm tra: coroutine nào sẵn sàng chạy tiếp, I/O nào đã hoàn thành, timer nào đã hết hạn.
python
import asyncio
async def task_a():
print("Task A: bắt đầu")
await asyncio.sleep(2) # Yield — event loop chạy task khác
print("Task A: hoàn thành")
async def task_b():
print("Task B: bắt đầu")
await asyncio.sleep(1) # Yield — event loop chạy task khác
print("Task B: hoàn thành")
async def main():
await asyncio.gather(task_a(), task_b())
asyncio.run(main())
# Output:
# Task A: bắt đầu
# Task B: bắt đầu
# Task B: hoàn thành ← 1s
# Task A: hoàn thành ← 2s
# Tổng: 2s (không phải 3s)Coroutines và async/await
Coroutine là hàm khai báo bằng async def. Gọi coroutine không thực thi nó — chỉ tạo coroutine object. Phải await hoặc schedule bằng create_task để thực thi.
python
import asyncio
async def fetch_data(source: str) -> dict:
"""Coroutine — có thể pause tại mỗi await."""
print(f"Bắt đầu fetch từ {source}")
await asyncio.sleep(0.5) # Pause: event loop chạy tasks khác
print(f"Hoàn thành fetch từ {source}")
return {"source": source, "data": "..."}
async def main():
# SAI: gọi coroutine không await → không chạy
fetch_data("api") # RuntimeWarning: coroutine never awaited
# ĐÚNG: await coroutine
result = await fetch_data("api")
print(result)
asyncio.run(main())Tasks — Schedule coroutines chạy đồng thời
asyncio.create_task() schedule coroutine vào event loop, cho phép nó chạy đồng thời với code hiện tại.
python
import asyncio
async def download(url: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"Data from {url}"
async def main():
# Tạo tasks — chạy đồng thời ngay lập tức
task1 = asyncio.create_task(download("api/users", 2.0))
task2 = asyncio.create_task(download("api/orders", 1.0))
task3 = asyncio.create_task(download("api/products", 1.5))
# Làm việc khác trong khi tasks chạy
print("Đang tải dữ liệu...")
# Đợi kết quả khi cần
result1 = await task1
result2 = await task2
result3 = await task3
print(f"Kết quả: {result1}, {result2}, {result3}")
asyncio.run(main())
# Tổng: ~2s (bằng task chậm nhất), không phải 4.5sasyncio.gather() — Chạy nhiều coroutines đồng thời
python
import asyncio
async def fetch_user(user_id: int) -> dict:
await asyncio.sleep(0.1)
return {"id": user_id, "name": f"User {user_id}"}
async def main():
# gather() chạy tất cả đồng thời, trả về list kết quả theo thứ tự
users = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3),
)
print(users)
# [{"id": 1, ...}, {"id": 2, ...}, {"id": 3, ...}]
# Với return_exceptions=True: không raise, trả exception trong list
results = await asyncio.gather(
fetch_user(1),
failing_operation(),
fetch_user(3),
return_exceptions=True,
)
for r in results:
if isinstance(r, Exception):
print(f"Lỗi: {r}")
else:
print(f"OK: {r}")
asyncio.run(main())asyncio.Semaphore — Giới hạn concurrency
python
import asyncio
async def fetch_url(sem: asyncio.Semaphore, url: str) -> str:
async with sem: # Chỉ N coroutines chạy đồng thời
print(f"Fetching {url}")
await asyncio.sleep(0.5)
return f"Data from {url}"
async def main():
sem = asyncio.Semaphore(10) # Tối đa 10 requests đồng thời
urls = [f"https://api.example.com/item/{i}" for i in range(100)]
tasks = [fetch_url(sem, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"Fetched {len(results)} URLs")
asyncio.run(main())asyncio.timeout() — Timeout hiện đại (Python 3.11+)
python
import asyncio
async def slow_operation() -> str:
await asyncio.sleep(10)
return "done"
async def main():
# Context manager — sạch hơn wait_for()
try:
async with asyncio.timeout(3.0):
result = await slow_operation()
except TimeoutError:
print("Quá thời gian!")
# Có thể reschedule timeout
async with asyncio.timeout(10.0) as cm:
await step_one()
# Cần thêm thời gian cho step_two
cm.reschedule(asyncio.get_running_loop().time() + 20.0)
await step_two()
asyncio.run(main())Thực chiến
Tình huống: HTTP Client tốc độ cao với rate limiting
Bối cảnh: Service cần gọi external API để enrich dữ liệu cho 10,000 records. API giới hạn 50 requests/giây. Timeout mỗi request: 10s. Cần retry khi gặp lỗi tạm thời.
Mục tiêu: Xử lý 10,000 records trong < 5 phút, respect rate limit, không crash external API.
python
import asyncio
import time
from dataclasses import dataclass
from typing import Any
@dataclass
class APIResult:
record_id: int
data: dict | None
error: str | None
class RateLimitedClient:
def __init__(self, max_concurrent: int = 50, rate_per_second: float = 50.0):
self._semaphore = asyncio.Semaphore(max_concurrent)
self._rate_interval = 1.0 / rate_per_second
self._last_request_time = 0.0
self._rate_lock = asyncio.Lock()
async def _wait_for_rate(self):
async with self._rate_lock:
now = time.monotonic()
elapsed = now - self._last_request_time
if elapsed < self._rate_interval:
await asyncio.sleep(self._rate_interval - elapsed)
self._last_request_time = time.monotonic()
async def fetch(self, record_id: int, max_retries: int = 3) -> APIResult:
async with self._semaphore:
for attempt in range(max_retries):
try:
await self._wait_for_rate()
# Giả lập API call
async with asyncio.timeout(10.0):
await asyncio.sleep(0.05) # Giả lập latency
if record_id % 100 == 0: # 1% failure rate
raise ConnectionError("Service unavailable")
return APIResult(record_id, {"enriched": True}, None)
except TimeoutError:
if attempt == max_retries - 1:
return APIResult(record_id, None, "Timeout")
await asyncio.sleep(2 ** attempt)
except ConnectionError as e:
if attempt == max_retries - 1:
return APIResult(record_id, None, str(e))
await asyncio.sleep(2 ** attempt)
return APIResult(record_id, None, "Unknown error")
async def fetch_all(self, record_ids: list[int]) -> list[APIResult]:
tasks = [self.fetch(rid) for rid in record_ids]
return await asyncio.gather(*tasks)
async def main():
client = RateLimitedClient(max_concurrent=50, rate_per_second=50.0)
record_ids = list(range(1000))
start = time.perf_counter()
results = await client.fetch_all(record_ids)
elapsed = time.perf_counter() - start
success = sum(1 for r in results if r.data is not None)
failed = sum(1 for r in results if r.error is not None)
print(f"1000 records: {elapsed:.1f}s ({success} OK, {failed} lỗi)")
print(f"Throughput: {1000 / elapsed:.0f} records/s")
if __name__ == "__main__":
asyncio.run(main())Phân tích:
Semaphore(50)giới hạn 50 connections mở cùng lúc — tránh exhaustion file descriptors- Rate limiting qua lock + sleep — đảm bảo không vượt 50 req/s
- Retry với exponential backoff — tăng tỷ lệ recovery cho transient errors
asyncio.timeout()— tránh request treo vô hạn
Tình huống: FastAPI endpoint với async dependencies
python
from dataclasses import dataclass
import asyncio
@dataclass
class UserProfile:
user_id: int
name: str
orders: list
recommendations: list
async def get_user(user_id: int) -> dict:
await asyncio.sleep(0.05)
return {"id": user_id, "name": f"User {user_id}"}
async def get_orders(user_id: int) -> list:
await asyncio.sleep(0.08)
return [{"order_id": i} for i in range(3)]
async def get_recommendations(user_id: int) -> list:
await asyncio.sleep(0.06)
return [{"product_id": i} for i in range(5)]
async def get_user_profile(user_id: int) -> UserProfile:
"""Gọi 3 services đồng thời — latency = max(50, 80, 60) = 80ms."""
user, orders, recs = await asyncio.gather(
get_user(user_id),
get_orders(user_id),
get_recommendations(user_id),
)
return UserProfile(
user_id=user["id"],
name=user["name"],
orders=orders,
recommendations=recs,
)
# Trong FastAPI:
# @app.get("/users/{user_id}/profile")
# async def profile_endpoint(user_id: int):
# return await get_user_profile(user_id)Sai lầm điển hình
❌ Sai lầm 1: Quên await — Silent bug
Vấn đề: Gọi coroutine mà không await — code tưởng chạy nhưng không thực thi.
python
# SAI: Coroutine không được await → không chạy
async def save_to_db(data: dict):
await asyncio.sleep(0.1)
print(f"Saved: {data}")
async def main():
save_to_db({"key": "value"}) # RuntimeWarning, KHÔNG lưu gì cả!
print("Done")Tại sao sai: save_to_db() trả về coroutine object nhưng không ai await nó. Data mất mà không có error rõ ràng. Trong production, nghĩa là order không được lưu, log không được ghi.
python
# ĐÚNG: Luôn await coroutine
async def main():
await save_to_db({"key": "value"})
print("Done")❌ Sai lầm 2: Blocking code trong async function
Vấn đề: Gọi synchronous blocking function bên trong async function — đóng băng toàn bộ event loop.
python
# SAI: time.sleep() block event loop!
import time
async def process_request():
time.sleep(5) # CHẶN tất cả coroutines khác trong 5 giây!
return "done"Tại sao sai: Event loop chạy trên single thread. time.sleep(5) chặn thread đó, khiến tất cả coroutines khác (bao gồm cả HTTP responses đang pending) bị treo.
python
# ĐÚNG: Dùng asyncio.sleep hoặc run_in_executor cho blocking code
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def process_request():
await asyncio.sleep(5) # Non-blocking
return "done"
# Nếu BẮT BUỘC phải gọi sync code:
async def process_with_blocking():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_function)
return result❌ Sai lầm 3: Tạo quá nhiều tasks cùng lúc
Vấn đề: Tạo 10,000 connections đồng thời — crash server hoặc bị ban.
python
# SAI: 10,000 connections mở cùng lúc
async def bad_scraper(urls: list[str]):
tasks = [fetch(url) for url in urls] # 10,000 tasks!
return await asyncio.gather(*tasks)Tại sao sai: Mỗi connection tiêu file descriptor + memory. OS giới hạn ~1024 FDs mặc định. External API sẽ rate-limit hoặc ban IP.
python
# ĐÚNG: Semaphore giới hạn concurrency
async def good_scraper(urls: list[str], max_concurrent: int = 50):
sem = asyncio.Semaphore(max_concurrent)
async def limited_fetch(url: str):
async with sem:
return await fetch(url)
tasks = [limited_fetch(url) for url in urls]
return await asyncio.gather(*tasks)❌ Sai lầm 4: Nuốt CancelledError
Vấn đề: Bắt Exception rộng, vô tình nuốt CancelledError.
python
# SAI: CancelledError bị nuốt → task không thực sự bị cancel
async def bad_task():
try:
await long_operation()
except Exception: # Bắt cả CancelledError!
pass # Task "sống lại" — TaskGroup không thể cancel nóTại sao sai: CancelledError là cơ chế để TaskGroup và timeout hoạt động. Nuốt nó khiến cleanup flow bị phá vỡ, resource leak.
python
# ĐÚNG: Xử lý CancelledError riêng, luôn re-raise
async def good_task():
try:
await long_operation()
except asyncio.CancelledError:
await cleanup_resources()
raise # BẮT BUỘC re-raise
except Exception as e:
logger.error(f"Error: {e}")❌ Sai lầm 5: Gọi asyncio.run() trong async context
Vấn đề: Tạo event loop lồng nhau.
python
# SAI: Nested event loop → RuntimeError
async def outer():
asyncio.run(inner()) # RuntimeError: event loop already running!
# ĐÚNG:
async def outer():
await inner() # Đơn giản awaitUnder the Hood
Event Loop Internals
Event loop của asyncio dựa trên I/O multiplexing — cơ chế OS cho phép monitor nhiều file descriptors trên single thread:
- Linux:
epoll— O(1) per event, hiệu quả nhất - macOS:
kqueue— tương tự epoll - Windows:
IOCP(I/O Completion Ports) — proactor model
Mỗi iteration của event loop:
- Chạy tất cả callbacks đã ready (scheduled bởi
call_soon) - Poll I/O events (với timeout = thời gian đến scheduled callback tiếp theo)
- Chạy callbacks từ I/O events hoàn thành
- Lặp lại
Coroutine vs Thread — So sánh hiệu năng
| Metric | Thread | Coroutine |
|---|---|---|
| Memory overhead | ~8MB stack/thread | ~1KB/coroutine |
| Context switch | ~1-10μs (OS kernel) | ~0.1μs (userspace) |
| Tạo mới | ~1ms | ~0.01ms |
| Số lượng thực tế | ~1,000-4,000 | ~100,000+ |
| Parallelism | Có (nhưng GIL) | Không (single thread) |
| Scheduling | Preemptive (OS) | Cooperative (await) |
Khi nào KHÔNG dùng asyncio
| Tình huống | Vấn đề | Giải pháp thay thế |
|---|---|---|
| CPU-bound thuần Python | Block event loop | multiprocessing, ProcessPoolExecutor |
| Thư viện sync legacy | Không có async API | run_in_executor với ThreadPool |
| Tasks ít (< 10 đồng thời) | Overkill, phức tạp hơn | threading đơn giản hơn |
| Cần true parallelism | Single thread | multiprocessing |
Trade-offs
Asyncio mạnh khi: I/O-bound workload, hàng nghìn connections đồng thời, event-driven architectures (WebSocket, streaming). Memory footprint cực thấp so với threading.
Asyncio yếu khi: Ecosystem chia rẻ sync/async (phải dùng aiohttp thay requests, asyncpg thay psycopg2). Debug khó hơn — stack traces dài hơn, timing bugs khó reproduce. Và một blocking call duy nhất có thể đóng băng toàn bộ hệ thống.
Checklist ghi nhớ
✅ Checklist triển khai
Async Fundamentals
- [ ] Mọi coroutine phải được
awaithoặc schedule bằngcreate_task - [ ] Không gọi blocking code (
time.sleep,requests.get) trong async function - [ ] Dùng
asyncio.run()chỉ ở entry point — không gọi trong async context
Concurrency Control
- [ ] Dùng
Semaphoregiới hạn concurrent connections (tránh FD exhaustion) - [ ] Set
timeoutcho mọi external I/O call - [ ] Dùng
gather(return_exceptions=True)khi tasks independent - [ ] Dùng
TaskGroupkhi tasks liên quan, cần cancel cùng nhau (Python 3.11+)
Error Handling
- [ ] Bắt
asyncio.CancelledErrorriêng, luôn re-raise - [ ] Implement retry với exponential backoff cho transient errors
- [ ] Log đầy đủ context: task_id, URL, attempt number
Production
- [ ] Dùng
run_in_executorcho CPU-bound hoặc legacy sync code - [ ] Monitor event loop lag (
loop.slow_callback_duration) - [ ] Không tạo quá nhiều tasks cùng lúc — dùng bounded queue hoặc semaphore
- [ ] Test graceful shutdown: handle SIGTERM, cancel tasks, cleanup resources
Bài tập luyện tập
Bài 1: Async URL Fetcher — Intermediate
Đề bài: Viết hàm fetch_all_urls(urls, max_concurrent) dùng asyncio + Semaphore. Mỗi URL giả lập bằng asyncio.sleep(random). Trả về dict {url: result}. Xử lý timeout 5s cho mỗi URL.
🧠 Quiz
Câu hỏi: Trong asyncio, create_task() và await coroutine() khác nhau thế nào?
- [ ] A. Không khác — cả hai đều chạy coroutine ngay lập tức
- [x] B.
create_task()schedule coroutine chạy đồng thời;awaitchạy tuần tự tại điểm gọi - [ ] C.
create_task()tạo thread mới;awaitchạy trên event loop - [ ] D.
create_task()chỉ dùng được trong TaskGroup Giải thích:await coroutine()chạy coroutine và đợi kết quả — code sauawaitchỉ chạy khi coroutine hoàn thành.create_task()schedule coroutine vào event loop và trả về Task object ngay — coroutine chạy đồng thời. A sai vìawaitlà tuần tự. C sai vì không có thread mới. D sai vìcreate_task()dùng được mọi nơi trong async context.
💡 Gợi ý
- Tạo
asyncio.Semaphore(max_concurrent)để giới hạn - Bọc mỗi fetch trong
async with semvàasyncio.timeout(5.0) - Dùng
asyncio.gather()để chạy tất cả
✅ Lời giải
python
import asyncio
import random
async def fetch_url(url: str) -> str:
delay = random.uniform(0.1, 2.0)
await asyncio.sleep(delay)
return f"Data from {url} ({delay:.1f}s)"
async def fetch_all_urls(
urls: list[str], max_concurrent: int = 10
) -> dict[str, str | None]:
sem = asyncio.Semaphore(max_concurrent)
results: dict[str, str | None] = {}
async def limited_fetch(url: str):
async with sem:
try:
async with asyncio.timeout(5.0):
result = await fetch_url(url)
results[url] = result
except TimeoutError:
results[url] = None
tasks = [limited_fetch(url) for url in urls]
await asyncio.gather(*tasks)
return results
async def main():
urls = [f"https://api.example.com/{i}" for i in range(50)]
results = await fetch_all_urls(urls, max_concurrent=10)
success = sum(1 for v in results.values() if v is not None)
print(f"{success}/{len(urls)} URLs fetched successfully")
asyncio.run(main())Phân tích: Semaphore giới hạn 10 concurrent fetches. Timeout 5s cho mỗi URL. gather() chạy tất cả đồng thời nhưng chỉ 10 active tại một thời điểm.
Bài 2: Producer-Consumer Async — Intermediate
Đề bài: Implement async producer-consumer với asyncio.Queue. Producer tạo 100 items, 3 consumers xử lý. Dùng sentinel None để signal completion.
💡 Gợi ý
- Producer:
await queue.put(item), cuối cùng putNonecho mỗi consumer - Consumer:
while True, lấy item, break khi gặpNone - Dùng
asyncio.TaskGroup()để quản lý lifecycle
✅ Lời giải
python
import asyncio
async def producer(queue: asyncio.Queue, num_items: int, num_consumers: int):
for i in range(num_items):
await queue.put(f"item-{i}")
await asyncio.sleep(0.01)
for _ in range(num_consumers):
await queue.put(None)
async def consumer(queue: asyncio.Queue, consumer_id: int) -> int:
count = 0
while True:
item = await queue.get()
if item is None:
break
await asyncio.sleep(0.02) # Giả lập xử lý
count += 1
queue.task_done()
return count
async def main():
queue: asyncio.Queue[str | None] = asyncio.Queue(maxsize=20)
num_consumers = 3
async with asyncio.TaskGroup() as tg:
tg.create_task(producer(queue, 100, num_consumers))
consumer_tasks = [
tg.create_task(consumer(queue, i))
for i in range(num_consumers)
]
for i, task in enumerate(consumer_tasks):
print(f"Consumer {i}: xử lý {task.result()} items")
asyncio.run(main())Phân tích: Bounded queue (maxsize=20) tạo backpressure — producer chậm lại khi consumers không kịp. Sentinel None cho mỗi consumer đảm bảo tất cả nhận signal dừng.
Liên kết học tiếp
Từ khóa glossary: asyncio, event loop, coroutine, async/await, cooperative multitasking, Semaphore, gather, TaskGroup
Tìm kiếm liên quan: asyncio python tutorial, event loop là gì, async await python, fastapi async