Skip to content

Logging & observabilité

TL;DR — En Python, print() est un cul-de-sac : aucun niveau, aucune structure, aucune corrélation. La voie senior, c'est du logging structuré JSON (un événement = un dict de champs typés), porté par le module logging de la stdlib (ou structlog), enrichi automatiquement par un contextvars (request_id, trace_id) propagé dans tes endpoints FastAPI et tes appels d'agents. L'observabilité va plus loin que les logs : ce sont les trois piliers — logs (ce qui s'est passé), métriques (combien/à quelle vitesse), traces (le chemin d'une requête à travers tes services). Pour un agent LLM, tu instrumentes en plus les tokens, le coût, la latence par tool call, le cache hit rate et le stop_reason — sinon tu pilotes une facture Anthropic à l'aveugle. OpenTelemetry est le standard qui unifie les trois piliers ; tu l'apprends une fois, tu le branches sur n'importe quel backend (Grafana, Datadog, Honeycomb).


🧠 Mental model

Venant de NestJS/Angular, tu connais déjà Logger (Nest) et les interceptors. Le réflexe à désapprendre vient surtout de PHP/early-career : logger, c'est écrire des phrases pour un humain. Faux. À l'échelle, personne ne lit tes logs ligne par ligne. Une machine les lit : elle filtre, agrège, alerte. Donc un bon log n'est pas une phrase, c'est un enregistrement de données — un objet avec des champs queryables.

L'analogie : pense à la différence entre un journal intime et une base de données.

print("User 42 a payé 19.99€ pour le plan pro")   ← journal intime
                                                      (impossible à filtrer/agréger)

log.info("payment.succeeded",                       ← base de données
         user_id=42, amount=19.99,                    (SELECT SUM(amount)
         currency="EUR", plan="pro")                   WHERE event="payment.succeeded")

Le second te permet, six mois plus tard, de répondre à « combien de paiements pro ont échoué hier entre 14h et 15h pour les users européens ? » sans jamais avoir anticipé la question. Le premier te condamne au grep désespéré.

Les trois piliers de l'observabilité, vus de haut :

                        UNE REQUÊTE ENTRE

        ┌─────────────────────┼──────────────────────┐
        ▼                     ▼                       ▼
     LOGS                 MÉTRIQUES                TRACES
  (événements)          (agrégats)             (chemin causal)
        │                     │                       │
 "que s'est-il        "combien / à quelle      "où est passé le
  passé exactement     vitesse / quel taux       temps, à travers
  à 14:03:11 ?"        d'erreur ?"               quels services ?"
        │                     │                       │
   structlog/            Prometheus /           OpenTelemetry +
   logging JSON          OTel metrics           Tempo/Jaeger
        └─────────────────────┴───────────────────────┘

                    corrélés par trace_id

Le fil rouge qui relie les trois, c'est le trace_id (et son enfant le span_id). Si tes logs portent le trace_id, ta métrique d'erreur porte le trace_id, et ta trace est le trace_id — alors depuis une alerte « taux d'erreur en hausse » tu sautes en un clic à la trace fautive, puis aux logs exacts de cette requête. C'est ça, l'observabilité : pas trois outils séparés, un système corrélé.

Pour un agent LLM, ajoute une quatrième dimension mentale : chaque appel au modèle est une transaction facturée et non-déterministe. Tu ne debug pas un agent comme une fonction pure ; tu l'observes comme un système distribué qui te coûte de l'argent à chaque tour de boucle.


Le socle : logging de la stdlib, bien configuré

Premier réflexe à corriger : n'utilise jamais print() en production, et ne crée jamais un logger « global » mal nommé. La convention Python, c'est un logger par module, nommé par __name__.

python
# ❌ La mauvaise façon — celle qu'on voit partout
import logging

def process_order(order_id: int) -> None:
    print(f"processing order {order_id}")          # invisible, non filtrable
    logging.info(f"order {order_id} done")          # root logger, message interpolé

Trois péchés dans ces deux lignes : print ne passe par aucun handler (pas de fichier, pas de niveau, pas de JSON) ; logging.info utilise le root logger (impossible de régler le niveau module par module) ; et le message est pré-interpolé dans une f-string (tu perds les champs structurés, et tu paies le coût du formatage même quand le log est filtré).

python
# ✅ La bonne façon
import logging

logger = logging.getLogger(__name__)   # un logger par module, hiérarchique

def process_order(order_id: int) -> None:
    logger.info("order.processing.started", extra={"order_id": order_id})
    # ... travail ...
    logger.info("order.processing.completed", extra={"order_id": order_id})

Le nom de l'événement (order.processing.started) est stable et machine-friendly : c'est lui qu'on filtre, pas le texte libre. Les données variables passent par extra, pas par interpolation.

Pourquoi extra seul ne suffit pas : le formatter JSON

Par défaut, le logging de la stdlib jette le contenu de extra dans une string formatée et illisible. Pour un vrai logging structuré, il faut un formatter JSON. Voici une implémentation complète, typée, sans dépendance :

python
from __future__ import annotations

import datetime as dt
import json
import logging
from typing import Any

# Les attributs « natifs » d'un LogRecord qu'on ne veut pas dupliquer dans le JSON.
_RESERVED = frozenset(logging.makeLogRecord({}).__dict__)


class JsonFormatter(logging.Formatter):
    """Sérialise chaque LogRecord en une ligne JSON (un objet = un événement)."""

    def format(self, record: logging.LogRecord) -> str:
        payload: dict[str, Any] = {
            "ts": dt.datetime.fromtimestamp(
                record.created, tz=dt.timezone.utc
            ).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "event": record.getMessage(),
            "module": record.module,
            "line": record.lineno,
        }

        # Tout ce que l'appelant a passé via extra= se retrouve dans __dict__
        # sans être un attribut réservé : on l'ajoute tel quel.
        for key, value in record.__dict__.items():
            if key not in _RESERVED and not key.startswith("_"):
                payload[key] = value

        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)

        return json.dumps(payload, ensure_ascii=False, default=str)


