Skip to content

SSE streaming (tokens LLM)

TL;DR — SSE (Server-Sent Events) est un canal HTTP unidirectionnel serveur → client, en texte clair, qui réutilise une simple connexion GET : le serveur garde la réponse ouverte et pousse des évènements data: ...\n\n au fil de l'eau. C'est exactement le bon outil pour streamer les tokens d'un agent LLM vers un navigateur : pas de handshake WebSocket, reconnexion automatique gratuite (Last-Event-ID), et il passe à travers tous les proxies HTTP. En FastAPI on l'expose avec une StreamingResponse (ou sse-starlette) dont le corps est un générateur async ; côté Anthropic, on branche ce générateur sur AsyncAnthropic().messages.stream(...) et on relaye chaque text delta. Le piège senior n'est pas d'écrire le yield — c'est de gérer le buffering des proxies, la déconnexion client (sinon l'appel LLM continue de coûter de l'argent), le heartbeat, et le fait qu'async def + un bloc bloquant tuent tout l'event loop.


🧠 Mental model

Tu connais déjà trois façons de renvoyer une réponse HTTP. SSE est la quatrième, et la plus simple des trois "temps réel".

                        client → serveur   serveur → client   transport
  Requête classique          1                  1            HTTP (fermé après)
  Polling / long-polling     N                  N            HTTP (N requêtes)
  SSE                        1                  ∞            HTTP (1 connexion, gardée ouverte)
  WebSocket                  ∞                  ∞            ws:// (full-duplex)

L'analogie : un WebSocket est un appel téléphonique — les deux parties parlent quand elles veulent, mais il faut composer le numéro, établir la ligne, gérer le raccrochage. SSE est un flux radio : tu allumes le poste (GET), le serveur émet en continu, tu écoutes. Tu ne peux pas parler à la radio (unidirectionnel), mais pour "afficher des tokens qui arrivent", c'est précisément ce qu'il faut — et brancher/débrancher la radio est trivial.

Pour un agent LLM, le modèle mental est encore plus net. Le LLM génère token par token, de façon intrinsèquement séquentielle et lente (plusieurs secondes pour une réponse longue). Sans streaming, l'utilisateur fixe un spinner pendant 8 secondes puis reçoit un pavé. Avec SSE, le premier token s'affiche en ~300 ms et le texte se "tape" tout seul. Le TTFT (time-to-first-token) devient ta métrique de latence perçue, pas le temps total.

  navigateur                FastAPI (async gen)            Anthropic API
     │   GET /chat (SSE)         │                              │
     ├──────────────────────────▶                              │
     │                          │   messages.stream(...)        │
     │                          ├──────────────────────────────▶
     │                          │◀── text_delta "Bon"           │
     │◀── data: {"t":"Bon"}     │                              │
     │                          │◀── text_delta "jour"          │
     │◀── data: {"t":"jour"}    │                              │
     │         ...              │            ...                │
     │◀── event: done           │◀── message_stop               │
     │   (connexion fermée)     │                              │

