FastAPI Production Best Practices: Complete 2026 Guide
Posted in Development · Tags: Python, FastAPI, Backend, API
FastAPI Production Best Practices: Complete 2026 Guide
FastAPI has become the go-to Python framework for building modern APIs. This comprehensive guide covers production-ready patterns, performance optimization, and best practices for 2026.
Why FastAPI?
- High performance - On par with Node.js and Go
- Type hints - Full Python type annotation support
- Auto documentation - OpenAPI and JSON Schema generation
- Async support - Native async/await for concurrent I/O
- Validation - Pydantic v2 integration
- Developer experience - Excellent IDE support and debugging
Project Structure
my_api/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── database.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── v1/
│   │   │   ├── __init__.py
│   │   │   ├── router.py
│   │   │   ├── endpoints/
│   │   │   │   ├── __init__.py
│   │   │   │   ├── users.py
│   │   │   │   ├── items.py
│   │   │   │   └── auth.py
│   │   │   └── dependencies.py
│   │   └── v2/
│   ├── core/
│   │   ├── __init__.py
│   │   ├── security.py
│   │   ├── exceptions.py
│   │   └── middleware.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   └── item.py
│   ├── schemas/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   └── item.py
│   ├── services/
│   │   ├── __init__.py
│   │   ├── user_service.py
│   │   └── item_service.py
│   └── repositories/
│       ├── __init__.py
│       ├── base.py
│       ├── user_repository.py
│       └── item_repository.py
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_users.py
│   └── test_items.py
├── alembic/
│   └── versions/
├── pyproject.toml
├── Dockerfile
└── docker-compose.yaml
Core Configuration
Settings with Pydantic
# app/config.py
from functools import lru_cache
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Typed application configuration.

    Values are populated by pydantic-settings (environment variables and the
    ``.env`` file named in ``model_config``). ``database_url`` and
    ``secret_key`` declare no default, so they must be provided externally;
    every other field falls back to the literal shown here.
    """

    model_config = SettingsConfigDict(
        case_sensitive=False,
        env_file=".env",
        env_file_encoding="utf-8",
    )

    # --- Application ---
    app_name: str = "My API"
    app_version: str = "1.0.0"
    debug: bool = False
    environment: str = "production"

    # --- API ---
    api_v1_prefix: str = "/api/v1"

    # --- Database ---
    database_url: str
    database_pool_size: int = 20
    database_max_overflow: int = 10

    # --- Redis ---
    redis_url: str = "redis://localhost:6379"

    # --- Security ---
    secret_key: str
    access_token_expire_minutes: int = 30
    refresh_token_expire_days: int = 7
    algorithm: str = "HS256"

    # --- CORS ---
    allowed_origins: list[str] = ["*"]

    # --- External services ---
    aws_region: str = "us-east-1"
    s3_bucket: str = ""
@lru_cache
def get_settings() -> Settings:
    """Build the ``Settings`` object once; later calls reuse the cached one."""
    cached_settings = Settings()
    return cached_settings


# Module-level instance that other modules import as ``app.config.settings``.
settings = get_settings()
Main Application
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from app.config import settings
from app.api.v1.router import api_router
from app.core.middleware import RequestLoggingMiddleware
from app.core.exceptions import setup_exception_handlers
from app.database import init_db, close_db
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup and shutdown work around the application's lifetime.

    Startup calls ``init_db()``; shutdown calls ``close_db()``. The shutdown
    step sits in ``finally`` so the engine is disposed even when an exception
    is thrown into this generator at the ``yield`` point.
    """
    # Startup
    await init_db()
    try:
        yield
    finally:
        # Shutdown
        await close_db()
def create_application() -> FastAPI:
    """Build and configure the FastAPI application.

    Registers GZip, CORS and request-logging middleware, installs the
    project's exception handlers, and mounts the v1 router under
    ``settings.api_v1_prefix``.

    Returns:
        The configured ``FastAPI`` instance.
    """
    app = FastAPI(
        title=settings.app_name,
        version=settings.app_version,
        openapi_url=f"{settings.api_v1_prefix}/openapi.json",
        docs_url=f"{settings.api_v1_prefix}/docs",
        redoc_url=f"{settings.api_v1_prefix}/redoc",
        lifespan=lifespan,
    )

    # Middleware
    app.add_middleware(GZipMiddleware, minimum_size=1000)

    # Credentialed CORS must not be combined with a wildcard origin: it would
    # allow any site to send cookie/authorization-bearing cross-origin
    # requests. Credentials are therefore enabled only when the configured
    # origins are explicit.
    allow_credentials = "*" not in settings.allowed_origins
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.allowed_origins,
        allow_credentials=allow_credentials,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    app.add_middleware(RequestLoggingMiddleware)

    # Exception handlers
    setup_exception_handlers(app)

    # Routes
    app.include_router(api_router, prefix=settings.api_v1_prefix)
    return app
