Skip to content

Generators — Xử lý dữ liệu khổng lồ với bộ nhớ tối thiểu

Hệ thống của bạn cần xử lý file log 50GB mỗi ngày. RAM server chỉ có 4GB. Nếu bạn load toàn bộ file vào memory, process sẽ bị OOM-killed trước khi kịp xử lý dòng đầu tiên. Đây không phải bài toán lý thuyết — đây là thực tế hàng ngày của mọi data pipeline trong production. Generator là câu trả lời mà Python đưa ra cho bài toán này: xử lý dữ liệu streaming, từng phần tử một, với memory footprint gần như bằng không.

Generator không chỉ là công cụ tiết kiệm RAM. Cơ chế yield — khả năng suspend và resume execution — chính là nền tảng mà toàn bộ hệ sinh thái async của Python được xây dựng lên. Từ asyncio đến các web framework như FastAPI, tất cả đều dựa trên generator mechanics ở tầng thấp nhất. Hiểu generator là hiểu cách Python thực sự điều phối luồng thực thi.

Bài viết này sẽ đưa bạn từ cơ chế yield cơ bản đến việc xây dựng generator pipeline hoàn chỉnh cho ETL trong production — nơi mà mỗi stage chỉ giữ đúng một phần tử trong memory tại bất kỳ thời điểm nào.


Bức tranh tư duy

Hãy hình dung một quán sushi băng chuyền (kaiten-zushi). Nhà bếp không dọn tất cả 200 đĩa sushi lên bàn cùng lúc — làm vậy thì bàn sẽ sập và sushi nguội ngắt trước khi khách kịp ăn. Thay vào đó, đầu bếp đặt từng đĩa một lên băng chuyền. Khách ngồi quanh quầy, thấy đĩa nào muốn ăn thì lấy đĩa đó. Đĩa nào chưa được lấy thì tiếp tục chạy trên băng chuyền. Đầu bếp chỉ chuẩn bị đĩa tiếp theo khi có chỗ trống trên băng chuyền.

Đây chính xác là cách generator hoạt động. Hàm generator là nhà bếp — nó biết công thức làm ra toàn bộ dữ liệu, nhưng chỉ "nấu" từng phần tử khi được yêu cầu. Mỗi lệnh yield là hành động đặt một đĩa lên băng chuyền rồi tạm dừng, chờ consumer (khách) lấy đĩa đó đi. Consumer gọi next() — tương đương với việc lấy đĩa tiếp theo khỏi băng chuyền — thì nhà bếp mới tiếp tục chuẩn bị đĩa kế.

Mô hình này mang lại hai lợi thế quan trọng. Thứ nhất, bộ nhớ không đổi — dù thực đơn có 10 hay 10 triệu đĩa, tại mỗi thời điểm chỉ có một đĩa trên băng chuyền. Thứ hai, nhà bếp có thể dừng bất cứ lúc nào — nếu khách no rồi (consumer ngừng gọi next()), nhà bếp không cần nấu nốt 199 đĩa còn lại. Đó chính là lazy evaluation — chỉ tính toán khi thực sự cần.


Cốt lõi kỹ thuật

yield và cơ chế suspend/resume

Từ khóa yield biến một hàm thông thường thành generator function. Khi gọi hàm này, Python không thực thi body ngay lập tức — thay vào đó trả về một generator object. Code chỉ chạy khi consumer gọi next(), và sẽ tạm dừng ngay tại vị trí yield, giữ nguyên toàn bộ local state.

python
def dem_nguoc(n: int):
    print(f"Bắt đầu đếm ngược từ {n}")
    while n > 0:
        yield n           # Tạm dừng, trả n cho consumer
        n -= 1            # Chạy tiếp khi next() được gọi lần sau
        print(f"Tiếp tục, n = {n}")
    print("Hết!")

gen = dem_nguoc(3)        # Chưa in gì cả — chỉ tạo generator object
print(type(gen))          # <class 'generator'>

