Giao diện
Structured Concurrency Python 3.11+
Structured Concurrency = Async code không leak tasks
Learning Outcomes
Sau bài này, bạn sẽ:
- Hiểu nguyên tắc Structured Concurrency và tại sao nó quan trọng
- Sử dụng `asyncio.TaskGroup` để quản lý concurrent tasks
- Handle exceptions với `ExceptionGroup` và `except*`
- Implement proper cancellation handling
- Tránh task leaks và resource leaks
Structured Concurrency là gì?
Structured Concurrency là paradigm đảm bảo rằng:
- Tất cả tasks con phải hoàn thành trước khi task cha kết thúc
- Exceptions được propagate đúng cách
- Cancellation được handle tự động
- Không có task leaks - mọi task đều được track
Vấn đề với Unstructured Concurrency
python
import asyncio
# ❌ UNSTRUCTURED - Task có thể leak!
async def unstructured_example():
# Tạo task nhưng không track
task1 = asyncio.create_task(fetch_data(1))
task2 = asyncio.create_task(fetch_data(2))
# Nếu exception xảy ra ở đây...
result = await some_operation() # 💥 Exception!
# ...task1 và task2 vẫn đang chạy!
# Chúng trở thành "orphan tasks"
return await asyncio.gather(task1, task2)Giải pháp: Structured Concurrency
python
import asyncio
# ✅ STRUCTURED - Tất cả tasks được quản lý
async def structured_example():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch_data(1))
task2 = tg.create_task(fetch_data(2))
# Nếu exception xảy ra...
# TaskGroup tự động cancel tất cả tasks!
# Chỉ đến đây khi TẤT CẢ tasks hoàn thành
return task1.result(), task2.result()TaskGroup - Core API
Basic Usage
python
import asyncio
async def fetch_user(user_id: int) -> dict:
await asyncio.sleep(0.1) # Simulate API call
return {"id": user_id, "name": f"User {user_id}"}
async def main():
async with asyncio.TaskGroup() as tg:
# Tạo tasks trong context
task1 = tg.create_task(fetch_user(1))
task2 = tg.create_task(fetch_user(2))
task3 = tg.create_task(fetch_user(3))
# Khi ra khỏi context, TẤT CẢ tasks đã hoàn thành
print(task1.result()) # {"id": 1, "name": "User 1"}
print(task2.result()) # {"id": 2, "name": "User 2"}
print(task3.result()) # {"id": 3, "name": "User 3"}
asyncio.run(main())Dynamic Task Creation
python
async def process_items(items: list[str]) -> list[dict]:
results = []
async with asyncio.TaskGroup() as tg:
tasks = []
for item in items:
task = tg.create_task(process_item(item))
tasks.append(task)
# Tất cả tasks hoàn thành
return [task.result() for task in tasks]Nested TaskGroups
python
async def complex_operation():
async with asyncio.TaskGroup() as outer_tg:
# Task 1: Fetch users
async def fetch_all_users():
async with asyncio.TaskGroup() as inner_tg:
tasks = [inner_tg.create_task(fetch_user(i)) for i in range(10)]
return [t.result() for t in tasks]
# Task 2: Fetch products
async def fetch_all_products():
async with asyncio.TaskGroup() as inner_tg:
tasks = [inner_tg.create_task(fetch_product(i)) for i in range(10)]
return [t.result() for t in tasks]
users_task = outer_tg.create_task(fetch_all_users())
products_task = outer_tg.create_task(fetch_all_products())
return users_task.result(), products_task.result()Exception Propagation
ExceptionGroup - Nhiều exceptions cùng lúc
Khi tasks trong một TaskGroup fail, Python 3.11+ gom các exceptions vào một ExceptionGroup (kể cả khi chỉ có một task fail):
python
import asyncio
async def failing_task(task_id: int):
await asyncio.sleep(0.1)
if task_id % 2 == 0:
raise ValueError(f"Task {task_id} failed!")
return f"Task {task_id} succeeded"
async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(failing_task(1)) # Success
tg.create_task(failing_task(2)) # Fail
tg.create_task(failing_task(3)) # Success
tg.create_task(failing_task(4)) # Fail
except ExceptionGroup as eg:
print(f"Caught {len(eg.exceptions)} exceptions:")
for exc in eg.exceptions:
print(f" - {type(exc).__name__}: {exc}")
asyncio.run(main())
# Output:
# Caught 2 exceptions:
# - ValueError: Task 2 failed!
# - ValueError: Task 4 failed!except* - Selective Exception Handling
python
async def mixed_failures():
async with asyncio.TaskGroup() as tg:
tg.create_task(task_raises_value_error())
tg.create_task(task_raises_type_error())
tg.create_task(task_raises_runtime_error())
async def main():
try:
await mixed_failures()
except* ValueError as eg:
print(f"ValueError(s): {[str(e) for e in eg.exceptions]}")
except* TypeError as eg:
print(f"TypeError(s): {[str(e) for e in eg.exceptions]}")
except* RuntimeError as eg:
print(f"RuntimeError(s): {[str(e) for e in eg.exceptions]}")Handling và Re-raising
python
async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(risky_operation())
except* ValueError as eg:
# Handle ValueError, nhưng re-raise nếu có lỗi nghiêm trọng
for exc in eg.exceptions:
if "critical" in str(exc).lower():
raise # Re-raise entire ExceptionGroup
print("Handled non-critical ValueErrors")
except* Exception as eg:
# Log và re-raise các exceptions khác
for exc in eg.exceptions:
print(f"Unexpected error: {exc}")
raiseExceptionGroup Utilities
python
import asyncio
async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(task_1())
tg.create_task(task_2())
except ExceptionGroup as eg:
# Lọc exceptions theo type
value_errors, rest = eg.split(ValueError)
if value_errors:
print(f"ValueErrors: {value_errors.exceptions}")
if rest:
raise rest # Re-raise các exceptions còn lại
# Hoặc dùng subgroup
type_errors = eg.subgroup(TypeError)
if type_errors:
print(f"TypeErrors: {type_errors.exceptions}")Cancellation Handling
Automatic Cancellation
Khi một task fail, TaskGroup tự động cancel các tasks khác:
python
import asyncio
async def long_running_task(task_id: int):
try:
print(f"Task {task_id}: Starting...")
await asyncio.sleep(10) # Long operation
print(f"Task {task_id}: Completed!")
return f"Result {task_id}"
except asyncio.CancelledError:
print(f"Task {task_id}: Cancelled!")
raise # PHẢI re-raise!
async def failing_task():
await asyncio.sleep(0.5)
raise ValueError("I failed!")
async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(long_running_task(1))
tg.create_task(long_running_task(2))
tg.create_task(failing_task()) # Fails after 0.5s
except* ValueError:
print("Caught ValueError")
asyncio.run(main())
# Output:
# Task 1: Starting...
# Task 2: Starting...
# Task 1: Cancelled!
# Task 2: Cancelled!
# Caught ValueErrorProper Cleanup on Cancellation
python
import asyncio
async def task_with_cleanup():
resource = await acquire_resource()
try:
while True:
await asyncio.sleep(1)
await process_with_resource(resource)
except asyncio.CancelledError:
# Cleanup TRƯỚC KHI re-raise
await release_resource(resource)
print("Resource released")
raise # PHẢI re-raise!
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(task_with_cleanup())
tg.create_task(task_that_fails())Shielding from Cancellation
Đôi khi bạn cần hoàn thành một operation dù bị cancel:
python
import asyncio
async def critical_save(data):
"""Operation này PHẢI hoàn thành."""
await asyncio.sleep(1) # Simulate save
print(f"Saved: {data}")
async def task_with_critical_section():
try:
await asyncio.sleep(5) # Normal work
except asyncio.CancelledError:
# Shield critical operation from cancellation
await asyncio.shield(critical_save("important_data"))
raise
async def main():
task = asyncio.create_task(task_with_critical_section())
await asyncio.sleep(0.5)
task.cancel() # Cancel task
try:
await task
except asyncio.CancelledError:
print("Task was cancelled, but data was saved!")
asyncio.run(main())Timeout với TaskGroup
python
import asyncio
async def main():
try:
async with asyncio.timeout(5.0):
async with asyncio.TaskGroup() as tg:
tg.create_task(slow_task_1()) # Takes 10s
tg.create_task(slow_task_2()) # Takes 10s
except TimeoutError:
print("TaskGroup timed out - all tasks cancelled")TaskGroup Patterns
Pattern 1: Collect Results
python
async def fetch_all_users(user_ids: list[int]) -> list[dict]:
async with asyncio.TaskGroup() as tg:
tasks = {
user_id: tg.create_task(fetch_user(user_id))
for user_id in user_ids
}
return {
user_id: task.result()
for user_id, task in tasks.items()
}Pattern 2: First Success (with cancellation)
python
async def first_success(urls: list[str]) -> str:
"""Return first successful response, cancel others."""
result_queue = asyncio.Queue()
async def fetch_and_report(url: str):
try:
result = await fetch(url)
await result_queue.put(result)
except Exception:
pass # Ignore failures
async with asyncio.TaskGroup() as tg:
for url in urls:
tg.create_task(fetch_and_report(url))
# Wait for first result
result = await result_queue.get()
# Cancel remaining tasks by exiting context
# (TaskGroup will cancel all pending tasks)
return resultPattern 3: Retry with TaskGroup
python
async def fetch_with_retry(url: str, max_retries: int = 3) -> dict:
last_exception = None
for attempt in range(max_retries):
try:
async with asyncio.timeout(5.0):
return await fetch(url)
except (TimeoutError, ConnectionError) as e:
last_exception = e
await asyncio.sleep(2 ** attempt) # Exponential backoff
raise last_exception
async def fetch_all_with_retry(urls: list[str]) -> list[dict]:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_with_retry(url)) for url in urls]
return [task.result() for task in tasks]Pattern 4: Progress Tracking
python
import asyncio
from dataclasses import dataclass
@dataclass
class Progress:
total: int
completed: int = 0
failed: int = 0
async def process_with_progress(items: list, progress: Progress):
async def process_item(item):
try:
result = await do_work(item)
progress.completed += 1
return result
except Exception as e:
progress.failed += 1
raise
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(process_item(item)) for item in items]
return [task.result() for task in tasks]
async def main():
items = list(range(100))
progress = Progress(total=len(items))
# Progress reporter
async def report_progress():
while progress.completed + progress.failed < progress.total:
print(f"Progress: {progress.completed}/{progress.total} "
f"(failed: {progress.failed})")
await asyncio.sleep(1)
async with asyncio.TaskGroup() as tg:
tg.create_task(report_progress())
tg.create_task(process_with_progress(items, progress))Production Pitfalls
⚠️ NHỮNG LỖI THƯỜNG GẶP
1. Quên re-raise CancelledError
python
# ❌ WRONG - swallows CancelledError
async def bad_task():
    """Anti-pattern: catches the cancellation and never lets it propagate."""
    try:
        await long_operation()
    except asyncio.CancelledError:
        print("Cancelled")
        # No re-raise! The task returns normally instead of ending as cancelled.
