Skip to content

Appeler les SDK LLM (async/stream/structured)

TL;DR — Un appel LLM est un appel réseau lent (souvent 10 s à plusieurs minutes), donc en Python tout passe par AsyncAnthropic et async/await : jamais le client sync dans un endpoint FastAPI. Tu ne fais quasiment jamais un messages.create() bloquant non plus — tu streames (messages.stream()) pour ne pas exploser les timeouts HTTP et pour pousser les tokens à l'utilisateur au fil de l'eau. Pour récupérer du JSON typé garanti, tu utilises les sorties structurées natives (messages.parse() + Pydantic v2), pas du prompt-engineering fragile. Et pour un agent, tu boucles sur tool_use jusqu'à end_turn, en réinjectant les tool_result. Les trois piliers à interrioriser : async partout, stream par défaut, types en entrée comme en sortie.


🧠 Mental model

Tu viens de PHP/TS et tu as l'habitude des appels HTTP « requête → réponse » qui durent quelques dizaines de millisecondes. Un appel LLM, ce n'est pas ça. C'est plutôt un coup de téléphone à un expert qui réfléchit à voix haute pendant deux minutes : si tu raccroches au bout de 30 secondes (timeout), tu n'as rien ; si tu attends en silence sans rien afficher, l'utilisateur croit que ça a planté ; et si tu poses ta question à dix experts en même temps depuis un thread unique qui bloque, tu paralyses tout ton standard téléphonique.

   Appel HTTP classique (REST CRUD)          Appel LLM
   ┌──────────────────────────┐              ┌────────────────────────────────────┐
   │ req ──> [20 ms] ──> resp │              │ req ──> [.....10s à 5min.....] resp │
   └──────────────────────────┘              └────────────────────────────────────┘
        sync, c'est OK                          le thread DOIT être libéré pendant l'attente
                                                => async/await
                                                => streaming (tokens au fil de l'eau)

Les trois conséquences directes de cette latence :

  1. Async ou tu meurs. Pendant ces secondes/minutes d'attente, le thread ne doit faire aucun travail CPU — il attend des octets sur une socket. C'est le cas d'usage canonique d'asyncio. Un client sync dans un worker FastAPI bloque l'event loop et tue ta concurrence.
  2. Stream ou tu timeouts. Au-delà de ~16K tokens de sortie, une requête non-streamée risque de dépasser les timeouts HTTP du SDK. Le streaming résout ça et donne l'effet « machine à écrire » attendu par les utilisateurs.
  3. Types ou tu débugges du texte. Le modèle produit du texte par défaut. Si ton code attend du JSON, ne le supplie pas dans le prompt — contraints-le côté API.

Le SDK Anthropic est ta façade par-dessus POST /v1/messages. Tout (tool use, sorties structurées, streaming) est une feature de ce seul endpoint, pas dix API différentes.


Core teaching

Le client : AsyncAnthropic, jamais le sync dans un endpoint

Installe le SDK officiel (jamais requests/httpx à la main pour parler à Claude, ni un shim « compatible OpenAI ») :

bash
pip install "anthropic>=0.40"
python
# llm/client.py
from anthropic import AsyncAnthropic

# Le client résout la clé depuis l'environnement (ANTHROPIC_API_KEY).
# Ne hardcode JAMAIS la clé. Un seul client par process, réutilisé partout :
# il maintient un pool de connexions httpx sous le capot.
client = AsyncAnthropic()

MODEL = "claude-opus-4-8"  # phare actuel : 5 $ / 25 $ par Mtok, contexte 1M

❌ La mauvaise façon (le piège le plus fréquent)

python
# ❌ NE FAIS PAS ÇA dans un endpoint FastAPI async
from anthropic import Anthropic  # client SYNCHRONE

client = Anthropic()

@app.post("/chat")
async def chat(body: ChatIn) -> ChatOut:
    # Cet appel bloque l'EVENT LOOP pendant toute la durée de génération.
    # Tous les autres requests du worker sont gelés. Catastrophe de perf.
    msg = client.messages.create(
        model=MODEL,
        max_tokens=1024,
        messages=[{"role": "user", "content": body.text}],
    )
    return ChatOut(text=msg.content[0].text)

Le client sync fait un appel réseau bloquant. Dans une coroutine, ça gèle l'event loop pendant des secondes : ta concurrence tombe à 1. (Si tu dois utiliser le client sync depuis du code async — par exemple une lib legacy — enveloppe-le dans await asyncio.to_thread(...), mais le vrai correctif est AsyncAnthropic.)

✅ La façon idiomatique

python
# api/chat.py
from anthropic import AsyncAnthropic
from fastapi import FastAPI, Depends
from pydantic import BaseModel

app = FastAPI()


# Injection de dépendance : un client partagé, pas un par requête.
def get_client() -> AsyncAnthropic:
    return client  # le singleton de llm/client.py


class ChatIn(BaseModel):
    text: str


class ChatOut(BaseModel):
    reply: str


@app.post("/chat")
async def chat(body: ChatIn, llm: AsyncAnthropic = Depends(get_client)) -> ChatOut:
    msg = await llm.messages.create(
        model=MODEL,
        max_tokens=1024,
        messages=[{"role": "user", "content": body.text}],
    )
    # content est une liste de blocs hétérogènes ; on filtre les blocs texte.
    reply = "".join(b.text for b in msg.content if b.type == "text")
    return ChatOut(reply=reply)

Note : content n'est pas une string. C'est une list de blocs (text, thinking, tool_use…). Lire msg.content[0].text aveuglément casse dès qu'un bloc thinking ou tool_use arrive en tête. Filtre toujours par b.type.

⚠️ Thinking adaptatif et paramètres d'échantillonnage. Sur Opus 4.8 (comme 4.7) : le thinking se configure uniquement en adaptatif (thinking={"type": "adaptive"}). thinking={"type": "enabled", "budget_tokens": N} renvoie une 400budget_tokens est supprimé, tout comme temperature, top_p, top_k. Pour piloter la profondeur de réflexion, tu utilises output_config={"effort": "..."} (low/medium/high/xhigh/max), jamais un budget de tokens.

python
msg = await llm.messages.create(
    model=MODEL,
    max_tokens=16000,
    thinking={"type": "adaptive"},          # Claude décide quand/combien réfléchir
    output_config={"effort": "high"},        # profondeur globale, PAS budget_tokens
    messages=[{"role": "user", "content": "..."}],
)

Streaming : par défaut, pas en option

Dès que la sortie peut être longue (> ~16K tokens) ou que tu veux afficher au fil de l'eau, tu streames. Si tu n'as pas besoin de traiter chaque event individuellement, le helper get_final_message() te rend le message complet une fois le stream terminé :

python
async with llm.messages.stream(
    model=MODEL,
    max_tokens=64000,                 # avec stream, on peut viser haut sans timeout
    messages=[{"role": "user", "content": "Écris un long rapport."}],
) as stream:
    async for text in stream.text_stream:   # juste les deltas de texte
        print(text, end="", flush=True)
    final = await stream.get_final_message()  # Message complet : usage, stop_reason…

print(final.usage.output_tokens)

SSE : streamer les tokens vers le navigateur depuis FastAPI

C'est le cas concret quand tu sers un agent à un front Angular. On émet des Server-Sent Events. Point crucial : propager la déconnexion du client pour ne pas continuer à payer des tokens dans le vide.

python
# api/stream.py
import json
from collections.abc import AsyncIterator

from anthropic import AsyncAnthropic
from fastapi import FastAPI, Depends, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()


class AskIn(BaseModel):
    question: str


async def sse_tokens(
    llm: AsyncAnthropic, question: str, request: Request
) -> AsyncIterator[str]:
    async with llm.messages.stream(
        model=MODEL,
        max_tokens=64000,
        messages=[{"role": "user", "content": question}],
    ) as stream:
        async for text in stream.text_stream:
            # Si le client a fermé l'onglet, on arrête de générer.
            if await request.is_disconnected():
                break
            yield f"data: {json.dumps({'delta': text})}\n\n"
        yield "data: [DONE]\n\n"


@app.post("/ask")
async def ask(
    body: AskIn, request: Request, llm: AsyncAnthropic = Depends(get_client)
) -> StreamingResponse:
    return StreamingResponse(
        sse_tokens(llm, body.question, request),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

Le header X-Accel-Buffering: no désactive le buffering nginx — sinon ton proxy accumule les tokens et l'effet « temps réel » disparaît. C'est le bug n°1 quand « le streaming marche en local mais pas en prod ».


Sorties structurées : Pydantic v2 + messages.parse()

Tu construis des agents : tu as besoin de JSON exploitable, pas de prose. N'extrais pas du JSON à coups de regex sur du texte libre. Utilise les sorties structurées natives. Le SDK valide la réponse contre ton schéma Pydantic via messages.parse().

python
# llm/extract.py
from pydantic import BaseModel, Field


class Lead(BaseModel):
    name: str
    email: str
    plan: str = Field(description="Plan demandé : free, pro ou enterprise")
    wants_demo: bool


async def extract_lead(llm: AsyncAnthropic, raw: str) -> Lead:
    msg = await llm.messages.parse(
        model=MODEL,
        max_tokens=1024,
        messages=[{"role": "user", "content": f"Extrais les infos du lead :\n{raw}"}],
        output_format=Lead,   # le SDK transmet le json_schema et valide la réponse
    )
    # parsed est un Lead validé — ou None si le modèle a refusé.
    if msg.parsed is None:
        raise ValueError(f"Pas de sortie structurée (stop_reason={msg.stop_reason})")
    return msg.parsed

❌ La mauvaise façon

python
# ❌ Fragile : on espère que le modèle renverra du JSON propre.
msg = await llm.messages.create(
    model=MODEL, max_tokens=1024,
    messages=[{"role": "user",
               "content": f"Renvoie UNIQUEMENT du JSON {{name, email}} pour : {raw}"}],
)
data = json.loads(msg.content[0].text)  # casse au premier ```json ... ``` ou texte parasite

Le modèle ajoute parfois un préambule, une fence Markdown, ou un commentaire. Ton json.loads plante en prod sur 0,5 % des requêtes — précisément celles que tu ne testes pas. Les sorties structurées natives garantissent un JSON conforme au schéma.

Limites utiles à connaître : les sorties structurées ne supportent pas les contraintes numériques (minimum/maximum), les contraintes de longueur de string, ni les schémas récursifs. Les SDK Python/TS retirent ces contraintes du schéma envoyé et les valident côté client. additionalProperties: false est requis (Pydantic v2 le génère par défaut avec model_config = ConfigDict(extra="forbid") si besoin).


La boucle d'agent : tool_use → exécuter → réinjecter → end_turn

Le cœur d'un agent maison : tu définis des outils, le modèle décide d'en appeler, toi tu les exécutes côté serveur, puis tu renvoies les résultats et tu reboucles jusqu'à ce que le modèle ait fini (stop_reason == "end_turn").

python
# agent/loop.py
import json
from anthropic import AsyncAnthropic

TOOLS = [
    {
        "name": "get_weather",
        "description": "Renvoie la météo actuelle d'une ville. "
                       "Appelle cet outil quand l'utilisateur demande la météo.",
        "input_schema": {
            "type": "object",
            "properties": {"city": {"type": "string", "description": "Nom de la ville"}},
            "required": ["city"],
        },
    }
]


async def run_tool(name: str, args: dict) -> str:
    if name == "get_weather":
        # Ici un vrai appel API météo. On simule.
        return json.dumps({"city": args["city"], "temp_c": 21, "sky": "clear"})
    raise ValueError(f"Outil inconnu : {name}")


async def agent_turn(llm: AsyncAnthropic, user_input: str) -> str:
    messages: list[dict] = [{"role": "user", "content": user_input}]

    for _ in range(10):  # garde-fou anti-boucle-infinie
        msg = await llm.messages.create(
            model=MODEL,
            max_tokens=4096,
            tools=TOOLS,
            messages=messages,
        )

        if msg.stop_reason == "end_turn":
            return "".join(b.text for b in msg.content if b.type == "text")

        if msg.stop_reason == "tool_use":
            # On REINJECTE le tour assistant complet (blocs tool_use compris),
            # puis on renvoie un tour user avec les tool_result correspondants.
            messages.append({"role": "assistant", "content": msg.content})

            results = []
            for block in msg.content:
                if block.type == "tool_use":
                    output = await run_tool(block.name, block.input)
                    results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,   # DOIT matcher le tool_use
                        "content": output,
                    })
            messages.append({"role": "user", "content": results})
            continue

    raise RuntimeError("Limite d'itérations atteinte")

