Skip to content

Event Sourcing & CQRS — Lưu trữ sự kiện thay vì trạng thái

Hệ thống ngân hàng phát hiện giao dịch bất thường vào lúc 2 giờ sáng. Đội compliance cần audit trail — ai đã thay đổi , khi nào, tại sao. Với database truyền thống chỉ lưu trạng thái cuối cùng (balance = 1,200,000), mọi thông tin về hành trình đi đến con số đó đã bị ghi đè và mất vĩnh viễn.

Event Sourcing giải quyết triệt để vấn đề này: thay vì lưu trạng thái cuối, ta lưu toàn bộ chuỗi sự kiện đã xảy ra. Trạng thái hiện tại được tái tạo bằng cách replay tất cả event từ đầu. Kết hợp với CQRS (Command Query Responsibility Segregation) — tách biệt hoàn toàn luồng ghi và luồng đọc — ta có kiến trúc mạnh mẽ cho hệ thống audit-critical, event-driven, và cần temporal query.

🎯 Mục tiêu

Sau bài này, bạn sẽ:

  • Hiểu core concept của Event Sourcing và khi nào nên (không nên) áp dụng
  • Thiết kế Event Store, Projections, và Snapshot strategy
  • Phân biệt CQRS với CRUD truyền thống và biết cách kết hợp với Event Sourcing
  • Tránh những sai lầm phổ biến về schema versioning và eventual consistency

Bức tranh tư duy

Sổ cái kế toán — Không bao giờ xóa, chỉ ghi thêm

Hãy nghĩ về sổ cái kế toán (ledger) truyền thống. Kế toán viên không bao giờ xóa hay sửa một dòng đã ghi — nếu có sai sót, họ ghi thêm bút toán điều chỉnh (adjusting entry). Số dư tài khoản tại bất kỳ thời điểm nào = tổng hợp tất cả bút toán từ đầu đến thời điểm đó.

Event Sourcing hoạt động y hệt: database là append-only log chứa mọi sự kiện đã xảy ra. Trạng thái hiện tại là kết quả phái sinh (derived) từ chuỗi sự kiện, không phải nguồn sự thật.

Traditional CRUD vs Event Sourcing

Cốt lõi kỹ thuật

Event Store — Immutable append-only log

Event Store là trung tâm của kiến trúc Event Sourcing. Mỗi event được ghi một lần, không bao giờ bị sửa hay xóa.

Event Schema chuẩn:

json
{
  "eventId": "evt-a1b2c3d4",
  "aggregateId": "account-789",
  "aggregateType": "BankAccount",
  "eventType": "MoneyDeposited",
  "version": 4,
  "timestamp": "2024-01-15T10:30:00Z",
  "payload": {
    "amount": 200000,
    "currency": "VND",
    "source": "bank-transfer"
  },
  "metadata": {
    "userId": "user-456",
    "correlationId": "txn-xyz",
    "causationId": "cmd-deposit-001"
  }
}

Nguyên tắc thiết kế Event Store:

Nguyên tắcMô tả
ImmutableEvent đã ghi không bao giờ được UPDATE hay DELETE
OrderedMỗi stream có version number tăng đơn điệu
Append-onlyChỉ hỗ trợ INSERT — ghi thêm event mới
Optimistic ConcurrencyDùng expected version để tránh write conflict
Event VersioningSchema evolution qua upcasting, không sửa event cũ

Optimistic concurrency control — khi ghi event, client gửi kèm expectedVersion. Nếu version hiện tại trong store khác expected, write bị reject:

python
def append_event(stream_id, event, expected_version):
    current_version = get_stream_version(stream_id)
    if current_version != expected_version:
        raise ConcurrencyConflict(
            f"Expected version {expected_version}, "
            f"but stream is at {current_version}"
        )
    event.version = current_version + 1
    event_store.append(stream_id, event)

Projections (Read Models) — Materialize từ events

Projection là quá trình biến đổi chuỗi event thành read model phục vụ truy vấn. Mỗi projection là một view được tính toán (derived) từ events.

