Skip to content

Heap & Priority Queue — Bậc thầy thứ tự ưu tiên

Hệ thống cấp cứu bệnh viện tiếp nhận liên tục bệnh nhân với mức nghiêm trọng khác nhau. Mảng không sắp xếp tìm bệnh nhân nặng nhất trong O(n) — quá chậm. Mảng sắp xếp chèn bệnh nhân mới trong O(n) — cũng không chấp nhận được. Cần một cấu trúc cân bằng cả hai: insert O(log n) và extract-max O(log n).

Heap là câu trả lời. Cấu trúc này đơn giản đến bất ngờ — chỉ là một mảng tuân theo quy tắc "cha luôn ưu tiên hơn con" — nhưng sức mạnh thì không nhỏ. Dijkstra dùng heap. OS scheduler dùng heap. Merge k sorted lists dùng heap. Top-K problems dùng heap. Đây là cấu trúc dữ liệu xuất hiện nhiều nhất trong các bài phỏng vấn hệ thống.

Bức tranh tư duy

Hình dung giải đấu bóng đá theo thể thức loại trực tiếp. Đội vô địch (root) là đội mạnh nhất. Mỗi trận đấu (node) chọn đội mạnh hơn đi tiếp lên vòng trên. Khi đội vô địch bị loại (extract-max), ta tổ chức lại giải đấu — nhưng chỉ cần đấu lại trên nhánh bị ảnh hưởng, không phải toàn bộ.

Max Heap biểu diễn cây:

            [90]              ← root = max
           /    \
        [70]     [80]
        /  \    /  \
     [30] [50] [60] [40]

Mảng tương ứng (0-indexed):
[90, 70, 80, 30, 50, 60, 40]
 0   1   2   3   4   5   6

Node i:
  - Cha: (i-1) / 2
  - Con trái: 2*i + 1
  - Con phải: 2*i + 2

Quy tắc duy nhất: heap property — mỗi node có giá trị ≥ hai con (max-heap) hoặc ≤ hai con (min-heap). Không yêu cầu sắp xếp hoàn toàn giữa các node cùng cấp. Đây là "sắp xếp nửa vời" vừa đủ để extract min/max trong O(log n).

Cốt lõi kỹ thuật

Swim (Sift Up) — Khôi phục heap property từ dưới lên

Khi insert phần tử mới vào cuối mảng, phần tử đó có thể lớn hơn cha → vi phạm heap property. Swap với cha và lặp lại cho đến khi hợp lệ.

cpp
void swim(std::vector<int>& heap, int i) {
    while (i > 0) {
        int parent = (i - 1) / 2;
        if (heap[i] <= heap[parent]) break;
        std::swap(heap[i], heap[parent]);
        i = parent;
    }
}

void insert(std::vector<int>& heap, int val) {
    heap.push_back(val);
    swim(heap, heap.size() - 1);
}
python
def swim(heap: list[int], i: int) -> None:
    while i > 0:
        parent = (i - 1) // 2
        if heap[i] <= heap[parent]:
            break
        heap[i], heap[parent] = heap[parent], heap[i]
        i = parent

def insert(heap: list[int], val: int) -> None:
    heap.append(val)
    swim(heap, len(heap) - 1)
java
void swim(int[] heap, int i) {
    while (i > 0) {
        int parent = (i - 1) / 2;
        if (heap[i] <= heap[parent]) break;
        int tmp = heap[i]; heap[i] = heap[parent]; heap[parent] = tmp;
        i = parent;
    }
}
go
func swim(heap []int, i int) {
    for i > 0 {
        parent := (i - 1) / 2
        if heap[i] <= heap[parent] {
            break
        }
        heap[i], heap[parent] = heap[parent], heap[i]
        i = parent
    }
}

Sink (Sift Down) — Khôi phục heap property từ trên xuống

Khi extract root (phần tử lớn nhất), ta đưa phần tử cuối lên root rồi sink xuống: swap với con lớn hơn, lặp lại cho đến khi hợp lệ.