Le serveur FastAPI est un tuyau qui transforme : il consomme un stream (celui d'Anthropic, lui aussi en SSE sous le capot) et en réémet un autre (le tien, vers le navigateur), en changeant éventuellement le format au passage.


Le wire format SSE (ce que voit vraiment le navigateur)

SSE n'est pas du JSON, c'est un format texte ligne par ligne défini par la spec EventSource. Un évènement = un ou plusieurs champs clé: valeur, terminé par une ligne vide :

event: token
data: {"text": "Bonjour"}
id: 42

event: done
data: [DONE]

Les champs qui comptent :

  • data: — la charge utile (souvent du JSON sérialisé, mais c'est libre). Plusieurs lignes data: consécutives sont concaténées avec \n.
  • event: — un nom d'évènement nommé ; côté JS tu écoutes es.addEventListener("token", ...). Par défaut c'est l'évènement message.
  • id: — identifiant ; le navigateur le renvoie dans l'en-tête Last-Event-ID à la reconnexion. C'est la fonctionnalité qui rend SSE robuste gratuitement.
  • retry: — délai de reconnexion en ms.
  • Une ligne commençant par : est un commentaire — on s'en sert comme heartbeat (: keep-alive\n\n).

Le \n\n final (ligne vide) est obligatoire pour que le client "valide" l'évènement. L'oublier = le navigateur bufferise indéfiniment et tu crois que rien n'arrive.


La bonne façon : un endpoint FastAPI fully-typed

On veut un agent qui répond en streamant. Modèle Pydantic v2 en entrée, générateur async en sortie, client Anthropic injecté par DI.

python
# app/main.py
from __future__ import annotations

import asyncio
import json
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Annotated

import anthropic
from anthropic import AsyncAnthropic
from fastapi import Depends, FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field

MODEL = "claude-opus-4-8"


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    # Un seul client pour tout le process : il gère un pool de connexions HTTP
    # et les retries. En créer un par requête est un anti-pattern (voir + bas).
    app.state.anthropic = AsyncAnthropic(max_retries=3)
    yield
    await app.state.anthropic.close()


app = FastAPI(lifespan=lifespan)


def get_client(request: Request) -> AsyncAnthropic:
    return request.app.state.anthropic


ClientDep = Annotated[AsyncAnthropic, Depends(get_client)]


class ChatRequest(BaseModel):
    prompt: str = Field(min_length=1, max_length=8_000)
    system: str | None = None


def sse(data: str, *, event: str | None = None, id: str | None = None) -> str:
    """Sérialise un évènement SSE. data peut contenir des \\n -> une ligne data: par ligne."""
    lines: list[str] = []
    if event is not None:
        lines.append(f"event: {event}")
    if id is not None:
        lines.append(f"id: {id}")
    for line in data.split("\n"):
        lines.append(f"data: {line}")
    return "\n".join(lines) + "\n\n"


async def stream_agent(
    client: AsyncAnthropic,
    body: ChatRequest,
    request: Request,
) -> AsyncIterator[str]:
    # 1) Heartbeat de pré-connexion : force les proxies à flusher les en-têtes
    #    et donne au navigateur un signal "la ligne est ouverte".
    yield ": connected\n\n"

    try:
        async with client.messages.stream(
            model=MODEL,
            max_tokens=4_096,
            thinking={"type": "adaptive"},  # jamais budget_tokens sur Opus 4.8
            system=body.system or "Tu es un assistant concis et précis.",
            messages=[{"role": "user", "content": body.prompt}],
        ) as stream:
            async for text in stream.text_stream:
                # 2) Le client a fermé l'onglet ? On arrête tout immédiatement.
                if await request.is_disconnected():
                    break
                yield sse(json.dumps({"text": text}), event="token")

        final = await stream.get_final_message()
        usage = {
            "input_tokens": final.usage.input_tokens,
            "output_tokens": final.usage.output_tokens,
        }
        yield sse(json.dumps(usage), event="usage")
        yield sse("[DONE]", event="done")

    except anthropic.APIError as exc:
        # 3) Une erreur après le 200 OK ne peut PLUS devenir un 500.
        #    On la pousse comme un évènement SSE et le front l'affiche.
        yield sse(json.dumps({"message": str(exc)}), event="error")


@app.post("/chat")
async def chat(body: ChatRequest, client: ClientDep, request: Request) -> StreamingResponse:
    return StreamingResponse(
        stream_agent(client, body, request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # désactive le buffering nginx (crucial)
            "Connection": "keep-alive",
        },
    )

Trois choses non négociables que cet exemple contient et que les tutos zappent :

  1. media_type="text/event-stream" + Cache-Control: no-cache + X-Accel-Buffering: no. Sans le dernier, nginx (ou tout reverse-proxy) bufferise ta réponse et l'utilisateur reçoit tout d'un coup à la fin — le streaming devient invisible. C'est le bug n°1 en prod.
  2. request.is_disconnected() — si l'utilisateur ferme l'onglet, sans ce check ton générateur continue de consommer le stream Anthropic jusqu'au bout. Tu payes des tokens pour une réponse que personne ne lira.
  3. Le try/except qui émet un event: error — une fois le 200 OK et les en-têtes envoyés, tu ne peux plus renvoyer un code HTTP d'erreur. La seule façon de signaler un problème est dans le flux.

Le client JavaScript correspondant tient en quelques lignes (note : EventSource natif ne fait que du GET ; pour un POST avec corps on utilise fetch + un reader de stream, ou la lib @microsoft/fetch-event-source) :

ts
const res = await fetch("/chat", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ prompt: "Explique SSE" }),
});

