Skip to content

Middleware

TL;DR — Un middleware FastAPI est une fonction qui enveloppe chaque requête : il s'exécute avant que ta route ne soit appelée, peut modifier la requête, puis s'exécute après pour modifier la réponse. C'est la couche transversale (cross-cutting) idéale pour ce qui doit s'appliquer partout sans polluer les handlers : request_id, logging structuré, timing, CORS, compression, rate-limiting. Sous le capot c'est de l'ASGI pur : une pile d'oignons (Starlette empile les middlewares, le dernier ajouté est le plus externe). Pour un agent LLM, c'est exactement là que tu poses le request_id que tu propages jusqu'à l'appel AsyncAnthropic, le budget de tokens par tenant, et la capture des erreurs RateLimitError avant qu'elles ne fuient en 500 bruts. La règle senior : un middleware ne doit jamais consommer le body sans le restituer, jamais bloquer l'event loop, et toujours laisser passer les exceptions de streaming.

🧠 Mental model

Pense à une requête HTTP comme à un colis qui traverse une série de postes de douane avant d'atteindre sa destination (ta route), puis qui repasse par les mêmes postes en sens inverse pour la réponse.

Chaque middleware est un poste qui peut :

  • inspecter et tamponner le colis à l'aller (ajouter un request_id, vérifier un header) ;
  • décider de renvoyer le colis immédiatement sans le laisser passer (rejet d'auth, rate-limit) ;
  • attendre la réponse et la tamponner au retour (ajouter un header X-Process-Time, compresser).

C'est une structure en oignon : le premier middleware ajouté est le plus interne, le dernier ajouté est le plus externe. La requête traverse l'oignon de l'extérieur vers le centre, la réponse ressort du centre vers l'extérieur.

        requête entrante

   ┌──────────▼───────────┐   ← middleware ajouté EN DERNIER (le plus externe)
   │  ServerErrorMiddleware│      (catch-all exceptions, toujours là)
   │  ┌───────────────────┐│
   │  │  CORSMiddleware    ││
   │  │ ┌────────────────┐ ││
   │  │ │ RequestIDMw     │ ││   ← ton middleware métier
   │  │ │ ┌────────────┐  │ ││
   │  │ │ │  route()   │  │ ││   ← le centre : ton handler async
   │  │ │ └────────────┘  │ ││
   │  │ └────────────────┘ ││
   │  └───────────────────┘│
   └───────────────────────┘

        réponse sortante

Différence clé avec une dépendance (Depends) : une dépendance s'attache à une route (ou un router), reçoit déjà la requête parsée et peut renvoyer une valeur typée injectée dans le handler. Un middleware s'applique à tout (y compris les 404, les fichiers statiques, les erreurs), travaille au niveau ASGI brut et ne peut pas injecter de valeur dans la signature de ta route. Règle d'arbitrage senior : transversal et global → middleware ; logique métier réutilisable et typée → dépendance.

Le cœur : écrire un middleware

FastAPI hérite des deux mécanismes de Starlette. Voyons les deux, du plus simple au plus puissant.

1. @app.middleware("http") — le décorateur (90 % des cas)

C'est l'API la plus directe. Ta fonction reçoit la Request et un call_next qu'il faut await pour obtenir la Response.

python
import time
import uuid
from collections.abc import Awaitable, Callable

from fastapi import FastAPI, Request, Response

app = FastAPI()


@app.middleware("http")
async def add_request_context(
    request: Request,
    call_next: Callable[[Request], Awaitable[Response]],
) -> Response:
    request_id = request.headers.get("x-request-id") or str(uuid.uuid4())
    # On stocke dans request.state pour que les routes et dépendances y accèdent.
    request.state.request_id = request_id

    start = time.perf_counter()
    response = await call_next(request)  # ← laisse passer vers le centre de l'oignon
    elapsed_ms = (time.perf_counter() - start) * 1000

    response.headers["x-request-id"] = request_id
    response.headers["x-process-time-ms"] = f"{elapsed_ms:.1f}"
    return response

Points importants, faciles à rater :

  • call_next doit être awaité ; oublier le await te renvoie une coroutine au lieu d'une réponse — bug silencieux classique.
  • Tout ce qui est avant await call_next(...) s'exécute à l'aller, tout ce qui est après s'exécute au retour.
  • request.state est le porte-bagages officiel pour passer des données du middleware aux handlers (typé Any, hélas).

2. La classe BaseHTTPMiddleware — quand tu veux des paramètres

Le décorateur ne prend pas de configuration. Dès que ton middleware a des options (une liste de chemins à exclure, un secret), passe à la classe.

python
from collections.abc import Awaitable, Callable

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.types import ASGIApp


class ApiKeyMiddleware(BaseHTTPMiddleware):
    def __init__(self, app: ASGIApp, *, api_key: str, exempt_paths: set[str]) -> None:
        super().__init__(app)
        self._api_key = api_key
        self._exempt_paths = exempt_paths

    async def dispatch(
        self,
        request: Request,
        call_next: Callable[[Request], Awaitable[Response]],
    ) -> Response:
        if request.url.path in self._exempt_paths:
            return await call_next(request)

        if request.headers.get("x-api-key") != self._api_key:
            # On court-circuite : on NE laisse PAS passer la requête.
            return JSONResponse(
                {"detail": "Invalid or missing API key"},
                status_code=401,
            )
        return await call_next(request)


app.add_middleware(
    ApiKeyMiddleware,
    api_key="s3cr3t",
    exempt_paths={"/health", "/docs", "/openapi.json"},
)

Remarque l'ordre : add_middleware empile. Le dernier add_middleware appelé est le middleware le plus externe. Donc si tu veux que le logging capture aussi le temps passé en auth, ajoute l'auth d'abord (interne) et le logging ensuite (externe).

3. La voie idiomatique senior : middleware ASGI pur (pas de BaseHTTPMiddleware)

BaseHTTPMiddleware est pratique mais a un défaut connu et important : il bufferise les réponses en streaming. Il enveloppe le corps dans une _StreamingResponse interne et, dans certains scénarios (background tasks, gros flux, SSE), il casse le comportement attendu ou introduit de la latence. Pour un service qui streame des tokens LLM en SSE, c'est rédhibitoire.

La version production écrit le middleware au niveau ASGI brut, sans BaseHTTPMiddleware :

python
import time
import uuid
from starlette.types import ASGIApp, Message, Receive, Scope, Send


class RequestContextMiddleware:
    """Middleware ASGI pur : n'interfère pas avec le streaming."""

    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] != "http":
            # On ne touche pas au lifespan ni au websocket : on passe la main.
            await self.app(scope, receive, send)
            return

        request_id = str(uuid.uuid4())
        scope.setdefault("state", {})["request_id"] = request_id
        start = time.perf_counter()

        async def send_wrapper(message: Message) -> None:
            # On intercepte UNIQUEMENT le début de la réponse pour injecter un header,
            # sans jamais toucher aux chunks du body (donc le streaming reste intact).
            if message["type"] == "http.response.start":
                elapsed_ms = (time.perf_counter() - start) * 1000
                headers = message.setdefault("headers", [])
                headers.append((b"x-request-id", request_id.encode()))
                headers.append((b"x-process-time-ms", f"{elapsed_ms:.1f}".encode()))
            await send(message)

        await self.app(scope, receive, send_wrapper)


