Skip to content

Structured Concurrency Python 3.11+

Structured Concurrency = Async code không leak tasks

Learning Outcomes

Sau bài này, bạn sẽ:

  • Hiểu nguyên tắc Structured Concurrency và tại sao nó quan trọng
  • Sử dụng asyncio.TaskGroup để quản lý concurrent tasks
  • Handle exceptions với ExceptionGroup và except*
  • Implement proper cancellation handling
  • Tránh task leaks và resource leaks

Structured Concurrency là gì?

Structured Concurrency là paradigm đảm bảo rằng:

  1. Tất cả tasks con phải hoàn thành trước khi task cha kết thúc
  2. Exceptions được propagate đúng cách
  3. Cancellation được handle tự động
  4. Không có task leaks - mọi task đều được track

Vấn đề với Unstructured Concurrency

python
import asyncio

# ❌ UNSTRUCTURED - tasks can leak!
async def unstructured_example():
    """Anti-pattern: start two tasks, then await another call before gathering them."""
    # The tasks are started, but nothing ties their lifetime to this function
    task1 = asyncio.create_task(fetch_data(1))
    task2 = asyncio.create_task(fetch_data(2))
    
    # If an exception is raised here...
    result = await some_operation()  # 💥 Exception!
    
    # ...task1 and task2 are still running!
    # They become "orphan tasks"
    return await asyncio.gather(task1, task2)

Giải pháp: Structured Concurrency

python
import asyncio

# ✅ STRUCTURED - every task is owned by the TaskGroup
async def structured_example():
    """Run fetch_data(1) and fetch_data(2) concurrently and return both results as a tuple."""
    async with asyncio.TaskGroup() as group:
        # If one of the tasks raises, the TaskGroup cancels the others.
        pending = [group.create_task(fetch_data(n)) for n in (1, 2)]

    # This point is reached only once ALL tasks have finished
    first, second = pending
    return first.result(), second.result()

TaskGroup - Core API

Basic Usage

python
import asyncio

async def fetch_user(user_id: int) -> dict:
    """Simulate an API call and return a small record for ``user_id``."""
    await asyncio.sleep(0.1)  # stand-in for the latency of a real API call
    user = {"id": user_id}
    user["name"] = f"User {user_id}"
    return user

async def main():
    """Fetch users 1, 2 and 3 concurrently, then print each result in creation order."""
    async with asyncio.TaskGroup() as group:
        # Tasks are created inside the context
        user_tasks = [group.create_task(fetch_user(uid)) for uid in (1, 2, 3)]

    # Once the context has exited, ALL tasks have finished
    for finished in user_tasks:
        print(finished.result())

asyncio.run(main())

Dynamic Task Creation

python
async def process_items(items: list[str]) -> list[dict]:
    """Run ``process_item`` on every item concurrently.

    Args:
        items: The inputs; one task is created per item.

    Returns:
        The task results, in the same order as ``items`` (an empty list
        when ``items`` is empty).

    Raises:
        ExceptionGroup: Raised by the TaskGroup when any task fails.
    """
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(process_item(item)) for item in items]

    # Every task has completed once the TaskGroup context has exited
    return [task.result() for task in tasks]

Nested TaskGroups

python
async def complex_operation():
    """Fetch ten users and ten products concurrently using nested TaskGroups.

    Returns a ``(users, products)`` tuple of result lists.
    """
    async def fetch_batch(fetch_one):
        # Inner group: one task per index 0..9, results returned in index order
        async with asyncio.TaskGroup() as inner_group:
            batch = [inner_group.create_task(fetch_one(idx)) for idx in range(10)]
        return [done.result() for done in batch]

    async with asyncio.TaskGroup() as outer_group:
        # Task 1: fetch users / Task 2: fetch products
        users_task = outer_group.create_task(fetch_batch(fetch_user))
        products_task = outer_group.create_task(fetch_batch(fetch_product))

    return users_task.result(), products_task.result()

Exception Propagation

ExceptionGroup - Nhiều exceptions cùng lúc

Khi nhiều tasks fail, Python 3.11+ gom chúng vào ExceptionGroup:

python
import asyncio

