Skip to content

Structured Concurrency — TaskGroup và ExceptionGroup

Một API server chạy 200 requests/giây. Mỗi request tạo 3 background tasks bằng asyncio.create_task(). Khi request timeout, task bị bỏ rơi — không ai cancel, không ai await. Sau 24 giờ: 50,000 orphan tasks đang chạy, memory leak 2GB, server OOM. Đây là hậu quả của unstructured concurrency.

Structured concurrency là paradigm đảm bảo mọi task con phải hoàn thành (hoặc bị cancel) trước khi scope cha kết thúc — giống structured programming đảm bảo mọi goto phải nằm trong block. Python 3.11 giới thiệu asyncio.TaskGroupExceptionGroup / except* để implement paradigm này. Trio và anyio đã pioneer concept này từ trước, nhưng giờ nó là phần chính thức của Python.

Bài này sẽ phân tích tại sao unstructured concurrency nguy hiểm, cách TaskGroup giải quyết vấn đề, xử lý nhiều exceptions đồng thời với ExceptionGroup, và patterns cho graceful shutdown trong production.

Bức tranh tư duy

Hãy tưởng tượng một quản lý dự án xây dựng ở TP.HCM. Unstructured concurrency giống như quản lý giao việc cho thợ rồi... bỏ đi. Thợ làm xong không ai nhận. Thợ gặp sự cố không ai biết. Máy móc để chạy không ai tắt. Cuối ngày: công trường bừa bộn, thiết bị hao mòn, budget cháy.

Structured concurrency giống như quản lý dùng bảng theo dõi: giao việc cho nhóm thợ, theo dõi tiến độ, khi một thợ gặp sự cố thì dừng cả nhóm, thu dọn dụng cụ, báo cáo đầy đủ. Quản lý không rời công trường cho đến khi mọi thợ đã hoàn thành hoặc dừng an toàn.

Trong code: TaskGroup là quản lý đó. async with TaskGroup() as tg mở scope, tg.create_task() giao việc, và khi ra khỏi async with — tất cả tasks phải xong. Nếu một task fail → tất cả tasks khác bị cancel → exceptions được gom vào ExceptionGroup → propagate lên caller.

Cốt lõi kỹ thuật

Vấn đề: Unstructured Concurrency

python
import asyncio

async def fetch_data(source: str) -> dict:
    await asyncio.sleep(1)
    return {"source": source}

# ❌ Unstructured: Task leak khi có exception
async def unstructured():
    task1 = asyncio.create_task(fetch_data("api-1"))
    task2 = asyncio.create_task(fetch_data("api-2"))

    # Exception trước khi await tasks
    raise ValueError("Something went wrong")

    # task1 và task2 vẫn đang chạy → orphan tasks!
    # Không ai cancel, không ai await, memory leak
    results = await asyncio.gather(task1, task2)

TaskGroup — Giải pháp (Python 3.11+)

python
import asyncio

async def fetch_data(source: str) -> dict:
    await asyncio.sleep(1)
    return {"source": source}

# ✅ Structured: Mọi tasks được quản lý
async def structured():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data("api-1"))
        task2 = tg.create_task(fetch_data("api-2"))
        task3 = tg.create_task(fetch_data("api-3"))

    # Khi ra khỏi `async with`:
    # - TẤT CẢ tasks đã hoàn thành
    # - Nếu một task fail → tất cả bị cancel → ExceptionGroup raised
    print(task1.result())  # {"source": "api-1"}
    print(task2.result())  # {"source": "api-2"}
    print(task3.result())  # {"source": "api-3"}

asyncio.run(structured())

Automatic Cancellation

Khi một task trong TaskGroup fail, tất cả tasks khác bị cancel tự động:

python
import asyncio

async def slow_task(name: str, duration: float) -> str:
    try:
        print(f"{name}: bắt đầu")
        await asyncio.sleep(duration)
        print(f"{name}: hoàn thành")
        return f"{name} done"
    except asyncio.CancelledError:
        print(f"{name}: BỊ CANCEL")
        raise  # Phải re-raise!