const reader = res.body!.pipeThrough(new TextDecoderStream()).getReader();
let buffer = "";
for (;;) {
  const { value, done } = await reader.read();
  if (done) break;
  buffer += value;
  // découpe sur la ligne vide (séparateur d'évènement SSE)
  const events = buffer.split("\n\n");
  buffer = events.pop() ?? "";
  for (const block of events) {
    const dataLine = block.split("\n").find((l) => l.startsWith("data: "));
    if (dataLine) console.log(JSON.parse(dataLine.slice(6)));
  }
}

La mauvaise façon (et pourquoi elle casse)

Anti-pattern 1 : un def synchrone bloquant dans une coroutine

python
# ❌ NE FAIS PAS ÇA
@app.post("/chat")
async def chat_bad(body: ChatRequest):
    client = anthropic.Anthropic()  # client SYNCHRONE dans une route async !

    def generate():
        with client.messages.stream(model=MODEL, max_tokens=4096,
                                     messages=[{"role": "user", "content": body.prompt}]) as s:
            for text in s.text_stream:        # itération bloquante
                yield f"data: {text}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

Pourquoi c'est cassé :

  • anthropic.Anthropic() (sans Async) fait des appels HTTP bloquants. Dans une route async, chaque for text in s.text_stream gèle l'event loop entier : pendant que tu streames pour un utilisateur, toutes les autres requêtes du process sont en attente. Tu transformes ton serveur async en serveur mono-thread séquentiel.
  • Créer un Anthropic() par requête recrée un pool de connexions à chaque fois : pas de keep-alive vers l'API, latence de TLS handshake à chaque appel.

Règle : dans une route async def, tout I/O doit être await-able. Un client sync n'a rien à y faire. Si tu dois utiliser du code bloquant, isole-le dans await anyio.to_thread.run_sync(...).

Anti-pattern 2 : accumuler la réponse complète puis la "fausser" en stream

python
# ❌ Streaming en façade
async def fake_stream(client, prompt):
    msg = await client.messages.create(model=MODEL, max_tokens=4096,
                                        messages=[{"role": "user", "content": prompt}])
    full = msg.content[0].text
    for word in full.split():            # on a DÉJÀ attendu toute la réponse
        yield f"data: {word}\n\n"

Tu attends la réponse complète (TTFT = temps total), puis tu la débites mot par mot. L'utilisateur a le pire des deux mondes : la latence du non-streaming et l'effet "saccadé" du streaming. Le vrai streaming part de messages.stream et relaie les deltas au moment où ils arrivent.

Anti-pattern 3 : oublier la ligne vide

python
yield f"data: {text}\n"   # ❌ un seul \n → le navigateur n'émet jamais l'évènement
yield f"data: {text}\n\n" # ✅ \n\n termine l'évènement

La boucle tool-use en streaming (l'agent réel)

Un "agent" ne fait pas que cracher du texte : il appelle des outils, lit le résultat, continue. En streaming, ça veut dire qu'on relaie les tokens, on détecte stop_reason == "tool_use", on exécute l'outil, on renvoie le résultat dans une nouvelle requête, et on re-streame — le tout sur la même connexion SSE vers le navigateur.

python
TOOLS: list[dict] = [
    {
        "name": "get_weather",
        "description": "Donne la météo actuelle. À appeler quand l'utilisateur "
                       "demande le temps qu'il fait dans une ville.",
        "input_schema": {
            "type": "object",
            "properties": {"city": {"type": "string", "description": "Nom de la ville"}},
            "required": ["city"],
        },
    }
]


async def run_tool(name: str, args: dict) -> str:
    if name == "get_weather":
        # ... vrai appel I/O async ici
        return json.dumps({"city": args["city"], "temp_c": 21, "sky": "dégagé"})
    return json.dumps({"error": f"outil inconnu: {name}"})


async def stream_agent_loop(
    client: AsyncAnthropic, prompt: str, request: Request
) -> AsyncIterator[str]:
    messages: list[dict] = [{"role": "user", "content": prompt}]
    yield ": connected\n\n"

    for _ in range(8):  # garde-fou anti-boucle-infinie : JAMAIS de while True nu
        if await request.is_disconnected():
            return

        async with client.messages.stream(
            model=MODEL, max_tokens=4_096, tools=TOOLS,
            thinking={"type": "adaptive"}, messages=messages,
        ) as stream:
            async for event in stream:
                if event.type == "content_block_delta" and event.delta.type == "text_delta":
                    yield sse(json.dumps({"text": event.delta.text}), event="token")

            final = await stream.get_final_message()

        # On ré-empile la réponse complète de l'assistant (blocs text + tool_use)
        messages.append({"role": "assistant", "content": final.content})

        if final.stop_reason != "tool_use":
            yield sse("[DONE]", event="done")
            return

        # Exécute tous les tool_use et renvoie les résultats dans un message user
        results: list[dict] = []
        for block in final.content:
            if block.type == "tool_use":
                yield sse(json.dumps({"tool": block.name}), event="tool_call")
                output = await run_tool(block.name, dict(block.input))
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })
        messages.append({"role": "user", "content": results})

    yield sse(json.dumps({"message": "max_iterations atteint"}), event="error")

