Lifespan & startup
TL;DR — Le
lifespanest le seul endroit propre pour faire vivre des ressources coûteuses (client HTTP, pool de connexions DB, clientAsyncAnthropic) aussi longtemps que le process FastAPI : on les crée avant le premieryield, on les range dansapp.state, on les ferme après. C'est l'équivalent moderne et asynchrone des vieux décorateurs@app.on_event("startup")(dépréciés). La règle d'or pour un dev qui sert des agents LLM : un seulAsyncAnthropicpartagé pour tout le process (il gère son propre pool de connexions et ses retries) — instancier un client par requête, c'est tuer le keep-alive HTTP, exploser la latence et perdre le bénéfice du connection pooling. Tout ce qui estasync with, observable et fermable proprement passe par le lifespan.
🧠 Mental model
Pense au lifespan comme à l'ouverture et la fermeture d'un restaurant, pas à la prise de commande d'un client.
- Startup (avant
yield) = le chef arrive, allume les fourneaux, branche le frigo, ouvre la connexion au fournisseur. Ça arrive une fois, au boot du process. - Requêtes = chaque client qui passe commande. Ils partagent tous la même cuisine. On ne rallume pas les fourneaux à chaque assiette.
- Shutdown (après
yield) = on éteint les fourneaux, on ferme les connexions, on nettoie. Une fois, à l'arrêt.
Le piège classique de l'ex-PHP : en PHP-FPM, chaque requête est un process quasi-vierge, donc tu instancies tout à chaque requête et c'est "normal". En NestJS tu connais déjà le concept — c'est le onModuleInit / onApplicationShutdown et les providers singletons. FastAPI lifespan = le scope singleton de l'application, géré par un context manager async.
process boot
│
▼
┌─────────────────────────────────┐
│ lifespan: code AVANT yield │ ← startup (1×)
│ - créer AsyncAnthropic() │
│ - ouvrir pool DB / Redis │
│ - app.state.client = ... │
└─────────────────────────────────┘
│
│ yield ──────────────────────────────────┐
│ │
▼ │
┌─────────────────────────────────┐ │
│ l'app SERT les requêtes │ ← runtime │ durée de vie
│ req → handler → app.state.client │ (N×) │ du process
│ req → handler → app.state.client │ │
└─────────────────────────────────┘ │
│ │
│ (SIGTERM / Ctrl-C) │
▼ │
┌─────────────────────────────────┐ ◀────────────┘
│ lifespan: code APRÈS yield │ ← shutdown (1×)
│ - await client.close() │
│ - fermer pool DB │
└─────────────────────────────────┘Tout ce qui est avant yield bloque le démarrage : tant que ce n'est pas fini, FastAPI n'accepte aucune requête. Tout ce qui est après yield retarde l'arrêt : c'est là qu'on draine proprement.
La façon idiomatique (Python 3.12, FastAPI moderne)
Le lifespan est un async context manager passé à FastAPI(lifespan=...). Le pattern canonique utilise @asynccontextmanager et un TypedDict ou dataclass pour typer ce qu'on stocke.
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass
import httpx
from anthropic import AsyncAnthropic
from fastapi import FastAPI, Request
@dataclass
class AppState:
"""Tout ce qui vit aussi longtemps que le process. Typé, pas un dict opaque."""
anthropic: AsyncAnthropic
http: httpx.AsyncClient
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
# --- STARTUP : avant le yield, exécuté une seule fois ---
anthropic = AsyncAnthropic(
max_retries=4, # retries SDK avec backoff exponentiel (429 / 5xx / 529)
timeout=60.0, # timeout par requête, pas par token
)
http = httpx.AsyncClient(
timeout=httpx.Timeout(10.0, connect=5.0),
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
)
app.state.deps = AppState(anthropic=anthropic, http=http)
yield # <-- l'app sert les requêtes pendant tout ce temps
# --- SHUTDOWN : après le yield, exécuté une seule fois ---
await anthropic.close()
await http.aclose()
app = FastAPI(lifespan=lifespan)
def get_state(request: Request) -> AppState:
"""Dependency : récupère l'état partagé, typé."""
return request.app.state.deps
@app.get("/health")
async def health(request: Request) -> dict[str, str]:
state = get_state(request)
return {"status": "ok", "clients": "ready"}Points clés :
@asynccontextmanagertransforme une fonction génératrice en context manager. Leyieldest le point de bascule startup → runtime → shutdown.app.stateest le sac officiel pour les singletons. On y range unedataclasstypée plutôt qu'un dict pour garder l'autocomplétion et le typage.- Le client
AsyncAnthropicest créé UNE fois. Il maintient un poolhttpxinterne, fait du keep-alive HTTP/2, et applique les retries. Le réinstancier par requête réinitialise tout ça. - On ferme dans l'ordre inverse (LIFO) de la création, comme une pile de context managers.
Injecter le client dans un endpoint via DI
from typing import Annotated
from fastapi import Depends
# Dependency dédiée et réutilisable
def get_anthropic(state: Annotated[AppState, Depends(get_state)]) -> AsyncAnthropic:
return state.anthropic
AnthropicDep = Annotated[AsyncAnthropic, Depends(get_anthropic)]
@app.post("/summarize")
async def summarize(text: str, client: AnthropicDep) -> dict[str, str]:
message = await client.messages.create(
model="claude-opus-4-8",
max_tokens=1024,
thinking={"type": "adaptive"}, # pas de budget_tokens : déprécié/refusé sur 4.8
messages=[{"role": "user", "content": f"Résume en une phrase :\n\n{text}"}],
)
# content est une liste de blocs : on filtre les blocs texte
parts = [b.text for b in message.content if b.type == "text"]
return {"summary": "".join(parts)}Le Annotated[..., Depends(...)] est la forme moderne (FastAPI ≥ 0.95) : réutilisable, typée, pas de = Depends(...) en valeur par défaut.
La façon naïve (à NE PAS faire)
# ❌ ANTI-PATTERN n°1 : un client par requête
@app.post("/summarize-bad")
async def summarize_bad(text: str) -> dict[str, str]:
client = AsyncAnthropic() # nouveau pool TCP/TLS à CHAQUE requête
message = await client.messages.create(...)
# ... et on ne ferme même pas le client → fuite de sockets
return {...}Ce que ça casse concrètement :
- Handshake TLS à chaque appel. Pas de keep-alive : chaque requête repaie l'établissement de connexion (souvent 50–200 ms). Sous charge, tu satures les ports éphémères et tu vois apparaître des
Cannot assign requested address. - Retries et rate-limit headers perdus. Le client SDK lit
retry-afteret applique un backoff. Un client jetable ne capitalise rien. - Fuite de ressources.
AsyncAnthropicnon fermé = connexions httpx qui traînent.
# ❌ ANTI-PATTERN n°2 : les vieux on_event (dépréciés depuis FastAPI 0.93)
@app.on_event("startup")
async def startup() -> None:
app.state.client = AsyncAnthropic()
@app.on_event("shutdown")
async def shutdown() -> None:
await app.state.client.close()Pourquoi c'est mauvais aujourd'hui : on_event est déprécié, ne partage pas de scope entre startup et shutdown (variables partagées via app.state uniquement), et ne compose pas avec async with. Si le startup échoue à mi-chemin, tu ne peux pas garantir le cleanup partiel. Le lifespan avec context manager te donne le try/finally gratuitement.
# ❌ ANTI-PATTERN n°3 : startup qui peut planter sans cleanup
@asynccontextmanager
async def lifespan_bad(app: FastAPI) -> AsyncIterator[None]:
db = await open_db_pool() # OK
cache = await open_redis() # 💥 si ça plante ici, db n'est jamais fermé
app.state.db, app.state.cache = db, cache
yield
await db.close()
await cache.close()La version correcte protège chaque acquisition (voir la section production).
⚙️ En production
Modes de défaillance
Le startup qui échoue à moitié. Si tu ouvres 3 ressources et que la 2e plante, les autres fuient. Protège-toi avec AsyncExitStack, qui empile les fermetures et les exécute même en cas d'exception :
from contextlib import AsyncExitStack
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
async with AsyncExitStack() as stack:
anthropic = AsyncAnthropic(max_retries=4)
stack.push_async_callback(anthropic.close) # fermé même si la suite plante
http = await stack.enter_async_context(
httpx.AsyncClient(timeout=10.0)
)
# Si open_db_pool() lève, anthropic.close() ET http.aclose()
# sont quand même appelés par le stack.
db = await open_db_pool()
stack.push_async_callback(db.close)
app.state.deps = AppState(anthropic=anthropic, http=http, db=db)
yield
# à la sortie du `async with`, tout est fermé en LIFO, garantiLe startup lent qui bloque le health check. Tout ce qui est avant yield retarde l'acceptation des requêtes. Ne fais PAS d'appel réseau bloquant de plusieurs secondes (pré-warm d'un modèle, migration DB) en synchrone dans le startup sans timeout — ton orchestrateur (Kubernetes, ECS) va tuer le pod si la readiness probe expire. Mets un timeout explicite :
import asyncio
try:
await asyncio.wait_for(warmup_cache(anthropic), timeout=15.0)
except (asyncio.TimeoutError, Exception) as exc:
# On démarre dégradé plutôt que de bloquer le boot indéfiniment.
app.state.deps.warm = False
logging.warning("warmup échoué, démarrage dégradé: %s", exc)Le shutdown qui ne draine pas. À la réception de SIGTERM, le code après yield s'exécute. Si tu fermes le client Anthropic avant que les requêtes en vol (un appel LLM de 30 s) ne se terminent, elles cassent. Uvicorn gère le drainage des requêtes HTTP ; mais pour des tâches de fond (streaming SSE, tool-use loop), il faut un drainage explicite (voir plus bas).
Performance
- Un client = un pool.
AsyncAnthropicréutilise les connexions. Avec un seul client process-wide et HTTP keep-alive, le coût d'un appel devient dominé par le temps modèle, pas par le réseau. - Workers ≠ partage de mémoire. Si tu lances
uvicorn --workers 4, le lifespan tourne 4 fois, une par process worker. Tu as 4 clients Anthropic, 4 pools. C'est correct (les process ne partagent pas la mémoire), mais dimensionne tes limites de connexions et tes rate limits en conséquence : 4 workers ×max_connectionschacun. - Prompt caching. Pour servir un agent, garde ton prompt système stable et place le
cache_controldessus. Le client étant partagé, les hits de cache s'accumulent à travers les requêtes (le cache est côté API, mais un client stable évite de réécrire le préfixe).
Sécurité
- Pas de clé en dur.
AsyncAnthropic()litANTHROPIC_API_KEYdepuis l'environnement. Ne passe jamais la clé dans le code ni dansapp.stateexposé. app.stateest global au process. N'y mets jamais de données par-utilisateur. C'est strictement pour les singletons partagés. Les données de requête vont dans le scope de la requête (dependencies request-scoped).- Fermeture = pas de socket qui traîne. Un client non fermé peut maintenir des connexions vers l'API ; en environnement multi-tenant, ferme proprement au shutdown.
Observabilité
- Logue le startup et le shutdown avec un timestamp : c'est ton signal de redéploiement.
- Expose un
/health(liveness, juste "le process tourne") et un/ready(readiness : "les clients sont prêts, le warmup est fait"). Ne confonds pas les deux — Kubernetes les traite différemment. - Compte les requêtes LLM en vol dans
app.statepour que le shutdown sache combien drainer, et pour exposer une métrique de concurrence.
Les tradeoffs senior
| Décision | Le pour | Le contre |
|---|---|---|
| Client unique dans le lifespan | Keep-alive, retries, pooling | Tous les endpoints partagent les mêmes limites/timeouts |
AsyncExitStack vs cleanup manuel | Cleanup garanti même en cas d'erreur partielle | Un poil plus verbeux |
| Warmup bloquant dans le startup | Première requête rapide | Boot plus lent, risque de readiness timeout |
| Warmup en tâche de fond | Boot instantané | Premières requêtes froides (cache miss) |
Tie-in concret : servir un agent LLM streamé
Le lifespan brille quand on sert un agent. Voici un endpoint SSE qui streame les tokens d'Anthropic, en réutilisant le client partagé, avec gestion des exceptions typées et drainage propre.
from anthropic import APIStatusError, RateLimitError
from fastapi.responses import StreamingResponse
@app.post("/agent/stream")
async def agent_stream(prompt: str, client: AnthropicDep) -> StreamingResponse:
async def token_generator():
try:
async with client.messages.stream(
model="claude-opus-4-8",
max_tokens=4096,
thinking={"type": "adaptive"},
messages=[{"role": "user", "content": prompt}],
) as stream:
async for text in stream.text_stream:
# format Server-Sent Events
yield f"data: {text}\n\n"
final = await stream.get_final_message()
yield f"event: done\ndata: {final.stop_reason}\n\n"
except RateLimitError:
# le SDK a déjà retry max_retries fois ; ici on a épuisé
yield "event: error\ndata: rate_limited\n\n"
except APIStatusError as exc:
yield f"event: error\ndata: {exc.status_code}\n\n"
return StreamingResponse(token_generator(), media_type="text/event-stream")Pourquoi le streaming ? Au-delà de ~16K max_tokens, une requête non-streamée risque un timeout HTTP. Le streaming évite ça et donne un time-to-first-token bas. Le client AsyncAnthropic partagé via lifespan rend chaque connexion SSE quasi-gratuite côté réseau.
Pour un drainage propre des streams en vol au shutdown :
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
anthropic = AsyncAnthropic(max_retries=4)
app.state.deps = AppState(anthropic=anthropic, http=httpx.AsyncClient())
app.state.inflight = 0 # compteur de streams actifs
yield
# Laisse les streams en cours se terminer avant de fermer le client.
for _ in range(50): # max 5 s d'attente
if app.state.inflight == 0:
break
await asyncio.sleep(0.1)
await anthropic.close()
await app.state.deps.http.aclose()🏋️ Exercices
Exercice 1 — Du on_event au lifespan (implémenter)
Objectif : Tu hérites d'une app FastAPI qui crée un AsyncAnthropic dans @app.on_event("startup") et le ferme dans @app.on_event("shutdown"). Migre vers un lifespan typé avec dataclass + app.state, et expose le client via une dependency Annotated.
Indice/Solution : Crée @asynccontextmanager async def lifespan(app), déplace la création avant yield et la fermeture après. Range le client dans une @dataclass AppState sur app.state.deps. Écris def get_anthropic(request: Request) -> AsyncAnthropic et un alias AnthropicDep = Annotated[AsyncAnthropic, Depends(get_anthropic)]. Passe lifespan=lifespan à FastAPI(...). Vérifie qu'aucune route n'instancie de client.
Exercice 2 — Cleanup garanti avec AsyncExitStack (production-grade)
Objectif : Ouvre trois ressources au startup (client Anthropic, pool DB factice, client httpx). Simule un échec d'ouverture de la 2e. Prouve que les ressources déjà ouvertes sont bien fermées, sans fuite.
Indice/Solution : Utilise async with AsyncExitStack() as stack. Pour chaque ressource, soit stack.enter_async_context(...) (si c'est un context manager), soit stack.push_async_callback(resource.close) juste après création. Insère un raise RuntimeError("db down") après le 1er client. Ajoute des print/logs dans chaque close. Lance l'app et constate que anthropic.close() est appelé malgré le crash. Compare avec une version sans AsyncExitStack qui, elle, fuit.
Exercice 3 — Readiness vs liveness avec warmup à timeout (production-grade)
Objectif : Le startup doit pré-chauffer un cache de prompt en faisant un appel max_tokens=0 (cache pre-warm) vers claude-opus-4-8. Si le warmup dépasse 10 s, l'app doit démarrer en mode dégradé sans bloquer. Expose /health (toujours 200 si le process tourne) et /ready (200 seulement si warm == True).
Indice/Solution : Dans le lifespan, après création du client, enveloppe le warmup dans asyncio.wait_for(warmup(client), timeout=10.0) et stocke app.state.deps.warm. Le warmup fait un appel de pré-chauffage de cache (max_tokens=0) :
async def warmup(client: AsyncAnthropic) -> None:
await client.messages.create(
model="claude-opus-4-8",
max_tokens=0, # cache pre-warm : prefill seul, 0 token de sortie facturé
system=[
{
"type": "text",
"text": SYS,
"cache_control": {"type": "ephemeral"}, # breakpoint sur le préfixe stable
}
],
messages=[{"role": "user", "content": "warmup"}],
)/ready renvoie JSONResponse(status_code=503) si not warm. Note : max_tokens=0 n'est pas compatible avec stream=True, thinking activé, output_config.format, ni tool_choice forcé — c'est strictement le pattern de pré-chauffage de cache.
Exercice 4 — Tool-use loop avec drainage au shutdown (break-then-fix)
Objectif : Implémente un endpoint qui lance une boucle tool-use (Claude appelle un outil get_weather, tu renvoies le résultat, Claude continue). Maintiens un compteur app.state.inflight. Casse délibérément le shutdown en fermant le client avant de drainer, observe une boucle qui plante au milieu, puis répare avec le drainage à timeout.
Indice/Solution : Incrémente app.state.inflight à l'entrée de l'endpoint, décrémente dans un finally. La boucle : while response.stop_reason == "tool_use", exécute l'outil, ré-appelle messages.create avec le tool_result. Version cassée : await anthropic.close() immédiat au shutdown → une boucle en vol lève une exception sur le client fermé. Version réparée : boucle d'attente while app.state.inflight and elapsed < 5s: await asyncio.sleep(0.1) avant close().
Exercice 5 — Multi-worker et dimensionnement des pools (raisonnement + mesure)
Objectif : Lance l'app avec uvicorn --workers 4. Ajoute un log au startup affichant os.getpid(). Démontre que le lifespan tourne 4 fois. Puis raisonne : si chaque client httpx a max_connections=100, combien de connexions sortantes ton service peut-il ouvrir au total ? Ajuste pour rester sous une limite cible de 200.
Indice/Solution : import os; print("startup pid", os.getpid()) dans le lifespan — tu verras 4 PIDs distincts. Total connexions = workers × max_connections = 4 × 100 = 400. Pour rester sous 200, mets max_connections=50 par client. Documente le calcul dans un commentaire. Bonus : explique pourquoi un singleton process-wide ne suffit pas à dédupliquer entre workers (mémoire non partagée).
Exercice 6 — Lifespan testable avec dépendances mockées (test)
Objectif : Écris un test pytest qui lance l'app via httpx.ASGITransport + lifespan, en remplaçant AsyncAnthropic par un fake qui renvoie une réponse déterministe. Vérifie que /summarize fonctionne sans toucher l'API réelle, et que le client fake est bien fermé au teardown.
Indice/Solution : Utilise app.dependency_overrides[get_anthropic] = lambda: fake_client. Le fake expose messages.create en AsyncMock renvoyant un objet avec .content = [TextBlock(type="text", text="résumé")]. Pour exécuter le lifespan en test : async with LifespanManager(app): (paquet asgi-lifespan) ou le support natif d'httpx.ASGITransport. Assert que fake_client.close a été appelé après la sortie du context manager.
🎤 En entretien
Q : Pourquoi lifespan plutôt que @app.on_event("startup") ? R : on_event est déprécié ; le lifespan est un context manager qui partage un scope entre startup et shutdown via le yield, compose avec async with/AsyncExitStack, et garantit le cleanup même si le startup échoue à mi-chemin.
Q : Tu sers un agent LLM. Où instancies-tu le client Anthropic, et pourquoi ? R : Une seule fois dans le lifespan, rangé dans app.state, injecté par DI — pour réutiliser le pool de connexions httpx, le keep-alive et les retries du SDK ; un client par requête repaie le handshake TLS et perd le retry-after.
Q : Que se passe-t-il pour le lifespan avec uvicorn --workers 4 ? R : Il s'exécute une fois par worker (4 process indépendants, pas de mémoire partagée), donc 4 clients et 4 pools — il faut dimensionner max_connections et les rate limits en multipliant par le nombre de workers.
Q : Comment garantis-tu qu'un appel LLM de 30 s en cours ne casse pas pendant un redéploiement ? R : Au shutdown (après yield), on draine : on attend que le compteur de requêtes/streams en vol retombe à zéro (avec un timeout, p. ex. 5 s) avant de fermer le client ; Uvicorn draine les requêtes HTTP, mais les tâches de fond/SSE demandent un drainage explicite.