Observabilité (OTel, request-id)
TL;DR — L'observabilité, c'est répondre à « que s'est-il passé pour cette requête ? » sans rejouer le bug. On la construit sur trois piliers : les logs (événements structurés en JSON), les métriques (agrégats numériques : latence, débit, taux d'erreur) et les traces (l'arbre causal d'une requête à travers vos services). Le fil rouge qui relie les trois est un identifiant de corrélation (
request_id/trace_id) propagé viacontextvarspour survivre àasync/await. En FastAPI, on instrumente avec OpenTelemetry (OTel) — un standard vendor-neutral — plus un middleware qui injecte lerequest_id. Quand vous servez un agent LLM Anthropic, l'observabilité devient vitale : une requête peut durer 90 s, brûler 2 $ de tokens, boucler 8 fois sur des outils, et échouer silencieusement sur unstop_reason: "refusal". Sans spans, usage tokens et corrélation, vous débuggez à l'aveugle un système non-déterministe.
🧠 Mental model
Vous venez de NestJS : vous connaissez les interceptors, le Logger, peut-être @nestjs/terminus. Transposez ainsi :
| NestJS / TS | FastAPI / Python | Rôle |
|---|---|---|
LoggerService (Pino/Winston) | structlog / logging JSON | Logs structurés |
Interceptor (NestInterceptor) | Middleware ASGI | Capter chaque requête |
AsyncLocalStorage | contextvars.ContextVar | Propager le request_id dans l'async |
@opentelemetry/sdk-node | opentelemetry-sdk | Traces/métriques OTel |
cls-rtracer | middleware request-id maison | Corrélation |
L'analogie centrale. Imaginez un colis dans un réseau de tri postal. Le log, c'est le carnet de bord d'un centre de tri (« colis arrivé 14h02, scanné, parti 14h05 »). La métrique, c'est le tableau de bord du directeur (« 12 000 colis/h, 0,3 % perdus »). La trace, c'est le suivi du colis individuel : chaque scan est un span, et l'ensemble des scans forme l'arbre du voyage. Le numéro de suivi imprimé sur l'étiquette — le trace_id — est ce qui permet de tout relier. Sans lui, vous avez trois sources de vérité qui ne se parlent pas.
Requête HTTP entrante
│
▼
┌─────────────────────────────────┐
│ Middleware request-id │ ← génère/lit X-Request-ID
│ contextvar.set(request_id) │ pose le contexte
└───────────────┬─────────────────┘
│
┌──────────────┼───────────────────────────────┐
│ SPAN racine: POST /agent/chat (trace_id=abc) │
│ ├─ SPAN: db.query (12 ms) │
│ ├─ SPAN: anthropic.messages.stream │
│ │ ├─ event: tokens_in=4200 │
│ │ ├─ SPAN: tool.search_docs (310 ms) │
│ │ └─ event: tokens_out=890 │
│ └─ log{request_id=abc, level=info, ...} │
└───────────────────────────────────────────────┘
│
▼
Collector OTel → Jaeger / Tempo / Datadog
Logs JSON → Loki / Elasticsearch
Métriques → PrometheusLe point clé : un seul trace_id traverse logs, métriques exemplars et spans. Quand un client signale « ma requête de 14h a planté », vous cherchez le request_id, et il vous donne d'un coup la trace complète et tous les logs.
Pilier 0 : la corrélation avec contextvars
Avant tout, le problème fondateur en async. Une variable globale ne marche pas : 200 coroutines partagent le même thread, elles s'écraseraient mutuellement le request_id. La solution Python est contextvars — l'équivalent de l'AsyncLocalStorage de Node : chaque tâche asyncio hérite d'une copie du contexte.
# observability/context.py
from __future__ import annotations
import uuid
from contextvars import ContextVar
# La valeur par défaut "-" signale "hors requête" (tâche de fond, startup...).
_request_id: ContextVar[str] = ContextVar("request_id", default="-")
def set_request_id(value: str | None = None) -> str:
rid = value or uuid.uuid4().hex
_request_id.set(rid)
return rid
def get_request_id() -> str:
return _request_id.get()❌ La façon naïve (qui fuit entre requêtes)
# NE FAITES PAS ÇA
current_request_id: str = "-" # variable de module, partagée par tout le process
async def handler() -> None:
global current_request_id
current_request_id = uuid.uuid4().hex
await some_io() # ⚠️ pendant cet await, une AUTRE requête écrase la globale
log(current_request_id) # ← peut afficher l'ID d'une requête concurrenteSous charge, deux requêtes qui s'entrelacent sur un await mélangent leurs IDs. Vous obtenez des logs où la requête A porte l'ID de la requête B. Indébuggable.
✅ La façon idiomatique
ContextVar.set() est isolé par contexte d'exécution. Un await ne fait pas fuiter la valeur vers une autre coroutine, parce qu'asyncio propage une copie du contexte à chaque Task. C'est exactement la garantie qu'on veut.
Pilier 1 : logs structurés en JSON
En prod, on ne loggue jamais en texte libre. On loggue des objets : un parseur (Loki, Elastic) doit pouvoir filtrer request_id="abc" AND level="error". On branche le request_id automatiquement via un Filter logging.
# observability/logging_setup.py
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from typing import Any
from .context import get_request_id
class RequestIdFilter(logging.Filter):
"""Injecte le request_id du contexte courant dans chaque LogRecord."""
def filter(self, record: logging.LogRecord) -> bool:
record.request_id = get_request_id()
return True
class JsonFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
payload: dict[str, Any] = {
"ts": datetime.now(timezone.utc).isoformat(),
"level": record.levelname,
"logger": record.name,
"msg": record.getMessage(),
"request_id": getattr(record, "request_id", "-"),
}
# Les champs ajoutés via logger.info("...", extra={...}) sont conservés.
for key, value in getattr(record, "__dict__", {}).items():
if key not in _RESERVED and not key.startswith("_"):
payload.setdefault(key, value)
if record.exc_info:
payload["exc"] = self.formatException(record.exc_info)
return json.dumps(payload, default=str, ensure_ascii=False)
_RESERVED = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {
"request_id",
"taskName",
}
def configure_logging(level: int = logging.INFO) -> None:
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
handler.addFilter(RequestIdFilter())
root = logging.getLogger()
root.handlers.clear() # on retire le handler par défaut de uvicorn
root.addHandler(handler)
root.setLevel(level)Désormais logger.info("agent terminé", extra={"tokens_out": 890}) produit :
{"ts":"2026-06-16T12:00:01Z","level":"INFO","msg":"agent terminé","request_id":"abc","tokens_out":890}Le request_id est là sans qu'aucun appelant ait eu à le passer. C'est tout l'intérêt du contextvar.
Pilier 2 : le middleware request-id
On capte chaque requête au plus tôt. Un middleware ASGI pur (pas un décorateur de route) garantit qu'on couvre toutes les routes, y compris les erreurs de validation.
# observability/middleware.py
from __future__ import annotations
import time
import logging
from starlette.types import ASGIApp, Message, Receive, Scope, Send
from .context import set_request_id, get_request_id
logger = logging.getLogger("http")
_HEADER = b"x-request-id"
class RequestIdMiddleware:
"""Middleware ASGI : lit/génère X-Request-ID, le pose dans le contexte,
le renvoie dans la réponse, et loggue la latence."""
def __init__(self, app: ASGIApp) -> None:
self.app = app
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] != "http":
await self.app(scope, receive, send)
return
headers = dict(scope["headers"])
incoming = headers.get(_HEADER)
rid = set_request_id(incoming.decode() if incoming else None)
start = time.perf_counter()
async def send_wrapper(message: Message) -> None:
if message["type"] == "http.response.start":
# On renvoie l'ID au client pour qu'il puisse le citer dans un ticket.
raw_headers = message.setdefault("headers", [])
raw_headers.append((_HEADER, rid.encode()))
await send(message)
try:
await self.app(scope, receive, send_wrapper)
finally:
elapsed_ms = (time.perf_counter() - start) * 1000
logger.info(
"request terminée",
extra={
"method": scope["method"],
"path": scope["path"],
"elapsed_ms": round(elapsed_ms, 2),
},
)Pourquoi un middleware ASGI brut, pas @app.middleware("http") ?
Le décorateur BaseHTTPMiddleware de Starlette enveloppe la requête dans une StreamingResponse interne, ce qui casse le streaming SSE (votre flux de tokens LLM serait bufferisé puis renvoyé d'un bloc) et complique la propagation des contextvars. Le middleware ASGI de classe ci-dessus, lui, est transparent au streaming. Pour un service qui stream des tokens, c'est non négociable.
Pilier 3 : OpenTelemetry (traces + métriques)
OTel est le standard. On configure un TracerProvider, on exporte vers un collector (OTLP), et on auto-instrumente FastAPI + httpx d'une ligne.
# observability/otel.py
from __future__ import annotations
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from fastapi import FastAPI
def configure_tracing(app: FastAPI, *, service_name: str, otlp_endpoint: str) -> None:
resource = Resource.create({"service.name": service_name})
provider = TracerProvider(resource=resource)
# BatchSpanProcessor : exporte par lots en arrière-plan (jamais sur le chemin
# critique de la requête). Surtout PAS SimpleSpanProcessor en prod (synchrone).
provider.add_span_processor(
BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
)
trace.set_tracer_provider(provider)
# Auto-instrumentation : crée un span racine par requête + propage le contexte W3C.
FastAPIInstrumentor.instrument_app(app)
# Instrumente httpx : tout appel sortant (dont l'API Anthropic) devient un span enfant.
HTTPXClientInstrumentor().instrument()
def get_tracer() -> trace.Tracer:
return trace.get_tracer("app")Le trace_id d'OTel et notre request_id maison doivent converger. Le plus propre : aligner le request_id sur le trace_id OTel quand une trace existe, sinon générer. On enrichit set_request_id :
# observability/context.py (variante alignée OTel)
from opentelemetry import trace
def set_request_id(value: str | None = None) -> str:
span = trace.get_current_span()
ctx = span.get_span_context()
if ctx.is_valid: # une trace OTel existe déjà → on s'aligne dessus
rid = format(ctx.trace_id, "032x")
else:
rid = value or uuid.uuid4().hex
_request_id.set(rid)
return ridMaintenant logs, traces et l'en-tête X-Request-ID partagent le même identifiant.
Assemblage : l'application
# main.py
from __future__ import annotations
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from observability.logging_setup import configure_logging
from observability.middleware import RequestIdMiddleware
from observability.otel import configure_tracing
@asynccontextmanager
async def lifespan(app: FastAPI):
configure_logging(level=logging.INFO)
yield
app = FastAPI(lifespan=lifespan)
app.add_middleware(RequestIdMiddleware)
configure_tracing(app, service_name="agent-gateway", otlp_endpoint="http://collector:4317")Ordre des middlewares
Starlette exécute les middlewares dans l'ordre inverse de leur ajout (LIFO, comme une pile). Ajoutez RequestIdMiddleware en dernier pour qu'il s'exécute en premier : le request_id doit être posé avant que tout autre middleware ne loggue.
⚙️ En production
Servir un agent LLM Anthropic, instrumenté de bout en bout
C'est ici que l'observabilité paie. Un endpoint d'agent qui stream des tokens, boucle sur des outils, et expose chaque étape comme un span. On utilise le SDK Python AsyncAnthropic avec messages.stream, la pensée adaptative et l'effort via output_config.
# routes/agent.py
from __future__ import annotations
import logging
from anthropic import AsyncAnthropic
import anthropic
from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from observability.otel import get_tracer
from observability.context import get_request_id
router = APIRouter()
logger = logging.getLogger("agent")
tracer = get_tracer()
# Le client est créé une fois (pool de connexions httpx réutilisé, déjà instrumenté OTel).
client = AsyncAnthropic(max_retries=4) # retries SDK + backoff exponentiel sur 429/5xx
class ChatIn(BaseModel):
message: str
SYSTEM_PROMPT = "Tu es un assistant concis. Réponds en français."
@router.post("/agent/chat")
async def agent_chat(body: ChatIn) -> StreamingResponse:
rid = get_request_id()
async def token_stream():
# Span métier : on mesure tout l'appel LLM, distinct du span httpx.
with tracer.start_as_current_span("anthropic.messages.stream") as span:
span.set_attribute("llm.model", "claude-opus-4-8")
span.set_attribute("request_id", rid)
try:
async with client.messages.stream(
model="claude-opus-4-8",
max_tokens=4096,
system=[
{
"type": "text",
"text": SYSTEM_PROMPT,
# Cache du prompt système : ~0,1× sur les lectures.
"cache_control": {"type": "ephemeral"},
}
],
thinking={"type": "adaptive"}, # PAS de budget_tokens (4.8)
output_config={"effort": "high"},
messages=[{"role": "user", "content": body.message}],
) as stream:
async for text in stream.text_stream:
yield text
final = await stream.get_final_message()
# Observabilité métier : le coût et l'issue, sur le span.
span.set_attribute("llm.input_tokens", final.usage.input_tokens)
span.set_attribute("llm.output_tokens", final.usage.output_tokens)
span.set_attribute(
"llm.cache_read_tokens",
final.usage.cache_read_input_tokens or 0,
)
span.set_attribute("llm.stop_reason", final.stop_reason or "")
logger.info(
"agent terminé",
extra={
"input_tokens": final.usage.input_tokens,
"output_tokens": final.usage.output_tokens,
"stop_reason": final.stop_reason,
},
)
except anthropic.RateLimitError:
span.set_attribute("error", True)
logger.warning("rate limit Anthropic", exc_info=True)
raise HTTPException(status_code=429, detail="LLM rate limited")
except anthropic.APIError as exc:
span.set_attribute("error", True)
logger.error("erreur API Anthropic", extra={"status": exc.status_code if hasattr(exc, "status_code") else None}, exc_info=True)
raise HTTPException(status_code=502, detail="LLM upstream error")
return StreamingResponse(
token_stream(),
media_type="text/event-stream",
headers={"X-Request-ID": rid}, # le client peut citer cet ID dans un ticket
)Ce qu'on a gagné : pour chaque requête d'agent, une trace montre la latence du LLM, le nombre de tokens entrée/sortie, le taux de cache, et le stop_reason. Si un utilisateur dit « la réponse s'est coupée », vous lisez stop_reason: "max_tokens" sur le span sans rejouer.
Lever une HTTPException à l'intérieur d'un générateur de stream
Subtilité importante : une fois que StreamingResponse a commencé à émettre, la ligne de statut et les en-têtes sont déjà envoyés au client. Lever une HTTPException dans token_stream() ne produira donc pas un vrai 429/502 — le client a déjà reçu un 200 OK et reçoit ensuite une réponse tronquée. C'est pourquoi, ci-dessus, l'essentiel du travail d'observabilité est fait avant de lever : on enregistre l'erreur sur le span (span.set_attribute("error", True)) et dans le log structuré, qui restent corrects quoi qu'il arrive. Pour signaler proprement l'échec au client en SSE, la pratique senior est d'émettre un événement d'erreur dans le flux (yield 'event: error\\ndata: {...}\\n\\n') avant de couper, plutôt que de compter sur le code HTTP. Réservez le raise HTTPException aux échecs qui surviennent avant le premier yield.
La boucle d'outils (tool-use), tracée
Quand l'agent appelle des outils, chaque itération doit être un span. Voici le squelette d'une boucle manuelle instrumentée — chaque exécution d'outil est un span enfant, on voit ainsi où part le temps.
async def run_tool_loop(client: AsyncAnthropic, user_msg: str) -> str:
tracer = get_tracer()
messages: list[dict] = [{"role": "user", "content": user_msg}]
tools = [{
"name": "search_docs",
"description": "Cherche dans la doc interne. À appeler quand la question porte sur le produit.",
"input_schema": {"type": "object", "properties": {"q": {"type": "string"}}, "required": ["q"]},
}]
for _ in range(8): # garde-fou : jamais de boucle infinie
resp = await client.messages.create(
model="claude-opus-4-8",
max_tokens=4096,
thinking={"type": "adaptive"},
tools=tools,
messages=messages,
)
if resp.stop_reason == "end_turn":
return "".join(b.text for b in resp.content if b.type == "text")
messages.append({"role": "assistant", "content": resp.content})
results = []
for block in resp.content:
if block.type == "tool_use":
with tracer.start_as_current_span(f"tool.{block.name}") as span:
span.set_attribute("tool.input", str(block.input))
output = await execute_tool(block.name, block.input) # votre code
span.set_attribute("tool.output_len", len(output))
results.append({
"type": "tool_result",
"tool_use_id": block.id, # ne jamais oublier l'ID de corrélation
"content": output,
})
messages.append({"role": "user", "content": results})
raise RuntimeError("boucle d'outils non convergente après 8 itérations")Les arbitrages senior
- Failure modes. L'erreur classique :
BatchSpanProcessoraccumule des spans en mémoire si le collector est down → fuite mémoire lente. Configurezmax_queue_sizeet acceptez que des spans soient droppés (l'observabilité ne doit jamais faire tomber la prod). Côté logs : unJsonFormatterqui lève sur un objet non-sérialisable plante le handler ; d'où ledefault=strdansjson.dumps. - Performance. Tracer a un coût. N'instrumentez pas chaque petite fonction — instrumentez les frontières (HTTP, DB, LLM, outils). Le
BatchSpanProcessorexporte hors du chemin critique ; ne le remplacez jamais parSimpleSpanProcessoren prod (il bloque sur chaque export). - Sampling. À fort débit, ne tracez pas 100 % des requêtes. Utilisez un
ParentBasedTraceIdRatio(ex. 10 %). Mais gardez toutes les traces en erreur (tail-based sampling au niveau du collector) — ce sont elles qu'on veut. - Sécurité / PII. Ne loggez jamais le prompt utilisateur brut ni la sortie du LLM sans réflexion : ils contiennent souvent des données personnelles. Loggez la longueur, le nombre de tokens, le stop_reason — pas le contenu. Idem pour les clés API :
AsyncAnthropic()litANTHROPIC_API_KEYde l'environnement, ne la mettez jamais dans un span. Les en-têtesAuthorizationdoivent être filtrés de l'auto-instrumentation FastAPI. - Coût LLM = métrique de premier ordre. Exportez
input_tokens+output_tokenscomme une métrique OTel (Counter), taggée parmodeletroute. C'est votre facture en temps réel. Un agent qui boucle 8 fois sur des outils peut coûter 50× une requête normale : sans cette métrique, vous le découvrez sur la facture en fin de mois. - Le
cache_read_input_tokens. Surveillez-le : s'il reste à zéro alors que vous avez posé uncache_control, un invalidateur silencieux casse votre cache (timestamp dans le system prompt, outils réordonnés). C'est une métrique d'observabilité et d'optimisation de coût. - Health checks. Exposez
/healthz(liveness, pas de dépendance) et/readyz(readiness : DB joignable, clé LLM présente). Excluez-les du tracing (FastAPIInstrumentor(..., excluded_urls="healthz,readyz")), sinon ils polluent vos traces.
🏋️ Exercices
Exercice 1 — Le request-id qui fuit (implémenter → comprendre)
Objectif. Reproduire la fuite de request_id entre requêtes concurrentes avec une variable globale, puis la corriger avec contextvars. Écrivez deux endpoints /leaky et /safe qui posent un ID, await asyncio.sleep(0.1), puis le relisent. Lancez 50 requêtes concurrentes avec httpx.AsyncClient et comptez combien relisent un ID différent de celui posé. Indice/Solution. Sur /leaky (variable de module) vous verrez des dizaines de mismatches ; sur /safe (ContextVar), zéro. Le await est le point où asyncio bascule entre coroutines — c'est précisément là que la globale se fait écraser, mais où la copie de contexte tient bon.
Exercice 2 — Streaming SSE non bufferisé (implémenter → casser → réparer)
Objectif. Montrer que BaseHTTPMiddleware casse le streaming. Créez un endpoint qui yield un token toutes les 200 ms. Vérifiez côté client (curl -N) que les tokens arrivent en flux. Puis ajoutez un @app.middleware("http") qui lit response.body → observez que le flux devient un bloc unique. Réparez avec le middleware ASGI de classe de la leçon. Indice/Solution. BaseHTTPMiddleware consomme la StreamingResponse pour la ré-emballer. Le middleware ASGI brut wrappe send sans toucher au corps → transparent. Pour un agent LLM, c'est la différence entre voir les tokens apparaître et attendre 90 s un mur de texte.
Exercice 3 — Métrique de coût tokens (implémenter → production-grade)
Objectif. Exporter une métrique OTel Counter llm.tokens.total taggée {model, kind=input|output, route}, incrémentée à chaque appel Anthropic. Branchez un exporter Prometheus et tracez sur Grafana le coût estimé en $/min (input × 5/1e6 + output × 25/1e6 pour claude-opus-4-8). Indice/Solution. meter.create_counter("llm.tokens.total") puis counter.add(usage.input_tokens, {"kind": "input", "model": ..., "route": ...}). L'astuce prod : faites-le dans un finally pour compter même les requêtes qui échouent en cours de stream (tokens déjà facturés).
Exercice 4 — Corréler une erreur via le trace_id (production-grade)
Objectif. Provoquer une anthropic.APIError (clé invalide), la capturer, et garantir que le request_id du log d'erreur, l'en-tête X-Request-ID de la réponse 502, et le trace_id du span en erreur sont identiques. Écrivez un test qui appelle l'endpoint et assert l'égalité des trois. Indice/Solution. C'est l'alignement set_request_id ← trace.get_current_span().get_span_context().trace_id. Le test : assert resp.headers["x-request-id"] == captured_log_record.request_id. Si ça diverge, votre middleware s'exécute avant que le span OTel ne soit créé — corrigez l'ordre LIFO.
Exercice 5 — Sampling sans perdre les erreurs (casser → réparer)
Objectif. Configurer un sampling à 5 % puis montrer qu'une trace en erreur sur 20 est perdue. Implémentez un sampler custom (ou config collector) qui échantillonne 5 % du trafic mais 100 % des spans portant error=true. Indice/Solution. Le head-based sampling décide avant de connaître l'issue → il ne peut pas garder les erreurs. La vraie solution est le tail-based sampling dans l'OTel Collector (tail_sampling processor avec une policy status_code: ERROR). Côté SDK, on peut au mieux forcer sampled sur le span quand on set error.
Exercice 6 — Fuite mémoire du BatchSpanProcessor (casser → réparer)
Objectif. Couper le collector, envoyer 100 000 spans, observer la RAM grimper. Corrigez avec BatchSpanProcessor(exporter, max_queue_size=2048) et vérifiez que les spans excédentaires sont droppés (compteur dropped_spans) plutôt que de saturer la mémoire. Indice/Solution. Par défaut la queue est bornée à 2048, mais un mauvais réglage (ou un SimpleSpanProcessor) peut bloquer ou fuir. La leçon senior : l'observabilité dégrade gracieusement — perdre des traces est acceptable, faire tomber le service ne l'est pas.
🎤 En entretien
Q : Pourquoi contextvars et pas une variable globale ou threading.local pour le request-id en FastAPI ? R : FastAPI tourne sur un event loop async monothread : une globale serait écrasée entre coroutines concurrentes sur chaque await, et threading.local ne distingue pas les coroutines (elles partagent le thread). contextvars propage une copie isolée du contexte à chaque Task asyncio — c'est l'équivalent de l'AsyncLocalStorage de Node.
Q : Logs, métriques, traces — quand utiliser quoi ? R : Métriques pour les tendances agrégées et les alertes (latence p99, taux d'erreur, $/min de tokens) — peu coûteuses, cardinalité bornée. Traces pour comprendre une requête lente à travers les services (l'arbre causal des spans). Logs pour le détail d'un événement précis. Le trace_id partagé est ce qui permet de sauter de l'un à l'autre.
Q : Comment instrumenter un appel LLM Anthropic en streaming sans casser le flux ni rater le coût ? R : Span métier autour de client.messages.stream, on yield les tokens via stream.text_stream, puis await stream.get_final_message() pour récupérer usage (tokens, cache) qu'on pose en attributs de span et en métrique counter. Crucial : un middleware ASGI brut (pas BaseHTTPMiddleware) pour ne pas bufferiser le SSE, et le calcul du coût dans un finally pour compter même les streams interrompus.
Q : Vous tracez 100 % des requêtes ? R : Non, à fort débit on échantillonne (ex. 5–10 %) pour le coût de stockage. Mais on garde 100 % des traces en erreur via du tail-based sampling au niveau du collector — la décision d'échantillonnage est prise après avoir vu l'issue, ce que le head-based sampling ne peut pas faire. Et on n'oublie jamais de filtrer la PII et les secrets (clés API, en-têtes Authorization, contenu des prompts) avant export.