Skip to content

Dependencies (DI)

TL;DR — Le système d'injection de dépendances de FastAPI repose sur une seule fonction, Depends(), qui transforme n'importe quel callable en ressource résolue et mise en cache par requête. Tu déclares ce dont un endpoint a besoin (une session DB, l'utilisateur courant, un client AsyncAnthropic) dans sa signature, et FastAPI résout le graphe de dépendances, gère le cycle de vie (yield pour setup/teardown), et te donne une testabilité gratuite via app.dependency_overrides. Pour servir des agents LLM, la DI est l'endroit idiomatique où tu injectes un client Anthropic réutilisé, où tu appliques l'auth et le rate-limiting avant de brûler des tokens, et où tu propages un contexte de requête propre dans tout le stack — sans jamais instancier un client par requête.

🧠 Mental model

Si tu viens de NestJS, tu as déjà le réflexe DI : tu déclares un provider, tu l'annotes @Injectable(), et le conteneur le résout par son type dans le constructeur. FastAPI fait la même chose, mais sans conteneur global, sans décorateur, et sans singleton implicite. Il n'y a pas de @Module, pas de providers: [...], pas de scope REQUEST/SINGLETON à configurer. Le « conteneur », c'est l'arbre d'appels que FastAPI reconstruit à chaque requête en lisant les signatures de fonctions.

L'analogie la plus juste : une dépendance FastAPI est une recette, pas un objet enregistré. Quand un endpoint dit db: Session = Depends(get_db), FastAPI lit la recette get_db, exécute ses propres ingrédients (ses sous-dépendances), te sert le plat, et — si la recette utilise yield — revient nettoyer la cuisine quand la requête se termine. Deux endpoints qui demandent la même recette dans la même requête partagent le même plat (cache). Deux requêtes différentes obtiennent deux plats neufs.

                    Requête HTTP entrante


              ┌─────────────────────────────┐
              │   FastAPI lit la signature   │
              │   de l'endpoint              │
              └─────────────┬───────────────┘
                            │  rencontre Depends(...)
              ┌─────────────▼───────────────┐
              │   Résolution du graphe       │
              │                              │
              │   get_current_user           │
              │      └── Depends(get_db) ◄──┐│  même requête →
              │   get_agent_service          ││  get_db résolu
              │      └── Depends(get_db) ◄──┘│  UNE seule fois
              └─────────────┬───────────────┘   (cache par requête)


                    Exécution de l'endpoint


              Teardown des dépendances à `yield`
              (ordre inverse de résolution)

Le point qui surprend les gens venant de Nest/Spring : il n'y a pas de scope singleton dans la DI elle-même. Une dépendance est résolue une fois par requête, pas une fois par application. Les vrais singletons (pool de connexions DB, client HTTP, client AsyncAnthropic) vivent dans le lifespan de l'application (app.state), et la DI ne fait que les exposer proprement à chaque requête. On y revient en production.


Le cœur : Depends et l'Annotated moderne

Une dépendance est n'importe quel callable : fonction, fonction async, ou classe. FastAPI inspecte ses paramètres, les résout récursivement, et passe le résultat à l'endpoint.

La forme idiomatique (Python 3.12 + Annotated)

Depuis FastAPI 0.95+, la manière idiomatique d'écrire une dépendance utilise typing.Annotated. Ça sépare le type (Session) du mécanisme d'injection (Depends(...)), et ça rend la dépendance réutilisable comme un alias de type.

python
from typing import Annotated
from collections.abc import AsyncGenerator

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from pydantic import BaseModel

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

engine = create_async_engine("postgresql+asyncpg://localhost/app", pool_size=20)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Dépendance à cycle de vie : ouvre une session, la ferme à la fin de la requête."""
    async with SessionLocal() as session:
        yield session  # ← tout ce qui est avant `yield` = setup, après = teardown


# L'alias réutilisable : c'est CE qu'on partage dans tout le code.
DbSession = Annotated[AsyncSession, Depends(get_db)]


class User(BaseModel):
    id: int
    email: str
    is_active: bool


async def get_current_user(
    db: DbSession,                            # une dépendance dépend d'une autre
    token: Annotated[str, Depends(oauth2_scheme)],  # extrait le Bearer token de l'en-tête
) -> User:
    user = await fetch_user_from_token(db, token)
    if user is None or not user.is_active:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or inactive user")
    return user


CurrentUser = Annotated[User, Depends(get_current_user)]


@app.get("/me")
async def read_me(user: CurrentUser) -> User:
    return user

Ce qu'il faut voir ici :

  • get_db utilise yield, pas return. Le code avant yield s'exécute à l'entrée ; le code après (ici le __aexit__ du async with) s'exécute au teardown, dans l'ordre inverse de résolution. C'est l'équivalent FastAPI d'un try/finally scopé à la requête — et c'est l'unique bon endroit pour fermer une session DB, relâcher un lock, ou logger la durée.
  • Les dépendances composent. get_current_user déclare db: DbSession ; FastAPI résout get_db d'abord. Tu construis un graphe, pas une liste plate.
  • Annotated[T, Depends(...)] est un alias. CurrentUser se réutilise dans 50 endpoints sans répéter le Depends.

La manière à éviter (le « common wrong way »)

python
# ❌ ANTI-PATTERN : instancier des ressources lourdes DANS la dépendance, par requête
from anthropic import AsyncAnthropic

async def get_agent_reply(prompt: str):
    client = AsyncAnthropic()  # ← nouveau client + nouveau pool HTTP À CHAQUE REQUÊTE
    msg = await client.messages.create(
        model="claude-opus-4-8",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    await client.close()  # et on jette tout le pool de connexions immédiatement
    return msg

Pourquoi c'est faux :

  1. Coût de connexion. AsyncAnthropic() crée un pool de connexions HTTP (httpx) sous-jacent. Le recréer par requête tue le keep-alive TLS, ajoute de la latence de handshake, et n'amortit jamais le pool.
  2. Pas testable. Tu ne peux pas substituer ce client en test sans monkeypatcher l'import.
  3. Pas de cache de prompt possible. Le cache de prompt Anthropic est par modèle et profite du keep-alive ; un client jetable casse tout amortissement.

La version correcte injecte un client partagé, créé une fois au démarrage. Voir la section production.


Le cache de dépendances : la subtilité qui mord

Dans une même requête, FastAPI résout chaque dépendance une seule fois et réutilise le résultat. Si get_current_user et get_agent_service dépendent tous deux de get_db, get_db n'est exécuté qu'une fois — tu obtiens la même AsyncSession. C'est ce qui rend les transactions par requête cohérentes.

Si tu veux désactiver ce cache (rare, mais réel — par ex. générer deux tokens d'idempotence distincts) :

python
from fastapi import Depends

def fresh_uuid() -> str:
    import uuid
    return str(uuid.uuid4())

@app.post("/double")
async def double(
    a: Annotated[str, Depends(fresh_uuid, use_cache=False)],
    b: Annotated[str, Depends(fresh_uuid, use_cache=False)],
):
    return {"a": a, "b": b}  # a != b

Sans use_cache=False, a == b parce que fresh_uuid est résolu une fois et mis en cache. Piège classique : c'est la cause n°1 de « pourquoi mes deux dépendances retournent le même objet alors que je veux deux instances ». La réponse est presque toujours : déplace l'état hors de la DI, ou désactive le cache.


Dépendances sans valeur de retour : auth, rate-limit, garde

Toutes les dépendances ne servent pas à fournir un objet. Beaucoup servent à valider et lever une HTTPException avant que l'endpoint ne tourne. On les attache au niveau du path operation ou du router via dependencies=[...], sans les binder à un paramètre.

python
from fastapi import APIRouter, Depends, Header, HTTPException, status

async def verify_api_key(x_api_key: Annotated[str | None, Header()] = None) -> None:
    if x_api_key != EXPECTED_KEY:
        raise HTTPException(status.HTTP_403_FORBIDDEN, "Bad API key")
    # pas de return : on garde, on ne fournit pas


# Toutes les routes de ce router exigent la clé, sans la mentionner dans chaque signature
router = APIRouter(prefix="/agent", dependencies=[Depends(verify_api_key)])


@router.post("/chat")
async def chat(prompt: str) -> dict[str, str]:
    return {"prompt": prompt}

Ordre d'exécution : les dependencies=[...] du router s'exécutent avant celles de l'endpoint, qui s'exécutent avant le corps. C'est exactement là que tu mets l'auth et le rate-limiting pour une API qui sert un agent : tu refuses la requête avant d'avoir dépensé un seul token Anthropic.


⚙️ En production

Le vrai pattern : client AsyncAnthropic partagé via lifespan + DI

La règle d'or : les ressources coûteuses vivent dans le lifespan, la DI ne fait que les exposer. Tu crées le client une fois au démarrage, tu le ranges dans app.state, et une dépendance le ressort. Pour servir un agent LLM, c'est le squelette de référence.

python
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from typing import Annotated

import anthropic
from anthropic import AsyncAnthropic
from fastapi import Depends, FastAPI, HTTPException, Request


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    # SETUP — une seule fois pour toute l'app
    app.state.anthropic = AsyncAnthropic(max_retries=3)  # retries + backoff intégrés au SDK
    yield
    # TEARDOWN — à l'arrêt
    await app.state.anthropic.close()


app = FastAPI(lifespan=lifespan)


def get_anthropic(request: Request) -> AsyncAnthropic:
    """Expose le client partagé. PAS d'instanciation ici."""
    return request.app.state.anthropic


AnthropicClient = Annotated[AsyncAnthropic, Depends(get_anthropic)]

Note sur claude-opus-4-8 : le modèle phare actuel (1M de contexte, 5 $ / 25 $ par Mtok en entrée/sortie). Il n'accepte que le thinking adaptatifthinking={"type": "adaptive"} — et pas budget_tokens (qui renvoie un 400). La profondeur de réflexion se règle via output_config={"effort": ...} (low / medium / high / xhigh / max), jamais via un budget de tokens.

Endpoint synchrone (réponse complète)

python
from pydantic import BaseModel


class ChatRequest(BaseModel):
    prompt: str
    effort: str = "high"


class ChatResponse(BaseModel):
    text: str
    input_tokens: int
    output_tokens: int


@app.post("/chat")
async def chat(body: ChatRequest, client: AnthropicClient) -> ChatResponse:
    try:
        message = await client.messages.create(
            model="claude-opus-4-8",
            max_tokens=16000,                       # défaut sain en non-streaming
            thinking={"type": "adaptive"},          # JAMAIS budget_tokens sur opus-4-8
            output_config={"effort": body.effort},  # low | medium | high | xhigh | max
            messages=[{"role": "user", "content": body.prompt}],
        )
    except anthropic.RateLimitError as exc:
        raise HTTPException(429, "Upstream rate limited; retry later") from exc
    except anthropic.APIStatusError as exc:
        raise HTTPException(502, f"Anthropic error {exc.status_code}") from exc

    text = "".join(block.text for block in message.content if block.type == "text")
    return ChatResponse(
        text=text,
        input_tokens=message.usage.input_tokens,
        output_tokens=message.usage.output_tokens,
    )

Endpoint en streaming (SSE) — la DI reste identique

Pour tout ce qui peut être long (sortie volumineuse, max_tokens élevé), il faut streamer, sinon tu risques un timeout HTTP. Le client injecté ne change pas ; seul le handler diffère. On utilise messages.stream() et on relaie les tokens en Server-Sent Events.

python
import json
from fastapi.responses import StreamingResponse


@app.post("/chat/stream")
async def chat_stream(body: ChatRequest, client: AnthropicClient) -> StreamingResponse:
    async def event_source():
        try:
            async with client.messages.stream(
                model="claude-opus-4-8",
                max_tokens=64000,               # défaut sain EN streaming
                thinking={"type": "adaptive"},
                output_config={"effort": body.effort},
                messages=[{"role": "user", "content": body.prompt}],
            ) as stream:
                async for text in stream.text_stream:
                    yield f"data: {json.dumps({'delta': text})}\n\n"
                final = await stream.get_final_message()
                usage = {"input": final.usage.input_tokens, "output": final.usage.output_tokens}
                yield f"data: {json.dumps({'done': True, 'usage': usage})}\n\n"
        except anthropic.APIError as exc:
            yield f"data: {json.dumps({'error': str(exc)})}\n\n"

    return StreamingResponse(event_source(), media_type="text/event-stream")

Point d'architecture senior : ne mets pas la logique de teardown du stream dans une dépendance à yield. Le générateur de streaming vit plus longtemps que la résolution de l'endpoint, et FastAPI ferme les dépendances à yield quand l'endpoint retourne l'objet StreamingResponse — avant que le streaming ne commence réellement. Si ta dépendance ferme une session DB à son yield, et que ton stream essaie de l'utiliser, tu touches une session fermée. Garde le client (un singleton) injecté ; ouvre tout état lié au stream à l'intérieur du générateur.

La boucle tool-use (agent) derrière un endpoint

Un agent réel boucle : le modèle demande un outil, ton code l'exécute, tu renvoies le résultat, jusqu'à stop_reason == "end_turn". La DI te sert ici à injecter à la fois le client et le service qui exécute les outils (lui-même injectant la DB). C'est la composition de dépendances qui paie.

python
import json


class AgentService:
    def __init__(self, client: AsyncAnthropic, db: AsyncSession) -> None:
        self._client = client
        self._db = db

    async def run(self, prompt: str, max_turns: int = 8) -> str:
        tools = [{
            "name": "lookup_order",
            "description": "Récupère une commande par son id. À appeler dès que l'utilisateur mentionne un numéro de commande.",
            "input_schema": {
                "type": "object",
                "properties": {"order_id": {"type": "string"}},
                "required": ["order_id"],
            },
        }]
        messages: list[dict] = [{"role": "user", "content": prompt}]

        for _ in range(max_turns):
            resp = await self._client.messages.create(
                model="claude-opus-4-8",
                max_tokens=16000,
                thinking={"type": "adaptive"},
                output_config={"effort": "high"},
                tools=tools,
                messages=messages,
            )
            if resp.stop_reason == "end_turn":
                return "".join(b.text for b in resp.content if b.type == "text")

            messages.append({"role": "assistant", "content": resp.content})
            results = []
            for block in resp.content:
                if block.type == "tool_use" and block.name == "lookup_order":
                    order = await self._lookup(block.input["order_id"])  # utilise self._db
                    results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": json.dumps(order),
                    })
            messages.append({"role": "user", "content": results})

        raise HTTPException(500, "Agent exceeded max turns")

    async def _lookup(self, order_id: str) -> dict:
        ...  # requête self._db