cpp
void sink(std::vector<int>& heap, int i, int n) {
    while (2 * i + 1 < n) {
        int child = 2 * i + 1;
        if (child + 1 < n && heap[child + 1] > heap[child])
            child++;
        if (heap[i] >= heap[child]) break;
        std::swap(heap[i], heap[child]);
        i = child;
    }
}

int extractMax(std::vector<int>& heap) {
    int maxVal = heap[0];
    heap[0] = heap.back();
    heap.pop_back();
    if (!heap.empty()) sink(heap, 0, heap.size());
    return maxVal;
}
python
def sink(heap: list[int], i: int, n: int) -> None:
    while 2 * i + 1 < n:
        child = 2 * i + 1
        if child + 1 < n and heap[child + 1] > heap[child]:
            child += 1
        if heap[i] >= heap[child]:
            break
        heap[i], heap[child] = heap[child], heap[i]
        i = child

def extract_max(heap: list[int]) -> int:
    max_val = heap[0]
    heap[0] = heap[-1]
    heap.pop()
    if heap:
        sink(heap, 0, len(heap))
    return max_val
java
void sink(int[] heap, int i, int n) {
    while (2 * i + 1 < n) {
        int child = 2 * i + 1;
        if (child + 1 < n && heap[child + 1] > heap[child]) child++;
        if (heap[i] >= heap[child]) break;
        int tmp = heap[i]; heap[i] = heap[child]; heap[child] = tmp;
        i = child;
    }
}
go
func sink(heap []int, i, n int) {
    for 2*i+1 < n {
        child := 2*i + 1
        if child+1 < n && heap[child+1] > heap[child] {
            child++
        }
        if heap[i] >= heap[child] {
            break
        }
        heap[i], heap[child] = heap[child], heap[i]
        i = child
    }
}

Build Heap (Heapify) — O(n)

Xây heap từ mảng bất kỳ. Thay vì insert từng phần tử O(n log n), ta sink từ giữa mảng về đầu. Phân tích amortized cho thấy tổng chi phí chỉ là O(n).

