Skip to content

Compare-And-Swap — Lập trình không khóa

Năm 2014, đội ngũ kỹ sư của Discord gặp vấn đề: hệ thống chat với hàng triệu user đồng thời cần counter thống kê message — nhưng mutex lock trở thành bottleneck khi 10,000 thread tranh nhau ghi vào cùng một biến. Giải pháp: thay mutex bằng atomic counter dựa trên CAS — throughput tăng 10 lần, latency giảm 90%.

Compare-And-Swap (CAS) là phép toán nguyên tử (atomic) cấp phần cứng: "Nếu giá trị hiện tại đúng như tôi nghĩ, thì cập nhật. Nếu sai, nghĩa là ai đó đã thay đổi — tôi đọc lại và thử lại." Không cần khóa, không thread nào bị block. Đây là nền tảng của mọi lock-free data structure: AtomicInteger trong Java, std::atomic trong C++, sync/atomic trong Go.

Nhưng CAS không phải viên đạn bạc. High contention (nhiều thread tranh nhau) khiến CAS retry liên tục — tệ hơn cả mutex. Hiểu khi nào dùng CAS, khi nào dùng lock, và tránh bẫy ABA problem là kỹ năng bắt buộc cho senior engineer làm việc với hệ thống concurrent.

Bức tranh tư duy

Hình dung bạn đang ở quầy bán vé xe khách Miền Đông — mô hình cũ (Pessimistic Lock): mỗi người phải lấy số thứ tự, đợi gọi tên, chỉ 1 người mua tại 1 thời điểm. Hàng dài, ai cũng phải đợi dù quầy trống.

Mô hình mới (CAS — Optimistic Lock): bảng điện tử hiển thị "Còn 50 vé". Bạn đọc bảng (50 vé), quyết định mua 1. Bạn đến quầy nói: "Tôi thấy còn 50, tôi mua 1, cập nhật thành 49." Nếu giữa lúc bạn đọc và đến quầy, ai đó mua trước (bảng hiện 49) → quầy từ chối → bạn đọc lại bảng (49) và thử lại: "Tôi thấy 49, cập nhật 48."

text
Pessimistic (Mutex):
Thread A:  LOCK ────── [cập nhật] ────── UNLOCK
Thread B:       CHỜ ──────────────────────── LOCK ── [cập nhật] ── UNLOCK
                 ↑ blocked, không làm gì

Optimistic (CAS):
Thread A:  đọc(10) ─── CAS(10→11) ✅
Thread B:  đọc(10) ─── CAS(10→11) ❌ conflict!
Thread B:  đọc(11) ─── CAS(11→12) ✅  ← retry thành công
                 ↑ không bị block, vẫn đang chạy

Ưu điểm: không thread nào bị "ngồi chơi". Nhược điểm: nếu 100 người cùng đến quầy, 99 người phải đọc lại bảng và thử lại — overhead retry.

📌 Khi nào analogy không còn chính xác

Quầy vé có 1 nhân viên xử lý tuần tự. CAS thực sự là phép toán phần cứng (CPU instruction: CMPXCHG trên x86) chạy trong 1 CPU cycle — không cần "nhân viên quầy". Tốc độ CAS nhanh hơn mutex vì không cần system call (context switch vào kernel).

Cốt lõi kỹ thuật

CAS — Nguyên lý hoạt động

CAS nhận 3 tham số: địa chỉ bộ nhớ, giá trị kỳ vọng (expected), giá trị mới (new). Phép toán chạy nguyên tử (atomic — không bị ngắt giữa chừng):

text
CAS(address, expected, new):
    if *address == expected:
        *address = new
        return true   // Thành công
    else:
        return false  // Conflict — giá trị đã bị thay đổi

CAS Loop — Pattern chuẩn

Mọi ứng dụng CAS đều theo pattern: đọc → tính toán → CAS → retry nếu thất bại.

cpp
#include <atomic>

std::atomic<int> counter{0};

