Skip to content

Context managers

TL;DR — Un context manager est un objet qui garantit qu'une action de setup et son action de teardown s'exécutent en paire, même si une exception explose au milieu : c'est le try/finally rendu réutilisable et déclaratif via with. Tu l'utilises pour tout ce qui doit être libéré de façon déterministe — fichiers, connexions DB, locks, sessions HTTP, spans de tracing, transactions. Le protocole est minuscule (__enter__ / __exit__, ou __aenter__ / __aexit__ en async), et @contextlib.contextmanager te permet d'en écrire un en deux lignes autour d'un yield. La règle d'or : le cleanup vit dans le finally, jamais après le yield nu — sinon une exception saute ton teardown et tu fuites une connexion. En production AI, tu t'en serviras pour le async with du client AsyncAnthropic (pool de connexions httpx), pour le messages.stream() (qui ferme la connexion SSE), et pour instrumenter tes appels LLM avec des spans qui se ferment proprement même sur timeout.

🧠 Mental model

Un context manager, c'est un sas d'aéroport (airlock). Tu ne peux pas entrer dans la zone sans que la porte extérieure se ferme d'abord (__enter__ : setup), et quand tu ressors, la porte se referme automatiquement derrière toi, que tu sortes calmement ou en courant parce qu'il y a le feu (__exit__ : teardown, appelé même sur exception).

