Dependencies (DI)
TL;DR — Le système d'injection de dépendances de FastAPI repose sur une seule fonction,
Depends(), qui transforme n'importe quel callable en ressource résolue et mise en cache par requête. Tu déclares ce dont un endpoint a besoin (une session DB, l'utilisateur courant, un clientAsyncAnthropic) dans sa signature, et FastAPI résout le graphe de dépendances, gère le cycle de vie (yieldpour setup/teardown), et te donne une testabilité gratuite viaapp.dependency_overrides. Pour servir des agents LLM, la DI est l'endroit idiomatique où tu injectes un client Anthropic réutilisé, où tu appliques l'auth et le rate-limiting avant de brûler des tokens, et où tu propages un contexte de requête propre dans tout le stack — sans jamais instancier un client par requête.
🧠 Mental model
Si tu viens de NestJS, tu as déjà le réflexe DI : tu déclares un provider, tu l'annotes @Injectable(), et le conteneur le résout par son type dans le constructeur. FastAPI fait la même chose, mais sans conteneur global, sans décorateur, et sans singleton implicite. Il n'y a pas de @Module, pas de providers: [...], pas de scope REQUEST/SINGLETON à configurer. Le « conteneur », c'est l'arbre d'appels que FastAPI reconstruit à chaque requête en lisant les signatures de fonctions.
L'analogie la plus juste : une dépendance FastAPI est une recette, pas un objet enregistré. Quand un endpoint dit db: Session = Depends(get_db), FastAPI lit la recette get_db, exécute ses propres ingrédients (ses sous-dépendances), te sert le plat, et — si la recette utilise yield — revient nettoyer la cuisine quand la requête se termine. Deux endpoints qui demandent la même recette dans la même requête partagent le même plat (cache). Deux requêtes différentes obtiennent deux plats neufs.
Requête HTTP entrante
│
▼
┌─────────────────────────────┐
│ FastAPI lit la signature │
│ de l'endpoint │
└─────────────┬───────────────┘
│ rencontre Depends(...)
┌─────────────▼───────────────┐
│ Résolution du graphe │
│ │
│ get_current_user │
│ └── Depends(get_db) ◄──┐│ même requête →
│ get_agent_service ││ get_db résolu
│ └── Depends(get_db) ◄──┘│ UNE seule fois
└─────────────┬───────────────┘ (cache par requête)
│
▼
Exécution de l'endpoint
│
▼
Teardown des dépendances à `yield`
(ordre inverse de résolution)Le point qui surprend les gens venant de Nest/Spring : il n'y a pas de scope singleton dans la DI elle-même. Une dépendance est résolue une fois par requête, pas une fois par application. Les vrais singletons (pool de connexions DB, client HTTP, client AsyncAnthropic) vivent dans le lifespan de l'application (app.state), et la DI ne fait que les exposer proprement à chaque requête. On y revient en production.
Le cœur : Depends et l'Annotated moderne
Une dépendance est n'importe quel callable : fonction, fonction async, ou classe. FastAPI inspecte ses paramètres, les résout récursivement, et passe le résultat à l'endpoint.
La forme idiomatique (Python 3.12 + Annotated)
Depuis FastAPI 0.95+, la manière idiomatique d'écrire une dépendance utilise typing.Annotated. Ça sépare le type (Session) du mécanisme d'injection (Depends(...)), et ça rend la dépendance réutilisable comme un alias de type.
from typing import Annotated
from collections.abc import AsyncGenerator
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from pydantic import BaseModel
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
engine = create_async_engine("postgresql+asyncpg://localhost/app", pool_size=20)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""Dépendance à cycle de vie : ouvre une session, la ferme à la fin de la requête."""
async with SessionLocal() as session:
yield session # ← tout ce qui est avant `yield` = setup, après = teardown
# L'alias réutilisable : c'est CE qu'on partage dans tout le code.
DbSession = Annotated[AsyncSession, Depends(get_db)]
class User(BaseModel):
id: int
email: str
is_active: bool
async def get_current_user(
db: DbSession, # une dépendance dépend d'une autre
token: Annotated[str, Depends(oauth2_scheme)], # extrait le Bearer token de l'en-tête
) -> User:
user = await fetch_user_from_token(db, token)
if user is None or not user.is_active:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or inactive user")
return user
CurrentUser = Annotated[User, Depends(get_current_user)]
@app.get("/me")
async def read_me(user: CurrentUser) -> User:
return userCe qu'il faut voir ici :
get_dbutiliseyield, pasreturn. Le code avantyields'exécute à l'entrée ; le code après (ici le__aexit__duasync with) s'exécute au teardown, dans l'ordre inverse de résolution. C'est l'équivalent FastAPI d'untry/finallyscopé à la requête — et c'est l'unique bon endroit pour fermer une session DB, relâcher un lock, ou logger la durée.- Les dépendances composent.
get_current_userdéclaredb: DbSession; FastAPI résoutget_dbd'abord. Tu construis un graphe, pas une liste plate. Annotated[T, Depends(...)]est un alias.CurrentUserse réutilise dans 50 endpoints sans répéter leDepends.
La manière à éviter (le « common wrong way »)
# ❌ ANTI-PATTERN : instancier des ressources lourdes DANS la dépendance, par requête
from anthropic import AsyncAnthropic
async def get_agent_reply(prompt: str):
client = AsyncAnthropic() # ← nouveau client + nouveau pool HTTP À CHAQUE REQUÊTE
msg = await client.messages.create(
model="claude-opus-4-8",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}],
)
await client.close() # et on jette tout le pool de connexions immédiatement
return msgPourquoi c'est faux :
- Coût de connexion.
AsyncAnthropic()crée un pool de connexions HTTP (httpx) sous-jacent. Le recréer par requête tue le keep-alive TLS, ajoute de la latence de handshake, et n'amortit jamais le pool. - Pas testable. Tu ne peux pas substituer ce client en test sans monkeypatcher l'import.
- Pas de cache de prompt possible. Le cache de prompt Anthropic est par modèle et profite du keep-alive ; un client jetable casse tout amortissement.
La version correcte injecte un client partagé, créé une fois au démarrage. Voir la section production.
Le cache de dépendances : la subtilité qui mord
Dans une même requête, FastAPI résout chaque dépendance une seule fois et réutilise le résultat. Si get_current_user et get_agent_service dépendent tous deux de get_db, get_db n'est exécuté qu'une fois — tu obtiens la même AsyncSession. C'est ce qui rend les transactions par requête cohérentes.
Si tu veux désactiver ce cache (rare, mais réel — par ex. générer deux tokens d'idempotence distincts) :
from fastapi import Depends
def fresh_uuid() -> str:
import uuid
return str(uuid.uuid4())
@app.post("/double")
async def double(
a: Annotated[str, Depends(fresh_uuid, use_cache=False)],
b: Annotated[str, Depends(fresh_uuid, use_cache=False)],
):
return {"a": a, "b": b} # a != bSans use_cache=False, a == b parce que fresh_uuid est résolu une fois et mis en cache. Piège classique : c'est la cause n°1 de « pourquoi mes deux dépendances retournent le même objet alors que je veux deux instances ». La réponse est presque toujours : déplace l'état hors de la DI, ou désactive le cache.
Dépendances sans valeur de retour : auth, rate-limit, garde
Toutes les dépendances ne servent pas à fournir un objet. Beaucoup servent à valider et lever une HTTPException avant que l'endpoint ne tourne. On les attache au niveau du path operation ou du router via dependencies=[...], sans les binder à un paramètre.
from fastapi import APIRouter, Depends, Header, HTTPException, status
async def verify_api_key(x_api_key: Annotated[str | None, Header()] = None) -> None:
if x_api_key != EXPECTED_KEY:
raise HTTPException(status.HTTP_403_FORBIDDEN, "Bad API key")
# pas de return : on garde, on ne fournit pas
# Toutes les routes de ce router exigent la clé, sans la mentionner dans chaque signature
router = APIRouter(prefix="/agent", dependencies=[Depends(verify_api_key)])
@router.post("/chat")
async def chat(prompt: str) -> dict[str, str]:
return {"prompt": prompt}Ordre d'exécution : les dependencies=[...] du router s'exécutent avant celles de l'endpoint, qui s'exécutent avant le corps. C'est exactement là que tu mets l'auth et le rate-limiting pour une API qui sert un agent : tu refuses la requête avant d'avoir dépensé un seul token Anthropic.
⚙️ En production
Le vrai pattern : client AsyncAnthropic partagé via lifespan + DI
La règle d'or : les ressources coûteuses vivent dans le lifespan, la DI ne fait que les exposer. Tu crées le client une fois au démarrage, tu le ranges dans app.state, et une dépendance le ressort. Pour servir un agent LLM, c'est le squelette de référence.
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from typing import Annotated
import anthropic
from anthropic import AsyncAnthropic
from fastapi import Depends, FastAPI, HTTPException, Request
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
# SETUP — une seule fois pour toute l'app
app.state.anthropic = AsyncAnthropic(max_retries=3) # retries + backoff intégrés au SDK
yield
# TEARDOWN — à l'arrêt
await app.state.anthropic.close()
app = FastAPI(lifespan=lifespan)
def get_anthropic(request: Request) -> AsyncAnthropic:
"""Expose le client partagé. PAS d'instanciation ici."""
return request.app.state.anthropic
AnthropicClient = Annotated[AsyncAnthropic, Depends(get_anthropic)]Note sur
claude-opus-4-8: le modèle phare actuel (1M de contexte, 5 $ / 25 $ par Mtok en entrée/sortie). Il n'accepte que le thinking adaptatif —thinking={"type": "adaptive"}— et pasbudget_tokens(qui renvoie un 400). La profondeur de réflexion se règle viaoutput_config={"effort": ...}(low/medium/high/xhigh/max), jamais via un budget de tokens.
Endpoint synchrone (réponse complète)
from pydantic import BaseModel
class ChatRequest(BaseModel):
prompt: str
effort: str = "high"
class ChatResponse(BaseModel):
text: str
input_tokens: int
output_tokens: int
@app.post("/chat")
async def chat(body: ChatRequest, client: AnthropicClient) -> ChatResponse:
try:
message = await client.messages.create(
model="claude-opus-4-8",
max_tokens=16000, # défaut sain en non-streaming
thinking={"type": "adaptive"}, # JAMAIS budget_tokens sur opus-4-8
output_config={"effort": body.effort}, # low | medium | high | xhigh | max
messages=[{"role": "user", "content": body.prompt}],
)
except anthropic.RateLimitError as exc:
raise HTTPException(429, "Upstream rate limited; retry later") from exc
except anthropic.APIStatusError as exc:
raise HTTPException(502, f"Anthropic error {exc.status_code}") from exc
text = "".join(block.text for block in message.content if block.type == "text")
return ChatResponse(
text=text,
input_tokens=message.usage.input_tokens,
output_tokens=message.usage.output_tokens,
)Endpoint en streaming (SSE) — la DI reste identique
Pour tout ce qui peut être long (sortie volumineuse, max_tokens élevé), il faut streamer, sinon tu risques un timeout HTTP. Le client injecté ne change pas ; seul le handler diffère. On utilise messages.stream() et on relaie les tokens en Server-Sent Events.
import json
from fastapi.responses import StreamingResponse
@app.post("/chat/stream")
async def chat_stream(body: ChatRequest, client: AnthropicClient) -> StreamingResponse:
async def event_source():
try:
async with client.messages.stream(
model="claude-opus-4-8",
max_tokens=64000, # défaut sain EN streaming
thinking={"type": "adaptive"},
output_config={"effort": body.effort},
messages=[{"role": "user", "content": body.prompt}],
) as stream:
async for text in stream.text_stream:
yield f"data: {json.dumps({'delta': text})}\n\n"
final = await stream.get_final_message()
usage = {"input": final.usage.input_tokens, "output": final.usage.output_tokens}
yield f"data: {json.dumps({'done': True, 'usage': usage})}\n\n"
except anthropic.APIError as exc:
yield f"data: {json.dumps({'error': str(exc)})}\n\n"
return StreamingResponse(event_source(), media_type="text/event-stream")Point d'architecture senior : ne mets pas la logique de teardown du stream dans une dépendance à yield. Le générateur de streaming vit plus longtemps que la résolution de l'endpoint, et FastAPI ferme les dépendances à yield quand l'endpoint retourne l'objet StreamingResponse — avant que le streaming ne commence réellement. Si ta dépendance ferme une session DB à son yield, et que ton stream essaie de l'utiliser, tu touches une session fermée. Garde le client (un singleton) injecté ; ouvre tout état lié au stream à l'intérieur du générateur.
La boucle tool-use (agent) derrière un endpoint
Un agent réel boucle : le modèle demande un outil, ton code l'exécute, tu renvoies le résultat, jusqu'à stop_reason == "end_turn". La DI te sert ici à injecter à la fois le client et le service qui exécute les outils (lui-même injectant la DB). C'est la composition de dépendances qui paie.
import json
class AgentService:
def __init__(self, client: AsyncAnthropic, db: AsyncSession) -> None:
self._client = client
self._db = db
async def run(self, prompt: str, max_turns: int = 8) -> str:
tools = [{
"name": "lookup_order",
"description": "Récupère une commande par son id. À appeler dès que l'utilisateur mentionne un numéro de commande.",
"input_schema": {
"type": "object",
"properties": {"order_id": {"type": "string"}},
"required": ["order_id"],
},
}]
messages: list[dict] = [{"role": "user", "content": prompt}]
for _ in range(max_turns):
resp = await self._client.messages.create(
model="claude-opus-4-8",
max_tokens=16000,
thinking={"type": "adaptive"},
output_config={"effort": "high"},
tools=tools,
messages=messages,
)
if resp.stop_reason == "end_turn":
return "".join(b.text for b in resp.content if b.type == "text")
messages.append({"role": "assistant", "content": resp.content})
results = []
for block in resp.content:
if block.type == "tool_use" and block.name == "lookup_order":
order = await self._lookup(block.input["order_id"]) # utilise self._db
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": json.dumps(order),
})
messages.append({"role": "user", "content": results})
raise HTTPException(500, "Agent exceeded max turns")
async def _lookup(self, order_id: str) -> dict:
... # requête self._db
def get_agent_service(client: AnthropicClient, db: DbSession) -> AgentService:
return AgentService(client=client, db=db)
AgentDep = Annotated[AgentService, Depends(get_agent_service)]
@app.post("/agent")
async def run_agent(body: ChatRequest, agent: AgentDep) -> dict[str, str]:
return {"answer": await agent.run(body.prompt)}Remarque la descriptions d'outil prescriptive (« À appeler dès que… ») : sur les modèles Opus récents (4.7/4.8), qui déclenchent les outils plus parcimonieusement, dire quand appeler l'outil dans sa description donne un gain mesurable de taux de déclenchement.
Modes de défaillance, perf, sécurité, observabilité
- Défaillance : dépendances bloquantes dans un endpoint async. Une dépendance
def(synchrone) qui fait de l'I/O bloquant (requests.get,psycopg2) est exécutée par FastAPI dans un threadpool — fine en petit volume, mais sous charge elle épuise le threadpool et bloque toute la boucle asyncio. Règle : I/O dans une dépendance →async def+ client async (httpx.AsyncClient,asyncpg,AsyncAnthropic). - Défaillance : teardown silencieux. Le code après
yieldne s'exécute pas si une exception non-HTTPExceptiontraverse — sauf si tu l'entoures d'untry/finally. Pour libérer un lock ou décrémenter un compteur de rate-limit, mets-le dansfinally, pas juste aprèsyield. - Perf : un seul client, un seul pool. Le client
AsyncAnthropicpartagé amortit le pool httpx, le keep-alive TLS, et permet au cache de prompt Anthropic (préfixe stable → lecture à ~0,1× du prix d'entrée) de payer. Un système prompt figé +cache_control: {"type": "ephemeral"}sur le dernier bloc système, injecté via DI de manière déterministe, c'est de l'argent réel économisé sur un agent à fort trafic. - Sécurité : l'auth est une dépendance, pas un
ifdans le handler. Metsverify_api_key/get_current_userendependencies=[...]au niveau router. Ça centralise la garde, la rend testable, et garantit qu'aucun token LLM n'est dépensé avant la validation. Ne mets jamais de clé API dans le prompt ou les messages. - Sécurité : retries idempotents.
AsyncAnthropic(max_retries=3)retente automatiquement 429/5xx avec backoff. Ne réinvente pas une boucle de retry par-dessus — tu doublerais les tentatives. - Observabilité : une dépendance de contexte de requête. Une dépendance qui génère un
request_id, le logge, et mesure la latence enfinally(aprèsyield) te donne un point d'instrumentation propre, partagé par toute la requête grâce au cache de DI.
import time, uuid, logging
logger = logging.getLogger("api")
async def request_context() -> AsyncGenerator[str, None]:
rid = str(uuid.uuid4())
start = time.perf_counter()
try:
yield rid
finally: # s'exécute même en cas d'erreur
logger.info("request done", extra={"rid": rid, "ms": (time.perf_counter() - start) * 1000})- Tradeoff senior : classe-comme-dépendance vs fonction-factory. Une classe avec
__call__permet une dépendance paramétrée (RateLimiter(max=10)), ce qu'une fonction nue ne fait pas proprement. Mais une classe instanciée au module-level partage son état entre requêtes — utile pour un rate-limiter, dangereux pour un buffer mutable. Sache lequel tu construis.
🏋️ Exercices
Exercice 1 — De return à yield (implémenter)
Objectif. Écris une dépendance get_db qui ouvre une AsyncSession, commit en cas de succès, rollback en cas d'exception, et ferme toujours. Vérifie que le rollback se déclenche quand l'endpoint lève une HTTPException.
Indice/Solution. async with SessionLocal() as s: try: yield s; await s.commit() except: await s.rollback(); raise. Le commit doit venir après yield (donc après que l'endpoint a fini) ; le rollback dans except. Teste en levant une exception dans l'endpoint et en vérifiant qu'aucune ligne n'est persistée.
Exercice 2 — Casser le cache de dépendance (comprendre puis casser puis réparer)
Objectif. Crée un endpoint qui injecte deux fois une dépendance current_timestamp. Constate que les deux valeurs sont identiques. Puis fais-en sorte qu'elles diffèrent.
Indice/Solution. Identiques car cache par requête. Ajoute use_cache=False dans Depends(current_timestamp, use_cache=False). Bonus : explique pourquoi déplacer l'état hors de la DI est souvent meilleur que use_cache=False.
Exercice 3 — Auth en dépendance de router (production-grade)
Objectif. Implémente get_current_user qui (1) lit un Bearer token via OAuth2PasswordBearer, (2) le décode, (3) charge l'utilisateur depuis la DB injectée, (4) lève 401 si invalide. Attache-la au niveau router pour protéger tout un groupe d'endpoints d'agent.
Indice/Solution. token: Annotated[str, Depends(oauth2_scheme)] ; décode le JWT ; db: DbSession comme sous-dépendance. Au niveau router : APIRouter(dependencies=[Depends(get_current_user)]). Vérifie que l'auth s'exécute avant tout appel Anthropic en mettant un log dans la dépendance et un autre dans le handler.
Exercice 4 — Client Anthropic partagé + override en test (production-grade)
Objectif. Mets le client AsyncAnthropic dans le lifespan, expose-le via get_anthropic, et écris un test qui le remplace par un faux client via app.dependency_overrides[get_anthropic] = lambda: FakeClient(). Le test ne doit faire aucun appel réseau.
Indice/Solution. FakeClient n'a besoin que d'un messages.create async qui retourne un objet avec .content, .stop_reason, .usage. Utilise httpx.AsyncClient + ASGITransport pour tester l'app. Nettoie avec app.dependency_overrides.clear() en teardown de fixture.
Exercice 5 — Streaming sans fuite (casser puis réparer)
Objectif. Écris un endpoint streaming qui injecte get_db (dépendance à yield qui ferme la session). Fais-le échouer en utilisant db dans le générateur de stream, observe l'erreur « session is closed », puis répare.
Indice/Solution. La session est fermée au retour du StreamingResponse, avant l'itération. Réparation : ne dépends pas de get_db pour le travail fait pendant le stream ; ouvre la session à l'intérieur du générateur (async with SessionLocal()), ou pré-charge toutes les données nécessaires avant de retourner la StreamingResponse.
Exercice 6 — Rate-limiter paramétré comme classe-dépendance (HARD)
Objectif. Implémente RateLimiter(max_calls=10, window=60) utilisable en dependencies=[Depends(RateLimiter(10, 60))]. Il doit limiter par current_user.id, être thread/async-safe, et décrémenter proprement en finally.
Indice/Solution. Classe avec __init__(self, max_calls, window) et async def __call__(self, user: CurrentUser). Stocke les compteurs dans un dict + asyncio.Lock (ou Redis en vrai). Lève 429 au dépassement. Le piège : l'instance RateLimiter(10, 60) est partagée entre requêtes — c'est voulu pour l'état, mais ça veut dire qu'il faut un lock autour de la mutation du dict.
🎤 En entretien
Q : Quelle est la portée (scope) d'une dépendance FastAPI ? Comment fais-tu un singleton ? R : Scope par requête, avec cache par requête — pas de scope app/singleton dans la DI. Les vrais singletons (pool DB, client HTTP, client AsyncAnthropic) vivent dans le lifespan / app.state, et la DI ne fait que les exposer.
Q : def vs async def pour une dépendance — quand est-ce que ça compte ? R : Une dépendance def qui fait de l'I/O bloquant tourne dans le threadpool ; sous charge ça épuise le threadpool et bloque la boucle asyncio. Toute dépendance qui fait de l'I/O dans un app async doit être async def avec un client async.
Q : Pourquoi yield plutôt que return dans une dépendance, et quel est le piège du teardown ? R : yield permet le teardown scopé à la requête (fermer une session, libérer un lock) ; le piège est que le code après yield ne s'exécute pas forcément sur exception — il faut un try/finally pour les libérations critiques. Et en streaming, le teardown arrive avant l'itération, donc on n'ouvre pas l'état du stream dans une dépendance à yield.
Q : Comment testes-tu un endpoint qui appelle un agent LLM sans toucher le réseau ? R : J'injecte le client AsyncAnthropic via une dépendance (get_anthropic) et je le remplace en test avec app.dependency_overrides[get_anthropic] = lambda: fake. C'est précisément ce que la DI rend trivial — aucun monkeypatch d'import, substitution propre au niveau du graphe de dépendances.