Building a RAG Pipeline That Actually Works

A production-focused guide to building Retrieval-Augmented Generation pipelines with Python and FastAPI. Covers chunking strategies, embedding models, vector search, prompt construction, and the stuff that breaks when you move past the tutorial stage.

I got my first RAG pipeline working in a weekend. Documents in, vectors out, plug into GPT, ask a question, get an answer. The demo was impressive. Then I pointed it at our actual internal docs and it started confidently telling users that our API supported features we’d deprecated two years ago.

Turns out “load documents, chunk, embed, retrieve, generate” is the easy part. The hard part is everything that breaks silently: chunks that lose meaning because they got split mid-thought, embeddings that think “ARR” is pirate speak instead of Annual Recurring Revenue, search results that look relevant on paper but don’t actually answer anything.

I spent months iterating on RAG systems for internal document search and domain-specific Q&A. Most of that time went into problems that tutorials gloss over completely. This is what actually worked.


Why Most RAG Tutorials Fail You

Every RAG tutorial follows the same recipe: load documents, split into chunks, embed with OpenAI, store in a vector DB, retrieve top-K, stuff into prompt. Works on the demo dataset. Falls apart on your actual documents.

The problems:

  1. Chunking destroys context. A fixed 500-token chunk splits a paragraph mid-sentence. The retrieved chunk makes no sense without its surrounding context.
  2. Embeddings miss domain semantics. Generic embedding models don’t understand that “ARR” means Annual Recurring Revenue in a SaaS context, not a pirate sound.
  3. Top-K retrieval is naive. Returning the 5 “most similar” chunks often gives you 5 slightly different versions of the same information while missing the actual answer.
  4. The prompt is an afterthought. Stuffing retrieved chunks into a system message without structure leads to the model ignoring half the context.

All of these are fixable. I’ll go through my pipeline step by step.


The Architecture

The high-level flow:

Document Ingestion → Chunking → Embedding → Vector Store

User Query → Query Processing → Vector Search → Re-ranking → Prompt Construction → LLM → Response

I use FastAPI for the API layer, PostgreSQL with pgvector for storage (one less service to manage versus a dedicated vector DB), and the OpenAI API for embeddings and generation. Swap any component you want. The patterns carry over.

Project Structure

rag_service/
├── main.py              # FastAPI app
├── config.py            # Settings and model configuration
├── ingestion/
│   ├── chunker.py       # Document chunking strategies
│   ├── loader.py        # Document loading (PDF, Markdown, HTML)
│   └── embedder.py      # Embedding generation
├── retrieval/
│   ├── search.py        # Vector search + filtering
│   └── reranker.py      # Result re-ranking
├── generation/
│   ├── prompt.py        # Prompt construction
│   └── chain.py         # Query → retrieval → generation pipeline
├── db/
│   ├── models.py        # SQLAlchemy + pgvector models
│   └── connection.py    # Database connection
└── monitoring/
    └── metrics.py       # Quality and performance tracking
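
Of those files, config.py is the least interesting, but it's where the "swap any component" promise lives. A minimal sketch, assuming pydantic-settings (the field names are illustrative, not a fixed contract):

# config.py: a minimal sketch, assuming pydantic-settings
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Every value can be overridden with a RAG_-prefixed environment variable
    model_config = SettingsConfigDict(env_prefix="RAG_")

    database_url: str = "postgresql://localhost/rag"
    embedding_model: str = "text-embedding-3-small"
    generation_model: str = "gpt-4o"
    chunk_max_tokens: int = 512
    chunk_min_tokens: int = 100
    rerank_top_n: int = 5


settings = Settings()

Everything downstream reads from this one object, so swapping the generation model or pointing at a different database is a one-line (or one-environment-variable) change.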

Step 1: Chunking That Preserves Context

Chunking is where most pipelines go wrong first. Fixed-size splitting (every N tokens) is easy to implement and terrible in practice.

The Problem With Fixed-Size Chunks

# DON'T do this in production
def naive_chunk(text: str, chunk_size: int = 500) -> list[str]:
    words = text.split()
    return [
        " ".join(words[i:i + chunk_size])
        for i in range(0, len(words), chunk_size)
    ]

This splits mid-paragraph, mid-sentence, sometimes mid-word. A chunk that starts with “…increased by 40% compared to the previous quarter” is useless without knowing what increased.

Semantic Chunking

I use a two-pass approach: first split on structural boundaries (headings, paragraphs, sections), then merge small chunks and split large ones while respecting sentence boundaries.

import re
from dataclasses import dataclass


@dataclass
class Chunk:
    content: str
    metadata: dict
    token_count: int


def semantic_chunk(
    text: str,
    max_tokens: int = 512,
    min_tokens: int = 100,
    overlap_sentences: int = 2,
) -> list[Chunk]:
    """
    Split text into chunks that respect document structure.
    
    Strategy:
    1. Split on headings and double newlines (structural boundaries)
    2. Within each section, split on sentence boundaries if too large
    3. Merge small adjacent chunks to avoid fragments
    4. Add overlap between chunks for context continuity
    """
    # Split on structural boundaries
    sections = re.split(r'\n#{1,3}\s|\n\n', text)
    sections = [s.strip() for s in sections if s.strip()]

    chunks: list[Chunk] = []
    buffer = ""

    for section in sections:
        estimated_tokens = len(section.split()) * 1.3  # rough token estimate

        if estimated_tokens > max_tokens:
            # Flush any buffered content first so it isn't silently dropped
            if buffer:
                chunks.append(_make_chunk(buffer))
                buffer = ""
            # Section too large — split on sentence boundaries
            sentences = re.split(r'(?<=[.!?])\s+', section)
            current = ""
            for sentence in sentences:
                if len((current + " " + sentence).split()) * 1.3 > max_tokens:
                    if current:
                        chunks.append(_make_chunk(current))
                    current = sentence
                else:
                    current = (current + " " + sentence).strip()
            if current:
                buffer = current
        elif len((buffer + " " + section).split()) * 1.3 > max_tokens:
            # Adding this section would exceed limit — flush buffer
            if buffer:
                chunks.append(_make_chunk(buffer))
            buffer = section
        else:
            # Merge with buffer
            buffer = (buffer + "\n\n" + section).strip()

    if buffer:
        chunks.append(_make_chunk(buffer))

    # Add sentence overlap between consecutive chunks
    if overlap_sentences > 0:
        chunks = _add_overlap(chunks, overlap_sentences)

    # Filter out chunks that are too small
    return [c for c in chunks if c.token_count >= min_tokens]


def _make_chunk(text: str) -> Chunk:
    token_count = int(len(text.split()) * 1.3)
    return Chunk(content=text, metadata={}, token_count=token_count)


def _add_overlap(chunks: list[Chunk], n_sentences: int) -> list[Chunk]:
    """Prepend the last N sentences from the previous chunk."""
    if not chunks:
        return chunks
    result = [chunks[0]]
    for i in range(1, len(chunks)):
        prev_sentences = re.split(r'(?<=[.!?])\s+', chunks[i - 1].content)
        overlap = " ".join(prev_sentences[-n_sentences:])
        new_content = overlap + " " + chunks[i].content
        result.append(_make_chunk(new_content))
    return result

A few notes on the numbers:

  • 512 tokens max per chunk, which fits within embedding model context windows and leaves room for multiple chunks in the generation prompt.
  • 100 tokens minimum so you don’t end up with fragments that mean nothing on their own.
  • 2-sentence overlap between chunks for continuity. That “40% increase” chunk now includes what actually increased.

Metadata on Every Chunk

You want metadata on every chunk. Filtering and citation depend on it:

@dataclass
class Chunk:
    content: str
    metadata: dict  # source_file, section_title, page_number, etc.
    token_count: int


def enrich_chunks(
    chunks: list[Chunk],
    source: str,
    title: str,
) -> list[Chunk]:
    """Attach source metadata to each chunk."""
    for i, chunk in enumerate(chunks):
        chunk.metadata.update({
            "source": source,
            "title": title,
            "chunk_index": i,
            "total_chunks": len(chunks),
        })
    return chunks

When the model cites a source in its answer, you can trace it back to the exact chunk and document. Without metadata, you have an answer with no provenance.


Step 2: Embeddings That Understand Your Domain

Choosing an Embedding Model

For most production use cases, I default to text-embedding-3-small from OpenAI. It’s cheap ($0.02 per 1M tokens), fast, and good enough for English text. If you want to avoid an API dependency, sentence-transformers/all-MiniLM-L6-v2 runs locally and works well for English; for multilingual corpora, reach for one of the multilingual sentence-transformers models instead.

Honestly, the embedding model choice barely matters. Chunking and retrieval strategy have way more impact on quality.
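
If you do go the local route, the swap is contained to one function. A minimal sketch with sentence-transformers (the function name is mine; note the smaller vector dimension):

# Local embedding fallback: a sketch, assuming sentence-transformers is installed.
# all-MiniLM-L6-v2 produces 384-dimensional vectors, so the vector column and
# index in your store have to match that dimension instead of 1536.
from sentence_transformers import SentenceTransformer

local_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")


def embed_chunks_local(chunks: list[Chunk]) -> list[list[float]]:
    texts = [c.content for c in chunks]
    vectors = local_model.encode(texts, normalize_embeddings=True)
    return [v.tolist() for v in vectors]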

Batch Embedding With Rate Limiting

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

EMBEDDING_MODEL = "text-embedding-3-small"
BATCH_SIZE = 100  # OpenAI limit is 2048, but smaller batches = better error recovery


async def embed_chunks(chunks: list[Chunk]) -> list[list[float]]:
    """
    Embed chunks in batches. Returns vectors in the same order as input.
    """
    embeddings: list[list[float]] = []

    for i in range(0, len(chunks), BATCH_SIZE):
        batch = chunks[i:i + BATCH_SIZE]
        texts = [c.content for c in batch]

        response = await client.embeddings.create(
            model=EMBEDDING_MODEL,
            input=texts,
        )

        batch_embeddings = [item.embedding for item in response.data]
        embeddings.extend(batch_embeddings)

        # Rate limiting: be kind to the API
        if i + BATCH_SIZE < len(chunks):
            await asyncio.sleep(0.1)

    return embeddings

The Embedding Cache Pattern

Re-embedding unchanged documents on every ingestion run is wasteful. I hash the chunk content and skip chunks that already exist in the database:

import hashlib


def chunk_hash(content: str) -> str:
    """Deterministic hash for deduplication."""
    return hashlib.sha256(content.encode()).hexdigest()


async def ingest_with_cache(
    chunks: list[Chunk],
    db,
) -> dict:
    """Only embed and store chunks that don't already exist."""
    new_chunks = []
    skipped = 0

    for chunk in chunks:
        h = chunk_hash(chunk.content)
        exists = await db.chunk_exists(h)
        if not exists:
            chunk.metadata["content_hash"] = h
            new_chunks.append(chunk)
        else:
            skipped += 1

    if new_chunks:
        vectors = await embed_chunks(new_chunks)
        await db.store_chunks(new_chunks, vectors)

    return {"ingested": len(new_chunks), "skipped": skipped}

This cut re-indexing runs from 20 minutes to under 2 minutes after the initial load.


Step 3: Vector Search With pgvector

I use PostgreSQL with the pgvector extension instead of a dedicated vector database. One reason: I already run PostgreSQL. Adding Pinecone or Qdrant means another service to deploy, monitor, and pay for. pgvector handles millions of vectors with good performance, and your metadata queries use regular SQL.

Schema

CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE IF NOT EXISTS documents (
    id SERIAL PRIMARY KEY,
    source TEXT NOT NULL,
    title TEXT NOT NULL,
    ingested_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS chunks (
    id SERIAL PRIMARY KEY,
    document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
    content TEXT NOT NULL,
    content_hash TEXT UNIQUE NOT NULL,
    embedding vector(1536),  -- text-embedding-3-small dimension
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- IVFFlat index for approximate nearest neighbor search
-- lists = rows/1000 is a reasonable starting point (switch to sqrt(rows) past ~1M rows)
CREATE INDEX IF NOT EXISTS chunks_embedding_idx
    ON chunks USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);
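
Throughout this post, db is a thin wrapper over that schema rather than an ORM layer. Here is a sketch of the ingestion side, assuming asyncpg plus the pgvector-python package for type registration; the class and method names are mine, matched to the calls made earlier, and the document_id wiring is omitted for brevity:

# db/connection.py: a sketch of the thin helper the ingestion code calls.
# register_vector teaches asyncpg how to encode/decode the pgvector type.
import json

import asyncpg
import numpy as np
from pgvector.asyncpg import register_vector


async def create_pool(dsn: str) -> asyncpg.Pool:
    return await asyncpg.create_pool(dsn, init=register_vector)


class ChunkStore:
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def chunk_exists(self, content_hash: str) -> bool:
        row = await self.pool.fetchrow(
            "SELECT 1 FROM chunks WHERE content_hash = $1", content_hash
        )
        return row is not None

    async def store_chunks(self, chunks: list[Chunk], vectors: list[list[float]]) -> None:
        async with self.pool.acquire() as conn:
            await conn.executemany(
                """
                INSERT INTO chunks (content, content_hash, embedding, metadata)
                VALUES ($1, $2, $3, $4::jsonb)
                ON CONFLICT (content_hash) DO NOTHING
                """,
                [
                    (
                        c.content,
                        c.metadata["content_hash"],
                        np.array(v, dtype=np.float32),
                        json.dumps(c.metadata),
                    )
                    for c, v in zip(chunks, vectors)
                ],
            )

The search side can be just as thin; the vector_search function below only needs a fetch_all that binds named parameters, which is what the databases package or SQLAlchemy's text() queries give you.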

Search With Metadata Filtering

from dataclasses import dataclass


@dataclass
class SearchResult:
    chunk_id: int
    content: str
    metadata: dict
    score: float


async def vector_search(
    query_embedding: list[float],
    db,
    top_k: int = 10,
    source_filter: str | None = None,
    score_threshold: float = 0.3,
) -> list[SearchResult]:
    """
    Search for similar chunks using cosine similarity.
    
    Returns more than we need — re-ranking narrows it down.
    """
    filter_clause = ""
    params: dict = {
        "embedding": query_embedding,
        "limit": top_k,
        "threshold": score_threshold,
    }

    if source_filter:
        filter_clause = "AND d.source = :source"
        params["source"] = source_filter

    query = f"""
        SELECT
            c.id,
            c.content,
            c.metadata,
            1 - (c.embedding <=> :embedding::vector) AS score
        FROM chunks c
        JOIN documents d ON c.document_id = d.id
        WHERE 1 - (c.embedding <=> :embedding::vector) > :threshold
        {filter_clause}
        ORDER BY c.embedding <=> :embedding::vector
        LIMIT :limit
    """

    rows = await db.fetch_all(query, params)
    return [
        SearchResult(
            chunk_id=row["id"],
            content=row["content"],
            metadata=row["metadata"],
            score=row["score"],
        )
        for row in rows
    ]

I pull 10 results even though we only use 3-5 in the end. Vector similarity is a rough first pass. The re-ranking step that follows is where the real filtering happens.


Step 4: Re-Ranking Changes Everything

Most implementations skip this step, and it’s probably the single biggest quality improvement you can add.

Vector search finds chunks that are semantically similar to the query, but “semantically similar” and “actually answers the question” are not the same thing. A chunk that describes the problem in similar words might rank higher than the chunk with the actual solution.

Cross-Encoder Re-Ranking

A cross-encoder takes the query and each candidate chunk as a pair and scores how well the chunk answers the query. It’s slower than vector search (can’t pre-compute) but dramatically more accurate.

from sentence_transformers import CrossEncoder

# Load once at startup — this model is ~60MB
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")


def rerank(
    query: str,
    results: list[SearchResult],
    top_n: int = 5,
) -> list[SearchResult]:
    """
    Re-rank search results using a cross-encoder.
    Much more accurate than vector similarity alone.
    """
    if not results:
        return []

    pairs = [(query, r.content) for r in results]
    scores = reranker.predict(pairs)

    for result, score in zip(results, scores):
        result.score = float(score)

    ranked = sorted(results, key=lambda r: r.score, reverse=True)
    return ranked[:top_n]

In my testing, re-ranking bumped answer relevance by roughly 30-40% over raw vector search alone. The biggest gains show up on ambiguous queries where the user’s wording doesn’t match the document’s terminology.

For example, a user asking “how do I cancel my subscription?” might get chunks about “subscription management” and “billing FAQ” from vector search. The re-ranker correctly identifies that the billing FAQ chunk containing cancellation steps is the better answer, even if the subscription management chunk had higher embedding similarity.


Step 5: Prompt Construction

Retrieved chunks are useless if the model ignores half of them. Prompt structure matters more than people think.

What Doesn’t Work

# DON'T do this
prompt = f"""Answer the question based on the following context:

{all_chunks_concatenated}

Question: {query}"""

This falls apart with more than 2-3 chunks. The model loses track of which chunk says what, and longer context increases hallucination risk.

What Works: Structured Context With Citations

def build_prompt(
    query: str,
    results: list[SearchResult],
    system_context: str = "",
) -> list[dict]:
    """
    Build a prompt with structured, numbered context blocks.
    The model can reference specific sources by number.
    """
    context_blocks = []
    for i, result in enumerate(results, 1):
        source = result.metadata.get("source", "unknown")
        title = result.metadata.get("title", "")
        label = f"{source}: {title}" if title else source
        context_blocks.append(
            f"[Source {i}] ({label})\n{result.content}"
        )

    context = "\n\n---\n\n".join(context_blocks)

    system = f"""You are a helpful assistant that answers questions based on the provided context.

Rules:
- Answer ONLY based on the provided context
- If the context doesn't contain enough information to answer, say so explicitly
- Reference sources by number (e.g., "According to [Source 1]...")
- Be concise and specific
{system_context}"""

    user_message = f"""Context:

{context}

---

Question: {query}"""

    return [
        {"role": "system", "content": system},
        {"role": "user", "content": user_message},
    ]

Some things that matter here:

  • Numbered sources so the model can cite references and users can verify.
  • Separator blocks (---) help the model keep chunks distinct in its attention window.
  • Explicit “I don’t know” instruction cuts hallucination significantly. Without it, the model just makes stuff up from partial context.
  • System context parameter for injecting domain-specific rules (“Use metric units”, “Refer to the product as Platform”, etc.).
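
As a concrete (made-up) example of that last parameter:

# Hypothetical call: domain rules ride along in the system prompt
messages = build_prompt(
    query="How do I rotate API keys?",
    results=ranked,
    system_context="- Refer to the product as Platform\n- Use metric units",
)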

Step 6: The FastAPI Endpoint

Bringing it all together in a query endpoint:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()


class QueryRequest(BaseModel):
    query: str
    source_filter: str | None = None
    top_k: int = 5


class QueryResponse(BaseModel):
    answer: str
    sources: list[dict]
    tokens_used: int


@app.post("/query", response_model=QueryResponse)
async def query_documents(request: QueryRequest):
    if not request.query.strip():
        raise HTTPException(status_code=400, detail="Query cannot be empty")

    # 1. Embed the query
    query_response = await client.embeddings.create(
        model=EMBEDDING_MODEL,
        input=[request.query],
    )
    query_embedding = query_response.data[0].embedding

    # 2. Vector search (retrieve more than needed for re-ranking)
    results = await vector_search(
        query_embedding=query_embedding,
        db=db,
        top_k=request.top_k * 2,
        source_filter=request.source_filter,
    )

    if not results:
        return QueryResponse(
            answer="I couldn't find any relevant information in the knowledge base.",
            sources=[],
            tokens_used=0,
        )

    # 3. Re-rank to get the best results
    ranked = rerank(
        query=request.query,
        results=results,
        top_n=request.top_k,
    )

    # 4. Build prompt with structured context
    messages = build_prompt(query=request.query, results=ranked)

    # 5. Generate answer
    completion = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        max_tokens=1024,
        temperature=0.1,  # Low temperature for factual answers
    )

    answer = completion.choices[0].message.content or ""
    tokens = completion.usage.total_tokens if completion.usage else 0

    # 6. Return with source attribution
    sources = [
        {
            "source": r.metadata.get("source", ""),
            "title": r.metadata.get("title", ""),
            "score": round(r.score, 3),
        }
        for r in ranked
    ]

    return QueryResponse(
        answer=answer,
        sources=sources,
        tokens_used=tokens,
    )

The endpoint itself is boring on purpose. All the interesting work lives in the layers underneath: chunking, embedding, re-ranking. This is just glue code.


Where It Breaks in Practice

1. The “Correct Retrieval, Wrong Answer” Problem

The most frustrating failure: the right chunks are retrieved, but the model still gives a bad answer. This usually happens because:

  • Contradictory chunks. Two documents say different things. The model picks one or merges them into nonsense.
  • Outdated context. Old documentation is retrieved alongside new documentation. The model doesn’t know which is current.

I handle this by adding timestamps to chunk metadata and telling the model to prefer recent sources. For contradictions, the prompt includes a rule: “If sources contradict each other, note the contradiction and present both perspectives.”
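
A sketch of what that looks like in practice (the ingested_at key is my naming): the timestamp rides along in chunk metadata, gets surfaced in the context block, and the extra rules go in through the system_context parameter of build_prompt.

# Sketch: surface recency in each context block and steer the model with rules.
# Assumes chunk metadata carries an "ingested_at" ISO timestamp from ingestion.
def build_context_block(i: int, result: SearchResult) -> str:
    source = result.metadata.get("source", "unknown")
    ingested_at = result.metadata.get("ingested_at", "unknown date")
    return f"[Source {i}] ({source}, as of {ingested_at})\n{result.content}"


RECENCY_RULES = (
    "- If sources contradict each other, note the contradiction and present both perspectives\n"
    "- When sources differ in age, prefer the most recent one and say which you used"
)
# Passed to build_prompt(..., system_context=RECENCY_RULES)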

2. The Chunking Boundary Problem

A question spans information across two chunks. The answer is partially in chunk 3 and partially in chunk 7. Neither chunk alone answers the question.

The sentence overlap from earlier helps. For more complex documents, try parent-child chunking: store both a large parent chunk (the full section) and smaller children. Search against the children, but pull the parent into the prompt context.
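
A minimal sketch of parent-child retrieval, assuming child chunks carry a parent_id in their metadata and a (hypothetical) db.get_parent_chunk helper that returns the full section:

# Parent-child retrieval sketch: search the small chunks, feed the model their parents.
async def search_with_parents(
    query_embedding: list[float],
    db,
    top_k: int = 10,
) -> list[SearchResult]:
    children = await vector_search(query_embedding, db, top_k=top_k)

    seen: set[str] = set()
    parents: list[SearchResult] = []
    for child in children:
        parent_id = child.metadata.get("parent_id")
        if not parent_id or parent_id in seen:
            continue
        seen.add(parent_id)
        parent = await db.get_parent_chunk(parent_id)  # hypothetical helper
        parents.append(
            SearchResult(
                chunk_id=parent["id"],
                content=parent["content"],
                metadata=parent["metadata"],
                score=child.score,  # keep the child's similarity score
            )
        )
    return parents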

3. Query-Document Vocabulary Mismatch

Users don’t use the same words as your documents. They say “cancel my account” when the docs say “account deactivation process.”

I use query expansion for this. Before embedding, a fast model generates alternative phrasings:

async def expand_query(original_query: str) -> list[str]:
    """Generate alternative phrasings to improve retrieval coverage."""
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "Generate 3 alternative phrasings of the user's question. "
                    "Each should use different terminology but ask the same thing. "
                    "Return one per line, no numbering."
                ),
            },
            {"role": "user", "content": original_query},
        ],
        max_tokens=200,
        temperature=0.7,
    )
    alternatives = (response.choices[0].message.content or "").strip().split("\n")
    return [original_query] + [q.strip() for q in alternatives if q.strip()]

Then embed all query variants, search with each, and merge results before re-ranking. This adds one fast API call but catches significantly more relevant documents.
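
The merge itself is small; a sketch, deduplicating on chunk id and keeping the best score before handing everything to the re-ranker:

# Sketch: fan out over query variants, merge by chunk id, re-rank once.
async def search_expanded(query: str, db, top_k: int = 10) -> list[SearchResult]:
    variants = await expand_query(query)

    # One embeddings call covers every variant
    response = await client.embeddings.create(model=EMBEDDING_MODEL, input=variants)

    merged: dict[int, SearchResult] = {}
    for item in response.data:
        for result in await vector_search(item.embedding, db, top_k=top_k):
            existing = merged.get(result.chunk_id)
            if existing is None or result.score > existing.score:
                merged[result.chunk_id] = result

    # Re-rank against the original query, not the variants
    return rerank(query, list(merged.values()), top_n=top_k)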

4. The Context Window Trap

You stuff too many chunks into the prompt, hit the context limit, and the model truncates or ignores later context. Worse: with long context, models tend to focus on the beginning and end (the “lost in the middle” phenomenon).

After re-ranking, I enforce a hard token budget. Context gets capped at 3,000 tokens, leaving room for the system prompt and response:

def budget_context(
    results: list[SearchResult],
    max_context_tokens: int = 3000,
) -> list[SearchResult]:
    """Select results that fit within the token budget."""
    selected = []
    total_tokens = 0

    for result in results:
        chunk_tokens = int(len(result.content.split()) * 1.3)
        if total_tokens + chunk_tokens > max_context_tokens:
            break
        selected.append(result)
        total_tokens += chunk_tokens

    return selected

Monitoring: Know When Your Pipeline Is Failing

Without monitoring you’re basically running a hallucination factory. What I track:

import logging
import json
from datetime import datetime, timezone

logger = logging.getLogger("rag_metrics")


async def log_query_metrics(
    query: str,
    num_results: int,
    top_score: float,
    answer_tokens: int,
    latency_ms: float,
    source_filter: str | None,
):
    """Log every query for analysis. Review weekly."""
    logger.info(
        json.dumps({
            "query": query[:200],  # truncate for log size
            "num_results": num_results,
            "top_score": round(top_score, 3),
            "answer_tokens": answer_tokens,
            "latency_ms": round(latency_ms, 1),
            "source_filter": source_filter,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })
    )

Metrics worth alerting on:

  • Low retrieval scores. If top results consistently score below 0.5, your embeddings or chunking need work.
  • Zero results. Queries that return nothing indicate gaps in your knowledge base or embedding coverage.
  • High latency. If P95 exceeds 3 seconds, your vector index needs tuning or your re-ranker is a bottleneck.
  • Answer length outliers. Frequent extremely short answers (“I don’t have that information”) point to knowledge gaps. Extremely long answers might mean the model is rambling instead of being precise.
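
A sketch of the weekly review that checks for exactly these, assuming the logging handler writes each JSON payload as its own line (the path and thresholds are illustrative):

# Sketch: read a week of JSON-lines metrics and flag the failure modes above.
import json
import statistics


def weekly_report(log_path: str = "rag_metrics.log") -> dict:
    records = []
    with open(log_path) as f:
        for line in f:
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError:
                continue  # skip anything that isn't a metrics payload

    if not records:
        return {}

    latencies = sorted(r["latency_ms"] for r in records)
    p95_latency = latencies[max(int(len(latencies) * 0.95) - 1, 0)]

    return {
        "queries": len(records),
        "zero_result_queries": sum(1 for r in records if r["num_results"] == 0),
        "low_score_queries": sum(1 for r in records if r["top_score"] < 0.5),
        "median_top_score": statistics.median(r["top_score"] for r in records),
        "p95_latency_ms": p95_latency,
    }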

How It Actually Performs

After a few months running this for internal document Q&A:

Works well:

  • Factual questions with clear answers in the docs: ~90% accuracy
  • Multi-source synthesis (combining info from 2-3 docs): ~75% accuracy
  • Source citation: users trust answers more when they can verify

Still rough:

  • Complex reasoning across many documents: ~50% accuracy
  • Questions that require understanding document relationships (Doc A references Doc B): needs a knowledge graph, not just RAG
  • “What changed between version X and Y?”: temporal reasoning is hard with chunk-based retrieval

Latency: P50 ~800ms, P95 ~2.5 seconds (including re-ranking). Re-ranking adds ~200ms but the quality gain is worth it.

Cost: About $0.02 per query (embedding + generation). At 1,000 queries/day, that’s ~$600/month. For lower-stakes queries, swap GPT-4o for GPT-4o-mini and cut costs by 10x.


If You’re Starting From Scratch

Roughly this order:

  1. Start with good chunking. Spend 80% of your initial effort here. Bad chunks poison everything downstream. Test your chunker on real documents, not toy examples.

  2. Use pgvector if you already run PostgreSQL. Only reach for a dedicated vector database if you’re at millions of documents or need features like filtered HNSW search.

  3. Add re-ranking early. The cross-encoder model is tiny (~60MB) and the quality improvement is disproportionate to the effort.

  4. Structure your prompts. Numbered sources, explicit instructions to cite and to say “I don’t know,” temperature at 0.1 for factual answers.

  5. Monitor from day one. Log every query, score, and latency. Review the lowest-scoring queries weekly — they tell you exactly where your pipeline is failing.

  6. Don’t over-engineer the first version. Skip query expansion, skip hybrid search, skip fine-tuned embeddings. Get the basic pipeline working, measure where it fails, then add complexity only where the data says you need it.

That’s really the whole thing. Better chunking, re-ranking, structured prompts, and actually measuring whether your answers are any good. Not glamorous work, but it’s what separates a weekend demo from something people rely on every day.

Salih "Adam" Yildirim

Full Stack Software Engineer with 6+ years of experience building scalable web and mobile applications. Passionate about clean code, modern architecture, and sharing knowledge.
