Context managers
TL;DR — Un context manager est un objet qui garantit qu'une action de setup et son action de teardown s'exécutent en paire, même si une exception explose au milieu : c'est le
try/finallyrendu réutilisable et déclaratif viawith. Tu l'utilises pour tout ce qui doit être libéré de façon déterministe — fichiers, connexions DB, locks, sessions HTTP, spans de tracing, transactions. Le protocole est minuscule (__enter__/__exit__, ou__aenter__/__aexit__en async), et@contextlib.contextmanagerte permet d'en écrire un en deux lignes autour d'unyield. La règle d'or : le cleanup vit dans lefinally, jamais après leyieldnu — sinon une exception saute ton teardown et tu fuites une connexion. En production AI, tu t'en serviras pour leasync withdu clientAsyncAnthropic(pool de connexions httpx), pour lemessages.stream()(qui ferme la connexion SSE), et pour instrumenter tes appels LLM avec des spans qui se ferment proprement même sur timeout.
🧠 Mental model
Un context manager, c'est un sas d'aéroport (airlock). Tu ne peux pas entrer dans la zone sans que la porte extérieure se ferme d'abord (__enter__ : setup), et quand tu ressors, la porte se referme automatiquement derrière toi, que tu sortes calmement ou en courant parce qu'il y a le feu (__exit__ : teardown, appelé même sur exception).
Le point clé que les ex-devs PHP/TS ratent souvent : en JavaScript/TypeScript, la libération de ressources est soit manuelle (finally), soit déléguée au GC (non déterministe). En Python, with te donne une libération déterministe et locale — tu sais exactement quand la ressource est fermée : à la sortie du bloc, point. (TC39 rattrape ça avec using/Symbol.dispose, mais Python l'a depuis 2005.)
with open("f.txt") as f: ┌──────────────────────────────┐
data = f.read() │ __enter__() → renvoie `f` │ setup
process(data) │ ┌──────────────────────────┐ │
│ │ corps du bloc │ │ ← ton code
# ici f est DÉJÀ fermé │ │ (peut lever une excep.) │ │
│ └──────────────────────────┘ │
│ __exit__(typ, val, tb) │ teardown
│ ← TOUJOURS appelé │ (même si exception)
└──────────────────────────────┘
Équivalent désucré :
mgr = open("f.txt")
f = mgr.__enter__()
try:
data = f.read(); process(data)
finally:
mgr.__exit__(*sys.exc_info()) # ou (None, None, None) si pas d'exceptionMentalement : with = try/finally packagé dans un objet réutilisable. Tout ce que tu écrirais dans un finally répété partout devient un context manager écrit une fois.
Les fondamentaux : le protocole with
Un objet est un context manager s'il implémente deux méthodes. C'est tout.
import time
from types import TracebackType
class Timer:
"""Chronomètre le bloc qu'il enveloppe."""
def __enter__(self) -> "Timer":
self.start = time.perf_counter()
return self # ← la valeur liée par `as`
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
self.elapsed = time.perf_counter() - self.start
print(f"durée : {self.elapsed:.3f}s")
return False # ← ne PAS avaler l'exception (voir plus bas)
with Timer() as t:
sum(range(10_000_000))
# durée : 0.142sDeux subtilités qui font la différence entre junior et senior :
__enter__renvoie ce qui sera lié paras. Souventself, mais pas toujours :open()renvoie le fichier, pas le manager. Tu peux renvoyer n'importe quoi (un curseur DB, un client, un tuple…).__exit__reçoit l'exception et décide de son sort. Les trois arguments sont(None, None, None)si le bloc s'est bien passé, ou décrivent l'exception sinon. Sa valeur de retour est cruciale :return False(ouNone, implicite) → l'exception se propage normalement après le cleanup. C'est ce que tu veux 99 % du temps.return True→ l'exception est avalée (supprimée). Dangereux : à réserver à des cas explicites (contextlib.suppress).
La mauvaise façon (le piège classique) : avaler les exceptions par accident
# ❌ MAUVAIS — return True masque toutes les erreurs
class DbConnection:
def __enter__(self):
self.conn = connect()
return self.conn
def __exit__(self, exc_type, exc_val, exc_tb):
self.conn.close()
return True # ← CATASTROPHE : avale silencieusement TOUTES les exceptions
with DbConnection() as conn:
conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
raise ValueError("bug métier") # ← cette exception DISPARAÎT
# le programme continue comme si de rien n'était, transaction à moitié appliquéeLe return True transforme ton context manager en trou noir à exceptions. Le bug ci-dessus est invisible : aucun traceback, aucune alerte, juste un état corrompu. Règle non négociable : __exit__ renvoie False/None sauf si tu veux explicitement supprimer une exception précise — et même alors, filtre sur exc_type :
# ✅ Suppression ciblée et explicite
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
self.conn.close()
# on ne supprime QUE l'erreur "déjà fermé", rien d'autre
return exc_type is not None and issubclass(exc_type, AlreadyClosedError)La façon idiomatique : @contextlib.contextmanager
Écrire une classe avec __enter__/__exit__ pour un manager simple est verbeux. La forme idiomatique utilise un générateur : tout ce qui est avant le yield est le setup, ce qui est après est le teardown.
import contextlib
import time
from collections.abc import Iterator
@contextlib.contextmanager
def timer(label: str) -> Iterator[None]:
start = time.perf_counter()
try:
yield # ← le corps du `with` s'exécute ICI
finally:
elapsed = time.perf_counter() - start
print(f"{label} : {elapsed:.3f}s")
with timer("calcul lourd"):
sum(range(10_000_000))
# calcul lourd : 0.142sSi ton manager produit une valeur (as x), tu la passes au yield :
import contextlib
import os
import tempfile
from collections.abc import Iterator
@contextlib.contextmanager
def temp_file(content: str) -> Iterator[str]:
fd, path = tempfile.mkstemp()
try:
with open(fd, "w") as f:
f.write(content)
yield path # ← `as path` reçoit cette valeur
finally:
os.unlink(path) # cleanup garanti
with temp_file("hello") as path:
with open(path) as f: # on ferme aussi ce handle proprement
print(f.read()) # hello
# le fichier temporaire est supprimé iciLe piège mortel de @contextmanager : le try/finally n'est pas optionnel
# ❌ MAUVAIS — pas de try/finally
@contextlib.contextmanager
def db_session():
session = Session()
yield session
session.close() # ← jamais appelé si le bloc lève une exception !
# ✅ BON — le cleanup est dans le finally
@contextlib.contextmanager
def db_session() -> Iterator[Session]:
session = Session()
try:
yield session
except Exception:
session.rollback()
raise # ré-émet : on ne masque pas l'erreur
finally:
session.close() # TOUJOURS exécutéMécaniquement, quand une exception explose dans le bloc with, elle est réinjectée au point du yield (via gen.throw()). Si ton yield n'est pas protégé par try/finally, le code après le yield est sauté et tu fuites la ressource. C'est la cause n°1 de fuites de connexions DB en production Python.
Composition : with multiple et ExitStack
Plusieurs managers sur une ligne, fermés en ordre inverse (LIFO, comme une pile) :
with open("in.txt") as src, open("out.txt", "w") as dst:
dst.write(src.read())
# dst fermé d'abord, puis srcQuand le nombre de ressources est dynamique (connu seulement à l'exécution), with statique ne suffit pas. contextlib.ExitStack gère une pile variable de cleanups :
import contextlib
def merge_files(paths: list[str], out: str) -> None:
with contextlib.ExitStack() as stack:
files = [stack.enter_context(open(p)) for p in paths]
with open(out, "w") as dst:
for f in files:
dst.write(f.read())
# tous les fichiers de `files` sont fermés ici, en ordre LIFO,
# même si l'un des open() avait échoué en cours de routeExitStack est l'outil senior par excellence : il sert aussi à enregistrer des callbacks de cleanup conditionnels (stack.callback(fn)) et à transférer la responsabilité de fermeture hors du bloc (stack.pop_all() pour le pattern « commit/rollback »).
⚙️ En production
Async : async with et le client Anthropic
Tout le protocole a un jumeau asynchrone : __aenter__/__aexit__, décorateur @contextlib.asynccontextmanager, et mot-clé async with. Indispensable dès que tu touches à de l'I/O réseau — donc à tout appel LLM.
Le client AsyncAnthropic est lui-même un async context manager : il détient un pool de connexions httpx qu'il faut fermer pour ne pas fuiter des sockets.
import asyncio
from anthropic import AsyncAnthropic
async def ask(question: str) -> str:
# `async with` ferme le pool de connexions httpx à la sortie, même sur exception
async with AsyncAnthropic() as client:
message = await client.messages.create(
model="claude-opus-4-8",
max_tokens=1024,
thinking={"type": "adaptive"},
messages=[{"role": "user", "content": question}],
)
return "".join(b.text for b in message.content if b.type == "text")
print(asyncio.run(ask("Explique les context managers en une phrase.")))Pool partagé, pas par requête
Dans un service FastAPI, tu ne crées pas un AsyncAnthropic() par requête — ouvrir/fermer le pool httpx à chaque appel tue la performance et la réutilisation des connexions. Crée un client au démarrage (via le lifespan), réutilise-le, et ferme-le à l'arrêt. Voir plus bas.
Streaming : le manager qui ferme la connexion SSE
client.messages.stream() renvoie un context manager (pas une coroutine) : il faut le consommer dans un async with, sinon la connexion Server-Sent Events reste ouverte. C'est exactement le cas d'usage où oublier le with fuite une ressource réseau silencieusement.
from anthropic import AsyncAnthropic
async def stream_tokens(client: AsyncAnthropic, prompt: str) -> None:
# `async with` garantit la fermeture de la connexion SSE même si on `break` ou si ça lève
async with client.messages.stream(
model="claude-opus-4-8",
max_tokens=2048,
thinking={"type": "adaptive"},
messages=[{"role": "user", "content": prompt}],
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)
final = await stream.get_final_message()
print(f"\n[{final.usage.output_tokens} tokens de sortie]")FastAPI : lifespan et Depends, deux saveurs de context manager
FastAPI utilise des context managers à deux endroits que tu dois maîtriser.
1. lifespan — un @asynccontextmanager qui encadre toute la durée de vie de l'app. C'est là que tu ouvres le client Anthropic une fois et le ranges dans app.state :
import contextlib
from collections.abc import AsyncIterator
from anthropic import AsyncAnthropic
from fastapi import Depends, FastAPI, Request
from pydantic import BaseModel
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
# setup : avant le premier request
app.state.anthropic = AsyncAnthropic()
yield
# teardown : à l'arrêt du serveur (SIGTERM, Ctrl-C)
await app.state.anthropic.close()
app = FastAPI(lifespan=lifespan)
def get_client(request: Request) -> AsyncAnthropic:
return request.app.state.anthropic
class AskBody(BaseModel):
question: str
class AskResponse(BaseModel):
answer: str
@app.post("/ask")
async def ask(body: AskBody, client: AsyncAnthropic = Depends(get_client)) -> AskResponse:
message = await client.messages.create(
model="claude-opus-4-8",
max_tokens=1024,
thinking={"type": "adaptive"},
messages=[{"role": "user", "content": body.question}],
)
answer = "".join(b.text for b in message.content if b.type == "text")
return AskResponse(answer=answer)2. Depends avec yield — une dépendance FastAPI peut elle-même être un générateur : le code avant yield s'exécute avant le handler, le code après après la réponse. C'est un context manager déguisé, parfait pour une session DB par requête :
from collections.abc import AsyncIterator
async def get_session() -> AsyncIterator[Session]:
async with async_session_factory() as session: # ← async with imbriqué
yield session # injecté dans le handler
# cleanup (close/rollback) garanti après l'envoi de la réponseFailure modes — ce qui casse en vrai
| Symptôme | Cause | Correctif |
|---|---|---|
| Fuite de connexions DB sous charge, le pool s'épuise | yield sans try/finally dans un @contextmanager | Mettre le close() dans finally |
| Exceptions qui disparaissent, état corrompu | __exit__ renvoie True | Renvoyer False/None, supprimer de façon ciblée seulement |
Sockets CLOSE_WAIT qui s'accumulent | messages.stream() consommé sans async with | Toujours envelopper dans async with |
| Latence LLM élevée, pas de réutilisation de connexion | AsyncAnthropic() créé par requête | Un client au lifespan, partagé |
RuntimeError: generator didn't yield | @contextmanager qui lève avant le yield | Faire le setup risqué avant le try, avec gestion explicite |
__exit__ lève une exception qui masque l'originale | cleanup buggé | finally doit être infaillible ; logge mais ne lève pas |
Observabilité : un manager pour instrumenter chaque appel LLM
Le pattern senior : encapsuler la latence + le coût + le traçage d'un appel Anthropic dans un seul context manager réutilisable. Le finally garantit que tu enregistres la métrique même sur timeout ou RateLimitError.
import contextlib
import time
from collections.abc import AsyncIterator
from dataclasses import dataclass
import anthropic
from anthropic import AsyncAnthropic
# tarifs claude-opus-4-8 : 5 $ / Mtok entrée, 25 $ / Mtok sortie
PRICE_IN, PRICE_OUT = 5.0 / 1e6, 25.0 / 1e6
@dataclass
class CallMetrics:
input_tokens: int = 0
output_tokens: int = 0
elapsed: float = 0.0
error: str | None = None
@property
def cost_usd(self) -> float:
return self.input_tokens * PRICE_IN + self.output_tokens * PRICE_OUT
@contextlib.asynccontextmanager
async def traced_call(label: str) -> AsyncIterator[CallMetrics]:
metrics = CallMetrics()
start = time.perf_counter()
try:
yield metrics # l'appelant remplit metrics.*_tokens
except anthropic.APIError as exc:
metrics.error = type(exc).__name__ # on capture l'erreur typée
raise # mais on la ré-émet
finally:
metrics.elapsed = time.perf_counter() - start
# émis quoi qu'il arrive : succès, timeout, rate-limit
print(
f"[{label}] {metrics.elapsed:.2f}s "
f"{metrics.input_tokens}+{metrics.output_tokens} tok "
f"${metrics.cost_usd:.4f} err={metrics.error}"
)
async def summarize(client: AsyncAnthropic, text: str) -> str:
async with traced_call("summarize") as m:
msg = await client.messages.create(
model="claude-opus-4-8",
max_tokens=512,
thinking={"type": "adaptive"},
messages=[{"role": "user", "content": f"Résume : {text}"}],
)
m.input_tokens = msg.usage.input_tokens
m.output_tokens = msg.usage.output_tokens
return "".join(b.text for b in msg.content if b.type == "text")Les tradeoffs senior
- Classe vs
@contextmanager: générateur pour le cas simple (un setup, un teardown, pas d'état réutilisable) ; classe quand l'objet a une vie propre, plusieurs méthodes, ou doit être réentrant. Un@contextmanagern'est pas réentrant ni réutilisable — un générateur est à usage unique. Pour un manager réutilisable (le même objet dans plusieurswith), il faut une classe. __exit__doit être infaillible : s'il lève, son exception masque celle du bloc (sauf chaînage). Garde-le trivial — unclose(), pas de logique métier.- Ne mets pas de logique de retry dans
__exit__: c'est le rôle d'un décorateur (cf. leçon 01), pas du teardown. Sépare les responsabilités. Côté SDK,AsyncAnthropicretry déjà429/5xxen interne (max_retries) ; un context manager y ajoute le timeout wall-clock et la mesure, pas le retry. contextlib.suppress(FileNotFoundError)remplace avantageusement untry/except/pass— plus lisible, intention explicite.
🏋️ Exercices
Exercice 1 — Le manager try/finally (échauffement)
Objectif : écris un context manager cd(path) qui change le répertoire courant à l'entrée et le restaure à la sortie, même si le bloc lève une exception. Deux versions : une classe et un @contextmanager.
Indice / Solution
Le piège : capturer le cwd d'origine avant de changer, et restaurer dans finally.
import contextlib
import os
from collections.abc import Iterator
@contextlib.contextmanager
def cd(path: str) -> Iterator[None]:
origin = os.getcwd() # capturer AVANT
os.chdir(path)
try:
yield
finally:
os.chdir(origin) # restaurer TOUJOURSTeste-le en levant une exception dans le bloc et vérifie que os.getcwd() est revenu à l'origine.
Exercice 2 — Suppression ciblée (intermédiaire)
Objectif : écris swallow(*exc_types), un context manager qui supprime uniquement les exceptions des types passés, et laisse passer les autres. Reproduis le comportement de contextlib.suppress à la main.
Indice / Solution
Tout est dans la valeur de retour de __exit__ : True pour avaler, False pour propager. Filtre sur issubclass.
class swallow:
def __init__(self, *exc_types: type[BaseException]) -> None:
self.exc_types = exc_types
def __enter__(self) -> "swallow":
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
return exc_type is not None and issubclass(exc_type, self.exc_types)
with swallow(KeyError):
{}["absent"] # avalée
with swallow(KeyError):
raise ValueError("x") # propagée → le test doit voir le ValueErrorExercice 3 — ExitStack dynamique (intermédiaire/dur)
Objectif : écris open_all(paths) qui ouvre une liste de fichiers de taille inconnue à la compilation, renvoie la liste des objets fichier, et garantit que tous sont fermés à la sortie — y compris si l'ouverture du 3ᵉ fichier échoue (les 2 premiers doivent quand même être fermés).
Indice / Solution
with statique ne marche pas (nombre variable). ExitStack.enter_context enregistre chaque ressource sur la pile ; si un open() lève, la pile déjà constituée est dépilée proprement.
import contextlib
from collections.abc import Iterator
from typing import IO
@contextlib.contextmanager
def open_all(paths: list[str]) -> Iterator[list[IO[str]]]:
with contextlib.ExitStack() as stack:
files = [stack.enter_context(open(p)) for p in paths]
yield files
# sortie : tous fermés en LIFO, même sur échec partiel d'ouvertureTest du cas dur : passe un chemin inexistant en 3ᵉ position, et vérifie (via file.closed) que les 2 premiers sont bien fermés.
Exercice 4 — Manager async pour appel LLM avec timeout (dur, AI)
Objectif : écris un @asynccontextmanager llm_call(*, timeout) qui (a) impose un timeout via asyncio.timeout, (b) capture et classifie les exceptions typées du SDK (RateLimitError, APITimeoutError), (c) enregistre la latence dans tous les cas. L'appelant fait async with llm_call(timeout=20) as ctx: ctx.message = await client.messages.create(...).
Indice / Solution
asyncio.timeout(timeout) est lui-même un async context manager — tu en imbriques un. Le finally capture la latence ; le except classifie sans avaler (raise).
import asyncio
import contextlib
import time
from collections.abc import AsyncIterator
from dataclasses import dataclass
import anthropic
@dataclass
class LlmCtx:
message: anthropic.types.Message | None = None
elapsed: float = 0.0
error: str | None = None
@contextlib.asynccontextmanager
async def llm_call(*, timeout: float) -> AsyncIterator[LlmCtx]:
ctx = LlmCtx()
start = time.perf_counter()
try:
async with asyncio.timeout(timeout): # CM imbriqué
yield ctx
except (anthropic.RateLimitError, anthropic.APITimeoutError, TimeoutError) as exc:
ctx.error = type(exc).__name__
raise
finally:
ctx.elapsed = time.perf_counter() - start
print(f"llm_call {ctx.elapsed:.2f}s err={ctx.error}")Note : le SDK Anthropic retry déjà 429/5xx en interne (max_retries) ; ce manager ajoute un timeout wall-clock et la mesure, deux choses que le retry SDK ne couvre pas.
Exercice 5 — Casse-puis-répare : la fuite de connexion (dur)
Objectif : on te donne ce code de pool de connexions qui fuit sous charge. Reproduis la fuite, identifie la cause, corrige-la, puis prouve la correction.
# Code fourni — bugué
@contextlib.contextmanager
def borrow(pool):
conn = pool.acquire()
yield conn
pool.release(conn) # ← pourquoi ça fuite ?Indice / Solution
La fuite : si le code dans le with lève, release() n'est jamais atteint → la connexion n'est jamais rendue au pool → sous charge, le pool s'épuise et tout se bloque.
Repro : with borrow(pool) as c: raise RuntimeError, puis observe pool.available qui ne remonte pas.
Correctif :
import contextlib
from collections.abc import Iterator
@contextlib.contextmanager
def borrow(pool) -> Iterator[Connection]:
conn = pool.acquire()
try:
yield conn
finally:
pool.release(conn) # rendu garantiPreuve : un test qui lève dans le bloc et asserte que pool.available est revenu à sa valeur initiale. C'est exactement le bug n°1 des services Python en prod — la version statique with open(...) cache le piège parce que open() est correctement écrit, mais ton propre manager ne l'est pas par défaut.
Exercice 6 — Manager réentrant et réutilisable (expert)
Objectif : @contextmanager produit un objet à usage unique. Écris une classe Lock réutilisable (le même objet utilisable dans plusieurs with successifs) et réentrante (un with imbriqué sur le même objet ne deadlock pas). Compare avec une tentative en @contextmanager et explique pourquoi elle échoue.
Indice / Solution
Un générateur est épuisé après un with : le réutiliser lève RuntimeError: generator didn't yield (il a déjà rendu). D'où la classe, adossée à un RLock (réentrant).
import threading
class Lock:
def __init__(self) -> None:
self._lock = threading.RLock()
def __enter__(self) -> "Lock":
self._lock.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
self._lock.release()
return False
lk = Lock()
with lk: # usage 1
with lk: # réentrant : RLock supporte l'acquisition imbriquée, pas de deadlock
...
with lk: # réutilisé : OK car c'est une classe, pas un générateur épuisé
...La leçon senior : choisis classe dès que tu as besoin de réutilisation, de réentrance, ou d'un état interrogeable. Le générateur est pour le cas jetable simple.
🎤 En entretien
Q : Quelle est la différence entre with et un simple try/finally ? Pourquoi un mot-clé dédié ? R : with est un try/finally réutilisable et nommé — tu encapsules la paire setup/teardown une fois dans un objet (ou un générateur) au lieu de la dupliquer chez chaque appelant ; bonus, la libération est déterministe et locale au bloc.
Q : Que se passe-t-il si une exception est levée dans un bloc with ? Et que contrôle la valeur de retour de __exit__ ? R : __exit__ est appelé avec le type/valeur/traceback de l'exception (cleanup garanti) ; s'il renvoie falsy l'exception se propage normalement, s'il renvoie True elle est supprimée — ce qu'on ne veut presque jamais, car ça masque les erreurs silencieusement.
Q : Pourquoi @contextlib.contextmanager exige-t-il un try/finally autour du yield ? R : Parce qu'une exception du bloc with est réinjectée au point du yield via gen.throw() ; sans finally, le teardown après le yield est sauté et la ressource fuite — c'est la cause n°1 des fuites de connexions DB en Python.
Q : Comment gérer un nombre dynamique de ressources, ou un client LLM dans FastAPI ? R : contextlib.ExitStack (ou AsyncExitStack) pour un nombre variable de ressources fermées en LIFO ; pour FastAPI, le client AsyncAnthropic s'ouvre une fois dans le lifespan (@asynccontextmanager) et se ferme à l'arrêt — jamais un client par requête, sinon on tue la réutilisation du pool httpx.