Trois invariants à ne jamais violer :

  1. Réinjecte msg.content en entier dans le tour assistant — pas seulement le texte. Si tu perds les blocs tool_use, l'API ne sait plus à quoi répondent tes tool_result.
  2. Chaque tool_result porte le bon tool_use_id. C'est l'appariement requête↔réponse d'outil.
  3. Parse block.input comme du JSON déjà désérialisé par le SDK — ne fais jamais de string-matching sur l'input sérialisé (l'échappement Unicode peut varier).

Pour la plupart des cas, le tool runner du SDK (@beta_tool + boucle automatique) t'évite d'écrire cette boucle. Garde la boucle manuelle quand tu as besoin de contrôle fin : approbation humaine avant exécution, logging custom, exécution conditionnelle.


⚙️ En production

Retries et exceptions typées

Le SDK retry automatiquement les 429 et 5xx avec backoff exponentiel (max_retries=2 par défaut). N'écris pas ta propre boucle de retry par-dessus — tu doublerais le backoff. Configure-la plutôt sur le client. Et attrape des exceptions typées, jamais du string-matching sur le message d'erreur :

python
import anthropic

client = AsyncAnthropic(max_retries=4, timeout=120.0)

try:
    msg = await client.messages.create(model=MODEL, max_tokens=1024, messages=[...])