app.add_middleware(RequestContextMiddleware)

Ce middleware n'avale jamais les http.response.body : il ne modifie que le message http.response.start. C'est la façon de poser des headers transverses sans casser le SSE.

La façon FAUSSE de faire (à reconnaître en review)

python
# ❌ ANTI-PATTERN — lit le body dans le middleware sans le restituer
@app.middleware("http")
async def log_body(request: Request, call_next):
    body = await request.body()   # consomme le flux ASGI...
    print(body)                   # ...
    response = await call_next(request)
    return response               # 💥 la route reçoit un body VIDE → hang ou 422

Une fois await request.body() (ou request.json()) appelé, le flux receive est épuisé. La route en aval recevra un corps vide. C'est un bug de production redoutable car il ne se manifeste que sur les routes qui lisent le body (POST/PUT). Pour logger un body, il faut le re-broadcaster via un receive reconstruit — ou, bien plus simple, le faire dans une dépendance sur la route concernée, pas dans un middleware global.

Autres erreurs fréquentes :

python
# ❌ Bloque l'event loop : appel synchrone lourd dans un middleware async
@app.middleware("http")
async def bad(request: Request, call_next):
    requests.get("https://slow-audit-log.internal")  # synchrone, bloquant !
    return await call_next(request)

# ✅ Soit await un client async (httpx.AsyncClient), soit fire-and-forget
#    via une BackgroundTask, jamais un appel réseau synchrone bloquant.

⚙️ En production

