
Implement RAG Pipeline (Ingestion)


1. Purpose

Transform raw unstructured data into a queryable knowledge base for RAG applications. A RAG system is only as good as its retrieval quality, which depends entirely on the ingestion pipeline. This workflow ensures that data is cleaned, chunked intelligently (preserving context), embedded using efficient models, and indexed with metadata to enable accurate citations.


2. When to Use / When Not to Use

Use This Workflow When

  • Connecting a general-purpose knowledge base (Notion, a wiki, Jira) to an LLM.
  • Building a "Chat with your Data" feature.
  • Updating an existing index with new documents (incremental sync).

Do NOT Use This Workflow When

  • The data is already structured (Text-to-SQL is better for tabular data).
  • Real-time updates (<1s latency) are required (needs specialized streaming architecture).
  • Fine-tuning a model (this is for Retrieval, not Training).

3. Inputs

Required Inputs

  • [[SOURCE_DOCUMENTS]]: Path or URI to raw files (PDFs, Markdown, HTML).
  • [[VECTOR_DB_CONFIG]]: Connection details for Pinecone/Weaviate/Chroma (API Key, Index Name).
  • [[EMBEDDING_MODEL]]: Selected provider (e.g., text-embedding-3-small, cohere-embed-v3).

4. Outputs

  • Ingestion Script: Python ETL script handling loading, splitting, and upserting.
  • Vector Index: Populated database index ready for similarity search.
  • Ingestion Report: Stats on documents processed, chunks created, and errors encountered.

5. Preconditions

  • Python environment with langchain, unstructured, and vector DB SDK installed.
  • Valid API keys for Embedding Model and Vector DB.
  • Source documents are accessible to the runtime.

6. Procedure

Phase 1: Ingestion & Cleaning

  1. Action: Load documents from source.

    • Expected Output: List of Document objects containing raw text and metadata.
    • Notes: Remove noise (headers, footers, HTML tags) before chunking. Garbage in, garbage out.
  2. Action: Enrich Metadata.

    • Expected Output: Each document object has source_url, author, and published_date fields.
    • Notes: This metadata is crucial for the LLM to provide citations later.
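The loading, cleaning, and enrichment steps above can be sketched as follows. This is a minimal stdlib-only illustration — the `Document` class, `clean_text`, and `load_document` are hypothetical stand-ins for what a loader library (e.g. langchain or unstructured) provides:

```python
import re
from dataclasses import dataclass, field


@dataclass
class Document:
    """Minimal stand-in for a loader library's Document object."""
    text: str
    metadata: dict = field(default_factory=dict)


def clean_text(raw: str) -> str:
    """Strip HTML tags and collapse whitespace before chunking."""
    no_tags = re.sub(r"<[^>]+>", " ", raw)
    return re.sub(r"\s+", " ", no_tags).strip()


def load_document(raw_html: str, source_url: str,
                  author: str, published_date: str) -> Document:
    """Attach citation metadata at load time, before the source is lost."""
    return Document(
        text=clean_text(raw_html),
        metadata={
            "source_url": source_url,
            "author": author,
            "published_date": published_date,
        },
    )
```

Attaching metadata at load time matters because after chunking, the chunk is the only unit the retriever sees — any citation fields it needs must already be on the parent document.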

Phase 2: Chunking (Splitting)

  1. Action: Configure the Text Splitter.

    • Expected Output: RecursiveCharacterTextSplitter configured with chunk_size (~512-1024 tokens) and chunk_overlap (10-20%).
    • Notes: Overlap is non-negotiable; it ensures that context cut at a chunk boundary is still fully contained in the adjacent chunk.
  2. Action: Generate deterministic IDs.

    • Expected Output: Unique ID for each chunk (e.g., md5(content)) to ensure idempotency.
    • Notes: This prevents duplicate vectors if the script runs twice on the same data.
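Both chunking steps can be sketched in a few lines. The splitter below is a plain character-window simplification (a real `RecursiveCharacterTextSplitter` breaks on separators like paragraphs and sentences first), but it shows the overlap and MD5-ID mechanics:

```python
import hashlib


def split_with_overlap(text: str, chunk_size: int = 512,
                       overlap: int = 64) -> list[str]:
    """Sliding character window: each chunk repeats the tail of the previous one."""
    step = chunk_size - overlap
    return [text[i:i + chunk_size]
            for i in range(0, max(len(text) - overlap, 1), step)]


def chunk_id(content: str) -> str:
    """Deterministic ID: identical content always maps to the same vector ID,
    so re-running the pipeline overwrites instead of duplicating."""
    return hashlib.md5(content.encode("utf-8")).hexdigest()
```

Because the ID is a pure function of the content, an unchanged document produces the same IDs on every run and the upsert is a no-op — which is exactly the idempotency property checked in the Quality Gates.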

Phase 3: Embedding & Indexing

  1. Action: Generate Embeddings and Upsert.

    • Expected Output: Vectors stored in the DB.
    • Notes: Use batch processing (e.g., 100 docs per batch) to avoid hitting API rate limits. Implement exponential backoff for 429 Too Many Requests.
  2. Action: Verify Recall.

    • Expected Output: A test query returns the expected document chunk as the top result.
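The batching-and-backoff logic from step 1 can be sketched as below. `embed_batch` and `upsert_batch` are hypothetical callables standing in for your embedding client and vector DB SDK; `RuntimeError` stands in for the client's 429 exception:

```python
import itertools
import time

BATCH_SIZE = 100  # stay under typical embedding-API payload limits


def batched(iterable, n):
    """Yield successive lists of up to n items."""
    it = iter(iterable)
    while batch := list(itertools.islice(it, n)):
        yield batch


def upsert_all(chunks, embed_batch, upsert_batch, max_retries=5):
    """Embed and upsert in batches, backing off exponentially on throttling."""
    for batch in batched(chunks, BATCH_SIZE):
        for attempt in range(max_retries):
            try:
                vectors = embed_batch(batch)
                upsert_batch(vectors)
                break
            except RuntimeError:  # stand-in for a 429 from the API
                time.sleep(min(2 ** attempt, 30))  # 1s, 2s, 4s, ... capped
        else:
            raise RuntimeError("batch failed after retries")
```

Batching also makes partial failure recoverable: a crash loses at most one batch, and because IDs are deterministic, re-running from the start is safe.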

7. Quality Gates

  • [ ] Data Hygiene: Pre-cleaning step prevents "Nav" or "Footer" text from being indexed.
  • [ ] Idempotency: Re-running the script does not increase the vector count (De-duplication works).
  • [ ] Rate Limits: Retry logic handles API throttling gracefully.
  • [ ] Security: No PII (emails/phones) embedded without redaction.
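The PII gate above can be enforced with a minimal redaction pass before embedding. The regex patterns here are illustrative only — production systems typically use a dedicated PII detection library rather than hand-rolled patterns:

```python
import re

EMAIL = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")
PHONE = re.compile(r"\+?\d[\d\s().-]{7,}\d")


def redact_pii(text: str) -> str:
    """Mask emails and phone numbers before the text reaches the embedder."""
    text = EMAIL.sub("[EMAIL]", text)
    return PHONE.sub("[PHONE]", text)
```

Redaction must happen before embedding, not at query time: once PII is in a vector index, it can surface through any sufficiently similar query.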

8. Failure Handling

Stale Data

  • Symptoms: RAG retrieves outdated policy documents.
  • Recovery: Implement a "Delete vs Update" sync logic. Track last_modified and delete vectors for docs that no longer exist in source.
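A minimal sketch of that delete-vs-update decision, assuming you can list the current source document IDs and the IDs already present in the index:

```python
def plan_sync(source_ids: set[str], indexed_ids: set[str]) -> tuple[set[str], set[str]]:
    """Diff source against index to decide what to upsert and what to delete."""
    to_upsert = source_ids - indexed_ids  # new docs; changed docs also land here,
                                          # since content-hash IDs change with content
    to_delete = indexed_ids - source_ids  # docs removed or superseded in source
    return to_upsert, to_delete
```

Note that with content-hash IDs, an edited document shows up as one ID in `to_upsert` (the new content) and one in `to_delete` (the old), so the stale version is purged rather than left to compete in retrieval.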

Garbage Chunks

  • Symptoms: LLM retrieves "Copyright 2024" or generic nav text.
  • Recovery: Aggressively filter out short chunks (<50 chars) or use regex to strip boilerplate during Phase 1.

API Throttling

  • Symptoms: Job crashes with 429 errors halfway through.
  • Recovery: Use tenacity library to wrap API calls with wait_exponential strategy.

9. Paste Prompt

TIP

One-Click Agent Invocation: Copy the prompt below, replace the placeholders, and paste it into your agent.

text
Role: Act as a Data Engineer specializing in AI.
Task: Execute the Implement RAG Pipeline workflow.

## Objective
Ingest documents from [[SOURCE_DOCUMENTS]] into [[VECTOR_DB_CONFIG]] for RAG retrieval.

## Inputs
- **Source**: [[SOURCE_DOCUMENTS]]
- **Target**: [[VECTOR_DB_CONFIG]]
- **Model**: [[EMBEDDING_MODEL]]

## Procedure
Execute the following phases:

1. **Ingest**:
   - Load raw files.
   - CLEAN textual noise (headers/footers).
   - Extract metadata (URL, Date).

2. **Split**:
   - Chunk using Recursive Splitter with overlap.
   - Generate deterministic IDs (MD5).

3. **Index**:
   - Embed chunks in batches.
   - Upsert to Vector DB.
   - Handle Rate Limits.

## Quality Gates
- [ ] No duplicates (Idempotency).
- [ ] Metadata preserved for citations.
- [ ] PII Redacted.

## Failure Handling
- If 429 Error: Retry with backoff.
- If Stale Data: Implement deletion logic.

## Constraints
- Security: Redact sensitive info before embedding.
- Deliverables: Output the ingestion script and a run log.

## Command
Start by loading and inspecting a sample document for noise.