except anthropic.RateLimitError:
    ...  # 429 : déjà retryé par le SDK ; ici c'est l'échec final
except anthropic.APIStatusError as e:
    if e.type == "overloaded_error":   # 529 : surcharge transitoire
        ...
    elif e.type == "invalid_request_error":  # 400 : bug côté toi, pas de retry
        ...

Failure modes spécifiques aux LLM

Mode de défaillanceSymptômeCorrectif senior
Client sync dans une coroutineConcurrence effondrée, latence p99 exploseAsyncAnthropic ; sinon asyncio.to_thread
Pas de stream sur grosse sortieTimeout HTTP intermittent à > ~16K tokensmessages.stream() + get_final_message()
content[0].text aveugleIndexError/AttributeError sur blocs thinking/tool_useFiltrer par b.type == "text"
stop_reason == "max_tokens" ignoréRéponse tronquée en plein milieuVérifier stop_reason ; relancer avec max_tokens plus haut
stop_reason == "refusal" non géréCrash sur content videBrancher sur stop_reason avant de lire content
Buffering du proxyStream « saccadé » ou tout d'un coup en prodX-Accel-Buffering: no, désactiver le buffer nginx
tool_use_id non apparié400 sur le tour suivant de l'agentRecopier l'id du tool_use dans le tool_result

