Giao diện
Event Sourcing & CQRS — Lưu trữ sự kiện thay vì trạng thái
Hệ thống ngân hàng phát hiện giao dịch bất thường vào lúc 2 giờ sáng. Đội compliance cần audit trail — ai đã thay đổi gì, khi nào, tại sao. Với database truyền thống chỉ lưu trạng thái cuối cùng (balance = 1,200,000), mọi thông tin về hành trình đi đến con số đó đã bị ghi đè và mất vĩnh viễn.
Event Sourcing giải quyết triệt để vấn đề này: thay vì lưu trạng thái cuối, ta lưu toàn bộ chuỗi sự kiện đã xảy ra. Trạng thái hiện tại được tái tạo bằng cách replay tất cả event từ đầu. Kết hợp với CQRS (Command Query Responsibility Segregation) — tách biệt hoàn toàn luồng ghi và luồng đọc — ta có kiến trúc mạnh mẽ cho hệ thống audit-critical, event-driven, và cần temporal query.
🎯 Mục tiêu
Sau bài này, bạn sẽ:
- Hiểu core concept của Event Sourcing và khi nào nên (không nên) áp dụng
- Thiết kế Event Store, Projections, và Snapshot strategy
- Phân biệt CQRS với CRUD truyền thống và biết cách kết hợp với Event Sourcing
- Tránh những sai lầm phổ biến về schema versioning và eventual consistency
Bức tranh tư duy
Sổ cái kế toán — Không bao giờ xóa, chỉ ghi thêm
Hãy nghĩ về sổ cái kế toán (ledger) truyền thống. Kế toán viên không bao giờ xóa hay sửa một dòng đã ghi — nếu có sai sót, họ ghi thêm bút toán điều chỉnh (adjusting entry). Số dư tài khoản tại bất kỳ thời điểm nào = tổng hợp tất cả bút toán từ đầu đến thời điểm đó.
Event Sourcing hoạt động y hệt: database là append-only log chứa mọi sự kiện đã xảy ra. Trạng thái hiện tại là kết quả phái sinh (derived) từ chuỗi sự kiện, không phải nguồn sự thật.
Traditional CRUD vs Event Sourcing
Cốt lõi kỹ thuật
Event Store — Immutable append-only log
Event Store là trung tâm của kiến trúc Event Sourcing. Mỗi event được ghi một lần, không bao giờ bị sửa hay xóa.
Event Schema chuẩn:
json
{
"eventId": "evt-a1b2c3d4",
"aggregateId": "account-789",
"aggregateType": "BankAccount",
"eventType": "MoneyDeposited",
"version": 4,
"timestamp": "2024-01-15T10:30:00Z",
"payload": {
"amount": 200000,
"currency": "VND",
"source": "bank-transfer"
},
"metadata": {
"userId": "user-456",
"correlationId": "txn-xyz",
"causationId": "cmd-deposit-001"
}
}Nguyên tắc thiết kế Event Store:
| Nguyên tắc | Mô tả |
|---|---|
| Immutable | Event đã ghi không bao giờ được UPDATE hay DELETE |
| Ordered | Mỗi stream có version number tăng đơn điệu |
| Append-only | Chỉ hỗ trợ INSERT — ghi thêm event mới |
| Optimistic Concurrency | Dùng expected version để tránh write conflict |
| Event Versioning | Schema evolution qua upcasting, không sửa event cũ |
Optimistic concurrency control — khi ghi event, client gửi kèm expectedVersion. Nếu version hiện tại trong store khác expected, write bị reject:
python
def append_event(stream_id, event, expected_version):
current_version = get_stream_version(stream_id)
if current_version != expected_version:
raise ConcurrencyConflict(
f"Expected version {expected_version}, "
f"but stream is at {current_version}"
)
event.version = current_version + 1
event_store.append(stream_id, event)Projections (Read Models) — Materialize từ events
Projection là quá trình biến đổi chuỗi event thành read model phục vụ truy vấn. Mỗi projection là một view được tính toán (derived) từ events.
python
class AccountBalanceProjection:
"""Projection: tính số dư từ event stream."""
def __init__(self):
self.balances = {} # aggregate_id -> balance
def handle(self, event):
handlers = {
"AccountOpened": self._on_opened,
"MoneyDeposited": self._on_deposited,
"MoneyWithdrawn": self._on_withdrawn,
"AccountClosed": self._on_closed,
}
handler = handlers.get(event.event_type)
if handler:
handler(event)
def _on_opened(self, event):
self.balances[event.aggregate_id] = event.payload["initial_balance"]
def _on_deposited(self, event):
self.balances[event.aggregate_id] += event.payload["amount"]
def _on_withdrawn(self, event):
self.balances[event.aggregate_id] -= event.payload["amount"]
def _on_closed(self, event):
del self.balances[event.aggregate_id]Đặc điểm quan trọng:
- Rebuild from scratch: Xóa read model, replay toàn bộ event → trạng thái chính xác 100%
- Multiple projections: Cùng event stream có thể tạo nhiều read model khác nhau (balance view, transaction history, monthly report)
- Asynchronous: Projection thường chạy async, nên read model có eventual consistency
CQRS — Tách biệt Command và Query
CQRS chia hệ thống thành hai phía hoàn toàn độc lập:
Tại sao CQRS + Event Sourcing là cặp đôi hoàn hảo?
| Khía cạnh | Chỉ CRUD | CQRS (không ES) | Event Sourcing + CQRS |
|---|---|---|---|
| Audit trail | ❌ Không có | ❌ Không có | ✅ Hoàn chỉnh |
| Temporal query | ❌ Không thể | ❌ Không thể | ✅ Replay đến bất kỳ thời điểm |
| Read/Write scale | Cùng DB | Tách DB → scale riêng | Tách DB + rebuild projection |
| Complexity | Thấp | Trung bình | Cao |
| Phù hợp cho | CRUD đơn giản | Read-heavy apps | Audit-critical, event-driven |
Snapshots — Tối ưu hiệu năng replay
Khi một aggregate có hàng nghìn event, replay từ đầu rất chậm. Snapshot lưu trạng thái tại một thời điểm, giúp chỉ cần replay các event sau snapshot.
python
class SnapshotStrategy:
SNAPSHOT_INTERVAL = 100 # Tạo snapshot sau mỗi 100 events
def load_aggregate(self, aggregate_id):
snapshot = snapshot_store.get_latest(aggregate_id)
if snapshot:
aggregate = rebuild_from_snapshot(snapshot)
start_version = snapshot.version + 1
else:
aggregate = create_empty_aggregate(aggregate_id)
start_version = 0
# Chỉ replay events SAU snapshot
new_events = event_store.get_events(
aggregate_id,
from_version=start_version
)
for event in new_events:
aggregate.apply(event)
# Tạo snapshot mới nếu đủ điều kiện
if aggregate.version - (snapshot.version if snapshot else 0) >= self.SNAPSHOT_INTERVAL:
snapshot_store.save(aggregate_id, aggregate.version, aggregate.state)
return aggregateKhi nào cần snapshot:
- Aggregate có > 100-200 events
- Thời gian load aggregate vượt quá SLA (thường > 100ms)
- Event stream dài nhưng hiếm khi cần replay toàn bộ
Thực chiến
Ví dụ 1: E-commerce Order System
Lifecycle của một đơn hàng thể hiện qua chuỗi event:
python
# --- Domain Events ---
@dataclass
class OrderCreated:
order_id: str
customer_id: str
items: list
timestamp: datetime
@dataclass
class ItemAdded:
order_id: str
item_id: str
quantity: int
unit_price: int
@dataclass
class PaymentProcessed:
order_id: str
payment_id: str
amount: int
method: str # "credit_card", "bank_transfer"
@dataclass
class OrderShipped:
order_id: str
tracking_number: str
carrier: str
# --- Aggregate ---
class OrderAggregate:
def __init__(self, order_id):
self.order_id = order_id
self.status = None
self.items = []
self.total = 0
self.version = 0
self._pending_events = []
def create(self, customer_id, items):
if self.status is not None:
raise InvalidOperation("Order already exists")
self._apply(OrderCreated(
order_id=self.order_id,
customer_id=customer_id,
items=items,
timestamp=datetime.utcnow()
))
def add_item(self, item_id, quantity, unit_price):
if self.status != "CREATED":
raise InvalidOperation("Cannot add items after payment")
self._apply(ItemAdded(
order_id=self.order_id,
item_id=item_id,
quantity=quantity,
unit_price=unit_price
))
def process_payment(self, payment_id, amount, method):
if self.status != "CREATED":
raise InvalidOperation("Invalid status for payment")
if amount < self.total:
raise InvalidOperation("Insufficient payment amount")
self._apply(PaymentProcessed(
order_id=self.order_id,
payment_id=payment_id,
amount=amount,
method=method
))
# --- Event Handlers (state mutation) ---
def _on_order_created(self, event):
self.status = "CREATED"
self.items = event.items
def _on_item_added(self, event):
self.items.append({"item_id": event.item_id, "qty": event.quantity})
self.total += event.quantity * event.unit_price
def _on_payment_processed(self, event):
self.status = "PAID"
def _on_order_shipped(self, event):
self.status = "SHIPPED"
def _apply(self, event):
handler_name = f"_on_{self._to_snake_case(type(event).__name__)}"
getattr(self, handler_name)(event)
self.version += 1
self._pending_events.append(event)Ví dụ 2: Bank Account — Audit & Temporal Query
python
class BankAccountAuditService:
"""Replay event stream để audit tại bất kỳ thời điểm."""
def get_balance_at(self, account_id, target_time):
"""Temporal query: số dư tại thời điểm target_time."""
events = event_store.get_events(account_id)
balance = 0
for event in events:
if event.timestamp > target_time:
break # Dừng replay tại thời điểm cần
if event.event_type == "MoneyDeposited":
balance += event.payload["amount"]
elif event.event_type == "MoneyWithdrawn":
balance -= event.payload["amount"]
return balance
def get_audit_trail(self, account_id, start_time, end_time):
"""Audit trail: mọi thay đổi trong khoảng thời gian."""
events = event_store.get_events(account_id)
return [
{
"time": e.timestamp,
"action": e.event_type,
"amount": e.payload.get("amount"),
"user": e.metadata.get("userId"),
"correlation": e.metadata.get("correlationId"),
}
for e in events
if start_time <= e.timestamp <= end_time
]Kết quả audit trail:
Thời gian | Hành động | Số tiền | User
2024-01-10 09:00:00 | AccountOpened | +1,000,000 | admin-001
2024-01-12 14:30:00 | MoneyDeposited | +500,000 | user-456
2024-01-13 08:15:00 | MoneyWithdrawn | -300,000 | user-456
2024-01-15 10:30:00 | MoneyDeposited | +200,000 | user-456→ Với CRUD, bạn chỉ thấy balance = 1,400,000. Với Event Sourcing, bạn thấy toàn bộ hành trình.
Sai lầm điển hình
❌ Sai lầm 1: Áp dụng Event Sourcing cho mọi service
Sai: "Event Sourcing tuyệt vời, áp dụng cho toàn bộ hệ thống!"
Đúng: Chỉ dùng Event Sourcing cho domain phức tạp cần audit trail, temporal query, hoặc event replay. Một service CRUD đơn giản (quản lý user profile, config) không cần — overhead không đáng.
Nguyên tắc: Event Sourcing phù hợp cho core domain (tài chính, đơn hàng, inventory). Các supporting domain dùng CRUD bình thường.
❌ Sai lầm 2: Thay đổi event schema mà không có versioning strategy
Sai: Rename field amount → totalAmount trong event PaymentProcessed. Mọi projection cũ đều vỡ.
Đúng: Dùng upcasting — transform event cũ sang schema mới khi đọc, không sửa event trong store:
python
def upcast_payment_processed_v1_to_v2(event):
"""Transform v1 event sang v2 format khi đọc."""
if event.schema_version == 1:
event.payload["totalAmount"] = event.payload.pop("amount")
event.schema_version = 2
return event❌ Sai lầm 3: Projection rebuild quá chậm vì thiếu snapshots
Sai: Aggregate có 50,000 events, mỗi lần load phải replay từ event #1. Response time > 5 giây.
Đúng: Tạo snapshot định kỳ (mỗi 100-500 events). Load snapshot gần nhất → chỉ replay vài chục event mới. Response time < 100ms.
❌ Sai lầm 4: Query read model ngay sau command và mong nhận kết quả mới
Sai:
POST /orders (create order) → 200 OK
GET /orders/123 → 404 Not Found ???Đúng: Read model cập nhật asynchronous (eventual consistency). Client cần:
- Dùng command ID để poll trạng thái
- Dùng WebSocket/SSE để nhận notification khi projection cập nhật xong
- Hoặc trả về write model response trực tiếp từ command handler (cho những trường hợp cần response ngay)
Under the Hood
Storage & Performance
Event store growth: Events tăng tuyến tính theo thời gian. Một hệ thống e-commerce trung bình:
- 1,000 đơn hàng/ngày × 5 events/đơn = 5,000 events/ngày
- 1 event ≈ 500 bytes → ~2.5 MB/ngày → ~900 MB/năm
- Hoàn toàn khả thi cho storage hiện đại
Projection rebuild time (benchmark tham khảo):
| Số events | Không snapshot | Có snapshot (mỗi 100) |
|---|---|---|
| 100 | ~5ms | ~5ms |
| 1,000 | ~50ms | ~8ms |
| 10,000 | ~500ms | ~12ms |
| 100,000 | ~5s | ~15ms |
Consistency Guarantees
┌─────────────────────────────────────────────────────────────┐
│ Write Side (Command) │ Read Side (Query) │
│ ───────────────────── │ ──────────────── │
│ ✅ Strong Consistency │ ⚡ Eventual Consistency │
│ - Event Store là source of │ - Projection cập nhật │
│ truth │ async sau khi event │
│ - Optimistic concurrency │ được publish │
│ đảm bảo write integrity │ - Độ trễ thường < 100ms │
│ - Mỗi aggregate stream │ trong điều kiện bình │
│ là strongly consistent │ thường │
└─────────────────────────────────────────────────────────────┘Trade-offs: Event Sourcing vs CRUD
| Tiêu chí | CRUD truyền thống | Event Sourcing + CQRS |
|---|---|---|
| Complexity | ⭐ Thấp | ⭐⭐⭐⭐ Cao |
| Audit trail | ❌ Cần thêm audit table | ✅ Built-in |
| Temporal query | ❌ Không thể | ✅ Replay đến bất kỳ thời điểm |
| Debug production | 😰 Khó — chỉ thấy state cuối | 😊 Dễ — replay event để tái hiện bug |
| Storage | Ít hơn (chỉ state cuối) | Nhiều hơn (toàn bộ history) |
| Schema evolution | ALTER TABLE (risky) | Event versioning + upcasting |
| Read performance | Truy vấn trực tiếp | Cần projection (nhưng optimized cho read) |
| Undo/Compensate | Tự implement | Tự nhiên — emit compensating event |
| Team learning curve | Thấp | Cao — cần hiểu DDD, event modeling |
Thực tế production
Nhiều hệ thống dùng hybrid approach: Event Sourcing cho core domain (orders, payments, inventory) và CRUD cho supporting domain (user settings, notifications, CMS). Đừng ép mọi thứ vào một pattern.
Checklist ghi nhớ
✅ Checklist triển khai
Event Store
- [ ] Events là immutable — KHÔNG BAO GIỜ update hay delete event
- [ ] Mỗi event có
eventId,aggregateId,version,timestamp,payload,metadata - [ ] Dùng optimistic concurrency (expected version) để tránh write conflict
- [ ] Event schema có version number, dùng upcasting cho evolution
Projections & Read Models
- [ ] Projection có thể rebuild từ scratch bất cứ lúc nào
- [ ] Mỗi read model tối ưu cho use case cụ thể (không dùng chung)
- [ ] Chấp nhận eventual consistency — read model có thể trễ vài milliseconds
CQRS Architecture
- [ ] Command side validate business rules trước khi emit event
- [ ] Query side KHÔNG chứa business logic — chỉ đọc từ read model
- [ ] Command trả về acknowledgment, không trả về read model data
Performance & Operations
- [ ] Snapshot strategy cho aggregate có > 100 events
- [ ] Monitor projection lag (thời gian từ event → read model cập nhật)
- [ ] Chỉ dùng Event Sourcing cho domain thực sự cần audit trail hoặc temporal query
Bài tập luyện tập
Bài 1: Khi nào dùng Event Sourcing? — Foundation
🧠 Quiz
Câu hỏi: Team bạn đang thiết kế hệ thống mới. Service nào phù hợp nhất để áp dụng Event Sourcing?
- [ ] A. User Profile Service — lưu tên, email, avatar
- [ ] B. Configuration Service — lưu feature flags và app settings
- [x] C. Financial Transaction Service — quản lý chuyển tiền và audit compliance
- [ ] D. Notification Service — gửi email và push notification
Giải thích: Financial Transaction Service cần complete audit trail (ai chuyển tiền cho ai, bao nhiêu, khi nào), temporal query (số dư tại thời điểm X), và khả năng replay events để kiểm tra giao dịch bất thường. Các service khác là CRUD đơn giản — Event Sourcing tạo ra overhead không cần thiết.
Bài 2: Lợi ích của CQRS — Foundation
🧠 Quiz
Câu hỏi: Hệ thống e-commerce có tỷ lệ read:write = 100:1 (đọc nhiều gấp 100 lần ghi). Lợi ích lớn nhất khi áp dụng CQRS là gì?
- [ ] A. Giảm complexity của codebase
- [x] B. Scale read và write side độc lập — tối ưu read model cho query patterns cụ thể
- [ ] C. Đảm bảo strong consistency cho mọi query
- [ ] D. Loại bỏ hoàn toàn eventual consistency
Giải thích: CQRS cho phép scale read side riêng (thêm read replicas, cache, denormalize) mà không ảnh hưởng write side. Read model được tối ưu cho query patterns cụ thể (denormalized, pre-computed). Lưu ý: CQRS tăng complexity và read model vẫn có eventual consistency.
Bài 3: Thiết kế architecture — Scenario
🧠 Quiz — Scenario — Chọn architecture phù hợp
Bối cảnh: Bạn đang thiết kế hệ thống quản lý đơn hàng cho sàn thương mại điện tử với yêu cầu:
- 10,000 đơn hàng/ngày, mỗi đơn trải qua 5-8 trạng thái
- Compliance yêu cầu audit trail đầy đủ, giữ data 7 năm
- Cần khả năng replay để debug dispute giữa buyer và seller
- Dashboard real-time hiển thị metrics cho operations team
Câu hỏi: Architecture nào phù hợp nhất?
- [ ] A. Monolith + PostgreSQL — đơn giản, dùng triggers để log thay đổi
- [ ] B. Microservices + CRUD — mỗi service có database riêng
- [x] C. Event Sourcing + CQRS cho Order Service — CRUD cho các service phụ trợ (user, notification)
- [ ] D. Event Sourcing cho toàn bộ hệ thống — consistency tốt hơn
Giải thích: Order Service là core domain với yêu cầu audit trail 7 năm, temporal query (replay dispute), và nhiều state transitions → Event Sourcing + CQRS là lựa chọn tự nhiên. Nhưng áp dụng cho toàn bộ hệ thống (đáp án D) là over-engineering — user service, notification service dùng CRUD là đủ. Đáp án A không scale và triggers-based audit thiếu reliability. Đáp án B thiếu audit trail native.