python
class AccountBalanceProjection:
    """Projection: tính số dư từ event stream."""

    def __init__(self):
        self.balances = {}  # aggregate_id -> balance

    def handle(self, event):
        handlers = {
            "AccountOpened": self._on_opened,
            "MoneyDeposited": self._on_deposited,
            "MoneyWithdrawn": self._on_withdrawn,
            "AccountClosed": self._on_closed,
        }
        handler = handlers.get(event.event_type)
        if handler:
            handler(event)

    def _on_opened(self, event):
        self.balances[event.aggregate_id] = event.payload["initial_balance"]

    def _on_deposited(self, event):
        self.balances[event.aggregate_id] += event.payload["amount"]

    def _on_withdrawn(self, event):
        self.balances[event.aggregate_id] -= event.payload["amount"]

    def _on_closed(self, event):
        del self.balances[event.aggregate_id]

Đặc điểm quan trọng:

  • Rebuild from scratch: Xóa read model, replay toàn bộ event → trạng thái chính xác 100%
  • Multiple projections: Cùng event stream có thể tạo nhiều read model khác nhau (balance view, transaction history, monthly report)
  • Asynchronous: Projection thường chạy async, nên read model có eventual consistency

CQRS — Tách biệt Command và Query

CQRS chia hệ thống thành hai phía hoàn toàn độc lập:

Tại sao CQRS + Event Sourcing là cặp đôi hoàn hảo?

Khía cạnhChỉ CRUDCQRS (không ES)Event Sourcing + CQRS
Audit trail❌ Không có❌ Không có✅ Hoàn chỉnh
Temporal query❌ Không thể❌ Không thể✅ Replay đến bất kỳ thời điểm
Read/Write scaleCùng DBTách DB → scale riêngTách DB + rebuild projection
ComplexityThấpTrung bìnhCao
Phù hợp choCRUD đơn giảnRead-heavy appsAudit-critical, event-driven

Snapshots — Tối ưu hiệu năng replay

Khi một aggregate có hàng nghìn event, replay từ đầu rất chậm. Snapshot lưu trạng thái tại một thời điểm, giúp chỉ cần replay các event sau snapshot.

python
class SnapshotStrategy:
    SNAPSHOT_INTERVAL = 100  # Tạo snapshot sau mỗi 100 events

    def load_aggregate(self, aggregate_id):
        snapshot = snapshot_store.get_latest(aggregate_id)

        if snapshot:
            aggregate = rebuild_from_snapshot(snapshot)
            start_version = snapshot.version + 1
        else:
            aggregate = create_empty_aggregate(aggregate_id)
            start_version = 0

        # Chỉ replay events SAU snapshot
        new_events = event_store.get_events(
            aggregate_id,
            from_version=start_version
        )
        for event in new_events:
            aggregate.apply(event)

        # Tạo snapshot mới nếu đủ điều kiện
        if aggregate.version - (snapshot.version if snapshot else 0) >= self.SNAPSHOT_INTERVAL:
            snapshot_store.save(aggregate_id, aggregate.version, aggregate.state)

        return aggregate

Khi nào cần snapshot:

  • Aggregate có > 100-200 events
  • Thời gian load aggregate vượt quá SLA (thường > 100ms)
  • Event stream dài nhưng hiếm khi cần replay toàn bộ

Thực chiến

Ví dụ 1: E-commerce Order System

Lifecycle của một đơn hàng thể hiện qua chuỗi event:

python
# --- Domain Events ---
@dataclass
class OrderCreated:
    order_id: str
    customer_id: str
    items: list
    timestamp: datetime

@dataclass
class ItemAdded:
    order_id: str
    item_id: str
    quantity: int
    unit_price: int

@dataclass
class PaymentProcessed:
    order_id: str
    payment_id: str
    amount: int
    method: str  # "credit_card", "bank_transfer"

@dataclass
class OrderShipped:
    order_id: str
    tracking_number: str
    carrier: str


