Skip to content

⭐ Servir un agent (tool-loop streamé en SSE)

TL;DR — Servir un agent, ce n'est pas exposer POST /chat qui rend une string. C'est exposer un flux : le client ouvre une connexion SSE, et le serveur lui pousse, token par token, ce que produit la boucle d'agent (thinking → texte → appels d'outils → résultats → re-boucle) jusqu'à end_turn. Le pivot mental : l'agent n'est pas une fonction, c'est une conversation à plusieurs tours que le serveur orchestre côté serveur. On utilise AsyncAnthropic + client.messages.stream(...), une boucle tool_use manuelle (pour garder le contrôle : gating, logs, annulation), et un StreamingResponse FastAPI qui émet du text/event-stream. Les pièges de prod ne sont pas dans le LLM : ils sont dans la déconnexion client (annuler la boucle), le back-pressure, les timeouts des proxies, et le fait qu'une exception levée après le premier byte ne peut plus changer le code HTTP.


🧠 Mental model

L'analogie : le commentateur sportif, pas le rédacteur

Un endpoint classique REST, c'est un rédacteur : tu lui poses une question, il part écrire, et il te rend l'article fini. Tu attends, puis tu reçois tout d'un bloc.

Un agent streamé, c'est un commentateur sportif en direct. Il parle pendant que l'action se déroule : « Claude réfléchit… il décide d'appeler search_db… voilà le résultat… il reprend la parole… ». Le client entend le match au fur et à mesure. Et surtout : si le spectateur éteint la télé (déconnexion), le commentateur doit se taire — continuer à commenter une rame vide, c'est brûler des tokens et du CPU pour personne.

Côté toi (le dev), tu n'es ni le rédacteur ni Claude. Tu es le régisseur : tu tiens la boucle, tu décides quels outils Claude a le droit d'appeler, tu coupes l'antenne si le spectateur part, et tu transformes chaque événement interne du SDK en un événement SSE propre que le frontend sait afficher.

Le tool-loop, en ASCII

text
  client (navigateur / NestJS)
       │  GET /agent/stream?q=...   (Accept: text/event-stream)

 ┌─────────────────────────────────────────────────────────┐
 │ FastAPI  →  StreamingResponse(text/event-stream)         │
 │                                                          │
 │   messages = [user]                                      │
 │   ┌──────────────  LOOP  ──────────────────────────┐     │
 │   │ client.messages.stream(messages, tools)         │     │
 │   │   → yield SSE: thinking / text deltas ───────────┼──▶ client
 │   │   final = await stream.get_final_message()       │     │
 │   │                                                  │     │
 │   │ stop_reason == "end_turn"?  ── oui ──▶ break     │     │
 │   │              == "tool_use"? ── oui ─┐            │     │
 │   │                                     ▼            │     │
 │   │   exécuter chaque tool_use (TES fonctions)       │     │
 │   │   append(assistant content) ; append(tool_result)│    │
 │   │   yield SSE: tool_use / tool_result ─────────────┼──▶ client
 │   └──────────────  re-loop  ──────────────────────────┘    │
 │                                                          │
 │   yield SSE: event: done                                 │
 └─────────────────────────────────────────────────────────┘

Deux invariants qui sous-tendent tout le reste :

  1. L'API Messages est sans état. À chaque tour de boucle, tu renvoies tout l'historique (messages). Le serveur ne « se souvient » de rien entre deux appels HTTP vers Anthropic. C'est toi qui tiens la liste messages.
  2. Tu dois renvoyer le content complet de l'assistant (les blocs tool_use inclus, pas seulement le texte) avant d'ajouter les tool_result. Sinon l'appariement tool_use_idtool_result casse et l'API renvoie un 400.

Le SDK Anthropic, le strict nécessaire

Avant FastAPI, isolons la mécanique LLM. On utilise le client asynchrone (AsyncAnthropic) — un serveur web async qui ferait des appels bloquants planterait sa boucle d'événements.