app = create_application()


@app.get("/health")
async def health_check():
    """Health probe endpoint; always answers with a fixed status payload."""
    payload = {"status": "healthy"}
    return payload
Database Layer
Async SQLAlchemy
# app/database.py
from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase

from app.config import settings
# Async engine shared by the whole process; pool sizing and SQL echo come
# from the application settings.
engine = create_async_engine(
    settings.database_url,
    echo=settings.debug,
    pool_pre_ping=True,
    pool_size=settings.database_pool_size,
    max_overflow=settings.database_max_overflow,
)

# Session factory bound to the engine above.
async_session_maker = async_sessionmaker(
    engine,
    class_=AsyncSession,
    autoflush=False,
    expire_on_commit=False,
)
class Base(DeclarativeBase):
    """Declarative base class that the ORM models inherit from."""
async def init_db():
    """Create the tables registered on ``Base.metadata``.

    Meant as a convenience; use Alembic migrations in production.
    """
    async with engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)
async def close_db():
    """Dispose of the module-level engine."""
    await engine.dispose()
async def get_db() -> AsyncIterator[AsyncSession]:
    """Yield a database session scoped to a single unit of work.

    The session is committed after the consumer finishes without error. If
    an exception is raised, the session is rolled back and the exception is
    re-raised.

    Yields:
        An ``AsyncSession`` created by ``async_session_maker``.
    """
    # ``async with`` already closes the session on exit, so no explicit
    # ``session.close()`` is needed.
    async with async_session_maker() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
Models
# app/models/user.py
from datetime import datetime
from sqlalchemy import String, Boolean, DateTime
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
class User(Base):
    """ORM mapping for the ``users`` table."""

    __tablename__ = "users"

    # Identity and credentials
    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    hashed_password: Mapped[str] = mapped_column(String(255))
    full_name: Mapped[str | None] = mapped_column(String(255))

    # Account flags
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    is_superuser: Mapped[bool] = mapped_column(Boolean, default=False)

    # Audit timestamps.
    # NOTE(review): datetime.utcnow returns naive datetimes and is deprecated
    # since Python 3.12; changing it needs a matching column-type decision.
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
    updated_at: Mapped[datetime | None] = mapped_column(DateTime, onupdate=datetime.utcnow)

    # Counterpart is expected to be ``Item.owner`` (Item model not shown here).
    items: Mapped[list["Item"]] = relationship(back_populates="owner")
Repository Pattern
# app/repositories/base.py
from typing import TypeVar, Generic
from sqlalchemy import select, delete, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import Base
ModelType = TypeVar("ModelType", bound=Base)


class BaseRepository(Generic[ModelType]):
    """Generic async CRUD helper for one ORM model.

    All statements run on the session passed to the constructor. Nothing in
    this class commits; transaction boundaries stay with the caller. The
    model is assumed to expose an ``id`` column.
    """

    def __init__(self, model: type[ModelType], session: AsyncSession):
        self.model = model
        self.session = session

    async def get(self, id: int) -> ModelType | None:
        """Return the row whose ``id`` matches, or ``None`` if there is none."""
        result = await self.session.execute(
            select(self.model).where(self.model.id == id)
        )
        return result.scalar_one_or_none()

    async def get_multi(
        self, *, skip: int = 0, limit: int = 100
    ) -> list[ModelType]:
        """Return up to ``limit`` rows after skipping ``skip`` rows.

        Rows are ordered by ``id``: OFFSET/LIMIT without ORDER BY gives no
        guarantee that consecutive pages are disjoint or stable.
        """
        result = await self.session.execute(
            select(self.model)
            .order_by(self.model.id)
            .offset(skip)
            .limit(limit)
        )
        return list(result.scalars().all())

    async def create(self, obj_in: dict) -> ModelType:
        """Insert a row built from ``obj_in`` and return the refreshed object."""
        db_obj = self.model(**obj_in)
        self.session.add(db_obj)
        # Flush so database-generated values exist before the refresh.
        await self.session.flush()
        await self.session.refresh(db_obj)
        return db_obj

    async def update(self, id: int, obj_in: dict) -> ModelType | None:
        """Apply ``obj_in`` to the row with this ``id`` and return the row.

        An empty ``obj_in`` is treated as a no-op, because an UPDATE without
        any SET values is not a valid statement. Returns ``None`` when no row
        has this ``id``.
        """
        if obj_in:
            await self.session.execute(
                update(self.model).where(self.model.id == id).values(**obj_in)
            )
        return await self.get(id)

    async def delete(self, id: int) -> bool:
        """Delete the row with this ``id``; return whether a row was removed."""
        result = await self.session.execute(
            delete(self.model).where(self.model.id == id)
        )
        return result.rowcount > 0