async def failing_task(task_id: int):
    """Sleep briefly, then succeed for odd ``task_id`` and raise ValueError for even ones."""
    await asyncio.sleep(0.1)
    is_odd = task_id % 2 != 0
    if is_odd:
        return f"Task {task_id} succeeded"
    raise ValueError(f"Task {task_id} failed!")

async def main():
    """Run four failing_task instances in one TaskGroup and report the collected failures."""
    try:
        async with asyncio.TaskGroup() as group:
            # Odd ids succeed, even ids fail
            for task_id in (1, 2, 3, 4):
                group.create_task(failing_task(task_id))
    except ExceptionGroup as eg:
        print(f"Caught {len(eg.exceptions)} exceptions:")
        for exc in eg.exceptions:
            print(f"  - {type(exc).__name__}: {exc}")

asyncio.run(main())
# Output:
# Caught 2 exceptions:
#   - ValueError: Task 2 failed!
#   - ValueError: Task 4 failed!

except* - Selective Exception Handling

python
async def mixed_failures():
    """Start three tasks in one TaskGroup, each expected to fail with a different exception type."""
    failing_factories = (
        task_raises_value_error,
        task_raises_type_error,
        task_raises_runtime_error,
    )
    async with asyncio.TaskGroup() as group:
        for make_coro in failing_factories:
            group.create_task(make_coro())

async def main():
    """Run mixed_failures() and handle each exception type in its own except* clause."""
    try:
        await mixed_failures()
    except* ValueError as eg:
        # Inside an except* clause, eg is a group holding only the matching exceptions
        print(f"ValueError(s): {[str(e) for e in eg.exceptions]}")
    except* TypeError as eg:
        print(f"TypeError(s): {[str(e) for e in eg.exceptions]}")
    except* RuntimeError as eg:
        print(f"RuntimeError(s): {[str(e) for e in eg.exceptions]}")

Handling và Re-raising

python
async def main():
    """Handle non-critical ValueErrors locally; log and re-raise everything else."""
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(risky_operation())
    except* ValueError as eg:
        # Handle ValueErrors, but re-raise if any of them is critical
        for exc in eg.exceptions:
            if "critical" in str(exc).lower():
                # A bare raise inside except* re-raises the subgroup matched by
                # THIS clause (the ValueErrors), not the whole original group.
                raise
        print("Handled non-critical ValueErrors")
    except* Exception as eg:
        # Log and re-raise every other exception
        for exc in eg.exceptions:
            print(f"Unexpected error: {exc}")
        raise

ExceptionGroup Utilities

python
import asyncio

async def main():
    """Demonstrate ExceptionGroup.split() and ExceptionGroup.subgroup().

    Prints the ValueErrors and TypeErrors found in the group, then re-raises
    every exception that is not a ValueError.

    Raises:
        ExceptionGroup: The non-ValueError part of the caught group, if any.
    """
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task_1())
            tg.create_task(task_2())
    except ExceptionGroup as eg:
        # split() partitions the group by type: (matching, rest).
        # Either side is None when it would be empty.
        value_errors, rest = eg.split(ValueError)
        if value_errors:
            print(f"ValueErrors: {value_errors.exceptions}")

        # subgroup() keeps only the matching exceptions (None when none match).
        # It must run BEFORE the re-raise below: TypeErrors are part of `rest`,
        # so after `raise rest` this code would never be reached.
        type_errors = eg.subgroup(TypeError)
        if type_errors:
            print(f"TypeErrors: {type_errors.exceptions}")

        if rest:
            raise rest  # Re-raise everything that is not a ValueError

Cancellation Handling

Automatic Cancellation

Khi một task fail, TaskGroup tự động cancel các tasks khác:

python
import asyncio

async def long_running_task(task_id: int):
    """Sleep for 10 seconds and return a result string, logging start, completion and cancellation."""
    print(f"Task {task_id}: Starting...")
    try:
        await asyncio.sleep(10)  # Long operation
    except asyncio.CancelledError:
        print(f"Task {task_id}: Cancelled!")
        raise  # MUST re-raise so the task really ends up cancelled
    print(f"Task {task_id}: Completed!")
    return f"Result {task_id}"

async def failing_task():
    """Sleep for 0.5 seconds, then raise ValueError."""
    await asyncio.sleep(0.5)
    raise ValueError("I failed!")

