Skip to content

Optimistic Locking (CAS) - Lock-free Concurrency

"Không lock trước, cứ làm đi, conflict thì retry." - HPN

Định nghĩa

Compare-And-Swap (CAS) là atomic operation:

  1. Compare: Đọc giá trị hiện tại, so sánh với expected value
  2. Swap: Nếu match → cập nhật thành new value
  3. Retry: Nếu không match → ai đó đã thay đổi, thử lại

📘 Optimistic vs Pessimistic

Pessimistic Locking: "Tôi lock trước, không ai động vào được" Optimistic Locking: "Tôi cứ làm, nếu conflict thì retry"

Tại sao cần CAS?

Bài toán: 10,000 Concurrent Counter Updates

ApproachThroughputContention
Mutex Lock~50K ops/secHigh blocking
CAS (Lock-free)~500K ops/secRetry-based

🎯 KEY INSIGHT

Low contention (ít conflict) → CAS 10x faster vì không block threads. High contention (nhiều conflict) → Mutex có thể tốt hơn (avoid busy retry).

Mô hình hóa

Pessimistic Locking (Mutex)

Thread A:  LOCK ────────── [UPDATE] ────────── UNLOCK
Thread B:       WAIT ────────────────────────────── LOCK ── [UPDATE] ── UNLOCK

              Blocked waiting for lock

Optimistic Locking (CAS)

Quan sát:

  • Thread A và B đều đọc cùng lúc (no blocking)
  • Thread A update thành công
  • Thread B phát hiện conflict → retry với giá trị mới

Cơ chế Hoạt động

CAS Pseudocode

def atomic_increment(current_pointer):
    while True:
        old_value = read(current_pointer)
        new_value = old_value + 1
        
        if CAS(current_pointer, expected=old_value, new=new_value):
            return new_value  # Success!
        # Else: Someone changed it, retry

ABA Problem

Thread A: Read value = "A"
Thread B: Change A → B → A
Thread A: CAS(expected="A", new="C") → SUCCESS! ← Sai!
          (Không biết B đã thay đổi ở giữa)

Giải pháp: Thêm version number hoặc dùng AtomicStampedReference.

Production Implementation

python
# HPN Engineering Standard
# Implementation: Optimistic Locking Pattern

from typing import TypeVar, Generic, Optional, Callable
from threading import Lock
import time


T = TypeVar('T')


class AtomicReference(Generic[T]):
    """
    Simulates Compare-And-Swap behavior in Python.
    
    Note: Python doesn't have native CAS. This implementation
    uses a thin lock for simulation, but demonstrates the pattern.
    
    In production, use:
        - Java: AtomicInteger, AtomicReference
        - C++: std::atomic
        - Go: sync/atomic package
        - Redis: WATCH/MULTI/EXEC
    """
    
    def __init__(self, initial_value: T) -> None:
        self._value = initial_value
        self._version = 0
        self._lock = Lock()  # Thin lock for CAS simulation
    
    def get(self) -> tuple[T, int]:
        """Read current value and version."""
        return self._value, self._version
    
    def compare_and_set(
        self, 
        expected_value: T, 
        expected_version: int,
        new_value: T
    ) -> bool:
        """
        Atomically update if current matches expected.
        
        Returns:
            True if update succeeded, False if conflict detected
        """
        with self._lock:
            if self._value == expected_value and self._version == expected_version:
                self._value = new_value
                self._version += 1
                return True
            return False
    
    def update(self, update_func: Callable[[T], T]) -> T:
        """
        Update value using CAS loop pattern.
        
        Args:
            update_func: Function that takes old value, returns new value
            
        Returns:
            The new value after successful update
        """
        while True:
            old_value, version = self.get()
            new_value = update_func(old_value)
            
            if self.compare_and_set(old_value, version, new_value):
                return new_value
            # Conflict detected, retry with fresh value


class OptimisticCounter:
    """
    Thread-safe counter using Optimistic Locking.
    
    Use Case:
        - View counts
        - Like counts
        - Inventory updates
    """
    
    def __init__(self, initial: int = 0) -> None:
        self._ref = AtomicReference(initial)
        self._retry_count = 0
    
    def increment(self) -> int:
        """Atomically increment by 1."""
        return self._ref.update(lambda x: x + 1)
    
    def decrement(self) -> int:
        """Atomically decrement by 1."""
        return self._ref.update(lambda x: x - 1)
    
    def add(self, delta: int) -> int:
        """Atomically add delta."""
        return self._ref.update(lambda x: x + delta)
    
    def get(self) -> int:
        """Get current value."""
        value, _ = self._ref.get()
        return value


