Skip to content

asyncio & concurrence

TL;DRasyncio te donne de la concurrence sur un seul thread : une boucle d'événements (event loop) jongle entre des milliers de tâches qui passent leur temps à attendre (réseau, disque, base de données, appels LLM). Tu écris du code qui ressemble à du synchrone (await), mais chaque await est un point où ta tâche rend la main pour que la boucle fasse avancer les autres. Ce n'est pas du parallélisme : un seul morceau de ton code Python tourne à un instant T (le GIL est toujours là). C'est l'outil idéal pour le I/O-bound — typiquement un backend FastAPI qui appelle des agents Claude, streame des tokens, et orchestre des boucles tool-use. Pour le CPU-bound, asyncio ne sert à rien : il faut des process. Les pièges de senior : bloquer la boucle avec du code synchrone, oublier await, créer des tâches fire-and-forget qui meurent en silence, et confondre gather (tout ou rien) avec une gestion d'erreurs robuste.

Tu viens de PHP/TS et tu écris du Python + NestJS + Angular. Bonne nouvelle : le modèle mental d'asyncio est le même que celui de l'event loop de Node.js. La syntaxe async/await est quasi identique à TypeScript. La différence majeure : en Python, le code synchrone bloquant est partout (la stdlib historique est synchrone), donc le risque n°1 est de bloquer la boucle sans s'en rendre compte. En Node, presque tout est async par défaut ; en Python, tu dois choisir activement les bibliothèques async.


🧠 Mental model

L'analogie : le serveur de restaurant

Imagine un seul serveur (un thread) dans un restaurant.

  • Modèle synchrone (bloquant) : le serveur prend la commande de la table 1, va en cuisine, attend planté devant le four que le plat soit prêt, sert la table 1, puis passe à la table 2. Pendant les 12 minutes de cuisson, il ne fait rien. C'est requests.get() ou time.sleep().

  • Modèle asyncio (concurrent) : le serveur prend la commande de la table 1, la passe en cuisine (await), et pendant que ça cuit, il va prendre la commande de la table 2, 3, 4.... Quand la cuisine sonne (le await se résout), il revient servir. Un seul serveur, mais 20 tables servies « en même temps ». C'est await client.get().

Le point crucial : le serveur ne cuisine jamais lui-même. S'il devait éplucher 10 kg de patates (du CPU-bound), il serait bloqué tout aussi sûrement qu'à attendre le four — et toutes les tables attendraient. asyncio ne magique pas le calcul ; il ne fait que recouvrir les temps d'attente.

Le diagramme : où est le temps

SYNCHRONE (3 appels LLM séquentiels) — total ≈ 6 s
┌─────────┐        ┌─────────┐        ┌─────────┐
│ appel 1 │ attend │ appel 2 │ attend │ appel 3 │
└─────────┘        └─────────┘        └─────────┘
0s        2s       2s        4s       4s        6s

CONCURRENT (asyncio.gather, 3 appels lancés ensemble) — total ≈ 2 s
┌─────────┐
│ appel 1 │
├─────────┤
│ appel 2 │   les 3 attentes se recouvrent
├─────────┤
│ appel 3 │
└─────────┘
0s        2s

L'event loop, conceptuellement :

       ┌──────────────────────────────────────────┐
       │              EVENT LOOP (1 thread)         │
       │                                            │
   ┌──▶│  prend une tâche prête  ──▶  l'exécute     │
   │   │         jusqu'au prochain  `await`         │
   │   │                    │                       │
   │   │   la tâche rend la main (I/O en attente)   │
   │   └────────────────────┼───────────────────────┘
   │                        ▼
   │              [ file des tâches prêtes ]
   │                        │
   └────────────────────────┘   quand l'I/O se termine,
                                 la tâche redevient « prête »

Tant qu'une coroutine ne fait pas await, elle monopolise la boucle. C'est toute la subtilité.


Les bases, version idiomatique

Coroutine, await, asyncio.run

Une fonction async def ne s'exécute pas quand tu l'appelles : elle retourne une coroutine, un objet inerte. Il faut l'await (depuis une autre coroutine) ou la donner à la boucle via asyncio.run.

python
import asyncio


async def fetch_user(user_id: int) -> dict[str, str | int]:
    # Simule un I/O (DB, HTTP...) qui prend 1 s mais ne consomme PAS de CPU.
    await asyncio.sleep(1)
    return {"id": user_id, "name": f"user-{user_id}"}