Points senior :

  • On ré-empile final.content en entier (pas seulement le texte) : les blocs tool_use doivent être préservés sinon l'API renvoie un 400 (tool_result orphelin).
  • La boucle est bornée (for _ in range(8)), jamais un while True : un agent qui boucle est un agent qui brûle ton budget.
  • On émet des évènements SSE typés (token, tool_call, done, error) pour que le front affiche "🔧 appel get_weather…" pendant l'exécution de l'outil.

Sorties structurées : streamer du JSON validé

Parfois l'agent doit produire du JSON (un formulaire, une extraction). Le streaming de tokens bruts est inutile ici (un JSON mi-écrit n'est pas parsable). On utilise les structured outputs natifs d'Anthropic via messages.parse(), qui valident contre un schéma Pydantic — et on streame plutôt un signal de progression puis le résultat final validé.

python
from pydantic import BaseModel


class ExtractedContact(BaseModel):
    name: str
    email: str
    interests: list[str]


async def stream_structured(client: AsyncAnthropic, text: str) -> AsyncIterator[str]:
    yield sse(json.dumps({"status": "extracting"}), event="progress")
    msg = await client.messages.parse(
        model=MODEL,
        max_tokens=1_024,
        messages=[{"role": "user", "content": f"Extrais les infos de : {text}"}],
        output_config={"format": {"type": "json_schema", "schema": ExtractedContact}},
    )
    contact: ExtractedContact | None = msg.parsed_output  # None si refus / échec
    if contact is None:
        yield sse(json.dumps({"message": "extraction échouée"}), event="error")
        return
    yield sse(contact.model_dump_json(), event="result")
    yield sse("[DONE]", event="done")

Le client constraint la sortie au schéma : pas de prefill, pas de "réponds en JOSN stp". parsed_output est None en cas de refus (stop_reason == "refusal") — toujours le vérifier avant de l'utiliser.


⚙️ En production

Failure modes

SymptômeCauseCorrectif
Tout arrive d'un coup à la finBuffering du reverse-proxyX-Accel-Buffering: no (nginx), proxy_buffering off;, désactiver la compression sur le text/event-stream
L'event loop se gèle sous chargeClient/itérateur sync dans une route asyncAsyncAnthropic + async for, ou anyio.to_thread
Coûts LLM qui dérapentPas de check de déconnexionawait request.is_disconnected() dans la boucle, et fermer le async with stream
Connexion coupée à ~60 s d'inactivité (thinking long)Timeout idle du proxy/LBHeartbeat : ping\n\n toutes les 15 s
Le front ne reçoit jamais l'erreurErreur levée après le 200 OKÉmettre event: error dans le flux, pas un raise
Reconnexion → réponse dupliquée/perduePas de gestion Last-Event-IDMettre un id: croissant et reprendre l'état côté serveur (souvent : on ne reprend pas un stream LLM, on signale au front de relancer)

Heartbeat (le détail qui sauve la prod)

Un agent en thinking: adaptive peut réfléchir plusieurs dizaines de secondes avant le premier token. Beaucoup de load-balancers coupent une connexion inactive à 60 s. Solution : émettre un commentaire SSE périodiquement, en parallèle du stream.

python
async def with_heartbeat(
    gen: AsyncIterator[str], interval: float = 15.0
) -> AsyncIterator[str]:
    queue: asyncio.Queue[str | None] = asyncio.Queue()

    async def pump() -> None:
        try:
            async for chunk in gen:
                await queue.put(chunk)
        finally:
            await queue.put(None)  # sentinelle de fin

    task = asyncio.create_task(pump())
    try:
        while True:
            try:
                chunk = await asyncio.wait_for(queue.get(), timeout=interval)
            except TimeoutError:
                yield ": ping\n\n"   # rien à envoyer → on garde la ligne vivante
                continue
            if chunk is None:
                return
            yield chunk
    finally:
        task.cancel()