def get_agent_service(client: AnthropicClient, db: DbSession) -> AgentService:
    return AgentService(client=client, db=db)


AgentDep = Annotated[AgentService, Depends(get_agent_service)]


@app.post("/agent")
async def run_agent(body: ChatRequest, agent: AgentDep) -> dict[str, str]:
    return {"answer": await agent.run(body.prompt)}

Remarque la descriptions d'outil prescriptive (« À appeler dès que… ») : sur les modèles Opus récents (4.7/4.8), qui déclenchent les outils plus parcimonieusement, dire quand appeler l'outil dans sa description donne un gain mesurable de taux de déclenchement.

Modes de défaillance, perf, sécurité, observabilité

  • Défaillance : dépendances bloquantes dans un endpoint async. Une dépendance def (synchrone) qui fait de l'I/O bloquant (requests.get, psycopg2) est exécutée par FastAPI dans un threadpool — fine en petit volume, mais sous charge elle épuise le threadpool et bloque toute la boucle asyncio. Règle : I/O dans une dépendance → async def + client async (httpx.AsyncClient, asyncpg, AsyncAnthropic).
  • Défaillance : teardown silencieux. Le code après yield ne s'exécute pas si une exception non-HTTPException traverse — sauf si tu l'entoures d'un try/finally. Pour libérer un lock ou décrémenter un compteur de rate-limit, mets-le dans finally, pas juste après yield.
  • Perf : un seul client, un seul pool. Le client AsyncAnthropic partagé amortit le pool httpx, le keep-alive TLS, et permet au cache de prompt Anthropic (préfixe stable → lecture à ~0,1× du prix d'entrée) de payer. Un système prompt figé + cache_control: {"type": "ephemeral"} sur le dernier bloc système, injecté via DI de manière déterministe, c'est de l'argent réel économisé sur un agent à fort trafic.
  • Sécurité : l'auth est une dépendance, pas un if dans le handler. Mets verify_api_key / get_current_user en dependencies=[...] au niveau router. Ça centralise la garde, la rend testable, et garantit qu'aucun token LLM n'est dépensé avant la validation. Ne mets jamais de clé API dans le prompt ou les messages.
  • Sécurité : retries idempotents. AsyncAnthropic(max_retries=3) retente automatiquement 429/5xx avec backoff. Ne réinvente pas une boucle de retry par-dessus — tu doublerais les tentatives.
  • Observabilité : une dépendance de contexte de requête. Une dépendance qui génère un request_id, le logge, et mesure la latence en finally (après yield) te donne un point d'instrumentation propre, partagé par toute la requête grâce au cache de DI.
python
import time, uuid, logging

logger = logging.getLogger("api")

async def request_context() -> AsyncGenerator[str, None]:
    rid = str(uuid.uuid4())
    start = time.perf_counter()
    try:
        yield rid
    finally:  # s'exécute même en cas d'erreur
        logger.info("request done", extra={"rid": rid, "ms": (time.perf_counter() - start) * 1000})
  • Tradeoff senior : classe-comme-dépendance vs fonction-factory. Une classe avec __call__ permet une dépendance paramétrée (RateLimiter(max=10)), ce qu'une fonction nue ne fait pas proprement. Mais une classe instanciée au module-level partage son état entre requêtes — utile pour un rate-limiter, dangereux pour un buffer mutable. Sache lequel tu construis.

🏋️ Exercices

Exercice 1 — De return à yield (implémenter)

Objectif. Écris une dépendance get_db qui ouvre une AsyncSession, commit en cas de succès, rollback en cas d'exception, et ferme toujours. Vérifie que le rollback se déclenche quand l'endpoint lève une HTTPException.

Indice/Solution. async with SessionLocal() as s: try: yield s; await s.commit() except: await s.rollback(); raise. Le commit doit venir après yield (donc après que l'endpoint a fini) ; le rollback dans except. Teste en levant une exception dans l'endpoint et en vérifiant qu'aucune ligne n'est persistée.

Exercice 2 — Casser le cache de dépendance (comprendre puis casser puis réparer)

Objectif. Crée un endpoint qui injecte deux fois une dépendance current_timestamp. Constate que les deux valeurs sont identiques. Puis fais-en sorte qu'elles diffèrent.

Indice/Solution. Identiques car cache par requête. Ajoute use_cache=False dans Depends(current_timestamp, use_cache=False). Bonus : explique pourquoi déplacer l'état hors de la DI est souvent meilleur que use_cache=False.

Exercice 3 — Auth en dépendance de router (production-grade)

Objectif. Implémente get_current_user qui (1) lit un Bearer token via OAuth2PasswordBearer, (2) le décode, (3) charge l'utilisateur depuis la DB injectée, (4) lève 401 si invalide. Attache-la au niveau router pour protéger tout un groupe d'endpoints d'agent.

Indice/Solution. token: Annotated[str, Depends(oauth2_scheme)] ; décode le JWT ; db: DbSession comme sous-dépendance. Au niveau router : APIRouter(dependencies=[Depends(get_current_user)]). Vérifie que l'auth s'exécute avant tout appel Anthropic en mettant un log dans la dépendance et un autre dans le handler.

Exercice 4 — Client Anthropic partagé + override en test (production-grade)

Objectif. Mets le client AsyncAnthropic dans le lifespan, expose-le via get_anthropic, et écris un test qui le remplace par un faux client via app.dependency_overrides[get_anthropic] = lambda: FakeClient(). Le test ne doit faire aucun appel réseau.

Indice/Solution. FakeClient n'a besoin que d'un messages.create async qui retourne un objet avec .content, .stop_reason, .usage. Utilise httpx.AsyncClient + ASGITransport pour tester l'app. Nettoie avec app.dependency_overrides.clear() en teardown de fixture.

Exercice 5 — Streaming sans fuite (casser puis réparer)

Objectif. Écris un endpoint streaming qui injecte get_db (dépendance à yield qui ferme la session). Fais-le échouer en utilisant db dans le générateur de stream, observe l'erreur « session is closed », puis répare.

Indice/Solution. La session est fermée au retour du StreamingResponse, avant l'itération. Réparation : ne dépends pas de get_db pour le travail fait pendant le stream ; ouvre la session à l'intérieur du générateur (async with SessionLocal()), ou pré-charge toutes les données nécessaires avant de retourner la StreamingResponse.

Exercice 6 — Rate-limiter paramétré comme classe-dépendance (HARD)

Objectif. Implémente RateLimiter(max_calls=10, window=60) utilisable en dependencies=[Depends(RateLimiter(10, 60))]. Il doit limiter par current_user.id, être thread/async-safe, et décrémenter proprement en finally.

Indice/Solution. Classe avec __init__(self, max_calls, window) et async def __call__(self, user: CurrentUser). Stocke les compteurs dans un dict + asyncio.Lock (ou Redis en vrai). Lève 429 au dépassement. Le piège : l'instance RateLimiter(10, 60) est partagée entre requêtes — c'est voulu pour l'état, mais ça veut dire qu'il faut un lock autour de la mutation du dict.


🎤 En entretien

Q : Quelle est la portée (scope) d'une dépendance FastAPI ? Comment fais-tu un singleton ? R : Scope par requête, avec cache par requête — pas de scope app/singleton dans la DI. Les vrais singletons (pool DB, client HTTP, client AsyncAnthropic) vivent dans le lifespan / app.state, et la DI ne fait que les exposer.

Q : def vs async def pour une dépendance — quand est-ce que ça compte ? R : Une dépendance def qui fait de l'I/O bloquant tourne dans le threadpool ; sous charge ça épuise le threadpool et bloque la boucle asyncio. Toute dépendance qui fait de l'I/O dans un app async doit être async def avec un client async.

Q : Pourquoi yield plutôt que return dans une dépendance, et quel est le piège du teardown ? R : yield permet le teardown scopé à la requête (fermer une session, libérer un lock) ; le piège est que le code après yield ne s'exécute pas forcément sur exception — il faut un try/finally pour les libérations critiques. Et en streaming, le teardown arrive avant l'itération, donc on n'ouvre pas l'état du stream dans une dépendance à yield.

Q : Comment testes-tu un endpoint qui appelle un agent LLM sans toucher le réseau ? R : J'injecte le client AsyncAnthropic via une dépendance (get_anthropic) et je le remplace en test avec app.dependency_overrides[get_anthropic] = lambda: fake. C'est précisément ce que la DI rend trivial — aucun monkeypatch d'import, substitution propre au niveau du graphe de dépendances.

Bibliothèque tech perso — Achref