def configure_logging(level: int = logging.INFO) -> None:
    handler = logging.StreamHandler()        # stdout — le bon défaut en conteneur
    handler.setFormatter(JsonFormatter())

    root = logging.getLogger()
    root.handlers.clear()                    # évite les doublons (uvicorn ajoute les siens)
    root.addHandler(handler)
    root.setLevel(level)

Règle d'or des conteneurs (Docker/Kubernetes) : logge sur stdout/stderr, jamais dans un fichier. La plateforme (Docker, K8s, le runtime cloud) capte les flux standard et les route. Écrire dans un fichier dans un conteneur, c'est te condamner à perdre tes logs au prochain redéploiement.


Le chaînon manquant : le contexte propagé (contextvars)

Le vrai problème : tu veux que chaque log émis pendant le traitement d'une requête porte automatiquement le request_id, le user_id, le trace_id — sans les passer en argument à chaque fonction. En NestJS tu ferais ça avec un AsyncLocalStorage. L'équivalent Python natif, c'est contextvars, et il fonctionne correctement avec asyncio (chaque tâche async a sa propre copie du contexte).

python
from __future__ import annotations

import contextvars
import logging
import uuid
from typing import Any

# Une variable de contexte qui survit aux await et reste isolée par tâche async.
_log_context: contextvars.ContextVar[dict[str, Any]] = contextvars.ContextVar(
    "log_context", default={}
)


def bind_context(**fields: Any) -> contextvars.Token[dict[str, Any]]:
    """Ajoute des champs au contexte courant. Retourne un token pour restaurer."""
    current = _log_context.get()
    return _log_context.set({**current, **fields})


def get_context() -> dict[str, Any]:
    return _log_context.get()