# --- Aggregate ---
class OrderAggregate:
    def __init__(self, order_id):
        self.order_id = order_id
        self.status = None
        self.items = []
        self.total = 0
        self.version = 0
        self._pending_events = []

    def create(self, customer_id, items):
        if self.status is not None:
            raise InvalidOperation("Order already exists")
        self._apply(OrderCreated(
            order_id=self.order_id,
            customer_id=customer_id,
            items=items,
            timestamp=datetime.utcnow()
        ))

    def add_item(self, item_id, quantity, unit_price):
        if self.status != "CREATED":
            raise InvalidOperation("Cannot add items after payment")
        self._apply(ItemAdded(
            order_id=self.order_id,
            item_id=item_id,
            quantity=quantity,
            unit_price=unit_price
        ))

    def process_payment(self, payment_id, amount, method):
        if self.status != "CREATED":
            raise InvalidOperation("Invalid status for payment")
        if amount < self.total:
            raise InvalidOperation("Insufficient payment amount")
        self._apply(PaymentProcessed(
            order_id=self.order_id,
            payment_id=payment_id,
            amount=amount,
            method=method
        ))

    # --- Event Handlers (state mutation) ---
    def _on_order_created(self, event):
        self.status = "CREATED"
        self.items = event.items

    def _on_item_added(self, event):
        self.items.append({"item_id": event.item_id, "qty": event.quantity})
        self.total += event.quantity * event.unit_price

    def _on_payment_processed(self, event):
        self.status = "PAID"

    def _on_order_shipped(self, event):
        self.status = "SHIPPED"

    def _apply(self, event):
        handler_name = f"_on_{self._to_snake_case(type(event).__name__)}"
        getattr(self, handler_name)(event)
        self.version += 1
        self._pending_events.append(event)

Ví dụ 2: Bank Account — Audit & Temporal Query

python
class BankAccountAuditService:
    """Replay event stream để audit tại bất kỳ thời điểm."""

    def get_balance_at(self, account_id, target_time):
        """Temporal query: số dư tại thời điểm target_time."""
        events = event_store.get_events(account_id)
        balance = 0
        for event in events:
            if event.timestamp > target_time:
                break  # Dừng replay tại thời điểm cần
            if event.event_type == "MoneyDeposited":
                balance += event.payload["amount"]
            elif event.event_type == "MoneyWithdrawn":
                balance -= event.payload["amount"]
        return balance

    def get_audit_trail(self, account_id, start_time, end_time):
        """Audit trail: mọi thay đổi trong khoảng thời gian."""
        events = event_store.get_events(account_id)
        return [
            {
                "time": e.timestamp,
                "action": e.event_type,
                "amount": e.payload.get("amount"),
                "user": e.metadata.get("userId"),
                "correlation": e.metadata.get("correlationId"),
            }
            for e in events
            if start_time <= e.timestamp <= end_time
        ]

Kết quả audit trail:

Thời gian              | Hành động        | Số tiền     | User
2024-01-10 09:00:00    | AccountOpened     | +1,000,000  | admin-001
2024-01-12 14:30:00    | MoneyDeposited    | +500,000    | user-456
2024-01-13 08:15:00    | MoneyWithdrawn    | -300,000    | user-456
2024-01-15 10:30:00    | MoneyDeposited    | +200,000    | user-456

→ Với CRUD, bạn chỉ thấy balance = 1,400,000. Với Event Sourcing, bạn thấy toàn bộ hành trình.

Sai lầm điển hình

❌ Sai lầm 1: Áp dụng Event Sourcing cho mọi service

Sai: "Event Sourcing tuyệt vời, áp dụng cho toàn bộ hệ thống!"

Đúng: Chỉ dùng Event Sourcing cho domain phức tạp cần audit trail, temporal query, hoặc event replay. Một service CRUD đơn giản (quản lý user profile, config) không cần — overhead không đáng.

Nguyên tắc: Event Sourcing phù hợp cho core domain (tài chính, đơn hàng, inventory). Các supporting domain dùng CRUD bình thường.

❌ Sai lầm 2: Thay đổi event schema mà không có versioning strategy

Sai: Rename field amounttotalAmount trong event PaymentProcessed. Mọi projection cũ đều vỡ.