Modèle par défaut : claude-opus-4-8 (1M de contexte, 5/25 USD le Mtok). Réflexion : adaptative (thinking={"type": "adaptive"}) — c'est Claude qui décide quand et combien réfléchir. On règle la profondeur via output_config={"effort": ...}, jamais via budget_tokens (qui renvoie un 400 sur Opus 4.7/4.8).

python
# pip install "anthropic>=0.92" "fastapi>=0.115" "uvicorn[standard]>=0.30"
from anthropic import AsyncAnthropic

client = AsyncAnthropic()  # lit ANTHROPIC_API_KEY dans l'environnement

async def one_shot(question: str) -> str:
    async with client.messages.stream(
        model="claude-opus-4-8",
        max_tokens=4096,
        thinking={"type": "adaptive"},
        output_config={"effort": "medium"},  # low | medium | high | xhigh | max
        messages=[{"role": "user", "content": question}],
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
        final = await stream.get_final_message()
    return final.stop_reason  # "end_turn", "tool_use", "max_tokens", "refusal"...

stream.text_stream ne donne que les deltas de texte (pratique pour un simple chat). Pour un agent, on a besoin du flux d'événements bruts — on y revient. get_final_message() reconstitue le Message complet une fois le flux terminé : c'est lui qui porte stop_reason, content (avec les blocs tool_use) et usage.

Pourquoi streamer même pour un « one-shot » ? Parce qu'au-delà de ~16K tokens de sortie, une requête non-streamée risque de dépasser le timeout HTTP du SDK. Streamer est l'option par défaut dès qu'il y a de l'input long, de l'output long, ou un max_tokens élevé.


Définir des outils (la bonne et la mauvaise façon)

Un outil = un nom, une description, et un JSON Schema d'entrée. La description est lue par Claude pour décider quand appeler l'outil : sois prescriptif sur le déclencheur, pas seulement descriptif.

❌ La mauvaise façon : schéma flou, exécution non typée

python
# ❌ NE FAITES PAS ÇA
tools = [{
    "name": "search",
    "description": "cherche des trucs",          # quand l'appeler ? mystère
    "input_schema": {"type": "object"},          # aucune propriété → Claude invente
}]

async def run_tool(name, inp):                   # pas de types, pas de validation
    if name == "search":
        return do_search(inp["query"])           # KeyError si Claude n'envoie pas "query"

Problèmes : Claude ne sait pas quand déclencher l'outil, le schéma vide l'autorise à passer n'importe quoi, et l'exécuteur explose au premier input inattendu — au milieu d'un flux SSE, donc impossible à rattraper proprement.

✅ La bonne façon : registre typé, schéma explicite, validation Pydantic

On modélise chaque outil comme une paire (schéma déclaré à Claude, fonction Python validée). On valide l'entrée avec Pydantic v2 : si Claude envoie un input malformé, on renvoie un tool_result is_error=True au lieu de planter — Claude saura corriger au tour suivant.

python
from __future__ import annotations

import json
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any

from pydantic import BaseModel, Field, ValidationError


class SearchArgs(BaseModel):
    query: str = Field(description="Mots-clés de recherche, en langage naturel")
    limit: int = Field(default=5, ge=1, le=20)


@dataclass(frozen=True, slots=True)
class Tool:
    name: str
    description: str
    args_model: type[BaseModel]
    handler: Callable[[BaseModel], Awaitable[str]]

    def to_anthropic(self) -> dict[str, Any]:
        return {
            "name": self.name,
            "description": self.description,
            # Pydantic v2 génère le JSON Schema gratuitement
            "input_schema": self.args_model.model_json_schema(),
        }


async def _search(args: SearchArgs) -> str:
    # args est déjà validé et typé ici
    rows = await db.search(args.query, limit=args.limit)  # type: ignore[name-defined]
    return json.dumps([r.as_dict() for r in rows], ensure_ascii=False)


SEARCH = Tool(
    name="search_db",
    description=(
        "Recherche dans la base produits. "
        "Appelle cet outil dès que la question porte sur un prix, "
        "un stock, ou une caractéristique d'un produit — ne réponds pas de mémoire."
    ),
    args_model=SearchArgs,
    handler=_search,  # type: ignore[arg-type]
)

REGISTRY: dict[str, Tool] = {SEARCH.name: SEARCH}


async def execute_tool(name: str, raw_input: dict[str, Any]) -> tuple[str, bool]:
    """Retourne (contenu, is_error). Ne lève jamais — l'erreur revient à Claude."""
    tool = REGISTRY.get(name)
    if tool is None:
        return f"Outil inconnu : {name}", True
    try:
        args = tool.args_model.model_validate(raw_input)
    except ValidationError as exc:
        return f"Arguments invalides : {exc.errors()}", True
    try:
        return await tool.handler(args), False
    except Exception as exc:  # surface contrôlée, on log à part
        return f"Échec de l'exécution : {exc}", True

Le point senior : un échec d'outil n'est pas une exception serveur, c'est une donnée pour Claude. On renvoie is_error=True dans le tool_result, Claude lit l'erreur et s'adapte. Faire remonter l'exception jusqu'au StreamingResponse couperait le flux sans que le client comprenne pourquoi.


La boucle d'agent streamée

C'est le cœur. On écrit un générateur asynchrone qui yield des dicts d'événements (on les sérialisera en SSE juste après). On utilise la boucle manuelle plutôt que le tool-runner du SDK, parce qu'en prod on veut intercepter chaque appel d'outil (gating, audit, annulation).

python
from collections.abc import AsyncIterator
from typing import Any, Literal, TypedDict

from anthropic import AsyncAnthropic
from anthropic.types import Message

client = AsyncAnthropic()

MAX_TURNS = 12  # garde-fou anti-boucle-infinie


class AgentEvent(TypedDict):
    type: Literal["thinking", "text", "tool_use", "tool_result", "done", "error"]
    data: dict[str, Any]


async def run_agent(question: str) -> AsyncIterator[AgentEvent]:
    messages: list[dict[str, Any]] = [{"role": "user", "content": question}]
    tools = [t.to_anthropic() for t in REGISTRY.values()]

    for _turn in range(MAX_TURNS):
        async with client.messages.stream(
            model="claude-opus-4-8",
            max_tokens=8192,
            thinking={"type": "adaptive", "display": "summarized"},
            output_config={"effort": "high"},
            tools=tools,
            messages=messages,
        ) as stream:
            # Flux fin : on relaie chaque delta au client en temps réel
            async for event in stream:
                if event.type == "content_block_delta":
                    if event.delta.type == "thinking_delta":
                        yield {"type": "thinking", "data": {"text": event.delta.thinking}}
                    elif event.delta.type == "text_delta":
                        yield {"type": "text", "data": {"text": event.delta.text}}

            final: Message = await stream.get_final_message()

        if final.stop_reason != "tool_use":
            # end_turn / max_tokens / refusal → on s'arrête
            if final.stop_reason == "refusal":
                yield {"type": "error", "data": {"reason": "refusal"}}
            yield {"type": "done", "data": {"stop_reason": final.stop_reason}}
            return

        # ── Claude veut des outils ──────────────────────────────────────
        # 1. on RENVOIE le content complet de l'assistant (blocs tool_use inclus)
        messages.append({"role": "assistant", "content": final.content})

        # 2. on exécute chaque tool_use et on rassemble les tool_result
        tool_results: list[dict[str, Any]] = []
        for block in final.content:
            if block.type != "tool_use":
                continue
            yield {"type": "tool_use", "data": {"name": block.name, "input": block.input}}

            content, is_error = await execute_tool(block.name, block.input)
            yield {"type": "tool_result", "data": {"name": block.name, "is_error": is_error}}

            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": content,
                "is_error": is_error,
            })

        # 3. on renvoie tous les résultats dans UN seul message user
        messages.append({"role": "user", "content": tool_results})

    # garde-fou atteint
    yield {"type": "done", "data": {"stop_reason": "max_turns"}}

