I keep reaching for the same five patterns every time I start a new Python microservice. They’re boring. Nobody writes conference talks about them. But every time I skip one, I regret it at 3 AM when something breaks in production.
After building microservices at different companies, from data pipelines with RabbitMQ to backend APIs to AI-powered services, my list hasn't really changed. These patterns all came from real incidents that could've been avoided.
1. Structured config with validation
The first thing that breaks is configuration. Missing env var, typo in a database URL, port conflict. You want to catch these at startup, not when the first request hits.
from pydantic_settings import BaseSettings
from pydantic import Field, field_validator
from functools import lru_cache


class Settings(BaseSettings):
    service_name: str = "my-service"
    environment: str = Field(default="development")
    database_url: str  # no default = required
    db_pool_min: int = 2
    db_pool_max: int = 10
    rabbitmq_url: str = "amqp://guest:guest@localhost:5672/"
    shutdown_timeout: int = 15

    @field_validator("environment")
    @classmethod
    def validate_environment(cls, v: str) -> str:
        allowed = {"development", "staging", "production"}
        if v not in allowed:
            raise ValueError(f"environment must be one of {allowed}")
        return v

    model_config = {"env_file": ".env", "case_sensitive": False}


@lru_cache
def get_settings() -> Settings:
    return Settings()
Missing database_url? Service won’t start. You find out now, not after your first query. Every module calls get_settings(). No rogue os.getenv() hiding in random files.
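As an illustration, a database module might do nothing more than this. The create_async_engine call is just SQLAlchemy as an example, and the config import path is whatever module your Settings class actually lives in:

from sqlalchemy.ext.asyncio import create_async_engine

from config import get_settings  # hypothetical module where Settings lives

settings = get_settings()
engine = create_async_engine(
    settings.database_url,  # needs an async driver, e.g. postgresql+asyncpg://
    pool_size=settings.db_pool_min,
    max_overflow=settings.db_pool_max - settings.db_pool_min,
)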
I’ve seen services crash hours after deployment because someone misspelled an env var. This catches it before the process even binds to a port. Small thing. Probably saves more time over a year than anything else on this list.
2. Graceful shutdown
When your service gets a SIGTERM — every deploy, every scale-down, every container restart — it needs to stop accepting new work, finish what’s in progress, close connections, and exit clean.
import asyncio
import signal
from contextlib import asynccontextmanager
from collections.abc import AsyncGenerator

from fastapi import FastAPI


class GracefulShutdown:
    def __init__(self, timeout: int = 15):
        self.timeout = timeout
        self.is_shutting_down = False
        self._tasks: list[asyncio.Task] = []
        self._cleanup_handlers: list = []

    def register_cleanup(self, handler) -> None:
        self._cleanup_handlers.append(handler)

    def track_task(self, task: asyncio.Task) -> None:
        self._tasks.append(task)
        task.add_done_callback(self._tasks.remove)

    async def shutdown(self) -> None:
        self.is_shutting_down = True
        if self._tasks:
            await asyncio.wait(self._tasks, timeout=self.timeout)
        for handler in reversed(self._cleanup_handlers):
            try:
                await handler()
            except Exception as e:
                print(f"Cleanup error: {e}")  # FIXME: use proper logging
When I was working on data pipeline services with RabbitMQ, this was critical. If a service died mid-message, that message could be lost or processed twice. The cleanup approach made sure every message was either fully processed or re-queued before the service went down.
One thing people miss: the ordering matters. Cleanup handlers should run in reverse, so the last thing you opened is the first thing you close. Database connections should close last because other handlers might still need them during their own cleanup.
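Here's a rough sketch of how this might plug into a FastAPI lifespan, assuming the Settings class from earlier; init_db and start_consumer are placeholders for whatever your service actually opens:

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    shutdown = GracefulShutdown(timeout=get_settings().shutdown_timeout)

    db_pool = await init_db()                 # hypothetical pool factory
    shutdown.register_cleanup(db_pool.close)  # registered first, so closed last

    consumer = await start_consumer()         # hypothetical queue consumer
    shutdown.register_cleanup(consumer.stop)  # registered last, so closed first

    app.state.shutdown = shutdown
    yield  # uvicorn turns SIGTERM into exiting this context
    await shutdown.shutdown()

app = FastAPI(lifespan=lifespan)

For a pure queue worker with no HTTP server, you'd hook the signal yourself with loop.add_signal_handler(signal.SIGTERM, ...) and await shutdown() from there.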
3. Health checks that actually check health
I’ve seen too many health endpoints that just return {"status": "ok"}. That tells your orchestrator the process is alive, not whether it can do useful work.
A real health check hits every dependency:
import time
from dataclasses import dataclass, field

from fastapi import Response


@dataclass
class HealthRegistry:
    checks: list = field(default_factory=list)

    def register(self, name: str, check, critical: bool = True):
        self.checks.append({"name": name, "check": check, "critical": critical})

    async def run_all(self) -> dict:
        results = {}
        all_healthy = True
        for hc in self.checks:
            start = time.monotonic()
            try:
                await hc["check"]()
                results[hc["name"]] = {
                    "status": "healthy",
                    "latency_ms": round((time.monotonic() - start) * 1000, 2),
                }
            except Exception as e:
                results[hc["name"]] = {"status": "unhealthy", "error": str(e)}
                if hc["critical"]:
                    all_healthy = False
        return {"status": "healthy" if all_healthy else "unhealthy", "checks": results}
Three endpoints: /health/live for “is the process alive” (Kubernetes restarts the pod if this fails), /health/ready for “can it handle traffic” (the pod gets pulled from the load balancer if not), and /health for full diagnostics on dashboards.
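As a sketch, those routes might look like this on top of the registry above; check_database and check_redis are stand-ins for your own async check callables:

from fastapi import FastAPI, Response

app = FastAPI()
registry = HealthRegistry()
registry.register("database", check_database)             # critical by default
registry.register("redis", check_redis, critical=False)   # cache only

@app.get("/health/live")
async def live() -> dict:
    return {"status": "alive"}  # process is up; nothing else is checked

@app.get("/health/ready")
async def ready(response: Response) -> dict:
    report = await registry.run_all()
    if report["status"] != "healthy":
        response.status_code = 503  # pulled from the load balancer until it recovers
    return report

@app.get("/health")
async def health() -> dict:
    return await registry.run_all()  # full diagnostics for dashboards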
The critical flag matters. Redis down but only used for caching? Service still works, just slower. Database down? You’re done. Without this distinction, Kubernetes sees one failed check and starts restarting pods, which causes a cascade that takes down healthy instances too.
I saw this happen once with a non-critical metrics endpoint that went down for about 10 minutes. Without the flag, every pod restarted. All of them. On a Friday afternoon.
4. Idempotent message processing
If you consume messages from a queue — RabbitMQ, Kafka, SQS — you will process duplicates eventually. Network hiccups, consumer restarts, rebalances. It’s when, not if. And your database doesn’t care that it was an accident.
import hashlib
import json


class IdempotencyStore:
    def __init__(self, redis_client, ttl_hours: int = 24):
        self.redis = redis_client
        self.ttl_seconds = ttl_hours * 3600

    async def is_processed(self, message_id: str) -> bool:
        return await self.redis.exists(f"idempotency:{message_id}")

    async def mark_processed(self, message_id: str, result: dict | None = None):
        await self.redis.setex(
            f"idempotency:{message_id}",
            self.ttl_seconds,
            json.dumps({"result": result}),
        )


async def handle_message(message: dict) -> None:
    store = IdempotencyStore(redis_client)
    message_id = message.get("id") or hashlib.sha256(
        json.dumps(message, sort_keys=True).encode()
    ).hexdigest()
    if await store.is_processed(message_id):
        return  # already handled
    try:
        result = await process_business_logic(message)
        await store.mark_processed(message_id, result)
    except Exception:
        raise  # don't mark as processed — allow retry
The error path is important. If processing fails, we deliberately don’t mark it as processed. The broker redelivers it for retry. Only success gets recorded.
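As an illustration with aio-pika against the rabbitmq_url from the config section (the queue name and prefetch count are made up), the consumer side might look something like this:

import json

import aio_pika

async def consume_forever() -> None:
    connection = await aio_pika.connect_robust(get_settings().rabbitmq_url)
    async with connection:
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue("deliveries", durable=True)  # hypothetical queue
        async with queue.iterator() as messages:
            async for message in messages:
                # An unhandled exception inside process() rejects the message with requeue,
                # so the broker redelivers it and the idempotency check skips the repeat.
                async with message.process(requeue=True):
                    await handle_message(json.loads(message.body))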
I built this while working on file delivery pipelines. When a consumer restarted mid-batch, files got processed twice without idempotency. With it, the retry gets detected and skipped in milliseconds.
The TTL is something worth thinking about. 24 hours works for most cases. Too short and you miss slow retries. Too long and you’re paying for Redis memory you don’t need.
5. Structured logging with request context
print() stops being useful fast. Once you have more than a few service instances, debugging a production issue means grepping through logs from 20 pods trying to match timestamps. You find the failed request, but you can’t see what happened before the failure or which user triggered it. You need searchable logs that carry context automatically.
import uuid

import structlog
from starlette.middleware.base import BaseHTTPMiddleware


def setup_logging(environment: str = "development"):
    processors = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
    ]
    if environment == "production":
        processors.append(structlog.processors.JSONRenderer())
    else:
        processors.append(structlog.dev.ConsoleRenderer())
    structlog.configure(processors=processors, logger_factory=structlog.PrintLoggerFactory())


class RequestContextMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        req_id = request.headers.get("x-request-id", str(uuid.uuid4()))
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            request_id=req_id, method=request.method, path=request.url.path
        )
        response = await call_next(request)
        response.headers["x-request-id"] = req_id
        return response
contextvars is the key part. Bind context once in the middleware, and it shows up in every log line for that request across every function, without passing a logger through every method signature. When something breaks, search by request_id and see the full story.
In production, every line is JSON that feeds into your log aggregator. In development, you get readable console output. Same code, different renderers. You set this up once and forget about it until the day you need to debug something at 2 AM. Then you’re very glad it’s there.
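Wiring it up is a few lines. Here's a rough sketch on top of the setup above; the /orders route and its fields are invented:

from fastapi import FastAPI

setup_logging(environment=get_settings().environment)

app = FastAPI()
app.add_middleware(RequestContextMiddleware)
logger = structlog.get_logger()

@app.post("/orders")
async def create_order(payload: dict) -> dict:
    # request_id, method, and path are already bound by the middleware
    logger.info("order_received", item_count=len(payload.get("items", [])))
    return {"status": "accepted"}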
None of this is revolutionary. But after building backend services for over six years, these are the patterns that keep preventing the worst incidents. They all share one thing: handling the unhappy path. Anyone can build a service that works when everything goes right. The hard part is when things go wrong.
I’ve tried fancier approaches over the years. Most aren’t worth the complexity for a typical service. These five are simple enough that there’s no excuse to skip them, and they cover the problems that actually cause real incidents.