Đúng: Dùng upcasting — transform event cũ sang schema mới khi đọc, không sửa event trong store:

python
def upcast_payment_processed_v1_to_v2(event):
    """Transform v1 event sang v2 format khi đọc."""
    if event.schema_version == 1:
        event.payload["totalAmount"] = event.payload.pop("amount")
        event.schema_version = 2
    return event

❌ Sai lầm 3: Projection rebuild quá chậm vì thiếu snapshots

Sai: Aggregate có 50,000 events, mỗi lần load phải replay từ event #1. Response time > 5 giây.

Đúng: Tạo snapshot định kỳ (mỗi 100-500 events). Load snapshot gần nhất → chỉ replay vài chục event mới. Response time < 100ms.

❌ Sai lầm 4: Query read model ngay sau command và mong nhận kết quả mới

Sai:

POST /orders (create order)   → 200 OK
GET  /orders/123              → 404 Not Found  ???

Đúng: Read model cập nhật asynchronous (eventual consistency). Client cần:

  1. Dùng command ID để poll trạng thái
  2. Dùng WebSocket/SSE để nhận notification khi projection cập nhật xong
  3. Hoặc trả về write model response trực tiếp từ command handler (cho những trường hợp cần response ngay)

Under the Hood

Storage & Performance

Event store growth: Events tăng tuyến tính theo thời gian. Một hệ thống e-commerce trung bình:

  • 1,000 đơn hàng/ngày × 5 events/đơn = 5,000 events/ngày
  • 1 event ≈ 500 bytes → ~2.5 MB/ngày~900 MB/năm
  • Hoàn toàn khả thi cho storage hiện đại

Projection rebuild time (benchmark tham khảo):

Số eventsKhông snapshotCó snapshot (mỗi 100)
100~5ms~5ms
1,000~50ms~8ms
10,000~500ms~12ms
100,000~5s~15ms

Consistency Guarantees

┌─────────────────────────────────────────────────────────────┐
│  Write Side (Command)          │  Read Side (Query)         │
│  ─────────────────────         │  ────────────────          │
│  ✅ Strong Consistency          │  ⚡ Eventual Consistency   │
│  - Event Store là source of    │  - Projection cập nhật     │
│    truth                       │    async sau khi event     │
│  - Optimistic concurrency      │    được publish            │
│    đảm bảo write integrity     │  - Độ trễ thường < 100ms   │
│  - Mỗi aggregate stream       │    trong điều kiện bình    │
│    là strongly consistent      │    thường                  │
└─────────────────────────────────────────────────────────────┘

Trade-offs: Event Sourcing vs CRUD

Tiêu chíCRUD truyền thốngEvent Sourcing + CQRS
Complexity⭐ Thấp⭐⭐⭐⭐ Cao
Audit trail❌ Cần thêm audit table✅ Built-in
Temporal query❌ Không thể✅ Replay đến bất kỳ thời điểm
Debug production😰 Khó — chỉ thấy state cuối😊 Dễ — replay event để tái hiện bug
StorageÍt hơn (chỉ state cuối)Nhiều hơn (toàn bộ history)
Schema evolutionALTER TABLE (risky)Event versioning + upcasting
Read performanceTruy vấn trực tiếpCần projection (nhưng optimized cho read)
Undo/CompensateTự implementTự nhiên — emit compensating event
Team learning curveThấpCao — cần hiểu DDD, event modeling

Thực tế production

Nhiều hệ thống dùng hybrid approach: Event Sourcing cho core domain (orders, payments, inventory) và CRUD cho supporting domain (user settings, notifications, CMS). Đừng ép mọi thứ vào một pattern.

Checklist ghi nhớ

✅ Checklist triển khai

Event Store

  • [ ] Events là immutable — KHÔNG BAO GIỜ update hay delete event
  • [ ] Mỗi event có eventId, aggregateId, version, timestamp, payload, metadata
  • [ ] Dùng optimistic concurrency (expected version) để tránh write conflict
  • [ ] Event schema có version number, dùng upcasting cho evolution