class ContextFilter(logging.Filter):
    """Injecte le contexte courant dans chaque LogRecord avant formatage."""

    def filter(self, record: logging.LogRecord) -> bool:
        for key, value in _log_context.get().items():
            # On ne réécrit pas un champ déjà fourni explicitement via extra=.
            if not hasattr(record, key):
                setattr(record, key, value)
        return True   # ne filtre rien — on l'utilise comme « enricher »

Tu attaches le filtre au handler une fois, et tous tes logs sont enrichis :

python
handler.addFilter(ContextFilter())

Maintenant, n'importe où dans la stack d'appel d'une requête :

python
logger.info("agent.tool_call.started", extra={"tool": "search_web"})
# → produit automatiquement {..., "request_id": "...", "trace_id": "...", "tool": "search_web"}

Note structlog — Tout ce que tu viens de coder à la main (bind_context, formatter JSON, enrichissement automatique) est exactement ce que structlog te donne clé en main, avec une API plus ergonomique (log = log.bind(request_id=...)). En production réelle, beaucoup d'équipes seniors choisissent structlog pour le confort. Mais comprendre l'implémentation stdlib est non-négociable : c'est elle que tu devras déboguer quand structlog, uvicorn et gunicorn se marcheront dessus sur la config des handlers.


Intégration FastAPI : un middleware qui corrèle tout

C'est ici que ça devient concret. Un middleware FastAPI qui : génère un request_id, le binde au contexte, mesure la latence, et logge début + fin de chaque requête.

python
from __future__ import annotations

import logging
import time
import uuid

from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint

logger = logging.getLogger(__name__)


class ObservabilityMiddleware(BaseHTTPMiddleware):
    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        # Honore un request_id entrant (propagation inter-services) ou en génère un.
        request_id = request.headers.get("x-request-id", str(uuid.uuid4()))

        token = bind_context(
            request_id=request_id,
            method=request.method,
            path=request.url.path,
        )
        start = time.perf_counter()

        logger.info("http.request.started")
        try:
            response = await call_next(request)
        except Exception:
            # On logge l'exception AVANT de la laisser remonter au handler d'erreur.
            duration_ms = (time.perf_counter() - start) * 1000
            logger.exception(
                "http.request.failed", extra={"duration_ms": round(duration_ms, 2)}
            )
            raise
        finally:
            _log_context.reset(token)   # nettoie le contexte de cette requête

        duration_ms = (time.perf_counter() - start) * 1000
        logger.info(
            "http.request.completed",
            extra={
                "status_code": response.status_code,
                "duration_ms": round(duration_ms, 2),
            },
        )
        response.headers["x-request-id"] = request_id   # renvoie-le au client
        return response


def create_app() -> FastAPI:
    configure_logging()
    app = FastAPI()
    app.add_middleware(ObservabilityMiddleware)
    return app


app = create_app()


@app.get("/health")
async def health() -> dict[str, str]:
    return {"status": "ok"}

Subtilité senior dans ce code : le _log_context.reset(token) dans le finally. Sans lui, dans un worker qui réutilise des tâches, le contexte d'une requête pourrait fuiter sur la suivante — un bug d'observabilité classique où les logs de Bob portent le user_id d'Alice. Le pattern token/reset est la bonne discipline.

Piège fréquent — Ne logge pas le corps des requêtes ni les headers Authorization/Cookie à l'aveugle. C'est à la fois une fuite de PII/secrets (RGPD, conformité) et un gouffre à volume. On y revient dans « En production ».


Le cœur AI : observer un agent Claude

C'est ici que l'observabilité devient vitale plutôt que confortable. Un endpoint qui appelle un agent LLM, sans instrumentation, est une boîte noire qui :

  • te coûte de l'argent par token sans que tu saches combien,
  • a une latence non bornée (le modèle peut « réfléchir » longtemps),
  • échoue de façons non-déterministes (refus, rate limit, overload),
  • boucle sur des tool calls dont tu ne vois ni le nombre ni le coût.