void atomicIncrement() {
    int expected = counter.load();
    while (!counter.compare_exchange_weak(expected, expected + 1)) {
        // expected tự động được cập nhật thành giá trị hiện tại
        // retry với giá trị mới
    }
}
python
# Python không có native CAS — mô phỏng bằng Lock mỏng
from threading import Lock

class AtomicInteger:
    def __init__(self, value: int = 0) -> None:
        self._value = value
        self._lock = Lock()

    def compare_and_set(self, expected: int, new_value: int) -> bool:
        with self._lock:
            if self._value == expected:
                self._value = new_value
                return True
            return False

    def increment(self) -> int:
        while True:
            current = self._value
            if self.compare_and_set(current, current + 1):
                return current + 1
java
import java.util.concurrent.atomic.AtomicInteger;

AtomicInteger counter = new AtomicInteger(0);

public void atomicIncrement() {
    int expected;
    do {
        expected = counter.get();
    } while (!counter.compareAndSet(expected, expected + 1));
}

// Hoặc gọn hơn:
counter.incrementAndGet();  // Bên trong dùng CAS loop
go
import "sync/atomic"

var counter int64 = 0

func atomicIncrement() {
    for {
        current := atomic.LoadInt64(&counter)
        if atomic.CompareAndSwapInt64(&counter, current, current+1) {
            break
        }
    }
}

// Hoặc gọn hơn:
atomic.AddInt64(&counter, 1)

ABA Problem — Bẫy ẩn của CAS

CAS chỉ kiểm tra giá trị, không kiểm tra lịch sử thay đổi. Nếu giá trị thay đổi A → B → A, CAS vẫn thấy "A" và coi là không ai thay đổi.

text
Thread 1: Đọc giá trị = A
Thread 2: Đổi A → B
Thread 2: Đổi B → A
Thread 1: CAS(expected=A, new=C) → THÀNH CÔNG!
          → Thread 1 không biết giá trị đã bị thay đổi ở giữa

Khi nào ABA nguy hiểm? Với counter đơn thuần, ABA vô hại (kết quả vẫn đúng). Nhưng với lock-free linked list, ABA có thể khiến node bị xóa rồi được tái sử dụng, dẫn đến con trỏ trỏ vào vùng nhớ sai.

Giải pháp: Thêm version number vào mỗi CAS operation.

cpp
// std::atomic với tagged pointer (version + value)
struct Versioned {
    int value;
    int version;
};
std::atomic<Versioned> data{{0, 0}};

void safeUpdate(int newVal) {
    Versioned expected = data.load();
    Versioned desired;
    do {
        desired = {newVal, expected.version + 1};
    } while (!data.compare_exchange_weak(expected, desired));
}
python
class VersionedAtomicRef:
    """CAS với version number — tránh ABA problem."""

    def __init__(self, value, version: int = 0) -> None:
        self._value = value
        self._version = version
        self._lock = Lock()

    def get(self) -> tuple:
        return self._value, self._version

    def compare_and_set(self, expected_val, expected_ver, new_val) -> bool:
        with self._lock:
            if self._value == expected_val and self._version == expected_ver:
                self._value = new_val
                self._version += 1
                return True
            return False
java
import java.util.concurrent.atomic.AtomicStampedReference;

// Java có sẵn AtomicStampedReference để giải ABA
AtomicStampedReference<Integer> ref =
    new AtomicStampedReference<>(100, 0);

int[] stampHolder = new int[1];
int value = ref.get(stampHolder);  // value=100, stamp=0

ref.compareAndSet(100, 200, 0, 1);  // So sánh cả value VÀ stamp
go
import "sync/atomic"

// Go: dùng atomic.Value kết hợp struct có version
type Versioned struct {
    Value   int64
    Version int64
}

var state atomic.Value // stores Versioned

func safeUpdate(newVal int64) {
    for {
        current := state.Load().(Versioned)
        next := Versioned{newVal, current.Version + 1}
        // Go chưa có CAS trên struct — cần encoding hoặc unsafe
        // Production: dùng mutex cho complex state, CAS cho đơn giá trị
        state.Store(next)
        break
    }
}

