Appeler les SDK LLM (async/stream/structured)
TL;DR — Un appel LLM est un appel réseau lent (souvent 10 s à plusieurs minutes), donc en Python tout passe par
AsyncAnthropicetasync/await: jamais le client sync dans un endpoint FastAPI. Tu ne fais quasiment jamais unmessages.create()bloquant non plus — tu streames (messages.stream()) pour ne pas exploser les timeouts HTTP et pour pousser les tokens à l'utilisateur au fil de l'eau. Pour récupérer du JSON typé garanti, tu utilises les sorties structurées natives (messages.parse()+ Pydantic v2), pas du prompt-engineering fragile. Et pour un agent, tu boucles surtool_usejusqu'àend_turn, en réinjectant lestool_result. Les trois piliers à interrioriser : async partout, stream par défaut, types en entrée comme en sortie.
🧠 Mental model
Tu viens de PHP/TS et tu as l'habitude des appels HTTP « requête → réponse » qui durent quelques dizaines de millisecondes. Un appel LLM, ce n'est pas ça. C'est plutôt un coup de téléphone à un expert qui réfléchit à voix haute pendant deux minutes : si tu raccroches au bout de 30 secondes (timeout), tu n'as rien ; si tu attends en silence sans rien afficher, l'utilisateur croit que ça a planté ; et si tu poses ta question à dix experts en même temps depuis un thread unique qui bloque, tu paralyses tout ton standard téléphonique.
Appel HTTP classique (REST CRUD) Appel LLM
┌──────────────────────────┐ ┌────────────────────────────────────┐
│ req ──> [20 ms] ──> resp │ │ req ──> [.....10s à 5min.....] resp │
└──────────────────────────┘ └────────────────────────────────────┘
sync, c'est OK le thread DOIT être libéré pendant l'attente
=> async/await
=> streaming (tokens au fil de l'eau)Les trois conséquences directes de cette latence :
- Async ou tu meurs. Pendant ces secondes/minutes d'attente, le thread ne doit faire aucun travail CPU — il attend des octets sur une socket. C'est le cas d'usage canonique d'
asyncio. Un client sync dans un worker FastAPI bloque l'event loop et tue ta concurrence. - Stream ou tu timeouts. Au-delà de ~16K tokens de sortie, une requête non-streamée risque de dépasser les timeouts HTTP du SDK. Le streaming résout ça et donne l'effet « machine à écrire » attendu par les utilisateurs.
- Types ou tu débugges du texte. Le modèle produit du texte par défaut. Si ton code attend du JSON, ne le supplie pas dans le prompt — contraints-le côté API.
Le SDK Anthropic est ta façade par-dessus POST /v1/messages. Tout (tool use, sorties structurées, streaming) est une feature de ce seul endpoint, pas dix API différentes.
Core teaching
Le client : AsyncAnthropic, jamais le sync dans un endpoint
Installe le SDK officiel (jamais requests/httpx à la main pour parler à Claude, ni un shim « compatible OpenAI ») :
pip install "anthropic>=0.40"# llm/client.py
from anthropic import AsyncAnthropic
# Le client résout la clé depuis l'environnement (ANTHROPIC_API_KEY).
# Ne hardcode JAMAIS la clé. Un seul client par process, réutilisé partout :
# il maintient un pool de connexions httpx sous le capot.
client = AsyncAnthropic()
MODEL = "claude-opus-4-8" # phare actuel : 5 $ / 25 $ par Mtok, contexte 1M❌ La mauvaise façon (le piège le plus fréquent)
# ❌ NE FAIS PAS ÇA dans un endpoint FastAPI async
from anthropic import Anthropic # client SYNCHRONE
client = Anthropic()
@app.post("/chat")
async def chat(body: ChatIn) -> ChatOut:
# Cet appel bloque l'EVENT LOOP pendant toute la durée de génération.
# Tous les autres requests du worker sont gelés. Catastrophe de perf.
msg = client.messages.create(
model=MODEL,
max_tokens=1024,
messages=[{"role": "user", "content": body.text}],
)
return ChatOut(text=msg.content[0].text)Le client sync fait un appel réseau bloquant. Dans une coroutine, ça gèle l'event loop pendant des secondes : ta concurrence tombe à 1. (Si tu dois utiliser le client sync depuis du code async — par exemple une lib legacy — enveloppe-le dans await asyncio.to_thread(...), mais le vrai correctif est AsyncAnthropic.)
✅ La façon idiomatique
# api/chat.py
from anthropic import AsyncAnthropic
from fastapi import FastAPI, Depends
from pydantic import BaseModel
app = FastAPI()
# Injection de dépendance : un client partagé, pas un par requête.
def get_client() -> AsyncAnthropic:
return client # le singleton de llm/client.py
class ChatIn(BaseModel):
text: str
class ChatOut(BaseModel):
reply: str
@app.post("/chat")
async def chat(body: ChatIn, llm: AsyncAnthropic = Depends(get_client)) -> ChatOut:
msg = await llm.messages.create(
model=MODEL,
max_tokens=1024,
messages=[{"role": "user", "content": body.text}],
)
# content est une liste de blocs hétérogènes ; on filtre les blocs texte.
reply = "".join(b.text for b in msg.content if b.type == "text")
return ChatOut(reply=reply)Note : content n'est pas une string. C'est une list de blocs (text, thinking, tool_use…). Lire msg.content[0].text aveuglément casse dès qu'un bloc thinking ou tool_use arrive en tête. Filtre toujours par b.type.
⚠️ Thinking adaptatif et paramètres d'échantillonnage. Sur Opus 4.8 (comme 4.7) : le thinking se configure uniquement en adaptatif (
thinking={"type": "adaptive"}).thinking={"type": "enabled", "budget_tokens": N}renvoie une 400 —budget_tokensest supprimé, tout commetemperature,top_p,top_k. Pour piloter la profondeur de réflexion, tu utilisesoutput_config={"effort": "..."}(low/medium/high/xhigh/max), jamais un budget de tokens.
msg = await llm.messages.create(
model=MODEL,
max_tokens=16000,
thinking={"type": "adaptive"}, # Claude décide quand/combien réfléchir
output_config={"effort": "high"}, # profondeur globale, PAS budget_tokens
messages=[{"role": "user", "content": "..."}],
)Streaming : par défaut, pas en option
Dès que la sortie peut être longue (> ~16K tokens) ou que tu veux afficher au fil de l'eau, tu streames. Si tu n'as pas besoin de traiter chaque event individuellement, le helper get_final_message() te rend le message complet une fois le stream terminé :
async with llm.messages.stream(
model=MODEL,
max_tokens=64000, # avec stream, on peut viser haut sans timeout
messages=[{"role": "user", "content": "Écris un long rapport."}],
) as stream:
async for text in stream.text_stream: # juste les deltas de texte
print(text, end="", flush=True)
final = await stream.get_final_message() # Message complet : usage, stop_reason…
print(final.usage.output_tokens)SSE : streamer les tokens vers le navigateur depuis FastAPI
C'est le cas concret quand tu sers un agent à un front Angular. On émet des Server-Sent Events. Point crucial : propager la déconnexion du client pour ne pas continuer à payer des tokens dans le vide.
# api/stream.py
import json
from collections.abc import AsyncIterator
from anthropic import AsyncAnthropic
from fastapi import FastAPI, Depends, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
app = FastAPI()
class AskIn(BaseModel):
question: str
async def sse_tokens(
llm: AsyncAnthropic, question: str, request: Request
) -> AsyncIterator[str]:
async with llm.messages.stream(
model=MODEL,
max_tokens=64000,
messages=[{"role": "user", "content": question}],
) as stream:
async for text in stream.text_stream:
# Si le client a fermé l'onglet, on arrête de générer.
if await request.is_disconnected():
break
yield f"data: {json.dumps({'delta': text})}\n\n"
yield "data: [DONE]\n\n"
@app.post("/ask")
async def ask(
body: AskIn, request: Request, llm: AsyncAnthropic = Depends(get_client)
) -> StreamingResponse:
return StreamingResponse(
sse_tokens(llm, body.question, request),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)Le header X-Accel-Buffering: no désactive le buffering nginx — sinon ton proxy accumule les tokens et l'effet « temps réel » disparaît. C'est le bug n°1 quand « le streaming marche en local mais pas en prod ».
Sorties structurées : Pydantic v2 + messages.parse()
Tu construis des agents : tu as besoin de JSON exploitable, pas de prose. N'extrais pas du JSON à coups de regex sur du texte libre. Utilise les sorties structurées natives. Le SDK valide la réponse contre ton schéma Pydantic via messages.parse().
# llm/extract.py
from pydantic import BaseModel, Field
class Lead(BaseModel):
name: str
email: str
plan: str = Field(description="Plan demandé : free, pro ou enterprise")
wants_demo: bool
async def extract_lead(llm: AsyncAnthropic, raw: str) -> Lead:
msg = await llm.messages.parse(
model=MODEL,
max_tokens=1024,
messages=[{"role": "user", "content": f"Extrais les infos du lead :\n{raw}"}],
output_format=Lead, # le SDK transmet le json_schema et valide la réponse
)
# parsed est un Lead validé — ou None si le modèle a refusé.
if msg.parsed is None:
raise ValueError(f"Pas de sortie structurée (stop_reason={msg.stop_reason})")
return msg.parsed❌ La mauvaise façon
# ❌ Fragile : on espère que le modèle renverra du JSON propre.
msg = await llm.messages.create(
model=MODEL, max_tokens=1024,
messages=[{"role": "user",
"content": f"Renvoie UNIQUEMENT du JSON {{name, email}} pour : {raw}"}],
)
data = json.loads(msg.content[0].text) # casse au premier ```json ... ``` ou texte parasiteLe modèle ajoute parfois un préambule, une fence Markdown, ou un commentaire. Ton json.loads plante en prod sur 0,5 % des requêtes — précisément celles que tu ne testes pas. Les sorties structurées natives garantissent un JSON conforme au schéma.
Limites utiles à connaître : les sorties structurées ne supportent pas les contraintes numériques (
minimum/maximum), les contraintes de longueur de string, ni les schémas récursifs. Les SDK Python/TS retirent ces contraintes du schéma envoyé et les valident côté client.additionalProperties: falseest requis (Pydantic v2 le génère par défaut avecmodel_config = ConfigDict(extra="forbid")si besoin).
La boucle d'agent : tool_use → exécuter → réinjecter → end_turn
Le cœur d'un agent maison : tu définis des outils, le modèle décide d'en appeler, toi tu les exécutes côté serveur, puis tu renvoies les résultats et tu reboucles jusqu'à ce que le modèle ait fini (stop_reason == "end_turn").
# agent/loop.py
import json
from anthropic import AsyncAnthropic
TOOLS = [
{
"name": "get_weather",
"description": "Renvoie la météo actuelle d'une ville. "
"Appelle cet outil quand l'utilisateur demande la météo.",
"input_schema": {
"type": "object",
"properties": {"city": {"type": "string", "description": "Nom de la ville"}},
"required": ["city"],
},
}
]
async def run_tool(name: str, args: dict) -> str:
if name == "get_weather":
# Ici un vrai appel API météo. On simule.
return json.dumps({"city": args["city"], "temp_c": 21, "sky": "clear"})
raise ValueError(f"Outil inconnu : {name}")
async def agent_turn(llm: AsyncAnthropic, user_input: str) -> str:
messages: list[dict] = [{"role": "user", "content": user_input}]
for _ in range(10): # garde-fou anti-boucle-infinie
msg = await llm.messages.create(
model=MODEL,
max_tokens=4096,
tools=TOOLS,
messages=messages,
)
if msg.stop_reason == "end_turn":
return "".join(b.text for b in msg.content if b.type == "text")
if msg.stop_reason == "tool_use":
# On REINJECTE le tour assistant complet (blocs tool_use compris),
# puis on renvoie un tour user avec les tool_result correspondants.
messages.append({"role": "assistant", "content": msg.content})
results = []
for block in msg.content:
if block.type == "tool_use":
output = await run_tool(block.name, block.input)
results.append({
"type": "tool_result",
"tool_use_id": block.id, # DOIT matcher le tool_use
"content": output,
})
messages.append({"role": "user", "content": results})
continue
raise RuntimeError("Limite d'itérations atteinte")Trois invariants à ne jamais violer :
- Réinjecte
msg.contenten entier dans le tour assistant — pas seulement le texte. Si tu perds les blocstool_use, l'API ne sait plus à quoi répondent testool_result. - Chaque
tool_resultporte le bontool_use_id. C'est l'appariement requête↔réponse d'outil. - Parse
block.inputcomme du JSON déjà désérialisé par le SDK — ne fais jamais de string-matching sur l'input sérialisé (l'échappement Unicode peut varier).
Pour la plupart des cas, le tool runner du SDK (
@beta_tool+ boucle automatique) t'évite d'écrire cette boucle. Garde la boucle manuelle quand tu as besoin de contrôle fin : approbation humaine avant exécution, logging custom, exécution conditionnelle.
⚙️ En production
Retries et exceptions typées
Le SDK retry automatiquement les 429 et 5xx avec backoff exponentiel (max_retries=2 par défaut). N'écris pas ta propre boucle de retry par-dessus — tu doublerais le backoff. Configure-la plutôt sur le client. Et attrape des exceptions typées, jamais du string-matching sur le message d'erreur :
import anthropic
client = AsyncAnthropic(max_retries=4, timeout=120.0)
try:
msg = await client.messages.create(model=MODEL, max_tokens=1024, messages=[...])
except anthropic.RateLimitError:
... # 429 : déjà retryé par le SDK ; ici c'est l'échec final
except anthropic.APIStatusError as e:
if e.type == "overloaded_error": # 529 : surcharge transitoire
...
elif e.type == "invalid_request_error": # 400 : bug côté toi, pas de retry
...Failure modes spécifiques aux LLM
| Mode de défaillance | Symptôme | Correctif senior |
|---|---|---|
| Client sync dans une coroutine | Concurrence effondrée, latence p99 explose | AsyncAnthropic ; sinon asyncio.to_thread |
| Pas de stream sur grosse sortie | Timeout HTTP intermittent à > ~16K tokens | messages.stream() + get_final_message() |
content[0].text aveugle | IndexError/AttributeError sur blocs thinking/tool_use | Filtrer par b.type == "text" |
stop_reason == "max_tokens" ignoré | Réponse tronquée en plein milieu | Vérifier stop_reason ; relancer avec max_tokens plus haut |
stop_reason == "refusal" non géré | Crash sur content vide | Brancher sur stop_reason avant de lire content |
| Buffering du proxy | Stream « saccadé » ou tout d'un coup en prod | X-Accel-Buffering: no, désactiver le buffer nginx |
tool_use_id non apparié | 400 sur le tour suivant de l'agent | Recopier l'id du tool_use dans le tool_result |
Performance et coût : le prompt caching
Pour un agent, ton prompt système + tes définitions d'outils sont stables et renvoyés à chaque tour. Le prompt caching (cache_control: {"type": "ephemeral"}) facture les lectures à ~0,1× le prix d'entrée. C'est le levier de coût n°1.
msg = await llm.messages.create(
model=MODEL,
max_tokens=4096,
system=[{
"type": "text",
"text": LARGE_STABLE_SYSTEM_PROMPT,
"cache_control": {"type": "ephemeral"}, # cache le préfixe stable
}],
messages=messages,
)
# Vérifie l'efficacité :
print(msg.usage.cache_read_input_tokens, msg.usage.cache_creation_input_tokens)Le caching est un match de préfixe : le moindre octet qui change avant le breakpoint invalide tout ce qui suit. L'invalidateur silencieux classique : un datetime.now() ou un UUID interpolé dans le prompt système. Si cache_read_input_tokens reste à zéro sur des requêtes au préfixe identique, traque le contenu volatil en tête de prompt. Minimum cacheable sur Opus 4.8 : ~4096 tokens (en dessous, ça ne cache pas, silencieusement).
Choix de modèle (le tradeoff senior)
| Modèle | Prix (in/out par Mtok) | Quand |
|---|---|---|
claude-opus-4-8 | 5 $ / 25 $ | Raisonnement, agents long-horizon, code |
claude-sonnet-4-6 | 3 $ / 15 $ | Bon équilibre vitesse/intelligence |
claude-haiku-4-5 | 1 $ / 5 $ | Classification, sous-agents, tâches simples, latence |
Un pattern senior : route les sous-tâches (résumé, classification, routing) vers Haiku et garde Opus pour la boucle principale. Mais attention — changer de modèle en cours de conversation invalide le cache (le cache est scopé par modèle). Pour un sous-appel moins cher, spawn un sous-agent dédié plutôt que swap le modèle dans la boucle.
Sécurité
- Clé API en variable d'env, jamais en dur, jamais loggée. Le SDK la lit depuis
ANTHROPIC_API_KEY. - Outils à effet de bord (envoyer un mail, modifier la DB) : valide les inputs dans le handler et gate les actions irréversibles derrière une confirmation. Ne fais jamais confiance aveuglément à
block.input. - Prompt injection : du contenu utilisateur ou récupéré du web peut contenir des instructions hostiles. Ne mets jamais de secret dans le prompt — il peut fuiter dans la sortie.
Observabilité
- Logge
msg.usage(input/output/cache tokens) etstop_reasonsur chaque appel — c'est ta facture et ton signal de troncature. - Logge le
request_id(présent sur les erreurs API) pour tracer un appel de bout en bout avec le support. - Pour les agents, instrumente le nombre d'itérations de boucle : une explosion = un outil qui échoue en silence et que le modèle re-tente.
🏋️ Exercices
1. Endpoint de chat async + comptage de tokens
Objectif — Construis un endpoint FastAPI POST /chat qui prend un texte, appelle Opus 4.8 en async, et renvoie la réponse plus le nombre de tokens d'entrée/sortie consommés.
Indice/Solution — Utilise AsyncAnthropic injecté via Depends. Pour les tokens, lis msg.usage.input_tokens et msg.usage.output_tokens sur la réponse (pas besoin d'un appel count_tokens séparé — l'usage réel est dans la réponse). Filtre content par b.type == "text".
2. Streaming SSE résilient à la déconnexion
Objectif — Transforme l'endpoint en StreamingResponse SSE. Quand le client ferme la connexion, la génération doit réellement s'arrêter (vérifie dans les logs que tu ne continues pas à streamer).
Indice/Solution — async with llm.messages.stream(...), itère stream.text_stream, et teste await request.is_disconnected() à chaque delta pour break. Ajoute X-Accel-Buffering: no. Pour valider l'arrêt, ajoute un print à chaque delta et curl puis Ctrl-C.
3. Extraction structurée production-grade
Objectif — Écris extract_invoice(raw: str) -> Invoice où Invoice est un modèle Pydantic v2 avec vendor: str, total: float, currency: Literal["EUR", "USD"], lines: list[InvoiceLine]. Gère le cas où le modèle refuse.
Indice/Solution — messages.parse(..., output_format=Invoice). Si msg.parsed is None, branche sur msg.stop_reason ("refusal" → remonte une erreur métier propre, pas un 500). Rappelle-toi : pas de contrainte numérique dans le schéma — valide total >= 0 côté Pydantic avec un field_validator.
4. Boucle d'agent multi-outils
Objectif — Implémente un agent avec deux outils (get_weather, search_db). Le modèle doit pouvoir en appeler plusieurs dans un même tour. Boucle jusqu'à end_turn avec un garde-fou de 10 itérations.
Indice/Solution — Dans un tour tool_use, msg.content peut contenir plusieurs blocs tool_use. Exécute-les tous (idéalement en parallèle avec asyncio.gather), puis renvoie tous les tool_result dans un seul tour user. Recopie chaque tool_use_id.
5. Casse-puis-répare : le cache fantôme
Objectif — On te donne un agent dont cache_read_input_tokens reste obstinément à 0 malgré un gros prompt système. Trouve et corrige l'invalidateur.
Indice/Solution — Le coupable est presque toujours du contenu volatil avant le breakpoint : f"... Date actuelle : {datetime.now()} ..." dans le system, ou un json.dumps(tools) sans sort_keys=True, ou un set itéré. Déplace tout contenu dynamique après le dernier cache_control, gèle le préfixe, et re-vérifie l'usage. Diff les octets de deux requêtes consécutives si tu ne trouves pas.
6. Production-grade : retries, timeouts, et budget de coût
Objectif — Durcis l'agent : max_retries et timeout configurés sur le client, gestion typée de RateLimitError/OverloadedError, et un garde-fou qui stoppe l'agent si l'usage cumulé dépasse un budget de tokens.
Indice/Solution — Configure sur le client (AsyncAnthropic(max_retries=4, timeout=120)), pas une boucle maison. Accumule msg.usage.input_tokens + output_tokens à chaque tour de boucle et lève une exception métier au-delà du seuil. Pour 529, le except anthropic.APIStatusError as e: if e.type == "overloaded_error" te laisse router vers un modèle moins chargé (Haiku) en repli.
🎤 En entretien
Q : Pourquoi AsyncAnthropic et pas le client sync dans un endpoint FastAPI ? Parce qu'un appel LLM bloque sur du réseau pendant des secondes ; un client sync dans une coroutine gèle l'event loop et effondre la concurrence du worker. L'async libère le thread pendant l'attente I/O.
Q : Quand streames-tu, et qu'est-ce que ça résout concrètement ? Par défaut dès que la sortie peut être longue (> ~16K tokens) ou affichée en direct. Ça évite les timeouts HTTP sur les grosses générations et ça pousse les tokens au fil de l'eau pour l'UX ; get_final_message() reconstitue le message complet si je n'ai pas besoin des events un par un.
Q : Comment garantis-tu du JSON typé en sortie, et pourquoi pas du prompt-engineering ? Sorties structurées natives via messages.parse() + schéma Pydantic v2 : l'API contraint le format, pas un espoir. Le prompt-engineering (« renvoie uniquement du JSON ») casse sur les préambules/fences en prod ; les sorties natives garantissent la conformité au schéma.
Q : Décris la boucle d'un agent tool-use et ses invariants. On appelle l'API avec tools ; tant que stop_reason == "tool_use", on réinjecte le tour assistant complet (blocs tool_use inclus), on exécute les outils, on renvoie les tool_result avec le bon tool_use_id, et on reboucle jusqu'à end_turn — avec un garde-fou d'itérations. Perdre les blocs tool_use ou mal apparier l'id provoque une 400.