async def failing_task() -> str:
    await asyncio.sleep(0.5)
    raise ValueError("Task failed!")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(slow_task("A", 5.0))
            tg.create_task(slow_task("B", 5.0))
            tg.create_task(failing_task())  # Fail sau 0.5s
    except* ValueError as eg:
        print(f"Caught {len(eg.exceptions)} errors")

asyncio.run(main())
# Output:
# A: bắt đầu
# B: bắt đầu
# A: BỊ CANCEL
# B: BỊ CANCEL
# Caught 1 errors

ExceptionGroup và except*

Khi nhiều tasks fail đồng thời, exceptions được gom vào ExceptionGroup:

python
import asyncio

async def task_may_fail(task_id: int) -> str:
    await asyncio.sleep(0.1)
    if task_id % 2 == 0:
        raise ValueError(f"Task {task_id}: invalid value")
    if task_id % 3 == 0:
        raise TypeError(f"Task {task_id}: wrong type")
    return f"Task {task_id}: OK"

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            for i in range(6):
                tg.create_task(task_may_fail(i))
    except* ValueError as eg:
        print(f"ValueErrors ({len(eg.exceptions)}):")
        for exc in eg.exceptions:
            print(f"  - {exc}")
    except* TypeError as eg:
        print(f"TypeErrors ({len(eg.exceptions)}):")
        for exc in eg.exceptions:
            print(f"  - {exc}")

asyncio.run(main())
# Cả hai except* blocks đều có thể chạy!
# ValueErrors (3): Task 0, Task 2, Task 4
# TypeErrors (1): Task 3

ExceptionGroup Utilities

python
import asyncio

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task_a())
            tg.create_task(task_b())
    except ExceptionGroup as eg:
        # split() — chia thành hai nhóm: match và rest
        value_errors, rest = eg.split(ValueError)
        if value_errors:
            print(f"Handled {len(value_errors.exceptions)} ValueErrors")
        if rest:
            raise rest  # Re-raise các exceptions không handle

        # subgroup() — lấy subgroup theo type
        type_errors = eg.subgroup(TypeError)
        if type_errors:
            for exc in type_errors.exceptions:
                print(f"TypeError: {exc}")

Cancellation Handling đúng cách

python
import asyncio

async def task_with_cleanup():
    """Task cần cleanup khi bị cancel."""
    db_conn = await connect_to_db()
    try:
        while True:
            data = await fetch_next_batch()
            await process_batch(data)
    except asyncio.CancelledError:
        # Cleanup TRƯỚC KHI re-raise
        await db_conn.close()
        print("DB connection closed")
        raise  # BẮT BUỘC re-raise!

async def task_with_shield():
    """Bảo vệ critical operation khỏi cancellation."""
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Operation này PHẢI hoàn thành dù bị cancel
        await asyncio.shield(save_checkpoint("critical_data"))
        raise

Timeout với TaskGroup

python
import asyncio

async def main():
    # Kết hợp timeout và TaskGroup
    try:
        async with asyncio.timeout(5.0):
            async with asyncio.TaskGroup() as tg:
                tg.create_task(slow_operation_a())  # 10s
                tg.create_task(slow_operation_b())  # 10s
    except TimeoutError:
        print("TaskGroup timeout — tất cả tasks đã bị cancel")

anyio — Cross-framework Structured Concurrency

anyio cung cấp API thống nhất hoạt động trên cả asyncio và trio:

python
import anyio

async def fetch(url: str) -> str:
    await anyio.sleep(0.1)
    return f"Data from {url}"

async def main():
    # TaskGroup tương tự asyncio nhưng portable
    async with anyio.create_task_group() as tg:
        results = []

        async def worker(url):
            result = await fetch(url)
            results.append(result)

        for i in range(10):
            tg.start_soon(worker, f"https://api.example.com/{i}")

    print(f"Fetched: {len(results)} results")

# Chạy trên asyncio
anyio.run(main, backend="asyncio")

# Hoặc trio
# anyio.run(main, backend="trio")

Thực chiến

Tình huống: Graceful Shutdown cho Async Service