Modes de défaillance

  • Le middleware qui mange les exceptions. Si ton dispatch fait try/except autour de call_next et renvoie une 500 générique, tu masques le HTTPException que FastAPI aurait converti proprement, et tu casses les pages d'erreur. N'enveloppe call_next que si tu sais re-raiser ce que tu n'as pas géré.
  • L'ordre est de la logique, pas de la décoration. Un GZipMiddleware placé à l'intérieur d'un middleware qui ajoute des headers de cache produira un résultat différent d'un placement externe. Documente l'ordre comme du code métier.
  • Streaming cassé par BaseHTTPMiddleware. Vu plus haut : pour SSE/WebSocket/gros downloads, middleware ASGI pur obligatoire.
  • request.state non typé. C'est un SimpleNamespace/dict ; un typo (request.state.requset_id) ne lèvera qu'à l'exécution. Encapsule l'accès dans un helper typé.

Performance

  • Chaque middleware ajoute un appel async dans le chemin critique de toutes les requêtes, y compris /health et les fichiers statiques. Garde la pile courte. Si une logique ne concerne que 3 routes, c'est une dépendance, pas un middleware global.
  • Jamais de I/O synchrone bloquant dans un middleware async : un seul time.sleep ou un requests.get gèle tout l'event loop pour toutes les connexions concurrentes.
  • CORS, GZip, etc. sont déjà optimisés en C/Rust côté Starlette — n'écris pas les tiens.

Sécurité

  • L'auth en middleware s'applique avant le routage : pratique pour un mur global, mais tu perds la granularité par-route et la documentation OpenAPI des dépendances de sécurité. Pour une API publique fine, préfère Security/Depends. Le middleware reste excellent pour un filtre grossier (IP allowlist, mTLS, kill-switch global).
  • Ne logge jamais les headers authorization / x-api-key ni les bodies bruts dans un middleware de logging — redige-les.
  • Pose les security headers (X-Content-Type-Options, Strict-Transport-Security, Content-Security-Policy) dans un middleware ASGI au niveau http.response.start.

Observabilité

C'est le cas d'usage roi. Le middleware est l'endroit où tu génères le request_id, le poses dans un contextvars.ContextVar (pour que ton logger le retrouve sans le passer partout), mesures la latence, et incrémentes tes compteurs Prometheus.

python
import contextvars

request_id_ctx: contextvars.ContextVar[str] = contextvars.ContextVar("request_id", default="-")


class ObservabilityMiddleware:
    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        token = request_id_ctx.set(str(uuid.uuid4()))
        try:
            await self.app(scope, receive, send)
        finally:
            request_id_ctx.reset(token)  # impératif : éviter les fuites entre requêtes

Le ContextVar est correct avec asyncio (chaque tâche a sa copie), mais reset-le toujours dans un finally, sinon une requête peut hériter du request_id d'une autre dans un worker réutilisé.

Le tradeoff senior

Middleware = puissant mais global. Chaque ligne que tu y mets est un impôt sur toutes les requêtes et un point de défaillance unique. La maturité consiste à garder la pile minimale (contexte + sécurité grossière + observabilité), à pousser tout le reste dans des dépendances par-route, et à écrire en ASGI pur dès que le streaming entre en jeu.

🤖 Servir un agent LLM : le middleware comme colonne vertébrale

Tu construis des agents. Un service FastAPI qui appelle l'API Anthropic a exactement trois besoins transverses qui crient « middleware » : propagation du request_id jusqu'à l'appel modèle, budget de tokens / rate-limit par tenant, et traduction des erreurs SDK en réponses HTTP propres. Modèle par défaut : claude-opus-4-8 (5 $/25 $ par Mtok, fenêtre 1M).

Streaming de tokens en SSE (le middleware ne doit pas bufferiser)

L'endpoint streame les tokens du modèle via messages.stream. C'est précisément ici que BaseHTTPMiddleware te trahirait — d'où le middleware ASGI pur des sections précédentes.

python
import os
from collections.abc import AsyncIterator

from anthropic import AsyncAnthropic
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()
client = AsyncAnthropic(api_key=os.environ["ANTHROPIC_API_KEY"])


@app.post("/chat/stream")
async def chat_stream(request: Request, prompt: str) -> StreamingResponse:
    request_id: str = getattr(request.state, "request_id", "-")

    async def event_source() -> AsyncIterator[str]:
        # Adaptive thinking par défaut + effort pour le tradeoff qualité/coût.
        async with client.messages.stream(
            model="claude-opus-4-8",
            max_tokens=4096,
            thinking={"type": "adaptive"},
            output_config={"effort": "high"},
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            async for text in stream.text_stream:
                # Format SSE : une ligne data: par chunk.
                yield f"data: {text}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_source(),
        media_type="text/event-stream",
        headers={"x-request-id": request_id},
    )