print(next(gen))          # "Bắt đầu đếm ngược từ 3" → 3
print(next(gen))          # "Tiếp tục, n = 2" → 2
print(next(gen))          # "Tiếp tục, n = 1" → 1
# next(gen)               # "Tiếp tục, n = 0" → "Hết!" → StopIteration

Điểm then chốt: giữa hai lần gọi next(), biến n vẫn tồn tại trong execution frame của generator. Generator không phải chạy lại từ đầu — nó resume đúng tại dòng sau yield.

So sánh memory footprint giữa list và generator:

python
import sys

danh_sach = [i ** 2 for i in range(1_000_000)]
generator = (i ** 2 for i in range(1_000_000))

print(f"List:      {sys.getsizeof(danh_sach):>12,} bytes")  # ~8,448,728 bytes
print(f"Generator: {sys.getsizeof(generator):>12,} bytes")  # ~        200 bytes

Generator expressions — cú pháp gọn

Generator expression có cú pháp giống list comprehension, nhưng dùng () thay vì []. Kết quả là một generator object thay vì một list đầy đủ trong memory.

python
# List comprehension — tạo toàn bộ list ngay lập tức
binh_phuong_list = [x ** 2 for x in range(10_000_000)]

# Generator expression — không tính gì cho đến khi iterate
binh_phuong_gen = (x ** 2 for x in range(10_000_000))

# Khi truyền vào hàm, có thể bỏ cặp ngoặc ngoài
tong = sum(x ** 2 for x in range(10_000_000))

# Có thể chain điều kiện giống list comprehension
ket_qua = sum(
    len(line)
    for line in open("data.txt", encoding="utf-8")
    if not line.startswith("#")
)

Generator expression phù hợp cho các phép tính một lần duyệt (single-pass) như sum(), max(), min(), any(), all(). Nếu cần duyệt nhiều lần, hãy dùng list comprehension hoặc wrap generator trong hàm factory.

send(), throw(), close() — giao tiếp hai chiều

Generator không chỉ phát ra dữ liệu — nó còn có thể nhận vào dữ liệu từ consumer thông qua send(). Đây là cơ chế biến generator thành coroutine — nền tảng của async programming.

python
def running_average():
    """Generator tính trung bình cộng liên tục."""
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average   # Nhận value từ send(), trả average
        if value is None:
            break
        total += value
        count += 1
        average = total / count

# Khởi tạo — phải gọi next() trước (prime the generator)
avg = running_average()
next(avg)                 # Chạy đến yield đầu tiên, trả None

print(avg.send(10))       # 10.0
print(avg.send(20))       # 15.0
print(avg.send(30))       # 20.0
print(avg.send(40))       # 25.0

throw() cho phép inject exception vào generator tại điểm yield hiện tại:

python
def resilient_reader():
    """Generator có thể xử lý lỗi từ bên ngoài."""
    while True:
        try:
            line = yield
            print(f"Xử lý: {line}")
        except ValueError as e:
            print(f"Bỏ qua dòng lỗi: {e}")

reader = resilient_reader()
next(reader)

reader.send("dòng hợp lệ")       # Xử lý: dòng hợp lệ
reader.throw(ValueError, "dữ liệu sai định dạng")  # Bỏ qua dòng lỗi: ...
reader.send("dòng tiếp theo")     # Xử lý: dòng tiếp theo
reader.close()                    # Kết thúc generator, raise GeneratorExit bên trong

close() gửi GeneratorExit vào generator, cho phép nó chạy các khối finally để dọn dẹp tài nguyên. Luôn gọi close() khi kết thúc sớm một generator đang giữ tài nguyên.

yield from — delegation

yield from delegate toàn bộ quá trình iteration cho một sub-iterable, bao gồm cả việc forward send(), throw(), và capture return value.