Performance et coût : le prompt caching

Pour un agent, ton prompt système + tes définitions d'outils sont stables et renvoyés à chaque tour. Le prompt caching (cache_control: {"type": "ephemeral"}) facture les lectures à ~0,1× le prix d'entrée. C'est le levier de coût n°1.

python
msg = await llm.messages.create(
    model=MODEL,
    max_tokens=4096,
    system=[{
        "type": "text",
        "text": LARGE_STABLE_SYSTEM_PROMPT,
        "cache_control": {"type": "ephemeral"},   # cache le préfixe stable
    }],
    messages=messages,
)
# Vérifie l'efficacité :
print(msg.usage.cache_read_input_tokens, msg.usage.cache_creation_input_tokens)

Le caching est un match de préfixe : le moindre octet qui change avant le breakpoint invalide tout ce qui suit. L'invalidateur silencieux classique : un datetime.now() ou un UUID interpolé dans le prompt système. Si cache_read_input_tokens reste à zéro sur des requêtes au préfixe identique, traque le contenu volatil en tête de prompt. Minimum cacheable sur Opus 4.8 : ~4096 tokens (en dessous, ça ne cache pas, silencieusement).

Choix de modèle (le tradeoff senior)

ModèlePrix (in/out par Mtok)Quand
claude-opus-4-85 $ / 25 $Raisonnement, agents long-horizon, code
claude-sonnet-4-63 $ / 15 $Bon équilibre vitesse/intelligence
claude-haiku-4-51 $ / 5 $Classification, sous-agents, tâches simples, latence