Optimistic Locking trong Database

CAS pattern áp dụng trực tiếp cho database: thêm cột version vào table, UPDATE ... WHERE version = expected.

sql
-- Đọc row hiện tại
SELECT balance, version FROM accounts WHERE id = 'user_123';
-- → balance = 100, version = 5

-- Cập nhật với version check
UPDATE accounts
SET balance = 80, version = version + 1
WHERE id = 'user_123' AND version = 5;

-- Nếu affected_rows = 0 → conflict → retry

Thực chiến

Tình huống: Lock-free Counter cho Analytics Pipeline

Bối cảnh: Hệ thống analytics nhận 100,000 event/giây. Mỗi event cần increment counter theo category. 16 CPU cores xử lý song song. Mutex trên 1 counter → bottleneck.

Mục tiêu: Counter chịu được 100K ops/sec mà không dùng mutex.

java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;

public class AnalyticsCounter {
    private final ConcurrentHashMap<String, AtomicLong> counters =
        new ConcurrentHashMap<>();

    public void increment(String category) {
        counters.computeIfAbsent(category, k -> new AtomicLong(0))
                .incrementAndGet();  // CAS loop bên trong
    }

    public long getCount(String category) {
        AtomicLong counter = counters.get(category);
        return counter != null ? counter.get() : 0;
    }

    // Batch read — snapshot tại thời điểm gọi
    public Map<String, Long> snapshot() {
        Map<String, Long> result = new HashMap<>();
        counters.forEach((key, value) -> result.put(key, value.get()));
        return result;
    }
}

Phân tích:

  • AtomicLong.incrementAndGet() dùng CAS loop nội bộ — ~500K ops/sec trên modern CPU
  • ConcurrentHashMap cho phép nhiều thread đọc/ghi khác category song song
  • Cùng category: CAS retry, nhưng xác suất conflict thấp nếu phân bố đều
  • Nếu 1 category chiếm 90% traffic → sharded counter (chia thành N sub-counter) tốt hơn

Tình huống: Optimistic Locking cho Inventory Update

Bối cảnh: E-commerce — 1000 user đặt hàng cùng lúc cho sản phẩm có 50 units tồn kho. Cần giảm inventory chính xác, không oversell.

python
import time
from dataclasses import dataclass


@dataclass
class InventoryRow:
    product_id: str
    quantity: int
    version: int


class InventoryService:
    """Optimistic locking cho inventory management."""

    MAX_RETRIES = 5
    BACKOFF_BASE = 0.01  # 10ms

    def __init__(self, db) -> None:
        self.db = db

    def reserve(self, product_id: str, amount: int) -> bool:
        for attempt in range(self.MAX_RETRIES):
            # Đọc state hiện tại
            row = self.db.query(
                "SELECT quantity, version FROM inventory WHERE id = %s",
                product_id
            )
            if row.quantity < amount:
                return False  # Hết hàng

            # CAS: cập nhật nếu version khớp
            affected = self.db.execute(
                """UPDATE inventory
                   SET quantity = quantity - %s, version = version + 1
                   WHERE id = %s AND version = %s""",
                amount, product_id, row.version
            )

            if affected > 0:
                return True  # Thành công

            # Conflict → backoff rồi retry
            time.sleep(self.BACKOFF_BASE * (2 ** attempt))

        raise TooManyRetriesError(
            f"Không thể reserve {product_id} sau {self.MAX_RETRIES} lần thử"
        )


class TooManyRetriesError(Exception):
    pass

Phân tích:

  • Exponential backoff giảm contention khi nhiều request cùng product
  • MAX_RETRIES tránh infinite loop — sau 5 lần thất bại, raise error để caller xử lý
  • version column thay vì chỉ check quantity — tránh ABA khi quantity tăng rồi giảm về cùng giá trị
  • Production: thêm FOR UPDATE hint hoặc SELECT ... FOR NO KEY UPDATE nếu contention quá cao

Sai lầm điển hình

Sai lầm 1: Dùng CAS trong high-contention scenario