python
def doc_nhieu_file(danh_sach_file: list[str]):
    """Đọc nhiều file nối tiếp nhau, mỗi file là một sub-generator."""
    for filepath in danh_sach_file:
        yield from doc_tung_dong(filepath)

def doc_tung_dong(filepath: str):
    """Đọc từng dòng của một file."""
    with open(filepath, encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")

# Consumer thấy một stream duy nhất, không cần biết có bao nhiêu file
for dong in doc_nhieu_file(["log_01.csv", "log_02.csv", "log_03.csv"]):
    xu_ly(dong)

yield from cũng capture giá trị return từ sub-generator:

python
def tinh_tong_phan():
    """Sub-generator tính tổng, return kết quả cuối."""
    total = 0
    while True:
        value = yield
        if value is None:
            return total    # Giá trị này được yield from capture
        total += value

def tong_hop():
    """Delegator thu thập kết quả từ nhiều sub-generator."""
    ket_qua_1 = yield from tinh_tong_phan()
    ket_qua_2 = yield from tinh_tong_phan()
    yield f"Phần 1: {ket_qua_1}, Phần 2: {ket_qua_2}"

gen = tong_hop()
next(gen)

# Gửi dữ liệu cho sub-generator thứ nhất
gen.send(10)
gen.send(20)
gen.send(None)        # Kết thúc sub-generator 1 → return 30

# Tự động chuyển sang sub-generator thứ hai
gen.send(5)
gen.send(15)
print(gen.send(None)) # "Phần 1: 30, Phần 2: 20"

Generator pipelines — chuỗi xử lý dữ liệu

Pipeline pattern kết nối nhiều generator thành một chuỗi xử lý — output của stage trước là input của stage sau. Mỗi stage chỉ giữ đúng một phần tử trong memory tại bất kỳ thời điểm nào.

python
from typing import Iterator

def doc_file(filepath: str) -> Iterator[str]:
    """Stage 1: Đọc từng dòng."""
    with open(filepath, encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")

def loc_dong_trong(lines: Iterator[str]) -> Iterator[str]:
    """Stage 2: Bỏ dòng trống."""
    for line in lines:
        if line.strip():
            yield line

def tach_truong(lines: Iterator[str], delimiter: str = ",") -> Iterator[list[str]]:
    """Stage 3: Tách thành fields."""
    for line in lines:
        yield line.split(delimiter)

def chuyen_kieu(rows: Iterator[list[str]]) -> Iterator[dict]:
    """Stage 4: Chuyển đổi kiểu dữ liệu."""
    for row in rows:
        if len(row) >= 3:
            yield {
                "timestamp": row[0].strip(),
                "level": row[1].strip(),
                "message": row[2].strip(),
            }

# Kết nối pipeline — không dòng nào nằm trong RAM quá thời gian xử lý
pipeline = chuyen_kieu(
    tach_truong(
        loc_dong_trong(
            doc_file("application.log")
        ),
        delimiter="|"
    )
)

for record in pipeline:
    print(record)

Mỗi lần vòng for gọi next() trên pipeline, tín hiệu truyền ngược qua toàn bộ chuỗi: chuyen_kieu yêu cầu tach_truong, tach_truong yêu cầu loc_dong_trong, loc_dong_trong yêu cầu doc_file. Chỉ khi doc_file yield một dòng, toàn bộ chuỗi mới xử lý dòng đó xuyên suốt.


Thực chiến

Bài toán: Streaming ETL Pipeline cho dữ liệu bán hàng

Hệ thống nhận file CSV hàng ngày chứa dữ liệu giao dịch bán hàng — file có thể lên đến hàng GB. Yêu cầu: đọc file, lọc bỏ record không hợp lệ, chuẩn hóa dữ liệu, tính toán metrics, và ghi ra file output. Toàn bộ pipeline phải chạy với memory footprint cố định bất kể kích thước file.

python
import csv
import json
from datetime import datetime
from typing import Iterator, TextIO
from dataclasses import dataclass, asdict


@dataclass
class SaleRecord:
    transaction_id: str
    timestamp: datetime
    product_code: str
    quantity: int
    unit_price: float
    total: float


# === STAGE 1: Extract — Đọc CSV streaming ===

def extract_csv(filepath: str) -> Iterator[dict[str, str]]:
    """Đọc CSV từng dòng, yield dict cho mỗi row."""
    with open(filepath, encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        for row_number, row in enumerate(reader, start=2):
            row["_row_number"] = str(row_number)
            yield row


# === STAGE 2: Validate — Lọc record không hợp lệ ===

def validate_records(
    rows: Iterator[dict[str, str]],
    error_log: TextIO,
) -> Iterator[dict[str, str]]:
    """Bỏ qua record thiếu trường hoặc dữ liệu sai kiểu."""
    required_fields = {"transaction_id", "timestamp", "product_code",
                       "quantity", "unit_price"}
    for row in rows:
        row_num = row.get("_row_number", "?")

        # Kiểm tra trường bắt buộc
        missing = required_fields - row.keys()
        if missing:
            error_log.write(f"Row {row_num}: thiếu trường {missing}\n")
            continue

        # Kiểm tra kiểu dữ liệu cơ bản
        try:
            int(row["quantity"])
            float(row["unit_price"])
        except (ValueError, KeyError):
            error_log.write(f"Row {row_num}: dữ liệu số không hợp lệ\n")
            continue

        yield row


# === STAGE 3: Transform — Chuẩn hóa và tính toán ===

def transform_records(
    rows: Iterator[dict[str, str]],
) -> Iterator[SaleRecord]:
    """Chuyển raw dict thành SaleRecord với business logic."""
    for row in rows:
        quantity = int(row["quantity"])
        unit_price = float(row["unit_price"])

        record = SaleRecord(
            transaction_id=row["transaction_id"].strip().upper(),
            timestamp=datetime.strptime(
                row["timestamp"].strip(), "%Y-%m-%d %H:%M:%S"
            ),
            product_code=row["product_code"].strip().upper(),
            quantity=quantity,
            unit_price=round(unit_price, 2),
            total=round(quantity * unit_price, 2),
        )
        yield record


# === STAGE 4: Enrich — Thêm metadata ===

def enrich_records(
    records: Iterator[SaleRecord],
) -> Iterator[dict]:
    """Thêm trường phân tích: giờ trong ngày, mức giá trị."""
    for record in records:
        enriched = asdict(record)
        enriched["timestamp"] = record.timestamp.isoformat()
        enriched["hour_of_day"] = record.timestamp.hour

        if record.total >= 1_000:
            enriched["tier"] = "high_value"
        elif record.total >= 100:
            enriched["tier"] = "medium_value"
        else:
            enriched["tier"] = "low_value"

        yield enriched


# === STAGE 5: Load — Ghi ra file JSON Lines ===

def load_jsonl(records: Iterator[dict], output_path: str) -> int:
    """Ghi từng record thành một dòng JSON. Trả về số record đã ghi."""
    count = 0
    with open(output_path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
            count += 1
    return count


# === Kết nối toàn bộ pipeline ===

def run_etl(input_csv: str, output_jsonl: str, error_file: str) -> None:
    """Chạy ETL pipeline end-to-end."""
    with open(error_file, "w", encoding="utf-8") as err_log:
        pipeline = enrich_records(
            transform_records(
                validate_records(
                    extract_csv(input_csv),
                    error_log=err_log,
                )
            )
        )
        total = load_jsonl(pipeline, output_jsonl)
    print(f"ETL hoàn tất: {total} records được ghi vào {output_jsonl}")


# Sử dụng
# run_etl("sales_2024.csv", "sales_clean.jsonl", "etl_errors.log")

So sánh memory: cách tiếp cận list-based load toàn bộ file vào RAM trước khi xử lý — file 2GB cần ít nhất 2GB RAM. Pipeline generator trên chỉ giữ một record tại mỗi thời điểm, memory footprint cố định khoảng vài KB bất kể file lớn cỡ nào.

python
# === So sánh memory: list-based vs generator-based ===
import tracemalloc

def etl_list_based(filepath: str) -> list[dict]:
    """Cách truyền thống — load tất cả vào memory."""
    with open(filepath, encoding="utf-8", newline="") as f:
        all_rows = list(csv.DictReader(f))       # Toàn bộ file trong RAM

    valid = [r for r in all_rows if r.get("quantity", "").isdigit()]
    transformed = []
    for r in valid:
        transformed.append({
            "id": r["transaction_id"],
            "total": int(r["quantity"]) * float(r["unit_price"]),
        })
    return transformed

# Đo list-based
tracemalloc.start()
# result = etl_list_based("sales_2024.csv")
# peak_list = tracemalloc.get_traced_memory()[1]
tracemalloc.stop()

# Đo generator-based
tracemalloc.start()
# run_etl("sales_2024.csv", "output.jsonl", "errors.log")
# peak_gen = tracemalloc.get_traced_memory()[1]
tracemalloc.stop()

# Kết quả điển hình với file 1 triệu dòng:
# List-based:      ~850 MB peak
# Generator-based: ~  3 MB peak (cố định)

Sai lầm điển hình

1. Generator exhaustion — duyệt lần hai mất dữ liệu

python
# ❌ SAI: Duyệt generator hai lần
data = (x ** 2 for x in range(5))

tong = sum(data)          # 0 + 1 + 4 + 9 + 16 = 30
trung_binh = sum(data) / 5  # sum(data) = 0 → ZeroDivisionError hoặc kết quả sai!

Hậu quả production: Pipeline chạy đúng lần đầu, nhưng khi retry hoặc khi downstream consumer duyệt lại — kết quả rỗng. Bug này cực khó debug vì không có lỗi rõ ràng.

python
# ✅ ĐÚNG: Dùng hàm factory tạo generator mới mỗi lần
def binh_phuong():
    return (x ** 2 for x in range(5))

tong = sum(binh_phuong())
trung_binh = sum(binh_phuong()) / 5

# Hoặc: materialize thành list nếu dữ liệu đủ nhỏ
data_list = list(binh_phuong())
tong = sum(data_list)
trung_binh = sum(data_list) / len(data_list)

2. Nhầm lẫn returnyield trong generator

python
# ❌ SAI: Dùng return trong generator — chỉ trả danh sách cuối cùng
def lay_so_chan_sai(numbers: list[int]):
    result = []
    for n in numbers:
        if n % 2 == 0:
            result.append(n)
    return result    # Đây là return bình thường, KHÔNG phải generator

# Hàm trên trả về list ngay lập tức, load toàn bộ vào RAM

Hậu quả production: Developer nghĩ đã dùng lazy evaluation nhưng thực tế load toàn bộ vào RAM — chỉ phát hiện khi file đủ lớn để OOM.

python
# ✅ ĐÚNG: yield từng phần tử
def lay_so_chan(numbers):
    for n in numbers:
        if n % 2 == 0:
            yield n   # Lazy — chỉ giữ 1 phần tử trong memory

# Sử dụng
for so in lay_so_chan(range(10_000_000)):
    xu_ly(so)

3. Thay đổi nguồn dữ liệu trong khi đang iterate

python
# ❌ SAI: Thêm phần tử vào dict/set trong khi iterate
config = {"a": 1, "b": 2, "c": 3}

def cap_nhat_config(cfg: dict):
    for key in cfg:        # Iterator trên dict
        if cfg[key] < 2:
            cfg["new_" + key] = cfg[key] * 10  # RuntimeError!
        yield key, cfg[key]

Hậu quả production: RuntimeError: dictionary changed size during iteration — crash giữa chừng pipeline, dữ liệu bị xử lý dở.

python
# ✅ ĐÚNG: Tách đọc và ghi thành hai phase
def cap_nhat_config_an_toan(cfg: dict):
    updates = {}
    for key in cfg:
        if cfg[key] < 2:
            updates["new_" + key] = cfg[key] * 10
        yield key, cfg[key]
    cfg.update(updates)   # Cập nhật sau khi iterate xong

4. Không đóng generator giữ tài nguyên — resource leak

python
# ❌ SAI: Generator mở file nhưng consumer break sớm
def doc_log(filepath: str):
    f = open(filepath, encoding="utf-8")
    for line in f:
        yield line.strip()
    f.close()    # Nếu consumer break sớm, dòng này KHÔNG BAO GIỜ chạy!

gen = doc_log("server.log")
for line in gen:
    if "CRITICAL" in line:
        print(line)
        break    # File handle bị leak!

Hậu quả production: Hàng trăm file handle bị leak, hệ thống hết file descriptor, toàn bộ service ngưng phục vụ.

python
# ✅ ĐÚNG: Dùng try/finally hoặc contextmanager
def doc_log_an_toan(filepath: str):
    f = open(filepath, encoding="utf-8")
    try:
        for line in f:
            yield line.strip()
    finally:
        f.close()    # Luôn chạy, kể cả khi consumer break hoặc close()

# Hoặc tốt hơn: dùng with statement
def doc_log_tot_nhat(filepath: str):
    with open(filepath, encoding="utf-8") as f:
        for line in f:
            yield line.strip()
    # File tự động đóng khi generator kết thúc hoặc bị close()

Under the Hood

Generator frame object

Mỗi generator object trong CPython chứa một frame object (gi_frame) lưu toàn bộ execution state: local variables, instruction pointer, block stack. Khi yield thực thi, frame không bị hủy — nó được detach khỏi call stack và lưu trong generator object.

python
def vi_du_frame():
    x = 42
    y = "hello"
    yield x
    yield y

gen = vi_du_frame()
next(gen)

# Xem frame object
print(gen.gi_frame)           # <frame at 0x...>
print(gen.gi_frame.f_locals)  # {'x': 42, 'y': 'hello'}
print(gen.gi_code.co_name)    # 'vi_du_frame'

# Sau khi exhaust
list(gen)
print(gen.gi_frame)           # None — frame đã bị giải phóng

Cơ chế suspend/resume trong CPython

Khi CPython gặp yield, nó thực hiện opcode YIELD_VALUE:

  1. Lưu giá trị trả về lên stack
  2. Detach frame hiện tại khỏi execution stack (nhưng không hủy)
  3. Trả giá trị về cho caller (hàm đang gọi next() hoặc send())

Khi next() hoặc send() được gọi, CPython thực hiện RESUME:

  1. Re-attach frame vào execution stack
  2. Khôi phục instruction pointer từ vị trí sau yield
  3. Tiếp tục thực thi cho đến yield tiếp theo hoặc return
python
import dis

def generator_don_gian():
    yield 1
    yield 2

# Xem bytecode — chú ý YIELD_VALUE và RESUME
dis.dis(generator_don_gian)
# Output (đã lược bỏ):
#   RESUME          0
#   LOAD_CONST      1 (1)
#   YIELD_VALUE     1
#   RESUME          1
#   POP_TOP
#   LOAD_CONST      2 (2)
#   YIELD_VALUE     1
#   RESUME          1
#   POP_TOP
#   RETURN_CONST    0 (None)

Memory profile: list vs generator cho 10 triệu phần tử

python
import tracemalloc
from itertools import islice


def do_voi_list():
    """Tạo list 10 triệu phần tử."""
    data = [i * 2 for i in range(10_000_000)]
    return sum(data)


def do_voi_generator():
    """Generator 10 triệu phần tử."""
    data = (i * 2 for i in range(10_000_000))
    return sum(data)


# Đo list
tracemalloc.start()
do_voi_list()
_, peak_list = tracemalloc.get_traced_memory()
tracemalloc.stop()

# Đo generator
tracemalloc.start()
do_voi_generator()
_, peak_gen = tracemalloc.get_traced_memory()
tracemalloc.stop()

print(f"List:      peak = {peak_list / 1024 / 1024:.1f} MB")
# List:      peak = ~89.5 MB

print(f"Generator: peak = {peak_gen / 1024 / 1024:.1f} MB")
# Generator: peak = ~ 0.1 MB

Khi nào KHÔNG nên dùng generator: Nếu cần truy cập random (indexing), duyệt nhiều lần, hoặc dataset đủ nhỏ (dưới vài nghìn phần tử) — list thường nhanh hơn do ít overhead từ việc suspend/resume frame. Generator tỏa sáng khi dataset lớn hơn memory hoặc khi chỉ cần single-pass processing.


Checklist ghi nhớ

✅ Checklist triển khai

  • [ ] Dùng yield thay vì tích lũy vào list khi xử lý dữ liệu lớn hơn RAM
  • [ ] Generator expression (x for x in ...) cho các phép tính single-pass như sum(), max(), any()
  • [ ] Luôn nhớ generator chỉ duyệt được một lần — dùng factory function nếu cần tái sử dụng
  • [ ] Gọi next() lần đầu (prime) trước khi send() giá trị vào generator
  • [ ] Dùng yield from thay vì vòng for + yield khi delegate cho sub-generator
  • [ ] Đặt yield trong try/finally khi generator giữ tài nguyên (file handle, connection)
  • [ ] Gọi gen.close() khi kết thúc sớm generator có tài nguyên
  • [ ] Không thay đổi kích thước collection (dict, set) trong khi iterate bằng generator
  • [ ] Pipeline generator: mỗi stage là một hàm nhận Iterator, trả về Iterator
  • [ ] Dùng tracemalloc để đo và so sánh memory footprint giữa list và generator
  • [ ] Kiểm tra gen.gi_frame is None để biết generator đã exhaust chưa
  • [ ] Generator expression phù hợp cho logic đơn giản — dùng generator function cho logic phức tạp
  • [ ] Tránh list() trên generator vô hạn — sẽ hết memory

Bài tập luyện tập

Bài 1 — Fibonacci generator (Foundation)

Viết generator function fibonacci() yield vô hạn các số Fibonacci (0, 1, 1, 2, 3, 5, 8, ...). Sau đó dùng itertools.islice để lấy 20 số đầu tiên.

Yêu cầu:

  • Generator phải là vô hạn (infinite generator)
  • Memory footprint cố định bất kể lấy bao nhiêu số

🧠 Quiz — Bạn đã nắm vững yield cơ bản?

Câu hỏi: Khi gọi gen = fibonacci(), điều gì xảy ra ngay lập tức?

  • A) Hàm fibonacci() chạy hoàn tất và trả về tất cả số Fibonacci
  • B) Hàm fibonacci() chạy đến yield đầu tiên rồi dừng
  • C) Không có code nào được thực thi — chỉ tạo generator object
  • D) Python raise lỗi vì generator không có tham số