class DatabaseRowWithVersion:
    """
    Demonstrates Optimistic Locking pattern for database rows.
    
    Real-world usage:
        - JPA @Version annotation
        - Django's OptimisticLockException
        - MongoDB's { $set: ..., version: { $eq: old_version } }
    """
    
    def __init__(self, id: str, data: dict) -> None:
        self.id = id
        self.data = data
        self.version = 1
    
    def update(self, new_data: dict, expected_version: int) -> bool:
        """
        Update row only if version matches.
        
        Returns:
            True if update succeeded
            
        Raises:
            OptimisticLockException if version mismatch
        """
        if self.version != expected_version:
            raise OptimisticLockException(
                f"Version mismatch: expected {expected_version}, "
                f"current {self.version}"
            )
        
        self.data.update(new_data)
        self.version += 1
        return True


class OptimisticLockException(Exception):
    """Raised when optimistic lock fails due to concurrent modification."""
    pass


# ============================================
# USAGE EXAMPLE
# ============================================
if __name__ == "__main__":
    import threading
    
    # Demo 1: Atomic Counter
    counter = OptimisticCounter(0)
    
    def increment_1000_times():
        for _ in range(1000):
            counter.increment()
    
    threads = [threading.Thread(target=increment_1000_times) for _ in range(10)]
    
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    
    print(f"Final counter value: {counter.get()}")
    print(f"Expected: 10000")
    
    # Demo 2: Database Row Version
    print("\n=== Database Versioning ===")
    row = DatabaseRowWithVersion("user_123", {"name": "HPN", "balance": 100})
    
    # User A reads row
    user_a_version = row.version
    
    # User B updates (succeeds)
    row.update({"balance": 150}, expected_version=1)
    print(f"User B updated successfully. New version: {row.version}")
    
    # User A tries to update with stale version
    try:
        row.update({"balance": 80}, expected_version=user_a_version)
    except OptimisticLockException as e:
        print(f"User A failed: {e}")
        print("→ User A should refresh and retry")

Ứng dụng Thực tế

SystemImplementation
JavaAtomicInteger, AtomicReference, AtomicStampedReference
RedisWATCH + MULTI + EXEC (Optimistic transactions)
JPA/Hibernate@Version annotation on entity field
MongoDBConditional updates với version field
PostgreSQLUPDATE ... WHERE version = $1 pattern

Redis Example

redis
# Optimistic Locking in Redis
WATCH balance:user123
current = GET balance:user123

MULTI
SET balance:user123 (current + 100)
EXEC

# If another client modified between WATCH and EXEC,
# EXEC returns nil → retry

Anti-patterns

❌ TRÁNH

1. CAS trong high-contention scenario

python
# ❌ WRONG: 100 threads update cùng 1 counter
# → Majority sẽ retry liên tục → CPU waste

# ✅ RIGHT: Dùng Mutex hoặc sharded counters
# Chia counter thành N shards, mỗi thread update 1 shard

2. Không handle ABA problem

python
# ❌ WRONG: Chỉ compare value
if current_value == expected:
    update(new_value)

# ✅ RIGHT: Compare value + version
if current_value == expected AND version == expected_version:
    update(new_value, version + 1)

3. Infinite retry loop

python
# ❌ WRONG: Retry forever
while True:
    if cas(expected, new):
        break

# ✅ RIGHT: Limit retries
for attempt in range(MAX_RETRIES):
    if cas(expected, new):
        return SUCCESS
    time.sleep(backoff)
raise TooManyRetriesError()

So sánh Locking Strategies

StrategyThroughputContentionComplexity
MutexLowHandles highSimple
Read-Write LockMediumMediumMedium
CAS (Optimistic)HighLow onlyMedium
Lock-free DSHighestLow-MediumComplex

💡 HPN's Rule

"CAS cho reads nhiều, writes ít. Mutex cho writes nhiều hoặc critical sections lớn."