Topological Sort - The Dependency Resolver

"Every build system, package manager, and CI/CD pipeline relies on topological sort." - HPN

Problem Statement

Given a directed acyclic graph (DAG), order its vertices so that for every edge u → v, vertex u appears before vertex v in the ordering.

    A → B → D
    ↓   ↓
    C → E

Topological Order: A → B → C → D → E
                   or A → C → B → D → E
                   or A → C → B → E → D
                   (several valid results exist)
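
The definition translates directly into a checker. The sketch below uses a hypothetical helper, `is_topological_order` (not part of the original text), to verify an ordering against the edges read off the example diagram.

```python
from typing import Hashable, Iterable, List, Tuple


def is_topological_order(
    order: List[Hashable],
    edges: Iterable[Tuple[Hashable, Hashable]],
) -> bool:
    """Return True if every edge u → v has u placed before v in `order`."""
    position = {node: i for i, node in enumerate(order)}
    # Each vertex must appear exactly once.
    if len(position) != len(order):
        return False
    return all(
        u in position and v in position and position[u] < position[v]
        for u, v in edges
    )


# Edges read off the example diagram: A → B → D, A → C, B → E, C → E.
EXAMPLE_EDGES = [("A", "B"), ("B", "D"), ("A", "C"), ("B", "E"), ("C", "E")]

if __name__ == "__main__":
    print(is_topological_order(list("ABCDE"), EXAMPLE_EDGES))  # True
    print(is_topological_order(list("ACBED"), EXAMPLE_EDGES))  # True
    print(is_topological_order(list("ADBCE"), EXAMPLE_EDGES))  # False: D before B
```

A checker like this is handy in unit tests, since any of the valid orders should pass.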

📘 Why must it be a DAG?

If the graph has a cycle (A → B → C → A), no valid ordering exists. B must wait for A, C must wait for B, and A must wait for C, so no task can ever go first: a deadlock.
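
To see the failure concretely, here is a small sketch using `graphlib.TopologicalSorter` from the Python standard library (3.9+), which the original text does not mention. It takes a mapping from each node to the nodes that must come before it and raises `graphlib.CycleError` when no ordering exists. The helper name `try_order` is hypothetical.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Optional, Set


def try_order(predecessors: Dict[str, Set[str]]) -> Optional[List[str]]:
    """Return a topological order, or None if the graph contains a cycle."""
    try:
        return list(TopologicalSorter(predecessors).static_order())
    except CycleError:
        return None


# A → B → C → A, written as node -> set of nodes that must come first.
cyclic = {"B": {"A"}, "C": {"B"}, "A": {"C"}}
# Dropping the edge C → A leaves a simple chain.
acyclic = {"B": {"A"}, "C": {"B"}}

if __name__ == "__main__":
    print(try_order(cyclic))   # None
    print(try_order(acyclic))  # ['A', 'B', 'C']
```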

Real-World Applications

Domain | Use Case | Details
Build Systems | make, gradle, webpack | Compile dependencies first
Package Managers | npm, pip, apt | Install dependencies in the right order
CI/CD | GitHub Actions, Jenkins | Job dependencies
Course Prerequisites | University curriculum | Take foundational courses before specialized ones
Spreadsheet | Excel cell references | Evaluate the cells a formula depends on first
Task Scheduling | PERT/CPM | Project management

DAG Visualization

Build Order: core.o → utils.o → network.o → main.cpp → executable

Two Approaches

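1. Kahn's Algorithm (BFS-based)

  • Compute the in-degree of every vertex
  • Put every vertex with in-degree 0 into a queue
  • Pop a vertex, append it to the result, and decrement the in-degree of each neighbor; enqueue any neighbor that reaches 0
  • If the result holds fewer than V vertices, the graph has a cycle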

2. DFS-based (Tarjan's variant)

  • Run DFS from every unvisited vertex
  • When the DFS of a vertex finishes → push it onto a stack
  • Result = the stack read in reverse (the last vertex to finish comes first)

Kahn's Algorithm Implementation

python
from typing import List, Tuple, Optional
from collections import deque, defaultdict


def topological_sort_kahn(
    V: int, 
    edges: List[Tuple[int, int]]
) -> Optional[List[int]]:
    """
    Kahn's Algorithm - BFS-based Topological Sort.
    
    Args:
        V: Number of vertices (0 to V-1)
        edges: List of (u, v) meaning u → v (u must come before v)
    
    Returns:
        Topological order, or None if cycle exists
    
    Time: O(V + E)
    Space: O(V + E)
    """
    # Build adjacency list and in-degree array
    adj = defaultdict(list)
    in_degree = [0] * V
    
    for u, v in edges:
        adj[u].append(v)
        in_degree[v] += 1
    
    # Initialize queue with nodes having in-degree 0
    queue = deque()
    for i in range(V):
        if in_degree[i] == 0:
            queue.append(i)
    
    result = []
    
    while queue:
        u = queue.popleft()
        result.append(u)
        
        for v in adj[u]:
            in_degree[v] -= 1
            if in_degree[v] == 0:
                queue.append(v)
    
    # Check for cycle
    if len(result) != V:
        return None  # Cycle detected!
    
    return result
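
The order in which ready vertices leave the queue decides which of the valid orderings Kahn's algorithm returns. As a variation that is not part of the original text, the sketch below swaps the deque for a min-heap so the smallest ready vertex always comes out first, which gives the lexicographically smallest order at a cost of O(V log V + E). The function name `topological_sort_smallest` is an assumption made for this example.

```python
import heapq
from typing import List, Optional, Tuple


def topological_sort_smallest(
    V: int,
    edges: List[Tuple[int, int]],
) -> Optional[List[int]]:
    """Kahn's algorithm with a min-heap: smallest ready vertex first."""
    adj: List[List[int]] = [[] for _ in range(V)]
    in_degree = [0] * V
    for u, v in edges:
        adj[u].append(v)
        in_degree[v] += 1

    # A sorted list of the ready vertices is already a valid heap.
    heap = [i for i in range(V) if in_degree[i] == 0]
    result: List[int] = []

    while heap:
        u = heapq.heappop(heap)
        result.append(u)
        for v in adj[u]:
            in_degree[v] -= 1
            if in_degree[v] == 0:
                heapq.heappush(heap, v)

    # Vertices stuck with in-degree > 0 are on or behind a cycle.
    return result if len(result) == V else None


if __name__ == "__main__":
    # Example graph with A..E mapped to 0..4.
    edges = [(0, 1), (1, 3), (0, 2), (1, 4), (2, 4)]
    print(topological_sort_smallest(5, edges))  # [0, 1, 2, 3, 4]
```

A deterministic order is mostly useful for reproducible builds and stable test expectations; when any valid order will do, the plain deque is cheaper.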

DFS-based Implementation

python
from typing import List, Tuple, Optional
from collections import defaultdict
from enum import Enum


class NodeState(Enum):
    UNVISITED = 0
    VISITING = 1  # In current DFS path
    VISITED = 2   # Completed


def topological_sort_dfs(
    V: int, 
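    edges: List[Tuple[int, int]]
) -> Optional[List[int]]:
    """
    DFS-based Topological Sort with cycle detection.
    
    Returns:
        Topological order, or None if cycle exists
    
    Time: O(V + E)
    Space: O(V + E)
    
    Note: recursion depth can reach V, so long dependency chains may
    exceed Python's default recursion limit.
    """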
    adj = defaultdict(list)
    for u, v in edges:
        adj[u].append(v)
    
    state = [NodeState.UNVISITED] * V
    result = []
    
    def dfs(u: int) -> bool:
        """Returns False if cycle detected."""
        if state[u] == NodeState.VISITING:
            return False  # Back edge → Cycle!
        if state[u] == NodeState.VISITED:
            return True
        
        state[u] = NodeState.VISITING
        
        for v in adj[u]:
            if not dfs(v):
                return False
        
        state[u] = NodeState.VISITED
        result.append(u)
        return True
    
    for i in range(V):
        if state[i] == NodeState.UNVISITED:
            if not dfs(i):
                return None  # Cycle detected!
    
    result.reverse()
    return result
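
A recursive DFS can hit Python's recursion limit (1000 frames by default) on long dependency chains. One possible workaround, sketched here with the same input conventions, is to manage the DFS stack explicitly. The name `topological_sort_dfs_iterative` and the integer state constants are choices made for this sketch, not taken from the original.

```python
from typing import Iterator, List, Optional, Tuple

UNVISITED, VISITING, VISITED = 0, 1, 2


def topological_sort_dfs_iterative(
    V: int,
    edges: List[Tuple[int, int]],
) -> Optional[List[int]]:
    """DFS-based topological sort with an explicit stack (no recursion)."""
    adj: List[List[int]] = [[] for _ in range(V)]
    for u, v in edges:
        adj[u].append(v)

    state = [UNVISITED] * V
    finished: List[int] = []  # vertices in order of DFS completion

    for start in range(V):
        if state[start] != UNVISITED:
            continue
        state[start] = VISITING
        # Each frame holds a vertex and an iterator over its remaining neighbors.
        stack: List[Tuple[int, Iterator[int]]] = [(start, iter(adj[start]))]
        while stack:
            u, neighbors = stack[-1]
            v = next(neighbors, None)
            if v is None:
                # All neighbors explored: u is finished.
                state[u] = VISITED
                finished.append(u)
                stack.pop()
            elif state[v] == VISITING:
                return None  # back edge → cycle
            elif state[v] == UNVISITED:
                state[v] = VISITING
                stack.append((v, iter(adj[v])))

    finished.reverse()
    return finished
```

The iterator stored in each frame remembers how far that vertex's neighbor list has been scanned, which is what the recursive call stack does implicitly.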

Production Code

python
# HPN Engineering Standard
# Implementation: Topological Sort - Full Featured

from typing import List, Tuple, Optional, Dict, Set
from collections import deque, defaultdict
from dataclasses import dataclass
from enum import Enum


@dataclass
class Task:
    """Represents a task with dependencies."""
    name: str
    dependencies: List[str]


class TopologicalSort:
    """
    Production-ready Topological Sort implementation.
    
    Features:
    - Kahn's algorithm (BFS)
    - DFS with cycle detection
    - All valid orderings
    - Level assignment for parallel execution
    - String-labeled nodes support
    """
    
    def __init__(self):
        self.adj: Dict[str, List[str]] = defaultdict(list)
        self.in_degree: Dict[str, int] = defaultdict(int)
        self.nodes: Set[str] = set()
    
    def add_edge(self, u: str, v: str):
        """Add dependency: u must complete before v."""
        self.adj[u].append(v)
        self.in_degree[v] += 1
        if u not in self.in_degree:
            self.in_degree[u] = 0
        self.nodes.add(u)
        self.nodes.add(v)
    
    def add_task(self, task: Task):
        """Add task with its dependencies."""
        self.nodes.add(task.name)
        if task.name not in self.in_degree:
            self.in_degree[task.name] = 0
        
        for dep in task.dependencies:
            self.add_edge(dep, task.name)
    
    def sort_kahn(self) -> Optional[List[str]]:
        """
        Kahn's Algorithm - Returns one valid topological order.
        
        Returns None if cycle exists.
        """
        in_deg = dict(self.in_degree)
        queue = deque()
        
        for node in self.nodes:
            if in_deg.get(node, 0) == 0:
                queue.append(node)
        
        result = []
        
        while queue:
            u = queue.popleft()
            result.append(u)
            
            for v in self.adj[u]:
                in_deg[v] -= 1
                if in_deg[v] == 0:
                    queue.append(v)
        
        if len(result) != len(self.nodes):
            return None
        
        return result
    
    def detect_cycle(self) -> Optional[List[str]]:
        """
        Detect and return cycle if exists.
        
        Returns: Cycle path, or None if no cycle.
        """
        WHITE, GRAY, BLACK = 0, 1, 2
        color = {node: WHITE for node in self.nodes}
        parent = {node: None for node in self.nodes}
        
        def dfs(u: str) -> Optional[List[str]]:
            color[u] = GRAY
            
            for v in self.adj[u]:
                if color[v] == GRAY:
                    # Back edge u → v closes a cycle. Walk parent links
                    # from u back up to v (also handles a self-loop, u == v).
                    cycle = [v]
                    curr = u
                    while curr != v:
                        cycle.append(curr)
                        curr = parent[curr]
                    cycle.append(v)
                    return cycle[::-1]
                
                if color[v] == WHITE:
                    parent[v] = u
                    result = dfs(v)
                    if result:
                        return result
            
            color[u] = BLACK
            return None
        
        for node in self.nodes:
            if color[node] == WHITE:
                cycle = dfs(node)
                if cycle:
                    return cycle
        
        return None
    
    def all_topological_orders(self) -> List[List[str]]:
        """
        Generate ALL valid topological orderings.
        
        ⚠️ Warning: Can be exponential! Use only for small graphs.
        """
        in_deg = dict(self.in_degree)
        result = []
        current = []
        visited = set()
        
        def backtrack():
            if len(current) == len(self.nodes):
                result.append(list(current))
                return
            
            for node in self.nodes:
                if node not in visited and in_deg.get(node, 0) == 0:
                    # Choose
                    visited.add(node)
                    current.append(node)
                    for v in self.adj[node]:
                        in_deg[v] -= 1
                    
                    backtrack()
                    
                    # Unchoose
                    visited.remove(node)
                    current.pop()
                    for v in self.adj[node]:
                        in_deg[v] += 1
        
        backtrack()
        return result
    
    def get_levels(self) -> Dict[str, int]:
        """
        Get level/depth of each node (useful for parallel execution).
        
        Level 0: nodes with no dependencies (can run first)
        Level 1: depends only on level 0
        ...
        """
        order = self.sort_kahn()
        if not order:
            return {}
        
        # Walk the topological order and push each node's level to its
        # successors: O(V + E) instead of scanning every node per node.
        levels = {node: 0 for node in order}
        for node in order:
            for v in self.adj.get(node, []):
                levels[v] = max(levels[v], levels[node] + 1)
        
        return levels


# ============================================
# REAL-WORLD: BUILD SYSTEM
# ============================================

def simulate_build_system(tasks: List[Task]) -> Optional[List[str]]:
    """
    Simulate a build system that respects dependencies.
    
    Args:
        tasks: List of build tasks with dependencies
    
    Returns:
        Build order, or None if circular dependency
    """
    ts = TopologicalSort()
    for task in tasks:
        ts.add_task(task)
    
    order = ts.sort_kahn()
    if order is None:
        cycle = ts.detect_cycle()
        print(f"❌ Circular dependency detected: {' → '.join(cycle)}")
        return None
    
    return order


def parallel_build_schedule(tasks: List[Task]) -> Dict[int, List[str]]:
    """
    Group tasks by level for parallel execution.
    
    Returns:
        {level: [tasks that can run in parallel]}
    """
    ts = TopologicalSort()
    for task in tasks:
        ts.add_task(task)
    
    levels = ts.get_levels()
    if not levels:
        return {}
    
    schedule = defaultdict(list)
    for task, level in levels.items():
        schedule[level].append(task)
    
    return dict(schedule)


# ============================================
# REAL-WORLD: COURSE PREREQUISITES
# ============================================

def plan_course_schedule(
    courses: List[str],
    prerequisites: List[Tuple[str, str]]
) -> Optional[List[str]]:
    """
    Plan course schedule respecting prerequisites.
    
    Args:
        courses: All available courses
        prerequisites: [(prereq, course)] - prereq must be taken before course
    
    Returns:
        Valid course order, or None if impossible
    """
    ts = TopologicalSort()
    
    # Add all courses (even those without prereqs)
    for course in courses:
        ts.nodes.add(course)
        if course not in ts.in_degree:
            ts.in_degree[course] = 0
    
    for prereq, course in prerequisites:
        ts.add_edge(prereq, course)
    
    return ts.sort_kahn()


# ============================================
# USAGE EXAMPLE
# ============================================

if __name__ == "__main__":
    # Example 1: Build System
    print("=== Build System ===")
    tasks = [
        Task("core.o", []),
        Task("utils.o", ["core.o"]),
        Task("network.o", ["core.o"]),
        Task("main.o", ["utils.o", "network.o"]),
        Task("app.exe", ["main.o"]),
    ]
    
    order = simulate_build_system(tasks)
    if order:
        print(f"Build order: {' → '.join(order)}")
    
    # Parallel schedule
    schedule = parallel_build_schedule(tasks)
    print("\nParallel build schedule:")
    for level, items in sorted(schedule.items()):
        print(f"  Stage {level}: {items} (can run in parallel)")
    
    # Example 2: Course Prerequisites
    print("\n=== Course Schedule ===")
    courses = ["Math101", "CS101", "CS201", "CS301", "Math201", "Physics101"]
    prereqs = [
        ("Math101", "CS101"),
        ("CS101", "CS201"),
        ("CS201", "CS301"),
        ("Math101", "Math201"),
        ("Math101", "Physics101"),
    ]
    
    plan = plan_course_schedule(courses, prereqs)
    if plan:
        print(f"Course order: {' → '.join(plan)}")
    
    # Example 3: Circular dependency
    print("\n=== Circular Dependency Detection ===")
    bad_tasks = [
        Task("A", ["C"]),
        Task("B", ["A"]),
        Task("C", ["B"]),  # A → B → C → A
    ]
    simulate_build_system(bad_tasks)

Kahn vs DFS Comparison

Aspect | Kahn's (BFS) | DFS-based
Intuition | "What's ready now?" | "Go deep first"
Cycle detection | Check len(result) != V | Back edge detection
Parallel scheduling | Natural (levels) | Needs extra work
Implementation | Slightly simpler | More elegant recursion
Memory | Queue + in-degree | Call stack + state
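
To illustrate the "parallel scheduling" row: processing Kahn's queue one whole frontier at a time yields batches of vertices whose dependencies are all satisfied by earlier batches. This is a minimal sketch; the function name `kahn_batches` is an assumption, and it raises `ValueError` on a cycle rather than returning None.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List, Tuple


def kahn_batches(
    nodes: Iterable[Hashable],
    edges: Iterable[Tuple[Hashable, Hashable]],
) -> List[List[Hashable]]:
    """Group vertices into stages; each stage only depends on earlier ones."""
    adj: Dict[Hashable, List[Hashable]] = defaultdict(list)
    in_degree: Dict[Hashable, int] = {node: 0 for node in nodes}
    for u, v in edges:
        adj[u].append(v)
        in_degree.setdefault(u, 0)
        in_degree[v] = in_degree.get(v, 0) + 1

    frontier = [n for n, d in in_degree.items() if d == 0]
    batches: List[List[Hashable]] = []
    seen = 0

    while frontier:
        batches.append(frontier)
        seen += len(frontier)
        next_frontier = []
        for u in frontier:
            for v in adj[u]:
                in_degree[v] -= 1
                if in_degree[v] == 0:
                    next_frontier.append(v)
        frontier = next_frontier

    if seen != len(in_degree):
        raise ValueError("graph contains a cycle")
    return batches


if __name__ == "__main__":
    edges = [
        ("core.o", "utils.o"),
        ("core.o", "network.o"),
        ("utils.o", "main.o"),
        ("network.o", "main.o"),
        ("main.o", "app.exe"),
    ]
    for stage, batch in enumerate(kahn_batches([], edges)):
        print(f"Stage {stage}: {batch}")
```

Getting the same grouping out of a DFS needs a second pass over the finished order, which is the "extra work" the table refers to.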

Complexity

Aspect | Complexity
Time | O(V + E)
Space | O(V + E)

💡 HPN's Rule

"See 'dependencies', 'prerequisites', or 'ordering' → think topological sort. A cycle means there is no solution."