Đáp án: C — Gọi generator function chỉ tạo generator object. Code bên trong chỉ chạy khi next() được gọi lần đầu tiên.

Lời giải Bài 1
python
from itertools import islice
from typing import Iterator


def fibonacci() -> Iterator[int]:
    """Yield vô hạn các số Fibonacci."""
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b


# Lấy 20 số đầu tiên
twenty_fibs = list(islice(fibonacci(), 20))
print(twenty_fibs)
# [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377,
#  610, 987, 1597, 2584, 4181]

# Lấy số Fibonacci thứ 100 (zero-indexed)
fib_100 = next(islice(fibonacci(), 100, 101))
print(f"Fibonacci thứ 100: {fib_100}")
# Fibonacci thứ 100: 354224848179261915075

Điểm then chốt: Vòng while True không gây treo chương trình vì yield tạm dừng execution sau mỗi số. Consumer kiểm soát việc lấy bao nhiêu phần tử thông qua islice hoặc break.

Bài 2 — Log analysis pipeline (Intermediate)

Xây dựng pipeline gồm 4 stage để phân tích file log:

  1. doc_log(filepath): Đọc file từng dòng
  2. loc_error(lines): Chỉ giữ dòng chứa "ERROR" hoặc "CRITICAL"
  3. trich_timestamp(lines): Trích xuất giờ (hour) từ mỗi dòng log có format "2024-01-15 14:32:01 ERROR ..."
  4. dem_theo_gio(hours): Đếm số lỗi theo từng giờ, yield kết quả cuối cùng dưới dạng dict[int, int]

