
5 Python Patterns I Use in Every Microservice

Battle-tested Python patterns for production microservices: structured config, graceful shutdown, real health checks, idempotent message processing, and structured logging. All with code.


I keep reaching for the same five patterns every time I start a new Python microservice. They’re boring. Nobody writes conference talks about them. But every time I skip one, I regret it at 3 AM when something breaks in production.

After building microservices across different companies, from data pipelines with RabbitMQ to backend APIs to AI-powered services, the list hasn’t really changed. These patterns all came from real incidents that could’ve been avoided.


1. Structured config with validation

The first thing that breaks is configuration. Missing env var, typo in a database URL, port conflict. You want to catch these at startup, not when the first request hits.

from pydantic_settings import BaseSettings
from pydantic import Field, field_validator
from functools import lru_cache


class Settings(BaseSettings):
    service_name: str = "my-service"
    environment: str = Field(default="development")
    database_url: str  # no default = required
    db_pool_min: int = 2
    db_pool_max: int = 10
    rabbitmq_url: str = "amqp://guest:guest@localhost:5672/"
    shutdown_timeout: int = 15

    @field_validator("environment")
    @classmethod
    def validate_environment(cls, v: str) -> str:
        allowed = {"development", "staging", "production"}
        if v not in allowed:
            raise ValueError(f"environment must be one of {allowed}")
        return v

    model_config = {"env_file": ".env", "case_sensitive": False}


@lru_cache
def get_settings() -> Settings:
    return Settings()

Missing database_url? Service won’t start. You find out now, not after your first query. Every module calls get_settings(). No rogue os.getenv() hiding in random files.

I’ve seen services crash hours after deployment because someone misspelled an env var. This catches it before the process even binds to a port. Small thing. Probably saves more time over a year than anything else on this list.
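The same fail-fast idea works even without pydantic. Here's a minimal stdlib sketch of it (the require_env helper is my own name for illustration, not from any library):

```python
import os


def require_env(name: str) -> str:
    """Fail fast: raise at startup if a required env var is missing or empty."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"missing required env var: {name}")
    return value


# Call at startup, before the process binds a port or accepts traffic.
os.environ["DATABASE_URL"] = "postgresql://app:secret@db:5432/app"
database_url = require_env("DATABASE_URL")
print(database_url)
```

Pydantic buys you typed fields, defaults, and validators on top of this, but the core principle is identical: a misconfigured service should refuse to start.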


2. Graceful shutdown

When your service gets a SIGTERM — every deploy, every scale-down, every container restart — it needs to stop accepting new work, finish what’s in progress, close connections, and exit clean.

import asyncio
import signal
from contextlib import asynccontextmanager
from collections.abc import AsyncGenerator

from fastapi import FastAPI


class GracefulShutdown:
    def __init__(self, timeout: int = 15):
        self.timeout = timeout
        self.is_shutting_down = False
        self._tasks: list[asyncio.Task] = []
        self._cleanup_handlers: list = []

    def register_cleanup(self, handler) -> None:
        self._cleanup_handlers.append(handler)

    def track_task(self, task: asyncio.Task) -> None:
        self._tasks.append(task)
        task.add_done_callback(self._tasks.remove)

    async def shutdown(self) -> None:
        if self.is_shutting_down:
            return  # shutdown already in progress
        self.is_shutting_down = True
        if self._tasks:
            # let in-flight work finish, up to the timeout
            await asyncio.wait(self._tasks, timeout=self.timeout)
        # close resources in reverse order: last opened, first closed
        for handler in reversed(self._cleanup_handlers):
            try:
                await handler()
            except Exception as e:
                print(f"Cleanup error: {e}")  # FIXME: use proper logging


shutdown_manager = GracefulShutdown(timeout=15)


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    # Wire SIGTERM to the shutdown sequence instead of letting the
    # default handler kill the process mid-request.
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(
        signal.SIGTERM, lambda: asyncio.create_task(shutdown_manager.shutdown())
    )
    yield
    await shutdown_manager.shutdown()


app = FastAPI(lifespan=lifespan)

When I was working on data pipeline services with RabbitMQ, this was critical. If a service died mid-message, that message could be lost or processed twice. The cleanup approach made sure every message was either fully processed or re-queued before the service went down.

One thing people miss: the ordering matters. Cleanup handlers should run in reverse. You want to close the thing you opened last first. Database connections should close last because other handlers might still need them during their own cleanup.
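The reverse ordering is easy to convince yourself of with a toy sketch (run_cleanup here is a standalone stand-in for the loop inside shutdown(), not part of the class above):

```python
import asyncio


async def run_cleanup(handlers):
    """Run cleanup handlers in reverse registration order, like shutdown() does."""
    closed = []
    for handler in reversed(handlers):
        closed.append(await handler())
    return closed


async def main():
    async def close_db():        # opened first, registered first
        return "db"

    async def close_consumer():  # opened last, registered last
        return "consumer"

    return await run_cleanup([close_db, close_consumer])


print(asyncio.run(main()))  # the consumer closes before the database
```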


3. Health checks that actually check health

I’ve seen too many health endpoints that just return {"status": "ok"}. That tells your orchestrator the process is alive, not whether it can do useful work.

A real health check hits every dependency:

import time
from dataclasses import dataclass, field


@dataclass
class HealthRegistry:
    checks: list = field(default_factory=list)

    def register(self, name: str, check, critical: bool = True):
        self.checks.append({"name": name, "check": check, "critical": critical})

    async def run_all(self) -> dict:
        results = {}
        all_healthy = True
        for hc in self.checks:
            start = time.monotonic()
            try:
                await hc["check"]()
                results[hc["name"]] = {
                    "status": "healthy",
                    "latency_ms": round((time.monotonic() - start) * 1000, 2),
                }
            except Exception as e:
                results[hc["name"]] = {"status": "unhealthy", "error": str(e)}
                if hc["critical"]:
                    all_healthy = False
        return {"status": "healthy" if all_healthy else "unhealthy", "checks": results}

Three endpoints: /health/live for “is the process alive” (Kubernetes restarts if this fails), /health/ready for “can it handle traffic” (gets pulled from load balancer if not), and /health for full diagnostics on dashboards.

The critical flag matters. Redis down but only used for caching? Service still works, just slower. Database down? You’re done. Without this distinction, Kubernetes sees one failed check and starts restarting pods, which causes a cascade that takes down healthy instances too.

I saw this happen once with a non-critical metrics endpoint that went down for about 10 minutes. Without the flag, every pod restarted. All of them. On a Friday afternoon.
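The flag's effect is easy to check with a stripped-down version of run_all (same logic as above, tuples instead of dicts):

```python
import asyncio


async def run_checks(checks):
    """Only failures of *critical* checks flip the overall status."""
    results, all_healthy = {}, True
    for name, check, critical in checks:
        try:
            await check()
            results[name] = "healthy"
        except Exception:
            results[name] = "unhealthy"
            if critical:
                all_healthy = False
    return ("healthy" if all_healthy else "unhealthy"), results


async def ok():
    pass


async def down():
    raise ConnectionError("connection refused")


# Redis (cache only) is down, but the service overall stays healthy.
status, results = asyncio.run(run_checks([
    ("database", ok, True),
    ("redis", down, False),
]))
print(status, results)
```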


4. Idempotent message processing

If you consume messages from a queue — RabbitMQ, Kafka, SQS — you will process duplicates eventually. Network hiccups, consumer restarts, rebalances. It’s when, not if. And your database doesn’t care that it was an accident.

import hashlib
import json


class IdempotencyStore:
    def __init__(self, redis_client, ttl_hours: int = 24):
        self.redis = redis_client
        self.ttl_seconds = ttl_hours * 3600

    async def is_processed(self, message_id: str) -> bool:
        return await self.redis.exists(f"idempotency:{message_id}")

    async def mark_processed(self, message_id: str, result: dict | None = None):
        await self.redis.setex(
            f"idempotency:{message_id}",
            self.ttl_seconds,
            json.dumps({"result": result}),
        )


async def handle_message(message: dict) -> None:
    store = IdempotencyStore(redis_client)  # redis_client is created at startup
    message_id = message.get("id") or hashlib.sha256(
        json.dumps(message, sort_keys=True).encode()
    ).hexdigest()

    if await store.is_processed(message_id):
        return  # already handled — skip the duplicate

    # If process_business_logic raises, we never mark the message as
    # processed, so the broker redelivers it and the work is retried.
    result = await process_business_logic(message)
    await store.mark_processed(message_id, result)

The error path is important. If processing fails, we deliberately don’t mark it as processed. The broker redelivers it for retry. Only success gets recorded.

I built this while working on file delivery pipelines. When a consumer restarted mid-batch, files got processed twice without idempotency. With it, the retry gets detected and skipped in milliseconds.

The TTL is something worth thinking about. 24 hours works for most cases. Too short and you miss slow retries. Too long and you’re paying for Redis memory you don’t need.
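To see the duplicate detection in isolation, here's the same interface backed by a plain dict instead of Redis (the class name is mine, for sketching and tests only):

```python
import hashlib
import json
import time


class InMemoryIdempotencyStore:
    """Dict-backed stand-in for the Redis store, with the same TTL semantics."""

    def __init__(self, ttl_hours: int = 24):
        self.ttl_seconds = ttl_hours * 3600
        self._expires: dict[str, float] = {}

    def is_processed(self, message_id: str) -> bool:
        deadline = self._expires.get(message_id)
        return deadline is not None and deadline > time.monotonic()

    def mark_processed(self, message_id: str) -> None:
        self._expires[message_id] = time.monotonic() + self.ttl_seconds


store = InMemoryIdempotencyStore(ttl_hours=24)
message = {"type": "file.delivered", "path": "/inbox/report.csv"}

# Same fallback ID derivation as handle_message: hash the sorted payload.
message_id = hashlib.sha256(json.dumps(message, sort_keys=True).encode()).hexdigest()

first_seen = not store.is_processed(message_id)   # True: process it
store.mark_processed(message_id)
duplicate = store.is_processed(message_id)        # True: skip the redelivery
print(first_seen, duplicate)
```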


5. Structured logging with request context

print() stops being useful fast. Once you have more than a few service instances, debugging a production issue means grepping through logs from 20 pods trying to match timestamps. You find the failed request, but you can’t see what happened before the failure or which user triggered it. You need searchable logs that carry context automatically.

import uuid
import structlog
from starlette.middleware.base import BaseHTTPMiddleware


def setup_logging(environment: str = "development"):
    processors = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
    ]
    if environment == "production":
        processors.append(structlog.processors.JSONRenderer())
    else:
        processors.append(structlog.dev.ConsoleRenderer())

    structlog.configure(processors=processors, logger_factory=structlog.PrintLoggerFactory())


class RequestContextMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        req_id = request.headers.get("x-request-id", str(uuid.uuid4()))
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            request_id=req_id, method=request.method, path=request.url.path
        )
        response = await call_next(request)
        response.headers["x-request-id"] = req_id
        return response

contextvars is the key part. Bind context once in the middleware, and it shows up in every log line for that request across every function, without passing a logger through every method signature. When something breaks, search by request_id and see the full story.
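If you want to see the mechanism merge_contextvars relies on, plain stdlib contextvars reproduce it in a few lines (the log function here is a toy stand-in, not structlog's API):

```python
import contextvars
import json

request_id: contextvars.ContextVar[str] = contextvars.ContextVar("request_id", default="-")


def log(event: str, **fields) -> str:
    """Toy structured logger: every line carries whatever context is bound."""
    return json.dumps({"event": event, "request_id": request_id.get(), **fields})


def deep_in_the_call_stack():
    # No logger or request ID was passed down here, yet the context shows up.
    return log("user_created", user_id=42)


# The middleware binds the ID once per request...
request_id.set("req-123")
# ...and every later log line carries it automatically.
print(deep_in_the_call_stack())
```

structlog does the same thing with proper processors, renderers, and async-safety, but this is the whole trick: bind once, read everywhere.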

In production, every line is JSON that feeds into your log aggregator. In development, you get readable console output. Same code, different renderers. You set this up once and forget about it until the day you need to debug something at 2 AM. Then you’re very glad it’s there.


None of this is revolutionary. But after building backend services for over six years, these are the patterns that keep preventing the worst incidents. They all share one thing: handling the unhappy path. Anyone can build a service that works when everything goes right. The hard part is when things go wrong.

I’ve tried fancier approaches over the years. Most aren’t worth the complexity for a typical service. These five are simple enough that there’s no excuse to skip them, and they cover the problems that actually cause real incidents.


Salih "Adam" Yildirim

Full Stack Software Engineer with 6+ years of experience building scalable web and mobile applications. Passionate about clean code, modern architecture, and sharing knowledge.