async def main() -> None:
    user = await fetch_user(42)
    print(user)


if __name__ == "__main__":
    asyncio.run(main())  # crée la boucle, lance main(), ferme la boucle

asyncio.run() est le seul point d'entrée correct. Tu ne crées pas la boucle à la main (get_event_loop() est déprécié pour ça), et tu n'appelles asyncio.run() qu'une seule fois au sommet de ton programme — jamais en boucle, jamais dans une lib.

La faute classique : appeler sans await

python
# ❌ FAUX — la coroutine n'est jamais exécutée
async def main() -> None:
    fetch_user(42)  # crée une coroutine, la jette aussitôt
    # RuntimeWarning: coroutine 'fetch_user' was never awaited

En TS, fetchUser(42) lance déjà le travail (la promesse est eager). En Python, une coroutine est lazy : rien ne démarre tant que la boucle ne la prend pas. Oublier await ne plante pas immédiatement — tu obtiens un RuntimeWarning discret et un résultat manquant. C'est un bug de prod redoutable.

Concurrence réelle : gather vs TaskGroup

await séquentiel n'apporte aucune concurrence. Pour lancer plusieurs choses en même temps, deux outils.

python
import asyncio


async def fetch_user(user_id: int) -> dict[str, str | int]:
    await asyncio.sleep(1)
    return {"id": user_id, "name": f"user-{user_id}"}


# Séquentiel — 3 secondes
async def slow() -> list[dict[str, str | int]]:
    return [await fetch_user(i) for i in range(3)]


# Concurrent avec gather — ~1 seconde
async def fast_gather() -> list[dict[str, str | int]]:
    return await asyncio.gather(*(fetch_user(i) for i in range(3)))

asyncio.gather lance toutes les coroutines et renvoie les résultats dans l'ordre. Mais son comportement d'erreur est piégeux : par défaut, dès qu'une tâche lève une exception, gather la propage immédiatementsans attendre ni annuler les autres, qui continuent en arrière-plan, orphelines. C'est une source de fuites.

Depuis Python 3.11, préfère asyncio.TaskGroup : structuré, il attend toutes les tâches et annule proprement les sœurs si l'une échoue (les erreurs sont regroupées dans un ExceptionGroup).

python
import asyncio


async def fetch_user(user_id: int) -> dict[str, str | int]:
    await asyncio.sleep(1)
    if user_id == 2:
        raise ValueError(f"user {user_id} introuvable")
    return {"id": user_id, "name": f"user-{user_id}"}


async def fetch_all() -> list[dict[str, str | int]]:
    results: list[dict[str, str | int]] = []
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_user(i)) for i in range(3)]
    # On n'arrive ici QUE si toutes ont réussi ; sinon ExceptionGroup levé
    # et les autres tâches sont annulées automatiquement.
    return [t.result() for t in tasks]


async def main() -> None:
    try:
        await fetch_all()
    except* ValueError as eg:  # except* = gestion d'ExceptionGroup (3.11+)
        for exc in eg.exceptions:
            print(f"échec: {exc}")


asyncio.run(main())

Règle de senior : TaskGroup par défaut. gather(..., return_exceptions=True) quand tu veux explicitement récupérer les succès et les échecs sans tout annuler (ex : un fan-out best-effort où un échec partiel est acceptable).

Le piège mortel : bloquer la boucle

python
import time
import asyncio


# ❌ CATASTROPHE — time.sleep() est SYNCHRONE et bloque TOUTE la boucle
async def broken() -> None:
    time.sleep(5)  # pendant 5 s, AUCUNE autre tâche n'avance


# ✅ correct — asyncio.sleep() rend la main à la boucle
async def good() -> None:
    await asyncio.sleep(5)

Tout appel synchrone bloquant (time.sleep, requests.get, psycopg2, lecture fichier classique, un calcul lourd) gèle l'event loop entier. Sur un serveur, ça veut dire que pendant ces 5 secondes, toutes les requêtes de tous les utilisateurs sont figées. C'est le bug qui transforme un serveur async en serveur mono-client.

Quand tu dois appeler du code synchrone (une lib legacy, un calcul CPU), délègue-le à un thread pool sans bloquer la boucle :

python
import asyncio


def cpu_heavy(n: int) -> int:
    return sum(i * i for i in range(n))  # synchrone, CPU-bound