Bối cảnh: Background service xử lý messages từ queue, gọi external APIs, ghi kết quả vào database. Cần graceful shutdown khi nhận SIGTERM — hoàn thành tasks đang chạy, cancel tasks pending, đóng connections.

Mục tiêu: Zero data loss khi shutdown, timeout 30s cho graceful period.

python
import asyncio
import signal
from dataclasses import dataclass

@dataclass
class ServiceState:
    running: bool = True
    active_tasks: int = 0
    processed: int = 0

async def process_message(msg: dict, state: ServiceState) -> None:
    """Xử lý một message — có thể mất 1-5s."""
    state.active_tasks += 1
    try:
        # Giả lập: gọi API + ghi DB
        await asyncio.sleep(0.5)
        state.processed += 1
    finally:
        state.active_tasks -= 1

async def message_consumer(
    queue: asyncio.Queue,
    state: ServiceState,
    max_concurrent: int = 10,
):
    """Consumer loop với structured concurrency."""
    sem = asyncio.Semaphore(max_concurrent)

    async def bounded_process(msg):
        async with sem:
            await process_message(msg, state)

    while state.running:
        try:
            msg = await asyncio.wait_for(queue.get(), timeout=1.0)
        except asyncio.TimeoutError:
            continue

        if msg is None:
            break

        # Tạo task trong scope hiện tại
        asyncio.create_task(bounded_process(msg))

async def run_service():
    state = ServiceState()
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)

    # Setup signal handlers
    loop = asyncio.get_running_loop()
    shutdown_event = asyncio.Event()

    def handle_shutdown():
        print("Shutdown signal received")
        state.running = False
        shutdown_event.set()

    # Signal handlers (Unix only — skip trên Windows)
    try:
        loop.add_signal_handler(signal.SIGTERM, handle_shutdown)
        loop.add_signal_handler(signal.SIGINT, handle_shutdown)
    except NotImplementedError:
        pass  # Windows không hỗ trợ add_signal_handler

    # Giả lập: feed messages
    async def message_producer():
        i = 0
        while state.running:
            await queue.put({"id": i, "data": f"msg-{i}"})
            i += 1
            await asyncio.sleep(0.1)
        await queue.put(None)

    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(message_producer())
            tg.create_task(message_consumer(queue, state))

            # Đợi shutdown signal hoặc timeout
            async def wait_shutdown():
                await shutdown_event.wait()
                state.running = False
                # Grace period: đợi active tasks hoàn thành
                for _ in range(300):  # 30s timeout
                    if state.active_tasks == 0:
                        break
                    await asyncio.sleep(0.1)
                if state.active_tasks > 0:
                    print(f"Force shutdown: {state.active_tasks} tasks still active")

            tg.create_task(wait_shutdown())

    except* Exception as eg:
        for exc in eg.exceptions:
            print(f"Service error: {exc}")

    print(f"Service stopped. Processed: {state.processed} messages")

if __name__ == "__main__":
    asyncio.run(run_service())

Phân tích:

  • TaskGroup đảm bảo tất cả tasks (producer, consumer, shutdown monitor) được track
  • Signal handler set running = False → producer/consumer loops thoát
  • Grace period 30s cho active tasks hoàn thành trước force shutdown
  • Semaphore giới hạn concurrent processing — tránh overload

Tình huống: Batch API Client với Error Recovery

python
import asyncio
from dataclasses import dataclass

@dataclass
class BatchResult:
    success: list[dict]
    failures: list[dict]

async def fetch_with_retry(item_id: int, max_retries: int = 3) -> dict:
    for attempt in range(max_retries):
        try:
            async with asyncio.timeout(5.0):
                await asyncio.sleep(0.05)
                if item_id % 50 == 0:
                    raise ConnectionError(f"Item {item_id}: connection refused")
                return {"id": item_id, "data": "ok"}
        except (TimeoutError, ConnectionError):
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(0.5 * (2 ** attempt))
    raise RuntimeError("Unreachable")