Le RequestContextMiddleware ASGI vu plus haut laisse passer chaque chunk SSE intact, tout en ayant posé le request_id dans request.state que l'endpoint relit ici. Un BaseHTTPMiddleware qui bufferise tuerait le temps-au-premier-token.

Tool-use loop : où vit la logique, où vit le middleware

La boucle d'outils (Claude demande un tool → tu l'exécutes → tu renvoies le résultat → tu reboucles) est de la logique métier : elle vit dans le handler ou un service, pas dans un middleware. Le middleware, lui, porte le contexte qui traverse chaque tour de boucle (request_id, tenant, budget cumulé).

python
import anthropic

WEATHER_TOOL = {
    "name": "get_weather",
    "description": "Get current weather for a city. Call this when the user asks about weather.",
    "input_schema": {
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"],
    },
}


async def run_agent(prompt: str) -> str:
    messages: list[dict] = [{"role": "user", "content": prompt}]
    while True:
        response = await client.messages.create(
            model="claude-opus-4-8",
            max_tokens=4096,
            thinking={"type": "adaptive"},
            tools=[WEATHER_TOOL],
            messages=messages,
        )
        if response.stop_reason != "tool_use":
            return "".join(b.text for b in response.content if b.type == "text")

        messages.append({"role": "assistant", "content": response.content})
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                # block.input est déjà parsé par le SDK — jamais de string-matching brut.
                result = await execute_tool(block.name, block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": result,
                })
        messages.append({"role": "user", "content": tool_results})

Sorties structurées + traduction d'erreurs en middleware

Le middleware transforme les exceptions typées du SDK en HTTP propre, une fois pour toutes les routes :

python
from anthropic import APIStatusError
from fastapi.responses import JSONResponse


@app.middleware("http")
async def translate_anthropic_errors(request: Request, call_next):
    try:
        return await call_next(request)
    except anthropic.RateLimitError as exc:
        retry_after = exc.response.headers.get("retry-after", "5")
        return JSONResponse(
            {"detail": "Upstream LLM rate limited"},
            status_code=429,
            headers={"retry-after": retry_after},
        )
    except anthropic.APIStatusError as exc:
        # 529 overloaded, 5xx upstream → on n'expose pas les détails internes.
        return JSONResponse({"detail": "LLM service error"}, status_code=502)

Le SDK retry déjà automatiquement les 429/5xx avec backoff (max_retries=2 par défaut) ; ce middleware n'attrape donc que ce qui survit aux retries. Pour les sorties structurées, utilise client.messages.parse() avec un schéma — la validation est faite par le SDK, et le middleware reste agnostique du contenu.

Prompt caching : ce que le middleware ne doit PAS faire

Le caching de prompt est un préfixe exact. Si ton middleware injecte un datetime.now() ou le request_id dans le system prompt, tu invalides le cache à chaque requête. Règle : le middleware pose le contexte dans request.state / ContextVar, jamais dans le contenu envoyé au modèle. Le cache_control se gère dans le service LLM, en gardant le préfixe stable.

🏋️ Exercices

Exercice 1 — Timing + request_id (implémenter)

Objectif. Écris un @app.middleware("http") qui (a) génère un request_id UUID si absent du header x-request-id, le pose dans request.state, (b) mesure la durée, (c) ajoute x-request-id et x-process-time-ms à la réponse. Vérifie avec curl -i que les deux headers sortent.

Indice/Solution. Reprends add_request_context du cœur de la leçon. Le piège : lire le header entrant avec request.headers.get("x-request-id") avant de générer un UUID, pour respecter un ID propagé par un reverse-proxy.

Exercice 2 — API key configurable avec exemptions (production-grade)

Objectif. Convertis l'exercice 1 en BaseHTTPMiddleware paramétrable : api_key + exempt_paths. Renvoie 401 JSON si clé absente/fausse, sauf sur /health, /docs, /openapi.json. Ajoute un test qui prouve que /health répond 200 sans clé et /secret répond 401.

Indice/Solution. Vois ApiKeyMiddleware. Compare en temps constant (hmac.compare_digest) pour éviter une timing-attack sur la clé. Note dans un commentaire pourquoi Security/Depends serait préférable pour une vraie API publique.

Exercice 3 — Casser puis réparer le streaming (break-then-fix)

