Skip to content

Thực hành: Async Programming Patterns

🎯 Mục tiêu

🎯 Sau bài thực hành này, bạn sẽ:

  • Sử dụng asyncio.gather để chạy song song nhiều coroutine
  • Giới hạn concurrency bằng Semaphore
  • Xây dựng async generator cho data streaming

Mô tả bài tập

Lập trình bất đồng bộ là kỹ năng thiết yếu khi xây dựng ứng dụng I/O-bound: web scraping, API calls, database queries. Bài tập này giúp bạn nắm vững các pattern phổ biến nhất trong asyncio.

Yêu cầu

Bài 1: Parallel Tasks với asyncio.gather

Giả lập việc gọi nhiều API đồng thời và thu thập kết quả.

python
import asyncio
import random

async def fetch_user(user_id: int) -> dict:
    """Giả lập API call lấy thông tin user."""
    await asyncio.sleep(random.uniform(0.1, 0.5))
    return {"id": user_id, "name": f"User_{user_id}"}

async def fetch_all_users(user_ids: list[int]) -> list[dict]:
    # TODO: Dùng asyncio.gather để gọi song song
    # Xử lý trường hợp một số request thất bại (return_exceptions=True)
    pass

# Test
async def main():
    users = await fetch_all_users([1, 2, 3, 4, 5])
    print(users)

asyncio.run(main())

Bài 2: Semaphore — Giới hạn Concurrency

Khi gọi API bên ngoài, bạn cần giới hạn số request đồng thời để tránh bị rate-limit.

python
async def fetch_with_limit(urls: list[str], max_concurrent: int = 3):
    """Fetch nhiều URL nhưng giới hạn max_concurrent request cùng lúc."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_one(url: str) -> str:
        # TODO: Dùng semaphore để giới hạn concurrency
        # async with semaphore: ...
        pass

    # TODO: Dùng gather để chạy tất cả
    pass

Bài 3: Async Generator

Xây dựng async generator để stream dữ liệu từng phần thay vì load tất cả vào bộ nhớ.

python
async def paginated_fetch(base_url: str, total_pages: int):
    """Async generator — yield từng page kết quả."""
    for page in range(1, total_pages + 1):
        await asyncio.sleep(0.1)  # Giả lập network delay
        data = [f"item_{page}_{i}" for i in range(10)]
        # TODO: yield data của từng page
        pass

async def main():
    async for page_data in paginated_fetch("/api/products", 5):
        print(f"Nhận {len(page_data)} items")

Bài 4: Error Handling trong Async

Xử lý lỗi đúng cách khi chạy nhiều task song song.

python
async def risky_task(task_id: int) -> str:
    if task_id % 3 == 0:
        raise ValueError(f"Task {task_id} thất bại!")
    await asyncio.sleep(0.1)
    return f"Task {task_id} thành công"

async def run_with_error_handling(task_ids: list[int]):
    # TODO: Chạy tất cả task, thu thập kết quả và lỗi riêng biệt
    # Trả về (successes, errors)
    pass

Gợi ý

Gợi ý Bài 1
  • asyncio.gather(*coroutines, return_exceptions=True) trả về list kết quả
  • Khi return_exceptions=True, exception được trả về thay vì raise
  • Kiểm tra isinstance(result, Exception) để phân loại kết quả
Gợi ý Bài 2
  • asyncio.Semaphore(n) cho phép tối đa n coroutine chạy đồng thời
  • Dùng async with semaphore: để tự động acquire/release
  • Kết hợp với asyncio.gather bên ngoài vòng semaphore
Gợi ý Bài 3
  • Async generator dùng yield trong async def
  • Duyệt bằng async for item in generator()
  • Có thể kết hợp yield với await trong cùng function

Lời giải tham khảo

Xem lời giải
python
import asyncio
import random

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(random.uniform(0.1, 0.5))
    if user_id == 3:
        raise ConnectionError(f"Không thể kết nối cho user {user_id}")
    return {"id": user_id, "name": f"User_{user_id}"}
async def fetch_all_users(user_ids: list[int]) -> list[dict]:
    results = await asyncio.gather(*[fetch_user(uid) for uid in user_ids], return_exceptions=True)
    successes = [r for r in results if not isinstance(r, Exception)]
    errors = [r for r in results if isinstance(r, Exception)]
    if errors:
        print(f"⚠️ {len(errors)} request thất bại: {errors}")
    return successes
async def paginated_fetch(base_url: str, total_pages: int):
    for page in range(1, total_pages + 1):
        await asyncio.sleep(0.1)
        data = [f"item_{page}_{i}" for i in range(10)]
        yield data
async def run_with_error_handling(task_ids: list[int]):
    results = await asyncio.gather(*[risky_task(tid) for tid in task_ids], return_exceptions=True)
    successes = [r for r in results if not isinstance(r, Exception)]
    errors = [r for r in results if isinstance(r, Exception)]
    return successes, errors