Logging & observabilité
TL;DR — En Python,
print()est un cul-de-sac : aucun niveau, aucune structure, aucune corrélation. La voie senior, c'est du logging structuré JSON (un événement = un dict de champs typés), porté par le moduleloggingde la stdlib (oustructlog), enrichi automatiquement par uncontextvars(request_id, trace_id) propagé dans tes endpoints FastAPI et tes appels d'agents. L'observabilité va plus loin que les logs : ce sont les trois piliers — logs (ce qui s'est passé), métriques (combien/à quelle vitesse), traces (le chemin d'une requête à travers tes services). Pour un agent LLM, tu instrumentes en plus les tokens, le coût, la latence par tool call, le cache hit rate et le stop_reason — sinon tu pilotes une facture Anthropic à l'aveugle. OpenTelemetry est le standard qui unifie les trois piliers ; tu l'apprends une fois, tu le branches sur n'importe quel backend (Grafana, Datadog, Honeycomb).
🧠 Mental model
Venant de NestJS/Angular, tu connais déjà Logger (Nest) et les interceptors. Le réflexe à désapprendre vient surtout de PHP/early-career : logger, c'est écrire des phrases pour un humain. Faux. À l'échelle, personne ne lit tes logs ligne par ligne. Une machine les lit : elle filtre, agrège, alerte. Donc un bon log n'est pas une phrase, c'est un enregistrement de données — un objet avec des champs queryables.
L'analogie : pense à la différence entre un journal intime et une base de données.
print("User 42 a payé 19.99€ pour le plan pro") ← journal intime
(impossible à filtrer/agréger)
log.info("payment.succeeded", ← base de données
user_id=42, amount=19.99, (SELECT SUM(amount)
currency="EUR", plan="pro") WHERE event="payment.succeeded")Le second te permet, six mois plus tard, de répondre à « combien de paiements pro ont échoué hier entre 14h et 15h pour les users européens ? » sans jamais avoir anticipé la question. Le premier te condamne au grep désespéré.
Les trois piliers de l'observabilité, vus de haut :
UNE REQUÊTE ENTRE
│
┌─────────────────────┼──────────────────────┐
▼ ▼ ▼
LOGS MÉTRIQUES TRACES
(événements) (agrégats) (chemin causal)
│ │ │
"que s'est-il "combien / à quelle "où est passé le
passé exactement vitesse / quel taux temps, à travers
à 14:03:11 ?" d'erreur ?" quels services ?"
│ │ │
structlog/ Prometheus / OpenTelemetry +
logging JSON OTel metrics Tempo/Jaeger
└─────────────────────┴───────────────────────┘
│
corrélés par trace_idLe fil rouge qui relie les trois, c'est le trace_id (et son enfant le span_id). Si tes logs portent le trace_id, ta métrique d'erreur porte le trace_id, et ta trace est le trace_id — alors depuis une alerte « taux d'erreur en hausse » tu sautes en un clic à la trace fautive, puis aux logs exacts de cette requête. C'est ça, l'observabilité : pas trois outils séparés, un système corrélé.
Pour un agent LLM, ajoute une quatrième dimension mentale : chaque appel au modèle est une transaction facturée et non-déterministe. Tu ne debug pas un agent comme une fonction pure ; tu l'observes comme un système distribué qui te coûte de l'argent à chaque tour de boucle.
Le socle : logging de la stdlib, bien configuré
Premier réflexe à corriger : n'utilise jamais print() en production, et ne crée jamais un logger « global » mal nommé. La convention Python, c'est un logger par module, nommé par __name__.
# ❌ La mauvaise façon — celle qu'on voit partout
import logging
def process_order(order_id: int) -> None:
print(f"processing order {order_id}") # invisible, non filtrable
logging.info(f"order {order_id} done") # root logger, message interpoléTrois péchés dans ces deux lignes : print ne passe par aucun handler (pas de fichier, pas de niveau, pas de JSON) ; logging.info utilise le root logger (impossible de régler le niveau module par module) ; et le message est pré-interpolé dans une f-string (tu perds les champs structurés, et tu paies le coût du formatage même quand le log est filtré).
# ✅ La bonne façon
import logging
logger = logging.getLogger(__name__) # un logger par module, hiérarchique
def process_order(order_id: int) -> None:
logger.info("order.processing.started", extra={"order_id": order_id})
# ... travail ...
logger.info("order.processing.completed", extra={"order_id": order_id})Le nom de l'événement (order.processing.started) est stable et machine-friendly : c'est lui qu'on filtre, pas le texte libre. Les données variables passent par extra, pas par interpolation.
Pourquoi extra seul ne suffit pas : le formatter JSON
Par défaut, le logging de la stdlib jette le contenu de extra dans une string formatée et illisible. Pour un vrai logging structuré, il faut un formatter JSON. Voici une implémentation complète, typée, sans dépendance :
from __future__ import annotations
import datetime as dt
import json
import logging
from typing import Any
# Les attributs « natifs » d'un LogRecord qu'on ne veut pas dupliquer dans le JSON.
_RESERVED = frozenset(logging.makeLogRecord({}).__dict__)
class JsonFormatter(logging.Formatter):
"""Sérialise chaque LogRecord en une ligne JSON (un objet = un événement)."""
def format(self, record: logging.LogRecord) -> str:
payload: dict[str, Any] = {
"ts": dt.datetime.fromtimestamp(
record.created, tz=dt.timezone.utc
).isoformat(),
"level": record.levelname,
"logger": record.name,
"event": record.getMessage(),
"module": record.module,
"line": record.lineno,
}
# Tout ce que l'appelant a passé via extra= se retrouve dans __dict__
# sans être un attribut réservé : on l'ajoute tel quel.
for key, value in record.__dict__.items():
if key not in _RESERVED and not key.startswith("_"):
payload[key] = value
if record.exc_info:
payload["exception"] = self.formatException(record.exc_info)
return json.dumps(payload, ensure_ascii=False, default=str)
def configure_logging(level: int = logging.INFO) -> None:
handler = logging.StreamHandler() # stdout — le bon défaut en conteneur
handler.setFormatter(JsonFormatter())
root = logging.getLogger()
root.handlers.clear() # évite les doublons (uvicorn ajoute les siens)
root.addHandler(handler)
root.setLevel(level)Règle d'or des conteneurs (Docker/Kubernetes) : logge sur stdout/stderr, jamais dans un fichier. La plateforme (Docker, K8s, le runtime cloud) capte les flux standard et les route. Écrire dans un fichier dans un conteneur, c'est te condamner à perdre tes logs au prochain redéploiement.
Le chaînon manquant : le contexte propagé (contextvars)
Le vrai problème : tu veux que chaque log émis pendant le traitement d'une requête porte automatiquement le request_id, le user_id, le trace_id — sans les passer en argument à chaque fonction. En NestJS tu ferais ça avec un AsyncLocalStorage. L'équivalent Python natif, c'est contextvars, et il fonctionne correctement avec asyncio (chaque tâche async a sa propre copie du contexte).
from __future__ import annotations
import contextvars
import logging
import uuid
from typing import Any
# Une variable de contexte qui survit aux await et reste isolée par tâche async.
_log_context: contextvars.ContextVar[dict[str, Any]] = contextvars.ContextVar(
"log_context", default={}
)
def bind_context(**fields: Any) -> contextvars.Token[dict[str, Any]]:
"""Ajoute des champs au contexte courant. Retourne un token pour restaurer."""
current = _log_context.get()
return _log_context.set({**current, **fields})
def get_context() -> dict[str, Any]:
return _log_context.get()
class ContextFilter(logging.Filter):
"""Injecte le contexte courant dans chaque LogRecord avant formatage."""
def filter(self, record: logging.LogRecord) -> bool:
for key, value in _log_context.get().items():
# On ne réécrit pas un champ déjà fourni explicitement via extra=.
if not hasattr(record, key):
setattr(record, key, value)
return True # ne filtre rien — on l'utilise comme « enricher »Tu attaches le filtre au handler une fois, et tous tes logs sont enrichis :
handler.addFilter(ContextFilter())Maintenant, n'importe où dans la stack d'appel d'une requête :
logger.info("agent.tool_call.started", extra={"tool": "search_web"})
# → produit automatiquement {..., "request_id": "...", "trace_id": "...", "tool": "search_web"}Note
structlog— Tout ce que tu viens de coder à la main (bind_context, formatter JSON, enrichissement automatique) est exactement ce questructlogte donne clé en main, avec une API plus ergonomique (log = log.bind(request_id=...)). En production réelle, beaucoup d'équipes seniors choisissentstructlogpour le confort. Mais comprendre l'implémentation stdlib est non-négociable : c'est elle que tu devras déboguer quandstructlog,uvicornetgunicornse marcheront dessus sur la config des handlers.
Intégration FastAPI : un middleware qui corrèle tout
C'est ici que ça devient concret. Un middleware FastAPI qui : génère un request_id, le binde au contexte, mesure la latence, et logge début + fin de chaque requête.
from __future__ import annotations
import logging
import time
import uuid
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
logger = logging.getLogger(__name__)
class ObservabilityMiddleware(BaseHTTPMiddleware):
async def dispatch(
self, request: Request, call_next: RequestResponseEndpoint
) -> Response:
# Honore un request_id entrant (propagation inter-services) ou en génère un.
request_id = request.headers.get("x-request-id", str(uuid.uuid4()))
token = bind_context(
request_id=request_id,
method=request.method,
path=request.url.path,
)
start = time.perf_counter()
logger.info("http.request.started")
try:
response = await call_next(request)
except Exception:
# On logge l'exception AVANT de la laisser remonter au handler d'erreur.
duration_ms = (time.perf_counter() - start) * 1000
logger.exception(
"http.request.failed", extra={"duration_ms": round(duration_ms, 2)}
)
raise
finally:
_log_context.reset(token) # nettoie le contexte de cette requête
duration_ms = (time.perf_counter() - start) * 1000
logger.info(
"http.request.completed",
extra={
"status_code": response.status_code,
"duration_ms": round(duration_ms, 2),
},
)
response.headers["x-request-id"] = request_id # renvoie-le au client
return response
def create_app() -> FastAPI:
configure_logging()
app = FastAPI()
app.add_middleware(ObservabilityMiddleware)
return app
app = create_app()
@app.get("/health")
async def health() -> dict[str, str]:
return {"status": "ok"}Subtilité senior dans ce code : le _log_context.reset(token) dans le finally. Sans lui, dans un worker qui réutilise des tâches, le contexte d'une requête pourrait fuiter sur la suivante — un bug d'observabilité classique où les logs de Bob portent le user_id d'Alice. Le pattern token/reset est la bonne discipline.
Piège fréquent — Ne logge pas le corps des requêtes ni les headers
Authorization/Cookieà l'aveugle. C'est à la fois une fuite de PII/secrets (RGPD, conformité) et un gouffre à volume. On y revient dans « En production ».
Le cœur AI : observer un agent Claude
C'est ici que l'observabilité devient vitale plutôt que confortable. Un endpoint qui appelle un agent LLM, sans instrumentation, est une boîte noire qui :
- te coûte de l'argent par token sans que tu saches combien,
- a une latence non bornée (le modèle peut « réfléchir » longtemps),
- échoue de façons non-déterministes (refus, rate limit, overload),
- boucle sur des tool calls dont tu ne vois ni le nombre ni le coût.
Voici un service d'agent streamé, typé, instrumenté, avec le SDK Anthropic. On utilise AsyncAnthropic, le streaming (messages.stream), l'adaptive thinking, le paramètre effort (jamais budget_tokens), les exceptions typées du SDK, et le prompt caching — et on logge tout ce qui compte.
from __future__ import annotations
import logging
import time
from dataclasses import dataclass
import anthropic
from anthropic import AsyncAnthropic
logger = logging.getLogger(__name__)
client = AsyncAnthropic() # lit ANTHROPIC_API_KEY depuis l'environnement
MODEL = "claude-opus-4-8" # flagship : 5 $ / 25 $ par Mtok (input / output) @ 1M ctx
@dataclass(slots=True)
class AgentResult:
text: str
input_tokens: int
output_tokens: int
cache_read_tokens: int
stop_reason: str | None
duration_ms: float
@property
def cost_usd(self) -> float:
"""Coût estimé. Les lectures de cache sont facturées ~0,1× le prix input."""
billable_input = self.input_tokens - self.cache_read_tokens
return (
billable_input * 5.0 / 1_000_000
+ self.cache_read_tokens * 0.5 / 1_000_000 # ~0,1× = 0,50 $/Mtok
+ self.output_tokens * 25.0 / 1_000_000
)
async def run_agent(system_prompt: str, user_message: str) -> AgentResult:
start = time.perf_counter()
logger.info("llm.request.started", extra={"model": MODEL})
try:
async with client.messages.stream(
model=MODEL,
max_tokens=4096,
thinking={"type": "adaptive"}, # ✅ pas de budget_tokens
output_config={"effort": "high"}, # low | medium | high | xhigh | max
system=[
{
"type": "text",
"text": system_prompt,
"cache_control": {"type": "ephemeral"}, # cache le préfixe stable
}
],
messages=[{"role": "user", "content": user_message}],
) as stream:
# On consomme le flux. On pourrait pousser chaque delta vers le client SSE ;
# ici on agrège pour le message final.
async for _event in stream:
pass
message = await stream.get_final_message()
except anthropic.RateLimitError:
logger.warning("llm.request.rate_limited", extra={"model": MODEL})
raise
except anthropic.APIStatusError as exc:
# Exceptions TYPÉES — on ne fait jamais de string-matching sur le message.
logger.error(
"llm.request.api_error",
extra={"status": exc.status_code, "error_type": exc.type},
)
raise
duration_ms = (time.perf_counter() - start) * 1000
usage = message.usage
text = "".join(b.text for b in message.content if b.type == "text")
result = AgentResult(
text=text,
input_tokens=usage.input_tokens,
output_tokens=usage.output_tokens,
cache_read_tokens=usage.cache_read_input_tokens or 0,
stop_reason=message.stop_reason,
duration_ms=round(duration_ms, 2),
)
# LE log qui change ta vie : tout ce qu'il faut pour piloter coût + perf.
logger.info(
"llm.request.completed",
extra={
"model": MODEL,
"input_tokens": result.input_tokens,
"output_tokens": result.output_tokens,
"cache_read_tokens": result.cache_read_tokens,
"stop_reason": result.stop_reason,
"duration_ms": result.duration_ms,
"cost_usd": round(result.cost_usd, 6),
},
)
if result.stop_reason == "max_tokens":
# Signal d'alerte : la réponse a été tronquée. À monitorer.
logger.warning("llm.response.truncated", extra={"model": MODEL})
return resultPourquoi le streaming même si on agrège à la fin ? Parce qu'au-delà de quelques milliers de max_tokens, un appel non-streamé risque de timeout HTTP côté SDK. Le streaming est la façon robuste d'appeler un LLM dès que la sortie peut être longue. Et si tu sers un agent en temps réel à un front (SSE), tu pousses event.text au fur et à mesure — le streaming devient à la fois une nécessité technique et une UX.
Instrumenter la boucle de tool-use
Un vrai agent boucle : le modèle demande un tool call, ton code l'exécute, renvoie le résultat, le modèle continue. Chaque itération est un appel facturé. Une boucle non observée peut partir en vrille (10 itérations à effort: max = facture salée). Tu dois compter et logger chaque tour.
from __future__ import annotations
import logging
import time
from typing import Any
import anthropic
from anthropic import AsyncAnthropic
logger = logging.getLogger(__name__)
client = AsyncAnthropic()
MAX_ITERATIONS = 10 # garde-fou anti-boucle-infinie — TOUJOURS en avoir un
async def run_agent_loop(
tools: list[dict[str, Any]],
tool_handlers: dict[str, Any],
user_message: str,
) -> str:
messages: list[dict[str, Any]] = [{"role": "user", "content": user_message}]
total_input = total_output = 0
for iteration in range(MAX_ITERATIONS):
start = time.perf_counter()
response = await client.messages.create(
model="claude-opus-4-8",
max_tokens=4096,
thinking={"type": "adaptive"},
tools=tools,
messages=messages,
)
total_input += response.usage.input_tokens
total_output += response.usage.output_tokens
logger.info(
"agent.iteration",
extra={
"iteration": iteration,
"stop_reason": response.stop_reason,
"iter_duration_ms": round((time.perf_counter() - start) * 1000, 2),
},
)
if response.stop_reason == "end_turn":
logger.info(
"agent.loop.completed",
extra={
"iterations": iteration + 1,
"total_input_tokens": total_input,
"total_output_tokens": total_output,
},
)
return "".join(b.text for b in response.content if b.type == "text")
# Le modèle veut des outils : on les exécute et on les logge un par un.
messages.append({"role": "assistant", "content": response.content})
tool_results = []
for block in response.content:
if block.type == "tool_use":
t0 = time.perf_counter()
try:
output = await tool_handlers[block.name](block.input)
is_error = False
except Exception as exc:
output, is_error = str(exc), True
logger.exception("agent.tool.failed", extra={"tool": block.name})
logger.info(
"agent.tool.executed",
extra={
"tool": block.name,
"is_error": is_error,
"tool_duration_ms": round((time.perf_counter() - t0) * 1000, 2),
},
)
tool_results.append(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": str(output),
"is_error": is_error,
}
)
messages.append({"role": "user", "content": tool_results})
# On a épuisé le garde-fou sans end_turn : c'est un INCIDENT, pas un succès.
logger.error("agent.loop.exhausted", extra={"max_iterations": MAX_ITERATIONS})
raise RuntimeError(f"Agent n'a pas convergé en {MAX_ITERATIONS} itérations")Note le logger.error("agent.loop.exhausted") : épuiser le garde-fou n'est pas un cas nominal, c'est un signal d'alerte. Si tu ne le logges pas distinctement (avec un niveau error), tu ne le verras jamais dans tes dashboards.
Sorties structurées : messages.parse
Quand tu attends un JSON typé du modèle (extraction, classification), n'utilise pas un prefill (interdit, 400 sur Opus 4.8). Utilise les sorties structurées natives via client.messages.parse, et logge le stop_reason pour détecter un refus qui casserait le schéma.
from __future__ import annotations
import logging
from anthropic import AsyncAnthropic
from pydantic import BaseModel
logger = logging.getLogger(__name__)
client = AsyncAnthropic()
class Ticket(BaseModel):
priority: str
category: str
summary: str
async def classify(text: str) -> Ticket | None:
message = await client.messages.parse(
model="claude-opus-4-8",
max_tokens=1024,
messages=[{"role": "user", "content": f"Classe ce ticket : {text}"}],
output_config={"format": Ticket}, # schéma natif, validé par le SDK
)
if message.stop_reason == "refusal":
logger.warning("llm.classification.refused")
return None
logger.info(
"llm.classification.completed",
extra={"output_tokens": message.usage.output_tokens},
)
return message.parsed_output⚙️ En production
Le code ci-dessus est correct mais naïf face à la prod. Voici les arbitrages et les pièges qu'un senior anticipe.
Modes de défaillance
- Les logs synchrones bloquent l'event loop.
logging.StreamHandlerécrit de façon synchrone. Sous forte charge async, des centaines delogger.infopar seconde sérialisant du JSON peuvent bloquer ton event loop FastAPI. La parade :logging.handlers.QueueHandler+QueueListener— le handler met le record dans une queue (non-bloquant), un thread dédié fait la sérialisation et l'I/O. C'est le pattern à connaître pour du logging async-safe. - Les agents échouent silencieusement. Un
stop_reason == "refusal"renvoie un HTTP 200 : si ton code litmessage.content[0].textsans vérifier lestop_reason, tu plantes sur un refus ou tu sers une réponse vide en croyant que tout va bien. Toujours brancher surstop_reasonavant de lire le contenu. Idemmax_tokens: réponse tronquée mais code 200. - Boucle d'agent non bornée. Sans
MAX_ITERATIONS, un agent qui « bégaie » sur ses tool calls peut boucler jusqu'à épuiser ton rate limit et ta carte bleue. Le garde-fou est obligatoire, et son déclenchement doit être un logerrorqui alerte. - Fuite de contexte entre requêtes. Oublier le
reset(token)ducontextvarsfait fuiter leuser_idd'une requête sur la suivante dans un worker réutilisé. Bug d'observabilité insidieux car les logs « marchent » — ils mentent juste sur l'identité.
Performance
- Le logging coûte du CPU. Sérialiser du JSON à chaque ligne n'est pas gratuit. Garde
DEBUGdésactivé en prod (et utiliselogger.isEnabledFor(logging.DEBUG)avant de construire un payload de debug coûteux). - Échantillonne les traces, pas les logs d'erreur. Tracer 100 % des requêtes coûte cher (stockage, perf). En prod à fort volume, on échantillonne les traces (ex. 1–10 %) — mais on garde toujours 100 % des traces qui contiennent une erreur (tail-based sampling). Les logs
error/warning, eux, ne s'échantillonnent jamais. - Le cache Anthropic est un levier de coût observable. Si tes
cache_read_tokenssont à zéro alors que tu envoies un gros system prompt stable, c'est qu'un invalidateur silencieux (un timestamp, un UUID, un dict non trié dans le préfixe) casse le cache. Tu ne le verras que parce que tu loggescache_read_tokens— d'où l'importance de l'instrumenter.
Sécurité
- Ne logge jamais de secrets ni de PII brute. Pas de header
Authorization, pas de mot de passe, pas de numéro de carte, pas de contenu utilisateur sensible. Mets en place un filtre de rédaction (regex sur les patterns de tokens/clés, allowlist de champs). Pour un agent LLM : attention au contenu des messages utilisateurs et des tool results, qui peuvent contenir des données personnelles. - Les
request_id/trace_idne sont pas des secrets — propage-les volontiers entre services (headerx-request-id). Mais ne mets jamais d'identifiant sensible (token de session) dans un trace_id. - Le volume de logs est une surface d'attaque DoS. Un endpoint qui logge le corps complet de chaque requête peut être saturé par un attaquant qui envoie des payloads massifs. Borne ce que tu logges.
Observabilité (le méta-niveau)
- OpenTelemetry (OTel) est le standard à apprendre. Plutôt que de coder métriques et traces à la main, instrumente avec OTel :
opentelemetry-instrumentation-fastapiauto-trace tes endpoints, et tu exportes vers n'importe quel backend (Grafana Tempo, Jaeger, Datadog, Honeycomb) sans réécrire ton code. Letrace_idd'OTel est exactement celui que tu veux injecter dans tes logs via lecontextvars. - Les 4 « golden signals » (Google SRE) : latence, trafic, erreurs, saturation. Pour un agent, ajoute : coût par requête, tokens par requête, taux de refus/troncature, profondeur de boucle. Ce sont tes SLI.
- Alerte sur les symptômes, pas les causes. Alerte sur « le p99 de latence dépasse 5 s » ou « le taux d'erreur dépasse 2 % », pas sur « le CPU est à 80 % ». L'utilisateur ne ressent pas le CPU, il ressent la latence.
🏋️ Exercices
Exercice 1 — Le formatter JSON robuste (implémenter)
Objectif — Écris un JsonFormatter qui gère correctement : les exceptions (exc_info), les champs extra, un champ service et env injectés à la configuration, et qui ne crash jamais même si un champ extra n'est pas sérialisable JSON (ex. un objet custom).
Indice/Solution — Pars du JsonFormatter de la leçon. Pour la non-sérialisabilité, json.dumps(..., default=str) couvre la plupart des cas, mais teste avec un objet qui lève dans __str__ : entoure la sérialisation d'un try/except qui produit au minimum un JSON {"event": "...", "_serialization_error": "..."}. Un formatter qui crash fait perdre tous les logs — c'est inacceptable.
Exercice 2 — Le contexte qui ne fuit pas (implémenter + tester)
Objectif — Implémente bind_context/reset puis prouve par un test qu'un contextvars ne fuit pas entre deux tâches asyncio concurrentes : lance deux coroutines qui bindent chacune un request_id différent, font un await asyncio.sleep(0) entrelacé, et vérifient que chacune voit toujours son id.
Indice/Solution — Le test ressemble à : await asyncio.gather(task("A"), task("B")) où chaque task fait bind_context(rid=x), await asyncio.sleep(0), puis assert get_context()["rid"] == x. Le test passe nativement grâce à contextvars (chaque tâche a sa copie). Pour comprendre par contraste : refais-le avec une simple variable globale dict — le test échoue. Ça ancre pourquoi contextvars existe.
Exercice 3 — Le tracker de coût d'agent (production-grade)
Objectif — Construis un context manager track_llm_cost() qui, sur tout un bloc d'appels à Claude (boucle de tool-use comprise), accumule input_tokens, output_tokens, cache_read_tokens, calcule le coût total en USD avec les tarifs réels (claude-opus-4-8 : 5 $/25 $ par Mtok ; lectures de cache ~0,1× l'input), et logge un récapitulatif agent.session.cost à la sortie. Bonus : ajoute un budget qui logge un warning si la session dépasse un seuil en USD.
Indice/Solution — Un contextlib.asynccontextmanager qui yield un petit objet accumulateur (dataclass mutable avec add(usage)). Tes fonctions d'appel récupèrent l'accumulateur via un contextvars (même pattern que le contexte de log) et appellent acc.add(response.usage) après chaque appel. À la sortie du with, calcule le coût et logge. Le budget : compare acc.cost_usd à un seuil et logger.warning("agent.session.over_budget", extra={...}).
Exercice 4 — Détecter le refus et la troncature (casser puis réparer)
Objectif — Pars du classify() de la leçon. Casse-le : retire la vérification stop_reason, puis fabrique un cas où le modèle refuse (un prompt qui déclenche un refus) ou tronque (max_tokens=5). Observe le crash ou la réponse vide silencieuse. Puis répare : branche sur stop_reason (refusal, max_tokens) avant de lire parsed_output, logge le bon niveau pour chaque cas, et renvoie un résultat explicite (None ou une erreur métier) plutôt qu'un plantage.
Indice/Solution — La version cassée fait return message.parsed_output sans garde → sur un refus, parsed_output est None et tu propages un None silencieux ; sur troncature, le JSON est incomplet et le parsing échoue. La réparation : if message.stop_reason == "refusal": logger.warning(...); return None et if message.stop_reason == "max_tokens": logger.error("llm.truncated"); raise .... La leçon de fond : un HTTP 200 d'un LLM ne signifie pas un succès métier.
Exercice 5 — Le handler non-bloquant (production-grade)
Objectif — Remplace le StreamHandler synchrone par un couple QueueHandler + QueueListener pour que l'I/O des logs ne bloque jamais l'event loop. Mesure : sous une boucle qui émet 50 000 logs, compare le temps de bout-en-bout avec le handler synchrone vs. le handler en queue.
Indice/Solution — queue.Queue(-1), un QueueHandler(q) attaché au root, et un QueueListener(q, real_handler) que tu .start() au boot et .stop() au shutdown (lifespan FastAPI). Le QueueHandler se contente de put (rapide, non-bloquant) ; le QueueListener consomme dans un thread dédié et fait la sérialisation JSON + l'écriture. Piège à documenter : pense à listener.stop() proprement, sinon tu perds les derniers logs au shutdown.
Exercice 6 — Corréler une trace OTel avec tes logs (intégration)
Objectif — Branche opentelemetry-instrumentation-fastapi sur l'app, et fais en sorte que le trace_id généré par OTel soit injecté dans chaque log JSON via ton contextvars. Vérifie qu'une requête produit une trace et des logs portant le même trace_id.
Indice/Solution — Dans le middleware (ou un processeur de log), récupère le span courant via opentelemetry.trace.get_current_span().get_span_context().trace_id, formate-le en hex (format(trace_id, "032x")), et bind_context(trace_id=...). Le test final : appelle l'endpoint, récupère le trace_id du header de réponse OTel, et grep ce même id dans la sortie JSON des logs. Quand les deux coïncident, tu as fait de l'observabilité, pas juste du logging.
🎤 En entretien
Q : Quelle est la différence entre logs, métriques et traces, et quand utiliser quoi ? R : Les logs sont des événements discrets et riches (« quoi exactement, à quel instant »), les métriques sont des agrégats numériques bon marché à stocker (« combien, à quelle vitesse, quel taux »), les traces suivent une requête à travers plusieurs services (« où est passé le temps »). On les corrèle par trace_id : on alerte sur une métrique, on saute à la trace fautive, on lit les logs de cette trace.
Q : Pourquoi le logging structuré plutôt que des messages textuels, et pourquoi contextvars plutôt que passer le request_id en argument ? R : Le structuré rend les logs queryables/agrégables par une machine (filtrer, compter, alerter sans avoir anticipé la question) ; contextvars propage automatiquement le contexte de requête à travers toute la stack async sans polluer chaque signature, et garantit l'isolation par tâche asyncio (pas de fuite entre requêtes concurrentes), là où une variable globale fuiterait.
Q : Comment instrumenter un agent LLM en production, et qu'est-ce qui change par rapport à un service classique ? R : En plus des golden signals (latence, trafic, erreurs, saturation), j'instrumente les tokens (input/output/cache), le coût USD par requête, le stop_reason, la profondeur de boucle de tool-use et la latence par tool call — parce qu'un appel LLM est une transaction facturée, non-déterministe et à latence non bornée ; un HTTP 200 avec stop_reason: "refusal" ou max_tokens est un échec métier qu'un monitoring HTTP classique ne verrait pas.
Q : Ton logging synchrone ralentit ton service FastAPI sous charge. Que fais-tu ? R : Je passe à un QueueHandler/QueueListener : le handler dépose le record dans une queue (non-bloquant pour l'event loop), un thread dédié fait la sérialisation JSON et l'I/O. Je vérifie aussi que DEBUG est désactivé en prod, je garde les payloads coûteux derrière isEnabledFor, et j'échantillonne les traces (tail-based, en gardant 100 % des traces en erreur) plutôt que les logs d'erreur.