Voici un service d'agent streamé, typé, instrumenté, avec le SDK Anthropic. On utilise AsyncAnthropic, le streaming (messages.stream), l'adaptive thinking, le paramètre effort (jamais budget_tokens), les exceptions typées du SDK, et le prompt caching — et on logge tout ce qui compte.

python
from __future__ import annotations

import logging
import time
from dataclasses import dataclass

import anthropic
from anthropic import AsyncAnthropic

logger = logging.getLogger(__name__)

client = AsyncAnthropic()   # lit ANTHROPIC_API_KEY depuis l'environnement

MODEL = "claude-opus-4-8"   # flagship : 5 $ / 25 $ par Mtok (input / output) @ 1M ctx


@dataclass(slots=True)
class AgentResult:
    text: str
    input_tokens: int
    output_tokens: int
    cache_read_tokens: int
    stop_reason: str | None
    duration_ms: float

    @property
    def cost_usd(self) -> float:
        """Coût estimé. Les lectures de cache sont facturées ~0,1× le prix input."""
        billable_input = self.input_tokens - self.cache_read_tokens
        return (
            billable_input * 5.0 / 1_000_000
            + self.cache_read_tokens * 0.5 / 1_000_000   # ~0,1× = 0,50 $/Mtok
            + self.output_tokens * 25.0 / 1_000_000
        )


async def run_agent(system_prompt: str, user_message: str) -> AgentResult:
    start = time.perf_counter()
    logger.info("llm.request.started", extra={"model": MODEL})

    try:
        async with client.messages.stream(
            model=MODEL,
            max_tokens=4096,
            thinking={"type": "adaptive"},        # ✅ pas de budget_tokens
            output_config={"effort": "high"},     # low | medium | high | xhigh | max
            system=[
                {
                    "type": "text",
                    "text": system_prompt,
                    "cache_control": {"type": "ephemeral"},   # cache le préfixe stable
                }
            ],
            messages=[{"role": "user", "content": user_message}],
        ) as stream:
            # On consomme le flux. On pourrait pousser chaque delta vers le client SSE ;
            # ici on agrège pour le message final.
            async for _event in stream:
                pass
            message = await stream.get_final_message()

    except anthropic.RateLimitError:
        logger.warning("llm.request.rate_limited", extra={"model": MODEL})
        raise
    except anthropic.APIStatusError as exc:
        # Exceptions TYPÉES — on ne fait jamais de string-matching sur le message.
        logger.error(
            "llm.request.api_error",
            extra={"status": exc.status_code, "error_type": exc.type},
        )
        raise

    duration_ms = (time.perf_counter() - start) * 1000
    usage = message.usage
    text = "".join(b.text for b in message.content if b.type == "text")

    result = AgentResult(
        text=text,
        input_tokens=usage.input_tokens,
        output_tokens=usage.output_tokens,
        cache_read_tokens=usage.cache_read_input_tokens or 0,
        stop_reason=message.stop_reason,
        duration_ms=round(duration_ms, 2),
    )

    # LE log qui change ta vie : tout ce qu'il faut pour piloter coût + perf.
    logger.info(
        "llm.request.completed",
        extra={
            "model": MODEL,
            "input_tokens": result.input_tokens,
            "output_tokens": result.output_tokens,
            "cache_read_tokens": result.cache_read_tokens,
            "stop_reason": result.stop_reason,
            "duration_ms": result.duration_ms,
            "cost_usd": round(result.cost_usd, 6),
        },
    )

    if result.stop_reason == "max_tokens":
        # Signal d'alerte : la réponse a été tronquée. À monitorer.
        logger.warning("llm.response.truncated", extra={"model": MODEL})

    return result

Pourquoi le streaming même si on agrège à la fin ? Parce qu'au-delà de quelques milliers de max_tokens, un appel non-streamé risque de timeout HTTP côté SDK. Le streaming est la façon robuste d'appeler un LLM dès que la sortie peut être longue. Et si tu sers un agent en temps réel à un front (SSE), tu pousses event.text au fur et à mesure — le streaming devient à la fois une nécessité technique et une UX.