Yêu cầu:

  • Stage 1-3 phải là generator (lazy)
  • Stage 4 cần tích lũy nên dùng hàm thường trả về dict
  • Pipeline xử lý file bất kỳ kích thước với memory cố định

🧠 Quiz — Bạn hiểu pipeline pattern?

Câu hỏi: Trong pipeline dem_theo_gio(trich_timestamp(loc_error(doc_log("app.log")))), khi nào file app.log bắt đầu được đọc?

  • A) Ngay khi dòng code trên được thực thi
  • B) Khi dem_theo_gio() bắt đầu iterate qua input của nó
  • C) Khi doc_log() được gọi
  • D) Khi loc_error() gọi next() lần đầu

Đáp án: B — Toàn bộ pipeline là lazy. Khi dem_theo_gio() gọi for hour in hours (tức gọi next() trên input), tín hiệu pull truyền ngược qua các stage cho đến doc_log, lúc đó file mới thực sự được mở và đọc dòng đầu tiên.

Lời giải Bài 2
python
import re
from typing import Iterator
from collections import Counter


def doc_log(filepath: str) -> Iterator[str]:
    """Stage 1: Đọc file từng dòng."""
    with open(filepath, encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")


def loc_error(lines: Iterator[str]) -> Iterator[str]:
    """Stage 2: Chỉ giữ dòng ERROR hoặc CRITICAL."""
    for line in lines:
        if "ERROR" in line or "CRITICAL" in line:
            yield line


def trich_timestamp(lines: Iterator[str]) -> Iterator[int]:
    """Stage 3: Trích xuất giờ (hour) từ timestamp."""
    pattern = re.compile(r"(\d{4}-\d{2}-\d{2})\s+(\d{2}):\d{2}:\d{2}")
    for line in lines:
        match = pattern.search(line)
        if match:
            hour = int(match.group(2))
            yield hour


def dem_theo_gio(hours: Iterator[int]) -> dict[int, int]:
    """Stage 4: Đếm lỗi theo giờ. Trả về dict."""
    counter = Counter(hours)
    return dict(sorted(counter.items()))


# Kết nối pipeline
def phan_tich_log(filepath: str) -> dict[int, int]:
    """Chạy toàn bộ pipeline."""
    pipeline = trich_timestamp(
        loc_error(
            doc_log(filepath)
        )
    )
    return dem_theo_gio(pipeline)


# Sử dụng
# ket_qua = phan_tich_log("application.log")
# for hour, count in ket_qua.items():
#     print(f"  {hour:02d}:00 → {count} lỗi")

# Ví dụ output:
#   02:00 → 3 lỗi
#   08:00 → 15 lỗi
#   14:00 → 7 lỗi
#   23:00 → 42 lỗi

Phân tích memory: Dù file log có hàng triệu dòng, stage 1-3 mỗi stage chỉ giữ một dòng/giá trị trong memory. Stage 4 tích lũy Counter nhưng chỉ có tối đa 24 key (24 giờ trong ngày) — memory cố định O(1) so với kích thước file.


Liên kết học tiếp