cpp
void buildHeap(std::vector<int>& arr) {
    int n = arr.size();
    for (int i = n / 2 - 1; i >= 0; i--)
        sink(arr, i, n);
}
python
def build_heap(arr: list[int]) -> None:
    n = len(arr)
    for i in range(n // 2 - 1, -1, -1):
        sink(arr, i, n)
java
void buildHeap(int[] arr) {
    int n = arr.length;
    for (int i = n / 2 - 1; i >= 0; i--)
        sink(arr, i, n);
}
go
func buildHeap(arr []int) {
    n := len(arr)
    for i := n/2 - 1; i >= 0; i-- {
        sink(arr, i, n)
    }
}

Priority Queue Interface

Priority Queue là abstraction trên Heap, cung cấp interface: push(item, priority), pop() (trả về phần tử ưu tiên nhất), peek() (xem mà không xóa).

Thao tácTimeGhi chú
pushO(log n)Insert + swim
popO(log n)Extract root + sink
peekO(1)Đọc root
buildO(n)Heapify toàn bộ

D-ary Heap

Binary heap có 2 con. D-ary heap mở rộng lên d con. Chiều cao giảm xuống log_d(n) nhưng mỗi sink phải so sánh d con.

HeapChiều caoSink costInsert costTốt cho
Binary (d=2)log₂ nO(log n)O(log n)Đa dụng
4-ary (d=4)log₄ nO(4 log₄ n)O(log₄ n)Nhiều insert hơn extract

Trong Dijkstra, insert nhiều hơn extract-min → d-ary heap với d = 4 có thể nhanh hơn binary heap trên thực tế.

Thực chiến

Tình huống: Top-K phần tử lớn nhất trong stream

Bối cảnh: Hệ thống analytics nhận liên tục data points. Cần duy trì top-K giá trị lớn nhất tại mọi thời điểm. N có thể lên tới 10⁸, K = 100.

Mục tiêu: Mỗi data point xử lý O(log K), bộ nhớ O(K).

cpp
#include <queue>
#include <vector>

class TopKTracker {
    int k_;
    // Min-heap: phần tử nhỏ nhất ở top → dễ loại bỏ
    std::priority_queue<int, std::vector<int>, std::greater<int>> minHeap_;

public:
    explicit TopKTracker(int k) : k_(k) {}

    void add(int val) {
        if ((int)minHeap_.size() < k_) {
            minHeap_.push(val);
        } else if (val > minHeap_.top()) {
            minHeap_.pop();
            minHeap_.push(val);
        }
    }

    std::vector<int> getTopK() {
        std::vector<int> result;
        auto copy = minHeap_;
        while (!copy.empty()) {
            result.push_back(copy.top());
            copy.pop();
        }
        return result;
    }
};
python
import heapq

class TopKTracker:
    # Duy trì top-K phần tử lớn nhất bằng min-heap kích thước K

    def __init__(self, k: int) -> None:
        self.k = k
        self.min_heap: list[int] = []

    def add(self, val: int) -> None:
        if len(self.min_heap) < self.k:
            heapq.heappush(self.min_heap, val)
        elif val > self.min_heap[0]:
            heapq.heapreplace(self.min_heap, val)

    def get_top_k(self) -> list[int]:
        return sorted(self.min_heap, reverse=True)
java
import java.util.PriorityQueue;
import java.util.ArrayList;
import java.util.Collections;

public class TopKTracker {
    private final int k;
    private final PriorityQueue<Integer> minHeap;

    public TopKTracker(int k) {
        this.k = k;
        this.minHeap = new PriorityQueue<>();
    }

    public void add(int val) {
        if (minHeap.size() < k) {
            minHeap.offer(val);
        } else if (val > minHeap.peek()) {
            minHeap.poll();
            minHeap.offer(val);
        }
    }

    public ArrayList<Integer> getTopK() {
        ArrayList<Integer> result = new ArrayList<>(minHeap);
        Collections.sort(result, Collections.reverseOrder());
        return result;
    }
}
go
import "container/heap"

type MinHeap []int

func (h MinHeap) Len() int            { return len(h) }
func (h MinHeap) Less(i, j int) bool   { return h[i] < h[j] }
func (h MinHeap) Swap(i, j int)        { h[i], h[j] = h[j], h[i] }
func (h *MinHeap) Push(x interface{})  { *h = append(*h, x.(int)) }
func (h *MinHeap) Pop() interface{} {
    old := *h
    n := len(old)
    x := old[n-1]
    *h = old[:n-1]
    return x
}

type TopKTracker struct {
    k    int
    heap *MinHeap
}

func NewTopKTracker(k int) *TopKTracker {
    h := &MinHeap{}
    heap.Init(h)
    return &TopKTracker{k: k, heap: h}
}

func (t *TopKTracker) Add(val int) {
    if t.heap.Len() < t.k {
        heap.Push(t.heap, val)
    } else if val > (*t.heap)[0] {
        (*t.heap)[0] = val
        heap.Fix(t.heap, 0)
    }
}

Phân tích:

  • Trick cốt lõi: dùng min-heap kích thước K để giữ top-K lớn nhất. Phần tử nhỏ nhất trong heap (root) là ngưỡng. Nếu giá trị mới > ngưỡng → thay thế, ngược lại bỏ qua.
  • Mỗi data point: O(log K). Với K = 100, mỗi thao tác chỉ ~7 so sánh.
  • Bộ nhớ O(K) — xử lý stream 10⁸ phần tử mà chỉ giữ 100 trong RAM.

Sai lầm điển hình

Sai lầm 1: Dùng max-heap cho top-K thay vì min-heap

Vấn đề: Lưu tất cả vào max-heap rồi pop K lần.

Tại sao sai: Bộ nhớ O(n) thay vì O(K). Với stream 10⁸ phần tử, giữ hết trong RAM không khả thi. Min-heap kích thước K chỉ cần O(K) bộ nhớ và loại bỏ phần tử nhỏ nhất mỗi khi heap đầy.

Sai lầm 2: Quên Python heapq là min-heap

Vấn đề: Python heapq chỉ hỗ trợ min-heap. Nhiều người gọi heappop kỳ vọng nhận max.

python
# SAI: heapq.heappop trả về min, không phải max
import heapq
heap = [3, 1, 4, 1, 5]
heapq.heapify(heap)
print(heapq.heappop(heap))  # 1, không phải 5!

Tại sao sai: Python không có max-heap built-in. Workaround: negate giá trị khi push, negate lại khi pop.

python
# ĐÚNG: negate để mô phỏng max-heap
heapq.heappush(heap, -val)
max_val = -heapq.heappop(heap)

Sai lầm 3: Build heap bằng n lần insert thay vì heapify

Vấn đề: Insert từng phần tử O(log n) × n = O(n log n). Heapify chỉ O(n).

Tại sao sai: Lãng phí thời gian. Khi đã có toàn bộ dữ liệu trước, luôn dùng build_heap / heapify — nhanh hơn O(log n) lần.

Sai lầm 4: Dùng heap khi cần search hoặc delete arbitrary element

Vấn đề: Heap không hỗ trợ tìm kiếm phần tử O(log n). Delete một phần tử bất kỳ cần O(n) để tìm vị trí.

Tại sao sai: Heap chỉ đảm bảo min/max ở root. Nếu cần search + delete, hãy cân nhắc balanced BST (TreeSet/TreeMap trong Java, std::set trong C++) hoặc kết hợp heap + hash map (lazy deletion).

Under the Hood

Hiệu năng

Thao tácBinary HeapSorted ArrayBST (balanced)
InsertO(log n)O(n)O(log n)
Extract min/maxO(log n)O(1)O(log n)
Peek min/maxO(1)O(1)O(log n)
BuildO(n)O(n log n)O(n log n)
SearchO(n)O(log n)O(log n)

Nội bộ triển khai

Biểu diễn mảng: Binary heap lưu trong mảng liên tục, không cần pointer. Node i (0-indexed): cha = (i-1)/2, con trái = 2i+1, con phải = 2i+2. Cache-friendly vì dữ liệu nằm liền kề trong bộ nhớ.

Tại sao build-heap là O(n)? Trực giác: nửa dưới cây là lá — không cần sink. Tầng ngay trên lá chỉ sink 1 bước, tầng tiếp sink 2 bước... Tổng: ∑(h=0 to log n) n/2^(h+1) × h = O(n). Chứng minh chặt chẽ dùng tổng chuỗi hình học.

Python heapq: Dùng min-heap trên list. heapify() là O(n). heappush / heappop là O(log n). heapreplace(heap, val) = pop + push trong một thao tác duy nhất — nhanh hơn gọi riêng.

D-ary heap trade-off: D lớn → cây thấp hơn → swim nhanh hơn (ít bước). Nhưng sink chậm hơn (so sánh d con thay vì 2). Tối ưu thực tế: d = 4 cho Dijkstra vì insert (swim) nhiều hơn extract-min (sink).

Trade-offs

Tiêu chíHeapSorted ArrayBST
Min/Max extraction✅ O(log n)✅ O(1)✅ O(log n)
Insert✅ O(log n)❌ O(n)✅ O(log n)
Arbitrary search❌ O(n)✅ O(log n)✅ O(log n)
Bộ nhớ overheadThấp (mảng)Thấp (mảng)Cao (pointer)
ImplementationĐơn giảnRất đơn giảnPhức tạp

Heap phù hợp khi chỉ cần min/max + insert. Nếu cần search/delete arbitrary → dùng BST hoặc hash map kết hợp.

Checklist ghi nhớ

✅ Checklist triển khai

Cơ bản

  • [ ] Heap property: cha luôn ≥ con (max-heap) hoặc ≤ con (min-heap)
  • [ ] Node i: cha = (i-1)/2, con trái = 2i+1, con phải = 2i+2
  • [ ] Insert = append + swim, Extract = swap root với cuối + sink

Thiết kế

  • [ ] Top-K lớn nhất → dùng min-heap kích thước K
  • [ ] Top-K nhỏ nhất → dùng max-heap kích thước K
  • [ ] Python heapq là min-heap — negate giá trị cho max-heap
  • [ ] Build heap từ mảng sẵn: dùng heapify O(n), không phải n × insert

Production

  • [ ] Heap không hỗ trợ search O(log n) — cần search thì dùng BST
  • [ ] Dijkstra: cân nhắc d-ary heap (d=4) thay vì binary heap
  • [ ] Custom comparator: đảm bảo tính nhất quán (a < b và b < c → a < c)

Bài tập luyện tập

Bài 1: Merge K Sorted Lists — Intermediate

Đề bài: Cho K danh sách đã sắp xếp tăng dần, merge thành một danh sách sắp xếp duy nhất.

Ràng buộc: K ≤ 10⁴, tổng số phần tử ≤ 10⁶.

🧠 Quiz

Câu hỏi: Merge K sorted lists sử dụng min-heap có độ phức tạp thời gian là?

  • [ ] A. O(N × K)
  • [ ] B. O(N log N)
  • [x] C. O(N log K)
  • [ ] D. O(K log N) Giải thích: Heap kích thước K. Mỗi phần tử trong tổng N phần tử: push/pop O(log K). Tổng: O(N log K). Đáp án A là brute force, B là sort toàn bộ.
💡 Gợi ý
  • Dùng min-heap chứa K phần tử — mỗi phần tử từ một danh sách.
  • Pop phần tử nhỏ nhất, push phần tử tiếp theo từ cùng danh sách.
✅ Lời giải
python
import heapq

def merge_k_sorted(lists: list[list[int]]) -> list[int]:
    heap = []
    for i, lst in enumerate(lists):
        if lst:
            heapq.heappush(heap, (lst[0], i, 0))

    result = []
    while heap:
        val, list_idx, elem_idx = heapq.heappop(heap)
        result.append(val)
        if elem_idx + 1 < len(lists[list_idx]):
            next_val = lists[list_idx][elem_idx + 1]
            heapq.heappush(heap, (next_val, list_idx, elem_idx + 1))
    return result

Phân tích: Time O(N log K), Space O(K) cho heap. N = tổng phần tử, K = số danh sách.

Bài 2: Kth Largest Element in Stream — Intermediate

Đề bài: Thiết kế class nhận stream số nguyên, tại mọi thời điểm trả về phần tử lớn thứ K.

💡 Gợi ý
  • Dùng min-heap kích thước K.
  • Root của heap = phần tử lớn thứ K (nhỏ nhất trong top-K).
✅ Lời giải
python
import heapq

class KthLargest:
    def __init__(self, k: int, nums: list[int]) -> None:
        self.k = k
        self.heap = []
        for val in nums:
            self.add(val)

    def add(self, val: int) -> int:
        if len(self.heap) < self.k:
            heapq.heappush(self.heap, val)
        elif val > self.heap[0]:
            heapq.heapreplace(self.heap, val)
        return self.heap[0]

Phân tích: Time O(log K) mỗi lần add, Space O(K).

Liên kết học tiếp

Từ khóa glossary: Heap, Priority Queue, Binary Heap, Min-Heap, Max-Heap, Heapify, Swim, Sink, D-ary Heap

Tìm kiếm liên quan: hàng đợi ưu tiên, đống nhị phân, sắp xếp heap, top-K elements