Objectif. Crée un endpoint SSE qui streame 100 chunks avec await asyncio.sleep(0.05) entre chaque. Ajoute un BaseHTTPMiddleware qui logge la réponse, et observe que le client reçoit tout d'un bloc à la fin (buffering). Puis réécris le middleware en ASGI pur (__call__ + send_wrapper) et prouve que les chunks arrivent en temps réel.

Indice/Solution. BaseHTTPMiddleware enveloppe le body dans une réponse interne qui le consomme entièrement. Le send_wrapper ASGI ne touche qu'à http.response.start et relaie chaque http.response.body tel quel. Mesure le délai entre le premier et le dernier byte côté client (curl -N).

Exercice 4 — request_id de bout en bout jusqu'à l'agent (AI, intégration)

Objectif. Propage le request_id du middleware jusque dans les logs de l'appel modèle. Utilise un contextvars.ContextVar, configure ton logger pour l'inclure, et logge avant/après l'appel await client.messages.create(...). Vérifie que deux requêtes concurrentes n'échangent jamais leurs IDs.

Indice/Solution. ContextVar.set() dans le middleware, reset(token) dans finally. Pour tester la concurrence : lance deux asyncio.create_task qui frappent l'endpoint et asserte que chaque log porte le bon ID. Un ContextVar mal reset fuit entre requêtes sur un worker réutilisé — c'est le bug à provoquer puis corriger.

Exercice 5 — Rate-limit / budget de tokens par tenant (hard, production-grade)

Objectif. Écris un middleware ASGI qui lit un header x-tenant-id, vérifie un budget de tokens restant (stocké en mémoire pour l'exo, Redis en vrai), et renvoie 429 + retry-after si dépassé. Après l'appel modèle, décrémente le budget de response.usage.input_tokens + output_tokens.

Indice/Solution. Le décrément post-appel impose de lire l'usage après call_next — donc tu poses le tenant dans request.state à l'aller, et l'endpoint y range response.usage à son tour. Alternative plus propre : un service TokenBudget injecté par dépendance, le middleware ne faisant que le check d'entrée. Discute pourquoi l'état en mémoire casse dès qu'il y a plusieurs workers Uvicorn (→ Redis avec INCRBY atomique).

Exercice 6 — Traduction d'erreurs Anthropic robuste (AI, hard)

Objectif. Écris le middleware translate_anthropic_errors, puis force chaque branche : provoque un RateLimitError (clé bridée ou mock), un APIStatusError 529, et une HTTPException 404 normale de FastAPI. Prouve que la 404 passe intacte (le middleware ne doit pas l'avaler).

Indice/Solution. Le piège est l'ordre des except : attrape RateLimitError avant APIStatusError (sous-classe), et ne mets pas de except Exception fourre-tout qui mangerait les HTTPException. Re-raise tout ce que tu ne sais pas traduire. Vérifie aussi que tu ne logges pas le header x-api-key.

🎤 En entretien

Q : Différence entre un middleware et une dépendance (Depends) en FastAPI, et quand choisir l'un ou l'autre ? R : Le middleware est global, opère au niveau ASGI brut (avant routage, sur toutes les requêtes y compris erreurs/statics) et ne peut pas injecter de valeur typée ; la dépendance est par-route, reçoit la requête parsée, renvoie une valeur injectée et apparaît dans OpenAPI — donc transversal/global → middleware, métier réutilisable et typé → dépendance.

Q : Pourquoi BaseHTTPMiddleware peut-il casser le streaming, et comment l'éviter ? R : Parce qu'il enveloppe la réponse dans une _StreamingResponse interne qui peut bufferiser le body avant de le réémettre, ce qui détruit le temps-au-premier-token du SSE ; on l'évite en écrivant un middleware ASGI pur (__call__(scope, receive, send)) qui ne modifie que le message http.response.start et relaie chaque http.response.body tel quel.

Q : Quel est le bug le plus vicieux qu'on puisse introduire dans un middleware de logging ? R : Appeler await request.body() (ou .json()) sans reconstruire le flux receive, ce qui épuise le stream ASGI et fait recevoir un body vide à la route — bug invisible sur les GET, fatal sur les POST/PUT ; le faire plutôt dans une dépendance sur les routes concernées.

Q : Comment l'ordre des middlewares est-il déterminé et pourquoi c'est important ? R : add_middleware empile, donc le dernier ajouté est le plus externe ; ça compte car un middleware externe enveloppe le timing/erreurs des internes — par exemple ajouter l'auth d'abord (interne) puis le logging ensuite (externe) fait que le logging mesure aussi le coût de l'auth, et inverser les deux change la sémantique observée.

Bibliothèque tech perso — Achref