async def main():
    """Run two long tasks next to one failing task and handle the resulting ValueError."""
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(long_running_task(1))
            tg.create_task(long_running_task(2))
            tg.create_task(failing_task())  # Fails after 0.5s
    except* ValueError:
        print("Caught ValueError")

asyncio.run(main())
# Output:
# Task 1: Starting...
# Task 2: Starting...
# Task 1: Cancelled!
# Task 2: Cancelled!
# Caught ValueError

Proper Cleanup on Cancellation

python
import asyncio

async def task_with_cleanup():
    """Acquire a resource, process with it once per second, and always release it.

    The loop never finishes on its own; it ends through cancellation or
    through an exception raised by the processing step.

    Raises:
        asyncio.CancelledError: Propagated unchanged after cleanup when the
            task is cancelled.
    """
    resource = await acquire_resource()
    try:
        while True:
            await asyncio.sleep(1)
            await process_with_resource(resource)
    finally:
        # `finally` runs on cancellation AND on any other exception, so the
        # resource cannot leak when process_with_resource() fails. The
        # in-flight exception (including CancelledError) is re-raised
        # automatically once cleanup has finished.
        await release_resource(resource)
        print("Resource released")

async def main():
    """Run task_with_cleanup() alongside task_that_fails() in one TaskGroup."""
    async with asyncio.TaskGroup() as tg:
        tg.create_task(task_with_cleanup())
        tg.create_task(task_that_fails())

Shielding from Cancellation

Đôi khi bạn cần hoàn thành một operation dù bị cancel:

python
import asyncio

async def critical_save(data):
    """This operation MUST run to completion."""
    await asyncio.sleep(1)  # Simulate save
    print(f"Saved: {data}")

async def task_with_critical_section():
    """Do 5 seconds of work; if cancelled, run critical_save() before propagating the cancellation."""
    try:
        await asyncio.sleep(5)  # Normal work
    except asyncio.CancelledError:
        # Shield critical operation from cancellation
        await asyncio.shield(critical_save("important_data"))
        raise

async def main():
    """Start the task, cancel it after 0.5 seconds, and wait for it to finish."""
    task = asyncio.create_task(task_with_critical_section())
    await asyncio.sleep(0.5)
    task.cancel()  # Cancel task
    
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled, but data was saved!")

asyncio.run(main())

Timeout với TaskGroup

python
import asyncio

async def main():
    """Run two slow tasks in a TaskGroup under a 5-second asyncio.timeout()."""
    try:
        async with asyncio.timeout(5.0):
            async with asyncio.TaskGroup() as tg:
                tg.create_task(slow_task_1())  # Takes 10s
                tg.create_task(slow_task_2())  # Takes 10s
    except TimeoutError:
        print("TaskGroup timed out - all tasks cancelled")

TaskGroup Patterns

Pattern 1: Collect Results

python
async def fetch_all_users(user_ids: list[int]) -> dict[int, dict]:
    """Fetch every user concurrently and map each user id to its result.

    Args:
        user_ids: Ids to fetch; one task is created per distinct id.

    Returns:
        A dict mapping each user id to the value returned by ``fetch_user``
        (the annotation previously said ``list[dict]``, which did not match
        the dict actually returned).

    Raises:
        ExceptionGroup: Raised by the TaskGroup when any fetch fails.
    """
    async with asyncio.TaskGroup() as tg:
        tasks = {
            user_id: tg.create_task(fetch_user(user_id))
            for user_id in user_ids
        }

    # All tasks are finished here, so result() never blocks
    return {
        user_id: task.result()
        for user_id, task in tasks.items()
    }

Pattern 2: First Success (with cancellation)

python
async def first_success(urls: list[str]) -> str:
    """Return the first successful response and cancel the remaining fetches.

    Args:
        urls: Candidate URLs; all are fetched concurrently.

    Returns:
        The result of whichever ``fetch`` call succeeds first.

    Raises:
        RuntimeError: If ``urls`` is empty or every fetch failed (previously
            this case waited on the queue forever).
    """
    failed = object()  # sentinel reported by a fetch that raised
    outcomes: asyncio.Queue = asyncio.Queue()

    async def fetch_and_report(url: str) -> None:
        try:
            outcome = await fetch(url)
        except Exception:
            # Best effort: a single failing URL is ignored, but it is still
            # reported so the waiter can tell when every URL has failed.
            outcome = failed
        await outcomes.put(outcome)

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_and_report(url)) for url in urls]

        # Each task reports exactly once, so at most len(tasks) reads are needed
        for _ in tasks:
            outcome = await outcomes.get()
            if outcome is failed:
                continue
            # Leaving the `async with` normally only WAITS for the children;
            # it does not cancel them. Cancel the stragglers explicitly so the
            # exit is prompt. Cancelling a finished task is a no-op.
            for task in tasks:
                task.cancel()
            return outcome

    raise RuntimeError("first_success: no URL produced a successful response")