Un pattern senior : route les sous-tâches (résumé, classification, routing) vers Haiku et garde Opus pour la boucle principale. Mais attention — changer de modèle en cours de conversation invalide le cache (le cache est scopé par modèle). Pour un sous-appel moins cher, spawn un sous-agent dédié plutôt que swap le modèle dans la boucle.

Sécurité

  • Clé API en variable d'env, jamais en dur, jamais loggée. Le SDK la lit depuis ANTHROPIC_API_KEY.
  • Outils à effet de bord (envoyer un mail, modifier la DB) : valide les inputs dans le handler et gate les actions irréversibles derrière une confirmation. Ne fais jamais confiance aveuglément à block.input.
  • Prompt injection : du contenu utilisateur ou récupéré du web peut contenir des instructions hostiles. Ne mets jamais de secret dans le prompt — il peut fuiter dans la sortie.

Observabilité

  • Logge msg.usage (input/output/cache tokens) et stop_reason sur chaque appel — c'est ta facture et ton signal de troncature.
  • Logge le request_id (présent sur les erreurs API) pour tracer un appel de bout en bout avec le support.
  • Pour les agents, instrumente le nombre d'itérations de boucle : une explosion = un outil qui échoue en silence et que le modèle re-tente.

🏋️ Exercices

1. Endpoint de chat async + comptage de tokens

Objectif — Construis un endpoint FastAPI POST /chat qui prend un texte, appelle Opus 4.8 en async, et renvoie la réponse plus le nombre de tokens d'entrée/sortie consommés.

Indice/Solution — Utilise AsyncAnthropic injecté via Depends. Pour les tokens, lis msg.usage.input_tokens et msg.usage.output_tokens sur la réponse (pas besoin d'un appel count_tokens séparé — l'usage réel est dans la réponse). Filtre content par b.type == "text".

2. Streaming SSE résilient à la déconnexion

Objectif — Transforme l'endpoint en StreamingResponse SSE. Quand le client ferme la connexion, la génération doit réellement s'arrêter (vérifie dans les logs que tu ne continues pas à streamer).

Indice/Solutionasync with llm.messages.stream(...), itère stream.text_stream, et teste await request.is_disconnected() à chaque delta pour break. Ajoute X-Accel-Buffering: no. Pour valider l'arrêt, ajoute un print à chaque delta et curl puis Ctrl-C.

3. Extraction structurée production-grade