async def batch_fetch(item_ids: list[int], batch_size: int = 20) -> BatchResult:
    """Fetch items in batches, collect errors không abort toàn bộ."""
    result = BatchResult(success=[], failures=[])

    for i in range(0, len(item_ids), batch_size):
        batch = item_ids[i:i + batch_size]

        try:
            async with asyncio.TaskGroup() as tg:
                tasks = {
                    item_id: tg.create_task(fetch_with_retry(item_id))
                    for item_id in batch
                }
        except* (ConnectionError, TimeoutError) as eg:
            # Gom failures, không abort
            for exc in eg.exceptions:
                result.failures.append({"error": str(exc)})

        # Collect successes
        for item_id, task in tasks.items():
            if not task.cancelled() and task.exception() is None:
                result.success.append(task.result())

    return result

async def main():
    ids = list(range(200))
    result = await batch_fetch(ids, batch_size=20)
    print(f"Success: {len(result.success)}, Failures: {len(result.failures)}")

asyncio.run(main())

Sai lầm điển hình

Sai lầm 1: Tạo task ngoài TaskGroup scope

Vấn đề: Task tạo bằng create_task() ngoài TaskGroup — orphan task.

python
# SAI: Task ngoài scope — không được quản lý
async def bad_example():
    orphan = asyncio.create_task(background_work())  # Orphan!

    async with asyncio.TaskGroup() as tg:
        tg.create_task(main_work())

    # orphan vẫn chạy, không ai cancel, không ai await!

Tại sao sai: create_task() ngoài TaskGroup tạo orphan task. Khi function return, task vẫn chạy "vô chủ" — memory leak, resource leak.

python
# ĐÚNG: Tất cả tasks trong TaskGroup
async def good_example():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(background_work())
        tg.create_task(main_work())
    # Tất cả tasks hoàn thành khi ra khỏi scope

Sai lầm 2: Nuốt CancelledError

Vấn đề: Không re-raise CancelledError — phá vỡ cancellation flow.

python
# SAI: CancelledError bị nuốt → TaskGroup không thể cancel task
async def bad_task():
    try:
        await long_operation()
    except asyncio.CancelledError:
        print("Cancelled")
        # KHÔNG re-raise → task "sống lại"
python
# ĐÚNG: Cleanup rồi re-raise
async def good_task():
    try:
        await long_operation()
    except asyncio.CancelledError:
        await cleanup()
        raise  # BẮT BUỘC re-raise

Sai lầm 3: Bắt Exception thay vì dùng except*

Vấn đề: except Exception không bắt được ExceptionGroup đúng cách.

python
# SAI: except Exception không handle ExceptionGroup chi tiết
async def bad_handling():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(risky_task())
    except Exception as e:
        print(f"Error: {e}")  # Không truy cập được individual exceptions
python
# ĐÚNG: except* cho selective handling
async def good_handling():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(risky_task())
    except* ValueError as eg:
        for exc in eg.exceptions:
            print(f"ValueError: {exc}")
    except* TypeError as eg:
        for exc in eg.exceptions:
            print(f"TypeError: {exc}")

Sai lầm 4: Blocking code trong TaskGroup

Vấn đề: Gọi sync blocking function — đóng băng tất cả tasks trong group.

python
# SAI: time.sleep block event loop → tất cả tasks trong group bị treo
import time

async def bad_usage():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(async_work())
        time.sleep(5)  # BLOCK toàn bộ!
python
# ĐÚNG: Dùng asyncio.sleep hoặc run_in_executor
async def good_usage():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(async_work())
        await asyncio.sleep(5)  # Non-blocking

Under the Hood

TaskGroup vs gather() — Khi nào dùng gì

AspectTaskGroupgather()
Error behaviorMột fail → cancel tất cảMột fail → others tiếp tục
CleanupTự độngManual
Exception typeExceptionGroupSingle exception
Python version3.11+3.4+
SyntaxContext managerFunction call
Task lifecycleGuaranteed completionCó thể leak

Dùng TaskGroup khi: Tasks liên quan, cần cancel cùng nhau, cần guaranteed cleanup, Python 3.11+.

Dùng gather() khi: Tasks độc lập, cần backward compatibility, muốn return_exceptions=True để collect tất cả kết quả kể cả failures.

Structured Concurrency Guarantees