Le point clé que les ex-devs PHP/TS ratent souvent : en JavaScript/TypeScript, la libération de ressources est soit manuelle (finally), soit déléguée au GC (non déterministe). En Python, with te donne une libération déterministe et locale — tu sais exactement quand la ressource est fermée : à la sortie du bloc, point. (TC39 rattrape ça avec using/Symbol.dispose, mais Python l'a depuis 2005.)

   with open("f.txt") as f:        ┌──────────────────────────────┐
       data = f.read()             │  __enter__()  → renvoie `f`    │  setup
       process(data)               │  ┌──────────────────────────┐ │
                                   │  │  corps du bloc            │ │  ← ton code
   # ici f est DÉJÀ fermé           │  │  (peut lever une excep.)  │ │
                                   │  └──────────────────────────┘ │
                                   │  __exit__(typ, val, tb)        │  teardown
                                   │     ← TOUJOURS appelé          │  (même si exception)
                                   └──────────────────────────────┘

   Équivalent désucré :
       mgr = open("f.txt")
       f = mgr.__enter__()
       try:
           data = f.read(); process(data)
       finally:
           mgr.__exit__(*sys.exc_info())   # ou (None, None, None) si pas d'exception

Mentalement : with = try/finally packagé dans un objet réutilisable. Tout ce que tu écrirais dans un finally répété partout devient un context manager écrit une fois.

Les fondamentaux : le protocole with

Un objet est un context manager s'il implémente deux méthodes. C'est tout.

python
import time
from types import TracebackType


class Timer:
    """Chronomètre le bloc qu'il enveloppe."""

    def __enter__(self) -> "Timer":
        self.start = time.perf_counter()
        return self  # ← la valeur liée par `as`

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        self.elapsed = time.perf_counter() - self.start
        print(f"durée : {self.elapsed:.3f}s")
        return False  # ← ne PAS avaler l'exception (voir plus bas)


with Timer() as t:
    sum(range(10_000_000))
# durée : 0.142s

Deux subtilités qui font la différence entre junior et senior :

  1. __enter__ renvoie ce qui sera lié par as. Souvent self, mais pas toujours : open() renvoie le fichier, pas le manager. Tu peux renvoyer n'importe quoi (un curseur DB, un client, un tuple…).

  2. __exit__ reçoit l'exception et décide de son sort. Les trois arguments sont (None, None, None) si le bloc s'est bien passé, ou décrivent l'exception sinon. Sa valeur de retour est cruciale :

    • return False (ou None, implicite) → l'exception se propage normalement après le cleanup. C'est ce que tu veux 99 % du temps.
    • return True → l'exception est avalée (supprimée). Dangereux : à réserver à des cas explicites (contextlib.suppress).

La mauvaise façon (le piège classique) : avaler les exceptions par accident

python
# ❌ MAUVAIS — return True masque toutes les erreurs
class DbConnection:
    def __enter__(self):
        self.conn = connect()
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.conn.close()
        return True  # ← CATASTROPHE : avale silencieusement TOUTES les exceptions


with DbConnection() as conn:
    conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    raise ValueError("bug métier")   # ← cette exception DISPARAÎT
# le programme continue comme si de rien n'était, transaction à moitié appliquée

Le return True transforme ton context manager en trou noir à exceptions. Le bug ci-dessus est invisible : aucun traceback, aucune alerte, juste un état corrompu. Règle non négociable : __exit__ renvoie False/None sauf si tu veux explicitement supprimer une exception précise — et même alors, filtre sur exc_type :

python
# ✅ Suppression ciblée et explicite
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
    self.conn.close()
    # on ne supprime QUE l'erreur "déjà fermé", rien d'autre
    return exc_type is not None and issubclass(exc_type, AlreadyClosedError)

La façon idiomatique : @contextlib.contextmanager

Écrire une classe avec __enter__/__exit__ pour un manager simple est verbeux. La forme idiomatique utilise un générateur : tout ce qui est avant le yield est le setup, ce qui est après est le teardown.

python
import contextlib
import time
from collections.abc import Iterator


@contextlib.contextmanager
def timer(label: str) -> Iterator[None]:
    start = time.perf_counter()
    try:
        yield                       # ← le corps du `with` s'exécute ICI
    finally:
        elapsed = time.perf_counter() - start
        print(f"{label} : {elapsed:.3f}s")


with timer("calcul lourd"):
    sum(range(10_000_000))
# calcul lourd : 0.142s

Si ton manager produit une valeur (as x), tu la passes au yield :

python
import contextlib
import os
import tempfile
from collections.abc import Iterator


@contextlib.contextmanager
def temp_file(content: str) -> Iterator[str]:
    fd, path = tempfile.mkstemp()
    try:
        with open(fd, "w") as f:
            f.write(content)
        yield path                  # ← `as path` reçoit cette valeur
    finally:
        os.unlink(path)             # cleanup garanti


with temp_file("hello") as path:
    with open(path) as f:           # on ferme aussi ce handle proprement
        print(f.read())             # hello
# le fichier temporaire est supprimé ici

Le piège mortel de @contextmanager : le try/finally n'est pas optionnel

python
# ❌ MAUVAIS — pas de try/finally
@contextlib.contextmanager
def db_session():
    session = Session()
    yield session
    session.close()                 # ← jamais appelé si le bloc lève une exception !


# ✅ BON — le cleanup est dans le finally
@contextlib.contextmanager
def db_session() -> Iterator[Session]:
    session = Session()
    try:
        yield session
    except Exception:
        session.rollback()
        raise                       # ré-émet : on ne masque pas l'erreur
    finally:
        session.close()             # TOUJOURS exécuté

Mécaniquement, quand une exception explose dans le bloc with, elle est réinjectée au point du yield (via gen.throw()). Si ton yield n'est pas protégé par try/finally, le code après le yield est sauté et tu fuites la ressource. C'est la cause n°1 de fuites de connexions DB en production Python.

Composition : with multiple et ExitStack

Plusieurs managers sur une ligne, fermés en ordre inverse (LIFO, comme une pile) :

python
with open("in.txt") as src, open("out.txt", "w") as dst:
    dst.write(src.read())
# dst fermé d'abord, puis src

Quand le nombre de ressources est dynamique (connu seulement à l'exécution), with statique ne suffit pas. contextlib.ExitStack gère une pile variable de cleanups :

python
import contextlib


def merge_files(paths: list[str], out: str) -> None:
    with contextlib.ExitStack() as stack:
        files = [stack.enter_context(open(p)) for p in paths]
        with open(out, "w") as dst:
            for f in files:
                dst.write(f.read())
        # tous les fichiers de `files` sont fermés ici, en ordre LIFO,
        # même si l'un des open() avait échoué en cours de route

ExitStack est l'outil senior par excellence : il sert aussi à enregistrer des callbacks de cleanup conditionnels (stack.callback(fn)) et à transférer la responsabilité de fermeture hors du bloc (stack.pop_all() pour le pattern « commit/rollback »).

⚙️ En production

Async : async with et le client Anthropic

Tout le protocole a un jumeau asynchrone : __aenter__/__aexit__, décorateur @contextlib.asynccontextmanager, et mot-clé async with. Indispensable dès que tu touches à de l'I/O réseau — donc à tout appel LLM.

Le client AsyncAnthropic est lui-même un async context manager : il détient un pool de connexions httpx qu'il faut fermer pour ne pas fuiter des sockets.

python
import asyncio
from anthropic import AsyncAnthropic


async def ask(question: str) -> str:
    # `async with` ferme le pool de connexions httpx à la sortie, même sur exception
    async with AsyncAnthropic() as client:
        message = await client.messages.create(
            model="claude-opus-4-8",
            max_tokens=1024,
            thinking={"type": "adaptive"},
            messages=[{"role": "user", "content": question}],
        )
        return "".join(b.text for b in message.content if b.type == "text")


print(asyncio.run(ask("Explique les context managers en une phrase.")))

Pool partagé, pas par requête

Dans un service FastAPI, tu ne crées pas un AsyncAnthropic() par requête — ouvrir/fermer le pool httpx à chaque appel tue la performance et la réutilisation des connexions. Crée un client au démarrage (via le lifespan), réutilise-le, et ferme-le à l'arrêt. Voir plus bas.

Streaming : le manager qui ferme la connexion SSE

client.messages.stream() renvoie un context manager (pas une coroutine) : il faut le consommer dans un async with, sinon la connexion Server-Sent Events reste ouverte. C'est exactement le cas d'usage où oublier le with fuite une ressource réseau silencieusement.

python
from anthropic import AsyncAnthropic


async def stream_tokens(client: AsyncAnthropic, prompt: str) -> None:
    # `async with` garantit la fermeture de la connexion SSE même si on `break` ou si ça lève
    async with client.messages.stream(
        model="claude-opus-4-8",
        max_tokens=2048,
        thinking={"type": "adaptive"},
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
        final = await stream.get_final_message()
    print(f"\n[{final.usage.output_tokens} tokens de sortie]")

FastAPI : lifespan et Depends, deux saveurs de context manager

FastAPI utilise des context managers à deux endroits que tu dois maîtriser.

1. lifespan — un @asynccontextmanager qui encadre toute la durée de vie de l'app. C'est là que tu ouvres le client Anthropic une fois et le ranges dans app.state :

python
import contextlib
from collections.abc import AsyncIterator

from anthropic import AsyncAnthropic
from fastapi import Depends, FastAPI, Request
from pydantic import BaseModel


@contextlib.asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    # setup : avant le premier request
    app.state.anthropic = AsyncAnthropic()
    yield
    # teardown : à l'arrêt du serveur (SIGTERM, Ctrl-C)
    await app.state.anthropic.close()


app = FastAPI(lifespan=lifespan)


def get_client(request: Request) -> AsyncAnthropic:
    return request.app.state.anthropic


class AskBody(BaseModel):
    question: str


class AskResponse(BaseModel):
    answer: str


@app.post("/ask")
async def ask(body: AskBody, client: AsyncAnthropic = Depends(get_client)) -> AskResponse:
    message = await client.messages.create(
        model="claude-opus-4-8",
        max_tokens=1024,
        thinking={"type": "adaptive"},
        messages=[{"role": "user", "content": body.question}],
    )
    answer = "".join(b.text for b in message.content if b.type == "text")
    return AskResponse(answer=answer)

2. Depends avec yield — une dépendance FastAPI peut elle-même être un générateur : le code avant yield s'exécute avant le handler, le code après après la réponse. C'est un context manager déguisé, parfait pour une session DB par requête :

python
from collections.abc import AsyncIterator


async def get_session() -> AsyncIterator[Session]:
    async with async_session_factory() as session:   # ← async with imbriqué
        yield session                                 # injecté dans le handler
        # cleanup (close/rollback) garanti après l'envoi de la réponse

Failure modes — ce qui casse en vrai

SymptômeCauseCorrectif
Fuite de connexions DB sous charge, le pool s'épuiseyield sans try/finally dans un @contextmanagerMettre le close() dans finally
Exceptions qui disparaissent, état corrompu__exit__ renvoie TrueRenvoyer False/None, supprimer de façon ciblée seulement
Sockets CLOSE_WAIT qui s'accumulentmessages.stream() consommé sans async withToujours envelopper dans async with
Latence LLM élevée, pas de réutilisation de connexionAsyncAnthropic() créé par requêteUn client au lifespan, partagé
RuntimeError: generator didn't yield@contextmanager qui lève avant le yieldFaire le setup risqué avant le try, avec gestion explicite
__exit__ lève une exception qui masque l'originalecleanup buggéfinally doit être infaillible ; logge mais ne lève pas

Observabilité : un manager pour instrumenter chaque appel LLM

Le pattern senior : encapsuler la latence + le coût + le traçage d'un appel Anthropic dans un seul context manager réutilisable. Le finally garantit que tu enregistres la métrique même sur timeout ou RateLimitError.

python
import contextlib
import time
from collections.abc import AsyncIterator
from dataclasses import dataclass

import anthropic
from anthropic import AsyncAnthropic

# tarifs claude-opus-4-8 : 5 $ / Mtok entrée, 25 $ / Mtok sortie
PRICE_IN, PRICE_OUT = 5.0 / 1e6, 25.0 / 1e6


@dataclass
class CallMetrics:
    input_tokens: int = 0
    output_tokens: int = 0
    elapsed: float = 0.0
    error: str | None = None

    @property
    def cost_usd(self) -> float:
        return self.input_tokens * PRICE_IN + self.output_tokens * PRICE_OUT


@contextlib.asynccontextmanager
async def traced_call(label: str) -> AsyncIterator[CallMetrics]:
    metrics = CallMetrics()
    start = time.perf_counter()
    try:
        yield metrics                       # l'appelant remplit metrics.*_tokens
    except anthropic.APIError as exc:
        metrics.error = type(exc).__name__  # on capture l'erreur typée
        raise                               # mais on la ré-émet
    finally:
        metrics.elapsed = time.perf_counter() - start
        # émis quoi qu'il arrive : succès, timeout, rate-limit
        print(
            f"[{label}] {metrics.elapsed:.2f}s "
            f"{metrics.input_tokens}+{metrics.output_tokens} tok "
            f"${metrics.cost_usd:.4f} err={metrics.error}"
        )


async def summarize(client: AsyncAnthropic, text: str) -> str:
    async with traced_call("summarize") as m:
        msg = await client.messages.create(
            model="claude-opus-4-8",
            max_tokens=512,
            thinking={"type": "adaptive"},
            messages=[{"role": "user", "content": f"Résume : {text}"}],
        )
        m.input_tokens = msg.usage.input_tokens
        m.output_tokens = msg.usage.output_tokens
        return "".join(b.text for b in msg.content if b.type == "text")

Les tradeoffs senior

  • Classe vs @contextmanager : générateur pour le cas simple (un setup, un teardown, pas d'état réutilisable) ; classe quand l'objet a une vie propre, plusieurs méthodes, ou doit être réentrant. Un @contextmanager n'est pas réentrant ni réutilisable — un générateur est à usage unique. Pour un manager réutilisable (le même objet dans plusieurs with), il faut une classe.
  • __exit__ doit être infaillible : s'il lève, son exception masque celle du bloc (sauf chaînage). Garde-le trivial — un close(), pas de logique métier.
  • Ne mets pas de logique de retry dans __exit__ : c'est le rôle d'un décorateur (cf. leçon 01), pas du teardown. Sépare les responsabilités. Côté SDK, AsyncAnthropic retry déjà 429/5xx en interne (max_retries) ; un context manager y ajoute le timeout wall-clock et la mesure, pas le retry.
  • contextlib.suppress(FileNotFoundError) remplace avantageusement un try/except/pass — plus lisible, intention explicite.

🏋️ Exercices

Exercice 1 — Le manager try/finally (échauffement)

Objectif : écris un context manager cd(path) qui change le répertoire courant à l'entrée et le restaure à la sortie, même si le bloc lève une exception. Deux versions : une classe et un @contextmanager.

Indice / Solution

Le piège : capturer le cwd d'origine avant de changer, et restaurer dans finally.

python
import contextlib
import os
from collections.abc import Iterator


@contextlib.contextmanager
def cd(path: str) -> Iterator[None]:
    origin = os.getcwd()        # capturer AVANT
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(origin)        # restaurer TOUJOURS

Teste-le en levant une exception dans le bloc et vérifie que os.getcwd() est revenu à l'origine.

Exercice 2 — Suppression ciblée (intermédiaire)

Objectif : écris swallow(*exc_types), un context manager qui supprime uniquement les exceptions des types passés, et laisse passer les autres. Reproduis le comportement de contextlib.suppress à la main.

Indice / Solution

Tout est dans la valeur de retour de __exit__ : True pour avaler, False pour propager. Filtre sur issubclass.

python
class swallow:
    def __init__(self, *exc_types: type[BaseException]) -> None:
        self.exc_types = exc_types

    def __enter__(self) -> "swallow":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        return exc_type is not None and issubclass(exc_type, self.exc_types)


with swallow(KeyError):
    {}["absent"]            # avalée
with swallow(KeyError):
    raise ValueError("x")   # propagée → le test doit voir le ValueError

Exercice 3 — ExitStack dynamique (intermédiaire/dur)

Objectif : écris open_all(paths) qui ouvre une liste de fichiers de taille inconnue à la compilation, renvoie la liste des objets fichier, et garantit que tous sont fermés à la sortie — y compris si l'ouverture du 3ᵉ fichier échoue (les 2 premiers doivent quand même être fermés).

Indice / Solution

with statique ne marche pas (nombre variable). ExitStack.enter_context enregistre chaque ressource sur la pile ; si un open() lève, la pile déjà constituée est dépilée proprement.

python
import contextlib
from collections.abc import Iterator
from typing import IO


@contextlib.contextmanager
def open_all(paths: list[str]) -> Iterator[list[IO[str]]]:
    with contextlib.ExitStack() as stack:
        files = [stack.enter_context(open(p)) for p in paths]
        yield files
        # sortie : tous fermés en LIFO, même sur échec partiel d'ouverture

Test du cas dur : passe un chemin inexistant en 3ᵉ position, et vérifie (via file.closed) que les 2 premiers sont bien fermés.

Exercice 4 — Manager async pour appel LLM avec timeout (dur, AI)

Objectif : écris un @asynccontextmanager llm_call(*, timeout) qui (a) impose un timeout via asyncio.timeout, (b) capture et classifie les exceptions typées du SDK (RateLimitError, APITimeoutError), (c) enregistre la latence dans tous les cas. L'appelant fait async with llm_call(timeout=20) as ctx: ctx.message = await client.messages.create(...).

Indice / Solution

asyncio.timeout(timeout) est lui-même un async context manager — tu en imbriques un. Le finally capture la latence ; le except classifie sans avaler (raise).

python
import asyncio
import contextlib
import time
from collections.abc import AsyncIterator
from dataclasses import dataclass

import anthropic


@dataclass
class LlmCtx:
    message: anthropic.types.Message | None = None
    elapsed: float = 0.0
    error: str | None = None


@contextlib.asynccontextmanager
async def llm_call(*, timeout: float) -> AsyncIterator[LlmCtx]:
    ctx = LlmCtx()
    start = time.perf_counter()
    try:
        async with asyncio.timeout(timeout):   # CM imbriqué
            yield ctx
    except (anthropic.RateLimitError, anthropic.APITimeoutError, TimeoutError) as exc:
        ctx.error = type(exc).__name__
        raise
    finally:
        ctx.elapsed = time.perf_counter() - start
        print(f"llm_call {ctx.elapsed:.2f}s err={ctx.error}")

Note : le SDK Anthropic retry déjà 429/5xx en interne (max_retries) ; ce manager ajoute un timeout wall-clock et la mesure, deux choses que le retry SDK ne couvre pas.

Exercice 5 — Casse-puis-répare : la fuite de connexion (dur)

Objectif : on te donne ce code de pool de connexions qui fuit sous charge. Reproduis la fuite, identifie la cause, corrige-la, puis prouve la correction.

python
# Code fourni — bugué
@contextlib.contextmanager
def borrow(pool):
    conn = pool.acquire()
    yield conn
    pool.release(conn)   # ← pourquoi ça fuite ?
Indice / Solution

La fuite : si le code dans le with lève, release() n'est jamais atteint → la connexion n'est jamais rendue au pool → sous charge, le pool s'épuise et tout se bloque.

Repro : with borrow(pool) as c: raise RuntimeError, puis observe pool.available qui ne remonte pas.

Correctif :

python
import contextlib
from collections.abc import Iterator


@contextlib.contextmanager
def borrow(pool) -> Iterator[Connection]:
    conn = pool.acquire()
    try:
        yield conn
    finally:
        pool.release(conn)   # rendu garanti

Preuve : un test qui lève dans le bloc et asserte que pool.available est revenu à sa valeur initiale. C'est exactement le bug n°1 des services Python en prod — la version statique with open(...) cache le piège parce que open() est correctement écrit, mais ton propre manager ne l'est pas par défaut.

Exercice 6 — Manager réentrant et réutilisable (expert)

Objectif : @contextmanager produit un objet à usage unique. Écris une classe Lock réutilisable (le même objet utilisable dans plusieurs with successifs) et réentrante (un with imbriqué sur le même objet ne deadlock pas). Compare avec une tentative en @contextmanager et explique pourquoi elle échoue.

Indice / Solution

Un générateur est épuisé après un with : le réutiliser lève RuntimeError: generator didn't yield (il a déjà rendu). D'où la classe, adossée à un RLock (réentrant).

python
import threading


class Lock:
    def __init__(self) -> None:
        self._lock = threading.RLock()

    def __enter__(self) -> "Lock":
        self._lock.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        self._lock.release()
        return False


lk = Lock()
with lk:            # usage 1
    with lk:        # réentrant : RLock supporte l'acquisition imbriquée, pas de deadlock
        ...
with lk:            # réutilisé : OK car c'est une classe, pas un générateur épuisé
    ...

La leçon senior : choisis classe dès que tu as besoin de réutilisation, de réentrance, ou d'un état interrogeable. Le générateur est pour le cas jetable simple.

🎤 En entretien

Q : Quelle est la différence entre with et un simple try/finally ? Pourquoi un mot-clé dédié ? R : with est un try/finally réutilisable et nommé — tu encapsules la paire setup/teardown une fois dans un objet (ou un générateur) au lieu de la dupliquer chez chaque appelant ; bonus, la libération est déterministe et locale au bloc.

Q : Que se passe-t-il si une exception est levée dans un bloc with ? Et que contrôle la valeur de retour de __exit__ ? R : __exit__ est appelé avec le type/valeur/traceback de l'exception (cleanup garanti) ; s'il renvoie falsy l'exception se propage normalement, s'il renvoie True elle est supprimée — ce qu'on ne veut presque jamais, car ça masque les erreurs silencieusement.

Q : Pourquoi @contextlib.contextmanager exige-t-il un try/finally autour du yield ? R : Parce qu'une exception du bloc with est réinjectée au point du yield via gen.throw() ; sans finally, le teardown après le yield est sauté et la ressource fuite — c'est la cause n°1 des fuites de connexions DB en Python.

Q : Comment gérer un nombre dynamique de ressources, ou un client LLM dans FastAPI ? R : contextlib.ExitStack (ou AsyncExitStack) pour un nombre variable de ressources fermées en LIFO ; pour FastAPI, le client AsyncAnthropic s'ouvre une fois dans le lifespan (@asynccontextmanager) et se ferme à l'arrêt — jamais un client par requête, sinon on tue la réutilisation du pool httpx.

Bibliothèque tech perso — Achref