Instrumenter la boucle de tool-use

Un vrai agent boucle : le modèle demande un tool call, ton code l'exécute, renvoie le résultat, le modèle continue. Chaque itération est un appel facturé. Une boucle non observée peut partir en vrille (10 itérations à effort: max = facture salée). Tu dois compter et logger chaque tour.

python
from __future__ import annotations

import logging
import time
from typing import Any

import anthropic
from anthropic import AsyncAnthropic

logger = logging.getLogger(__name__)
client = AsyncAnthropic()

MAX_ITERATIONS = 10   # garde-fou anti-boucle-infinie — TOUJOURS en avoir un


async def run_agent_loop(
    tools: list[dict[str, Any]],
    tool_handlers: dict[str, Any],
    user_message: str,
) -> str:
    messages: list[dict[str, Any]] = [{"role": "user", "content": user_message}]
    total_input = total_output = 0

    for iteration in range(MAX_ITERATIONS):
        start = time.perf_counter()
        response = await client.messages.create(
            model="claude-opus-4-8",
            max_tokens=4096,
            thinking={"type": "adaptive"},
            tools=tools,
            messages=messages,
        )
        total_input += response.usage.input_tokens
        total_output += response.usage.output_tokens

        logger.info(
            "agent.iteration",
            extra={
                "iteration": iteration,
                "stop_reason": response.stop_reason,
                "iter_duration_ms": round((time.perf_counter() - start) * 1000, 2),
            },
        )

        if response.stop_reason == "end_turn":
            logger.info(
                "agent.loop.completed",
                extra={
                    "iterations": iteration + 1,
                    "total_input_tokens": total_input,
                    "total_output_tokens": total_output,
                },
            )
            return "".join(b.text for b in response.content if b.type == "text")

        # Le modèle veut des outils : on les exécute et on les logge un par un.
        messages.append({"role": "assistant", "content": response.content})
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                t0 = time.perf_counter()
                try:
                    output = await tool_handlers[block.name](block.input)
                    is_error = False
                except Exception as exc:
                    output, is_error = str(exc), True
                    logger.exception("agent.tool.failed", extra={"tool": block.name})

                logger.info(
                    "agent.tool.executed",
                    extra={
                        "tool": block.name,
                        "is_error": is_error,
                        "tool_duration_ms": round((time.perf_counter() - t0) * 1000, 2),
                    },
                )
                tool_results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": str(output),
                        "is_error": is_error,
                    }
                )
        messages.append({"role": "user", "content": tool_results})

    # On a épuisé le garde-fou sans end_turn : c'est un INCIDENT, pas un succès.
    logger.error("agent.loop.exhausted", extra={"max_iterations": MAX_ITERATIONS})
    raise RuntimeError(f"Agent n'a pas convergé en {MAX_ITERATIONS} itérations")

Note le logger.error("agent.loop.exhausted") : épuiser le garde-fou n'est pas un cas nominal, c'est un signal d'alerte. Si tu ne le logges pas distinctement (avec un niveau error), tu ne le verras jamais dans tes dashboards.

Sorties structurées : messages.parse

Quand tu attends un JSON typé du modèle (extraction, classification), n'utilise pas un prefill (interdit, 400 sur Opus 4.8). Utilise les sorties structurées natives via client.messages.parse, et logge le stop_reason pour détecter un refus qui casserait le schéma.

python
from __future__ import annotations

import logging

from anthropic import AsyncAnthropic
from pydantic import BaseModel

logger = logging.getLogger(__name__)
client = AsyncAnthropic()


class Ticket(BaseModel):
    priority: str
    category: str
    summary: str


