SSE streaming (tokens LLM)
TL;DR — SSE (Server-Sent Events) est un canal HTTP unidirectionnel serveur → client, en texte clair, qui réutilise une simple connexion
GET: le serveur garde la réponse ouverte et pousse des évènementsdata: ...\n\nau fil de l'eau. C'est exactement le bon outil pour streamer les tokens d'un agent LLM vers un navigateur : pas de handshake WebSocket, reconnexion automatique gratuite (Last-Event-ID), et il passe à travers tous les proxies HTTP. En FastAPI on l'expose avec uneStreamingResponse(ousse-starlette) dont le corps est un générateur async ; côté Anthropic, on branche ce générateur surAsyncAnthropic().messages.stream(...)et on relaye chaquetextdelta. Le piège senior n'est pas d'écrire leyield— c'est de gérer le buffering des proxies, la déconnexion client (sinon l'appel LLM continue de coûter de l'argent), le heartbeat, et le fait qu'async def+ un bloc bloquant tuent tout l'event loop.
🧠 Mental model
Tu connais déjà trois façons de renvoyer une réponse HTTP. SSE est la quatrième, et la plus simple des trois "temps réel".
client → serveur serveur → client transport
Requête classique 1 1 HTTP (fermé après)
Polling / long-polling N N HTTP (N requêtes)
SSE 1 ∞ HTTP (1 connexion, gardée ouverte)
WebSocket ∞ ∞ ws:// (full-duplex)L'analogie : un WebSocket est un appel téléphonique — les deux parties parlent quand elles veulent, mais il faut composer le numéro, établir la ligne, gérer le raccrochage. SSE est un flux radio : tu allumes le poste (GET), le serveur émet en continu, tu écoutes. Tu ne peux pas parler à la radio (unidirectionnel), mais pour "afficher des tokens qui arrivent", c'est précisément ce qu'il faut — et brancher/débrancher la radio est trivial.
Pour un agent LLM, le modèle mental est encore plus net. Le LLM génère token par token, de façon intrinsèquement séquentielle et lente (plusieurs secondes pour une réponse longue). Sans streaming, l'utilisateur fixe un spinner pendant 8 secondes puis reçoit un pavé. Avec SSE, le premier token s'affiche en ~300 ms et le texte se "tape" tout seul. Le TTFT (time-to-first-token) devient ta métrique de latence perçue, pas le temps total.
navigateur FastAPI (async gen) Anthropic API
│ GET /chat (SSE) │ │
├──────────────────────────▶ │
│ │ messages.stream(...) │
│ ├──────────────────────────────▶
│ │◀── text_delta "Bon" │
│◀── data: {"t":"Bon"} │ │
│ │◀── text_delta "jour" │
│◀── data: {"t":"jour"} │ │
│ ... │ ... │
│◀── event: done │◀── message_stop │
│ (connexion fermée) │ │Le serveur FastAPI est un tuyau qui transforme : il consomme un stream (celui d'Anthropic, lui aussi en SSE sous le capot) et en réémet un autre (le tien, vers le navigateur), en changeant éventuellement le format au passage.
Le wire format SSE (ce que voit vraiment le navigateur)
SSE n'est pas du JSON, c'est un format texte ligne par ligne défini par la spec EventSource. Un évènement = un ou plusieurs champs clé: valeur, terminé par une ligne vide :
event: token
data: {"text": "Bonjour"}
id: 42
event: done
data: [DONE]Les champs qui comptent :
data:— la charge utile (souvent du JSON sérialisé, mais c'est libre). Plusieurs lignesdata:consécutives sont concaténées avec\n.event:— un nom d'évènement nommé ; côté JS tu écouteses.addEventListener("token", ...). Par défaut c'est l'évènementmessage.id:— identifiant ; le navigateur le renvoie dans l'en-têteLast-Event-IDà la reconnexion. C'est la fonctionnalité qui rend SSE robuste gratuitement.retry:— délai de reconnexion en ms.- Une ligne commençant par
:est un commentaire — on s'en sert comme heartbeat (: keep-alive\n\n).
Le \n\n final (ligne vide) est obligatoire pour que le client "valide" l'évènement. L'oublier = le navigateur bufferise indéfiniment et tu crois que rien n'arrive.
La bonne façon : un endpoint FastAPI fully-typed
On veut un agent qui répond en streamant. Modèle Pydantic v2 en entrée, générateur async en sortie, client Anthropic injecté par DI.
# app/main.py
from __future__ import annotations
import asyncio
import json
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Annotated
import anthropic
from anthropic import AsyncAnthropic
from fastapi import Depends, FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
MODEL = "claude-opus-4-8"
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
# Un seul client pour tout le process : il gère un pool de connexions HTTP
# et les retries. En créer un par requête est un anti-pattern (voir + bas).
app.state.anthropic = AsyncAnthropic(max_retries=3)
yield
await app.state.anthropic.close()
app = FastAPI(lifespan=lifespan)
def get_client(request: Request) -> AsyncAnthropic:
return request.app.state.anthropic
ClientDep = Annotated[AsyncAnthropic, Depends(get_client)]
class ChatRequest(BaseModel):
prompt: str = Field(min_length=1, max_length=8_000)
system: str | None = None
def sse(data: str, *, event: str | None = None, id: str | None = None) -> str:
"""Sérialise un évènement SSE. data peut contenir des \\n -> une ligne data: par ligne."""
lines: list[str] = []
if event is not None:
lines.append(f"event: {event}")
if id is not None:
lines.append(f"id: {id}")
for line in data.split("\n"):
lines.append(f"data: {line}")
return "\n".join(lines) + "\n\n"
async def stream_agent(
client: AsyncAnthropic,
body: ChatRequest,
request: Request,
) -> AsyncIterator[str]:
# 1) Heartbeat de pré-connexion : force les proxies à flusher les en-têtes
# et donne au navigateur un signal "la ligne est ouverte".
yield ": connected\n\n"
try:
async with client.messages.stream(
model=MODEL,
max_tokens=4_096,
thinking={"type": "adaptive"}, # jamais budget_tokens sur Opus 4.8
system=body.system or "Tu es un assistant concis et précis.",
messages=[{"role": "user", "content": body.prompt}],
) as stream:
async for text in stream.text_stream:
# 2) Le client a fermé l'onglet ? On arrête tout immédiatement.
if await request.is_disconnected():
break
yield sse(json.dumps({"text": text}), event="token")
final = await stream.get_final_message()
usage = {
"input_tokens": final.usage.input_tokens,
"output_tokens": final.usage.output_tokens,
}
yield sse(json.dumps(usage), event="usage")
yield sse("[DONE]", event="done")
except anthropic.APIError as exc:
# 3) Une erreur après le 200 OK ne peut PLUS devenir un 500.
# On la pousse comme un évènement SSE et le front l'affiche.
yield sse(json.dumps({"message": str(exc)}), event="error")
@app.post("/chat")
async def chat(body: ChatRequest, client: ClientDep, request: Request) -> StreamingResponse:
return StreamingResponse(
stream_agent(client, body, request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # désactive le buffering nginx (crucial)
"Connection": "keep-alive",
},
)Trois choses non négociables que cet exemple contient et que les tutos zappent :
media_type="text/event-stream"+Cache-Control: no-cache+X-Accel-Buffering: no. Sans le dernier, nginx (ou tout reverse-proxy) bufferise ta réponse et l'utilisateur reçoit tout d'un coup à la fin — le streaming devient invisible. C'est le bug n°1 en prod.request.is_disconnected()— si l'utilisateur ferme l'onglet, sans ce check ton générateur continue de consommer le stream Anthropic jusqu'au bout. Tu payes des tokens pour une réponse que personne ne lira.- Le
try/exceptqui émet unevent: error— une fois le200 OKet les en-têtes envoyés, tu ne peux plus renvoyer un code HTTP d'erreur. La seule façon de signaler un problème est dans le flux.
Le client JavaScript correspondant tient en quelques lignes (note : EventSource natif ne fait que du GET ; pour un POST avec corps on utilise fetch + un reader de stream, ou la lib @microsoft/fetch-event-source) :
const res = await fetch("/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ prompt: "Explique SSE" }),
});
const reader = res.body!.pipeThrough(new TextDecoderStream()).getReader();
let buffer = "";
for (;;) {
const { value, done } = await reader.read();
if (done) break;
buffer += value;
// découpe sur la ligne vide (séparateur d'évènement SSE)
const events = buffer.split("\n\n");
buffer = events.pop() ?? "";
for (const block of events) {
const dataLine = block.split("\n").find((l) => l.startsWith("data: "));
if (dataLine) console.log(JSON.parse(dataLine.slice(6)));
}
}La mauvaise façon (et pourquoi elle casse)
Anti-pattern 1 : un def synchrone bloquant dans une coroutine
# ❌ NE FAIS PAS ÇA
@app.post("/chat")
async def chat_bad(body: ChatRequest):
client = anthropic.Anthropic() # client SYNCHRONE dans une route async !
def generate():
with client.messages.stream(model=MODEL, max_tokens=4096,
messages=[{"role": "user", "content": body.prompt}]) as s:
for text in s.text_stream: # itération bloquante
yield f"data: {text}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")Pourquoi c'est cassé :
anthropic.Anthropic()(sansAsync) fait des appels HTTP bloquants. Dans une routeasync, chaquefor text in s.text_streamgèle l'event loop entier : pendant que tu streames pour un utilisateur, toutes les autres requêtes du process sont en attente. Tu transformes ton serveur async en serveur mono-thread séquentiel.- Créer un
Anthropic()par requête recrée un pool de connexions à chaque fois : pas de keep-alive vers l'API, latence de TLS handshake à chaque appel.
Règle : dans une route
async def, tout I/O doit êtreawait-able. Un client sync n'a rien à y faire. Si tu dois utiliser du code bloquant, isole-le dansawait anyio.to_thread.run_sync(...).
Anti-pattern 2 : accumuler la réponse complète puis la "fausser" en stream
# ❌ Streaming en façade
async def fake_stream(client, prompt):
msg = await client.messages.create(model=MODEL, max_tokens=4096,
messages=[{"role": "user", "content": prompt}])
full = msg.content[0].text
for word in full.split(): # on a DÉJÀ attendu toute la réponse
yield f"data: {word}\n\n"Tu attends la réponse complète (TTFT = temps total), puis tu la débites mot par mot. L'utilisateur a le pire des deux mondes : la latence du non-streaming et l'effet "saccadé" du streaming. Le vrai streaming part de messages.stream et relaie les deltas au moment où ils arrivent.
Anti-pattern 3 : oublier la ligne vide
yield f"data: {text}\n" # ❌ un seul \n → le navigateur n'émet jamais l'évènement
yield f"data: {text}\n\n" # ✅ \n\n termine l'évènementLa boucle tool-use en streaming (l'agent réel)
Un "agent" ne fait pas que cracher du texte : il appelle des outils, lit le résultat, continue. En streaming, ça veut dire qu'on relaie les tokens, on détecte stop_reason == "tool_use", on exécute l'outil, on renvoie le résultat dans une nouvelle requête, et on re-streame — le tout sur la même connexion SSE vers le navigateur.
TOOLS: list[dict] = [
{
"name": "get_weather",
"description": "Donne la météo actuelle. À appeler quand l'utilisateur "
"demande le temps qu'il fait dans une ville.",
"input_schema": {
"type": "object",
"properties": {"city": {"type": "string", "description": "Nom de la ville"}},
"required": ["city"],
},
}
]
async def run_tool(name: str, args: dict) -> str:
if name == "get_weather":
# ... vrai appel I/O async ici
return json.dumps({"city": args["city"], "temp_c": 21, "sky": "dégagé"})
return json.dumps({"error": f"outil inconnu: {name}"})
async def stream_agent_loop(
client: AsyncAnthropic, prompt: str, request: Request
) -> AsyncIterator[str]:
messages: list[dict] = [{"role": "user", "content": prompt}]
yield ": connected\n\n"
for _ in range(8): # garde-fou anti-boucle-infinie : JAMAIS de while True nu
if await request.is_disconnected():
return
async with client.messages.stream(
model=MODEL, max_tokens=4_096, tools=TOOLS,
thinking={"type": "adaptive"}, messages=messages,
) as stream:
async for event in stream:
if event.type == "content_block_delta" and event.delta.type == "text_delta":
yield sse(json.dumps({"text": event.delta.text}), event="token")
final = await stream.get_final_message()
# On ré-empile la réponse complète de l'assistant (blocs text + tool_use)
messages.append({"role": "assistant", "content": final.content})
if final.stop_reason != "tool_use":
yield sse("[DONE]", event="done")
return
# Exécute tous les tool_use et renvoie les résultats dans un message user
results: list[dict] = []
for block in final.content:
if block.type == "tool_use":
yield sse(json.dumps({"tool": block.name}), event="tool_call")
output = await run_tool(block.name, dict(block.input))
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
})
messages.append({"role": "user", "content": results})
yield sse(json.dumps({"message": "max_iterations atteint"}), event="error")Points senior :
- On ré-empile
final.contenten entier (pas seulement le texte) : les blocstool_usedoivent être préservés sinon l'API renvoie un 400 (tool_resultorphelin). - La boucle est bornée (
for _ in range(8)), jamais unwhile True: un agent qui boucle est un agent qui brûle ton budget. - On émet des évènements SSE typés (
token,tool_call,done,error) pour que le front affiche "🔧 appel get_weather…" pendant l'exécution de l'outil.
Sorties structurées : streamer du JSON validé
Parfois l'agent doit produire du JSON (un formulaire, une extraction). Le streaming de tokens bruts est inutile ici (un JSON mi-écrit n'est pas parsable). On utilise les structured outputs natifs d'Anthropic via messages.parse(), qui valident contre un schéma Pydantic — et on streame plutôt un signal de progression puis le résultat final validé.
from pydantic import BaseModel
class ExtractedContact(BaseModel):
name: str
email: str
interests: list[str]
async def stream_structured(client: AsyncAnthropic, text: str) -> AsyncIterator[str]:
yield sse(json.dumps({"status": "extracting"}), event="progress")
msg = await client.messages.parse(
model=MODEL,
max_tokens=1_024,
messages=[{"role": "user", "content": f"Extrais les infos de : {text}"}],
output_config={"format": {"type": "json_schema", "schema": ExtractedContact}},
)
contact: ExtractedContact | None = msg.parsed_output # None si refus / échec
if contact is None:
yield sse(json.dumps({"message": "extraction échouée"}), event="error")
return
yield sse(contact.model_dump_json(), event="result")
yield sse("[DONE]", event="done")Le client constraint la sortie au schéma : pas de prefill, pas de "réponds en JOSN stp". parsed_output est None en cas de refus (stop_reason == "refusal") — toujours le vérifier avant de l'utiliser.
⚙️ En production
Failure modes
| Symptôme | Cause | Correctif |
|---|---|---|
| Tout arrive d'un coup à la fin | Buffering du reverse-proxy | X-Accel-Buffering: no (nginx), proxy_buffering off;, désactiver la compression sur le text/event-stream |
| L'event loop se gèle sous charge | Client/itérateur sync dans une route async | AsyncAnthropic + async for, ou anyio.to_thread |
| Coûts LLM qui dérapent | Pas de check de déconnexion | await request.is_disconnected() dans la boucle, et fermer le async with stream |
| Connexion coupée à ~60 s d'inactivité (thinking long) | Timeout idle du proxy/LB | Heartbeat : ping\n\n toutes les 15 s |
| Le front ne reçoit jamais l'erreur | Erreur levée après le 200 OK | Émettre event: error dans le flux, pas un raise |
| Reconnexion → réponse dupliquée/perdue | Pas de gestion Last-Event-ID | Mettre un id: croissant et reprendre l'état côté serveur (souvent : on ne reprend pas un stream LLM, on signale au front de relancer) |
Heartbeat (le détail qui sauve la prod)
Un agent en thinking: adaptive peut réfléchir plusieurs dizaines de secondes avant le premier token. Beaucoup de load-balancers coupent une connexion inactive à 60 s. Solution : émettre un commentaire SSE périodiquement, en parallèle du stream.
async def with_heartbeat(
gen: AsyncIterator[str], interval: float = 15.0
) -> AsyncIterator[str]:
queue: asyncio.Queue[str | None] = asyncio.Queue()
async def pump() -> None:
try:
async for chunk in gen:
await queue.put(chunk)
finally:
await queue.put(None) # sentinelle de fin
task = asyncio.create_task(pump())
try:
while True:
try:
chunk = await asyncio.wait_for(queue.get(), timeout=interval)
except TimeoutError:
yield ": ping\n\n" # rien à envoyer → on garde la ligne vivante
continue
if chunk is None:
return
yield chunk
finally:
task.cancel()Performance & scaling
- SSE = une connexion HTTP longue par client. Chaque worker uvicorn a une limite de connexions ouvertes. 10 000 utilisateurs en streaming = 10 000 connexions tenues. Dimensionne tes workers et le
ulimit -n(file descriptors) en conséquence, et envisage de mettre le streaming derrière un service dédié. - Le débit n'est pas ton goulot — c'est le LLM qui est lent. Ton serveur passe son temps à
await. C'est pour ça qu'async est obligatoire : un worker async sature volontiers des milliers de streams concurrents puisque chacun est I/O-bound. - HTTP/1.1 limite à 6 connexions par domaine dans le navigateur. Si tu ouvres plusieurs SSE en parallèle vers le même host, tu satures vite. HTTP/2 (multiplexing) résout ça — sers ton API en h2 derrière le proxy.
Sécurité
- Authentifie le
GET/POSTde stream comme n'importe quel endpoint : un stream LLM ouvert sans auth = facture illimitée. Token dans l'en-tête (pas dans l'URL, car les URLs fuient dans les logs). - Rate-limit par utilisateur avant d'ouvrir le stream — sinon un client peut ouvrir N connexions et épuiser tes workers + ton quota Anthropic.
- Ne streame jamais le contenu brut d'un
thinkingblock vers l'utilisateur final s'il contient du raisonnement sensible : sur Opus 4.8 ledisplayest"omitted"par défaut (le texte de thinking est vide) ; metsdisplay: "summarized"seulement si tu veux montrer un résumé. - Échappe/sérialise toujours le
data:en JSON. Du texte LLM brut contenant des\nou des structures inattendues casse le parsing SSE si tu fais duf"data: {text}"naïf.
Observabilité
- Logue le TTFT et le temps total séparément : ce sont deux SLO distincts.
- Capture
final.usage(input/output tokens) à la fin de chaque stream pour la facturation et le suivi de coût — l'exemple plus haut l'émet déjà enevent: usage. - Compte les déconnexions précoces (
is_disconnectedvrai) : un taux élevé signale soit une UX lente, soit des timeouts proxy.
Tradeoffs senior : SSE vs WebSocket vs polling
| SSE | WebSocket | Long-polling | |
|---|---|---|---|
| Direction | serveur → client | bidirectionnel | serveur → client |
| Reconnexion auto | ✅ native (Last-Event-ID) | ❌ à coder | ✅ trivial |
| Passe les proxies HTTP | ✅ | ⚠️ (Upgrade) | ✅ |
| Complexité | faible | moyenne/haute | faible |
| Bon pour | tokens LLM, notifs, progress | chat collaboratif, jeux, voix | fallback legacy |
Pour streamer des tokens d'agent : SSE par défaut. Le client n'a rien à envoyer pendant la génération (c'est unidirectionnel par nature), la reconnexion est gratuite, et ça passe partout. On passe à WebSocket seulement si on a besoin d'interrompre/steerer l'agent en plein milieu (envoyer un "stop" ou un nouveau message sans rouvrir une requête) — et même là, beaucoup d'archis gardent SSE pour la sortie + un POST séparé pour l'interruption.
🏋️ Exercices
Exercice 1 — Le tuyau de base (implémente)
Objectif : exposer POST /chat qui streame la réponse d'AsyncAnthropic.messages.stream en SSE, avec sérialisation JSON correcte et un event: done final. Vérifie avec curl -N -X POST localhost:8000/chat -d '{"prompt":"compte jusqu'à 10"}' que les tokens arrivent progressivement (pas d'un bloc).
Indice/Solution : reprends stream_agent mais sans la boucle outils. Le test clé : curl -N (no-buffer) doit afficher les lignes au fil de l'eau. Si tout sort à la fin → tu as oublié media_type="text/event-stream" ou tu as un buffering. Sérialise chaque token via sse(json.dumps({"text": text}), event="token").
Exercice 2 — Coupe la facture (production-grade)
Objectif : ajoute la détection de déconnexion. Lance le stream, tue le curl (Ctrl-C) au bout de 2 tokens, et prouve via un log que le générateur s'est arrêté immédiatement au lieu de consommer tout le stream Anthropic.
Indice/Solution : if await request.is_disconnected(): break dans la boucle async for. Pour prouver l'arrêt : logue chaque token émis ; après le Ctrl-C tu dois voir le log s'arrêter en ~1 token, pas continuer. Bonus : ferme explicitement le async with (le break suffit car le with est nettoyé en sortie).
Exercice 3 — Heartbeat & timeout (production-grade)
Objectif : place un reverse-proxy nginx devant ton app avec proxy_read_timeout 30s;. Provoque un thinking long (prompt complexe, effort: "high"). Sans heartbeat la connexion saute à 30 s ; avec with_heartbeat, elle tient.
Indice/Solution : enveloppe ton générateur dans with_heartbeat(stream_agent(...), interval=15). Vérifie côté curl -N que des lignes : ping apparaissent toutes les 15 s pendant le silence. Sans le wrapper, nginx coupe et curl rend la main avec une erreur de connexion.
Exercice 4 — La boucle agent (implémente)
Objectif : implémente stream_agent_loop avec un outil get_weather factice. Le front doit recevoir : token (texte intro) → tool_call → token (réponse finale) → done. Borne la boucle à 8 itérations.
Indice/Solution : détecte final.stop_reason == "tool_use", ré-empile final.content entier, exécute, renvoie un message user avec les blocs tool_result (chacun avec son tool_use_id). Piège : si tu n'ajoutes que le texte et pas les blocs tool_use, l'API te renvoie un 400 au tour suivant.
Exercice 5 — Casse puis répare (break-then-fix)
Objectif : on te donne un endpoint qui utilise anthropic.Anthropic() (sync) dans une route async def et itère for text in s.text_stream. Sous wrk/ab avec 50 connexions concurrentes, mesure que la latence explose (event loop gelé). Répare-le.
Indice/Solution : la latence p99 grimpe car chaque stream bloque l'event loop. Fix : remplace par AsyncAnthropic + async for text in stream.text_stream. Re-mesure : la p99 redevient plate car les 50 streams s'entrelacent. Bonus pédagogique : remets le sync mais derrière anyio.to_thread.run_sync et observe que ça "marche" mais consomme un thread du pool par stream — ça ne scale pas autant qu'async natif.
Exercice 6 — Reprise sur reconnexion (hard)
Objectif : ajoute un id: incrémental à chaque évènement et gère l'en-tête Last-Event-ID. À la reconnexion, le serveur doit décider quoi faire d'un stream LLM interrompu.
Indice/Solution : un stream LLM n'est pas rejouable token par token côté Anthropic, donc la "reprise" honnête consiste soit à (a) avoir bufferisé les tokens déjà émis et les rejouer jusqu'au Last-Event-ID puis continuer (coûteux en mémoire), soit (b) renvoyer un event: resume_unsupported et laisser le front relancer la requête complète. Le bon réflexe senior : documente que SSE garantit la connexion, pas l'idempotence de la génération — et choisis (b) sauf besoin fort.
🎤 En entretien
Q : Pourquoi SSE plutôt que WebSocket pour streamer les tokens d'un LLM ? R : Le flux est unidirectionnel (serveur → client), donc le full-duplex du WebSocket est inutile ; SSE apporte en prime la reconnexion native via Last-Event-ID, passe à travers les proxies HTTP sans Upgrade, et tient en une StreamingResponse — moins de surface de bug.
Q : Quel est le piège le plus courant avec une StreamingResponse async en FastAPI ? R : Mettre du code bloquant (client Anthropic() sync, itération sync) dans la coroutine, ce qui gèle l'event loop et sérialise toutes les requêtes du worker — il faut un client await-able (AsyncAnthropic + async for) ou isoler le bloquant dans un thread.
Q : L'utilisateur ferme l'onglet en plein stream — que se passe-t-il et comment tu le gères ? R : Sans intervention, ton générateur continue de tirer le stream Anthropic et tu paies des tokens pour rien ; on teste await request.is_disconnected() dans la boucle et on break, ce qui ferme aussi le async with stream et coupe l'appel LLM.
Q : Comment signales-tu une erreur survenue après l'envoi du 200 OK ? R : Impossible de changer le code HTTP une fois les en-têtes partis — on émet un évènement SSE dédié (event: error\ndata: {...}) dans le flux et le front l'interprète ; on ne fait jamais raise à ce stade.
Q : Le streaming "marche" en local mais en prod tout arrive d'un coup à la fin. Diagnostic ? R : Le reverse-proxy bufferise la réponse text/event-stream ; on désactive le buffering (X-Accel-Buffering: no / proxy_buffering off), on s'assure que la compression n'est pas appliquée au flux, et qu'aucun middleware n'accumule le corps avant de le renvoyer.