Giao diện
Topological Sort - The Dependency Resolver
"Mọi build system, package manager, và CI/CD pipeline đều dựa trên Topological Sort." - HPN
Problem Statement
Cho một Directed Acyclic Graph (DAG), sắp xếp các đỉnh sao cho với mọi cạnh u → v, đỉnh u luôn đứng trước đỉnh v trong thứ tự sắp xếp.
A → B → D
↓ ↓
C → E
Topological Order: A → B → C → D → E
hoặc A → C → B → D → E
hoặc A → C → B → E → D
(nhiều kết quả hợp lệ)📘 Tại sao phải DAG?
Nếu có cycle (A → B → C → A), không thể sắp xếp! Task A cần B hoàn thành trước, B cần C, C cần A → Deadlock!
Real-World Applications
| Domain | Use Case | Chi tiết |
|---|---|---|
| Build Systems | make, gradle, webpack | Compile dependencies trước |
| Package Managers | npm, pip, apt | Install dependencies đúng thứ tự |
| CI/CD | GitHub Actions, Jenkins | Job dependencies |
| Course Prerequisites | University curriculum | Học môn cơ sở trước môn chuyên ngành |
| Spreadsheet | Excel cell references | Tính cell phụ thuộc trước |
| Task Scheduling | PERT/CPM | Project management |
DAG Visualization
Build Order: core.o → utils.o → network.o → main.cpp → executable
Two Approaches
1. Kahn's Algorithm (BFS-based)
2. DFS-based (Tarjan's variant)
- DFS từ mỗi đỉnh chưa visit
- Khi hoàn thành DFS của một đỉnh → Push vào stack
- Kết quả = Reverse của stack
Kahn's Algorithm Implementation
python
from typing import List, Tuple, Optional
from collections import deque, defaultdict
def topological_sort_kahn(
V: int,
edges: List[Tuple[int, int]]
) -> Optional[List[int]]:
"""
Kahn's Algorithm - BFS-based Topological Sort.
Args:
V: Number of vertices (0 to V-1)
edges: List of (u, v) meaning u → v (u must come before v)
Returns:
Topological order, or None if cycle exists
Time: O(V + E)
Space: O(V + E)
"""
# Build adjacency list and in-degree array
adj = defaultdict(list)
in_degree = [0] * V
for u, v in edges:
adj[u].append(v)
in_degree[v] += 1
# Initialize queue with nodes having in-degree 0
queue = deque()
for i in range(V):
if in_degree[i] == 0:
queue.append(i)
result = []
while queue:
u = queue.popleft()
result.append(u)
for v in adj[u]:
in_degree[v] -= 1
if in_degree[v] == 0:
queue.append(v)
# Check for cycle
if len(result) != V:
return None # Cycle detected!
return resultDFS-based Implementation
python
from typing import List, Tuple, Optional
from collections import defaultdict
from enum import Enum
class NodeState(Enum):
UNVISITED = 0
VISITING = 1 # In current DFS path
VISITED = 2 # Completed
def topological_sort_dfs(
V: int,
edges: List[Tuple[int, int]]
) -> Optional[List[int]]:
"""
DFS-based Topological Sort with cycle detection.
Returns:
Topological order, or None if cycle exists
Time: O(V + E)
Space: O(V + E)
"""
adj = defaultdict(list)
for u, v in edges:
adj[u].append(v)
state = [NodeState.UNVISITED] * V
result = []
def dfs(u: int) -> bool:
"""Returns False if cycle detected."""
if state[u] == NodeState.VISITING:
return False # Back edge → Cycle!
if state[u] == NodeState.VISITED:
return True
state[u] = NodeState.VISITING
for v in adj[u]:
if not dfs(v):
return False
state[u] = NodeState.VISITED
result.append(u)
return True
for i in range(V):
if state[i] == NodeState.UNVISITED:
if not dfs(i):
return None # Cycle detected!
result.reverse()
return resultProduction Code
python
# HPN Engineering Standard
# Implementation: Topological Sort - Full Featured
from typing import List, Tuple, Optional, Dict, Set
from collections import deque, defaultdict
from dataclasses import dataclass
from enum import Enum
@dataclass
class Task:
"""Represents a task with dependencies."""
name: str
dependencies: List[str]
class TopologicalSort:
"""
Production-ready Topological Sort implementation.
Features:
- Kahn's algorithm (BFS)
- DFS with cycle detection
- All valid orderings
- Critical path analysis
- String-labeled nodes support
"""
def __init__(self):
self.adj: Dict[str, List[str]] = defaultdict(list)
self.in_degree: Dict[str, int] = defaultdict(int)
self.nodes: Set[str] = set()
def add_edge(self, u: str, v: str):
"""Add dependency: u must complete before v."""
self.adj[u].append(v)
self.in_degree[v] += 1
if u not in self.in_degree:
self.in_degree[u] = 0
self.nodes.add(u)
self.nodes.add(v)
def add_task(self, task: Task):
"""Add task with its dependencies."""
self.nodes.add(task.name)
if task.name not in self.in_degree:
self.in_degree[task.name] = 0
for dep in task.dependencies:
self.add_edge(dep, task.name)
def sort_kahn(self) -> Optional[List[str]]:
"""
Kahn's Algorithm - Returns one valid topological order.
Returns None if cycle exists.
"""
in_deg = dict(self.in_degree)
queue = deque()
for node in self.nodes:
if in_deg.get(node, 0) == 0:
queue.append(node)
result = []
while queue:
u = queue.popleft()
result.append(u)
for v in self.adj[u]:
in_deg[v] -= 1
if in_deg[v] == 0:
queue.append(v)
if len(result) != len(self.nodes):
return None
return result
def detect_cycle(self) -> Optional[List[str]]:
"""
Detect and return cycle if exists.
Returns: Cycle path, or None if no cycle.
"""
WHITE, GRAY, BLACK = 0, 1, 2
color = {node: WHITE for node in self.nodes}
parent = {node: None for node in self.nodes}
def dfs(u: str) -> Optional[List[str]]:
color[u] = GRAY
for v in self.adj[u]:
if color[v] == GRAY:
# Found cycle! Reconstruct it.
cycle = [v, u]
curr = u
while parent[curr] and parent[curr] != v:
cycle.append(parent[curr])
curr = parent[curr]
cycle.append(v)
return list(reversed(cycle))
if color[v] == WHITE:
parent[v] = u
result = dfs(v)
if result:
return result
color[u] = BLACK
return None
for node in self.nodes:
if color[node] == WHITE:
cycle = dfs(node)
if cycle:
return cycle
return None
def all_topological_orders(self) -> List[List[str]]:
"""
Generate ALL valid topological orderings.
⚠️ Warning: Can be exponential! Use only for small graphs.
"""
in_deg = dict(self.in_degree)
result = []
current = []
visited = set()
def backtrack():
if len(current) == len(self.nodes):
result.append(list(current))
return
for node in self.nodes:
if node not in visited and in_deg.get(node, 0) == 0:
# Choose
visited.add(node)
current.append(node)
for v in self.adj[node]:
in_deg[v] -= 1
backtrack()
# Unchoose
visited.remove(node)
current.pop()
for v in self.adj[node]:
in_deg[v] += 1
backtrack()
return result
def get_levels(self) -> Dict[str, int]:
"""
Get level/depth of each node (useful for parallel execution).
Level 0: nodes with no dependencies (can run first)
Level 1: depends only on level 0
...
"""
order = self.sort_kahn()
if not order:
return {}
levels = {}
for node in order:
max_dep_level = -1
for other in self.nodes:
if node in self.adj[other] and other in levels:
max_dep_level = max(max_dep_level, levels[other])
levels[node] = max_dep_level + 1
return levels
# ============================================
# REAL-WORLD: BUILD SYSTEM
# ============================================
def simulate_build_system(tasks: List[Task]) -> Optional[List[str]]:
"""
Simulate a build system that respects dependencies.
Args:
tasks: List of build tasks with dependencies
Returns:
Build order, or None if circular dependency
"""
ts = TopologicalSort()
for task in tasks:
ts.add_task(task)
order = ts.sort_kahn()
if order is None:
cycle = ts.detect_cycle()
print(f"❌ Circular dependency detected: {' → '.join(cycle)}")
return None
return order
def parallel_build_schedule(tasks: List[Task]) -> Dict[int, List[str]]:
"""
Group tasks by level for parallel execution.
Returns:
{level: [tasks that can run in parallel]}
"""
ts = TopologicalSort()
for task in tasks:
ts.add_task(task)
levels = ts.get_levels()
if not levels:
return {}
schedule = defaultdict(list)
for task, level in levels.items():
schedule[level].append(task)
return dict(schedule)
# ============================================
# REAL-WORLD: COURSE PREREQUISITES
# ============================================
def plan_course_schedule(
courses: List[str],
prerequisites: List[Tuple[str, str]]
) -> Optional[List[str]]:
"""
Plan course schedule respecting prerequisites.
Args:
courses: All available courses
prerequisites: [(prereq, course)] - prereq must be taken before course
Returns:
Valid course order, or None if impossible
"""
ts = TopologicalSort()
# Add all courses (even those without prereqs)
for course in courses:
ts.nodes.add(course)
if course not in ts.in_degree:
ts.in_degree[course] = 0
for prereq, course in prerequisites:
ts.add_edge(prereq, course)
return ts.sort_kahn()
# ============================================
# USAGE EXAMPLE
# ============================================
if __name__ == "__main__":
# Example 1: Build System
print("=== Build System ===")
tasks = [
Task("core.o", []),
Task("utils.o", ["core.o"]),
Task("network.o", ["core.o"]),
Task("main.o", ["utils.o", "network.o"]),
Task("app.exe", ["main.o"]),
]
order = simulate_build_system(tasks)
if order:
print(f"Build order: {' → '.join(order)}")
# Parallel schedule
schedule = parallel_build_schedule(tasks)
print("\nParallel build schedule:")
for level, items in sorted(schedule.items()):
print(f" Stage {level}: {items} (can run in parallel)")
# Example 2: Course Prerequisites
print("\n=== Course Schedule ===")
courses = ["Math101", "CS101", "CS201", "CS301", "Math201", "Physics101"]
prereqs = [
("Math101", "CS101"),
("CS101", "CS201"),
("CS201", "CS301"),
("Math101", "Math201"),
("Math101", "Physics101"),
]
plan = plan_course_schedule(courses, prereqs)
if plan:
print(f"Course order: {' → '.join(plan)}")
# Example 3: Circular dependency
print("\n=== Circular Dependency Detection ===")
bad_tasks = [
Task("A", ["C"]),
Task("B", ["A"]),
Task("C", ["B"]), # A → B → C → A
]
simulate_build_system(bad_tasks)Kahn vs DFS Comparison
| Aspect | Kahn's (BFS) | DFS-based |
|---|---|---|
| Intuition | "What's ready now?" | "Go deep first" |
| Cycle detection | Check result.size != V | Back edge detection |
| Parallel scheduling | Natural (levels) | Needs extra work |
| Implementation | Slightly simpler | More elegant recursion |
| Memory | Queue + in-degree | Call stack + state |
Complexity
| Aspect | Complexity |
|---|---|
| Time | |
| Space |
💡 HPN's Rule
"Thấy 'dependencies', 'prerequisites', 'ordering' → Topological Sort. Có cycle = Không có lời giải."