Vấn đề: 100 thread cùng CAS vào 1 biến → 99 thread retry → CPU waste.

java
// SAI: Counter "nóng" — majority thread retry liên tục
AtomicLong globalCounter = new AtomicLong(0);

// 100 threads increment cùng 1 counter
// → CAS success rate < 10%, CPU burn retry

Tại sao sai: CAS hiệu quả khi conflict rate thấp (<10%). Với 100 thread tranh 1 biến, mỗi CAS attempt có ~1% thành công. 99 thread đọc lại, tính lại, CAS lại — busy loop tiêu tốn CPU nhưng không làm gì hữu ích.

java
// ĐÚNG: Sharded counter — giảm contention
LongAdder shardedCounter = new LongAdder();
// Bên trong: chia thành N cell, mỗi thread CAS vào 1 cell
// Read: tổng hợp tất cả cell

shardedCounter.increment();        // Thread-local CAS, gần như không conflict
long total = shardedCounter.sum(); // O(N) sum, chấp nhận được cho đọc

Sai lầm 2: Không xử lý ABA problem

Vấn đề: CAS chỉ so sánh giá trị, không biết giá trị đã thay đổi rồi quay lại.

python
# SAI: Chỉ compare value — bỏ qua lịch sử thay đổi
def withdraw(account, amount):
    while True:
        balance = account.get()
        if balance < amount:
            return False
        if account.cas(balance, balance - amount):  # Chỉ check value
            return True

Tại sao sai: Thread A đọc balance=100. Thread B rút 50 (100→50), Thread C nạp 50 (50→100). Thread A thấy "100 == 100" → CAS thành công. Kết quả: rút 2 lần nhưng nạp 1 lần → sai số dư. Với counter đơn thuần thì không sao, nhưng với business logic phụ thuộc vào "không ai thay đổi" thì ABA gây bug.

python
# ĐÚNG: Compare value + version
def withdraw(account, amount):
    while True:
        balance, version = account.get_versioned()
        if balance < amount:
            return False
        if account.cas_versioned(balance, version, balance - amount):
            return True

Sai lầm 3: CAS retry vô hạn, không backoff

Vấn đề: Retry loop không có giới hạn và không có backoff.

python
# SAI: Retry forever — có thể chạy mãi khi contention cao
while True:
    old = ref.get()
    if ref.cas(old, compute(old)):
        break
    # Không sleep, không giới hạn → livelock

Tại sao sai: Khi nhiều thread retry đồng thời, chúng "va chạm" liên tục (livelock). Không thread nào tiến triển. CPU chạy 100% nhưng throughput = 0.

python
# ĐÚNG: Giới hạn retry + exponential backoff
MAX_RETRIES = 10

for attempt in range(MAX_RETRIES):
    old = ref.get()
    if ref.cas(old, compute(old)):
        return SUCCESS

    # Backoff tăng dần: 1ms, 2ms, 4ms, 8ms...
    time.sleep(0.001 * (2 ** attempt))

raise TooManyRetriesError("CAS failed after max retries")

Sai lầm 4: Dùng CAS cho critical section lớn

Vấn đề: CAS cho block code dài — nếu conflict, toàn bộ computation bị lãng phí.

python
# SAI: Tính toán nặng giữa read và CAS
while True:
    data = ref.get()
    result = heavy_computation(data)  # 100ms tính toán
    if ref.cas(data, result):         # Rất dễ conflict!
        break
    # → 100ms computation bị vứt đi mỗi lần retry

Tại sao sai: Khoảng thời gian giữa read và CAS càng dài → xác suất conflict càng cao. 100ms là vĩnh cửu trong multi-threaded context.

python
# ĐÚNG: Mutex cho critical section lớn
with lock:
    data = ref.get()
    result = heavy_computation(data)
    ref.set(result)
# Hoặc: tách computation ra khỏi CAS path

Under the Hood

Hiệu năng

StrategyThroughput (low contention)Throughput (high contention)Latency
Mutex~50K ops/sec~30K ops/secỔn định, có peak khi context switch
CAS (atomic)~500K ops/sec~20K ops/sec (retry overhead)Thấp trung bình, spike khi retry
Sharded (LongAdder)~500K ops/sec~400K ops/secThấp, đều