Pattern 3: Retry with TaskGroup

python
async def fetch_with_retry(url: str, max_retries: int = 3) -> dict:
    """Fetch ``url`` with a 5-second timeout per attempt and exponential backoff.

    Args:
        url: The URL passed to ``fetch``.
        max_retries: Total number of attempts; must be at least 1.

    Returns:
        The value returned by the first ``fetch`` call that succeeds.

    Raises:
        ValueError: If ``max_retries`` is less than 1 (previously this ended
            in ``raise None``, i.e. a confusing TypeError).
        TimeoutError, ConnectionError: The error of the final attempt, once
            all attempts are used up.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            async with asyncio.timeout(5.0):
                return await fetch(url)
        except (TimeoutError, ConnectionError):
            if attempt == max_retries - 1:
                # Out of attempts: fail now instead of sleeping through one
                # more backoff that no retry would follow.
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff

    raise AssertionError("unreachable: the loop returns or raises")

async def fetch_all_with_retry(urls: list[str]) -> list[dict]:
    """Run fetch_with_retry for every URL concurrently; results keep the order of ``urls``."""
    pending = []
    async with asyncio.TaskGroup() as group:
        for url in urls:
            pending.append(group.create_task(fetch_with_retry(url)))

    return [job.result() for job in pending]

Pattern 4: Progress Tracking

python
import asyncio
from dataclasses import dataclass

@dataclass
class Progress:
    """Mutable counters shared between the workers and the progress reporter."""

    # Number of items to process
    total: int
    # Items whose work finished without raising
    completed: int = 0
    # Items whose work raised an exception
    failed: int = 0

async def process_with_progress(items: list, progress: Progress):
    """Process every item concurrently while updating the ``progress`` counters.

    Args:
        items: The inputs; one task is created per item.
        progress: Counter object; ``completed`` is incremented for each item
            that succeeds and ``failed`` for each item whose work raises.

    Returns:
        The results of ``do_work``, in the same order as ``items``.

    Raises:
        ExceptionGroup: Raised by the TaskGroup when any item fails.
    """
    async def process_item(item):
        try:
            result = await do_work(item)
        except Exception:
            # Count the failure, then let it propagate so the TaskGroup sees it
            progress.failed += 1
            raise
        progress.completed += 1
        return result

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(process_item(item)) for item in items]

    return [task.result() for task in tasks]

async def main():
    """Process 100 items while a sibling task prints the progress once per second."""
    items = list(range(100))
    progress = Progress(total=len(items))
    
    # Progress reporter
    async def report_progress():
        # Loops until every item has been counted as completed or failed
        while progress.completed + progress.failed < progress.total:
            print(f"Progress: {progress.completed}/{progress.total} "
                  f"(failed: {progress.failed})")
            await asyncio.sleep(1)
    
    async with asyncio.TaskGroup() as tg:
        tg.create_task(report_progress())
        tg.create_task(process_with_progress(items, progress))

Production Pitfalls

⚠️ NHỮNG LỖI THƯỜNG GẶP

1. Quên re-raise CancelledError

python
# ❌ WRONG - swallows CancelledError
async def bad_task():
    """Anti-pattern: catch CancelledError and return normally."""
    try:
        await long_operation()
    except asyncio.CancelledError:
        print("Cancelled")
        # No re-raise! The task does not actually end up cancelled

# ✅ RIGHT
async def good_task():
    """Run cleanup() on cancellation, then propagate the CancelledError."""
    try:
        await long_operation()
    except asyncio.CancelledError:
        await cleanup()
        raise  # MUST re-raise!

2. Tạo task ngoài TaskGroup

python
# ❌ WRONG - task leak
async def bad_example():
    """Anti-pattern: start a task outside the TaskGroup that manages the rest of the work."""
    task = asyncio.create_task(background_work())  # Orphan task!
    
    async with asyncio.TaskGroup() as tg:
        tg.create_task(main_work())
    
    # background_work may still be running here!

# ✅ RIGHT
async def good_example():
    """Run background_work() and main_work() under the same TaskGroup."""
    async with asyncio.TaskGroup() as tg:
        tg.create_task(background_work())
        tg.create_task(main_work())
    # All tasks have finished

3. Blocking code trong TaskGroup

python
# ❌ WRONG - blocks the event loop
async def bad_task():
    """Anti-pattern: call the blocking time.sleep() inside a coroutine."""
    async with asyncio.TaskGroup() as tg:
        tg.create_task(async_work())
        time.sleep(5)  # BLOCKS everything!

# ✅ RIGHT
async def good_task():
    """Wait with asyncio.sleep() so the event loop can keep running other tasks."""
    async with asyncio.TaskGroup() as tg:
        tg.create_task(async_work())
        await asyncio.sleep(5)  # Non-blocking

4. Không handle ExceptionGroup đúng cách

python
# ❌ WRONG - treats the ExceptionGroup as one opaque Exception
async def bad_handling():
    """Anti-pattern: handle a TaskGroup failure with a plain ``except Exception``."""
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(risky_task())
    # NOTE: ExceptionGroup subclasses Exception, so this clause DOES catch it.
    # The problem is that `e` is the whole group: the individual exceptions
    # inside it are never inspected or handled by type.
    except Exception as e:
        print(f"Error: {e}")

# ✅ RIGHT
async def good_handling():
    """Handle the members of the ExceptionGroup by type with except* clauses."""
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(risky_task())
    except* ValueError as eg:
        for exc in eg.exceptions:
            print(f"ValueError: {exc}")
    except* Exception as eg:
        # Everything that was not matched by the ValueError clause
        for exc in eg.exceptions:
            print(f"Other error: {exc}")

TaskGroup vs gather() - Khi nào dùng cái nào?

| Aspect | TaskGroup | gather() |
|---|---|---|
| Error handling | Một fail → cancel tất cả | Một fail → others continue |
| Cleanup | Automatic | Manual |
| Syntax | Context manager | Function call |
| Python version | 3.11+ | 3.4+ |
| Exception type | ExceptionGroup | Single exception |
| Use case | Related tasks | Independent tasks |

Khi nào dùng TaskGroup?

  • Tasks có liên quan và cần cleanup cùng nhau
  • Cần đảm bảo không có task leaks
  • Muốn automatic cancellation khi có lỗi
  • Python 3.11+

Khi nào dùng gather()?

  • Tasks độc lập, không cần cancel nhau
  • Cần backward compatibility (Python < 3.11)
  • Muốn collect tất cả results kể cả khi có failures (return_exceptions=True)

Bảng Tóm tắt

python
# NOTE: these are fragments, not a runnable module. `async with` and `await`
# are only valid inside an `async def` function.

# === TASKGROUP BASIC ===
import asyncio

async with asyncio.TaskGroup() as tg:
    task1 = tg.create_task(async_operation_1())
    task2 = tg.create_task(async_operation_2())
# All tasks have finished once the context has exited

# === EXCEPTION HANDLING ===
try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(risky_task())
except* ValueError as eg:
    for exc in eg.exceptions:
        print(f"Caught: {exc}")
except* TypeError as eg:
    handle_type_errors(eg.exceptions)

# === CANCELLATION HANDLING ===
async def cancellable_task():
    try:
        await long_operation()
    except asyncio.CancelledError:
        await cleanup()
        raise  # MUST re-raise!

# === TIMEOUT ===
async with asyncio.timeout(5.0):
    async with asyncio.TaskGroup() as tg:
        tg.create_task(slow_task())

# === SHIELD CRITICAL OPERATIONS ===
try:
    await operation()
except asyncio.CancelledError:
    await asyncio.shield(critical_save())
    raise

# === EXCEPTIONGROUP UTILITIES ===
try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(task())
except ExceptionGroup as eg:
    # Split by type
    value_errors, rest = eg.split(ValueError)
    
    # Get subgroup
    type_errors = eg.subgroup(TypeError)