async def classify(text: str) -> Ticket | None:
    message = await client.messages.parse(
        model="claude-opus-4-8",
        max_tokens=1024,
        messages=[{"role": "user", "content": f"Classe ce ticket : {text}"}],
        output_config={"format": Ticket},   # schéma natif, validé par le SDK
    )

    if message.stop_reason == "refusal":
        logger.warning("llm.classification.refused")
        return None

    logger.info(
        "llm.classification.completed",
        extra={"output_tokens": message.usage.output_tokens},
    )
    return message.parsed_output

⚙️ En production

Le code ci-dessus est correct mais naïf face à la prod. Voici les arbitrages et les pièges qu'un senior anticipe.

Modes de défaillance

  • Les logs synchrones bloquent l'event loop. logging.StreamHandler écrit de façon synchrone. Sous forte charge async, des centaines de logger.info par seconde sérialisant du JSON peuvent bloquer ton event loop FastAPI. La parade : logging.handlers.QueueHandler + QueueListener — le handler met le record dans une queue (non-bloquant), un thread dédié fait la sérialisation et l'I/O. C'est le pattern à connaître pour du logging async-safe.
  • Les agents échouent silencieusement. Un stop_reason == "refusal" renvoie un HTTP 200 : si ton code lit message.content[0].text sans vérifier le stop_reason, tu plantes sur un refus ou tu sers une réponse vide en croyant que tout va bien. Toujours brancher sur stop_reason avant de lire le contenu. Idem max_tokens : réponse tronquée mais code 200.
  • Boucle d'agent non bornée. Sans MAX_ITERATIONS, un agent qui « bégaie » sur ses tool calls peut boucler jusqu'à épuiser ton rate limit et ta carte bleue. Le garde-fou est obligatoire, et son déclenchement doit être un log error qui alerte.
  • Fuite de contexte entre requêtes. Oublier le reset(token) du contextvars fait fuiter le user_id d'une requête sur la suivante dans un worker réutilisé. Bug d'observabilité insidieux car les logs « marchent » — ils mentent juste sur l'identité.

Performance

  • Le logging coûte du CPU. Sérialiser du JSON à chaque ligne n'est pas gratuit. Garde DEBUG désactivé en prod (et utilise logger.isEnabledFor(logging.DEBUG) avant de construire un payload de debug coûteux).
  • Échantillonne les traces, pas les logs d'erreur. Tracer 100 % des requêtes coûte cher (stockage, perf). En prod à fort volume, on échantillonne les traces (ex. 1–10 %) — mais on garde toujours 100 % des traces qui contiennent une erreur (tail-based sampling). Les logs error/warning, eux, ne s'échantillonnent jamais.
  • Le cache Anthropic est un levier de coût observable. Si tes cache_read_tokens sont à zéro alors que tu envoies un gros system prompt stable, c'est qu'un invalidateur silencieux (un timestamp, un UUID, un dict non trié dans le préfixe) casse le cache. Tu ne le verras que parce que tu logges cache_read_tokens — d'où l'importance de l'instrumenter.

Sécurité

  • Ne logge jamais de secrets ni de PII brute. Pas de header Authorization, pas de mot de passe, pas de numéro de carte, pas de contenu utilisateur sensible. Mets en place un filtre de rédaction (regex sur les patterns de tokens/clés, allowlist de champs). Pour un agent LLM : attention au contenu des messages utilisateurs et des tool results, qui peuvent contenir des données personnelles.
  • Les request_id/trace_id ne sont pas des secrets — propage-les volontiers entre services (header x-request-id). Mais ne mets jamais d'identifiant sensible (token de session) dans un trace_id.
  • Le volume de logs est une surface d'attaque DoS. Un endpoint qui logge le corps complet de chaque requête peut être saturé par un attaquant qui envoie des payloads massifs. Borne ce que tu logges.

