Giao diện
Structured Concurrency — TaskGroup và ExceptionGroup
Một API server chạy 200 requests/giây. Mỗi request tạo 3 background tasks bằng asyncio.create_task(). Khi request timeout, task bị bỏ rơi — không ai cancel, không ai await. Sau 24 giờ: 50,000 orphan tasks đang chạy, memory leak 2GB, server OOM. Đây là hậu quả của unstructured concurrency.
Structured concurrency là paradigm đảm bảo mọi task con phải hoàn thành (hoặc bị cancel) trước khi scope cha kết thúc — giống structured programming đảm bảo mọi goto phải nằm trong block. Python 3.11 giới thiệu asyncio.TaskGroup và ExceptionGroup / except* để implement paradigm này. Trio và anyio đã pioneer concept này từ trước, nhưng giờ nó là phần chính thức của Python.
Bài này sẽ phân tích tại sao unstructured concurrency nguy hiểm, cách TaskGroup giải quyết vấn đề, xử lý nhiều exceptions đồng thời với ExceptionGroup, và patterns cho graceful shutdown trong production.
Bức tranh tư duy
Hãy tưởng tượng một quản lý dự án xây dựng ở TP.HCM. Unstructured concurrency giống như quản lý giao việc cho thợ rồi... bỏ đi. Thợ làm xong không ai nhận. Thợ gặp sự cố không ai biết. Máy móc để chạy không ai tắt. Cuối ngày: công trường bừa bộn, thiết bị hao mòn, budget cháy.
Structured concurrency giống như quản lý dùng bảng theo dõi: giao việc cho nhóm thợ, theo dõi tiến độ, khi một thợ gặp sự cố thì dừng cả nhóm, thu dọn dụng cụ, báo cáo đầy đủ. Quản lý không rời công trường cho đến khi mọi thợ đã hoàn thành hoặc dừng an toàn.
Trong code: TaskGroup là quản lý đó. async with TaskGroup() as tg mở scope, tg.create_task() giao việc, và khi ra khỏi async with — tất cả tasks phải xong. Nếu một task fail → tất cả tasks khác bị cancel → exceptions được gom vào ExceptionGroup → propagate lên caller.
Cốt lõi kỹ thuật
Vấn đề: Unstructured Concurrency
python
import asyncio
async def fetch_data(source: str) -> dict:
await asyncio.sleep(1)
return {"source": source}
# ❌ Unstructured: Task leak khi có exception
async def unstructured():
task1 = asyncio.create_task(fetch_data("api-1"))
task2 = asyncio.create_task(fetch_data("api-2"))
# Exception trước khi await tasks
raise ValueError("Something went wrong")
# task1 và task2 vẫn đang chạy → orphan tasks!
# Không ai cancel, không ai await, memory leak
results = await asyncio.gather(task1, task2)TaskGroup — Giải pháp (Python 3.11+)
python
import asyncio
async def fetch_data(source: str) -> dict:
await asyncio.sleep(1)
return {"source": source}
# ✅ Structured: Mọi tasks được quản lý
async def structured():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch_data("api-1"))
task2 = tg.create_task(fetch_data("api-2"))
task3 = tg.create_task(fetch_data("api-3"))
# Khi ra khỏi `async with`:
# - TẤT CẢ tasks đã hoàn thành
# - Nếu một task fail → tất cả bị cancel → ExceptionGroup raised
print(task1.result()) # {"source": "api-1"}
print(task2.result()) # {"source": "api-2"}
print(task3.result()) # {"source": "api-3"}
asyncio.run(structured())Automatic Cancellation
Khi một task trong TaskGroup fail, tất cả tasks khác bị cancel tự động:
python
import asyncio
async def slow_task(name: str, duration: float) -> str:
try:
print(f"{name}: bắt đầu")
await asyncio.sleep(duration)
print(f"{name}: hoàn thành")
return f"{name} done"
except asyncio.CancelledError:
print(f"{name}: BỊ CANCEL")
raise # Phải re-raise!
async def failing_task() -> str:
await asyncio.sleep(0.5)
raise ValueError("Task failed!")
async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(slow_task("A", 5.0))
tg.create_task(slow_task("B", 5.0))
tg.create_task(failing_task()) # Fail sau 0.5s
except* ValueError as eg:
print(f"Caught {len(eg.exceptions)} errors")
asyncio.run(main())
# Output:
# A: bắt đầu
# B: bắt đầu
# A: BỊ CANCEL
# B: BỊ CANCEL
# Caught 1 errorsExceptionGroup và except*
Khi nhiều tasks fail đồng thời, exceptions được gom vào ExceptionGroup:
python
import asyncio
async def task_may_fail(task_id: int) -> str:
await asyncio.sleep(0.1)
if task_id % 2 == 0:
raise ValueError(f"Task {task_id}: invalid value")
if task_id % 3 == 0:
raise TypeError(f"Task {task_id}: wrong type")
return f"Task {task_id}: OK"
async def main():
try:
async with asyncio.TaskGroup() as tg:
for i in range(6):
tg.create_task(task_may_fail(i))
except* ValueError as eg:
print(f"ValueErrors ({len(eg.exceptions)}):")
for exc in eg.exceptions:
print(f" - {exc}")
except* TypeError as eg:
print(f"TypeErrors ({len(eg.exceptions)}):")
for exc in eg.exceptions:
print(f" - {exc}")
asyncio.run(main())
# Cả hai except* blocks đều có thể chạy!
# ValueErrors (3): Task 0, Task 2, Task 4
# TypeErrors (1): Task 3ExceptionGroup Utilities
python
import asyncio
async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(task_a())
tg.create_task(task_b())
except ExceptionGroup as eg:
# split() — chia thành hai nhóm: match và rest
value_errors, rest = eg.split(ValueError)
if value_errors:
print(f"Handled {len(value_errors.exceptions)} ValueErrors")
if rest:
raise rest # Re-raise các exceptions không handle
# subgroup() — lấy subgroup theo type
type_errors = eg.subgroup(TypeError)
if type_errors:
for exc in type_errors.exceptions:
print(f"TypeError: {exc}")Cancellation Handling đúng cách
python
import asyncio
async def task_with_cleanup():
"""Task cần cleanup khi bị cancel."""
db_conn = await connect_to_db()
try:
while True:
data = await fetch_next_batch()
await process_batch(data)
except asyncio.CancelledError:
# Cleanup TRƯỚC KHI re-raise
await db_conn.close()
print("DB connection closed")
raise # BẮT BUỘC re-raise!
async def task_with_shield():
"""Bảo vệ critical operation khỏi cancellation."""
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
# Operation này PHẢI hoàn thành dù bị cancel
await asyncio.shield(save_checkpoint("critical_data"))
raiseTimeout với TaskGroup
python
import asyncio
async def main():
# Kết hợp timeout và TaskGroup
try:
async with asyncio.timeout(5.0):
async with asyncio.TaskGroup() as tg:
tg.create_task(slow_operation_a()) # 10s
tg.create_task(slow_operation_b()) # 10s
except TimeoutError:
print("TaskGroup timeout — tất cả tasks đã bị cancel")anyio — Cross-framework Structured Concurrency
anyio cung cấp API thống nhất hoạt động trên cả asyncio và trio:
python
import anyio
async def fetch(url: str) -> str:
await anyio.sleep(0.1)
return f"Data from {url}"
async def main():
# TaskGroup tương tự asyncio nhưng portable
async with anyio.create_task_group() as tg:
results = []
async def worker(url):
result = await fetch(url)
results.append(result)
for i in range(10):
tg.start_soon(worker, f"https://api.example.com/{i}")
print(f"Fetched: {len(results)} results")
# Chạy trên asyncio
anyio.run(main, backend="asyncio")
# Hoặc trio
# anyio.run(main, backend="trio")Thực chiến
Tình huống: Graceful Shutdown cho Async Service
Bối cảnh: Background service xử lý messages từ queue, gọi external APIs, ghi kết quả vào database. Cần graceful shutdown khi nhận SIGTERM — hoàn thành tasks đang chạy, cancel tasks pending, đóng connections.
Mục tiêu: Zero data loss khi shutdown, timeout 30s cho graceful period.
python
import asyncio
import signal
from dataclasses import dataclass
@dataclass
class ServiceState:
running: bool = True
active_tasks: int = 0
processed: int = 0
async def process_message(msg: dict, state: ServiceState) -> None:
"""Xử lý một message — có thể mất 1-5s."""
state.active_tasks += 1
try:
# Giả lập: gọi API + ghi DB
await asyncio.sleep(0.5)
state.processed += 1
finally:
state.active_tasks -= 1
async def message_consumer(
queue: asyncio.Queue,
state: ServiceState,
max_concurrent: int = 10,
):
"""Consumer loop với structured concurrency."""
sem = asyncio.Semaphore(max_concurrent)
async def bounded_process(msg):
async with sem:
await process_message(msg, state)
while state.running:
try:
msg = await asyncio.wait_for(queue.get(), timeout=1.0)
except asyncio.TimeoutError:
continue
if msg is None:
break
# Tạo task trong scope hiện tại
asyncio.create_task(bounded_process(msg))
async def run_service():
state = ServiceState()
queue: asyncio.Queue = asyncio.Queue(maxsize=100)
# Setup signal handlers
loop = asyncio.get_running_loop()
shutdown_event = asyncio.Event()
def handle_shutdown():
print("Shutdown signal received")
state.running = False
shutdown_event.set()
# Signal handlers (Unix only — skip trên Windows)
try:
loop.add_signal_handler(signal.SIGTERM, handle_shutdown)
loop.add_signal_handler(signal.SIGINT, handle_shutdown)
except NotImplementedError:
pass # Windows không hỗ trợ add_signal_handler
# Giả lập: feed messages
async def message_producer():
i = 0
while state.running:
await queue.put({"id": i, "data": f"msg-{i}"})
i += 1
await asyncio.sleep(0.1)
await queue.put(None)
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(message_producer())
tg.create_task(message_consumer(queue, state))
# Đợi shutdown signal hoặc timeout
async def wait_shutdown():
await shutdown_event.wait()
state.running = False
# Grace period: đợi active tasks hoàn thành
for _ in range(300): # 30s timeout
if state.active_tasks == 0:
break
await asyncio.sleep(0.1)
if state.active_tasks > 0:
print(f"Force shutdown: {state.active_tasks} tasks still active")
tg.create_task(wait_shutdown())
except* Exception as eg:
for exc in eg.exceptions:
print(f"Service error: {exc}")
print(f"Service stopped. Processed: {state.processed} messages")
if __name__ == "__main__":
asyncio.run(run_service())Phân tích:
TaskGroupđảm bảo tất cả tasks (producer, consumer, shutdown monitor) được track- Signal handler set
running = False→ producer/consumer loops thoát - Grace period 30s cho active tasks hoàn thành trước force shutdown
Semaphoregiới hạn concurrent processing — tránh overload
Tình huống: Batch API Client với Error Recovery
python
import asyncio
from dataclasses import dataclass
@dataclass
class BatchResult:
success: list[dict]
failures: list[dict]
async def fetch_with_retry(item_id: int, max_retries: int = 3) -> dict:
for attempt in range(max_retries):
try:
async with asyncio.timeout(5.0):
await asyncio.sleep(0.05)
if item_id % 50 == 0:
raise ConnectionError(f"Item {item_id}: connection refused")
return {"id": item_id, "data": "ok"}
except (TimeoutError, ConnectionError):
if attempt == max_retries - 1:
raise
await asyncio.sleep(0.5 * (2 ** attempt))
raise RuntimeError("Unreachable")
async def batch_fetch(item_ids: list[int], batch_size: int = 20) -> BatchResult:
"""Fetch items in batches, collect errors không abort toàn bộ."""
result = BatchResult(success=[], failures=[])
for i in range(0, len(item_ids), batch_size):
batch = item_ids[i:i + batch_size]
try:
async with asyncio.TaskGroup() as tg:
tasks = {
item_id: tg.create_task(fetch_with_retry(item_id))
for item_id in batch
}
except* (ConnectionError, TimeoutError) as eg:
# Gom failures, không abort
for exc in eg.exceptions:
result.failures.append({"error": str(exc)})
# Collect successes
for item_id, task in tasks.items():
if not task.cancelled() and task.exception() is None:
result.success.append(task.result())
return result
async def main():
ids = list(range(200))
result = await batch_fetch(ids, batch_size=20)
print(f"Success: {len(result.success)}, Failures: {len(result.failures)}")
asyncio.run(main())Sai lầm điển hình
❌ Sai lầm 1: Tạo task ngoài TaskGroup scope
Vấn đề: Task tạo bằng create_task() ngoài TaskGroup — orphan task.
python
# SAI: Task ngoài scope — không được quản lý
async def bad_example():
orphan = asyncio.create_task(background_work()) # Orphan!
async with asyncio.TaskGroup() as tg:
tg.create_task(main_work())
# orphan vẫn chạy, không ai cancel, không ai await!Tại sao sai: create_task() ngoài TaskGroup tạo orphan task. Khi function return, task vẫn chạy "vô chủ" — memory leak, resource leak.
python
# ĐÚNG: Tất cả tasks trong TaskGroup
async def good_example():
async with asyncio.TaskGroup() as tg:
tg.create_task(background_work())
tg.create_task(main_work())
# Tất cả tasks hoàn thành khi ra khỏi scope❌ Sai lầm 2: Nuốt CancelledError
Vấn đề: Không re-raise CancelledError — phá vỡ cancellation flow.
python
# SAI: CancelledError bị nuốt → TaskGroup không thể cancel task
async def bad_task():
try:
await long_operation()
except asyncio.CancelledError:
print("Cancelled")
# KHÔNG re-raise → task "sống lại"python
# ĐÚNG: Cleanup rồi re-raise
async def good_task():
try:
await long_operation()
except asyncio.CancelledError:
await cleanup()
raise # BẮT BUỘC re-raise❌ Sai lầm 3: Bắt Exception thay vì dùng except*
Vấn đề: except Exception không bắt được ExceptionGroup đúng cách.
python
# SAI: except Exception không handle ExceptionGroup chi tiết
async def bad_handling():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(risky_task())
except Exception as e:
print(f"Error: {e}") # Không truy cập được individual exceptionspython
# ĐÚNG: except* cho selective handling
async def good_handling():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(risky_task())
except* ValueError as eg:
for exc in eg.exceptions:
print(f"ValueError: {exc}")
except* TypeError as eg:
for exc in eg.exceptions:
print(f"TypeError: {exc}")❌ Sai lầm 4: Blocking code trong TaskGroup
Vấn đề: Gọi sync blocking function — đóng băng tất cả tasks trong group.
python
# SAI: time.sleep block event loop → tất cả tasks trong group bị treo
import time
async def bad_usage():
async with asyncio.TaskGroup() as tg:
tg.create_task(async_work())
time.sleep(5) # BLOCK toàn bộ!python
# ĐÚNG: Dùng asyncio.sleep hoặc run_in_executor
async def good_usage():
async with asyncio.TaskGroup() as tg:
tg.create_task(async_work())
await asyncio.sleep(5) # Non-blockingUnder the Hood
TaskGroup vs gather() — Khi nào dùng gì
| Aspect | TaskGroup | gather() |
|---|---|---|
| Error behavior | Một fail → cancel tất cả | Một fail → others tiếp tục |
| Cleanup | Tự động | Manual |
| Exception type | ExceptionGroup | Single exception |
| Python version | 3.11+ | 3.4+ |
| Syntax | Context manager | Function call |
| Task lifecycle | Guaranteed completion | Có thể leak |
Dùng TaskGroup khi: Tasks liên quan, cần cancel cùng nhau, cần guaranteed cleanup, Python 3.11+.
Dùng gather() khi: Tasks độc lập, cần backward compatibility, muốn return_exceptions=True để collect tất cả kết quả kể cả failures.
Structured Concurrency Guarantees
Structured concurrency đảm bảo 3 tính chất:
- Containment: Mọi child task phải hoàn thành trước khi scope cha kết thúc
- Propagation: Exceptions từ child tasks propagate lên parent
- Cancellation: Khi parent bị cancel, tất cả children bị cancel
python
# Visualization of task tree:
# main()
# └── TaskGroup (scope)
# ├── task_a() → done
# ├── task_b() → FAILED!
# │ → cancel task_a, task_c
# │ → ExceptionGroup propagated to main
# └── task_c() → cancelledanyio vs asyncio TaskGroup
| Feature | asyncio.TaskGroup | anyio.create_task_group |
|---|---|---|
| Backend | asyncio only | asyncio + trio |
| API | tg.create_task() | tg.start_soon() |
| Cancel scope | Qua asyncio.timeout() | Built-in CancelScope |
| Portability | CPython only | Cross-framework |
Trade-offs
Structured concurrency mạnh khi: Cần guaranteed cleanup, complex task hierarchies, production services cần reliability. Debugging dễ hơn — stack traces rõ ràng, không có orphan tasks.
Structured concurrency yếu khi: Tasks thực sự cần sống lâu hơn scope (daemon tasks, background monitoring). Workaround: dedicated long-lived TaskGroup hoặc asyncio.create_task() cho fire-and-forget (chấp nhận risk).
Checklist ghi nhớ
✅ Checklist triển khai
TaskGroup
- [ ] Tạo mọi tasks bên trong
async with TaskGroup()— không dùngcreate_task()ngoài scope - [ ] Luôn handle
except*cho các exception types cụ thể - [ ] Kết hợp
asyncio.timeout()với TaskGroup cho deadline-based cancellation
CancelledError
- [ ] Luôn re-raise
CancelledErrorsau cleanup - [ ] Dùng
asyncio.shield()cho critical operations không thể cancel - [ ] Không bắt
CancelledErrorbằng bareexcept Exception
ExceptionGroup
- [ ] Dùng
except*(Python 3.11+) cho selective exception handling - [ ] Dùng
.split()để tách exceptions theo type - [ ] Dùng
.subgroup()để lọc exceptions
Graceful Shutdown
- [ ] Handle SIGTERM/SIGINT → set shutdown flag
- [ ] Grace period cho active tasks hoàn thành
- [ ] Close connections, flush buffers trước khi exit
- [ ] Log số tasks đã hoàn thành vs cancelled khi shutdown
Production
- [ ] Dùng anyio nếu cần portability giữa asyncio/trio
- [ ] Monitor orphan tasks — chúng là signal của unstructured code
- [ ] Test cancellation paths — chúng thường untested và buggy
Bài tập luyện tập
Bài 1: TaskGroup Error Handling — Advanced
Đề bài: Viết hàm resilient_fetch(urls) dùng TaskGroup. Nếu một URL fail, gom vào error list nhưng KHÔNG cancel các URLs khác. Trả về (successes, failures).
🧠 Quiz
Câu hỏi: Khi dùng except* với TaskGroup, điều gì xảy ra nếu có cả ValueError và TypeError trong ExceptionGroup?
- [ ] A. Chỉ
except* ValueErrorchạy, TypeError bị nuốt - [ ] B. Chỉ
except* TypeErrorchạy, ValueError bị nuốt - [x] C. Cả hai
except* ValueErrorvàexcept* TypeErrorđều chạy - [ ] D. RuntimeError vì ExceptionGroup chứa nhiều loại Giải thích:
except*hoạt động khácexceptthông thường. Nó "split" ExceptionGroup theo type — mỗiexcept*clause xử lý phần match của nó. Nếu có cả ValueError và TypeError, cả hai clauses đều chạy, mỗi clause nhận ExceptionGroup con chỉ chứa type tương ứng.
💡 Gợi ý
- TaskGroup cancel tất cả khi có failure → cần approach khác
- Bọc mỗi task trong try-except riêng để ngăn propagation
- Hoặc chia thành batches nhỏ, mỗi batch là một TaskGroup
✅ Lời giải
python
import asyncio
async def fetch_url(url: str) -> str:
await asyncio.sleep(0.1)
if "bad" in url:
raise ConnectionError(f"Cannot connect to {url}")
return f"Data from {url}"
async def resilient_fetch(
urls: list[str],
) -> tuple[list[str], list[dict]]:
successes: list[str] = []
failures: list[dict] = []
async def safe_fetch(url: str):
try:
result = await fetch_url(url)
successes.append(result)
except Exception as e:
failures.append({"url": url, "error": str(e)})
# Bọc trong try-except → TaskGroup không thấy exception
async with asyncio.TaskGroup() as tg:
for url in urls:
tg.create_task(safe_fetch(url))
return successes, failures
async def main():
urls = [
"https://api.example.com/1",
"https://bad.example.com/2",
"https://api.example.com/3",
"https://bad.example.com/4",
"https://api.example.com/5",
]
ok, fail = await resilient_fetch(urls)
print(f"Success: {len(ok)}, Failed: {len(fail)}")
for f in fail:
print(f" Error: {f}")
asyncio.run(main())Phân tích: Key insight — bọc exception bên trong task để TaskGroup không thấy failure. Mỗi task tự handle exception → TaskGroup thấy tất cả tasks succeed → không cancel. Trade-off: mất automatic cancellation benefit — chấp nhận khi tasks thực sự independent.
Bài 2: Graceful Shutdown — Advanced
Đề bài: Implement async worker service nhận tasks từ queue, xử lý với 5 concurrent workers. Khi nhận signal shutdown (giả lập), dừng nhận tasks mới, đợi tasks đang chạy hoàn thành (tối đa 10s), rồi thoát. Report số tasks completed vs cancelled.
💡 Gợi ý
- Dùng
asyncio.Eventcho shutdown signal - Workers check
shutdown_eventmỗi iteration - Grace period:
asyncio.wait_for(pending_tasks, timeout=10)
✅ Lời giải
python
import asyncio
async def worker(
queue: asyncio.Queue,
shutdown: asyncio.Event,
stats: dict,
worker_id: int,
):
while not shutdown.is_set():
try:
task = await asyncio.wait_for(queue.get(), timeout=0.5)
except asyncio.TimeoutError:
continue
if task is None:
break
try:
await asyncio.sleep(0.2) # Giả lập xử lý
stats["completed"] += 1
except asyncio.CancelledError:
stats["cancelled"] += 1
raise
finally:
queue.task_done()
async def main():
queue: asyncio.Queue = asyncio.Queue(maxsize=50)
shutdown = asyncio.Event()
stats = {"completed": 0, "cancelled": 0}
# Feed tasks
for i in range(100):
await queue.put(f"task-{i}")
# Start workers
workers = [
asyncio.create_task(worker(queue, shutdown, stats, i))
for i in range(5)
]
# Giả lập: shutdown sau 2s
await asyncio.sleep(2)
print("Shutdown signal!")
shutdown.set()
# Grace period: đợi workers tối đa 10s
try:
async with asyncio.timeout(10.0):
# Gửi sentinel cho mỗi worker
for _ in range(5):
await queue.put(None)
await asyncio.gather(*workers)
except TimeoutError:
for w in workers:
w.cancel()
await asyncio.gather(*workers, return_exceptions=True)
print(f"Completed: {stats['completed']}")
print(f"Cancelled: {stats['cancelled']}")
print(f"Remaining in queue: {queue.qsize()}")
asyncio.run(main())Phân tích: Shutdown flow: (1) Set event → workers stop taking new tasks, (2) Send sentinels → workers exit loop, (3) Grace period 10s → force cancel nếu quá lâu. Production: thêm logging, metrics, health check endpoint.
Liên kết học tiếp
Từ khóa glossary: structured concurrency, TaskGroup, ExceptionGroup, except*, CancelledError, shield, anyio, trio, graceful shutdown
Tìm kiếm liên quan: taskgroup python, exception group python 3.11, structured concurrency, graceful shutdown asyncio