Giao diện
Thực hành: Generator Pipelines
🎯 Mục tiêu
🎯 Sau bài thực hành này, bạn sẽ:
- Xây dựng data pipeline hiệu quả với generator
- Sử dụng
yield fromđể delegate cho sub-generator - Hiểu cơ chế
send()để giao tiếp hai chiều với generator
Mô tả bài tập
Generator là công cụ mạnh mẽ để xử lý dữ liệu lớn mà không cần load toàn bộ vào bộ nhớ. Bài tập này hướng dẫn bạn xây dựng pipeline xử lý dữ liệu theo phong cách Unix pipes: mỗi bước nhận input, xử lý, và yield output cho bước tiếp theo.
Yêu cầu
Bài 1: Generator Pipeline Cơ Bản
Xây dựng pipeline đọc log file, lọc và chuyển đổi dữ liệu.
python
def read_lines(file_path: str):
"""Generator đọc file từng dòng — tiết kiệm bộ nhớ."""
# TODO: Mở file và yield từng dòng (strip whitespace)
pass
def filter_errors(lines):
"""Lọc chỉ những dòng chứa 'ERROR'."""
# TODO: yield dòng nếu chứa 'ERROR'
pass
def extract_message(lines):
"""Trích xuất phần message sau timestamp."""
# TODO: Parse "[2024-01-15 10:30:00] ERROR: message" -> "message"
pass
# Pipeline: đọc → lọc → trích xuất
pipeline = extract_message(filter_errors(read_lines("app.log")))
for msg in pipeline:
print(msg)Bài 2: yield from — Flatten Nested Data
Sử dụng yield from để làm phẳng cấu trúc dữ liệu lồng nhau.
python
def flatten(data):
"""Flatten nested lists/tuples ở mọi cấp độ."""
# TODO: Nếu item là list/tuple → yield from flatten(item)
# Nếu không → yield item
pass
# Test
nested = [1, [2, 3], [4, [5, 6]], [[7, 8], 9]]
print(list(flatten(nested)))
# Output: [1, 2, 3, 4, 5, 6, 7, 8, 9]
def chain_iterables(*iterables):
"""Nối nhiều iterable lại — tương tự itertools.chain."""
# TODO: Dùng yield from cho mỗi iterable
pass
print(list(chain_iterables([1, 2], [3, 4], [5])))
# Output: [1, 2, 3, 4, 5]Bài 3: Coroutine với send()
Xây dựng running average calculator sử dụng send().
python
def running_average():
"""Coroutine tính trung bình cộng liên tục."""
total = 0.0
count = 0
average = None
while True:
# TODO: value = yield average
# Cập nhật total, count, average
pass
# Sử dụng
avg = running_average()
next(avg) # Prime the coroutine
print(avg.send(10)) # 10.0
print(avg.send(20)) # 15.0
print(avg.send(30)) # 20.0Gợi ý
Gợi ý Bài 1
read_lines: dùngwith open(path) as f:rồiyield line.strip()cho mỗi dòngfilter_errors: dùngif 'ERROR' in line: yield line- Pipeline kết nối bằng cách truyền generator làm argument cho generator tiếp theo
Gợi ý Bài 2
isinstance(item, (list, tuple))để kiểm tra kiểu dữ liệuyield from flatten(item)để đệ quy flattenyield fromdelegate toàn bộ iteration cho sub-generator
Gợi ý Bài 3
- Coroutine cần được "prime" bằng
next(gen)trước khisend() value = yield result— yield result ra ngoài, nhận value từ send()total += value,count += 1,average = total / count
Lời giải tham khảo
Xem lời giải
python
def read_lines(file_path: str):
with open(file_path, "r", encoding="utf-8") as f:
yield from (line.strip() for line in f)
def filter_errors(lines):
yield from (line for line in lines if "ERROR" in line)
def extract_message(lines):
yield from (line.split("] ERROR: ", 1)[1] for line in lines if "] ERROR: " in line)
def flatten(data):
for item in data:
if isinstance(item, (list, tuple)): yield from flatten(item)
else: yield item
def chain_iterables(*iterables):
for it in iterables: yield from it
def running_average():
total = count = 0; average = None
while True:
value = yield average
total += value; count += 1; average = total / count