Patterns async (gather, semaphore, timeouts)
TL;DR —
asyncione rend rien plus rapide en soi : il te permet d'attendre plusieurs choses en même temps sur un seul thread. Les trois leviers que tu utiliseras 90 % du temps sontasyncio.gather()(lancer N coroutines en parallèle et récupérer leurs résultats dans l'ordre), unasyncio.Semaphore(plafonner la concurrence pour ne pas saturer une API ou ta DB), etasyncio.timeout()(couper net une opération qui traîne). Mal combinés, ils produisent des bugs sournois : une exception dansgatherqui laisse 50 coroutines orphelines, un timeout qui ne libère pas la connexion, unawaitoublié qui sérialise tout. Cette leçon te montre la version idiomatique, la version piège, et comment câbler ces trois primitives pour appeler une flotte d'agents LLM (Anthropic SDK) sans faire tomber ton serveur.
🧠 Mental model
Tu viens de Node/TS, donc tu as déjà l'intuition de la boucle d'événements. Le piège, c'est de croire qu'async def en Python = async en JS. Ce n'est pas pareil sur un point crucial : en JS, const p = fetch(...) démarre la requête immédiatement. En Python, coro = fetch(...) ne fait rien — la coroutine est inerte tant que tu ne l'await pas ou que tu ne la passes pas à la boucle.
L'analogie qui marche : pense à un chef de cuisine seul (un seul thread).
┌─────────────────────────────────────┐
│ Event loop (le chef, seul) │
└─────────────────────────────────────┘
│ supervise, ne cuisine qu'une chose à la fois
┌──────────────────┼──────────────────┐
▼ ▼ ▼
[ Pâtes ] [ Sauce ] [ Four ]
await eau await mijote await cuisson
bouillante 15 min 20 min
│ │ │
└──── pendant qu'une casserole « await », le chef passe à une autre ────┘Le chef ne sait pas faire bouillir l'eau plus vite. Mais pendant que l'eau bout (I/O, attente réseau), il ne reste pas planté devant : il va lancer la sauce, surveiller le four. C'est ça, asyncio : du recouvrement d'attentes, pas du calcul parallèle. Si une tâche est du calcul pur (CPU-bound, genre parser 4 Go de JSON), le chef est bloqué dessus et tout le reste attend — asyncio ne t'aide pas, il faut un ProcessPoolExecutor.
gather= « lance ces trois casseroles en même temps et préviens-moi quand tout est prêt ».Semaphore= « je n'ai que 4 plaques de cuisson, pas plus de 4 casseroles à feu en même temps ».timeout= « si la viande n'est pas cuite dans 20 min, sors-la et passe au plan B ».
Garde cette image : tout le reste en découle.
Le socle : gather
asyncio.gather(*coros) prend des coroutines (ou des Tasks), les planifie toutes en concurrence, et renvoie une liste de résultats dans l'ordre des arguments — pas dans l'ordre d'achèvement.
La bonne façon
import asyncio
import httpx
async def fetch_user(client: httpx.AsyncClient, user_id: int) -> dict:
resp = await client.get(f"https://api.example.com/users/{user_id}")
resp.raise_for_status()
return resp.json()
async def fetch_all(user_ids: list[int]) -> list[dict]:
async with httpx.AsyncClient(timeout=10.0) as client:
# On construit N coroutines, puis on les lance ENSEMBLE.
results = await asyncio.gather(
*(fetch_user(client, uid) for uid in user_ids)
)
return results # ordre garanti : results[0] correspond à user_ids[0]
asyncio.run(fetch_all([1, 2, 3, 42]))Si tu as 4 utilisateurs et que chaque requête prend 100 ms, le tout prend ~100 ms, pas 400 ms. Les quatre attentes se recouvrent.
La mauvaise façon (le piège du débutant async)
# ❌ ANTI-PATTERN : ceci sérialise tout, c'est de l'async pour rien.
async def fetch_all_wrong(user_ids: list[int]) -> list[dict]:
async with httpx.AsyncClient() as client:
results = []
for uid in user_ids:
results.append(await fetch_user(client, uid)) # await DANS la boucle
return resultsIci chaque await bloque la boucle jusqu'à la fin de la requête avant de passer à la suivante. 4 × 100 ms = 400 ms. Tu as payé le coût cognitif de l'async sans le bénéfice. La règle : await dans une boucle for séquentielle est presque toujours un bug de perf quand les itérations sont indépendantes. Construis les coroutines d'abord, gather ensuite.
gather et les exceptions : le piège n°1 en prod
Comportement par défaut : dès qu'une coroutine lève une exception, gather la propage immédiatement à l'appelant. Mais — point sournois — les autres coroutines ne sont pas annulées, elles continuent en arrière-plan, orphelines. Tu récupères une exception mais tu as perdu les résultats des coroutines qui avaient réussi.
# return_exceptions=True : aucune exception ne remonte ; chaque échec
# devient une valeur dans la liste. À toi de trier le grain de l'ivraie.
results = await asyncio.gather(
*(fetch_user(client, uid) for uid in user_ids),
return_exceptions=True,
)
ok: list[dict] = []
errors: list[tuple[int, BaseException]] = []
for uid, res in zip(user_ids, results):
if isinstance(res, Exception):
errors.append((uid, res))
else:
ok.append(res)⚠️
return_exceptions=Trueattrape aussiasyncio.CancelledError(c'est uneBaseException, pas uneExceptiondepuis Python 3.8). Si tu faisisinstance(res, Exception), une coroutine annulée passera entre les mailles. En pratique, pour du fan-out propre avec annulation correcte, préfère un TaskGroup (voir plus bas).
Le levier de production : Semaphore
gather sur 10 000 URLs, c'est 10 000 connexions ouvertes d'un coup. Tu vas exploser les limites de l'API distante (HTTP 429), saturer ton pool de connexions, ou te faire tuer par l'OS (« too many open files »). Le Semaphore est le régulateur de débit.
Un sémaphore est un compteur : acquire() le décrémente (et bloque si à 0), release() l'incrémente. Avec async with, tu n'oublies jamais le release.
import asyncio
import httpx
async def fetch_limited(
client: httpx.AsyncClient,
sem: asyncio.Semaphore,
url: str,
) -> str:
async with sem: # au plus N coroutines passent ce point en même temps
resp = await client.get(url)
resp.raise_for_status()
return resp.text
async def crawl(urls: list[str], concurrency: int = 8) -> list[str]:
sem = asyncio.Semaphore(concurrency)
async with httpx.AsyncClient(timeout=15.0) as client:
return await asyncio.gather(
*(fetch_limited(client, sem, u) for u in urls)
)Subtilité importante : on crée quand même toutes les coroutines d'un coup avec gather. Le sémaphore ne limite pas le nombre de coroutines créées, il limite combien franchissent le async with sem simultanément. Les autres sont créées, planifiées, et attendent poliment leur tour sur le acquire(). Pour 10 000 URLs c'est OK (une coroutine en attente coûte quelques Ko). Pour 10 millions, il faut un pattern producteur/consommateur avec une asyncio.Queue pour ne pas matérialiser 10 M de coroutines en mémoire.
# ❌ Erreur classique : recréer le sémaphore à chaque appel, ou le partager
# entre boucles d'événements différentes.
async def fetch_limited_wrong(url: str) -> str:
sem = asyncio.Semaphore(8) # inutile : un sémaphore par appel = aucune limite globale
async with sem:
...Un Semaphore est lié à la boucle d'événements active au moment de sa première utilisation. Ne le crée jamais au niveau module (SEM = asyncio.Semaphore(8) en haut du fichier) : sous Uvicorn/Gunicorn avec plusieurs workers, ou dans les tests qui montent une boucle par test, tu auras des RuntimeError: bound to a different event loop. Crée-le dans une coroutine, ou via la lifespan de l'app (voir section FastAPI).
Le filet de sécurité : timeout
En réseau, « lent » est indistinguable de « mort ». Sans timeout, une connexion zombie immobilise une coroutine pour toujours. Depuis Python 3.11, la primitive idiomatique est asyncio.timeout(), un context manager qui annule proprement ce qu'il englobe.
import asyncio
async def with_deadline(coro_factory, seconds: float):
try:
async with asyncio.timeout(seconds):
return await coro_factory()
except TimeoutError:
# Python 3.11+ : asyncio.TimeoutError EST le TimeoutError natif.
return None # ou raise, ou valeur de repliPourquoi asyncio.timeout() plutôt que le vieux asyncio.wait_for() ? timeout() est un context manager : il englobe un bloc de code (plusieurs await), pas une seule coroutine, et compose mieux. Il existe aussi asyncio.timeout_at(deadline) pour une échéance absolue (loop.time() + N), pratique quand tu propages un budget temps global à travers plusieurs étapes.
Le piège : timeout ≠ ressource libérée
Un timeout annule la coroutine en y injectant une CancelledError au point d'await courant. Si ta ressource (connexion, fichier, lock) n'est pas dans un async with/try-finally, elle fuit.
# ❌ La connexion peut fuiter si le timeout frappe pendant le traitement.
async def leaky(pool):
conn = await pool.acquire()
async with asyncio.timeout(5):
await do_work(conn) # timeout ici → CancelledError → on saute le release
await pool.release(conn) # jamais atteint
# ✅ Toujours envelopper la ressource dans un gestionnaire de contexte.
async def safe(pool):
async with asyncio.timeout(5):
async with pool.acquire() as conn: # __aexit__ libère même sous CancelledError
await do_work(conn)Distingue les deux niveaux de timeout : le timeout applicatif (asyncio.timeout) coupe ton bloc logique ; le timeout transport (celui du client HTTP, httpx.AsyncClient(timeout=...)) coupe la requête réseau. Tu veux généralement les deux, avec le timeout transport plus court que l'applicatif, sinon le second ne sert jamais.
La primitive moderne : TaskGroup (Python 3.11+)
gather est l'outil historique, mais pour du fan-out structuré, asyncio.TaskGroup est strictement supérieur : c'est de la concurrence structurée (le pattern « nursery »). Si une tâche échoue, le groupe annule automatiquement toutes les autres et attend leur arrêt propre avant de sortir du bloc — zéro coroutine orpheline.
async def fetch_structured(client: httpx.AsyncClient, ids: list[int]) -> list[dict]:
results: dict[int, dict] = {}
async with asyncio.TaskGroup() as tg:
for uid in ids:
tg.create_task(_store(client, uid, results))
# On n'arrive ICI que si TOUTES ont réussi. Sinon : ExceptionGroup levé,
# et toutes les tâches sœurs ont déjà été annulées.
return [results[uid] for uid in ids]
async def _store(client, uid, out):
out[uid] = await fetch_user(client, uid)Les erreurs remontent dans un ExceptionGroup, qu'on attrape avec la syntaxe except* :
try:
await fetch_structured(client, ids)
except* httpx.HTTPStatusError as eg:
for err in eg.exceptions:
log.warning("HTTP fail: %s", err)
except* TimeoutError as eg:
log.error("%d timeouts", len(eg.exceptions))Règle senior : TaskGroup quand tu veux « tout ou rien » (annulation en cascade) ; gather(..., return_exceptions=True) quand tu veux « best-effort » (collecte les succès, isole les échecs). Choisis selon la sémantique métier, pas par habitude.
Application : appeler une flotte d'agents LLM
C'est là que tout converge. Tu construis des agents IA : tu vas souvent vouloir lancer plusieurs appels Claude en parallèle (résumer 50 documents, classifier un batch, fan-out d'un orchestrateur vers des sous-agents). Les trois primitives deviennent vitales :
Semaphore: les limites de débit Anthropic (RPM/TPM) sont réelles. Sans plafond, tu prends des 429 en rafale.timeout: un appel àclaude-opus-4-8en thinking adaptatif sur une tâche dure peut prendre plusieurs minutes. Tu veux un budget temps explicite.gather/TaskGroup: orchestrer le fan-out et récupérer les résultats.
Le SDK Anthropic fournit un client async natif (AsyncAnthropic) — c'est lui qu'il faut utiliser dans une app asyncio, jamais le client synchrone (qui bloquerait la boucle).
Fan-out de résumés, plafonné et avec budget temps
import asyncio
from anthropic import AsyncAnthropic
from anthropic import APIStatusError, RateLimitError
client = AsyncAnthropic() # lit ANTHROPIC_API_KEY depuis l'environnement
async def summarize_one(
sem: asyncio.Semaphore,
doc: str,
*,
per_call_timeout: float = 120.0,
) -> str:
async with sem: # respecte le plafond de concurrence
async with asyncio.timeout(per_call_timeout):
# .stream() évite les timeouts HTTP sur les longues réponses,
# même si on ne consomme pas les tokens un par un ici.
async with client.messages.stream(
model="claude-opus-4-8",
max_tokens=1024,
thinking={"type": "adaptive"}, # JAMAIS budget_tokens sur Opus 4.8
system="Résume le document en 3 puces.",
messages=[{"role": "user", "content": doc}],
) as stream:
msg = await stream.get_final_message()
# On extrait le texte des blocs (un message peut contenir thinking + text).
return "".join(b.text for b in msg.content if b.type == "text")
async def summarize_all(docs: list[str], concurrency: int = 5) -> list[str | None]:
sem = asyncio.Semaphore(concurrency)
results = await asyncio.gather(
*(summarize_one(sem, d) for d in docs),
return_exceptions=True,
)
out: list[str | None] = []
for r in results:
if isinstance(r, RateLimitError):
out.append(None) # le SDK a déjà retenté 2× ; on abandonne ce doc
elif isinstance(r, (TimeoutError, APIStatusError)):
out.append(None)
else:
out.append(r)
return outQuelques décisions seniors encodées ici :
Semaphoreavant l'appel, pas après. Leasync with semenveloppe tout l'appel pour que le compteur reflète les appels en vol, thinking compris.stream()+get_final_message(). Pourmax_tokensélevé ou thinking long, le SDK recommande le streaming pour ne pas heurter le timeout HTTP de la requête. Si tu n'as pas besoin des tokens un par un,get_final_message()te rend le message complet — tu gardes la robustesse sans complexifier.- Retries gérés par le SDK.
AsyncAnthropicretente automatiquement les 429/5xx avec backoff exponentiel (max_retries=2par défaut). N'ajoute pas ta propre couche de retry par-dessus sans désactiver celle du SDK, sinon tu multiplies les tentatives (2 × 2 = thundering herd). Configure plutôtAsyncAnthropic(max_retries=4). - Exceptions typées. On filtre sur
RateLimitError/APIStatusErrordu SDK, jamais sur le contenu du message d'erreur.
La boucle tool-use (agent), en async
Un vrai agent ne fait pas un appel : il boucle (le modèle demande un outil → tu l'exécutes → tu renvoies le résultat → il continue). Voici la boucle manuelle, async, avec timeout global sur tout le tour d'agent :
from anthropic.types import ToolParam
TOOLS: list[ToolParam] = [
{
"name": "get_weather",
"description": "Renvoie la météo. Appelle-la quand l'utilisateur "
"demande le temps qu'il fait dans une ville.",
"input_schema": {
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"],
},
}
]
async def run_agent(prompt: str, *, turn_budget: float = 300.0) -> str:
messages: list[dict] = [{"role": "user", "content": prompt}]
async with asyncio.timeout(turn_budget): # budget pour TOUT le tour d'agent
while True:
resp = await client.messages.create(
model="claude-opus-4-8",
max_tokens=2048,
thinking={"type": "adaptive"},
tools=TOOLS,
messages=messages,
)
if resp.stop_reason == "refusal": # Opus 4.8 peut refuser ; à gérer
return "[refus du modèle]"
if resp.stop_reason != "tool_use":
return "".join(b.text for b in resp.content if b.type == "text")
# Le modèle veut un (ou plusieurs) outil(s) : on les exécute EN PARALLÈLE.
messages.append({"role": "assistant", "content": resp.content})
tool_uses = [b for b in resp.content if b.type == "tool_use"]
tool_results = await asyncio.gather(
*(execute_tool(tu) for tu in tool_uses)
)
messages.append({"role": "user", "content": tool_results})
async def execute_tool(tool_use) -> dict:
# tool_use.input est DÉJÀ parsé en dict par le SDK — ne fais pas de match
# de chaîne sur du JSON brut.
result = await call_weather_api(tool_use.input["city"])
return {
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": result,
}Note le gather sur les tool_use : quand le modèle demande plusieurs outils dans une même réponse, tu les exécutes en concurrence — c'est exactement le cas d'usage de gather. Et le asyncio.timeout(turn_budget) autour de toute la boucle while montre la force du context manager : un seul timeout couvre N appels successifs.
Sorties structurées async
Pour extraire du JSON validé d'un batch (classification, extraction), messages.parse() te rend un objet déjà conforme à ton schéma — pas de prompt « réponds en JSON », pas de parsing fragile :
from pydantic import BaseModel
class Ticket(BaseModel):
priority: str
category: str
needs_human: bool
async def classify(sem: asyncio.Semaphore, text: str) -> Ticket | None:
async with sem, asyncio.timeout(60):
resp = await client.messages.parse(
model="claude-opus-4-8",
max_tokens=512,
messages=[{"role": "user", "content": text}],
output_config={"format": Ticket}, # sortie structurée native
)
return resp.parsed_outputCâbler tout ça dans FastAPI
FastAPI est async-natif : tes endpoints async def tournent sur la boucle d'événements. Conséquence directe : un seul time.sleep() ou un appel SDK synchrone dans un endpoint bloque tout le serveur. Toujours AsyncAnthropic, jamais Anthropic.
Le client SDK et le sémaphore doivent vivre aussi longtemps que l'app — on les crée dans la lifespan, pas par requête (créer un client HTTP par requête tue le pool de connexions et le keep-alive).
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from anthropic import AsyncAnthropic, RateLimitError, APIStatusError
import asyncio
@asynccontextmanager
async def lifespan(app: FastAPI):
# Créés UNE fois, sur la boucle de l'app.
app.state.anthropic = AsyncAnthropic(max_retries=4)
app.state.llm_sem = asyncio.Semaphore(8) # plafond global d'appels LLM concurrents
yield
await app.state.anthropic.close()
app = FastAPI(lifespan=lifespan)
class BatchRequest(BaseModel):
documents: list[str]
class BatchResponse(BaseModel):
summaries: list[str | None]
async def _summarize(client: AsyncAnthropic, sem: asyncio.Semaphore, doc: str) -> str:
async with sem, asyncio.timeout(120):
async with client.messages.stream(
model="claude-opus-4-8",
max_tokens=1024,
thinking={"type": "adaptive"},
system="Résume en 3 puces.",
messages=[{"role": "user", "content": doc}],
) as stream:
msg = await stream.get_final_message()
return "".join(b.text for b in msg.content if b.type == "text")
@app.post("/summarize-batch")
async def summarize_batch(req: BatchRequest) -> BatchResponse:
if len(req.documents) > 100:
raise HTTPException(413, "Batch trop volumineux (max 100).")
client: AsyncAnthropic = app.state.anthropic
sem: asyncio.Semaphore = app.state.llm_sem
raw = await asyncio.gather(
*(_summarize(client, sem, d) for d in req.documents),
return_exceptions=True,
)
summaries: list[str | None] = [
None if isinstance(r, (RateLimitError, APIStatusError, TimeoutError)) else r
for r in raw
]
return BatchResponse(summaries=summaries)Streaming SSE vers le client (token par token)
Pour un chat, tu veux pousser les tokens au navigateur dès qu'ils arrivent. FastAPI + StreamingResponse + le messages.stream() du SDK :
from fastapi.responses import StreamingResponse
async def token_generator(client: AsyncAnthropic, prompt: str):
async with client.messages.stream(
model="claude-opus-4-8",
max_tokens=2048,
thinking={"type": "adaptive"},
messages=[{"role": "user", "content": prompt}],
) as stream:
async for text in stream.text_stream: # déjà découpé en deltas de texte
yield f"data: {text}\n\n"
yield "data: [DONE]\n\n"
@app.get("/chat")
async def chat(q: str):
client: AsyncAnthropic = app.state.anthropic
return StreamingResponse(
token_generator(client, q),
media_type="text/event-stream",
)Le async for sur stream.text_stream est le pont parfait entre la coroutine du SDK et le générateur asynchrone que StreamingResponse consomme. Aucune accumulation en mémoire : chaque token traverse de bout en bout.
⚙️ En production
Modes de défaillance
- Sémaphore global vs. par worker. Sous Gunicorn avec 4 workers Uvicorn, un
Semaphore(8)créé dans la lifespan donne 8 appels concurrents par worker = 32 globalement. Si ta limite Anthropic est de 8, tu te fais rate-limiter. Pour un vrai plafond global multi-process, il faut un régulateur partagé (Redis + token bucket), pas un sémaphore en mémoire. - Coroutines orphelines. Avec
gather(sans TaskGroup), une exception laisse les autres tourner. En prod, ça veut dire des appels LLM facturés qui continuent après que la requête HTTP a échoué. UtiliseTaskGroupou annule explicitement. - Timeout qui ne propage pas. Si un endpoint a un timeout côté load-balancer (ex. 30 s) plus court que ton
asyncio.timeout(120), le client reçoit un 504 mais ta coroutine continue de brûler des tokens. Aligne les budgets : LB ≥ applicatif ≥ transport. - Bloquer la boucle. Un appel sync planqué (lib synchrone,
requests,time.sleep, gros parsing CPU) gèle tous les endpoints. Repère-les avecasyncio.get_event_loop().slow_callback_durationouPYTHONASYNCIODEBUG=1. Déporte le CPU-bound versasyncio.to_thread()(I/O bloquant) ou unProcessPoolExecutor(calcul pur).
Performance
- Le bon
concurrencyn'est pas « le plus haut possible » : c'estmin(limite_API, capacité_DB, RAM/coroutine). Mesure, ne devine pas. Pour les LLM, commence bas (5-8) car chaque appel consomme beaucoup de TPM. - Réutilise un
AsyncClient/AsyncAnthropicpour toute l'app (pooling de connexions, keep-alive). Un client par requête = handshake TLS à chaque appel. - Prompt caching : si tes appels partagent un gros préfixe (system prompt, documents de référence), ajoute
cache_control: {"type": "ephemeral"}sur le dernier bloc stable. Les lectures de cache coûtent ~0,1× le prix d'entrée — sur un fan-out de 50 appels au même contexte, c'est massif. Mais attention : le cache est un préfixe — un seul octet qui change (un timestamp dans le system prompt) l'invalide.
Sécurité
- Le timeout est aussi une défense DoS : sans budget temps, un client peut ouvrir mille connexions lentes et épuiser tes coroutines (slowloris). Timeout systématique sur tout I/O exposé.
- Plafonne la taille des batchs (le
413ci-dessus). Sans ça,gathersur une liste fournie par l'utilisateur = amplification (1 requête → 10 000 appels LLM facturés). - Ne mets jamais de clé API dans le prompt ou les messages — ils sont loggés/cachés.
Observabilité
- Logge la profondeur du sémaphore (
sem._value, ou mieux, un compteur que tu maintiens) : si elle est toujours à 0, tu es saturé, augmente la concurrence ou la limite API. - Sur chaque réponse Anthropic,
response.usagete donneinput_tokens,output_tokens,cache_read_input_tokens. Émets-les en métriques : c'est ta facture en temps réel et ton signal de cache-hit. - Distingue dans tes traces les
TimeoutError(problème de latence) desRateLimitError(problème de débit) — les remèdes sont opposés (augmenter le timeout vs. baisser la concurrence).
Le tradeoff senior à retenir : la concurrence est un curseur entre débit et pression sur les dépendances. gather sans Semaphore maximise le débit et fait tomber tes dépendances. await séquentiel les protège mais gaspille la latence. Le Semaphore est le réglage de ce curseur — et sa bonne valeur se mesure sous charge réelle, jamais en théorie.
🏋️ Exercices
Exercice 1 — Du séquentiel au concurrent (implémenter)
Objectif : on te donne une fonction qui récupère le profil de N utilisateurs en séquentiel (un await par boucle). Réécris-la avec gather pour que les requêtes se recouvrent, en conservant l'ordre des résultats. Mesure le gain avec time.perf_counter() sur 10 utilisateurs simulés à 200 ms chacun.
Indice/Solution : construis un générateur de coroutines
(fetch(u) for u in users)et passe-le àasyncio.gather(*...).gatherpréserve l'ordre des arguments, doncresults[i]correspond toujours àusers[i]même si la requêteifinit en dernier. Attendu : ~200 ms au lieu de ~2 s.
Exercice 2 — Plafonner sans casser l'ordre (production-grade)
Objectif : ta version de l'ex. 1 ouvre 10 000 connexions et l'API distante renvoie des 429. Ajoute un Semaphore(20) pour ne jamais dépasser 20 requêtes en vol, sans perdre l'ordre ni transformer le code en boucle séquentielle.
Indice/Solution : encapsule l'appel dans
async with sem:à l'intérieur de la coroutine, pas autour dugather. Le sémaphore vit dans une coroutine (ou la lifespan), jamais au niveau module. Vérifie qu'au pic, jamais plus de 20 coroutines ne sont entreacquireetrelease(instrumente avec un compteur partagé incrémenté/décrémenté autour duasync with sem).
Exercice 3 — Budget temps et ressources propres (casser puis réparer)
Objectif : on te fournit une fonction qui acquiert une connexion d'un pool, puis fait du travail sous asyncio.timeout(5). Sous charge, le pool s'épuise mystérieusement. Trouve la fuite, puis corrige-la.
Indice/Solution : la connexion est acquise avant le bloc timeout (ou libérée après), donc une
CancelledErrorinjectée par le timeout saute lerelease. Répare en mettant la ressource dans unasync with pool.acquire() as conn:à l'intérieur duasync with asyncio.timeout(5):—__aexit__s'exécute même sous annulation. Bonus : reproduis la fuite avec un pool de taille 2 et 5 appels qui timeout, observe le blocage, puis confirme la réparation.
Exercice 4 — Tout ou rien vs. best-effort (concevoir)
Objectif : implémente deux versions d'un fan-out de 5 appels : (a) avec TaskGroup, qui doit lever et tout annuler si un appel échoue ; (b) avec gather(return_exceptions=True), qui doit renvoyer les 4 succès et isoler l'échec. Écris un test qui force l'échec du 3ᵉ appel et vérifie les deux comportements.
Indice/Solution : pour (a),
except* ValueErrorautour duasync with asyncio.TaskGroup()— vérifie que les tâches 1-2 et 4-5 ont bien été annulées (mets un flagcancelleddans leurexcept asyncio.CancelledError). Pour (b), filtreisinstance(r, Exception)sur la liste de résultats. La discussion attendue : quand choisir l'un ou l'autre selon la sémantique métier.
Exercice 5 — Fan-out LLM réaliste (AI, dur)
Objectif : avec AsyncAnthropic, écris summarize_batch(docs) qui résume jusqu'à 50 documents avec claude-opus-4-8, plafonné à 6 appels concurrents, 90 s de budget par appel, en utilisant messages.stream() + get_final_message(). Les échecs (timeout, rate-limit) ne doivent pas faire tomber le batch entier : renvoie None pour les documents échoués. Logge le total de input_tokens/output_tokens consommés.
Indice/Solution : combine
Semaphore(6)+asyncio.timeout(90)(les deux dans unasync with sem, asyncio.timeout(90):) +gather(return_exceptions=True). Thinking adaptatif ({"type": "adaptive"}), jamaisbudget_tokenssur Opus 4.8. Filtre surRateLimitError/APIStatusError/TimeoutErrordu SDK. Accumulemsg.usage.input_tokenset.output_tokensau fil des succès. Piège : si tu mets letimeoutautour dugatherau lieu de dans chaque coroutine, tu coupes tout le batch à 90 s — ce n'est pas ce qu'on veut.
Exercice 6 — Le plafond global qui ment (casser puis réparer, avancé)
Objectif : déploie l'app FastAPI ci-dessus avec gunicorn -w 4. Observe que malgré Semaphore(8), tu prends des 429 dès ~32 appels concurrents. Explique pourquoi, puis propose (au moins en pseudo-code) un régulateur de débit vraiment global.
Indice/Solution : chaque worker a sa propre boucle d'événements et donc son propre
Semaphore(8)→ 4 × 8 = 32. Un sémaphore en mémoire ne peut pas coordonner des process. Solution : un token-bucket dans Redis (INCR+EXPIRE, ou un script Lua atomique) que chaque worker interroge avant l'appel ; ou baisseSemaphoreà8 / nb_workers; ou centralise tous les appels LLM derrière un service dédié single-process. Discussion attendue : le coût de coordination (latence Redis) vs. le bénéfice (plafond exact).
🎤 En entretien
Q : Quelle est la différence entre asyncio.gather et asyncio.TaskGroup ? Quand prends-tu l'un plutôt que l'autre ? R : gather est non structuré — une exception ne propage pas l'annulation aux coroutines sœurs (elles deviennent orphelines) ; TaskGroup (3.11+) est de la concurrence structurée qui annule tout en cascade dès le premier échec. TaskGroup pour « tout ou rien », gather(return_exceptions=True) pour du best-effort où tu veux isoler les échecs et garder les succès.
Q : Un Semaphore(10) dans ton app FastAPI tournant sous 4 workers — combien d'appels concurrents au maximum, et est-ce un problème ? R : 40, pas 10 — chaque worker a sa propre boucle et son propre sémaphore en mémoire ; un Semaphore ne coordonne pas des process distincts. Si la limite réelle (API, DB) est globale, il faut un régulateur partagé (Redis token-bucket) ou diviser la valeur par le nombre de workers.
Q : Pourquoi un timeout peut-il provoquer une fuite de ressources, et comment l'éviter ? R : asyncio.timeout annule en injectant une CancelledError au point d'await courant ; si la ressource n'est pas dans un async with/try-finally, le code de libération est sauté. La parade : toujours envelopper la ressource dans un gestionnaire de contexte à l'intérieur du bloc timeout, pour que __aexit__ s'exécute même sous annulation.
Q : Tu fais du fan-out d'appels Claude depuis un endpoint async. Quels sont les trois garde-fous que tu mets en place et pourquoi ? R : (1) AsyncAnthropic réutilisé via la lifespan, pour ne pas bloquer la boucle ni recréer le pool de connexions ; (2) un Semaphore pour respecter les limites RPM/TPM et éviter les 429 (les retries SDK ne suffisent pas si tu inondes) ; (3) asyncio.timeout par appel, car le thinking adaptatif d'Opus 4.8 peut durer des minutes — couplé à stream()/get_final_message() pour éviter les timeouts HTTP sur les longues réponses, et à une gestion des exceptions typées (RateLimitError, APIStatusError) plutôt qu'un match de chaîne.