RAG with LangChain: Production 2026 Guide
Retrieval-Augmented Generation (RAG) has become the standard pattern for building LLM applications with custom knowledge. This guide covers production-grade RAG implementation with LangChain.
Why RAG?
- Up-to-date information - Access current data beyond training cutoff
- Domain expertise - Inject specialized knowledge
- Reduced hallucination - Ground responses in factual data
- Cost efficiency - Avoid fine-tuning costs
- Privacy - Keep sensitive data in your control
Architecture Overview
┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│  Documents   │────▶│   Chunking   │────▶│  Embeddings  │
└──────────────┘     └──────────────┘     └──────────────┘
                                                 │
                                                 ▼
┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│   Response   │◀────│     LLM      │◀────│ Vector Store │
└──────────────┘     └──────────────┘     └──────────────┘
                            ▲
                            │
                      ┌─────┴─────┐
                      │   Query   │
                      └───────────┘
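At query time, the right-hand half of this diagram is just a few lines of LCEL. Here is a minimal, orientation-only sketch (placeholder components and prompt; the production versions of each piece are built in the sections that follow):
# Orientation-only sketch of the query path; not production code
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

vectorstore = Chroma(embedding_function=OpenAIEmbeddings())  # assumes documents were ingested
retriever = vectorstore.as_retriever()

prompt = ChatPromptTemplate.from_template(
    "Answer using only this context:\n{context}\n\nQuestion: {question}"
)

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | ChatOpenAI(model="gpt-4o-mini")
    | StrOutputParser()
)

answer = chain.invoke("What is retrieval-augmented generation?")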
Project Setup
pip install langchain langchain-openai langchain-community langchain-experimental
pip install chromadb pgvector pinecone-client
pip install unstructured pdf2image pdfminer.six
pip install tiktoken rank_bm25
pip install fastapi uvicorn ragas datasets
Project Structure
rag_app/
├── app/
│   ├── __init__.py
│   ├── config.py
│   ├── embeddings.py
│   ├── vectorstore.py
│   ├── retriever.py
│   ├── chain.py
│   └── api.py
├── ingestion/
│   ├── __init__.py
│   ├── loader.py
│   ├── chunker.py
│   └── pipeline.py
├── tests/
└── pyproject.toml
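config.py appears in the layout but is not shown elsewhere in this guide. A minimal sketch of what it might hold, assuming pydantic-settings for environment-based configuration (an assumption; add it to your dependencies if you follow this approach):
# app/config.py (illustrative sketch, assuming pydantic-settings)
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")

    openai_api_key: str = ""
    embedding_model: str = "text-embedding-3-large"
    llm_model: str = "gpt-4o"
    chunk_size: int = 1000
    chunk_overlap: int = 200
    pg_connection_string: str = ""
    chroma_persist_dir: str = "./chroma_db"

settings = Settings()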
Document Ingestion
Document Loading
# ingestion/loader.py
from langchain_community.document_loaders import (
PyPDFLoader,
UnstructuredHTMLLoader,
TextLoader,
CSVLoader,
DirectoryLoader,
)
from langchain_core.documents import Document
from pathlib import Path
from typing import List
class DocumentLoader:
"""Load documents from various sources"""
LOADER_MAP = {
'.pdf': PyPDFLoader,
'.html': UnstructuredHTMLLoader,
'.txt': TextLoader,
'.csv': CSVLoader,
'.md': TextLoader,
}
def load_file(self, file_path: str) -> List[Document]:
path = Path(file_path)
loader_class = self.LOADER_MAP.get(path.suffix.lower())
if not loader_class:
raise ValueError(f"Unsupported file type: {path.suffix}")
loader = loader_class(str(path))
documents = loader.load()
# Add metadata
for doc in documents:
doc.metadata['source'] = str(path)
doc.metadata['file_type'] = path.suffix
return documents
def load_directory(
self,
directory: str,
glob: str = "**/*.*"
) -> List[Document]:
documents = []
path = Path(directory)
for file_path in path.glob(glob):
if file_path.suffix.lower() in self.LOADER_MAP:
try:
docs = self.load_file(str(file_path))
documents.extend(docs)
except Exception as e:
print(f"Error loading {file_path}: {e}")
return documents
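A quick usage example (the ./data directory is just a placeholder):
# Example usage of DocumentLoader (./data is a placeholder path)
loader = DocumentLoader()
docs = loader.load_directory("./data", glob="**/*.pdf")
print(f"Loaded {len(docs)} documents")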
Chunking Strategies
# ingestion/chunker.py
from langchain.text_splitter import (
RecursiveCharacterTextSplitter,
TokenTextSplitter,
MarkdownHeaderTextSplitter,
)
from langchain_core.documents import Document
from typing import List
class DocumentChunker:
"""Split documents into chunks with different strategies"""
def __init__(
self,
chunk_size: int = 1000,
chunk_overlap: int = 200,
strategy: str = "recursive"
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.strategy = strategy
self._init_splitter()
def _init_splitter(self):
if self.strategy == "recursive":
self.splitter = RecursiveCharacterTextSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap,
separators=["\n\n", "\n", ". ", " ", ""],
length_function=len,
)
elif self.strategy == "token":
self.splitter = TokenTextSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap,
)
elif self.strategy == "markdown":
self.splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=[
("#", "Header 1"),
("##", "Header 2"),
("###", "Header 3"),
]
            )
        else:
            raise ValueError(f"Unknown chunking strategy: {self.strategy}")
    def chunk_documents(self, documents: List[Document]) -> List[Document]:
        if self.strategy == "markdown":
            # MarkdownHeaderTextSplitter only exposes split_text(), so split
            # each document's text and carry the original metadata forward.
            chunks = []
            for doc in documents:
                for piece in self.splitter.split_text(doc.page_content):
                    piece.metadata = {**doc.metadata, **piece.metadata}
                    chunks.append(piece)
        else:
            chunks = self.splitter.split_documents(documents)
        # Add chunk metadata
        for i, chunk in enumerate(chunks):
            chunk.metadata['chunk_index'] = i
            chunk.metadata['chunk_size'] = len(chunk.page_content)
        return chunks
class SemanticChunker:
"""Semantic-based chunking using embeddings"""
    def __init__(self, embeddings, threshold: float = 95.0):
        from langchain_experimental.text_splitter import SemanticChunker
        # With breakpoint_threshold_type="percentile", the amount is a percentile
        # in the 0-100 range (the library default is 95), not a 0-1 similarity score.
        self.splitter = SemanticChunker(
            embeddings,
            breakpoint_threshold_type="percentile",
            breakpoint_threshold_amount=threshold,
        )
def chunk_documents(self, documents: List[Document]) -> List[Document]:
return self.splitter.split_documents(documents)
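Chunk sizes above are in characters, which do not map 1:1 to model tokens. Since tiktoken is already installed, a small sanity check like the following (a sketch, not part of the pipeline) helps verify that chunks fit your embedding model's limits:
# Sanity-check chunk sizes in tokens; cl100k_base is an example encoding
import tiktoken

def token_stats(chunks, encoding_name: str = "cl100k_base"):
    enc = tiktoken.get_encoding(encoding_name)
    sizes = [len(enc.encode(chunk.page_content)) for chunk in chunks]
    return {"min": min(sizes), "max": max(sizes), "avg": sum(sizes) / len(sizes)}

# chunker = DocumentChunker(chunk_size=1000, chunk_overlap=200)
# chunks = chunker.chunk_documents(docs)
# print(token_stats(chunks))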
Ingestion Pipeline
# ingestion/pipeline.py
import hashlib

from langchain_core.documents import Document

from .loader import DocumentLoader
from .chunker import DocumentChunker
class IngestionPipeline:
"""Complete document ingestion pipeline"""
def __init__(
self,
vectorstore,
loader: DocumentLoader,
chunker: DocumentChunker,
):
self.vectorstore = vectorstore
self.loader = loader
self.chunker = chunker
def _generate_doc_id(self, doc: Document) -> str:
content = doc.page_content + str(doc.metadata)
return hashlib.md5(content.encode()).hexdigest()
def process_file(self, file_path: str) -> int:
# Load
documents = self.loader.load_file(file_path)
# Chunk
chunks = self.chunker.chunk_documents(documents)
# Generate IDs for deduplication
ids = [self._generate_doc_id(chunk) for chunk in chunks]
# Add to vectorstore
self.vectorstore.add_documents(chunks, ids=ids)
return len(chunks)
def process_directory(
self,
directory: str,
batch_size: int = 100
) -> int:
documents = self.loader.load_directory(directory)
chunks = self.chunker.chunk_documents(documents)
total = 0
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i + batch_size]
ids = [self._generate_doc_id(chunk) for chunk in batch]
self.vectorstore.add_documents(batch, ids=ids)
total += len(batch)
print(f"Processed {total}/{len(chunks)} chunks")
return total
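Wiring it together might look like this (the Chroma factory comes from the Vector Store section below, and ./docs is a placeholder path):
# Example wiring of the ingestion pipeline (./docs is a placeholder)
from app.vectorstore import create_chroma_vectorstore

vectorstore = create_chroma_vectorstore()
pipeline = IngestionPipeline(
    vectorstore=vectorstore,
    loader=DocumentLoader(),
    chunker=DocumentChunker(chunk_size=1000, chunk_overlap=200),
)
pipeline.process_directory("./docs", batch_size=100)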
Vector Store
Chroma (Local Development)
# app/vectorstore.py
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
def create_chroma_vectorstore(
collection_name: str = "documents",
persist_directory: str = "./chroma_db"
):
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vectorstore = Chroma(
collection_name=collection_name,
embedding_function=embeddings,
persist_directory=persist_directory,
collection_metadata={"hnsw:space": "cosine"},
)
return vectorstore
PGVector (Production)
# app/vectorstore.py
from langchain_community.vectorstores import PGVector
from langchain_openai import OpenAIEmbeddings
def create_pgvector_store(
collection_name: str = "documents",
connection_string: str = None,
):
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vectorstore = PGVector(
collection_name=collection_name,
connection_string=connection_string,
embedding_function=embeddings,
use_jsonb=True,
)
return vectorstore
# With connection pooling
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
def create_pgvector_with_pool(connection_string: str):
engine = create_engine(
connection_string,
poolclass=QueuePool,
pool_size=10,
max_overflow=20,
)
return engine
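A quick wiring example with placeholder credentials. Note that PGVector builds its own connections from the connection string; the pooled engine above is mainly for any additional application-side SQL against the same database:
# Example wiring (placeholder credentials; the target database must have
# the pgvector extension enabled and a suitable Postgres driver installed)
CONNECTION_STRING = "postgresql+psycopg2://rag:rag@localhost:5432/rag_db"

vectorstore = create_pgvector_store(
    collection_name="documents",
    connection_string=CONNECTION_STRING,
)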
Retrieval
Advanced Retriever
# app/retriever.py
from langchain.retrievers import (
ContextualCompressionRetriever,
MultiQueryRetriever,
EnsembleRetriever,
)
from langchain.retrievers.document_compressors import (
    LLMChainExtractor,
    EmbeddingsFilter,
    DocumentCompressorPipeline,
)
from langchain_community.retrievers import BM25Retriever
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
class HybridRetriever:
"""Combine vector search with BM25 for better retrieval"""
def __init__(
self,
vectorstore,
documents,
k: int = 4,
weights: tuple = (0.5, 0.5)
):
# Vector retriever
self.vector_retriever = vectorstore.as_retriever(
search_type="mmr",
search_kwargs={
"k": k,
"fetch_k": k * 3,
"lambda_mult": 0.7,
}
)
# BM25 retriever
self.bm25_retriever = BM25Retriever.from_documents(documents)
self.bm25_retriever.k = k
# Ensemble
self.retriever = EnsembleRetriever(
retrievers=[self.bm25_retriever, self.vector_retriever],
weights=list(weights),
)
def get_retriever(self):
return self.retriever
class MultiQueryRAG:
"""Generate multiple queries for better retrieval"""
def __init__(self, vectorstore, llm=None):
self.llm = llm or ChatOpenAI(model="gpt-4o-mini", temperature=0)
self.base_retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
self.retriever = MultiQueryRetriever.from_llm(
retriever=self.base_retriever,
llm=self.llm,
)
def get_retriever(self):
return self.retriever
class RerankedRetriever:
"""Compress and rerank retrieved documents"""
def __init__(self, vectorstore, llm=None):
self.llm = llm or ChatOpenAI(model="gpt-4o-mini", temperature=0)
self.embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
base_retriever = vectorstore.as_retriever(search_kwargs={"k": 10})
        # Embeddings filter: cheap similarity pre-filter
        embeddings_filter = EmbeddingsFilter(
            embeddings=self.embeddings,
            similarity_threshold=0.76
        )
        # LLM compressor: extracts only the passages relevant to the query
        compressor = LLMChainExtractor.from_llm(self.llm)
        # Run the cheap filter first, then the LLM extractor
        pipeline = DocumentCompressorPipeline(
            transformers=[embeddings_filter, compressor]
        )
        self.retriever = ContextualCompressionRetriever(
            base_compressor=pipeline,
            base_retriever=base_retriever,
        )
def get_retriever(self):
return self.retriever
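Whichever strategy you choose, the result is a standard retriever that plugs into the chains below. For example, with the hybrid retriever (the query is illustrative; chunks and vectorstore come from the ingestion section):
# Example: hybrid retrieval over the ingested chunks (query is illustrative)
hybrid = HybridRetriever(vectorstore, documents=chunks, k=4)
retriever = hybrid.get_retriever()

docs = retriever.invoke("How do I configure connection pooling?")
for doc in docs:
    print(doc.metadata.get("source"), "-", doc.page_content[:80])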
RAG Chain
Basic Chain
# app/chain.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain.chains import create_history_aware_retriever
RAG_PROMPT = """You are an assistant for question-answering tasks.
Use the following pieces of retrieved context to answer the question.
If you don't know the answer, just say that you don't know.
Context:
{context}
Question: {question}
Answer:"""
class RAGChain:
"""Production RAG chain with streaming support"""
def __init__(
self,
retriever,
llm=None,
prompt: str = RAG_PROMPT
):
self.llm = llm or ChatOpenAI(model="gpt-4o", temperature=0)
self.retriever = retriever
self.prompt = ChatPromptTemplate.from_template(prompt)
self._build_chain()
def _format_docs(self, docs):
return "\n\n---\n\n".join(
f"Source: {doc.metadata.get('source', 'Unknown')}\n{doc.page_content}"
for doc in docs
)
def _build_chain(self):
self.chain = (
RunnableParallel(
context=self.retriever | self._format_docs,
question=RunnablePassthrough()
)
| self.prompt
| self.llm
| StrOutputParser()
)
def invoke(self, question: str) -> str:
return self.chain.invoke(question)
async def ainvoke(self, question: str) -> str:
return await self.chain.ainvoke(question)
def stream(self, question: str):
for chunk in self.chain.stream(question):
yield chunk
async def astream(self, question: str):
async for chunk in self.chain.astream(question):
yield chunk
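Basic usage, including streaming (the questions are illustrative):
# Example usage of RAGChain (questions are illustrative)
rag = RAGChain(retriever)

print(rag.invoke("What does the ingestion pipeline deduplicate on?"))

# Streaming
for token in rag.stream("Summarize the chunking strategies."):
    print(token, end="", flush=True)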
Conversational RAG
# app/chain.py
from langchain_core.messages import HumanMessage, AIMessage
# NOTE: create_history_aware_retriever requires an "input" prompt variable.
CONDENSE_PROMPT = """Given a chat history and the latest user question
which might reference context in the chat history, formulate a standalone question
which can be understood without the chat history.

Chat History:
{chat_history}

Follow Up Input: {input}

Standalone question:"""
class ConversationalRAG:
"""RAG with conversation history"""
def __init__(self, retriever, llm=None):
self.llm = llm or ChatOpenAI(model="gpt-4o", temperature=0)
self.retriever = retriever
self._build_chain()
def _build_chain(self):
# Condense question prompt
condense_prompt = ChatPromptTemplate.from_template(CONDENSE_PROMPT)
# History-aware retriever
self.history_retriever = create_history_aware_retriever(
self.llm,
self.retriever,
condense_prompt
)
# Answer prompt
answer_prompt = ChatPromptTemplate.from_messages([
("system", """You are a helpful assistant. Answer based on the context.
Context: {context}"""),
MessagesPlaceholder(variable_name="chat_history"),
("human", "{question}"),
])
# Full chain
self.chain = (
RunnablePassthrough.assign(
context=self.history_retriever | self._format_docs
)
| answer_prompt
| self.llm
| StrOutputParser()
)
def _format_docs(self, docs):
return "\n\n".join(doc.page_content for doc in docs)
    def invoke(
        self,
        question: str,
        chat_history: list = None
    ) -> str:
        chat_history = chat_history or []
        return self.chain.invoke({
            "input": question,
            "chat_history": chat_history
        })
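Chat history is passed as LangChain message objects (imported above). An illustrative exchange:
# Example conversational usage (the exchange is illustrative)
conv_rag = ConversationalRAG(retriever)

history = [
    HumanMessage(content="What vector stores does the guide cover?"),
    AIMessage(content="Chroma for local development and PGVector for production."),
]
answer = conv_rag.invoke("Which one should I use in production?", chat_history=history)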
RAG with Sources
from langchain_core.runnables import RunnableLambda
from typing import TypedDict, List
class RAGResponse(TypedDict):
answer: str
sources: List[dict]
class RAGWithSources:
"""RAG that returns sources with answers"""
def __init__(self, retriever, llm=None):
self.llm = llm or ChatOpenAI(model="gpt-4o", temperature=0)
self.retriever = retriever
self._build_chain()
def _build_chain(self):
prompt = ChatPromptTemplate.from_template(RAG_PROMPT)
def get_sources(docs):
return [
{
"source": doc.metadata.get("source", "Unknown"),
"page": doc.metadata.get("page", None),
"content": doc.page_content[:200] + "..."
}
for doc in docs
]
retrieve_and_format = RunnableParallel(
context=self.retriever | self._format_docs,
sources=self.retriever | get_sources,
question=RunnablePassthrough()
)
def combine_output(input_dict):
return RAGResponse(
answer=input_dict["answer"],
sources=input_dict["sources"]
)
self.chain = (
retrieve_and_format
| RunnablePassthrough.assign(
answer=prompt | self.llm | StrOutputParser()
)
| RunnableLambda(combine_output)
)
def _format_docs(self, docs):
return "\n\n".join(doc.page_content for doc in docs)
def invoke(self, question: str) -> RAGResponse:
return self.chain.invoke(question)
API Server
# app/api.py
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import List, Optional
import asyncio

from app.vectorstore import create_chroma_vectorstore
from app.chain import RAGChain, RAGWithSources
app = FastAPI(title="RAG API")
class Query(BaseModel):
question: str
chat_history: Optional[List[dict]] = None
class Response(BaseModel):
answer: str
sources: Optional[List[dict]] = None
# Initialize on startup
@app.on_event("startup")
async def startup():
    global rag_chain, streaming_chain
    vectorstore = create_chroma_vectorstore()
    retriever = vectorstore.as_retriever()
    rag_chain = RAGWithSources(retriever)        # structured answers with sources
    streaming_chain = RAGChain(retriever)        # supports astream for the SSE endpoint
@app.post("/query", response_model=Response)
async def query(request: Query):
try:
result = await asyncio.to_thread(
rag_chain.invoke,
request.question
)
return Response(
answer=result["answer"],
sources=result["sources"]
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/query/stream")
async def query_stream(request: Query):
async def generate():
        async for chunk in streaming_chain.astream(request.question):
yield f"data: {chunk}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
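Run it locally with uvicorn (the module path follows the project layout above) and test with curl:
uvicorn app.api:app --reload --port 8000

curl -X POST http://localhost:8000/query \
  -H "Content-Type: application/json" \
  -d '{"question": "Which chunking strategies are supported?"}'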
Evaluation
# evaluation.py
from ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevancy,
context_precision,
context_recall,
)
from datasets import Dataset
def evaluate_rag(qa_pairs: list, rag_chain):
"""Evaluate RAG pipeline with RAGAS"""
questions = [pair["question"] for pair in qa_pairs]
ground_truths = [pair["ground_truth"] for pair in qa_pairs]
# Generate answers
answers = []
contexts = []
for q in questions:
result = rag_chain.invoke(q)
answers.append(result["answer"])
contexts.append([s["content"] for s in result["sources"]])
# Create dataset
dataset = Dataset.from_dict({
"question": questions,
"answer": answers,
"contexts": contexts,
"ground_truth": ground_truths,
})
# Evaluate
results = evaluate(
dataset,
metrics=[
faithfulness,
answer_relevancy,
context_precision,
context_recall,
]
)
return results
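A minimal evaluation run might look like this (the QA pairs are illustrative; RAGAS also needs LLM and embedding credentials configured, typically via OPENAI_API_KEY):
# Example evaluation run (QA pairs are illustrative; rag_chain is the
# RAGWithSources instance built earlier)
qa_pairs = [
    {
        "question": "Which vector store does the guide recommend for production?",
        "ground_truth": "PGVector on PostgreSQL.",
    },
]

results = evaluate_rag(qa_pairs, rag_chain)
print(results)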
Conclusion
Building production RAG systems requires careful attention to document processing, retrieval quality, and evaluation. LangChain provides the building blocks, but success depends on proper chunking, hybrid retrieval, and continuous evaluation.
Key takeaways:
- Use semantic chunking for better context
- Implement hybrid retrieval (vector + BM25)
- Add reranking for precision
- Stream responses for better UX
- Evaluate with RAGAS metrics