# app/repositories/user_repository.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.user import User
from app.repositories.base import BaseRepository
class UserRepository(BaseRepository[User]):
    """Repository for ``User`` rows with user-specific lookups."""

    def __init__(self, session: AsyncSession):
        super().__init__(User, session)

    async def get_by_email(self, email: str) -> User | None:
        """Return the user with exactly this email, or ``None``."""
        result = await self.session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def get_active_users(self) -> list[User]:
        """Return every user whose ``is_active`` flag is true."""
        # ``.is_(True)`` is the SQLAlchemy spelling of the boolean test and
        # avoids the ``== True`` comparison that linters flag (E712).
        result = await self.session.execute(
            select(User).where(User.is_active.is_(True))
        )
        return list(result.scalars().all())
Schemas with Pydantic v2
# app/schemas/user.py
from datetime import datetime
from pydantic import BaseModel, EmailStr, Field, ConfigDict
class UserBase(BaseModel):
    """Fields shared by the user schemas: email and optional full name."""

    email: EmailStr
    full_name: str | None = None
class UserCreate(UserBase):
    """Payload for creating a user; the password is required (min. 8 chars)."""

    password: str = Field(min_length=8)
class UserUpdate(BaseModel):
    """Partial-update payload; every field is optional and defaults to None."""

    email: EmailStr | None = None
    full_name: str | None = None
    password: str | None = Field(default=None, min_length=8)
class UserResponse(UserBase):
    """User representation returned by the API (no password fields)."""

    # from_attributes lets the schema be populated from attribute-bearing
    # objects rather than only from dicts.
    model_config = ConfigDict(from_attributes=True)

    id: int
    is_active: bool
    created_at: datetime
class UserInDB(UserResponse):
    """``UserResponse`` plus the stored password hash."""

    hashed_password: str
API Endpoints
# app/api/v1/endpoints/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.schemas.user import UserCreate, UserResponse, UserUpdate
from app.services.user_service import UserService
from app.api.v1.dependencies import get_current_user, get_current_superuser
router = APIRouter(prefix="/users", tags=["users"])


@router.get("/", response_model=list[UserResponse])
async def list_users(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_superuser),
):
    """List all users (admin only).

    Args:
        skip: Number of users to skip; must be >= 0.
        limit: Maximum number of users to return; must be >= 0.

    Raises:
        HTTPException: 400 when ``skip`` or ``limit`` is negative.
    """
    # Reject negative paging values up front. They are presumably forwarded
    # to an OFFSET/LIMIT query, where they would surface as a database error
    # (HTTP 500) instead of a client error.
    if skip < 0 or limit < 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="skip and limit must be non-negative",
        )
    service = UserService(db)
    return await service.get_users(skip=skip, limit=limit)
@router.get("/me", response_model=UserResponse)
async def get_current_user_info(
    current_user = Depends(get_current_user),
):
    """Return the user resolved by the ``get_current_user`` dependency."""
    return current_user
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    user_in: UserCreate,
    db: AsyncSession = Depends(get_db),
):
    """Register a new user.

    Raises:
        HTTPException: 400 when the email is already registered.
    """
    users = UserService(db)

    duplicate = await users.get_by_email(user_in.email)
    if duplicate:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )

    created = await users.create_user(user_in)
    return created
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int,
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user),
):
    """Fetch one user by ID (authentication required).

    Raises:
        HTTPException: 404 when no user has this ID.
    """
    # NOTE(review): any authenticated caller can read any user here, whereas
    # update_user restricts access to the owner or a superuser. Confirm that
    # this is intended.
    found = await UserService(db).get_user(user_id)
    if found:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found"
    )
@router.patch("/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: int,
    user_in: UserUpdate,
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user),
):
    """Update a user; allowed for the user themselves or a superuser.

    Raises:
        HTTPException: 403 when the caller is neither the target user nor a
            superuser; 404 when the target user does not exist.
    """
    is_self = current_user.id == user_id
    if not (is_self or current_user.is_superuser):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authorized"
        )

    updated = await UserService(db).update_user(user_id, user_in)
    if updated:
        return updated
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found"
    )
