
Asyncio in Modern Python

Asyncio = one thread, thousands of connections

What is Asyncio?

Asyncio is Python's built-in library for asynchronous I/O. It lets a single thread handle thousands of concurrent I/O operations, unhindered by the GIL.

Analogy: a restaurant waiter

Synchronous: the waiter waits for one table to finish before moving to the next
Asynchronous: the waiter takes orders from many tables and serves whichever is ready first


The async / await syntax

python
import asyncio

# An async function - it can "pause" and "resume"
async def chao():
    print("Hello...")
    await asyncio.sleep(1)  # "Pause" - let other tasks run
    print("...world!")

# Run the async function
asyncio.run(chao())

async def vs def

python
# Regular function - runs sequentially
def sync_function():
    return "result"

# Async function - calling it returns a coroutine object
async def async_function():
    return "result"

# An async function MUST be awaited (inside another coroutine)
result = await async_function()

What is await?

await marks the point where a coroutine suspends so the Event Loop can run other tasks.

python
async def goi_api():
    print("1. Start request")
    response = await fetch_data()  # PAUSE - the Event Loop runs other tasks
    print("2. Response received")  # RESUME once fetch_data() completes
    return response

The Event Loop - the heart of Asyncio

The Event Loop is the scheduler that manages all async tasks.

python
import asyncio

async def task_a():
    print("Task A: start")
    await asyncio.sleep(2)
    print("Task A: done")

async def task_b():
    print("Task B: start")
    await asyncio.sleep(1)
    print("Task B: done")

async def main():
    # Run both tasks concurrently
    await asyncio.gather(
        task_a(),
        task_b()
    )

asyncio.run(main())

# Output:
# Task A: start
# Task B: start
# Task B: done     ← Task B finishes first (only 1s)
# Task A: done     ← Task A finishes second (2s)
# Total time: 2s (not 3s!)

How the Event Loop works
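In short: every `await` suspends the current task and hands control back to the loop, which then resumes whichever task is ready next. A small trace makes the hand-offs visible (a sketch; `step` and the delays are illustrative):

```python
import asyncio

order: list[str] = []

async def step(name: str, delay: float) -> None:
    order.append(f"{name} start")
    await asyncio.sleep(delay)   # suspend; the loop runs other ready tasks
    order.append(f"{name} done")

async def main() -> None:
    # gather() schedules A first, then B; the finish order follows the delays
    await asyncio.gather(step("A", 0.02), step("B", 0.01))

asyncio.run(main())
print(order)  # ['A start', 'B start', 'B done', 'A done']
```

Both tasks start in submission order, but B finishes first because its wait is shorter — the loop never sits idle while a task is suspended.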


Comparison: requests (sync) vs aiohttp (async)

Sync: requests

python
import time
import requests

urls = [f"https://api.example.com/data/{i}" for i in range(100)]

bat_dau = time.perf_counter()

# Sequential - wait for each request in turn
ket_qua = []
for url in urls:
    response = requests.get(url)
    ket_qua.append(response.json())

print(f"Sync: {time.perf_counter() - bat_dau:.2f}s")
# Output: Sync: 25.00s (100 requests × 0.25s each)

Async: aiohttp

python
import asyncio
import aiohttp
import time

urls = [f"https://api.example.com/data/{i}" for i in range(100)]

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        ket_qua = await asyncio.gather(*tasks)
        return ket_qua

bat_dau = time.perf_counter()
asyncio.run(main())
print(f"Async: {time.perf_counter() - bat_dau:.2f}s")

# Output: Async: 0.50s (nearly simultaneous!)


### Results

| Method | Time | Ratio |
|--------|------|-------|
| Sync (requests) | 25.00s | 1x |
| Async (aiohttp) | 0.50s | **50x faster!** |
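You can reproduce the effect locally without a network by simulating each request's I/O wait with a sleep (a sketch; `N` and `DELAY` are illustrative, not the 100 × 0.25s figures above):

```python
import asyncio
import time

N, DELAY = 20, 0.01  # 20 simulated requests, 10 ms of "I/O" each

def sync_version() -> float:
    start = time.perf_counter()
    for _ in range(N):
        time.sleep(DELAY)          # blocks the whole thread every time
    return time.perf_counter() - start

async def async_version() -> float:
    start = time.perf_counter()
    # all the waits overlap, so the total is roughly one DELAY
    await asyncio.gather(*(asyncio.sleep(DELAY) for _ in range(N)))
    return time.perf_counter() - start

sync_t = sync_version()                  # ~N * DELAY
async_t = asyncio.run(async_version())   # ~DELAY
print(f"sync {sync_t:.3f}s vs async {async_t:.3f}s")
```

The sync total grows linearly with the number of requests; the async total stays close to the longest single wait.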

---

Key Asyncio Patterns

1. asyncio.gather() - Run multiple tasks

python
async def main():
    # Run concurrently, wait for all to finish
    ket_qua = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )
    # ket_qua = [user1, user2, user3] - results keep the call order

2. asyncio.create_task() - Start a background task

python
async def main():
    # Create the task but don't await it yet
    task = asyncio.create_task(background_job())
    
    # Do other work
    await do_something_else()
    
    # Await the task when you need its result
    result = await task

3. asyncio.wait_for() - Timeout (Legacy)

python
async def main():
    try:
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=5.0  # Time out after 5 seconds
        )
    except asyncio.TimeoutError:
        print("Timed out!")

4. asyncio.timeout() - Modern Timeout (Python 3.11+)

python
import asyncio

async def main():
    # Context manager - cleaner syntax
    try:
        async with asyncio.timeout(5.0):
            result = await slow_operation()
            print(f"Result: {result}")
    except TimeoutError:  # the builtin; asyncio.TimeoutError is an alias of it since 3.11
        print("Timed out!")

# The timeout can be rescheduled
async def flexible_timeout():
    async with asyncio.timeout(10.0) as cm:
        await step_1()
        
        # Need more time? Reschedule!
        cm.reschedule(asyncio.get_running_loop().time() + 20.0)
        
        await step_2()  # now has an extra 20s

# timeout_at - an absolute deadline
async def deadline_example():
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 30.0  # 30s from now
    
    async with asyncio.timeout_at(deadline):
        await long_running_task()

💡 asyncio.timeout() vs asyncio.wait_for()

  • timeout(): context manager, reschedulable, raises TimeoutError
  • wait_for(): function wrapper, raises asyncio.TimeoutError
  • Recommendation: use timeout() on Python 3.11+

5. asyncio.Semaphore - Limit concurrent tasks

python
async def fetch_with_limit(sem, url):
    async with sem:  # only N tasks run at once
        return await fetch(url)

async def main():
    sem = asyncio.Semaphore(10)  # max 10 concurrent
    
    tasks = [fetch_with_limit(sem, url) for url in urls_1000]
    results = await asyncio.gather(*tasks)
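A self-contained check that the cap actually holds (simulated work; `job`, `running`, and `peak` are illustrative names, not part of the asyncio API):

```python
import asyncio

async def main(limit: int = 3, jobs: int = 10) -> int:
    sem = asyncio.Semaphore(limit)
    running = 0   # tasks currently inside the semaphore
    peak = 0      # highest concurrency observed

    async def job() -> None:
        nonlocal running, peak
        async with sem:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulated I/O
            running -= 1

    await asyncio.gather(*(job() for _ in range(jobs)))
    return peak

print(asyncio.run(main()))  # 3 - never more than `limit` at once
```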

6. asyncio.TaskGroup - Structured Concurrency (Python 3.11+)

python
import asyncio

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.1)  # Simulate API call
    return {"id": user_id, "name": f"User {user_id}"}

async def main():
    # TaskGroup automatically cancels the remaining tasks if one fails
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_user(1))
        task2 = tg.create_task(fetch_user(2))
        task3 = tg.create_task(fetch_user(3))
    
    # All tasks have completed once the context exits
    print(task1.result())  # {"id": 1, "name": "User 1"}
    print(task2.result())
    print(task3.result())

asyncio.run(main())

TaskGroup vs gather() - which one when?

python
# ❌ gather() - if one task fails, the others keep running
async def gather_example():
    try:
        results = await asyncio.gather(
            task_that_fails(),
            task_that_succeeds(),
            return_exceptions=True  # don't raise; return exceptions in the list
        )
    except Exception:
        # The other tasks are still running!
        pass

# ✅ TaskGroup - if one task fails, ALL tasks are cancelled
async def taskgroup_example():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task_that_fails())
            tg.create_task(task_that_succeeds())
    except* ValueError:  # except* for ExceptionGroup (Python 3.11+)
        # All tasks have been cancelled
        print("Something failed; cleanup done")
| Feature | gather() | TaskGroup |
|---------|----------|-----------|
| Error handling | One task fails, the rest continue | One task fails, all are cancelled |
| Cleanup | Manual | Automatic |
| Syntax | Function call | Context manager |
| Python version | 3.4+ | 3.11+ |
| Use case | Independent tasks | Related tasks that must clean up together |

7. async for - Async iteration

python
async def stream_data():
    async for chunk in async_reader:
        yield process(chunk)

async def main():
    async for data in stream_data():
        print(data)
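The snippet above assumes an existing `async_reader`; here is a fully self-contained async generator to show the protocol on its own (the `countdown` name is illustrative):

```python
import asyncio

async def countdown(n: int):
    # An async generator: `yield` inside an `async def`
    while n > 0:
        await asyncio.sleep(0)  # hand control back to the event loop
        yield n
        n -= 1

async def main() -> list[int]:
    # `async for` also works in comprehensions
    return [x async for x in countdown(3)]

print(asyncio.run(main()))  # [3, 2, 1]
```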

8. async with - Async context manager

python
async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()
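Beyond library-provided managers like `ClientSession`, you can define your own with `contextlib.asynccontextmanager` (a sketch; `managed_resource` and `events` are illustrative names):

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []

@asynccontextmanager
async def managed_resource():
    events.append("acquire")      # runs on `async with` entry
    try:
        yield "resource"
    finally:
        events.append("release")  # runs on exit, even after an error

async def main() -> None:
    async with managed_resource() as r:
        events.append(f"use {r}")

asyncio.run(main())
print(events)  # ['acquire', 'use resource', 'release']
```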

Exception Handling Patterns

Pattern 1: Basic try/except in async code

python
async def safe_fetch(url: str) -> dict | None:
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()
    except aiohttp.ClientError as e:
        print(f"Network error: {e}")
        return None
    except asyncio.TimeoutError:
        print(f"Timeout fetching {url}")
        return None

Pattern 2: Exception handling with gather()

python
async def main():
    # return_exceptions=True: don't raise; return exceptions in the result list
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),  # suppose this one fails
        fetch_user(3),
        return_exceptions=True
    )
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
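A runnable version of the same idea, with a deliberately failing task (`maybe_fail` is an illustrative stand-in for `fetch_user`):

```python
import asyncio

async def maybe_fail(i: int) -> int:
    if i == 2:
        raise ValueError(f"task {i} failed")
    return i * 10

async def main() -> list:
    # failures come back as exception objects instead of propagating
    return await asyncio.gather(
        maybe_fail(1), maybe_fail(2), maybe_fail(3),
        return_exceptions=True,
    )

results = asyncio.run(main())
print(results)  # [10, ValueError('task 2 failed'), 30]
```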

Pattern 3: ExceptionGroup với TaskGroup (Python 3.11+)

python
async def risky_task(task_id: int):
    if task_id == 2:
        raise ValueError(f"Task {task_id} failed!")
    await asyncio.sleep(0.1)
    return f"Task {task_id} done"

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(risky_task(1))
            tg.create_task(risky_task(2))  # will fail
            tg.create_task(risky_task(3))
    except* ValueError as eg:
        # except* catches the matching part of the ExceptionGroup
        for exc in eg.exceptions:
            print(f"Caught: {exc}")
    except* TypeError as eg:
        # several exception types can be handled separately
        for exc in eg.exceptions:
            print(f"Type error: {exc}")

asyncio.run(main())

Pattern 4: Graceful shutdown with signal handling

python
import asyncio
import signal

async def long_running_task():
    while True:
        print("Working...")
        await asyncio.sleep(1)

async def main():
    # Create the task
    task = asyncio.create_task(long_running_task())
    
    # Setup signal handler
    loop = asyncio.get_running_loop()
    
    def shutdown():
        print("Shutting down...")
        task.cancel()
    
    loop.add_signal_handler(signal.SIGINT, shutdown)
    loop.add_signal_handler(signal.SIGTERM, shutdown)
    
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

# Run it and try Ctrl+C
asyncio.run(main())

Pattern 5: Retry with exponential backoff

python
import asyncio
import random

import aiohttp  # third-party: pip install aiohttp

async def fetch_with_retry(
    url: str,
    max_retries: int = 3,
    base_delay: float = 1.0
) -> dict:
    for attempt in range(max_retries):
        try:
            async with aiohttp.ClientSession() as session:
                async with asyncio.timeout(10.0):
                    async with session.get(url) as response:
                        response.raise_for_status()
                        return await response.json()
        except (aiohttp.ClientError, TimeoutError) as e:
            if attempt == max_retries - 1:
                raise  # retries exhausted; re-raise
            
            # Exponential backoff with jitter
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed, retrying in {delay:.2f}s...")
            await asyncio.sleep(delay)

Production Pitfalls

⚠️ COMMON MISTAKES

1. Forgetting await - a silent bug

python
# ❌ WRONG - no await, so the coroutine never runs!
async def main():
    fetch_data()  # returns a coroutine object without executing it
    print("Done")  # prints immediately; Python warns "coroutine ... was never awaited"

# ✅ CORRECT
async def main():
    await fetch_data()
    print("Done")
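You can see the bug directly: calling an async function just builds a coroutine object, and nothing runs until it is awaited (a small sketch; `fetch_data` here is a trivial stand-in):

```python
import asyncio

async def fetch_data() -> str:
    return "data"

async def main() -> str:
    result = fetch_data()  # forgot await
    print(asyncio.iscoroutine(result))  # True - nothing has run yet
    return await result    # awaiting it finally executes the body

print(asyncio.run(main()))  # prints True, then data
```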

2. Blocking code inside an async function

python
# ❌ WRONG - time.sleep() blocks the event loop!
async def bad_sleep():
    time.sleep(5)  # blocks the ENTIRE event loop!
    return "done"

# ✅ CORRECT
async def good_sleep():
    await asyncio.sleep(5)  # Non-blocking
    return "done"
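When the blocking call comes from a library you cannot change, Python 3.9+ offers `asyncio.to_thread()` to push it onto a worker thread so the loop stays responsive (a sketch; `blocking_read` and `heartbeat` are illustrative):

```python
import asyncio
import time

def blocking_read() -> str:
    # stand-in for a blocking library call (file I/O, a legacy SDK, ...)
    time.sleep(0.05)
    return "payload"

async def heartbeat() -> str:
    await asyncio.sleep(0.01)
    return "tick"

async def main() -> list[str]:
    # the blocking call runs on a worker thread, so the event loop
    # is free to serve heartbeat() concurrently
    return await asyncio.gather(asyncio.to_thread(blocking_read), heartbeat())

print(asyncio.run(main()))  # ['payload', 'tick']
```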

3. Creating too many tasks at once

python
# ❌ WRONG - 10,000 connections at once can overwhelm the server
async def bad_scraper():
    tasks = [fetch(url) for url in urls_10000]
    return await asyncio.gather(*tasks)

# ✅ CORRECT - cap the number of concurrent connections
async def good_scraper():
    sem = asyncio.Semaphore(50)  # max 50 concurrent
    
    async def limited_fetch(url):
        async with sem:
            return await fetch(url)
    
    tasks = [limited_fetch(url) for url in urls_10000]
    return await asyncio.gather(*tasks)

4. Not handling CancelledError correctly

python
# ❌ WRONG - swallows CancelledError
# (since Python 3.8 CancelledError subclasses BaseException, so it slips
#  past `except Exception`, but a bare `except` still eats it)
async def bad_task():
    try:
        await long_operation()
    except BaseException:  # catches CancelledError too!
        pass

# ✅ CORRECT - re-raise CancelledError
async def good_task():
    try:
        await long_operation()
    except asyncio.CancelledError:
        # clean up if needed
        raise  # MUST re-raise!
    except Exception as e:
        print(f"Error: {e}")

5. Calling asyncio.run() inside an async context

python
# ❌ WRONG - nested event loops
async def outer():
    asyncio.run(inner())  # RuntimeError!

# ✅ CORRECT
async def outer():
    await inner()

Real-world: Async Web Scraper

python
import asyncio
import aiohttp
from bs4 import BeautifulSoup

class AsyncScraper:
    def __init__(self, max_concurrent: int = 20):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_page(self, session, url: str) -> str:
        async with self.semaphore:
            try:
                async with session.get(url, timeout=10) as response:
                    return await response.text()
            except Exception as e:
                print(f"Error {url}: {e}")
                return ""

    async def parse_page(self, html: str) -> dict:
        soup = BeautifulSoup(html, 'html.parser')
        return {
            "title": soup.title.string if soup.title else "",
            "links": len(soup.find_all('a'))
        }

    async def scrape(self, urls: list[str]) -> list[dict]:
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_page(session, url) for url in urls]
            pages = await asyncio.gather(*tasks)
        
        results = []
        for html in pages:
            if html:
                results.append(await self.parse_page(html))
        return results

# Usage
async def main():
    scraper = AsyncScraper(max_concurrent=50)
    urls = [f"https://example.com/page/{i}" for i in range(1000)]
    
    results = await scraper.scrape(urls)
    print(f"Scraped {len(results)} pages")

asyncio.run(main())


---

## When NOT to use Asyncio?

::: warning ⚠️ ASYNCIO IS NOT A FIT FOR EVERYTHING

❌ **Don't use it for CPU-bound tasks**:
```python
# ❌ WRONG - blocks the Event Loop
async def tinh_toan():
    result = heavy_cpu_work()  # no await anywhere!
    return result
```

✅ **Use `run_in_executor` for CPU-bound work**:

python
import asyncio
from concurrent.futures import ProcessPoolExecutor

async def tinh_toan():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, heavy_cpu_work)
    return result

:::


Async Frameworks

### FastAPI

python
from fastapi import FastAPI
import httpx

app = FastAPI()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{user_id}")
        return response.json()


### Starlette (Lightweight)

```python
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route

async def homepage(request):
    return JSONResponse({"message": "Hello"})

app = Starlette(routes=[Route("/", homepage)])
```


Summary Cheat Sheet

python
# === BASICS ===
import asyncio

async def my_coroutine():
    await asyncio.sleep(1)
    return "done"

# Run async code
asyncio.run(my_coroutine())

# === MULTIPLE TASKS ===
# gather - run concurrently, wait for all
results = await asyncio.gather(task1(), task2(), task3())

# create_task - start a background task
task = asyncio.create_task(background())

# wait_for - with a timeout (legacy)
result = await asyncio.wait_for(slow(), timeout=5.0)

# === PYTHON 3.11+ FEATURES ===
# timeout context manager (modern)
async with asyncio.timeout(5.0):
    result = await slow()

# TaskGroup - structured concurrency
async with asyncio.TaskGroup() as tg:
    task1 = tg.create_task(fetch_user(1))
    task2 = tg.create_task(fetch_user(2))
# all tasks have completed once the context exits

# ExceptionGroup handling
try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(risky_task())
except* ValueError as eg:
    for exc in eg.exceptions:
        print(f"Caught: {exc}")

# === CONTROL CONCURRENCY ===
sem = asyncio.Semaphore(10)

async def limited_task():
    async with sem:
        await do_work()

# === ASYNC ITERATION ===
async for item in async_generator():
    process(item)

# === ASYNC CONTEXT MANAGER ===
async with resource() as r:
    await r.do_something()

# === HTTP CLIENT ===
import aiohttp

async with aiohttp.ClientSession() as session:
    async with session.get(url) as response:
        data = await response.json()

# === MIX CPU + I/O ===
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor, cpu_function)

# === RETRY PATTERN ===
async def fetch_with_retry(url, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await fetch(url)
        except Exception:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff