Skip to content

Thực hành: Generator Pipelines

🎯 Mục tiêu

🎯 Sau bài thực hành này, bạn sẽ:

  • Xây dựng data pipeline hiệu quả với generator
  • Sử dụng yield from để delegate cho sub-generator
  • Hiểu cơ chế send() để giao tiếp hai chiều với generator

Mô tả bài tập

Generator là công cụ mạnh mẽ để xử lý dữ liệu lớn mà không cần load toàn bộ vào bộ nhớ. Bài tập này hướng dẫn bạn xây dựng pipeline xử lý dữ liệu theo phong cách Unix pipes: mỗi bước nhận input, xử lý, và yield output cho bước tiếp theo.

Yêu cầu

Bài 1: Generator Pipeline Cơ Bản

Xây dựng pipeline đọc log file, lọc và chuyển đổi dữ liệu.

python
def read_lines(file_path: str):
    """Generator đọc file từng dòng — tiết kiệm bộ nhớ."""
    # TODO: Mở file và yield từng dòng (strip whitespace)
    pass

def filter_errors(lines):
    """Lọc chỉ những dòng chứa 'ERROR'."""
    # TODO: yield dòng nếu chứa 'ERROR'
    pass

def extract_message(lines):
    """Trích xuất phần message sau timestamp."""
    # TODO: Parse "[2024-01-15 10:30:00] ERROR: message" -> "message"
    pass

# Pipeline: đọc → lọc → trích xuất
pipeline = extract_message(filter_errors(read_lines("app.log")))
for msg in pipeline:
    print(msg)

Bài 2: yield from — Flatten Nested Data

Sử dụng yield from để làm phẳng cấu trúc dữ liệu lồng nhau.

python
def flatten(data):
    """Flatten nested lists/tuples ở mọi cấp độ."""
    # TODO: Nếu item là list/tuple → yield from flatten(item)
    # Nếu không → yield item
    pass

# Test
nested = [1, [2, 3], [4, [5, 6]], [[7, 8], 9]]
print(list(flatten(nested)))
# Output: [1, 2, 3, 4, 5, 6, 7, 8, 9]

def chain_iterables(*iterables):
    """Nối nhiều iterable lại — tương tự itertools.chain."""
    # TODO: Dùng yield from cho mỗi iterable
    pass

print(list(chain_iterables([1, 2], [3, 4], [5])))
# Output: [1, 2, 3, 4, 5]

Bài 3: Coroutine với send()

Xây dựng running average calculator sử dụng send().

python
def running_average():
    """Coroutine tính trung bình cộng liên tục."""
    total = 0.0
    count = 0
    average = None
    while True:
        # TODO: value = yield average
        # Cập nhật total, count, average
        pass

# Sử dụng
avg = running_average()
next(avg)             # Prime the coroutine
print(avg.send(10))   # 10.0
print(avg.send(20))   # 15.0
print(avg.send(30))   # 20.0

Gợi ý

Gợi ý Bài 1
  • read_lines: dùng with open(path) as f: rồi yield line.strip() cho mỗi dòng
  • filter_errors: dùng if 'ERROR' in line: yield line
  • Pipeline kết nối bằng cách truyền generator làm argument cho generator tiếp theo
Gợi ý Bài 2
  • isinstance(item, (list, tuple)) để kiểm tra kiểu dữ liệu
  • yield from flatten(item) để đệ quy flatten
  • yield from delegate toàn bộ iteration cho sub-generator
Gợi ý Bài 3
  • Coroutine cần được "prime" bằng next(gen) trước khi send()
  • value = yield result — yield result ra ngoài, nhận value từ send()
  • total += value, count += 1, average = total / count

Lời giải tham khảo

Xem lời giải
python
def read_lines(file_path: str):
    with open(file_path, "r", encoding="utf-8") as f:
        yield from (line.strip() for line in f)
def filter_errors(lines):
    yield from (line for line in lines if "ERROR" in line)
def extract_message(lines):
    yield from (line.split("] ERROR: ", 1)[1] for line in lines if "] ERROR: " in line)
def flatten(data):
    for item in data:
        if isinstance(item, (list, tuple)): yield from flatten(item)
        else: yield item
def chain_iterables(*iterables):
    for it in iterables: yield from it
def running_average():
    total = count = 0; average = None
    while True:
        value = yield average
        total += value; count += 1; average = total / count