
5 Python Patterns I Use in Every Microservice

Battle-tested Python patterns for building production microservices: structured configuration, graceful shutdown, health checks, idempotency, and structured logging. Code you can use today.

After building Python microservices across multiple companies—from data pipelines with RabbitMQ to AI-enabled backend systems—I’ve converged on a set of patterns I reach for in every new service. They’re not clever. They’re not sexy. But they prevent the bugs that wake you up at 3 AM.

These are the five patterns I copy into every Python microservice before I write a single line of business logic. Each one has earned its place by preventing a real production incident.


1. Structured Configuration with Validation

The first thing that goes wrong in a microservice is configuration. A missing environment variable, a typo in a database URL, a port that’s already in use. I want to know about these problems at startup, not when the first request hits.

I use Pydantic’s BaseSettings to create a single, validated configuration object:

from pydantic_settings import BaseSettings
from pydantic import Field, field_validator
from functools import lru_cache


class Settings(BaseSettings):
    # Service identity
    service_name: str = "my-service"
    environment: str = Field(default="development")
    debug: bool = False

    # Server
    host: str = "0.0.0.0"
    port: int = 8000

    # Database
    database_url: str
    db_pool_min: int = 2
    db_pool_max: int = 10

    # External services
    rabbitmq_url: str = "amqp://guest:guest@localhost:5672/"
    redis_url: str = "redis://localhost:6379/0"

    # Timeouts (seconds)
    request_timeout: int = 30
    shutdown_timeout: int = 15

    @field_validator("environment")
    @classmethod
    def validate_environment(cls, v: str) -> str:
        allowed = {"development", "staging", "production"}
        if v not in allowed:
            raise ValueError(f"environment must be one of {allowed}")
        return v

    model_config = {
        "env_file": ".env",
        "env_file_encoding": "utf-8",
        "case_sensitive": False,
    }


@lru_cache
def get_settings() -> Settings:
    return Settings()

Why this matters:

  • Fail fast. If database_url is missing, the service refuses to start. You find out immediately, not after your first database query.
  • Type safety. port is always an int, debug is always a bool. No more if os.getenv("DEBUG") == "true" scattered across your code.
  • Single source of truth. Every module imports get_settings(). No more os.getenv() calls hidden in random files.
  • Testable. Override settings in tests without touching environment variables.

I’ve seen services in production that crashed hours after deployment because a single env var was misspelled. This pattern catches that before the process even binds to a port.
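
For example, a test can construct its own Settings instance directly and skip the .env file entirely, so nothing in the environment needs to change (a minimal sketch; the exact fixture wiring depends on your test setup):

# Sketch: overriding settings in a test without touching the environment.
# Passing _env_file=None tells pydantic-settings to ignore any local .env file.
def test_uses_test_database() -> None:
    test_settings = Settings(
        database_url="postgresql://test:test@localhost:5432/test_db",
        environment="staging",
        _env_file=None,
    )
    assert test_settings.db_pool_max == 10  # unset fields keep their defaults

    # If the app reads config through the cached get_settings(), clear the
    # lru_cache before swapping in test values.
    get_settings.cache_clear()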


2. Graceful Shutdown

This is the pattern that separates “it works on my machine” services from production-ready ones. When your service receives a SIGTERM (which happens on every deploy, every scale-down, every container restart), it needs to:

  1. Stop accepting new work
  2. Finish in-progress requests
  3. Close database connections and message queues
  4. Exit cleanly

Here’s my standard implementation:

import asyncio
import signal
from contextlib import asynccontextmanager
from collections.abc import AsyncGenerator
from typing import Any

from fastapi import FastAPI


class GracefulShutdown:
    def __init__(self, timeout: int = 15):
        self.timeout = timeout
        self.is_shutting_down = False
        self._tasks: list[asyncio.Task[Any]] = []
        self._cleanup_handlers: list[Any] = []

    def register_cleanup(self, handler: Any) -> None:
        """Register an async cleanup function."""
        self._cleanup_handlers.append(handler)

    def track_task(self, task: asyncio.Task[Any]) -> None:
        """Track a background task for graceful completion."""
        self._tasks.append(task)
        task.add_done_callback(self._tasks.remove)

    async def shutdown(self) -> None:
        self.is_shutting_down = True

        # Wait for in-flight tasks
        if self._tasks:
            await asyncio.wait(
                self._tasks,
                timeout=self.timeout,
            )

        # Run cleanup handlers
        for handler in reversed(self._cleanup_handlers):
            try:
                await handler()
            except Exception as e:
                print(f"Cleanup error: {e}")


shutdown_manager = GracefulShutdown()


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    # Startup
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig,
            lambda: asyncio.create_task(shutdown_manager.shutdown()),
        )

    yield

    # Shutdown
    await shutdown_manager.shutdown()


app = FastAPI(lifespan=lifespan)

The critical detail most tutorials miss: you need to handle both the signal and the ASGI lifespan shutdown. Kubernetes sends SIGTERM, but your ASGI server (uvicorn) also has its own shutdown sequence. This pattern handles both.
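
The is_shutting_down flag and track_task are what implement steps 1 and 2 from the list above. A background consumer loop, sketched below with a hypothetical queue object and handler, checks the flag between messages so no new work starts once shutdown begins, while already-started tasks are given time to finish:

# Sketch: stop accepting new work once shutdown starts.
# `queue` and `handle_message` are hypothetical stand-ins for your
# actual message source and handler.
async def consume_forever() -> None:
    while not shutdown_manager.is_shutting_down:
        try:
            # Short timeout so the loop re-checks the shutdown flag regularly.
            message = await asyncio.wait_for(queue.get(), timeout=1.0)
        except asyncio.TimeoutError:
            continue
        task = asyncio.create_task(handle_message(message))
        shutdown_manager.track_task(task)  # awaited by shutdown()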

In a previous role, I worked on data pipeline services using RabbitMQ where graceful shutdown was critical. If a service died mid-message, that message could be lost or processed twice. The register_cleanup pattern let us ensure every message was either fully processed or properly re-queued before the service exited.
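
The wiring for that looks roughly like this (a sketch; db_pool and rabbitmq_connection stand in for whatever connection objects your service creates at startup):

# Sketch: registering cleanup handlers during startup.
# db_pool and rabbitmq_connection are illustrative names for objects
# created earlier in the lifespan startup phase.
async def close_db_pool() -> None:
    await db_pool.close()

async def close_rabbitmq() -> None:
    # Closing the connection lets the broker re-queue any unacknowledged
    # messages instead of losing them.
    await rabbitmq_connection.close()

shutdown_manager.register_cleanup(close_db_pool)
shutdown_manager.register_cleanup(close_rabbitmq)

Handlers run in reverse registration order (note the reversed() loop in shutdown()), so resources are torn down in the opposite order they were created, the same way nested context managers unwind.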


3. Health Check Endpoints (That Actually Check Health)

I’ve seen too many health checks that just return {"status": "ok"}. That tells your orchestrator the process is running, but not whether it can actually do useful work.

A proper health check verifies every dependency:

import time
from dataclasses import dataclass, field
from typing import Any

from fastapi import FastAPI, Response


@dataclass
class HealthCheck:
    name: str
    check: Any  # async callable
    critical: bool = True


@dataclass
class HealthRegistry:
    checks: list[HealthCheck] = field(default_factory=list)

    def register(
        self, name: str, check: Any, critical: bool = True
    ) -> None:
        self.checks.append(
            HealthCheck(name=name, check=check, critical=critical)
        )

    async def run_all(self) -> dict:
        results: dict = {}
        all_healthy = True

        for health_check in self.checks:
            start = time.monotonic()
            try:
                await health_check.check()
                results[health_check.name] = {
                    "status": "healthy",
                    "latency_ms": round(
                        (time.monotonic() - start) * 1000, 2
                    ),
                }
            except Exception as e:
                results[health_check.name] = {
                    "status": "unhealthy",
                    "error": str(e),
                    "latency_ms": round(
                        (time.monotonic() - start) * 1000, 2
                    ),
                }
                if health_check.critical:
                    all_healthy = False

        return {
            "status": "healthy" if all_healthy else "unhealthy",
            "checks": results,
        }


health = HealthRegistry()
app = FastAPI()


# Register checks at startup. The db_pool, redis_client, and
# rabbitmq_connection objects are created elsewhere during startup.
async def check_database():
    """Verify database connection with a simple query."""
    async with db_pool.acquire() as conn:
        await conn.fetchval("SELECT 1")


async def check_redis():
    """Verify Redis is reachable."""
    await redis_client.ping()


async def check_rabbitmq():
    """Verify RabbitMQ connection is open."""
    if not rabbitmq_connection or rabbitmq_connection.is_closed:
        raise ConnectionError("RabbitMQ connection is closed")


health.register("database", check_database, critical=True)
health.register("redis", check_redis, critical=True)
health.register("rabbitmq", check_rabbitmq, critical=False)


@app.get("/health")
async def health_endpoint(response: Response):
    result = await health.run_all()
    if result["status"] == "unhealthy":
        response.status_code = 503
    return result


@app.get("/health/live")
async def liveness():
    """Kubernetes liveness probe - is the process alive?"""
    return {"status": "alive"}


@app.get("/health/ready")
async def readiness(response: Response):
    """Kubernetes readiness probe - can we serve traffic?"""
    result = await health.run_all()
    if result["status"] == "unhealthy":
        response.status_code = 503
    return result

Key distinctions:

  • /health/live — Is the process running? (Liveness probe. If this fails, Kubernetes restarts the pod.)
  • /health/ready — Can this instance serve traffic? (Readiness probe. If this fails, the pod is removed from the load balancer but not killed.)
  • /health — Full diagnostic for dashboards and debugging.

The critical flag matters. If Redis is down but you use it only for caching, the service can still serve requests (degraded). But if the database is down, you’re completely broken. This distinction prevents unnecessary restarts while still catching real problems.
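
For example, if only the non-critical rabbitmq check fails, the endpoint still returns 200 with an overall healthy (if degraded) status, something like this (latency numbers are illustrative):

{
  "status": "healthy",
  "checks": {
    "database": {"status": "healthy", "latency_ms": 3.42},
    "redis": {"status": "healthy", "latency_ms": 1.08},
    "rabbitmq": {
      "status": "unhealthy",
      "error": "RabbitMQ connection is closed",
      "latency_ms": 0.05
    }
  }
}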


4. Idempotency for Message Processing

If you’re consuming messages from a queue (RabbitMQ, Kafka, SQS), you will eventually process the same message twice. Network blips, consumer restarts, rebalances—it’s not a matter of if, it’s when.

The solution is making your message handlers idempotent:

import hashlib
import json
from datetime import datetime, timedelta, timezone


class IdempotencyStore:
    """Track processed messages to prevent duplicate handling."""

    def __init__(self, redis_client, ttl_hours: int = 24):
        self.redis = redis_client
        self.ttl = timedelta(hours=ttl_hours)

    def _make_key(self, message_id: str) -> str:
        return f"idempotency:{message_id}"

    async def is_processed(self, message_id: str) -> bool:
        """Check if a message has already been processed."""
        return await self.redis.exists(self._make_key(message_id))

    async def mark_processed(
        self,
        message_id: str,
        result: dict | None = None,
    ) -> None:
        """Mark a message as successfully processed."""
        value = json.dumps({
            "processed_at": datetime.utcnow().isoformat(),
            "result": result,
        })
        await self.redis.setex(
            self._make_key(message_id),
            int(self.ttl.total_seconds()),
            value,
        )

    async def get_result(self, message_id: str) -> dict | None:
        """Retrieve the result of a previously processed message."""
        data = await self.redis.get(self._make_key(message_id))
        if data:
            return json.loads(data)
        return None


def generate_message_id(payload: dict) -> str:
    """Generate a deterministic ID from message content."""
    content = json.dumps(payload, sort_keys=True)
    return hashlib.sha256(content.encode()).hexdigest()


# Usage in a message consumer (redis_client, logger, and
# process_business_logic are defined elsewhere in the service)
async def handle_message(message: dict) -> None:
    store = IdempotencyStore(redis_client)

    message_id = message.get("id") or generate_message_id(message)

    if await store.is_processed(message_id):
        logger.info(
            "Skipping duplicate message",
            message_id=message_id,
        )
        return

    try:
        result = await process_business_logic(message)
        await store.mark_processed(message_id, result)
    except Exception:
        # Don't mark as processed - allow retry
        raise

The subtlety here is in the error path. If processing fails, we deliberately don’t mark the message as processed. This lets the message broker redeliver it for retry. Only successful processing gets recorded.

I implemented this pattern while working on data pipeline services that used RabbitMQ for message delivery. When a consumer restarted mid-batch, messages would be redelivered. Without idempotency, files could be processed twice, leading to duplicate data. With this pattern, the second delivery was detected and skipped within milliseconds.
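
On the consumer side, the handler plugs into the broker's ack/nack flow: acknowledge only after processing (or skipping a duplicate), nack with requeue on failure. A rough sketch, assuming aio-pika as the client library:

# Sketch: tying handle_message into RabbitMQ ack/nack, assuming aio-pika.
import json

import aio_pika


async def on_message(message: aio_pika.IncomingMessage) -> None:
    try:
        await handle_message(json.loads(message.body))
        await message.ack()  # success or skipped duplicate: remove from the queue
    except Exception:
        # Failure: nack with requeue so the broker redelivers the message.
        await message.nack(requeue=True)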

The generate_message_id function is a fallback for messages that don’t have a unique ID. By hashing the content deterministically, you get natural deduplication even without explicit message IDs.
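
A quick illustration with hypothetical payloads: key order doesn't change the resulting ID, because the JSON is serialized with sorted keys before hashing, but any change in content does.

# Hypothetical payloads: same content, different key order.
id_a = generate_message_id({"user_id": "usr-789", "file": "report.csv"})
id_b = generate_message_id({"file": "report.csv", "user_id": "usr-789"})
assert id_a == id_b  # sort_keys=True makes serialization deterministic

# Different content produces a different ID, so real changes aren't deduplicated.
id_c = generate_message_id({"user_id": "usr-789", "file": "report_v2.csv"})
assert id_c != id_a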


5. Structured Logging with Context

print() debugging doesn’t scale. When you have dozens of service instances processing thousands of requests, you need logs that are searchable, filterable, and carry context automatically.

I use structlog for this:

import uuid
import structlog
from contextvars import ContextVar
from starlette.middleware.base import (
    BaseHTTPMiddleware,
    RequestResponseEndpoint,
)
from starlette.requests import Request
from starlette.responses import Response


# Context variable for request-scoped data
request_id_var: ContextVar[str | None] = ContextVar(
    "request_id", default=None
)


def setup_logging(environment: str = "development") -> None:
    """Configure structured logging for the service."""
    processors = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
    ]

    if environment == "production":
        # JSON output for log aggregators
        processors.append(structlog.processors.JSONRenderer())
    else:
        # Human-readable for development
        processors.append(structlog.dev.ConsoleRenderer())

    structlog.configure(
        processors=processors,
        wrapper_class=structlog.make_filtering_bound_logger(0),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
    )


class RequestContextMiddleware(BaseHTTPMiddleware):
    """Inject request context into every log line."""

    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        req_id = request.headers.get(
            "x-request-id", str(uuid.uuid4())
        )
        request_id_var.set(req_id)

        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            request_id=req_id,
            method=request.method,
            path=request.url.path,
        )

        logger = structlog.get_logger()
        await logger.ainfo("request_started")

        try:
            response = await call_next(request)
            await logger.ainfo(
                "request_completed",
                status_code=response.status_code,
            )
            response.headers["x-request-id"] = req_id
            return response
        except Exception:
            await logger.aexception("request_failed")
            raise


# Usage throughout your codebase
logger = structlog.get_logger()


async def process_order(order_id: str, user_id: str) -> None:
    # Bind context for this operation
    log = logger.bind(order_id=order_id, user_id=user_id)

    await log.ainfo("processing_order")

    # Every subsequent log in this call chain includes
    # request_id, method, path, order_id, and user_id
    items = await fetch_order_items(order_id)
    await log.ainfo("items_fetched", count=len(items))

    await validate_inventory(items)
    await log.ainfo("inventory_validated")

In development, this gives you readable output:

2026-02-10 10:15:32 [info] request_started  request_id=abc-123 method=POST path=/orders
2026-02-10 10:15:32 [info] processing_order request_id=abc-123 order_id=ord-456 user_id=usr-789
2026-02-10 10:15:33 [info] items_fetched    request_id=abc-123 order_id=ord-456 count=3
2026-02-10 10:15:33 [info] request_completed request_id=abc-123 status_code=201

In production, every line is JSON that feeds directly into your log aggregator:

{"event": "processing_order", "request_id": "abc-123", "order_id": "ord-456", "user_id": "usr-789", "level": "info", "timestamp": "2026-02-10T10:15:32Z"}

The magic is contextvars. You bind context once at the middleware level, and it automatically appears in every log line for that request, across every function call, without passing a logger through every function signature. When something breaks at 3 AM, you search by request_id and see the entire story of what happened.
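
Wiring it up is a couple of calls at startup (a sketch, assuming the Settings object from pattern 1 and the lifespan from pattern 2 live in the same service):

# Sketch: wiring structured logging into the app at startup.
settings = get_settings()                     # pattern 1: validated config
setup_logging(settings.environment)           # JSON in production, pretty locally

app = FastAPI(lifespan=lifespan)              # pattern 2: graceful shutdown
app.add_middleware(RequestContextMiddleware)  # request-scoped log context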


Putting It All Together

These five patterns form the skeleton of every microservice I build:

  1. Structured Configuration — Fail fast, fail clearly
  2. Graceful Shutdown — Deploy without dropping requests
  3. Health Checks — Tell your orchestrator what’s actually happening
  4. Idempotency — Process messages exactly once (even when they arrive twice)
  5. Structured Logging — Debug production without guessing

None of these are revolutionary. You won’t find them in conference talks about the latest framework. But after 6+ years of building backend services—from data pipeline systems with RabbitMQ to AI-enabled platforms with REST and GraphQL APIs—these are the patterns that consistently prevent the most painful production incidents.

The common thread? They’re all about handling the unhappy path. Any developer can build a service that works when everything goes right. Senior engineering is about building services that behave predictably when things go wrong.

Start with these five patterns, then write your business logic. Your future self (and your on-call rotation) will thank you.

Salih "Adam" Yildirim

Full Stack Software Engineer with 6+ years of experience building scalable web and mobile applications. Passionate about clean code, modern architecture, and sharing knowledge.
