Every team building AI features eventually hits the same wall: the LLM doesn’t know about your data. It hallucinates confidently, invents facts about your product, and gives generic answers when users ask domain-specific questions. The standard fix is RAG — Retrieval-Augmented Generation. Feed the model relevant context before it answers.
The concept is simple. Making it work in production is not.
I’ve built RAG pipelines for internal document search and domain-specific Q&A systems. The first version took a weekend. Getting it to production quality — where it actually returns useful answers consistently — took significantly longer. Most of that time was spent on problems that no tutorial covers: bad chunking destroying context, embeddings that don’t capture domain meaning, and retrieval that returns technically relevant but practically useless documents.
Here’s what I learned building RAG systems that people actually use.
Why Most RAG Tutorials Fail You
The typical RAG tutorial looks like this: load documents, split into chunks, embed with OpenAI, store in a vector database, retrieve top-K results, stuff them into a prompt. It works on the demo dataset. Then you try it on real documents and everything falls apart.
The problems are predictable:
- Chunking destroys context. A fixed 500-token chunk splits a paragraph mid-sentence. The retrieved chunk makes no sense without its surrounding context.
- Embeddings miss domain semantics. Generic embedding models don’t understand that “ARR” means Annual Recurring Revenue in a SaaS context, not a pirate sound.
- Top-K retrieval is naive. Returning the 5 “most similar” chunks often gives you 5 slightly different versions of the same information while missing the actual answer.
- The prompt is an afterthought. Stuffing retrieved chunks into a system message without structure leads to the model ignoring half the context.
Every one of these has a fix. Let me walk through the pipeline I use.
The Architecture
Here’s the high-level flow:
Document Ingestion → Chunking → Embedding → Vector Store
↓
User Query → Query Processing → Vector Search → Re-ranking → Prompt Construction → LLM → Response
I use FastAPI for the API layer, PostgreSQL with pgvector for storage (one less service to manage compared to a dedicated vector DB), and the OpenAI API for embeddings and generation. You can swap any component — the patterns are what matter.
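Since every component is swappable, it helps to pin the choices in one place. A minimal sketch of what `config.py` might hold (the setting names, env-var names, and defaults here are illustrative, not a real config):

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    """Central configuration; every value can be overridden via environment."""
    database_url: str = os.environ.get(
        "DATABASE_URL", "postgresql://localhost/rag_service"
    )
    embedding_model: str = os.environ.get("EMBEDDING_MODEL", "text-embedding-3-small")
    generation_model: str = os.environ.get("GENERATION_MODEL", "gpt-4o")
    top_k: int = int(os.environ.get("RAG_TOP_K", "5"))


settings = Settings()
```

Swapping pgvector for a dedicated vector DB, or GPT-4o for a cheaper model, then touches one file instead of five.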
Project Structure
rag_service/
├── main.py # FastAPI app
├── config.py # Settings and model configuration
├── ingestion/
│ ├── chunker.py # Document chunking strategies
│ ├── loader.py # Document loading (PDF, Markdown, HTML)
│ └── embedder.py # Embedding generation
├── retrieval/
│ ├── search.py # Vector search + filtering
│ └── reranker.py # Result re-ranking
├── generation/
│ ├── prompt.py # Prompt construction
│ └── chain.py # Query → retrieval → generation pipeline
├── db/
│ ├── models.py # SQLAlchemy + pgvector models
│ └── connection.py # Database connection
└── monitoring/
└── metrics.py # Quality and performance tracking
Step 1: Chunking That Preserves Context
This is where most pipelines go wrong first. Fixed-size chunking (split every N tokens) is easy to implement and terrible in practice.
The Problem With Fixed-Size Chunks
```python
# DON'T do this in production
def naive_chunk(text: str, chunk_size: int = 500) -> list[str]:
    words = text.split()
    return [
        " ".join(words[i:i + chunk_size])
        for i in range(0, len(words), chunk_size)
    ]
```
This splits mid-paragraph, mid-sentence, sometimes mid-word. A chunk that starts with “…increased by 40% compared to the previous quarter” is useless without knowing what increased.
Semantic Chunking
I use a two-pass approach: first split on structural boundaries (headings, paragraphs, sections), then merge small chunks and split large ones while respecting sentence boundaries.
```python
import re
from dataclasses import dataclass


@dataclass
class Chunk:
    content: str
    metadata: dict
    token_count: int


def semantic_chunk(
    text: str,
    max_tokens: int = 512,
    min_tokens: int = 100,
    overlap_sentences: int = 2,
) -> list[Chunk]:
    """
    Split text into chunks that respect document structure.

    Strategy:
    1. Split on headings and double newlines (structural boundaries)
    2. Within each section, split on sentence boundaries if too large
    3. Merge small adjacent chunks to avoid fragments
    4. Add overlap between chunks for context continuity
    """
    # Split on structural boundaries
    sections = re.split(r'\n#{1,3}\s|\n\n', text)
    sections = [s.strip() for s in sections if s.strip()]

    chunks: list[Chunk] = []
    buffer = ""

    for section in sections:
        estimated_tokens = len(section.split()) * 1.3  # rough token estimate
        if estimated_tokens > max_tokens:
            # Section too large: flush any buffered text first,
            # then split the section on sentence boundaries
            if buffer:
                chunks.append(_make_chunk(buffer))
                buffer = ""
            sentences = re.split(r'(?<=[.!?])\s+', section)
            current = ""
            for sentence in sentences:
                if len((current + " " + sentence).split()) * 1.3 > max_tokens:
                    if current:
                        chunks.append(_make_chunk(current))
                    current = sentence
                else:
                    current = (current + " " + sentence).strip()
            if current:
                buffer = current
        elif len((buffer + " " + section).split()) * 1.3 > max_tokens:
            # Adding this section would exceed the limit: flush the buffer
            if buffer:
                chunks.append(_make_chunk(buffer))
            buffer = section
        else:
            # Merge with buffer
            buffer = (buffer + "\n\n" + section).strip()

    if buffer:
        chunks.append(_make_chunk(buffer))

    # Add sentence overlap between consecutive chunks
    if overlap_sentences > 0:
        chunks = _add_overlap(chunks, overlap_sentences)

    # Filter out chunks that are too small
    return [c for c in chunks if c.token_count >= min_tokens]


def _make_chunk(text: str) -> Chunk:
    token_count = int(len(text.split()) * 1.3)
    return Chunk(content=text, metadata={}, token_count=token_count)


def _add_overlap(chunks: list[Chunk], n_sentences: int) -> list[Chunk]:
    """Prepend the last N sentences from the previous chunk."""
    if not chunks:
        return chunks
    result = [chunks[0]]
    for i in range(1, len(chunks)):
        prev_sentences = re.split(r'(?<=[.!?])\s+', chunks[i - 1].content)
        overlap = " ".join(prev_sentences[-n_sentences:])
        new_content = overlap + " " + chunks[i].content
        result.append(_make_chunk(new_content))
    return result
```
Key decisions:
- 512 tokens max per chunk. This fits comfortably within embedding model context windows and leaves room for multiple chunks in the generation prompt.
- 100 tokens minimum to avoid fragments that lack meaning.
- 2-sentence overlap gives the model continuity across chunk boundaries. The retrieved chunk about a “40% increase” now carries the sentences that say what actually increased.
Don’t Forget Metadata
Every chunk should carry metadata from the source document. This is critical for filtering and citation:
```python
@dataclass
class Chunk:
    content: str
    metadata: dict  # source_file, section_title, page_number, etc.
    token_count: int


def enrich_chunks(
    chunks: list[Chunk],
    source: str,
    title: str,
) -> list[Chunk]:
    """Attach source metadata to each chunk."""
    for i, chunk in enumerate(chunks):
        chunk.metadata.update({
            "source": source,
            "title": title,
            "chunk_index": i,
            "total_chunks": len(chunks),
        })
    return chunks
```
When the model cites a source in its answer, you can trace it back to the exact chunk and document. Without metadata, you have an answer with no provenance.
Step 2: Embeddings That Understand Your Domain
Choosing an Embedding Model
For most production use cases, I default to text-embedding-3-small from OpenAI. It’s cheap ($0.02 per 1M tokens), fast, and good enough for English text. If you want to avoid the API dependency, sentence-transformers/all-MiniLM-L6-v2 runs locally and works well for English; for multilingual corpora, reach for a multilingual variant such as paraphrase-multilingual-MiniLM-L12-v2.
The model matters less than you think. The chunking and retrieval strategy dominate quality.
Batch Embedding With Rate Limiting
```python
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI()
EMBEDDING_MODEL = "text-embedding-3-small"
BATCH_SIZE = 100  # OpenAI limit is 2048 inputs, but smaller batches = better error recovery


async def embed_chunks(chunks: list[Chunk]) -> list[list[float]]:
    """
    Embed chunks in batches. Returns vectors in the same order as input.
    """
    embeddings: list[list[float]] = []
    for i in range(0, len(chunks), BATCH_SIZE):
        batch = chunks[i:i + BATCH_SIZE]
        texts = [c.content for c in batch]

        response = await client.embeddings.create(
            model=EMBEDDING_MODEL,
            input=texts,
        )
        batch_embeddings = [item.embedding for item in response.data]
        embeddings.extend(batch_embeddings)

        # Rate limiting: be kind to the API
        if i + BATCH_SIZE < len(chunks):
            await asyncio.sleep(0.1)

    return embeddings
```
The Embedding Cache Pattern
Re-embedding unchanged documents on every ingestion run is wasteful. I hash the chunk content and skip chunks that already exist in the database:
```python
import hashlib


def chunk_hash(content: str) -> str:
    """Deterministic hash for deduplication."""
    return hashlib.sha256(content.encode()).hexdigest()


async def ingest_with_cache(
    chunks: list[Chunk],
    db,
) -> dict:
    """Only embed and store chunks that don't already exist."""
    new_chunks = []
    skipped = 0

    for chunk in chunks:
        h = chunk_hash(chunk.content)
        exists = await db.chunk_exists(h)
        if not exists:
            chunk.metadata["content_hash"] = h
            new_chunks.append(chunk)
        else:
            skipped += 1

    if new_chunks:
        vectors = await embed_chunks(new_chunks)
        await db.store_chunks(new_chunks, vectors)

    return {"ingested": len(new_chunks), "skipped": skipped}
```
This cut our re-indexing time from 20 minutes to under 2 minutes after the initial load.
Step 3: Vector Search With pgvector
I use PostgreSQL with the pgvector extension instead of a dedicated vector database. One reason: I already run PostgreSQL. Adding Pinecone or Qdrant means another service to deploy, monitor, and pay for. pgvector handles millions of vectors with good performance, and your metadata queries use regular SQL.
Schema
```sql
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE IF NOT EXISTS documents (
    id SERIAL PRIMARY KEY,
    source TEXT NOT NULL,
    title TEXT NOT NULL,
    ingested_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS chunks (
    id SERIAL PRIMARY KEY,
    document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
    content TEXT NOT NULL,
    content_hash TEXT UNIQUE NOT NULL,
    embedding vector(1536),  -- text-embedding-3-small dimension
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- IVFFlat index for approximate nearest neighbor search
-- pgvector's guidance: lists = rows / 1000 up to ~1M rows, sqrt(rows) beyond that
CREATE INDEX IF NOT EXISTS chunks_embedding_idx
    ON chunks USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);
```
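One tuning knob worth knowing about: IVFFlat trades recall for speed through the `probes` setting (default 1), which controls how many lists are scanned per query. Raising it improves recall at some latency cost; a value around the square root of `lists` is a common starting point. It can be set per session or per transaction:

```sql
-- Scan 10 of the 100 lists at query time instead of just 1
SET ivfflat.probes = 10;
```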
Search With Metadata Filtering
```python
from dataclasses import dataclass


@dataclass
class SearchResult:
    chunk_id: int
    content: str
    metadata: dict
    score: float


async def vector_search(
    query_embedding: list[float],
    db,
    top_k: int = 10,
    source_filter: str | None = None,
    score_threshold: float = 0.3,
) -> list[SearchResult]:
    """
    Search for similar chunks using cosine similarity.
    Returns more than we need — re-ranking narrows it down.
    """
    filter_clause = ""
    params: dict = {
        "embedding": query_embedding,
        "limit": top_k,
        "threshold": score_threshold,
    }
    if source_filter:
        filter_clause = "AND d.source = :source"
        params["source"] = source_filter

    query = f"""
        SELECT
            c.id,
            c.content,
            c.metadata,
            1 - (c.embedding <=> :embedding::vector) AS score
        FROM chunks c
        JOIN documents d ON c.document_id = d.id
        WHERE 1 - (c.embedding <=> :embedding::vector) > :threshold
        {filter_clause}
        ORDER BY c.embedding <=> :embedding::vector
        LIMIT :limit
    """
    rows = await db.fetch_all(query, params)
    return [
        SearchResult(
            chunk_id=row["id"],
            content=row["content"],
            metadata=row["metadata"],
            score=row["score"],
        )
        for row in rows
    ]
```
Why top_k=10 when we only need 3-5 chunks? Because we re-rank after retrieval. Vector similarity is a rough filter — re-ranking picks the actually useful results.
Step 4: Re-Ranking Changes Everything
This is the step most tutorials skip, and it’s arguably the most impactful improvement you can make to a RAG pipeline.
Vector search finds chunks that are semantically similar to the query. But “semantically similar” and “actually answers the question” are different things. A chunk describing the problem in similar words might score higher than the chunk containing the solution.
Cross-Encoder Re-Ranking
A cross-encoder takes the query and each candidate chunk as a pair and scores how well the chunk answers the query. It’s slower than vector search (can’t pre-compute) but dramatically more accurate.
```python
from sentence_transformers import CrossEncoder

# Load once at startup — this model is ~60MB
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")


def rerank(
    query: str,
    results: list[SearchResult],
    top_n: int = 5,
) -> list[SearchResult]:
    """
    Re-rank search results using a cross-encoder.
    Much more accurate than vector similarity alone.
    """
    if not results:
        return []

    pairs = [(query, r.content) for r in results]
    scores = reranker.predict(pairs)

    for result, score in zip(results, scores):
        result.score = float(score)

    ranked = sorted(results, key=lambda r: r.score, reverse=True)
    return ranked[:top_n]
```
The impact: In my testing, re-ranking improved answer relevance by roughly 30-40% compared to raw vector search. The queries where it matters most are ambiguous ones — where the user’s phrasing doesn’t exactly match the document’s terminology.
For example, a user asking “how do I cancel my subscription?” might get chunks about “subscription management” and “billing FAQ” from vector search. The re-ranker correctly identifies that the billing FAQ chunk containing cancellation steps is the better answer, even if the subscription management chunk had higher embedding similarity.
Step 5: Prompt Construction
The retrieved chunks are only useful if the model actually reads them. The prompt structure matters.
What Doesn’t Work
```python
# DON'T do this
prompt = f"""Answer the question based on the following context:

{all_chunks_concatenated}

Question: {query}"""
```
This falls apart with more than 2-3 chunks. The model loses track of which chunk says what, and longer context increases hallucination risk.
What Works: Structured Context With Citations
```python
def build_prompt(
    query: str,
    results: list[SearchResult],
    system_context: str = "",
) -> list[dict]:
    """
    Build a prompt with structured, numbered context blocks.
    The model can reference specific sources by number.
    """
    context_blocks = []
    for i, result in enumerate(results, 1):
        source = result.metadata.get("source", "unknown")
        title = result.metadata.get("title", "")
        context_blocks.append(
            f"[Source {i}] ({source} — {title})\n{result.content}"
        )

    context = "\n\n---\n\n".join(context_blocks)

    system = f"""You are a helpful assistant that answers questions based on the provided context.

Rules:
- Answer ONLY based on the provided context
- If the context doesn't contain enough information to answer, say so explicitly
- Reference sources by number (e.g., "According to [Source 1]...")
- Be concise and specific

{system_context}"""

    user_message = f"""Context:

{context}

---

Question: {query}"""

    return [
        {"role": "system", "content": system},
        {"role": "user", "content": user_message},
    ]
```
Key details:
- Numbered sources let the model cite its references. Users can verify answers.
- Separator blocks (`---`) help the model distinguish between different chunks.
- Explicit instruction to say “I don’t know” reduces hallucination. Without it, the model will confidently fabricate answers from partial context.
- System context parameter lets you inject domain-specific instructions (e.g., “Use metric units” or “Refer to the product as ‘Platform’”).
Step 6: The FastAPI Endpoint
Bringing it all together in a query endpoint:
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()


class QueryRequest(BaseModel):
    query: str
    source_filter: str | None = None
    top_k: int = 5


class QueryResponse(BaseModel):
    answer: str
    sources: list[dict]
    tokens_used: int


@app.post("/query", response_model=QueryResponse)
async def query_documents(request: QueryRequest):
    if not request.query.strip():
        raise HTTPException(status_code=400, detail="Query cannot be empty")

    # 1. Embed the query
    query_response = await client.embeddings.create(
        model=EMBEDDING_MODEL,
        input=[request.query],
    )
    query_embedding = query_response.data[0].embedding

    # 2. Vector search (retrieve more than needed for re-ranking)
    results = await vector_search(
        query_embedding=query_embedding,
        db=db,
        top_k=request.top_k * 2,
        source_filter=request.source_filter,
    )

    if not results:
        return QueryResponse(
            answer="I couldn't find any relevant information in the knowledge base.",
            sources=[],
            tokens_used=0,
        )

    # 3. Re-rank to get the best results
    ranked = rerank(
        query=request.query,
        results=results,
        top_n=request.top_k,
    )

    # 4. Build prompt with structured context
    messages = build_prompt(query=request.query, results=ranked)

    # 5. Generate answer
    completion = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        max_tokens=1024,
        temperature=0.1,  # Low temperature for factual answers
    )
    answer = completion.choices[0].message.content or ""
    tokens = completion.usage.total_tokens if completion.usage else 0

    # 6. Return with source attribution
    sources = [
        {
            "source": r.metadata.get("source", ""),
            "title": r.metadata.get("title", ""),
            "score": round(r.score, 3),
        }
        for r in ranked
    ]

    return QueryResponse(
        answer=answer,
        sources=sources,
        tokens_used=tokens,
    )
```
Nothing clever here — that’s intentional. The complexity is in the chunking, embedding, and re-ranking layers. The endpoint is just orchestration.
The Failure Modes Nobody Warns You About
1. The “Correct Retrieval, Wrong Answer” Problem
The most frustrating failure: the right chunks are retrieved, but the model still gives a bad answer. This usually happens because:
- Contradictory chunks. Two documents say different things. The model picks one or merges them into nonsense.
- Outdated context. Old documentation is retrieved alongside new documentation. The model doesn’t know which is current.
Fix: Add timestamps to chunk metadata and instruct the model to prefer recent sources. For contradictions, add a rule: “If sources contradict each other, note the contradiction and present both perspectives.”
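For the recency half of that fix, you can also fold document age directly into the ranking before prompt construction, rather than relying on the model alone. A sketch, shown on plain dicts so it stands alone, assuming each result's metadata carries an `ingested_at` ISO timestamp (the same field this article's schema stores per document; the half-life value is an illustrative choice):

```python
from datetime import datetime, timezone


def recency_weighted(results, half_life_days: float = 180.0):
    """Re-sort results, decaying each score by document age.

    A chunk loses half its boost every `half_life_days`, so fresh and
    stale chunks of equal relevance no longer tie.
    """
    now = datetime.now(timezone.utc)

    def weighted(r):
        ts = r["metadata"].get("ingested_at")
        if ts is None:
            return r["score"]  # no timestamp: leave the score untouched
        age_days = (now - datetime.fromisoformat(ts)).days
        return r["score"] * 0.5 ** (age_days / half_life_days)

    return sorted(results, key=weighted, reverse=True)
```

This is a blunt instrument; it only makes sense for corpora where newer genuinely means more correct, such as versioned documentation.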
2. The Chunking Boundary Problem
A question spans information across two chunks. The answer is partially in chunk 3 and partially in chunk 7. Neither chunk alone answers the question.
Fix: The sentence overlap I showed earlier helps. But for complex documents, also consider parent-child chunking: store both a large parent chunk (full section) and smaller child chunks. Retrieve by child chunks, but include the parent in the prompt context.
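The parent-child swap itself is only a few lines: retrieval matches the small child chunks, but the prompt receives each child's full parent section, deduplicated. A sketch, assuming each child's metadata carries a `parent_id` and parents live in a lookup table (both names are illustrative, not from the code above):

```python
def expand_to_parents(child_results, parents: dict):
    """Replace retrieved child chunks with their parent sections.

    `child_results`: hits from vector search, each carrying
        metadata["parent_id"].
    `parents`: maps parent_id -> full section text.
    Order follows the children's ranking; each parent appears once.
    """
    seen = set()
    context = []
    for child in child_results:
        pid = child["metadata"]["parent_id"]
        if pid in seen:
            continue  # another child already pulled in this section
        seen.add(pid)
        context.append(parents[pid])
    return context
```

The cost is a larger prompt per retrieved hit, so this pairs naturally with the token-budgeting step described later in this article.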
3. Query-Document Vocabulary Mismatch
Users don’t use the same words as your documents. They say “cancel my account” when the docs say “account deactivation process.”
Fix: Query expansion. Before embedding, use a fast model to rephrase the query:
```python
async def expand_query(original_query: str) -> list[str]:
    """Generate alternative phrasings to improve retrieval coverage."""
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "Generate 3 alternative phrasings of the user's question. "
                    "Each should use different terminology but ask the same thing. "
                    "Return one per line, no numbering."
                ),
            },
            {"role": "user", "content": original_query},
        ],
        max_tokens=200,
        temperature=0.7,
    )
    alternatives = (response.choices[0].message.content or "").strip().split("\n")
    return [original_query] + [q.strip() for q in alternatives if q.strip()]
```
Then embed all query variants, search with each, and merge results before re-ranking. This adds one fast API call but catches significantly more relevant documents.
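The merge step is a dedupe problem: the same chunk often comes back for several phrasings. One simple policy is to keep each chunk's best score across variants and sort the union. Sketched here on `(chunk_id, content, score)` tuples so it is self-contained; the `SearchResult` objects from earlier would work the same way:

```python
def merge_variant_results(result_lists):
    """Merge search results from multiple query variants.

    Duplicates (same chunk_id) keep their highest score; the merged
    list is sorted best-first and handed to the re-ranker as usual.
    """
    best = {}      # chunk_id -> best score seen so far
    contents = {}  # chunk_id -> chunk text
    for results in result_lists:
        for chunk_id, content, score in results:
            if chunk_id not in best or score > best[chunk_id]:
                best[chunk_id] = score
                contents[chunk_id] = content
    merged = [(cid, contents[cid], s) for cid, s in best.items()]
    return sorted(merged, key=lambda t: t[2], reverse=True)
```

Because the re-ranker runs afterwards anyway, the exact merge scoring matters less than making sure no variant's unique hits get dropped.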
4. The Context Window Trap
You stuff too many chunks into the prompt, hit the context limit, and the model truncates or ignores later context. Worse: with long context, models tend to focus on the beginning and end (the “lost in the middle” phenomenon).
Fix: After re-ranking, enforce a token budget. I cap context at 3,000 tokens (leaving room for the system prompt and response):
```python
def budget_context(
    results: list[SearchResult],
    max_context_tokens: int = 3000,
) -> list[SearchResult]:
    """Select results that fit within the token budget."""
    selected = []
    total_tokens = 0
    for result in results:
        chunk_tokens = int(len(result.content.split()) * 1.3)
        if total_tokens + chunk_tokens > max_context_tokens:
            break
        selected.append(result)
        total_tokens += chunk_tokens
    return selected
```
Monitoring: Know When Your Pipeline Is Failing
A RAG pipeline without monitoring is a hallucination factory with no quality control. Here’s what I track:
```python
import logging
import json
from datetime import datetime, timezone

logger = logging.getLogger("rag_metrics")


async def log_query_metrics(
    query: str,
    num_results: int,
    top_score: float,
    answer_tokens: int,
    latency_ms: float,
    source_filter: str | None,
):
    """Log every query for analysis. Review weekly."""
    logger.info(
        json.dumps({
            "query": query[:200],  # truncate for log size
            "num_results": num_results,
            "top_score": round(top_score, 3),
            "answer_tokens": answer_tokens,
            "latency_ms": round(latency_ms, 1),
            "source_filter": source_filter,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })
    )
```
Key metrics to alert on:
- Low retrieval scores. If top results consistently score below 0.5, your embeddings or chunking need work.
- Zero results. Queries that return nothing indicate gaps in your knowledge base or embedding coverage.
- High latency. If P95 exceeds 3 seconds, your vector index needs tuning or your re-ranker is a bottleneck.
- Answer length outliers. Extremely short answers (“I don’t have that information”) repeated often means knowledge gaps. Extremely long answers might indicate the model is rambling instead of being precise.
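Those alerts all reduce to aggregate checks over the logged records. A sketch of the weekly review pass, using the thresholds from the list above and field names matching the log payload (the helper name and return shape are my own, not from the article's code):

```python
def health_report(records, score_floor=0.5, p95_budget_ms=3000):
    """Summarize logged query metrics into the alert conditions above."""
    if not records:
        return {"queries": 0}
    latencies = sorted(r["latency_ms"] for r in records)
    # index of the 95th-percentile latency, clamped to the last element
    p95 = latencies[min(len(latencies) - 1, int(0.95 * len(latencies)))]
    return {
        "queries": len(records),
        "zero_result_rate": sum(r["num_results"] == 0 for r in records) / len(records),
        "low_score_rate": sum(r["top_score"] < score_floor for r in records) / len(records),
        "p95_latency_ms": p95,
        "latency_alert": p95 > p95_budget_ms,
    }
```

Run it over a week of parsed log lines and alert when any rate crosses the threshold you care about.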
The Honest Performance Assessment
After running this pipeline for several months serving internal document Q&A:
What works well:
- Factual questions with clear answers in the docs: ~90% accuracy
- Multi-source synthesis (combining info from 2-3 docs): ~75% accuracy
- Source citation: users trust answers more when they can verify
What still struggles:
- Complex reasoning across many documents: ~50% accuracy
- Questions that require understanding document relationships (Doc A references Doc B): needs a knowledge graph, not just RAG
- “What changed between version X and Y?”: temporal reasoning is hard with chunk-based retrieval
Latency: P50 ~800ms, P95 ~2.5 seconds (including re-ranking). Re-ranking adds ~200ms but the quality gain is worth it.
Cost: About $0.02 per query (embedding + generation). At 1,000 queries/day, that’s ~$600/month. For lower-stakes queries, swap GPT-4o for GPT-4o-mini and cut costs by 10x.
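The arithmetic behind that monthly number, as a sanity-check helper (the inputs are the article's measured figures, not live API pricing):

```python
def monthly_cost(cost_per_query: float, queries_per_day: int, days: int = 30) -> float:
    """Project monthly spend from a measured per-query cost."""
    return cost_per_query * queries_per_day * days
```

Plugging in $0.02 per query at 1,000 queries/day gives the ~$600/month above; swapping in a 10x cheaper generation model scales the answer accordingly.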
The Playbook
If you’re building a RAG pipeline from scratch, here’s the order:
1. Start with good chunking. Spend 80% of your initial effort here. Bad chunks poison everything downstream. Test your chunker on real documents, not toy examples.
2. Use pgvector if you already run PostgreSQL. Only reach for a dedicated vector database if you’re at millions of documents or need features like filtered HNSW search.
3. Add re-ranking early. The cross-encoder model is tiny (~60MB) and the quality improvement is disproportionate to the effort.
4. Structure your prompts. Numbered sources, explicit instructions to cite and to say “I don’t know,” temperature at 0.1 for factual answers.
5. Monitor from day one. Log every query, score, and latency. Review the lowest-scoring queries weekly — they tell you exactly where your pipeline is failing.
6. Don’t over-engineer the first version. Skip query expansion, skip hybrid search, skip fine-tuned embeddings. Get the basic pipeline working, measure where it fails, then add complexity only where the data says you need it.
The difference between a RAG demo and a RAG product is unglamorous: better chunking, re-ranking, structured prompts, and relentless measurement. None of it is novel. All of it matters.