Structured concurrency đảm bảo 3 tính chất:

  1. Containment: Mọi child task phải hoàn thành trước khi scope cha kết thúc
  2. Propagation: Exceptions từ child tasks propagate lên parent
  3. Cancellation: Khi parent bị cancel, tất cả children bị cancel
python
# Visualization of task tree:
# main()
# └── TaskGroup (scope)
#     ├── task_a() → done
#     ├── task_b() → FAILED!
#     │   → cancel task_a, task_c
#     │   → ExceptionGroup propagated to main
#     └── task_c() → cancelled

anyio vs asyncio TaskGroup

Featureasyncio.TaskGroupanyio.create_task_group
Backendasyncio onlyasyncio + trio
APItg.create_task()tg.start_soon()
Cancel scopeQua asyncio.timeout()Built-in CancelScope
PortabilityCPython onlyCross-framework

Trade-offs

Structured concurrency mạnh khi: Cần guaranteed cleanup, complex task hierarchies, production services cần reliability. Debugging dễ hơn — stack traces rõ ràng, không có orphan tasks.

Structured concurrency yếu khi: Tasks thực sự cần sống lâu hơn scope (daemon tasks, background monitoring). Workaround: dedicated long-lived TaskGroup hoặc asyncio.create_task() cho fire-and-forget (chấp nhận risk).

Checklist ghi nhớ

✅ Checklist triển khai

TaskGroup

  • [ ] Tạo mọi tasks bên trong async with TaskGroup() — không dùng create_task() ngoài scope
  • [ ] Luôn handle except* cho các exception types cụ thể
  • [ ] Kết hợp asyncio.timeout() với TaskGroup cho deadline-based cancellation

CancelledError

  • [ ] Luôn re-raise CancelledError sau cleanup
  • [ ] Dùng asyncio.shield() cho critical operations không thể cancel
  • [ ] Không bắt CancelledError bằng bare except Exception

ExceptionGroup

  • [ ] Dùng except* (Python 3.11+) cho selective exception handling
  • [ ] Dùng .split() để tách exceptions theo type
  • [ ] Dùng .subgroup() để lọc exceptions

Graceful Shutdown

  • [ ] Handle SIGTERM/SIGINT → set shutdown flag
  • [ ] Grace period cho active tasks hoàn thành
  • [ ] Close connections, flush buffers trước khi exit
  • [ ] Log số tasks đã hoàn thành vs cancelled khi shutdown

Production

  • [ ] Dùng anyio nếu cần portability giữa asyncio/trio
  • [ ] Monitor orphan tasks — chúng là signal của unstructured code
  • [ ] Test cancellation paths — chúng thường untested và buggy

Bài tập luyện tập

Bài 1: TaskGroup Error Handling — Advanced

Đề bài: Viết hàm resilient_fetch(urls) dùng TaskGroup. Nếu một URL fail, gom vào error list nhưng KHÔNG cancel các URLs khác. Trả về (successes, failures).

🧠 Quiz

Câu hỏi: Khi dùng except* với TaskGroup, điều gì xảy ra nếu có cả ValueErrorTypeError trong ExceptionGroup?

  • [ ] A. Chỉ except* ValueError chạy, TypeError bị nuốt
  • [ ] B. Chỉ except* TypeError chạy, ValueError bị nuốt
  • [x] C. Cả hai except* ValueErrorexcept* TypeError đều chạy
  • [ ] D. RuntimeError vì ExceptionGroup chứa nhiều loại Giải thích: except* hoạt động khác except thông thường. Nó "split" ExceptionGroup theo type — mỗi except* clause xử lý phần match của nó. Nếu có cả ValueError và TypeError, cả hai clauses đều chạy, mỗi clause nhận ExceptionGroup con chỉ chứa type tương ứng.
💡 Gợi ý
  • TaskGroup cancel tất cả khi có failure → cần approach khác
  • Bọc mỗi task trong try-except riêng để ngăn propagation
  • Hoặc chia thành batches nhỏ, mỗi batch là một TaskGroup
✅ Lời giải
python
import asyncio