Nội bộ triển khai

CPU level: CAS là instruction phần cứng (CMPXCHG trên x86, LL/SC trên ARM). Chạy trong 1-2 CPU cycle, không cần system call. Mutex cần system call futex() khi có contention → hàng trăm cycle.

Memory ordering: CAS trên std::atomic mặc định dùng memory_order_seq_cst (sequential consistency) — đảm bảo mọi thread thấy cùng thứ tự operations. Có thể relax xuống memory_order_acquire/release cho performance, nhưng reasoning khó hơn nhiều.

Cache line bouncing: Khi nhiều core CAS cùng 1 biến, cache line chứa biến đó bị "bắn" qua lại giữa L1 cache của các core (MESI protocol). Đây là nguyên nhân chính khiến CAS chậm khi high contention — không phải retry logic, mà là cache coherence traffic.

Java LongAdder: Giải quyết cache bouncing bằng striping — mỗi thread CAS vào cell riêng (khác cache line). sum() tổng hợp tất cả cell. Write O(1), read O(N cells). Trade-off: read chậm hơn nhưng write nhanh hơn nhiều.

Trade-offs

Khi NÊN dùng CASKhi KHÔNG NÊN dùng CAS
Counter, flag, simple state updateCritical section lớn (>microsecond)
Low contention (<10% conflict rate)High contention (>50% conflict rate)
Read-heavy, write-light workloadWrite-heavy vào cùng 1 resource
Latency-sensitive (không chấp nhận block)Correctness phụ thuộc vào "không ai thay đổi" (→ version)
Lock-free queue, stack đơn giảnComplex multi-step transaction (→ lock)

Checklist ghi nhớ

✅ Checklist triển khai

Chọn strategy

  • [ ] Contention thấp (<10% conflict) → CAS / atomic operations
  • [ ] Contention cao hoặc critical section lớn → Mutex / Read-Write Lock
  • [ ] Counter "nóng" nhiều thread → Sharded counter (LongAdder / striping)
  • [ ] Database concurrent update → Optimistic locking (version column)

Triển khai CAS

  • [ ] Pattern: read → compute → CAS → retry nếu thất bại
  • [ ] Giới hạn retry (MAX_RETRIES) + exponential backoff
  • [ ] Đánh giá ABA risk: nếu business logic phụ thuộc lịch sử → dùng version
  • [ ] Giữ khoảng cách read-CAS ngắn nhất có thể

Production

  • [ ] Monitoring: đếm số retry trung bình — nếu >3 → contention quá cao
  • [ ] Java: AtomicInteger cho counter, AtomicStampedReference cho tránh ABA
  • [ ] C++: std::atomic + compare_exchange_weak (cho CAS loop)
  • [ ] Database: UPDATE ... WHERE version = $1, check affected_rows > 0

Bài tập luyện tập

Bài 1: Atomic Counter đa luồng — Foundation

Đề bài: Viết AtomicCounter dùng CAS pattern. Chạy 10 thread, mỗi thread increment 1000 lần. Kết quả cuối phải đúng 10,000.

🧠 Quiz

Câu hỏi kiểm tra: Tại sao không dùng counter += 1 mà cần CAS?

  • [ ] A. counter += 1 chạy chậm hơn CAS
  • [x] B. counter += 1 không phải atomic — gồm 3 bước (read, increment, write) có thể bị xen kẽ bởi thread khác
  • [ ] C. Python tự động thread-safe nên không cần CAS
  • [ ] D. CAS chỉ cần thiết khi có hơn 100 thread Giải thích: counter += 1 = temp = countertemp = temp + 1counter = temp. Nếu 2 thread đọc cùng giá trị 10, cả hai ghi 11 → mất 1 increment. CAS đảm bảo: nếu giá trị đã thay đổi giữa read và write → retry.