Détails qui font la différence :

  • display: "summarized" sur le thinking : sinon, sur Opus 4.8, le défaut est "omitted" et le champ thinking arrive vide. Côté UX, ça ressemble à une longue pause avant la réponse. On opte explicitement pour le résumé si on streame la réflexion à l'utilisateur.
  • Le block.input est déjà parsé par le SDK. Ne fais jamais de str.contains sur l'input sérialisé — Opus 4.x peut échapper l'Unicode ou les / différemment. Travaille sur l'objet.
  • Plusieurs tool_use dans un même tour : Claude peut en demander plusieurs en parallèle. On les exécute tous, et on renvoie tous les tool_result dans un seul message user. Un message par résultat casserait l'alternance des rôles.
  • MAX_TURNS : un agent buggé (ou un prompt adverse) peut boucler. Toujours un plafond.

L'endpoint FastAPI : du générateur au text/event-stream

Maintenant on branche le générateur sur SSE. Le format SSE est simple : des lignes event: <type> et data: <json>, séparées par une ligne vide.

python
import asyncio
import json
import logging

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

logger = logging.getLogger("agent")
app = FastAPI()


def sse(event: AgentEvent) -> str:
    """Sérialise un AgentEvent au format Server-Sent Events."""
    return f"event: {event['type']}\ndata: {json.dumps(event['data'], ensure_ascii=False)}\n\n"