async def fetch_url(url: str) -> str:
    await asyncio.sleep(0.1)
    if "bad" in url:
        raise ConnectionError(f"Cannot connect to {url}")
    return f"Data from {url}"

async def resilient_fetch(
    urls: list[str],
) -> tuple[list[str], list[dict]]:
    successes: list[str] = []
    failures: list[dict] = []

    async def safe_fetch(url: str):
        try:
            result = await fetch_url(url)
            successes.append(result)
        except Exception as e:
            failures.append({"url": url, "error": str(e)})

    # Bọc trong try-except → TaskGroup không thấy exception
    async with asyncio.TaskGroup() as tg:
        for url in urls:
            tg.create_task(safe_fetch(url))

    return successes, failures

async def main():
    urls = [
        "https://api.example.com/1",
        "https://bad.example.com/2",
        "https://api.example.com/3",
        "https://bad.example.com/4",
        "https://api.example.com/5",
    ]
    ok, fail = await resilient_fetch(urls)
    print(f"Success: {len(ok)}, Failed: {len(fail)}")
    for f in fail:
        print(f"  Error: {f}")

asyncio.run(main())

Phân tích: Key insight — bọc exception bên trong task để TaskGroup không thấy failure. Mỗi task tự handle exception → TaskGroup thấy tất cả tasks succeed → không cancel. Trade-off: mất automatic cancellation benefit — chấp nhận khi tasks thực sự independent.

Bài 2: Graceful Shutdown — Advanced

Đề bài: Implement async worker service nhận tasks từ queue, xử lý với 5 concurrent workers. Khi nhận signal shutdown (giả lập), dừng nhận tasks mới, đợi tasks đang chạy hoàn thành (tối đa 10s), rồi thoát. Report số tasks completed vs cancelled.

💡 Gợi ý
  • Dùng asyncio.Event cho shutdown signal
  • Workers check shutdown_event mỗi iteration
  • Grace period: asyncio.wait_for(pending_tasks, timeout=10)
✅ Lời giải
python
import asyncio

async def worker(
    queue: asyncio.Queue,
    shutdown: asyncio.Event,
    stats: dict,
    worker_id: int,
):
    while not shutdown.is_set():
        try:
            task = await asyncio.wait_for(queue.get(), timeout=0.5)
        except asyncio.TimeoutError:
            continue

        if task is None:
            break

        try:
            await asyncio.sleep(0.2)  # Giả lập xử lý
            stats["completed"] += 1
        except asyncio.CancelledError:
            stats["cancelled"] += 1
            raise
        finally:
            queue.task_done()

async def main():
    queue: asyncio.Queue = asyncio.Queue(maxsize=50)
    shutdown = asyncio.Event()
    stats = {"completed": 0, "cancelled": 0}

    # Feed tasks
    for i in range(100):
        await queue.put(f"task-{i}")

    # Start workers
    workers = [
        asyncio.create_task(worker(queue, shutdown, stats, i))
        for i in range(5)
    ]

    # Giả lập: shutdown sau 2s
    await asyncio.sleep(2)
    print("Shutdown signal!")
    shutdown.set()

    # Grace period: đợi workers tối đa 10s
    try:
        async with asyncio.timeout(10.0):
            # Gửi sentinel cho mỗi worker
            for _ in range(5):
                await queue.put(None)
            await asyncio.gather(*workers)
    except TimeoutError:
        for w in workers:
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)

    print(f"Completed: {stats['completed']}")
    print(f"Cancelled: {stats['cancelled']}")
    print(f"Remaining in queue: {queue.qsize()}")

asyncio.run(main())

Phân tích: Shutdown flow: (1) Set event → workers stop taking new tasks, (2) Send sentinels → workers exit loop, (3) Grace period 10s → force cancel nếu quá lâu. Production: thêm logging, metrics, health check endpoint.

Liên kết học tiếp

Từ khóa glossary: structured concurrency, TaskGroup, ExceptionGroup, except*, CancelledError, shield, anyio, trio, graceful shutdown

Tìm kiếm liên quan: taskgroup python, exception group python 3.11, structured concurrency, graceful shutdown asyncio