Observabilité (le méta-niveau)

  • OpenTelemetry (OTel) est le standard à apprendre. Plutôt que de coder métriques et traces à la main, instrumente avec OTel : opentelemetry-instrumentation-fastapi auto-trace tes endpoints, et tu exportes vers n'importe quel backend (Grafana Tempo, Jaeger, Datadog, Honeycomb) sans réécrire ton code. Le trace_id d'OTel est exactement celui que tu veux injecter dans tes logs via le contextvars.
  • Les 4 « golden signals » (Google SRE) : latence, trafic, erreurs, saturation. Pour un agent, ajoute : coût par requête, tokens par requête, taux de refus/troncature, profondeur de boucle. Ce sont tes SLI.
  • Alerte sur les symptômes, pas les causes. Alerte sur « le p99 de latence dépasse 5 s » ou « le taux d'erreur dépasse 2 % », pas sur « le CPU est à 80 % ». L'utilisateur ne ressent pas le CPU, il ressent la latence.

🏋️ Exercices

Exercice 1 — Le formatter JSON robuste (implémenter)

Objectif — Écris un JsonFormatter qui gère correctement : les exceptions (exc_info), les champs extra, un champ service et env injectés à la configuration, et qui ne crash jamais même si un champ extra n'est pas sérialisable JSON (ex. un objet custom).

Indice/Solution — Pars du JsonFormatter de la leçon. Pour la non-sérialisabilité, json.dumps(..., default=str) couvre la plupart des cas, mais teste avec un objet qui lève dans __str__ : entoure la sérialisation d'un try/except qui produit au minimum un JSON {"event": "...", "_serialization_error": "..."}. Un formatter qui crash fait perdre tous les logs — c'est inacceptable.

Exercice 2 — Le contexte qui ne fuit pas (implémenter + tester)

Objectif — Implémente bind_context/reset puis prouve par un test qu'un contextvars ne fuit pas entre deux tâches asyncio concurrentes : lance deux coroutines qui bindent chacune un request_id différent, font un await asyncio.sleep(0) entrelacé, et vérifient que chacune voit toujours son id.

Indice/Solution — Le test ressemble à : await asyncio.gather(task("A"), task("B")) où chaque task fait bind_context(rid=x), await asyncio.sleep(0), puis assert get_context()["rid"] == x. Le test passe nativement grâce à contextvars (chaque tâche a sa copie). Pour comprendre par contraste : refais-le avec une simple variable globale dict — le test échoue. Ça ancre pourquoi contextvars existe.

Exercice 3 — Le tracker de coût d'agent (production-grade)

Objectif — Construis un context manager track_llm_cost() qui, sur tout un bloc d'appels à Claude (boucle de tool-use comprise), accumule input_tokens, output_tokens, cache_read_tokens, calcule le coût total en USD avec les tarifs réels (claude-opus-4-8 : 5 $/25 $ par Mtok ; lectures de cache ~0,1× l'input), et logge un récapitulatif agent.session.cost à la sortie. Bonus : ajoute un budget qui logge un warning si la session dépasse un seuil en USD.

Indice/Solution — Un contextlib.asynccontextmanager qui yield un petit objet accumulateur (dataclass mutable avec add(usage)). Tes fonctions d'appel récupèrent l'accumulateur via un contextvars (même pattern que le contexte de log) et appellent acc.add(response.usage) après chaque appel. À la sortie du with, calcule le coût et logge. Le budget : compare acc.cost_usd à un seuil et logger.warning("agent.session.over_budget", extra={...}).

Exercice 4 — Détecter le refus et la troncature (casser puis réparer)

Objectif — Pars du classify() de la leçon. Casse-le : retire la vérification stop_reason, puis fabrique un cas où le modèle refuse (un prompt qui déclenche un refus) ou tronque (max_tokens=5). Observe le crash ou la réponse vide silencieuse. Puis répare : branche sur stop_reason (refusal, max_tokens) avant de lire parsed_output, logge le bon niveau pour chaque cas, et renvoie un résultat explicite (None ou une erreur métier) plutôt qu'un plantage.