# ✅ RIGHT
async def good_task():
    """Run cleanup on cancellation, then let the CancelledError continue."""
    try:
        await long_operation()
    except asyncio.CancelledError:
        await cleanup()
        raise  # MUST re-raise!
2. Tạo task ngoài TaskGroup
python
# ❌ WRONG - task leak
async def bad_example():
    """Anti-pattern: one task is created outside the TaskGroup that owns the others."""
    task = asyncio.create_task(background_work())  # Orphan task!
    async with asyncio.TaskGroup() as tg:
        tg.create_task(main_work())
    # background_work may still be running here: nothing waits for it.
# ✅ RIGHT
async def good_example():
    """Both tasks belong to the TaskGroup, so both finish before it exits."""
    async with asyncio.TaskGroup() as tg:
        tg.create_task(background_work())
        tg.create_task(main_work())
    # All tasks have completed
3. Blocking code trong TaskGroup
python
# ❌ SAI - Block event loop
async def bad_task():
async with asyncio.TaskGroup() as tg:
tg.create_task(async_work())
time.sleep(5) # BLOCKS everything!
# ✅ ĐÚNG
async def good_task():
async with asyncio.TaskGroup() as tg:
tg.create_task(async_work())
await asyncio.sleep(5) # Non-blocking4. Không handle ExceptionGroup đúng cách
python
# ❌ SAI - Bắt Exception thay vì ExceptionGroup
async def bad_handling():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(risky_task())
except Exception as e: # Không bắt được ExceptionGroup!
print(f"Error: {e}")
# ✅ ĐÚNG
async def good_handling():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(risky_task())
except* ValueError as eg:
for exc in eg.exceptions:
print(f"ValueError: {exc}")
except* Exception as eg:
for exc in eg.exceptions:
print(f"Other error: {exc}")TaskGroup vs gather() - Khi nào dùng cái nào?
| Aspect | TaskGroup | gather() |
|---|---|---|
| Error handling | Một fail → cancel tất cả | Một fail → others continue |
| Cleanup | Automatic | Manual |
| Syntax | Context manager | Function call |
| Python version | 3.11+ | 3.4+ |
| Exception type | ExceptionGroup | Single exception |
| Use case | Related tasks | Independent tasks |
Khi nào dùng TaskGroup?
- Tasks có liên quan và cần cleanup cùng nhau
- Cần đảm bảo không có task leaks
- Muốn automatic cancellation khi có lỗi
- Python 3.11+
Khi nào dùng gather()?
- Tasks độc lập, không cần cancel nhau
- Cần backward compatibility (Python < 3.11)
- Muốn collect tất cả results kể cả khi có failures (`return_exceptions=True`)
Bảng Tóm tắt
python
# NOTE: quick-reference fragments, not one runnable script. The top-level
# `async with` / `await` lines must live inside an `async def` (or be typed
# into the `python -m asyncio` REPL); at module level they are a SyntaxError.