@app.get("/agent/stream")
async def agent_stream(request: Request, q: str) -> StreamingResponse:
    async def event_source() -> AsyncIterator[str]:
        try:
            async for event in run_agent(q):
                # Si le client a coupé, on arrête la boucle d'agent → on cesse de payer.
                if await request.is_disconnected():
                    logger.info("client parti, abandon de la boucle d'agent")
                    return
                yield sse(event)
        except asyncio.CancelledError:
            logger.info("stream annulé (déconnexion / shutdown)")
            raise
        except Exception:
            logger.exception("erreur dans la boucle d'agent")
            # On a peut-être déjà streamé du contenu : impossible de changer le code HTTP.
            # On émet donc un événement d'erreur applicatif, puis on ferme.
            yield sse({"type": "error", "data": {"reason": "internal"}})

    return StreamingResponse(
        event_source(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # désactive le buffering nginx
        },
    )

Le frontend (Angular, NestJS gateway, ou navigateur brut) consomme ça avec EventSource ou fetch + ReadableStream :

ts
// côté Angular / TS
const es = new EventSource(`/agent/stream?q=${encodeURIComponent(question)}`);
es.addEventListener("text", (e) => append(JSON.parse(e.data).text));
es.addEventListener("tool_use", (e) => showTool(JSON.parse(e.data)));
es.addEventListener("done", () => es.close());
es.addEventListener("error", () => es.close());

Note EventSource ne fait que des GET et ne gère pas les headers custom (auth). Pour passer un Authorization, beaucoup d'équipes utilisent plutôt fetch() avec response.body.getReader() côté client — même format SSE, mais POST + headers possibles.


⚙️ En production

1. Déconnexion client = annulation de la boucle (le piège n°1)

C'est l'erreur la plus chère. Un utilisateur ferme l'onglet à mi-réponse ; si tu ne vérifies pas request.is_disconnected(), la boucle d'agent continue : elle appelle Claude tour après tour, exécute des outils, et te facture chaque token pour un client qui n'écoute plus. Le check is_disconnected() à chaque itération coupe l'antenne. Starlette propage aussi une asyncio.CancelledError dans le générateur quand la connexion tombe — laisse-la remonter (raise), ne l'avale pas.