💡 Gợi ý
  • Dùng Lock mỏng để mô phỏng CAS trong Python
  • CAS loop: while not cas(expected, new): expected = read()
  • Test: 10 threads × 1000 increments = 10,000
✅ Lời giải
python
import threading

class AtomicCounter:
    def __init__(self) -> None:
        self._value = 0
        self._lock = threading.Lock()

    def _cas(self, expected: int, new_value: int) -> bool:
        with self._lock:
            if self._value == expected:
                self._value = new_value
                return True
            return False

    def increment(self) -> None:
        while True:
            current = self._value
            if self._cas(current, current + 1):
                return

    def get(self) -> int:
        return self._value

# Test
counter = AtomicCounter()
threads = []
for _ in range(10):
    t = threading.Thread(target=lambda: [counter.increment() for _ in range(1000)])
    threads.append(t)
    t.start()
for t in threads:
    t.join()

assert counter.get() == 10000, f"Expected 10000, got {counter.get()}"

Phân tích: Time O(1) amortized mỗi increment (retry hiếm khi contention thấp). Space O(1). Python Lock ở đây mô phỏng CAS hardware instruction.

Bài 2: Versioned Database Row — Intermediate

Đề bài: Viết class VersionedRow với update(new_data, expected_version). Nếu version không khớp → raise OptimisticLockError. Mô phỏng: User A và User B đọc cùng row, B update trước → A phải thất bại.

🧠 Quiz

Câu hỏi kiểm tra: Trong database thực tế, câu SQL nào triển khai CAS pattern?

  • [ ] A. UPDATE t SET data = 'new' WHERE id = 1
  • [ ] B. UPDATE t SET data = 'new' WHERE id = 1 FOR UPDATE
  • [x] C. UPDATE t SET data = 'new', version = version + 1 WHERE id = 1 AND version = 5
  • [ ] D. INSERT INTO t (id, data) VALUES (1, 'new') ON CONFLICT DO UPDATEGiải thích: WHERE version = 5 là phần "Compare" — chỉ update nếu version đúng. version = version + 1 đảm bảo lần update tiếp theo phải dùng version mới. affected_rows = 0 nghĩa là CAS thất bại → retry.
💡 Gợi ý
  • update() kiểm tra self.version == expected_version
  • Nếu khớp: cập nhật data + tăng version
  • Nếu không: raise exception
✅ Lời giải
python
class OptimisticLockError(Exception):
    pass

class VersionedRow:
    def __init__(self, row_id: str, data: dict) -> None:
        self.id = row_id
        self.data = data
        self.version = 1

    def read(self) -> tuple[dict, int]:
        return dict(self.data), self.version

    def update(self, new_data: dict, expected_version: int) -> None:
        if self.version != expected_version:
            raise OptimisticLockError(
                f"Version conflict: expected {expected_version}, "
                f"current {self.version}"
            )
        self.data.update(new_data)
        self.version += 1

# Test scenario
row = VersionedRow("product_1", {"price": 100, "stock": 50})

# User A đọc
data_a, ver_a = row.read()

# User B đọc cùng thời điểm
data_b, ver_b = row.read()

# User B update trước — thành công
row.update({"price": 120}, expected_version=ver_b)

# User A update sau — thất bại (version đã tăng)
try:
    row.update({"stock": 45}, expected_version=ver_a)
    assert False, "Phải raise error"
except OptimisticLockError as e:
    print(f"Đúng: {e}")
    # User A cần read lại và retry
    data_a, ver_a = row.read()
    row.update({"stock": 45}, expected_version=ver_a)

Phân tích: Đây chính là pattern @Version trong JPA/Hibernate, updateOne({version: expected}) trong MongoDB. Production cần wrap trong retry loop với backoff.

Liên kết học tiếp

Từ khóa glossary: compare-and-swap, CAS, lock-free, atomic operation, optimistic locking, ABA problem, CMPXCHG, memory ordering, cache line bouncing, sharded counter, LongAdder

Tìm kiếm liên quan: lập trình không khóa, atomic counter Java, optimistic locking database, CAS vs mutex, ABA problem giải pháp, concurrent data structure