Authentication
# app/core/security.py
from datetime import datetime, timedelta
from typing import Any
from jose import jwt, JWTError
from passlib.context import CryptContext
from app.config import settings
# Shared passlib context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check ``plain_password`` against ``hashed_password`` via ``pwd_context``."""
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Hash ``password`` with the module's ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def create_access_token(subject: str | Any, expires_delta: timedelta | None = None) -> str:
    """Create a signed JWT access token.

    Args:
        subject: Value stored (as a string) in the ``sub`` claim.
        expires_delta: Token lifetime. When ``None``, the lifetime is
            ``settings.access_token_expire_minutes``. An explicit zero
            ``timedelta`` is honoured rather than treated as "not given".

    Returns:
        The encoded token, carrying ``exp``, ``sub`` and ``type="access"``.
    """
    # Local import: the module-level import only brings in datetime/timedelta.
    from datetime import timezone

    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.access_token_expire_minutes)
    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode = {"exp": expire, "sub": str(subject), "type": "access"}
    return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
def create_refresh_token(subject: str | Any) -> str:
    """Create a signed JWT refresh token.

    Args:
        subject: Value stored (as a string) in the ``sub`` claim.

    Returns:
        The encoded token, carrying ``exp``, ``sub`` and ``type="refresh"``;
        it expires after ``settings.refresh_token_expire_days`` days.
    """
    # Local import: the module-level import only brings in datetime/timedelta.
    from datetime import timezone

    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12.
    expire = datetime.now(timezone.utc) + timedelta(days=settings.refresh_token_expire_days)
    to_encode = {"exp": expire, "sub": str(subject), "type": "refresh"}
    return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
def decode_token(token: str) -> dict | None:
    """Decode ``token`` with the configured key and algorithm.

    Returns:
        The decoded claims, or ``None`` when decoding raises ``JWTError``.
    """
    try:
        claims = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
    except JWTError:
        return None
    return claims
# app/api/v1/dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.core.security import decode_token
from app.services.user_service import UserService
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")


