Vector Databases and RAG Architecture: Building Production-Ready AI Search in 2026

Retrieval-Augmented Generation (RAG) has moved from experimental to essential. If your LLM application doesn’t have a solid retrieval backbone, you’re leaving accuracy—and cost efficiency—on the table. This post covers production patterns for vector databases and RAG pipelines based on real-world deployments.

[Image: vector database architecture. Photo by Lars Kienle on Unsplash]

Why RAG Still Wins Over Long Context

With Gemini 1.5 Pro offering 1M token windows and Claude supporting 200K, you might wonder if RAG is obsolete. It isn’t—and here’s why:

Factor       Long Context                RAG
Cost         High (linear scaling)       Low (selective retrieval)
Latency      High (full context eval)    Low (top-k chunks)
Freshness    Static at inference         Dynamic
Accuracy     Good for short docs         Better for large corpora

For corpora over a few thousand documents, RAG remains the right architecture.
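
As a rough back-of-envelope check (the corpus size, document length, and chunk counts below are illustrative assumptions, not measured figures), compare the input tokens a single query consumes under each approach:

# Token math for one query: stuffing the whole corpus into the prompt vs.
# retrieving the top-k chunks. All constants are illustrative assumptions.
CORPUS_DOCS = 5_000
TOKENS_PER_DOC = 800
TOP_K = 5
TOKENS_PER_CHUNK = 512

long_context_tokens = CORPUS_DOCS * TOKENS_PER_DOC   # 4,000,000 -- exceeds even a 1M window
rag_tokens = TOP_K * TOKENS_PER_CHUNK                 # 2,560 per query

print(f"Full corpus in context: {long_context_tokens:,} tokens")
print(f"RAG (top-{TOP_K} chunks): {rag_tokens:,} tokens")
print(f"Reduction: {long_context_tokens // rag_tokens:,}x fewer input tokens")

Even a modest corpus overflows a 1M-token window, while a RAG query stays in the low thousands of input tokens per request.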

Vector Database Landscape in 2026

pgvector — The Boring But Powerful Choice

If you already run PostgreSQL, pgvector is often the right answer:

-- Enable extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Create table with embedding column
CREATE TABLE documents (
  id BIGSERIAL PRIMARY KEY,
  content TEXT NOT NULL,
  metadata JSONB,
  embedding VECTOR(1536),  -- OpenAI text-embedding-3-small
  created_at TIMESTAMPTZ DEFAULT NOW()
);

-- HNSW index for fast ANN search
CREATE INDEX ON documents
  USING hnsw (embedding vector_cosine_ops)
  WITH (m = 16, ef_construction = 64);

-- Similarity search
SELECT id, content, metadata,
       1 - (embedding <=> $1::vector) AS similarity
FROM documents
WHERE metadata->>'tenant_id' = $2
ORDER BY embedding <=> $1::vector
LIMIT 10;

pgvector 0.8+ improvements (2026):

  • Parallel HNSW build
  • halfvec type: half-precision vectors that roughly halve storage and index size (see the sketch after this list)
  • Bit quantization for dense vectors
  • Streaming index builds without table locks
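
The halfvec sketch below (assuming pgvector 0.7+ and psycopg 3; the table name and connection string are placeholders) shows the half-precision column and its matching HNSW operator class:

import psycopg

# Half-precision embeddings: 2 bytes per dimension instead of 4, roughly
# halving vector storage and index size.
CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS documents_half (
  id BIGSERIAL PRIMARY KEY,
  content TEXT NOT NULL,
  embedding halfvec(1536)
)
"""

CREATE_INDEX = """
CREATE INDEX IF NOT EXISTS documents_half_embedding_idx
  ON documents_half
  USING hnsw (embedding halfvec_cosine_ops)
  WITH (m = 16, ef_construction = 64)
"""

with psycopg.connect("postgresql://localhost/ragdb") as conn:
    conn.execute(CREATE_TABLE)
    conn.execute(CREATE_INDEX)
    # The connection context manager commits on successful exit.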

Qdrant — When You Need Purpose-Built Performance

Qdrant shines for high-cardinality multi-tenant workloads:

from qdrant_client import QdrantClient
from qdrant_client.models import (
    VectorParams, Distance, PointStruct,
    Filter, FieldCondition, MatchValue,
    PayloadSchemaType, ScalarQuantization,
    ScalarQuantizationConfig, ScalarType
)

client = QdrantClient(url="http://localhost:6333")

# Create collection with quantization
client.create_collection(
    collection_name="docs",
    vectors_config=VectorParams(
        size=1536,
        distance=Distance.COSINE,
        quantization_config=ScalarQuantization(
            scalar=ScalarQuantizationConfig(
                type=ScalarType.INT8,
                quantile=0.99,
                always_ram=True
            )
        )
    ),
    on_disk_payload=True  # Keep payload on disk, vectors in RAM
)

# Create tenant-aware payload index
client.create_payload_index(
    collection_name="docs",
    field_name="tenant_id",
    field_schema=PayloadSchemaType.KEYWORD
)

# Filtered search (tenant isolation)
results = client.search(
    collection_name="docs",
    query_vector=query_embedding,
    query_filter=Filter(
        must=[FieldCondition(
            key="tenant_id",
            match=MatchValue(value="acme-corp")
        )]
    ),
    limit=10,
    with_payload=True
)

Qdrant 1.11 features:

  • Multi-vector support: store multiple embeddings per document (e.g., title + body)
  • Sparse vectors: hybrid search with BM25 + dense in one collection (a minimal sketch follows this list)
  • Web UI for collection inspection
  • Fastembed integration for on-node embedding
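
A minimal hybrid-search sketch (assuming qdrant-client 1.10+; the collection name, vector names, and placeholder query values are illustrative; in practice the sparse indices and values come from a BM25-style encoder such as FastEmbed):

from qdrant_client import QdrantClient, models

client = QdrantClient(url="http://localhost:6333")

# One dense vector and one sparse (BM25-style) vector per point
client.create_collection(
    collection_name="docs-hybrid",
    vectors_config={
        "dense": models.VectorParams(size=1536, distance=models.Distance.COSINE)
    },
    sparse_vectors_config={
        "bm25": models.SparseVectorParams()
    }
)

# Placeholder query representations; use real embeddings in practice
dense_query = [0.0] * 1536
sparse_query = models.SparseVector(indices=[10, 42], values=[0.8, 0.3])

# Prefetch candidates from both indexes, then fuse with reciprocal rank fusion
hits = client.query_points(
    collection_name="docs-hybrid",
    prefetch=[
        models.Prefetch(query=dense_query, using="dense", limit=20),
        models.Prefetch(query=sparse_query, using="bm25", limit=20)
    ],
    query=models.FusionQuery(fusion=models.Fusion.RRF),
    limit=10
)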

Weaviate — Hybrid Search with Native Generative Modules

Weaviate excels at hybrid search and has native LLM module integration:

{
  Get {
    Article(
      hybrid: {
        query: "kubernetes production best practices"
        alpha: 0.75  # 0=BM25, 1=vector, 0.75=mostly vector
      }
      limit: 5
      where: {
        path: ["publishedYear"]
        operator: GreaterThan
        valueInt: 2024
      }
    ) {
      title
      content
      _additional {
        score
        explainScore
        generate(
          singleResult: {
            prompt: "Summarize this in one sentence: {content}"
          }
        ) {
          singleResult
        }
      }
    }
  }
}

Production RAG Pipeline Architecture

Ingestion Pipeline

from typing import AsyncIterator
from dataclasses import dataclass

from openai import AsyncOpenAI

@dataclass
class Chunk:
    content: str
    metadata: dict
    embedding: list[float] | None = None

class RAGIngestionPipeline:
    def __init__(self, vector_db, embedding_model="text-embedding-3-small"):
        self.vector_db = vector_db
        self.openai = AsyncOpenAI()
        self.embedding_model = embedding_model

    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Batch embedding with retry logic."""
        response = await self.openai.embeddings.create(
            model=self.embedding_model,
            input=texts,
            encoding_format="float"
        )
        return [item.embedding for item in response.data]

    async def chunk_document(
        self, content: str, metadata: dict
    ) -> AsyncIterator[Chunk]:
        """
        Semantic chunking with overlap.
        For production, consider LlamaIndex's SemanticSplitterNodeParser.
        """
        # Simple fixed-size chunking (replace with semantic in production)
        chunk_size = 512
        overlap = 64
        tokens = content.split()  # simplification; use tiktoken

        for i in range(0, len(tokens), chunk_size - overlap):
            chunk_tokens = tokens[i:i + chunk_size]
            yield Chunk(
                content=" ".join(chunk_tokens),
                metadata={**metadata, "chunk_index": i // (chunk_size - overlap)}
            )

    async def ingest(self, documents: list[dict], batch_size: int = 32):
        """Ingest documents with batched embedding."""
        chunks = []
        for doc in documents:
            async for chunk in self.chunk_document(doc["content"], doc["metadata"]):
                chunks.append(chunk)

                if len(chunks) >= batch_size:
                    await self._embed_and_store(chunks)
                    chunks = []

        if chunks:
            await self._embed_and_store(chunks)

    async def _embed_and_store(self, chunks: list[Chunk]):
        texts = [c.content for c in chunks]
        embeddings = await self.embed_batch(texts)

        for chunk, embedding in zip(chunks, embeddings):
            chunk.embedding = embedding

        await self.vector_db.upsert(chunks)

Query Pipeline with Hybrid Reranking

from typing import Optional

import cohere
from openai import AsyncOpenAI

class RAGQueryPipeline:
    def __init__(self, vector_db, llm_client, reranker=None,
                 embedding_model: str = "text-embedding-3-small"):
        self.vector_db = vector_db
        self.llm = llm_client
        self.reranker = reranker or cohere.Client()
        self.openai = AsyncOpenAI()
        self.embedding_model = embedding_model

    async def embed_query(self, query: str) -> list[float]:
        """Embed the query with the same model used at ingestion time."""
        response = await self.openai.embeddings.create(
            model=self.embedding_model,
            input=[query]
        )
        return response.data[0].embedding

    async def retrieve(
        self,
        query: str,
        top_k: int = 20,
        rerank_top_n: int = 5,
        filters: Optional[dict] = None
    ) -> list[dict]:
        # 1. Embed query
        query_embedding = await self.embed_query(query)

        # 2. Vector search (over-fetch for reranking)
        candidates = await self.vector_db.search(
            embedding=query_embedding,
            limit=top_k,
            filters=filters
        )

        # 3. Rerank with cross-encoder
        if self.reranker and len(candidates) > rerank_top_n:
            reranked = self.reranker.rerank(
                model="rerank-v3.5",
                query=query,
                documents=[c["content"] for c in candidates],
                top_n=rerank_top_n
            )
            candidates = [candidates[r.index] for r in reranked.results]

        return candidates[:rerank_top_n]

    async def generate(
        self,
        query: str,
        context_docs: list[dict],
        system_prompt: Optional[str] = None
    ) -> str:
        context = "\n\n---\n\n".join([
            f"[Source: {doc['metadata'].get('title', 'Unknown')}]\n{doc['content']}"
            for doc in context_docs
        ])

        messages = [
            {
                "role": "system",
                "content": system_prompt or (
                    "You are a helpful assistant. Answer based on the provided context. "
                    "If the answer isn't in the context, say so clearly."
                )
            },
            {
                "role": "user",
                "content": f"Context:\n{context}\n\nQuestion: {query}"
            }
        ]

        response = await self.llm.chat.completions.create(
            model="gpt-4.1",
            messages=messages,
            temperature=0.1
        )
        return response.choices[0].message.content

Critical Production Considerations

1. Chunking Strategy Matters Most

Bad chunking is the #1 cause of poor RAG quality:

# ❌ Naive fixed-size chunking (character-based, loses context at boundaries)
import textwrap
chunks = textwrap.wrap(document, 500)

# ✅ Semantic chunking (respects paragraph/section boundaries)
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.embeddings.openai import OpenAIEmbedding

splitter = SemanticSplitterNodeParser(
    buffer_size=1,
    breakpoint_percentile_threshold=95,
    embed_model=OpenAIEmbedding()
)
nodes = splitter.get_nodes_from_documents(documents)

2. Metadata Filtering is Essential for Multi-Tenancy

Never rely on semantic search alone for access control:

# Always filter by tenant before semantic ranking
results = await vector_db.search(
    embedding=query_embedding,
    filters={
        "tenant_id": current_user.tenant_id,
        "document_status": "published",
        "access_level": {"$lte": current_user.access_level}
    }
)

3. Embedding Model Versioning

When you update your embedding model, ALL existing vectors become incompatible:

from datetime import datetime, timezone

# Track embedding model version per document
metadata = {
    "embedding_model": "text-embedding-3-small",
    "embedding_version": "v1",
    "embedded_at": datetime.now(timezone.utc).isoformat()
}

# Re-index strategy: dual-write during migration
async def migrate_collection(old_version: str, new_version: str):
    # Stream old docs, re-embed, write to new collection
    # Use blue/green collection swap when complete
    pass
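
A minimal sketch of that dual-write backfill, using the same hypothetical vector_db interface as the pipelines above (the scroll and upsert methods and the embed_batch helper are assumptions, not a specific client API):

async def migrate_collection(
    vector_db, embed_batch,
    old_collection: str, new_collection: str,
    new_version: str, batch_size: int = 64
):
    """Re-embed documents from old_collection into new_collection in batches."""
    async for batch in vector_db.scroll(old_collection, batch_size=batch_size):
        texts = [doc["content"] for doc in batch]
        embeddings = await embed_batch(texts)
        await vector_db.upsert(new_collection, [
            {
                **doc,
                "embedding": emb,
                "metadata": {**doc["metadata"], "embedding_version": new_version}
            }
            for doc, emb in zip(batch, embeddings)
        ])
    # Keep writing new documents to BOTH collections during the backfill; once it
    # completes, flip reads to new_collection (blue/green swap) and drop the old one.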

4. Observability for RAG

from opentelemetry import trace

tracer = trace.get_tracer("rag-pipeline")

async def traced_retrieve(query: str, **kwargs):
    with tracer.start_as_current_span("rag.retrieve") as span:
        span.set_attribute("rag.query", query)
        span.set_attribute("rag.top_k", kwargs.get("top_k", 10))

        results = await retrieve(query, **kwargs)

        span.set_attribute("rag.results_count", len(results))
        span.set_attribute("rag.avg_score",
            sum(r["score"] for r in results) / max(len(results), 1)
        )
        return results

Benchmarks: What to Expect in Production

Solution            QPS (1M vectors)    P99 Latency    Memory (1M × 1536d)
pgvector (HNSW)     ~500                8ms            ~12GB
Qdrant (int8)       ~3000               3ms            ~6GB
Qdrant (float32)    ~2000               4ms            ~24GB
Weaviate            ~1500               5ms            ~18GB
Benchmarks on 16-core ARM instance, cosine similarity, ef=64
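
To sanity-check numbers like these against your own deployment, a rough harness along the following lines can help (illustrative only: a real benchmark needs warm-up, concurrency, and a realistic query distribution, and search_fn is whatever wraps your client's search call):

import time
import random
import numpy as np

def measure_p99_ms(search_fn, dim: int = 1536, n_queries: int = 200) -> float:
    """Time n_queries random-vector searches and return the observed P99 in milliseconds."""
    latencies = []
    for _ in range(n_queries):
        query = [random.random() for _ in range(dim)]
        start = time.perf_counter()
        search_fn(query)
        latencies.append((time.perf_counter() - start) * 1000)
    return float(np.percentile(latencies, 99))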

Conclusion

RAG is infrastructure, not an afterthought. The teams winning with LLM applications in 2026 are the ones that treat vector search with the same rigor as their relational databases: proper indexing, monitoring, capacity planning, and schema evolution.

Start with pgvector if you’re already on PostgreSQL. Graduate to Qdrant when you hit multi-tenancy or throughput ceilings. Use Weaviate when GraphQL APIs and generative modules match your team’s stack.

The retrieval layer is where most RAG quality gains are found. Invest there before optimizing your prompts.
