Giao diện
Compare-And-Swap — Lập trình không khóa
Năm 2014, đội ngũ kỹ sư của Discord gặp vấn đề: hệ thống chat với hàng triệu user đồng thời cần counter thống kê message — nhưng mutex lock trở thành bottleneck khi 10,000 thread tranh nhau ghi vào cùng một biến. Giải pháp: thay mutex bằng atomic counter dựa trên CAS — throughput tăng 10 lần, latency giảm 90%.
Compare-And-Swap (CAS) là phép toán nguyên tử (atomic) cấp phần cứng: "Nếu giá trị hiện tại đúng như tôi nghĩ, thì cập nhật. Nếu sai, nghĩa là ai đó đã thay đổi — tôi đọc lại và thử lại." Không cần khóa, không thread nào bị block. Đây là nền tảng của mọi lock-free data structure: AtomicInteger trong Java, std::atomic trong C++, sync/atomic trong Go.
Nhưng CAS không phải viên đạn bạc. High contention (nhiều thread tranh nhau) khiến CAS retry liên tục — tệ hơn cả mutex. Hiểu khi nào dùng CAS, khi nào dùng lock, và tránh bẫy ABA problem là kỹ năng bắt buộc cho senior engineer làm việc với hệ thống concurrent.
Bức tranh tư duy
Hình dung bạn đang ở quầy bán vé xe khách Miền Đông — mô hình cũ (Pessimistic Lock): mỗi người phải lấy số thứ tự, đợi gọi tên, chỉ 1 người mua tại 1 thời điểm. Hàng dài, ai cũng phải đợi dù quầy trống.
Mô hình mới (CAS — Optimistic Lock): bảng điện tử hiển thị "Còn 50 vé". Bạn đọc bảng (50 vé), quyết định mua 1. Bạn đến quầy nói: "Tôi thấy còn 50, tôi mua 1, cập nhật thành 49." Nếu giữa lúc bạn đọc và đến quầy, ai đó mua trước (bảng hiện 49) → quầy từ chối → bạn đọc lại bảng (49) và thử lại: "Tôi thấy 49, cập nhật 48."
text
Pessimistic (Mutex):
Thread A: LOCK ────── [cập nhật] ────── UNLOCK
Thread B: CHỜ ──────────────────────── LOCK ── [cập nhật] ── UNLOCK
↑ blocked, không làm gì
Optimistic (CAS):
Thread A: đọc(10) ─── CAS(10→11) ✅
Thread B: đọc(10) ─── CAS(10→11) ❌ conflict!
Thread B: đọc(11) ─── CAS(11→12) ✅ ← retry thành công
↑ không bị block, vẫn đang chạyƯu điểm: không thread nào bị "ngồi chơi". Nhược điểm: nếu 100 người cùng đến quầy, 99 người phải đọc lại bảng và thử lại — overhead retry.
📌 Khi nào analogy không còn chính xác
Quầy vé có 1 nhân viên xử lý tuần tự. CAS thực sự là phép toán phần cứng (CPU instruction: CMPXCHG trên x86) chạy trong 1 CPU cycle — không cần "nhân viên quầy". Tốc độ CAS nhanh hơn mutex vì không cần system call (context switch vào kernel).
Cốt lõi kỹ thuật
CAS — Nguyên lý hoạt động
CAS nhận 3 tham số: địa chỉ bộ nhớ, giá trị kỳ vọng (expected), giá trị mới (new). Phép toán chạy nguyên tử (atomic — không bị ngắt giữa chừng):
text
CAS(address, expected, new):
if *address == expected:
*address = new
return true // Thành công
else:
return false // Conflict — giá trị đã bị thay đổiCAS Loop — Pattern chuẩn
Mọi ứng dụng CAS đều theo pattern: đọc → tính toán → CAS → retry nếu thất bại.
cpp
#include <atomic>
std::atomic<int> counter{0};
void atomicIncrement() {
int expected = counter.load();
while (!counter.compare_exchange_weak(expected, expected + 1)) {
// expected tự động được cập nhật thành giá trị hiện tại
// retry với giá trị mới
}
}python
# Python không có native CAS — mô phỏng bằng Lock mỏng
from threading import Lock
class AtomicInteger:
def __init__(self, value: int = 0) -> None:
self._value = value
self._lock = Lock()
def compare_and_set(self, expected: int, new_value: int) -> bool:
with self._lock:
if self._value == expected:
self._value = new_value
return True
return False
def increment(self) -> int:
while True:
current = self._value
if self.compare_and_set(current, current + 1):
return current + 1java
import java.util.concurrent.atomic.AtomicInteger;
AtomicInteger counter = new AtomicInteger(0);
public void atomicIncrement() {
int expected;
do {
expected = counter.get();
} while (!counter.compareAndSet(expected, expected + 1));
}
// Hoặc gọn hơn:
counter.incrementAndGet(); // Bên trong dùng CAS loopgo
import "sync/atomic"
var counter int64 = 0
func atomicIncrement() {
for {
current := atomic.LoadInt64(&counter)
if atomic.CompareAndSwapInt64(&counter, current, current+1) {
break
}
}
}
// Hoặc gọn hơn:
atomic.AddInt64(&counter, 1)ABA Problem — Bẫy ẩn của CAS
CAS chỉ kiểm tra giá trị, không kiểm tra lịch sử thay đổi. Nếu giá trị thay đổi A → B → A, CAS vẫn thấy "A" và coi là không ai thay đổi.
text
Thread 1: Đọc giá trị = A
Thread 2: Đổi A → B
Thread 2: Đổi B → A
Thread 1: CAS(expected=A, new=C) → THÀNH CÔNG!
→ Thread 1 không biết giá trị đã bị thay đổi ở giữaKhi nào ABA nguy hiểm? Với counter đơn thuần, ABA vô hại (kết quả vẫn đúng). Nhưng với lock-free linked list, ABA có thể khiến node bị xóa rồi được tái sử dụng, dẫn đến con trỏ trỏ vào vùng nhớ sai.
Giải pháp: Thêm version number vào mỗi CAS operation.
cpp
// std::atomic với tagged pointer (version + value)
struct Versioned {
int value;
int version;
};
std::atomic<Versioned> data{{0, 0}};
void safeUpdate(int newVal) {
Versioned expected = data.load();
Versioned desired;
do {
desired = {newVal, expected.version + 1};
} while (!data.compare_exchange_weak(expected, desired));
}python
class VersionedAtomicRef:
"""CAS với version number — tránh ABA problem."""
def __init__(self, value, version: int = 0) -> None:
self._value = value
self._version = version
self._lock = Lock()
def get(self) -> tuple:
return self._value, self._version
def compare_and_set(self, expected_val, expected_ver, new_val) -> bool:
with self._lock:
if self._value == expected_val and self._version == expected_ver:
self._value = new_val
self._version += 1
return True
return Falsejava
import java.util.concurrent.atomic.AtomicStampedReference;
// Java có sẵn AtomicStampedReference để giải ABA
AtomicStampedReference<Integer> ref =
new AtomicStampedReference<>(100, 0);
int[] stampHolder = new int[1];
int value = ref.get(stampHolder); // value=100, stamp=0
ref.compareAndSet(100, 200, 0, 1); // So sánh cả value VÀ stampgo
import "sync/atomic"
// Go: dùng atomic.Value kết hợp struct có version
type Versioned struct {
Value int64
Version int64
}
var state atomic.Value // stores Versioned
func safeUpdate(newVal int64) {
for {
current := state.Load().(Versioned)
next := Versioned{newVal, current.Version + 1}
// Go chưa có CAS trên struct — cần encoding hoặc unsafe
// Production: dùng mutex cho complex state, CAS cho đơn giá trị
state.Store(next)
break
}
}Optimistic Locking trong Database
CAS pattern áp dụng trực tiếp cho database: thêm cột version vào table, UPDATE ... WHERE version = expected.
sql
-- Đọc row hiện tại
SELECT balance, version FROM accounts WHERE id = 'user_123';
-- → balance = 100, version = 5
-- Cập nhật với version check
UPDATE accounts
SET balance = 80, version = version + 1
WHERE id = 'user_123' AND version = 5;
-- Nếu affected_rows = 0 → conflict → retryThực chiến
Tình huống: Lock-free Counter cho Analytics Pipeline
Bối cảnh: Hệ thống analytics nhận 100,000 event/giây. Mỗi event cần increment counter theo category. 16 CPU cores xử lý song song. Mutex trên 1 counter → bottleneck.
Mục tiêu: Counter chịu được 100K ops/sec mà không dùng mutex.
java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
public class AnalyticsCounter {
private final ConcurrentHashMap<String, AtomicLong> counters =
new ConcurrentHashMap<>();
public void increment(String category) {
counters.computeIfAbsent(category, k -> new AtomicLong(0))
.incrementAndGet(); // CAS loop bên trong
}
public long getCount(String category) {
AtomicLong counter = counters.get(category);
return counter != null ? counter.get() : 0;
}
// Batch read — snapshot tại thời điểm gọi
public Map<String, Long> snapshot() {
Map<String, Long> result = new HashMap<>();
counters.forEach((key, value) -> result.put(key, value.get()));
return result;
}
}Phân tích:
AtomicLong.incrementAndGet()dùng CAS loop nội bộ — ~500K ops/sec trên modern CPUConcurrentHashMapcho phép nhiều thread đọc/ghi khác category song song- Cùng category: CAS retry, nhưng xác suất conflict thấp nếu phân bố đều
- Nếu 1 category chiếm 90% traffic → sharded counter (chia thành N sub-counter) tốt hơn
Tình huống: Optimistic Locking cho Inventory Update
Bối cảnh: E-commerce — 1000 user đặt hàng cùng lúc cho sản phẩm có 50 units tồn kho. Cần giảm inventory chính xác, không oversell.
python
import time
from dataclasses import dataclass
@dataclass
class InventoryRow:
product_id: str
quantity: int
version: int
class InventoryService:
"""Optimistic locking cho inventory management."""
MAX_RETRIES = 5
BACKOFF_BASE = 0.01 # 10ms
def __init__(self, db) -> None:
self.db = db
def reserve(self, product_id: str, amount: int) -> bool:
for attempt in range(self.MAX_RETRIES):
# Đọc state hiện tại
row = self.db.query(
"SELECT quantity, version FROM inventory WHERE id = %s",
product_id
)
if row.quantity < amount:
return False # Hết hàng
# CAS: cập nhật nếu version khớp
affected = self.db.execute(
"""UPDATE inventory
SET quantity = quantity - %s, version = version + 1
WHERE id = %s AND version = %s""",
amount, product_id, row.version
)
if affected > 0:
return True # Thành công
# Conflict → backoff rồi retry
time.sleep(self.BACKOFF_BASE * (2 ** attempt))
raise TooManyRetriesError(
f"Không thể reserve {product_id} sau {self.MAX_RETRIES} lần thử"
)
class TooManyRetriesError(Exception):
passPhân tích:
- Exponential backoff giảm contention khi nhiều request cùng product
MAX_RETRIEStránh infinite loop — sau 5 lần thất bại, raise error để caller xử lýversioncolumn thay vì chỉ checkquantity— tránh ABA khi quantity tăng rồi giảm về cùng giá trị- Production: thêm
FOR UPDATEhint hoặcSELECT ... FOR NO KEY UPDATEnếu contention quá cao
Sai lầm điển hình
❌ Sai lầm 1: Dùng CAS trong high-contention scenario
Vấn đề: 100 thread cùng CAS vào 1 biến → 99 thread retry → CPU waste.
java
// SAI: Counter "nóng" — majority thread retry liên tục
AtomicLong globalCounter = new AtomicLong(0);
// 100 threads increment cùng 1 counter
// → CAS success rate < 10%, CPU burn retryTại sao sai: CAS hiệu quả khi conflict rate thấp (<10%). Với 100 thread tranh 1 biến, mỗi CAS attempt có ~1% thành công. 99 thread đọc lại, tính lại, CAS lại — busy loop tiêu tốn CPU nhưng không làm gì hữu ích.
java
// ĐÚNG: Sharded counter — giảm contention
LongAdder shardedCounter = new LongAdder();
// Bên trong: chia thành N cell, mỗi thread CAS vào 1 cell
// Read: tổng hợp tất cả cell
shardedCounter.increment(); // Thread-local CAS, gần như không conflict
long total = shardedCounter.sum(); // O(N) sum, chấp nhận được cho đọc❌ Sai lầm 2: Không xử lý ABA problem
Vấn đề: CAS chỉ so sánh giá trị, không biết giá trị đã thay đổi rồi quay lại.
python
# SAI: Chỉ compare value — bỏ qua lịch sử thay đổi
def withdraw(account, amount):
while True:
balance = account.get()
if balance < amount:
return False
if account.cas(balance, balance - amount): # Chỉ check value
return TrueTại sao sai: Thread A đọc balance=100. Thread B rút 50 (100→50), Thread C nạp 50 (50→100). Thread A thấy "100 == 100" → CAS thành công. Kết quả: rút 2 lần nhưng nạp 1 lần → sai số dư. Với counter đơn thuần thì không sao, nhưng với business logic phụ thuộc vào "không ai thay đổi" thì ABA gây bug.
python
# ĐÚNG: Compare value + version
def withdraw(account, amount):
while True:
balance, version = account.get_versioned()
if balance < amount:
return False
if account.cas_versioned(balance, version, balance - amount):
return True❌ Sai lầm 3: CAS retry vô hạn, không backoff
Vấn đề: Retry loop không có giới hạn và không có backoff.
python
# SAI: Retry forever — có thể chạy mãi khi contention cao
while True:
old = ref.get()
if ref.cas(old, compute(old)):
break
# Không sleep, không giới hạn → livelockTại sao sai: Khi nhiều thread retry đồng thời, chúng "va chạm" liên tục (livelock). Không thread nào tiến triển. CPU chạy 100% nhưng throughput = 0.
python
# ĐÚNG: Giới hạn retry + exponential backoff
MAX_RETRIES = 10
for attempt in range(MAX_RETRIES):
old = ref.get()
if ref.cas(old, compute(old)):
return SUCCESS
# Backoff tăng dần: 1ms, 2ms, 4ms, 8ms...
time.sleep(0.001 * (2 ** attempt))
raise TooManyRetriesError("CAS failed after max retries")❌ Sai lầm 4: Dùng CAS cho critical section lớn
Vấn đề: CAS cho block code dài — nếu conflict, toàn bộ computation bị lãng phí.
python
# SAI: Tính toán nặng giữa read và CAS
while True:
data = ref.get()
result = heavy_computation(data) # 100ms tính toán
if ref.cas(data, result): # Rất dễ conflict!
break
# → 100ms computation bị vứt đi mỗi lần retryTại sao sai: Khoảng thời gian giữa read và CAS càng dài → xác suất conflict càng cao. 100ms là vĩnh cửu trong multi-threaded context.
python
# ĐÚNG: Mutex cho critical section lớn
with lock:
data = ref.get()
result = heavy_computation(data)
ref.set(result)
# Hoặc: tách computation ra khỏi CAS pathUnder the Hood
Hiệu năng
| Strategy | Throughput (low contention) | Throughput (high contention) | Latency |
|---|---|---|---|
| Mutex | ~50K ops/sec | ~30K ops/sec | Ổn định, có peak khi context switch |
| CAS (atomic) | ~500K ops/sec | ~20K ops/sec (retry overhead) | Thấp trung bình, spike khi retry |
| Sharded (LongAdder) | ~500K ops/sec | ~400K ops/sec | Thấp, đều |
Nội bộ triển khai
CPU level: CAS là instruction phần cứng (CMPXCHG trên x86, LL/SC trên ARM). Chạy trong 1-2 CPU cycle, không cần system call. Mutex cần system call futex() khi có contention → hàng trăm cycle.
Memory ordering: CAS trên std::atomic mặc định dùng memory_order_seq_cst (sequential consistency) — đảm bảo mọi thread thấy cùng thứ tự operations. Có thể relax xuống memory_order_acquire/release cho performance, nhưng reasoning khó hơn nhiều.
Cache line bouncing: Khi nhiều core CAS cùng 1 biến, cache line chứa biến đó bị "bắn" qua lại giữa L1 cache của các core (MESI protocol). Đây là nguyên nhân chính khiến CAS chậm khi high contention — không phải retry logic, mà là cache coherence traffic.
Java LongAdder: Giải quyết cache bouncing bằng striping — mỗi thread CAS vào cell riêng (khác cache line). sum() tổng hợp tất cả cell. Write O(1), read O(N cells). Trade-off: read chậm hơn nhưng write nhanh hơn nhiều.
Trade-offs
| Khi NÊN dùng CAS | Khi KHÔNG NÊN dùng CAS |
|---|---|
| Counter, flag, simple state update | Critical section lớn (>microsecond) |
| Low contention (<10% conflict rate) | High contention (>50% conflict rate) |
| Read-heavy, write-light workload | Write-heavy vào cùng 1 resource |
| Latency-sensitive (không chấp nhận block) | Correctness phụ thuộc vào "không ai thay đổi" (→ version) |
| Lock-free queue, stack đơn giản | Complex multi-step transaction (→ lock) |
Checklist ghi nhớ
✅ Checklist triển khai
Chọn strategy
- [ ] Contention thấp (<10% conflict) → CAS / atomic operations
- [ ] Contention cao hoặc critical section lớn → Mutex / Read-Write Lock
- [ ] Counter "nóng" nhiều thread → Sharded counter (LongAdder / striping)
- [ ] Database concurrent update → Optimistic locking (version column)
Triển khai CAS
- [ ] Pattern: read → compute → CAS → retry nếu thất bại
- [ ] Giới hạn retry (MAX_RETRIES) + exponential backoff
- [ ] Đánh giá ABA risk: nếu business logic phụ thuộc lịch sử → dùng version
- [ ] Giữ khoảng cách read-CAS ngắn nhất có thể
Production
- [ ] Monitoring: đếm số retry trung bình — nếu >3 → contention quá cao
- [ ] Java:
AtomicIntegercho counter,AtomicStampedReferencecho tránh ABA - [ ] C++:
std::atomic+compare_exchange_weak(cho CAS loop) - [ ] Database:
UPDATE ... WHERE version = $1, checkaffected_rows > 0
Bài tập luyện tập
Bài 1: Atomic Counter đa luồng — Foundation
Đề bài: Viết AtomicCounter dùng CAS pattern. Chạy 10 thread, mỗi thread increment 1000 lần. Kết quả cuối phải đúng 10,000.
🧠 Quiz
Câu hỏi kiểm tra: Tại sao không dùng counter += 1 mà cần CAS?
- [ ] A.
counter += 1chạy chậm hơn CAS - [x] B.
counter += 1không phải atomic — gồm 3 bước (read, increment, write) có thể bị xen kẽ bởi thread khác - [ ] C. Python tự động thread-safe nên không cần CAS
- [ ] D. CAS chỉ cần thiết khi có hơn 100 thread Giải thích:
counter += 1=temp = counter→temp = temp + 1→counter = temp. Nếu 2 thread đọc cùng giá trị 10, cả hai ghi 11 → mất 1 increment. CAS đảm bảo: nếu giá trị đã thay đổi giữa read và write → retry.
💡 Gợi ý
- Dùng Lock mỏng để mô phỏng CAS trong Python
- CAS loop:
while not cas(expected, new): expected = read() - Test: 10 threads × 1000 increments = 10,000
✅ Lời giải
python
import threading
class AtomicCounter:
def __init__(self) -> None:
self._value = 0
self._lock = threading.Lock()
def _cas(self, expected: int, new_value: int) -> bool:
with self._lock:
if self._value == expected:
self._value = new_value
return True
return False
def increment(self) -> None:
while True:
current = self._value
if self._cas(current, current + 1):
return
def get(self) -> int:
return self._value
# Test
counter = AtomicCounter()
threads = []
for _ in range(10):
t = threading.Thread(target=lambda: [counter.increment() for _ in range(1000)])
threads.append(t)
t.start()
for t in threads:
t.join()
assert counter.get() == 10000, f"Expected 10000, got {counter.get()}"Phân tích: Time O(1) amortized mỗi increment (retry hiếm khi contention thấp). Space O(1). Python Lock ở đây mô phỏng CAS hardware instruction.
Bài 2: Versioned Database Row — Intermediate
Đề bài: Viết class VersionedRow với update(new_data, expected_version). Nếu version không khớp → raise OptimisticLockError. Mô phỏng: User A và User B đọc cùng row, B update trước → A phải thất bại.
🧠 Quiz
Câu hỏi kiểm tra: Trong database thực tế, câu SQL nào triển khai CAS pattern?
- [ ] A.
UPDATE t SET data = 'new' WHERE id = 1 - [ ] B.
UPDATE t SET data = 'new' WHERE id = 1 FOR UPDATE - [x] C.
UPDATE t SET data = 'new', version = version + 1 WHERE id = 1 AND version = 5 - [ ] D.
INSERT INTO t (id, data) VALUES (1, 'new') ON CONFLICT DO UPDATEGiải thích:WHERE version = 5là phần "Compare" — chỉ update nếu version đúng.version = version + 1đảm bảo lần update tiếp theo phải dùng version mới.affected_rows = 0nghĩa là CAS thất bại → retry.
💡 Gợi ý
update()kiểm traself.version == expected_version- Nếu khớp: cập nhật data + tăng version
- Nếu không: raise exception
✅ Lời giải
python
class OptimisticLockError(Exception):
pass
class VersionedRow:
def __init__(self, row_id: str, data: dict) -> None:
self.id = row_id
self.data = data
self.version = 1
def read(self) -> tuple[dict, int]:
return dict(self.data), self.version
def update(self, new_data: dict, expected_version: int) -> None:
if self.version != expected_version:
raise OptimisticLockError(
f"Version conflict: expected {expected_version}, "
f"current {self.version}"
)
self.data.update(new_data)
self.version += 1
# Test scenario
row = VersionedRow("product_1", {"price": 100, "stock": 50})
# User A đọc
data_a, ver_a = row.read()
# User B đọc cùng thời điểm
data_b, ver_b = row.read()
# User B update trước — thành công
row.update({"price": 120}, expected_version=ver_b)
# User A update sau — thất bại (version đã tăng)
try:
row.update({"stock": 45}, expected_version=ver_a)
assert False, "Phải raise error"
except OptimisticLockError as e:
print(f"Đúng: {e}")
# User A cần read lại và retry
data_a, ver_a = row.read()
row.update({"stock": 45}, expected_version=ver_a)Phân tích: Đây chính là pattern @Version trong JPA/Hibernate, updateOne({version: expected}) trong MongoDB. Production cần wrap trong retry loop với backoff.
Liên kết học tiếp
Từ khóa glossary: compare-and-swap, CAS, lock-free, atomic operation, optimistic locking, ABA problem, CMPXCHG, memory ordering, cache line bouncing, sharded counter, LongAdder
Tìm kiếm liên quan: lập trình không khóa, atomic counter Java, optimistic locking database, CAS vs mutex, ABA problem giải pháp, concurrent data structure