# === TASKGROUP BASIC ===
import asyncio
async with asyncio.TaskGroup() as tg:
    task1 = tg.create_task(async_operation_1())
    task2 = tg.create_task(async_operation_2())
# All tasks have completed once the context exits
# === EXCEPTION HANDLING ===
try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(risky_task())
except* ValueError as eg:
    for exc in eg.exceptions:
        print(f"Caught: {exc}")
except* TypeError as eg:
    handle_type_errors(eg.exceptions)
# === CANCELLATION HANDLING ===
async def cancellable_task():
    try:
        await long_operation()
    except asyncio.CancelledError:
        await cleanup()
        raise  # MUST re-raise!
# === TIMEOUT ===
async with asyncio.timeout(5.0):  # raises TimeoutError when the 5 s budget runs out
    async with asyncio.TaskGroup() as tg:
        tg.create_task(slow_task())
# === SHIELD CRITICAL OPERATIONS ===
try:
    await operation()
except asyncio.CancelledError:
    # critical_save keeps running even if this await itself is cancelled
    await asyncio.shield(critical_save())
    raise
# === EXCEPTIONGROUP UTILITIES ===
try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(task())
except ExceptionGroup as eg:
    # Split by type: (matching, rest); either side may be None
    value_errors, rest = eg.split(ValueError)
    # Get subgroup: only the matching exceptions, or None if there are none
    type_errors = eg.subgroup(TypeError)