async def main() -> None:
    # asyncio.to_thread déplace l'appel synchrone dans un thread séparé ;
    # la boucle reste libre pendant ce temps.
    result = await asyncio.to_thread(cpu_heavy, 10_000_000)
    print(result)


asyncio.run(main())

Note : to_thread aide pour le I/O bloquant et soulage le CPU-bound léger, mais le GIL limite le gain réel sur du CPU lourd. Pour du vrai parallélisme CPU, c'est ProcessPoolExecutor (loop.run_in_executor) ou multiprocessing.


Connexion au monde des agents LLM

C'est ici qu'asyncio devient ton outil principal de tous les jours. Appeler un agent Claude, c'est du I/O pur : tu envoies une requête, et tu attends — souvent plusieurs minutes sur une tâche agentique longue. Bloquer un thread par requête ne scale pas. asyncio te laisse gérer des centaines de conversations concurrentes sur un seul process.

On utilise le SDK officiel Anthropic, version asynchrone : AsyncAnthropic. Modèle par défaut : claude-opus-4-8 (1M de contexte, 5/25 USD par Mtok). Pour du moins cher : claude-haiku-4-5 (1/5 USD).

bash
pip install anthropic
export ANTHROPIC_API_KEY=sk-ant-...

Un appel async simple

python
import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()  # lit ANTHROPIC_API_KEY depuis l'environnement


async def ask(question: str) -> str:
    message = await client.messages.create(
        model="claude-opus-4-8",
        max_tokens=1024,
        messages=[{"role": "user", "content": question}],
    )
    # content est une liste de blocs ; on prend le texte du premier bloc texte.
    return "".join(block.text for block in message.content if block.type == "text")


async def main() -> None:
    print(await ask("Explique l'event loop en une phrase."))


asyncio.run(main())

Fan-out concurrent : appeler N agents en parallèle

Le cas typique : tu dois classifier 50 documents, ou poser la même question à plusieurs variantes de prompt. En séquentiel, c'est 50 × la latence. Avec gather, c'est ≈ 1 × la latence — mais il faut borner la concurrence avec un Semaphore, sinon tu satures tes rate limits (429) et la mémoire.

python
import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()


async def classify(text: str, sem: asyncio.Semaphore) -> str:
    async with sem:  # au plus N appels en vol simultanément
        message = await client.messages.create(
            model="claude-haiku-4-5",  # tâche simple → modèle rapide/économe
            max_tokens=16,
            messages=[{"role": "user", "content": f"Sentiment (positif/négatif) : {text}"}],
        )
        return "".join(b.text for b in message.content if b.type == "text").strip()


async def classify_all(texts: list[str]) -> list[str]:
    sem = asyncio.Semaphore(8)  # plafond de concurrence
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(classify(t, sem)) for t in texts]
    return [t.result() for t in tasks]


async def main() -> None:
    docs = ["Super produit !", "Service décevant.", "Correct sans plus."]
    print(await classify_all(docs))


asyncio.run(main())

Le SDK Anthropic retry automatiquement les erreurs réseau, 429 et 5xx avec backoff exponentiel (par défaut max_retries=2). Tu n'écris donc pas la logique de retry toi-même — mais le Semaphore reste indispensable pour éviter d'atteindre la limite en premier lieu.