async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
):
    """Resolve the bearer token to an active user.

    Raises:
        HTTPException: 401 when the token cannot be decoded, is not an access
            token, has a missing or non-numeric ``sub`` claim, or refers to
            an unknown user; 400 when the user is inactive.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    payload = decode_token(token)
    if payload is None:
        raise credentials_exception

    # Tokens carry a "type" claim ("access" or "refresh"). Only access tokens
    # may authenticate a request; otherwise a long-lived refresh token would
    # be accepted wherever an access token is expected.
    if payload.get("type") != "access":
        raise credentials_exception

    user_id = payload.get("sub")
    if user_id is None:
        raise credentials_exception

    # A non-numeric subject must produce a 401, not an unhandled ValueError.
    try:
        user_pk = int(user_id)
    except (TypeError, ValueError):
        raise credentials_exception from None

    service = UserService(db)
    user = await service.get_user(user_pk)
    if user is None:
        raise credentials_exception
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user"
        )
    return user
async def get_current_superuser(current_user = Depends(get_current_user)):
    """Pass the current user through only when they are a superuser.

    Raises:
        HTTPException: 403 when ``current_user.is_superuser`` is falsy.
    """
    if current_user.is_superuser:
        return current_user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Superuser required"
    )
Middleware
# app/core/middleware.py
import time
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
import structlog
logger = structlog.get_logger()


class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """Log the start and outcome of every request with a per-request ID."""

    async def dispatch(self, request: Request, call_next):
        """Tag the request with an ID, time it, and log the result.

        The ID is stored on ``request.state.request_id`` and returned in the
        ``X-Request-ID`` header. ``X-Process-Time`` carries the elapsed time
        in seconds. If the downstream app raises, the failure is logged with
        the same request ID and the exception is re-raised unchanged.
        """
        request_id = str(uuid.uuid4())
        start_time = time.perf_counter()

        # Add request ID to state
        request.state.request_id = request_id

        # Log request
        logger.info(
            "request_started",
            request_id=request_id,
            method=request.method,
            path=request.url.path,
        )

        try:
            response = await call_next(request)
        except Exception:
            # Without this, a failing request would leave only a
            # "request_started" entry behind.
            failed_after = time.perf_counter() - start_time
            logger.exception(
                "request_failed",
                request_id=request_id,
                process_time_ms=round(failed_after * 1000, 2),
            )
            raise

        # Log response
        process_time = time.perf_counter() - start_time
        logger.info(
            "request_completed",
            request_id=request_id,
            status_code=response.status_code,
            process_time_ms=round(process_time * 1000, 2),
        )
        response.headers["X-Request-ID"] = request_id
        response.headers["X-Process-Time"] = str(process_time)
        return response
Testing
# tests/conftest.py
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from app.main import app
from app.database import Base, get_db
from app.config import settings
# Test database
TEST_DATABASE_URL = "sqlite+aiosqlite:///./test.db"

# Engine and session factory used only by the test suite.
engine = create_async_engine(TEST_DATABASE_URL, echo=True)
TestingSessionLocal = async_sessionmaker(bind=engine, class_=AsyncSession)
@pytest.fixture(scope="session")
def anyio_backend():
    """Select the backend name used for the anyio-marked tests."""
    backend_name = "asyncio"
    return backend_name
@pytest.fixture(scope="function")
async def db_session():
    """Provide a session on a freshly created schema; drop it afterwards."""
    # Set-up: create all tables before the test runs.
    async with engine.begin() as setup_conn:
        await setup_conn.run_sync(Base.metadata.create_all)

    async with TestingSessionLocal() as test_session:
        yield test_session

    # Tear-down: drop all tables after the test.
    async with engine.begin() as teardown_conn:
        await teardown_conn.run_sync(Base.metadata.drop_all)
@pytest.fixture(scope="function")
async def client(db_session):
    """Yield an HTTP client wired to the app, using the test session.

    ``get_db`` is overridden so request handlers receive ``db_session``. The
    overrides are cleared afterwards, even if client set-up or tear-down
    fails.
    """
    # Local import: this module's httpx import only brings in AsyncClient.
    # The ``AsyncClient(app=...)`` shortcut was deprecated in httpx 0.27 and
    # removed in 0.28; an explicit ASGITransport is the supported form.
    from httpx import ASGITransport

    async def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db
    try:
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as ac:
            yield ac
    finally:
        app.dependency_overrides.clear()
# tests/test_users.py
import pytest
from httpx import AsyncClient
@pytest.mark.anyio
async def test_create_user(client: AsyncClient):
    """Creating a user returns 201 and never exposes the password hash."""
    new_user = {
        "email": "test@example.com",
        "password": "testpassword123",
        "full_name": "Test User"
    }

    response = await client.post("/api/v1/users/", json=new_user)

    assert response.status_code == 201
    body = response.json()
    assert body["email"] == "test@example.com"
    assert "id" in body
    assert "hashed_password" not in body
@pytest.mark.anyio
async def test_create_duplicate_user(client: AsyncClient):
    """Registering the same email twice is rejected with 400."""
    payload = {"email": "test@example.com", "password": "testpassword123"}

    # Create first user. Assert it succeeded so that the 400 below can only
    # come from the duplicate check, not from a broken first request.
    first = await client.post("/api/v1/users/", json=payload)
    assert first.status_code == 201

    # Try to create duplicate
    response = await client.post("/api/v1/users/", json=payload)
    assert response.status_code == 400
Docker Deployment
# Dockerfile
# ---- Build stage: export pinned requirements from the Poetry lock file ----
FROM python:3.12-slim AS builder
WORKDIR /app
# `poetry export` is provided by poetry-plugin-export, which Poetry 2.x no
# longer bundles, so install it explicitly.
RUN pip install --no-cache-dir poetry poetry-plugin-export
COPY pyproject.toml poetry.lock ./
RUN poetry export -f requirements.txt --output requirements.txt --without-hashes

# ---- Runtime stage ----
FROM python:3.12-slim
WORKDIR /app
# curl is installed for the HEALTHCHECK below.
RUN useradd --create-home appuser && \
    apt-get update && \
    apt-get install -y --no-install-recommends curl && \
    rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY --chown=appuser:appuser . .
# Run as a non-root user.
USER appuser
EXPOSE 8000
# Probe the application's /health endpoint.
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
    CMD curl -fsS http://localhost:8000/health || exit 1
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yaml
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql+asyncpg://user:pass@db:5432/myapp
      - REDIS_URL=redis://redis:6379
      # Abort with a clear message when SECRET_KEY is unset or empty instead
      # of silently passing an empty secret to the application.
      - SECRET_KEY=${SECRET_KEY:?SECRET_KEY must be set}
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_healthy

  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
      POSTGRES_DB: myapp
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user -d myapp"]
      interval: 5s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    # Mirrors the db healthcheck so the api waits for a responsive Redis.
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 5s
      retries: 5

volumes:
  postgres_data:
  redis_data:
Conclusion
FastAPI provides an excellent foundation for building production-grade APIs. By following these patterns, you can create maintainable, scalable, and secure applications.
Key takeaways:
- Use Pydantic v2 for validation and settings
- Implement repository pattern for data access
- Use async/await throughout
- Write comprehensive tests
- Containerize for deployment
Resources
이 글이 도움이 되셨다면 공감 및 광고 클릭을 부탁드립니다 :)