Objectif — Écris extract_invoice(raw: str) -> InvoiceInvoice est un modèle Pydantic v2 avec vendor: str, total: float, currency: Literal["EUR", "USD"], lines: list[InvoiceLine]. Gère le cas où le modèle refuse.

Indice/Solutionmessages.parse(..., output_format=Invoice). Si msg.parsed is None, branche sur msg.stop_reason ("refusal" → remonte une erreur métier propre, pas un 500). Rappelle-toi : pas de contrainte numérique dans le schéma — valide total >= 0 côté Pydantic avec un field_validator.

4. Boucle d'agent multi-outils

Objectif — Implémente un agent avec deux outils (get_weather, search_db). Le modèle doit pouvoir en appeler plusieurs dans un même tour. Boucle jusqu'à end_turn avec un garde-fou de 10 itérations.

Indice/Solution — Dans un tour tool_use, msg.content peut contenir plusieurs blocs tool_use. Exécute-les tous (idéalement en parallèle avec asyncio.gather), puis renvoie tous les tool_result dans un seul tour user. Recopie chaque tool_use_id.

5. Casse-puis-répare : le cache fantôme

Objectif — On te donne un agent dont cache_read_input_tokens reste obstinément à 0 malgré un gros prompt système. Trouve et corrige l'invalidateur.

Indice/Solution — Le coupable est presque toujours du contenu volatil avant le breakpoint : f"... Date actuelle : {datetime.now()} ..." dans le system, ou un json.dumps(tools) sans sort_keys=True, ou un set itéré. Déplace tout contenu dynamique après le dernier cache_control, gèle le préfixe, et re-vérifie l'usage. Diff les octets de deux requêtes consécutives si tu ne trouves pas.

6. Production-grade : retries, timeouts, et budget de coût

Objectif — Durcis l'agent : max_retries et timeout configurés sur le client, gestion typée de RateLimitError/OverloadedError, et un garde-fou qui stoppe l'agent si l'usage cumulé dépasse un budget de tokens.

Indice/Solution — Configure sur le client (AsyncAnthropic(max_retries=4, timeout=120)), pas une boucle maison. Accumule msg.usage.input_tokens + output_tokens à chaque tour de boucle et lève une exception métier au-delà du seuil. Pour 529, le except anthropic.APIStatusError as e: if e.type == "overloaded_error" te laisse router vers un modèle moins chargé (Haiku) en repli.


🎤 En entretien

Q : Pourquoi AsyncAnthropic et pas le client sync dans un endpoint FastAPI ? Parce qu'un appel LLM bloque sur du réseau pendant des secondes ; un client sync dans une coroutine gèle l'event loop et effondre la concurrence du worker. L'async libère le thread pendant l'attente I/O.

Q : Quand streames-tu, et qu'est-ce que ça résout concrètement ? Par défaut dès que la sortie peut être longue (> ~16K tokens) ou affichée en direct. Ça évite les timeouts HTTP sur les grosses générations et ça pousse les tokens au fil de l'eau pour l'UX ; get_final_message() reconstitue le message complet si je n'ai pas besoin des events un par un.

Q : Comment garantis-tu du JSON typé en sortie, et pourquoi pas du prompt-engineering ? Sorties structurées natives via messages.parse() + schéma Pydantic v2 : l'API contraint le format, pas un espoir. Le prompt-engineering (« renvoie uniquement du JSON ») casse sur les préambules/fences en prod ; les sorties natives garantissent la conformité au schéma.

Q : Décris la boucle d'un agent tool-use et ses invariants. On appelle l'API avec tools ; tant que stop_reason == "tool_use", on réinjecte le tour assistant complet (blocs tool_use inclus), on exécute les outils, on renvoie les tool_result avec le bon tool_use_id, et on reboucle jusqu'à end_turn — avec un garde-fou d'itérations. Perdre les blocs tool_use ou mal apparier l'id provoque une 400.

Bibliothèque tech perso — Achref