2. Le code HTTP est figé dès le premier byte

Dès que StreamingResponse a émis le premier octet, le statut 200 OK et les headers sont partis. Si ton agent plante au tour 3, tu ne peux pas renvoyer un 500. La seule issue propre : émettre un événement SSE error applicatif que le frontend sait interpréter, puis fermer. Corollaire : valide tout ce qui peut l'être (auth, quotas, paramètres) avant d'ouvrir le flux, pendant que tu peux encore renvoyer un vrai code d'erreur.

3. Timeouts et buffering des proxies

Les SSE longs se font tuer par des intermédiaires qui ne sont pas au courant :

  • nginx : proxy_buffering off; (ou le header X-Accel-Buffering: no), et proxy_read_timeout assez large. Sans ça, nginx bufferise et le client ne voit rien jusqu'à la fin.
  • Load balancers / API gateways : idle_timeout souvent à 60s par défaut → la connexion saute au milieu d'une longue réflexion. Pour les tenir éveillés, émets un heartbeat (: keep-alive\n\n, un commentaire SSE) toutes les ~15s.
  • Une tâche Opus 4.8 difficile peut tourner plusieurs minutes : prévois l'UX (indicateur de progression) et les timeouts en conséquence.
python
async def with_heartbeat(gen: AsyncIterator[str], interval: float = 15.0) -> AsyncIterator[str]:
    queue: asyncio.Queue[str | None] = asyncio.Queue()

    async def pump() -> None:
        async for chunk in gen:
            await queue.put(chunk)
        await queue.put(None)

    task = asyncio.create_task(pump())
    try:
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), timeout=interval)
            except TimeoutError:
                yield ": keep-alive\n\n"  # commentaire SSE, ignoré par le client
                continue
            if item is None:
                return
            yield item
    finally:
        task.cancel()

4. Fiabilité : retries et exceptions typées

Le SDK Anthropic retente automatiquement les 429 et 5xx avec backoff exponentiel (max_retries=2 par défaut). Ne réinvente pas une couche de retry par-dessus. En revanche, gère les exceptions typées au bon niveau — avant d'ouvrir le flux quand c'est possible :

python
import anthropic

try:
    # ex. un count_tokens ou une validation pré-vol avant d'ouvrir le SSE
    ...
except anthropic.RateLimitError:
    return JSONResponse({"error": "rate_limited"}, status_code=429)
except anthropic.AuthenticationError:
    return JSONResponse({"error": "auth"}, status_code=500)  # clé serveur invalide
except anthropic.APIStatusError as exc:
    if exc.type == "overloaded_error":
        return JSONResponse({"error": "overloaded"}, status_code=503)
    raise

Ne fais jamais de if "429" in str(exc) : utilise isinstance sur les classes typées (RateLimitError, OverloadedError, APIStatusError…). Le .type permet d'affiner ("billing_error" vs "permission_error", tous deux 403).

5. Performance et coût

  • Prompt caching. Si ton system prompt et ta liste d'outils sont stables, pose un cache_control: {"type": "ephemeral"} sur le dernier bloc système. Les tours suivants de la boucle (et les requêtes suivantes) relisent le préfixe à ~0,1× le prix au lieu de le repayer plein. Attention : le caching est un match de préfixe — un seul octet qui change en tête (un datetime.now() dans le system prompt, des outils dans un ordre non déterministe) invalide tout. Trie tes outils par nom, garde le system prompt figé. Vérifie avec usage.cache_read_input_tokens : s'il reste à zéro, tu as un invalidateur silencieux.
  • effort est ton principal levier coût/qualité. high est souvent le bon compromis ; low pour les sous-tâches simples ; max quand la justesse prime sur le coût. Un effort plus bas = moins d'appels d'outils, moins de préambule.
  • Concurrence. AsyncAnthropic est non-bloquant : un seul worker uvicorn tient des centaines de flux concurrents. Le coût n'est pas le CPU mais les tokens et tes rate limits (TPM/RPM). Mets une file ou un sémaphore si tu veux plafonner la concurrence.

