After building Python microservices across multiple companies—from data pipelines with RabbitMQ to AI-enabled backend systems—I’ve converged on a set of patterns I reach for in every new service. They’re not clever. They’re not sexy. But they prevent the bugs that wake you up at 3 AM.
These are the five patterns I copy into every Python microservice before I write a single line of business logic. Each one has earned its place by preventing a real production incident.
1. Structured Configuration with Validation
The first thing that goes wrong in a microservice is configuration. A missing environment variable, a typo in a database URL, a port that’s already in use. I want to know about these problems at startup, not when the first request hits.
I use Pydantic’s BaseSettings to create a single, validated configuration object:
from pydantic_settings import BaseSettings
from pydantic import Field, field_validator
from functools import lru_cache


class Settings(BaseSettings):
    # Service identity
    service_name: str = "my-service"
    environment: str = Field(default="development")
    debug: bool = False

    # Server
    host: str = "0.0.0.0"
    port: int = 8000

    # Database
    database_url: str
    db_pool_min: int = 2
    db_pool_max: int = 10

    # External services
    rabbitmq_url: str = "amqp://guest:guest@localhost:5672/"
    redis_url: str = "redis://localhost:6379/0"

    # Timeouts (seconds)
    request_timeout: int = 30
    shutdown_timeout: int = 15

    @field_validator("environment")
    @classmethod
    def validate_environment(cls, v: str) -> str:
        allowed = {"development", "staging", "production"}
        if v not in allowed:
            raise ValueError(f"environment must be one of {allowed}")
        return v

    model_config = {
        "env_file": ".env",
        "env_file_encoding": "utf-8",
        "case_sensitive": False,
    }


@lru_cache
def get_settings() -> Settings:
    return Settings()
Why this matters:
- Fail fast. If database_url is missing, the service refuses to start. You find out immediately, not after your first database query.
- Type safety. port is always an int, debug is always a bool. No more if os.getenv("DEBUG") == "true" scattered across your code.
- Single source of truth. Every module imports get_settings(). No more os.getenv() calls hidden in random files.
- Testable. Override settings in tests without touching environment variables (see the sketch below).
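That last point is easy to demonstrate. Because get_settings is wrapped in lru_cache, a test can clear the cache and point the service at whatever configuration it needs. A minimal sketch using pytest and its monkeypatch fixture (pytest is an assumption here, not part of the pattern itself):

import pytest


@pytest.fixture
def test_settings(monkeypatch):
    # Point the service at a throwaway database for this test only
    monkeypatch.setenv("DATABASE_URL", "postgresql://localhost:5432/test_db")
    get_settings.cache_clear()  # drop any cached Settings instance
    yield get_settings()
    get_settings.cache_clear()  # don't leak test config into other tests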
I’ve seen services in production that crashed hours after deployment because a single env var was misspelled. This pattern catches that before the process even binds to a port.
2. Graceful Shutdown
This is the pattern that separates “it works on my machine” services from production-ready ones. When your service receives a SIGTERM (which happens on every deploy, every scale-down, every container restart), it needs to:
- Stop accepting new work
- Finish in-progress requests
- Close database connections and message queues
- Exit cleanly
Here’s my standard implementation:
import asyncio
import signal
from contextlib import asynccontextmanager
from collections.abc import AsyncGenerator
from typing import Any

from fastapi import FastAPI


class GracefulShutdown:
    def __init__(self, timeout: int = 15):
        self.timeout = timeout
        self.is_shutting_down = False
        self._tasks: list[asyncio.Task[Any]] = []
        self._cleanup_handlers: list[Any] = []

    def register_cleanup(self, handler: Any) -> None:
        """Register an async cleanup function."""
        self._cleanup_handlers.append(handler)

    def track_task(self, task: asyncio.Task[Any]) -> None:
        """Track a background task for graceful completion."""
        self._tasks.append(task)
        task.add_done_callback(self._tasks.remove)

    async def shutdown(self) -> None:
        self.is_shutting_down = True

        # Wait for in-flight tasks
        if self._tasks:
            await asyncio.wait(
                self._tasks,
                timeout=self.timeout,
            )

        # Run cleanup handlers
        for handler in reversed(self._cleanup_handlers):
            try:
                await handler()
            except Exception as e:
                print(f"Cleanup error: {e}")


shutdown_manager = GracefulShutdown()


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    # Startup
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig,
            lambda: asyncio.create_task(shutdown_manager.shutdown()),
        )
    yield
    # Shutdown
    await shutdown_manager.shutdown()


app = FastAPI(lifespan=lifespan)
The critical detail most tutorials miss: you need to handle both the signal and the ASGI lifespan shutdown. Kubernetes sends SIGTERM, but your ASGI server (uvicorn) also has its own shutdown sequence. This pattern handles both.
In a previous role, I worked on data pipeline services using RabbitMQ where graceful shutdown was critical. If a service died mid-message, that message could be lost or processed twice. The register_cleanup pattern let us ensure every message was either fully processed or properly re-queued before the service exited.
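Here is roughly how that wiring looks in practice. I'm assuming asyncpg for the database pool and aio-pika for the broker purely for illustration; any async clients work the same way. Because cleanup handlers run in reverse registration order, register resources in the order you open them:

import aio_pika
import asyncpg


async def init_resources(settings: Settings) -> None:
    """Create shared clients and register their teardown.

    Call this from the lifespan startup, before the yield.
    """
    # Module-level handles, reused by the health checks in the next section
    global db_pool, rabbitmq_connection

    db_pool = await asyncpg.create_pool(
        dsn=settings.database_url,
        min_size=settings.db_pool_min,
        max_size=settings.db_pool_max,
    )
    rabbitmq_connection = await aio_pika.connect_robust(settings.rabbitmq_url)

    # On shutdown these run in reverse: the broker closes first, then the pool
    shutdown_manager.register_cleanup(db_pool.close)
    shutdown_manager.register_cleanup(rabbitmq_connection.close)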
3. Health Check Endpoints (That Actually Check Health)
I’ve seen too many health checks that just return {"status": "ok"}. That tells your orchestrator the process is running, but not whether it can actually do useful work.
A proper health check verifies every dependency:
import time
from dataclasses import dataclass, field
from typing import Any

from fastapi import FastAPI, Response


@dataclass
class HealthCheck:
    name: str
    check: Any  # async callable
    critical: bool = True


@dataclass
class HealthRegistry:
    checks: list[HealthCheck] = field(default_factory=list)

    def register(
        self, name: str, check: Any, critical: bool = True
    ) -> None:
        self.checks.append(
            HealthCheck(name=name, check=check, critical=critical)
        )

    async def run_all(self) -> dict:
        results: dict = {}
        all_healthy = True

        for health_check in self.checks:
            start = time.monotonic()
            try:
                await health_check.check()
                results[health_check.name] = {
                    "status": "healthy",
                    "latency_ms": round(
                        (time.monotonic() - start) * 1000, 2
                    ),
                }
            except Exception as e:
                results[health_check.name] = {
                    "status": "unhealthy",
                    "error": str(e),
                    "latency_ms": round(
                        (time.monotonic() - start) * 1000, 2
                    ),
                }
                if health_check.critical:
                    all_healthy = False

        return {
            "status": "healthy" if all_healthy else "unhealthy",
            "checks": results,
        }


health = HealthRegistry()
app = FastAPI()


# Register checks at startup (db_pool, redis_client, and rabbitmq_connection
# are the service's own clients, created elsewhere during startup)
async def check_database():
    """Verify database connection with a simple query."""
    async with db_pool.acquire() as conn:
        await conn.fetchval("SELECT 1")


async def check_redis():
    """Verify Redis is reachable."""
    await redis_client.ping()


async def check_rabbitmq():
    """Verify RabbitMQ connection is open."""
    if not rabbitmq_connection or rabbitmq_connection.is_closed:
        raise ConnectionError("RabbitMQ connection is closed")


health.register("database", check_database, critical=True)
health.register("redis", check_redis, critical=True)
health.register("rabbitmq", check_rabbitmq, critical=False)


@app.get("/health")
async def health_endpoint(response: Response):
    result = await health.run_all()
    if result["status"] == "unhealthy":
        response.status_code = 503
    return result


@app.get("/health/live")
async def liveness():
    """Kubernetes liveness probe - is the process alive?"""
    return {"status": "alive"}


@app.get("/health/ready")
async def readiness(response: Response):
    """Kubernetes readiness probe - can we serve traffic?"""
    result = await health.run_all()
    if result["status"] == "unhealthy":
        response.status_code = 503
    return result
Key distinctions:
- /health/live: Is the process running? (Liveness probe. If this fails, Kubernetes restarts the pod.)
- /health/ready: Can this instance serve traffic? (Readiness probe. If this fails, the pod is removed from the load balancer but not killed.)
- /health: Full diagnostic for dashboards and debugging.
The critical flag matters. If Redis is down but you use it only for caching, the service can still serve requests (degraded). But if the database is down, you’re completely broken. This distinction prevents unnecessary restarts while still catching real problems.
4. Idempotency for Message Processing
If you’re consuming messages from a queue (RabbitMQ, Kafka, SQS), you will eventually process the same message twice. Network blips, consumer restarts, rebalances—it’s not a matter of if, it’s when.
The solution is making your message handlers idempotent:
import hashlib
import json
from datetime import datetime, timedelta, timezone


class IdempotencyStore:
    """Track processed messages to prevent duplicate handling."""

    def __init__(self, redis_client, ttl_hours: int = 24):
        self.redis = redis_client
        self.ttl = timedelta(hours=ttl_hours)

    def _make_key(self, message_id: str) -> str:
        return f"idempotency:{message_id}"

    async def is_processed(self, message_id: str) -> bool:
        """Check if a message has already been processed."""
        return await self.redis.exists(self._make_key(message_id))

    async def mark_processed(
        self,
        message_id: str,
        result: dict | None = None,
    ) -> None:
        """Mark a message as successfully processed."""
        value = json.dumps({
            "processed_at": datetime.now(timezone.utc).isoformat(),
            "result": result,
        })
        await self.redis.setex(
            self._make_key(message_id),
            int(self.ttl.total_seconds()),
            value,
        )

    async def get_result(self, message_id: str) -> dict | None:
        """Retrieve the result of a previously processed message."""
        data = await self.redis.get(self._make_key(message_id))
        if data:
            return json.loads(data)
        return None


def generate_message_id(payload: dict) -> str:
    """Generate a deterministic ID from message content."""
    content = json.dumps(payload, sort_keys=True)
    return hashlib.sha256(content.encode()).hexdigest()


# Usage in a message consumer (redis_client, logger, and
# process_business_logic come from your own service code)
async def handle_message(message: dict) -> None:
    store = IdempotencyStore(redis_client)
    message_id = message.get("id") or generate_message_id(message)

    if await store.is_processed(message_id):
        logger.info(
            "Skipping duplicate message",
            message_id=message_id,
        )
        return

    try:
        result = await process_business_logic(message)
        await store.mark_processed(message_id, result)
    except Exception:
        # Don't mark as processed - allow retry
        raise
The subtlety here is in the error path. If processing fails, we deliberately don’t mark the message as processed. This lets the message broker redeliver it for retry. Only successful processing gets recorded.
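Here is how that plugs into a RabbitMQ consumer. I'm using aio-pika for illustration, but any client with ack/requeue semantics works the same way: the message is acknowledged only after handle_message returns, and requeued if it raises.

import json

import aio_pika


async def consume(queue: aio_pika.abc.AbstractQueue) -> None:
    async with queue.iterator() as messages:
        async for message in messages:
            # process(requeue=True) acks on clean exit and requeues the
            # message if handle_message raises, so failed attempts are retried
            async with message.process(requeue=True):
                await handle_message(json.loads(message.body))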
I implemented this pattern while working on data pipeline services that used RabbitMQ for message delivery. When a consumer restarted mid-batch, messages would be redelivered. Without idempotency, files could be processed twice, leading to duplicate data. With this pattern, the second delivery was detected and skipped within milliseconds.
The generate_message_id function is a fallback for messages that don’t have a unique ID. By hashing the content deterministically, you get natural deduplication even without explicit message IDs.
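Because the payload is serialized with sort_keys=True, key order doesn't affect the hash, so two logically identical messages always map to the same ID:

a = generate_message_id({"order_id": "ord-456", "user_id": "usr-789"})
b = generate_message_id({"user_id": "usr-789", "order_id": "ord-456"})
assert a == b  # same content, same ID, regardless of key order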
5. Structured Logging with Context
print() debugging doesn’t scale. When you have dozens of service instances processing thousands of requests, you need logs that are searchable, filterable, and carry context automatically.
I use structlog for this:
import uuid
import structlog
from contextvars import ContextVar
from starlette.middleware.base import (
    BaseHTTPMiddleware,
    RequestResponseEndpoint,
)
from starlette.requests import Request
from starlette.responses import Response

# Context variable for request-scoped data
request_id_var: ContextVar[str | None] = ContextVar(
    "request_id", default=None
)


def setup_logging(environment: str = "development") -> None:
    """Configure structured logging for the service."""
    processors = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
    ]

    if environment == "production":
        # JSON output for log aggregators
        processors.append(structlog.processors.JSONRenderer())
    else:
        # Human-readable for development
        processors.append(structlog.dev.ConsoleRenderer())

    structlog.configure(
        processors=processors,
        wrapper_class=structlog.make_filtering_bound_logger(0),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
    )


class RequestContextMiddleware(BaseHTTPMiddleware):
    """Inject request context into every log line."""

    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        req_id = request.headers.get(
            "x-request-id", str(uuid.uuid4())
        )
        request_id_var.set(req_id)

        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            request_id=req_id,
            method=request.method,
            path=request.url.path,
        )

        logger = structlog.get_logger()
        await logger.ainfo("request_started")

        try:
            response = await call_next(request)
            await logger.ainfo(
                "request_completed",
                status_code=response.status_code,
            )
            response.headers["x-request-id"] = req_id
            return response
        except Exception:
            await logger.aexception("request_failed")
            raise


# Usage throughout your codebase
logger = structlog.get_logger()


async def process_order(order_id: str, user_id: str) -> None:
    # Bind context for this operation
    log = logger.bind(order_id=order_id, user_id=user_id)
    await log.ainfo("processing_order")

    # Every subsequent log in this call chain includes
    # request_id, method, path, order_id, and user_id
    items = await fetch_order_items(order_id)
    await log.ainfo("items_fetched", count=len(items))

    await validate_inventory(items)
    await log.ainfo("inventory_validated")
In development, this gives you readable output:
2026-02-10 10:15:32 [info] request_started request_id=abc-123 method=POST path=/orders
2026-02-10 10:15:32 [info] processing_order request_id=abc-123 order_id=ord-456 user_id=usr-789
2026-02-10 10:15:33 [info] items_fetched request_id=abc-123 order_id=ord-456 count=3
2026-02-10 10:15:33 [info] request_completed request_id=abc-123 status_code=201
In production, every line is JSON that feeds directly into your log aggregator:
{"event": "processing_order", "request_id": "abc-123", "order_id": "ord-456", "user_id": "usr-789", "level": "info", "timestamp": "2026-02-10T10:15:32Z"}
The magic is contextvars. You bind context once at the middleware level, and it automatically appears in every log line for that request, across every function call, without passing a logger through every function signature. When something breaks at 3 AM, you search by request_id and see the entire story of what happened.
Putting It All Together
These five patterns form the skeleton of every microservice I build:
- Structured Configuration — Fail fast, fail clearly
- Graceful Shutdown — Deploy without dropping requests
- Health Checks — Tell your orchestrator what’s actually happening
- Idempotency — Process messages exactly once (even when they arrive twice)
- Structured Logging — Debug production without guessing
None of these are revolutionary. You won’t find them in conference talks about the latest framework. But after 6+ years of building backend services—from data pipeline systems with RabbitMQ to AI-enabled platforms with REST and GraphQL APIs—these are the patterns that consistently prevent the most painful production incidents.
The common thread? They’re all about handling the unhappy path. Any developer can build a service that works when everything goes right. Senior engineering is about building services that behave predictably when things go wrong.
Start with these five patterns, then write your business logic. Your future self (and your on-call rotation) will thank you.