Projections & Read Models

  • [ ] Projection có thể rebuild từ scratch bất cứ lúc nào
  • [ ] Mỗi read model tối ưu cho use case cụ thể (không dùng chung)
  • [ ] Chấp nhận eventual consistency — read model có thể trễ vài milliseconds

CQRS Architecture

  • [ ] Command side validate business rules trước khi emit event
  • [ ] Query side KHÔNG chứa business logic — chỉ đọc từ read model
  • [ ] Command trả về acknowledgment, không trả về read model data

Performance & Operations

  • [ ] Snapshot strategy cho aggregate có > 100 events
  • [ ] Monitor projection lag (thời gian từ event → read model cập nhật)
  • [ ] Chỉ dùng Event Sourcing cho domain thực sự cần audit trail hoặc temporal query

Bài tập luyện tập

Bài 1: Khi nào dùng Event Sourcing? — Foundation

🧠 Quiz

Câu hỏi: Team bạn đang thiết kế hệ thống mới. Service nào phù hợp nhất để áp dụng Event Sourcing?

  • [ ] A. User Profile Service — lưu tên, email, avatar
  • [ ] B. Configuration Service — lưu feature flags và app settings
  • [x] C. Financial Transaction Service — quản lý chuyển tiền và audit compliance
  • [ ] D. Notification Service — gửi email và push notification

Giải thích: Financial Transaction Service cần complete audit trail (ai chuyển tiền cho ai, bao nhiêu, khi nào), temporal query (số dư tại thời điểm X), và khả năng replay events để kiểm tra giao dịch bất thường. Các service khác là CRUD đơn giản — Event Sourcing tạo ra overhead không cần thiết.

Bài 2: Lợi ích của CQRS — Foundation

🧠 Quiz

Câu hỏi: Hệ thống e-commerce có tỷ lệ read:write = 100:1 (đọc nhiều gấp 100 lần ghi). Lợi ích lớn nhất khi áp dụng CQRS là gì?

  • [ ] A. Giảm complexity của codebase
  • [x] B. Scale read và write side độc lập — tối ưu read model cho query patterns cụ thể
  • [ ] C. Đảm bảo strong consistency cho mọi query
  • [ ] D. Loại bỏ hoàn toàn eventual consistency

Giải thích: CQRS cho phép scale read side riêng (thêm read replicas, cache, denormalize) mà không ảnh hưởng write side. Read model được tối ưu cho query patterns cụ thể (denormalized, pre-computed). Lưu ý: CQRS tăng complexity và read model vẫn có eventual consistency.

Bài 3: Thiết kế architecture — Scenario

🧠 Quiz — Scenario — Chọn architecture phù hợp

Bối cảnh: Bạn đang thiết kế hệ thống quản lý đơn hàng cho sàn thương mại điện tử với yêu cầu:

  • 10,000 đơn hàng/ngày, mỗi đơn trải qua 5-8 trạng thái
  • Compliance yêu cầu audit trail đầy đủ, giữ data 7 năm
  • Cần khả năng replay để debug dispute giữa buyer và seller
  • Dashboard real-time hiển thị metrics cho operations team

Câu hỏi: Architecture nào phù hợp nhất?

  • [ ] A. Monolith + PostgreSQL — đơn giản, dùng triggers để log thay đổi
  • [ ] B. Microservices + CRUD — mỗi service có database riêng
  • [x] C. Event Sourcing + CQRS cho Order Service — CRUD cho các service phụ trợ (user, notification)
  • [ ] D. Event Sourcing cho toàn bộ hệ thống — consistency tốt hơn

Giải thích: Order Service là core domain với yêu cầu audit trail 7 năm, temporal query (replay dispute), và nhiều state transitions → Event Sourcing + CQRS là lựa chọn tự nhiên. Nhưng áp dụng cho toàn bộ hệ thống (đáp án D) là over-engineering — user service, notification service dùng CRUD là đủ. Đáp án A không scale và triggers-based audit thiếu reliability. Đáp án B thiếu audit trail native.

Liên kết học tiếp