6. Sécurité

  • Gating des outils à effet de bord. search_db est lisible ; send_email ou delete_account ne doivent jamais s'exécuter sans confirmation. La boucle manuelle te donne le point d'interception : avant await execute_tool(...), vérifie une allow-list / demande une confirmation humaine pour les actions irréversibles.
  • Pas de secret dans le prompt. Les clés API, tokens, etc. n'ont rien à faire dans le system prompt ou les messages — ils y resteraient persistés dans l'historique. Injecte-les côté handler d'outil, dans ton processus.
  • Injection de prompt. Le contenu renvoyé par un outil (résultat de recherche web, contenu d'un fichier utilisateur) peut contenir des instructions malveillantes. Traite les tool_result comme des données non fiables ; ne donne pas à l'agent des outils dont l'abus serait catastrophique sans gating.

7. Observabilité

usage (sur get_final_message()) porte input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens. Log-les par tour avec un request_id corrélé. Compte aussi le nombre de tours et le nombre d'appels d'outils par requête : une dérive (12 tours pour une question triviale) signale un prompt mal calibré ou un outil mal décrit. Le stop_reason final (end_turn vs max_turns vs refusal) est ta métrique de santé.


🏋️ Exercices

Exercice 1 — La boucle minimale (implémenter)

Objectif. Écris run_agent() de zéro : une boucle qui appelle client.messages.stream, relaie les deltas de texte, détecte stop_reason == "tool_use", exécute un unique outil get_time() (qui renvoie l'heure UTC), réinjecte le résultat, et reboucle jusqu'à end_turn. Expose-la sur GET /agent/stream en SSE.

Indice/Solution. Reprends le squelette de la section « boucle d'agent ». Le piège : il faut messages.append({"role": "assistant", "content": final.content}) avant d'ajouter le tool_result, sinon 400 (tool_use orphelin). Vérifie l'appariement : chaque tool_result.tool_use_id doit matcher un block.id du tour précédent.

Exercice 2 — Multi-outils parallèles + validation (production-grade)

Objectif. Ajoute deux outils (search_db, get_weather) avec schémas Pydantic. Force un cas où Claude appelle les deux dans le même tour (ex. « compare le prix du produit X et la météo à Paris »). Gère l'exécution concurrente avec asyncio.gather, et fais en sorte qu'un input invalide remonte comme is_error=True sans casser le flux.

Indice/Solution. Collecte les blocs tool_use, lance await asyncio.gather(*[execute_tool(b.name, b.input) for b in tool_blocks]), puis assemble tous les tool_result dans un seul message user. La validation : model_validate dans execute_tool attrape la ValidationError et la transforme en tool_result d'erreur — Claude relira l'erreur et corrigera ses arguments au tour suivant.

Exercice 3 — Annulation propre (break-then-fix)

Objectif. Lance un agent sur une tâche longue (effort max, plusieurs tours). Pendant le flux, ferme la connexion côté client (curl + Ctrl-C). Constate dans les logs que la boucle continue à appeler Claude. Puis corrige : ajoute le check request.is_disconnected() et la propagation de CancelledError. Mesure la différence de tokens facturés (usage) entre les deux versions.

Indice/Solution. Sans le check, Starlette finit par cancel la tâche, mais souvent après le tour en cours — tu paies au moins un appel inutile. Le is_disconnected() en tête de boucle coupe net. Pour l'observer : log usage.output_tokens cumulés à chaque tour, et compare un run interrompu avec/sans le garde-fou.

Exercice 4 — Heartbeat & survie aux proxies (production-grade)

Objectif. Mets ton endpoint derrière nginx avec proxy_read_timeout 30s. Constate que les réflexions longues (> 30s sans output) tuent la connexion. Corrige en (a) émettant un heartbeat SSE toutes les 15s, et (b) en posant proxy_buffering off. Vérifie que le client reçoit bien les tokens en temps réel et non d'un bloc à la fin.

Indice/Solution. Utilise le wrapper with_heartbeat de la section prod. Le commentaire SSE : keep-alive\n\n est ignoré par EventSource mais maintient le TCP vivant. Pour le buffering : sans X-Accel-Buffering: no, nginx accumule la réponse — symptôme typique « rien ne s'affiche puis tout d'un coup ».

Exercice 5 — Gating d'un outil destructif (sécurité)

Objectif. Ajoute un outil delete_record(id). Implémente un gate : quand Claude l'appelle, n'exécute pas ; émets un événement SSE confirm_required, mets la boucle en pause, et attends une seconde requête POST /agent/{id}/confirm qui débloque (ou refuse) l'exécution. Renvoie un tool_result is_error=True avec un message si l'utilisateur refuse.

Indice/Solution. La pause d'un flux SSE en attendant une action externe est délicate : le plus simple est de stocker l'état de la boucle (les messages accumulés) dans un store (Redis), de fermer le flux sur confirm_required, puis de reprendre la boucle dans la requête /confirm avec un nouveau flux SSE. C'est exactement le pattern « human-in-the-loop » : l'agent propose, ton serveur dispose.

Exercice 6 — Caching & budget (optimisation, difficile)

Objectif. Donne à ton agent un gros system prompt (~3K tokens) + 8 outils. Mesure usage.cache_read_input_tokens sur 3 requêtes successives. Il devrait être > 0 dès la 2ᵉ. Puis introduis un bug : interpole datetime.now() en tête du system prompt. Constate que le cache tombe à zéro. Corrige, et déplace l'horodatage en fin de premier message user.

Indice/Solution. Pose cache_control: {"type": "ephemeral"} sur le dernier bloc système. Le caching est un match de préfixe : ordre tools → system → messages. Tout ce qui varie (timestamp, ID de requête) doit être après le dernier breakpoint. Le minimum cacheable sur Opus 4.8 est ~4096 tokens — un préfixe trop court ne cache pas (silencieusement). Trie tes outils par nom pour un ordre déterministe.


🎤 En entretien

Q : Pourquoi streamer un agent en SSE plutôt que renvoyer la réponse complète en JSON ? Latence perçue et timeouts : l'utilisateur voit les tokens arriver immédiatement, et au-delà de ~16K tokens de sortie une requête non-streamée dépasse le timeout HTTP du SDK. SSE relaie aussi les étapes intermédiaires (réflexion, appels d'outils), ce qui donne de la transparence sur ce que fait l'agent.

Q : Que se passe-t-il si une exception est levée au milieu d'un flux SSE ? Comment la signaler au client ? Le code HTTP est déjà 200 et parti dès le premier byte — impossible de renvoyer un 500. On émet un événement SSE applicatif error que le frontend sait interpréter, puis on ferme. D'où la règle : valider tout (auth, quotas, paramètres) avant d'ouvrir le flux, pendant qu'on peut encore renvoyer un vrai code.

Q : Comment éviter de payer des tokens pour un client qui s'est déconnecté ? Vérifier request.is_disconnected() à chaque tour de boucle et laisser remonter la asyncio.CancelledError que Starlette propage. Sans ça, la boucle d'agent continue d'appeler le modèle et d'exécuter des outils pour personne — coût direct en tokens.

Q : Pourquoi une boucle tool_use manuelle plutôt que le tool-runner du SDK ? Le contrôle. La boucle manuelle te donne le point d'interception entre la décision de Claude et l'exécution : gating des actions destructives (confirmation humaine), audit/log de chaque appel, exécution conditionnelle, et annulation propre sur déconnexion. Le tool-runner est parfait pour un script ; un service de prod qui sert des outils à effet de bord a besoin de ces points de contrôle.

Bibliothèque tech perso — Achref