Streaming de tokens (le cœur d'une bonne UX)

Pour un chat, tu ne veux pas attendre la réponse complète. Tu streames les tokens au fur et à mesure. C'est aussi obligatoire pour les réponses longues (max_tokens élevé), faute de quoi tu risques un timeout HTTP.

python
import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()


async def stream_answer(question: str) -> None:
    async with client.messages.stream(
        model="claude-opus-4-8",
        max_tokens=2048,
        messages=[{"role": "user", "content": question}],
    ) as stream:
        async for text in stream.text_stream:  # itère sur les deltas de texte
            print(text, end="", flush=True)
    print()


asyncio.run(stream_answer("Écris un haïku sur la concurrence."))

async for est l'équivalent itérable d'await : à chaque token reçu, la coroutine rend la main, donc tu peux streamer des dizaines de conversations à la fois sur un seul thread. Si tu n'as pas besoin des deltas individuels, await stream.get_final_message() te rend le message complet.

La boucle tool-use (le squelette d'un agent)

Un agent, c'est une boucle : le modèle demande d'appeler un outil (stop_reason == "tool_use"), ton code l'exécute, tu renvoies le résultat, et tu reboucles jusqu'à end_turn. Tout est async.

python
import asyncio
from typing import Any
from anthropic import AsyncAnthropic
from anthropic.types import MessageParam, ToolParam

client = AsyncAnthropic()

TOOLS: list[ToolParam] = [
    {
        "name": "get_weather",
        "description": "Donne la météo actuelle d'une ville. À appeler dès qu'une ville est mentionnée.",
        "input_schema": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
    }
]


async def get_weather(city: str) -> str:
    await asyncio.sleep(0.2)  # simule un appel API météo (I/O)
    return f{city} : 21°C, ensoleillé."


async def run_agent(prompt: str) -> str:
    messages: list[MessageParam] = [{"role": "user", "content": prompt}]

    while True:
        response = await client.messages.create(
            model="claude-opus-4-8",
            max_tokens=1024,
            tools=TOOLS,
            messages=messages,
        )
        # On rattache TOUJOURS le tour assistant complet (blocs tool_use inclus).
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason != "tool_use":
            return "".join(b.text for b in response.content if b.type == "text")

        # Exécute tous les outils demandés — en CONCURRENCE.
        tool_uses = [b for b in response.content if b.type == "tool_use"]

        async def run_one(block: Any) -> dict[str, Any]:
            result = await get_weather(**block.input)
            return {"type": "tool_result", "tool_use_id": block.id, "content": result}

        results = await asyncio.gather(*(run_one(b) for b in tool_uses))
        messages.append({"role": "user", "content": list(results)})


async def main() -> None:
    print(await run_agent("Quel temps fait-il à Tunis et à Lyon ?"))


asyncio.run(main())

Quand le modèle demande plusieurs outils dans un tour, on les exécute via gather plutôt que séquentiellement : si chaque outil est du I/O, la latence s'effondre. C'est exactement le pattern « serveur de restaurant » appliqué aux appels d'outils.

Sorties structurées typées

Quand tu veux un JSON garanti conforme à un schéma (extraction, classification), utilise messages.parse() avec un modèle Pydantic v2 plutôt que de parser à la main une réponse texte.

python
import asyncio
from pydantic import BaseModel
from anthropic import AsyncAnthropic

client = AsyncAnthropic()


class Contact(BaseModel):
    name: str
    email: str
    wants_demo: bool


async def extract(text: str) -> Contact:
    message = await client.messages.parse(
        model="claude-opus-4-8",
        max_tokens=1024,
        messages=[{"role": "user", "content": f"Extrais le contact : {text}"}],
        output_format=Contact,  # passe la classe Pydantic directement
    )
    return message.parsed_output  # déjà validé en Contact


async def main() -> None:
    c = await extract("Jane Doe ([email protected]) veut une démo.")
    print(c.name, c.email, c.wants_demo)


asyncio.run(main())

⚙️ En production

Modes de défaillance

  • Bloquer la boucle — le bug n°1. Un seul requests.get(), time.sleep(), driver DB synchrone (psycopg2 au lieu d'asyncpg), ou parsing JSON géant dans une coroutine, et tout ton serveur se fige. Symptôme : la latence p99 explose sous charge alors que le CPU est à 5 %. Audite avec PYTHONASYNCIODEBUG=1 (la boucle te prévient quand une coroutine tient la main trop longtemps) ou loop.set_debug(True).

  • Tâches fire-and-forget qui meurent en silenceasyncio.create_task(coro()) sans garder la référence : si la tâche lève une exception, personne ne la voit (et le GC peut même la tuer avant la fin). Garde toujours une référence forte, et attache un add_done_callback qui logge les exceptions, ou utilise un TaskGroup.

    python
    # ❌ la tâche peut disparaître + son exception est avalée
    asyncio.create_task(background_job())
    
    # ✅ référence gardée + erreurs loguées
    _tasks: set[asyncio.Task[None]] = set()
    
    def spawn(coro) -> None:
        task = asyncio.create_task(coro)
        _tasks.add(task)
        task.add_done_callback(_tasks.discard)
        task.add_done_callback(lambda t: t.exception() and log.error("task failed", exc_info=t.exception()))
  • Pas de timeout — un appel LLM qui pend indéfiniment bloque une connexion pour toujours. Enveloppe avec asyncio.timeout() (3.11+).

    python
    async with asyncio.timeout(30):
        result = await client.messages.create(...)
  • Annulation mal gérée — quand une tâche est annulée (timeout, client déconnecté), une CancelledError est levée à l'intérieur de ta coroutine. Ne l'avale jamais dans un except Exception: générique : tu transformerais une annulation en zombie. Si tu dois nettoyer, attrape-la, fais ton ménage, puis re-lève-la.

Performance

  • asyncio brille sur le I/O-bound (LLM, HTTP, DB, files). Sur le CPU-bound, il n'apporte rien (un seul thread Python à la fois). Pour du calcul lourd : ProcessPoolExecutor.
  • Borne toujours la concurrence (Semaphore) vers les services externes. « Lancer 10 000 appels d'un coup » n'est pas une optimisation : c'est un déni de service contre ton propre fournisseur (429 en cascade) et un OOM potentiel.
  • Réutilise un seul client AsyncAnthropic (ou httpx.AsyncClient) pour tout le process : il maintient un pool de connexions HTTP. En créer un par requête tue le keep-alive et explose en latence.

Sécurité

  • Pas de CancelledError avalée (voir plus haut) : sinon tu peux laisser des ressources ouvertes ou ignorer un shutdown.
  • Pas d'état mutable partagé sans garde : même mono-thread, une coroutine peut être interrompue à chaque await. Si deux tâches lisent-puis-écrivent une variable partagée autour d'un await, tu as une race condition. Utilise asyncio.Lock pour les sections critiques (jamais threading.Lock, qui bloquerait la boucle).
  • Ne mets jamais de secret dans le prompt par commodité : il est journalisé dans l'historique de conversation. Garde les clés côté serveur.

Observabilité

  • Loggue le usage de chaque réponse Anthropic (message.usage.input_tokens / output_tokens, et cache_read_input_tokens pour vérifier que ton prompt caching mord). Un cache hit ≈ 0,1× le prix d'entrée.
  • Trace le temps passé dans await vs hors await : si une coroutine passe du temps hors await, elle bloque la boucle.
  • Active PYTHONASYNCIODEBUG=1 en staging pour détecter coroutines jamais awaited et callbacks lents.

Les arbitrages de senior

  • TaskGroup (tout-ou-rien, annulation propre) vs gather(return_exceptions=True) (best-effort) : choisis selon que l'échec partiel est tolérable.
  • Async partout vs îlots async : n'introduis pas asyncio dans un codebase synchrone « pour faire moderne ». Async est contagieux (une fonction async ne peut être appelée que depuis de l'async). Le gain ne vaut le coût que si tu es réellement I/O-bound et concurrent.
  • Streaming vs réponse complète : le streaming améliore l'UX (time-to-first-token) et évite les timeouts sur les réponses longues, au prix d'un code de consommation un peu plus complexe.
  • output_config.effort (Opus 4.8) : règle la profondeur de raisonnement (lowxhigh/max) selon le compromis intelligence/latence/coût. Combine avec le adaptive thinking (thinking={"type": "adaptive"}). N'utilise jamais budget_tokens sur Opus 4.8 (rejeté en 400).

🏋️ Exercices

Exercice 1 — Repérer et corriger le blocage (implémenter)

Objectif : ce « serveur » async est en réalité séquentiel. Trouve pourquoi, corrige-le, et prouve le gain de temps.

python
import time
import asyncio


async def handle_request(req_id: int) -> str:
    time.sleep(1)  # « traitement »
    return f"req {req_id} ok"


async def main() -> None:
    start = time.perf_counter()
    results = await asyncio.gather(*(handle_request(i) for i in range(5)))
    print(results, f"{time.perf_counter() - start:.1f}s")


asyncio.run(main())

Indice/Solution : time.sleep(1) est synchrone → il bloque la boucle, donc gather ne sert à rien (5 s). Remplace par await asyncio.sleep(1) → ~1 s. La leçon : gather ne crée pas de concurrence si chaque tâche ne rend jamais la main.

Exercice 2 — Fan-out borné de classification (implémenter)

Objectif : classifie 100 textes via claude-haiku-4-5, avec au plus 10 appels concurrents, et retourne les résultats dans l'ordre d'entrée.

Indice/Solution : Semaphore(10) + TaskGroup ; conserve la liste des tasks dans l'ordre de création et lis t.result() dans cet ordre (les tasks préservent l'ordre, contrairement à as_completed). Vérifie sur les usage que tu ne dépasses pas tes limites.

Exercice 3 — Timeout + retry maison (production-grade)

Objectif : enveloppe un appel client.messages.create avec un timeout de 20 s ; en cas de TimeoutError, réessaie une fois avec un timeout doublé ; loggue chaque tentative. Ne re-tente pas sur une BadRequestError.

Indice/Solution : async with asyncio.timeout(t): autour de l'appel ; try/except TimeoutError. Attrape les erreurs typées (anthropic.APIStatusError, anthropic.RateLimitError) plutôt que de matcher des chaînes. Rappelle-toi que le SDK retry déjà 429/5xx — ton retry maison ne couvre que le timeout applicatif.

Exercice 4 — La tâche fantôme (casser puis réparer)

Objectif : ce code lève une exception qui n'apparaît jamais dans les logs. Reproduis le silence, puis répare-le pour que l'erreur soit visible.

python
import asyncio


async def background() -> None:
    await asyncio.sleep(0.1)
    raise RuntimeError("boom")


async def main() -> None:
    asyncio.create_task(background())  # référence non gardée
    await asyncio.sleep(1)
    print("fini")  # s'affiche, l'erreur est avalée


asyncio.run(main())

Indice/Solution : la task n'est ni gardée ni awaited → son exception est silencieuse. Répare avec un TaskGroup (async with asyncio.TaskGroup() as tg: tg.create_task(background())) qui propage via ExceptionGroup, ou garde la référence + add_done_callback qui loggue task.exception().

Exercice 5 — Boucle tool-use concurrente (production-grade)

Objectif : étends la boucle agent de la leçon avec deux outils (get_weather, get_time). Quand le modèle en demande plusieurs dans un tour, exécute-les concurremment. Ajoute un garde-fou max_iterations=10 pour éviter une boucle infinie.

Indice/Solution : dispatch par block.name vers la bonne coroutine ; gather sur tous les tool_use du tour ; compteur d'itérations qui lève si dépassé. Renvoie toujours response.content complet dans le tour assistant, et un tool_result par tool_use_id.

Exercice 6 — Race condition sous await (casser puis réparer)

Objectif : ce compteur partagé donne un total faux quand les tâches s'interrompent autour de l'await. Explique pourquoi, puis corrige.

python
import asyncio

counter = {"value": 0}


async def increment() -> None:
    current = counter["value"]
    await asyncio.sleep(0)  # point de bascule : une autre tâche peut s'intercaler
    counter["value"] = current + 1


async def main() -> None:
    await asyncio.gather(*(increment() for _ in range(1000)))
    print(counter["value"])  # souvent < 1000


asyncio.run(main())

Indice/Solution : entre read et write, l'await rend la main → plusieurs tâches lisent la même valeur. Mono-thread n'implique pas atomique dès qu'il y a un await. Corrige avec un asyncio.Lock autour de la section critique (async with lock:), ou élimine l'await au milieu.


🎤 En entretien

Q : Quelle est la différence entre concurrence et parallélisme, et où se situe asyncio ? La concurrence, c'est gérer plusieurs tâches en jonglant entre elles ; le parallélisme, c'est les exécuter littéralement en même temps sur plusieurs cœurs. asyncio fait de la concurrence sur un seul thread (à cause du GIL), donc il accélère le I/O-bound (en recouvrant les attentes) mais pas le CPU-bound.

Q : Pourquoi un seul requests.get() dans une route async peut-il faire tomber la latence de tout le serveur ? Parce qu'il est synchrone : il bloque l'event loop pendant toute la durée de l'appel réseau, donc aucune autre coroutine ne peut avancer — toutes les requêtes concurrentes sont gelées. Il faut un client async (httpx.AsyncClient, AsyncAnthropic) ou déléguer à asyncio.to_thread.

Q : asyncio.gather vs asyncio.TaskGroup — lequel et pourquoi ?TaskGroup (3.11+) par défaut : il est structuré, attend toutes les tâches et annule les sœurs proprement si l'une échoue, regroupant les erreurs en ExceptionGroup. gather propage la première exception sans annuler les autres (qui deviennent orphelines) — pratique seulement avec return_exceptions=True pour un fan-out best-effort.

Q : Comment streamer 200 conversations LLM simultanées sur un seul process Python sans bloquer ? Un seul AsyncAnthropic partagé (pool de connexions), client.messages.stream consommé en async for (chaque token rend la main à la boucle), concurrence bornée par un Semaphore pour respecter les rate limits, timeouts via asyncio.timeout, et zéro code synchrone bloquant dans le chemin chaud.

Bibliothèque tech perso — Achref