Indice/Solution — La version cassée fait return message.parsed_output sans garde → sur un refus, parsed_output est None et tu propages un None silencieux ; sur troncature, le JSON est incomplet et le parsing échoue. La réparation : if message.stop_reason == "refusal": logger.warning(...); return None et if message.stop_reason == "max_tokens": logger.error("llm.truncated"); raise .... La leçon de fond : un HTTP 200 d'un LLM ne signifie pas un succès métier.

Exercice 5 — Le handler non-bloquant (production-grade)

Objectif — Remplace le StreamHandler synchrone par un couple QueueHandler + QueueListener pour que l'I/O des logs ne bloque jamais l'event loop. Mesure : sous une boucle qui émet 50 000 logs, compare le temps de bout-en-bout avec le handler synchrone vs. le handler en queue.

Indice/Solutionqueue.Queue(-1), un QueueHandler(q) attaché au root, et un QueueListener(q, real_handler) que tu .start() au boot et .stop() au shutdown (lifespan FastAPI). Le QueueHandler se contente de put (rapide, non-bloquant) ; le QueueListener consomme dans un thread dédié et fait la sérialisation JSON + l'écriture. Piège à documenter : pense à listener.stop() proprement, sinon tu perds les derniers logs au shutdown.

Exercice 6 — Corréler une trace OTel avec tes logs (intégration)

Objectif — Branche opentelemetry-instrumentation-fastapi sur l'app, et fais en sorte que le trace_id généré par OTel soit injecté dans chaque log JSON via ton contextvars. Vérifie qu'une requête produit une trace et des logs portant le même trace_id.

Indice/Solution — Dans le middleware (ou un processeur de log), récupère le span courant via opentelemetry.trace.get_current_span().get_span_context().trace_id, formate-le en hex (format(trace_id, "032x")), et bind_context(trace_id=...). Le test final : appelle l'endpoint, récupère le trace_id du header de réponse OTel, et grep ce même id dans la sortie JSON des logs. Quand les deux coïncident, tu as fait de l'observabilité, pas juste du logging.


🎤 En entretien

Q : Quelle est la différence entre logs, métriques et traces, et quand utiliser quoi ? R : Les logs sont des événements discrets et riches (« quoi exactement, à quel instant »), les métriques sont des agrégats numériques bon marché à stocker (« combien, à quelle vitesse, quel taux »), les traces suivent une requête à travers plusieurs services (« où est passé le temps »). On les corrèle par trace_id : on alerte sur une métrique, on saute à la trace fautive, on lit les logs de cette trace.

Q : Pourquoi le logging structuré plutôt que des messages textuels, et pourquoi contextvars plutôt que passer le request_id en argument ? R : Le structuré rend les logs queryables/agrégables par une machine (filtrer, compter, alerter sans avoir anticipé la question) ; contextvars propage automatiquement le contexte de requête à travers toute la stack async sans polluer chaque signature, et garantit l'isolation par tâche asyncio (pas de fuite entre requêtes concurrentes), là où une variable globale fuiterait.

Q : Comment instrumenter un agent LLM en production, et qu'est-ce qui change par rapport à un service classique ? R : En plus des golden signals (latence, trafic, erreurs, saturation), j'instrumente les tokens (input/output/cache), le coût USD par requête, le stop_reason, la profondeur de boucle de tool-use et la latence par tool call — parce qu'un appel LLM est une transaction facturée, non-déterministe et à latence non bornée ; un HTTP 200 avec stop_reason: "refusal" ou max_tokens est un échec métier qu'un monitoring HTTP classique ne verrait pas.

Q : Ton logging synchrone ralentit ton service FastAPI sous charge. Que fais-tu ? R : Je passe à un QueueHandler/QueueListener : le handler dépose le record dans une queue (non-bloquant pour l'event loop), un thread dédié fait la sérialisation JSON et l'I/O. Je vérifie aussi que DEBUG est désactivé en prod, je garde les payloads coûteux derrière isEnabledFor, et j'échantillonne les traces (tail-based, en gardant 100 % des traces en erreur) plutôt que les logs d'erreur.

Bibliothèque tech perso — Achref