Performance & scaling

  • SSE = une connexion HTTP longue par client. Chaque worker uvicorn a une limite de connexions ouvertes. 10 000 utilisateurs en streaming = 10 000 connexions tenues. Dimensionne tes workers et le ulimit -n (file descriptors) en conséquence, et envisage de mettre le streaming derrière un service dédié.
  • Le débit n'est pas ton goulot — c'est le LLM qui est lent. Ton serveur passe son temps à await. C'est pour ça qu'async est obligatoire : un worker async sature volontiers des milliers de streams concurrents puisque chacun est I/O-bound.
  • HTTP/1.1 limite à 6 connexions par domaine dans le navigateur. Si tu ouvres plusieurs SSE en parallèle vers le même host, tu satures vite. HTTP/2 (multiplexing) résout ça — sers ton API en h2 derrière le proxy.

Sécurité

  • Authentifie le GET/POST de stream comme n'importe quel endpoint : un stream LLM ouvert sans auth = facture illimitée. Token dans l'en-tête (pas dans l'URL, car les URLs fuient dans les logs).
  • Rate-limit par utilisateur avant d'ouvrir le stream — sinon un client peut ouvrir N connexions et épuiser tes workers + ton quota Anthropic.
  • Ne streame jamais le contenu brut d'un thinking block vers l'utilisateur final s'il contient du raisonnement sensible : sur Opus 4.8 le display est "omitted" par défaut (le texte de thinking est vide) ; mets display: "summarized" seulement si tu veux montrer un résumé.
  • Échappe/sérialise toujours le data: en JSON. Du texte LLM brut contenant des \n ou des structures inattendues casse le parsing SSE si tu fais du f"data: {text}" naïf.

Observabilité

  • Logue le TTFT et le temps total séparément : ce sont deux SLO distincts.
  • Capture final.usage (input/output tokens) à la fin de chaque stream pour la facturation et le suivi de coût — l'exemple plus haut l'émet déjà en event: usage.
  • Compte les déconnexions précoces (is_disconnected vrai) : un taux élevé signale soit une UX lente, soit des timeouts proxy.

Tradeoffs senior : SSE vs WebSocket vs polling

SSEWebSocketLong-polling
Directionserveur → clientbidirectionnelserveur → client
Reconnexion auto✅ native (Last-Event-ID)❌ à coder✅ trivial
Passe les proxies HTTP⚠️ (Upgrade)
Complexitéfaiblemoyenne/hautefaible
Bon pourtokens LLM, notifs, progresschat collaboratif, jeux, voixfallback legacy

Pour streamer des tokens d'agent : SSE par défaut. Le client n'a rien à envoyer pendant la génération (c'est unidirectionnel par nature), la reconnexion est gratuite, et ça passe partout. On passe à WebSocket seulement si on a besoin d'interrompre/steerer l'agent en plein milieu (envoyer un "stop" ou un nouveau message sans rouvrir une requête) — et même là, beaucoup d'archis gardent SSE pour la sortie + un POST séparé pour l'interruption.


🏋️ Exercices

Exercice 1 — Le tuyau de base (implémente)

Objectif : exposer POST /chat qui streame la réponse d'AsyncAnthropic.messages.stream en SSE, avec sérialisation JSON correcte et un event: done final. Vérifie avec curl -N -X POST localhost:8000/chat -d '{"prompt":"compte jusqu'à 10"}' que les tokens arrivent progressivement (pas d'un bloc).

Indice/Solution : reprends stream_agent mais sans la boucle outils. Le test clé : curl -N (no-buffer) doit afficher les lignes au fil de l'eau. Si tout sort à la fin → tu as oublié media_type="text/event-stream" ou tu as un buffering. Sérialise chaque token via sse(json.dumps({"text": text}), event="token").

Exercice 2 — Coupe la facture (production-grade)

Objectif : ajoute la détection de déconnexion. Lance le stream, tue le curl (Ctrl-C) au bout de 2 tokens, et prouve via un log que le générateur s'est arrêté immédiatement au lieu de consommer tout le stream Anthropic.

Indice/Solution : if await request.is_disconnected(): break dans la boucle async for. Pour prouver l'arrêt : logue chaque token émis ; après le Ctrl-C tu dois voir le log s'arrêter en ~1 token, pas continuer. Bonus : ferme explicitement le async with (le break suffit car le with est nettoyé en sortie).

Exercice 3 — Heartbeat & timeout (production-grade)

Objectif : place un reverse-proxy nginx devant ton app avec proxy_read_timeout 30s;. Provoque un thinking long (prompt complexe, effort: "high"). Sans heartbeat la connexion saute à 30 s ; avec with_heartbeat, elle tient.

Indice/Solution : enveloppe ton générateur dans with_heartbeat(stream_agent(...), interval=15). Vérifie côté curl -N que des lignes : ping apparaissent toutes les 15 s pendant le silence. Sans le wrapper, nginx coupe et curl rend la main avec une erreur de connexion.

Exercice 4 — La boucle agent (implémente)

Objectif : implémente stream_agent_loop avec un outil get_weather factice. Le front doit recevoir : token (texte intro) → tool_calltoken (réponse finale) → done. Borne la boucle à 8 itérations.

Indice/Solution : détecte final.stop_reason == "tool_use", ré-empile final.content entier, exécute, renvoie un message user avec les blocs tool_result (chacun avec son tool_use_id). Piège : si tu n'ajoutes que le texte et pas les blocs tool_use, l'API te renvoie un 400 au tour suivant.

Exercice 5 — Casse puis répare (break-then-fix)

Objectif : on te donne un endpoint qui utilise anthropic.Anthropic() (sync) dans une route async def et itère for text in s.text_stream. Sous wrk/ab avec 50 connexions concurrentes, mesure que la latence explose (event loop gelé). Répare-le.

Indice/Solution : la latence p99 grimpe car chaque stream bloque l'event loop. Fix : remplace par AsyncAnthropic + async for text in stream.text_stream. Re-mesure : la p99 redevient plate car les 50 streams s'entrelacent. Bonus pédagogique : remets le sync mais derrière anyio.to_thread.run_sync et observe que ça "marche" mais consomme un thread du pool par stream — ça ne scale pas autant qu'async natif.

Exercice 6 — Reprise sur reconnexion (hard)

Objectif : ajoute un id: incrémental à chaque évènement et gère l'en-tête Last-Event-ID. À la reconnexion, le serveur doit décider quoi faire d'un stream LLM interrompu.

Indice/Solution : un stream LLM n'est pas rejouable token par token côté Anthropic, donc la "reprise" honnête consiste soit à (a) avoir bufferisé les tokens déjà émis et les rejouer jusqu'au Last-Event-ID puis continuer (coûteux en mémoire), soit (b) renvoyer un event: resume_unsupported et laisser le front relancer la requête complète. Le bon réflexe senior : documente que SSE garantit la connexion, pas l'idempotence de la génération — et choisis (b) sauf besoin fort.


🎤 En entretien

Q : Pourquoi SSE plutôt que WebSocket pour streamer les tokens d'un LLM ? R : Le flux est unidirectionnel (serveur → client), donc le full-duplex du WebSocket est inutile ; SSE apporte en prime la reconnexion native via Last-Event-ID, passe à travers les proxies HTTP sans Upgrade, et tient en une StreamingResponse — moins de surface de bug.

Q : Quel est le piège le plus courant avec une StreamingResponse async en FastAPI ? R : Mettre du code bloquant (client Anthropic() sync, itération sync) dans la coroutine, ce qui gèle l'event loop et sérialise toutes les requêtes du worker — il faut un client await-able (AsyncAnthropic + async for) ou isoler le bloquant dans un thread.

Q : L'utilisateur ferme l'onglet en plein stream — que se passe-t-il et comment tu le gères ? R : Sans intervention, ton générateur continue de tirer le stream Anthropic et tu paies des tokens pour rien ; on teste await request.is_disconnected() dans la boucle et on break, ce qui ferme aussi le async with stream et coupe l'appel LLM.

Q : Comment signales-tu une erreur survenue après l'envoi du 200 OK ? R : Impossible de changer le code HTTP une fois les en-têtes partis — on émet un évènement SSE dédié (event: error\ndata: {...}) dans le flux et le front l'interprète ; on ne fait jamais raise à ce stade.

Q : Le streaming "marche" en local mais en prod tout arrive d'un coup à la fin. Diagnostic ? R : Le reverse-proxy bufferise la réponse text/event-stream ; on désactive le buffering (X-Accel-Buffering: no / proxy_buffering off), on s'assure que la compression n'est pas appliquée au flux, et qu'aucun middleware n'accumule le corps avant de le renvoyer.

Bibliothèque tech perso — Achref