Giao diện
Asyncio Modern Python
Asyncio = Một thread, ngàn connections
Asyncio là gì?
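Asyncio là thư viện built-in của Python cho asynchronous I/O — cho phép xử lý hàng nghìn I/O operations đồng thời với một thread duy nhất. Vì mọi coroutine chạy trên cùng một thread nên không có tranh chấp GIL giữa các thread; tuy nhiên GIL vẫn tồn tại, nên asyncio chỉ hiệu quả cho tác vụ I/O-bound chứ không tăng tốc tác vụ CPU-bound.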
Ví von: Người phục vụ nhà hàng
Synchronous: Waiter đứng chờ một bàn xong mới sang bàn khác
Asynchronous: Waiter nhận order nhiều bàn, ai xong trước phục vụ trước
Cú pháp async / await
python
import asyncio
# Hàm async - có thể "pause" và "resume"
async def chao():
print("Xin chào...")
await asyncio.sleep(1) # "Pause" - cho tasks khác chạy
print("...thế giới!")
# Chạy async function
asyncio.run(chao())

async def vs def
python
# Hàm thường - chạy tuần tự
def sync_function():
return "result"
# Hàm async - trả về coroutine object
async def async_function():
return "result"
# Coroutine PHẢI được await (bên trong một hàm async khác) hoặc chạy bằng asyncio.run()
result = await async_function()

await là gì?
await là điểm mà hàm tạm dừng để Event Loop chạy tasks khác.
python
async def goi_api():
print("1. Bắt đầu request")
response = await fetch_data() # PAUSE - Event Loop chạy tasks khác
print("2. Nhận response") # RESUME khi fetch_data() xong
return response

Event Loop - Trái tim của Asyncio
Event Loop là scheduler quản lý tất cả async tasks.
python
import asyncio
async def task_a():
print("Task A: bắt đầu")
await asyncio.sleep(2)
print("Task A: xong")
async def task_b():
print("Task B: bắt đầu")
await asyncio.sleep(1)
print("Task B: xong")
async def main():
# Chạy cả 2 tasks đồng thời
await asyncio.gather(
task_a(),
task_b()
)
asyncio.run(main())
# Output:
# Task A: bắt đầu
# Task B: bắt đầu
# Task B: xong ← Task B xong trước (chỉ 1s)
# Task A: xong ← Task A xong sau (2s)
# Tổng thời gian: 2s (không phải 3s!)

Cách Event Loop hoạt động
So sánh: Requests (Sync) vs Aiohttp (Async)
Sync: Requests
python
import time
import requests
urls = [f"https://api.example.com/data/{i}" for i in range(100)]
bat_dau = time.perf_counter()
# Tuần tự - chờ từng request
ket_qua = []
for url in urls:
response = requests.get(url)
ket_qua.append(response.json())
print(f"Sync: {time.perf_counter() - bat_dau:.2f}s")
# Output: Sync: 25.00s (100 requests × 0.25s mỗi cái)
### Async: Aiohttp

```python
import asyncio
import aiohttp
import time

urls = [f"https://api.example.com/data/{i}" for i in range(100)]

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        ket_qua = await asyncio.gather(*tasks)
        return ket_qua

bat_dau = time.perf_counter()
asyncio.run(main())
print(f"Async: {time.perf_counter() - bat_dau:.2f}s")
# Output: Async: 0.50s (gần như đồng thời!)
```
### Kết quả
| Phương pháp | Thời gian | Tỷ lệ |
|-------------|-----------|-------|
| Sync (requests) | 25.00s | 1x |
| Async (aiohttp) | 0.50s | **50x nhanh hơn!** |
---
## Các Pattern Asyncio Quan trọng
### 1. `asyncio.gather()` - Chạy nhiều tasks
```python
async def main():
# Chạy đồng thời, đợi tất cả xong
ket_qua = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3),
)
# ket_qua = [user1, user2, user3]
```

### 2. `asyncio.create_task()` - Tạo task chạy nền
python
async def main():
# Tạo task nhưng không đợi
task = asyncio.create_task(background_job())
# Làm việc khác
await do_something_else()
# Đợi task khi cần
result = await task

3. asyncio.wait_for() - Timeout (Legacy)
python
async def main():
try:
result = await asyncio.wait_for(
slow_operation(),
timeout=5.0 # Timeout sau 5 giây
)
except asyncio.TimeoutError:
print("Quá thời gian!")

4. asyncio.timeout() - Modern Timeout (Python 3.11+)
python
import asyncio
async def main():
# Context manager - cleaner syntax
try:
async with asyncio.timeout(5.0):
result = await slow_operation()
print(f"Kết quả: {result}")
except TimeoutError: # Python 3.11+: asyncio.TimeoutError là alias của TimeoutError built-in
print("Quá thời gian!")
# Có thể reschedule timeout
async def flexible_timeout():
async with asyncio.timeout(10.0) as cm:
await step_1()
# Cần thêm thời gian? Reschedule!
cm.reschedule(asyncio.get_running_loop().time() + 20.0)
await step_2() # Có thêm 20s
# timeout_at - deadline cụ thể
async def deadline_example():
loop = asyncio.get_running_loop()
deadline = loop.time() + 30.0 # 30s từ bây giờ
async with asyncio.timeout_at(deadline):
await long_running_task()

💡 asyncio.timeout() vs asyncio.wait_for()
- timeout(): Context manager, có thể reschedule, raise TimeoutError
- wait_for(): Function wrapper, raise asyncio.TimeoutError (từ Python 3.11 là alias của TimeoutError built-in)
- Khuyến nghị: Dùng timeout() cho Python 3.11+
4. asyncio.Semaphore - Giới hạn concurrent tasks
python
async def fetch_with_limit(sem, url):
async with sem: # Chỉ N tasks chạy cùng lúc
return await fetch(url)
async def main():
sem = asyncio.Semaphore(10) # Max 10 concurrent
tasks = [fetch_with_limit(sem, url) for url in urls_1000]
results = await asyncio.gather(*tasks)

5. asyncio.TaskGroup - Structured Concurrency (Python 3.11+)
python
import asyncio
async def fetch_user(user_id: int) -> dict:
await asyncio.sleep(0.1) # Simulate API call
return {"id": user_id, "name": f"User {user_id}"}
async def main():
# TaskGroup - tự động cancel tất cả tasks nếu một task fail
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch_user(1))
task2 = tg.create_task(fetch_user(2))
task3 = tg.create_task(fetch_user(3))
# Tất cả tasks đã hoàn thành khi ra khỏi context
print(task1.result()) # {"id": 1, "name": "User 1"}
print(task2.result())
print(task3.result())
asyncio.run(main())

TaskGroup vs gather() - Khi nào dùng cái nào?
python
# ❌ gather() - Một task fail, các tasks khác vẫn chạy
async def gather_example():
try:
results = await asyncio.gather(
task_that_fails(),
task_that_succeeds(),
# Mặc định return_exceptions=False: exception đầu tiên được raise ngay
)
except Exception:
# gather() KHÔNG cancel các tasks còn lại - chúng vẫn đang chạy!
pass
# ✅ TaskGroup - Một task fail, TẤT CẢ tasks bị cancel
async def taskgroup_example():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(task_that_fails())
tg.create_task(task_that_succeeds())
except* ValueError: # except* cho ExceptionGroup (Python 3.11+)
# Tất cả tasks đã bị cancel
print("Có lỗi xảy ra, đã cleanup")

| Feature | gather() | TaskGroup |
|---|---|---|
| Error handling | Một task fail, các task khác tiếp tục | Một task fail, tất cả bị cancel |
| Cleanup | Manual | Automatic |
| Syntax | Function call | Context manager |
| Python version | 3.4+ | 3.11+ |
| Use case | Independent tasks | Related tasks cần cleanup cùng nhau |
6. async for - Async iteration
python
async def stream_data():
async for chunk in async_reader:
yield process(chunk)
async def main():
async for data in stream_data():
print(data)

7. async with - Async context manager
python
async def main():
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()

Exception Handling Patterns
Pattern 1: Basic try/except trong async
python
async def safe_fetch(url: str) -> dict | None:
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
except aiohttp.ClientError as e:
print(f"Network error: {e}")
return None
except asyncio.TimeoutError:
print(f"Timeout fetching {url}")
return None

Pattern 2: Exception handling với gather()
python
async def main():
# return_exceptions=True: Không raise, trả về exceptions trong list
results = await asyncio.gather(
fetch_user(1),
fetch_user(2), # Giả sử task này fail
fetch_user(3),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")

Pattern 3: ExceptionGroup với TaskGroup (Python 3.11+)
python
async def risky_task(task_id: int):
if task_id == 2:
raise ValueError(f"Task {task_id} failed!")
await asyncio.sleep(0.1)
return f"Task {task_id} done"
async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(risky_task(1))
tg.create_task(risky_task(2)) # Sẽ fail
tg.create_task(risky_task(3))
except* ValueError as eg:
# except* bắt ExceptionGroup
for exc in eg.exceptions:
print(f"Caught: {exc}")
except* TypeError as eg:
# Có thể bắt nhiều loại exception
for exc in eg.exceptions:
print(f"Type error: {exc}")
asyncio.run(main())

Pattern 4: Graceful shutdown với signal handling
python
import asyncio
import signal
async def long_running_task():
while True:
print("Working...")
await asyncio.sleep(1)
async def main():
# Tạo task
task = asyncio.create_task(long_running_task())
# Setup signal handler
loop = asyncio.get_running_loop()
def shutdown():
print("Shutting down...")
task.cancel()
loop.add_signal_handler(signal.SIGINT, shutdown)
loop.add_signal_handler(signal.SIGTERM, shutdown)
try:
await task
except asyncio.CancelledError:
print("Task was cancelled")
# Chạy và thử Ctrl+C
asyncio.run(main())

Pattern 5: Retry với exponential backoff
python
import asyncio
import random
async def fetch_with_retry(
url: str,
max_retries: int = 3,
base_delay: float = 1.0
) -> dict:
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with asyncio.timeout(10.0):
async with session.get(url) as response:
response.raise_for_status()
return await response.json()
except (aiohttp.ClientError, TimeoutError) as e:
if attempt == max_retries - 1:
raise # Hết retry, raise exception
# Exponential backoff với jitter
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
print(f"Attempt {attempt + 1} failed, retrying in {delay:.2f}s...")
await asyncio.sleep(delay)

Production Pitfalls
⚠️ NHỮNG LỖI THƯỜNG GẶP
1. Quên await - Silent Bug
python
# ❌ SAI - Không có await, coroutine không chạy!
async def main():
fetch_data() # Trả về coroutine object, không execute!
print("Done") # In ra ngay lập tức
# ✅ ĐÚNG
async def main():
await fetch_data()
print("Done")

2. Blocking code trong async function
python
# ❌ SAI - time.sleep() block event loop!
async def bad_sleep():
time.sleep(5) # BLOCK toàn bộ event loop!
return "done"
# ✅ ĐÚNG
async def good_sleep():
await asyncio.sleep(5) # Non-blocking
return "done"

3. Tạo quá nhiều tasks cùng lúc
python
# ❌ SAI - 10000 connections cùng lúc = crash server
async def bad_scraper():
tasks = [fetch(url) for url in urls_10000]
return await asyncio.gather(*tasks)
# ✅ ĐÚNG - Giới hạn concurrent connections
async def good_scraper():
sem = asyncio.Semaphore(50) # Max 50 concurrent
async def limited_fetch(url):
async with sem:
return await fetch(url)
tasks = [limited_fetch(url) for url in urls_10000]
return await asyncio.gather(*tasks)

4. Không handle CancelledError đúng cách
python
# ❌ SAI - Swallow CancelledError
async def bad_task():
try:
await long_operation()
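except BaseException: # Bắt cả CancelledError! (Từ Python 3.8, CancelledError kế thừa BaseException nên `except Exception` không còn bắt được nó)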
pass
# ✅ ĐÚNG - Re-raise CancelledError
async def good_task():
try:
await long_operation()
except asyncio.CancelledError:
# Cleanup nếu cần
raise # PHẢI re-raise!
except Exception as e:
print(f"Error: {e}")

5. Dùng asyncio.run() trong async context
python
# ❌ SAI - Nested event loops
async def outer():
asyncio.run(inner()) # RuntimeError!
# ✅ ĐÚNG
async def outer():
await inner()

Real-world: Async Web Scraper
```python
import asyncio
import aiohttp
from bs4 import BeautifulSoup

class AsyncScraper:
    def __init__(self, max_concurrent: int = 20):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_page(self, session, url: str) -> str:
        async with self.semaphore:
            try:
                async with session.get(url, timeout=10) as response:
                    return await response.text()
            except Exception as e:
                print(f"Lỗi {url}: {e}")
                return ""

    async def parse_page(self, html: str) -> dict:
        soup = BeautifulSoup(html, 'html.parser')
        return {
            "title": soup.title.string if soup.title else "",
            "links": len(soup.find_all('a'))
        }

    async def scrape(self, urls: list[str]) -> list[dict]:
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_page(session, url) for url in urls]
            pages = await asyncio.gather(*tasks)
            results = []
            for html in pages:
                if html:
                    results.append(await self.parse_page(html))
            return results

# Sử dụng
async def main():
    scraper = AsyncScraper(max_concurrent=50)
    urls = [f"https://example.com/page/{i}" for i in range(1000)]
    results = await scraper.scrape(urls)
    print(f"Đã scrape {len(results)} trang")

asyncio.run(main())
```
---
## Khi nào KHÔNG dùng Asyncio?
::: warning ⚠️ ASYNCIO KHÔNG PHẢI CHO MỌI THỨ
❌ **Không dùng cho CPU-bound tasks**:
```python
# ❌ SAI - Chặn Event Loop
async def tinh_toan():
result = heavy_cpu_work() # Không có await!
return result
```

✅ **Dùng `run_in_executor` cho CPU-bound**:
python
import asyncio
from concurrent.futures import ProcessPoolExecutor
async def tinh_toan():
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, heavy_cpu_work)
return result
:::
Async Frameworks
### FastAPI

```python
from fastapi import FastAPI
import httpx

app = FastAPI()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{user_id}")
        return response.json()
```
### Starlette (Lightweight)
```python
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route
async def homepage(request):
return JSONResponse({"message": "Hello"})
app = Starlette(routes=[Route("/", homepage)])
```

## Deep Dive vào Python Concurrency
💡 MASTER ASYNCIO
Asyncio là kỹ năng quan trọng cho Python Engineer hiện đại. Chúng tôi đã tổng hợp tất cả patterns, pitfalls, và best practices trong ebook chuyên sâu.
✦
✧
✦
PREMIUM
HPN Interview Kit
Bộ câu hỏi phỏng vấn Big Tech
- ✓ 200+ câu hỏi thực tế từ FAANG
- ✓ Giải thích chi tiết bằng Tiếng Việt
- ✓ Cập nhật liên tục 2025
Bảng Tóm tắt
python
# === CƠ BẢN ===
import asyncio
async def my_coroutine():
await asyncio.sleep(1)
return "done"
# Chạy async code
asyncio.run(my_coroutine())
# === NHIỀU TASKS ===
# gather - chạy đồng thời, đợi tất cả
results = await asyncio.gather(task1(), task2(), task3())
# create_task - tạo task chạy nền
task = asyncio.create_task(background())
# wait_for - với timeout (legacy)
result = await asyncio.wait_for(slow(), timeout=5.0)
# === PYTHON 3.11+ FEATURES ===
# timeout context manager (modern)
async with asyncio.timeout(5.0):
result = await slow()
# TaskGroup - structured concurrency
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch_user(1))
task2 = tg.create_task(fetch_user(2))
# Tất cả tasks hoàn thành khi ra khỏi context
# ExceptionGroup handling
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(risky_task())
except* ValueError as eg:
for exc in eg.exceptions:
print(f"Caught: {exc}")
# === CONTROL CONCURRENCY ===
sem = asyncio.Semaphore(10)
async def limited_task():
async with sem:
await do_work()
# === ASYNC ITERATION ===
async for item in async_generator():
process(item)
# === ASYNC CONTEXT MANAGER ===
async with resource() as r:
await r.do_something()
# === HTTP CLIENT ===
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
# === MIX CPU + I/O ===
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor, cpu_function)
# === RETRY PATTERN ===
async def fetch_with_retry(url, max_retries=3):
for attempt in range(max_retries):
try:
return await fetch(url)
except Exception:
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt) # Exponential backoff