Skip to content

Background tasks

TL;DR — Une BackgroundTask FastAPI exécute du code après que la réponse HTTP a été envoyée au client, dans le même process et la même boucle d'événements que ton endpoint. C'est l'outil parfait pour le travail court, best-effort, qui ne doit pas faire attendre l'utilisateur : logguer, envoyer un email de confirmation, invalider un cache, déclencher un webhook. Ce n'est pas une file de tâches : pas de persistance, pas de retry, pas d'isolation — si le process meurt, la tâche meurt avec lui, et une tâche bloquante (time.sleep, requête SQL synchrone) gèle tout le serveur. Pour un agent LLM, tu t'en sers pour le fire-and-forget (tracer un appel, persister une trace de tool-use) — jamais pour la génération elle-même, qui veut du streaming SSE ou une vraie file Celery/ARQ. La règle senior : async def + I/O non bloquant pour les tâches courtes, def (thread pool) pour le CPU/I/O bloquant inévitable, et une vraie file dès que tu as besoin de durabilité ou de retry.

🧠 Mental model

Tu connais le pattern « réponds vite, finis le boulot après » de NestJS : tu renvoies un 201 Created, et tu balances le reste dans un EventEmitter ou un @OnEvent. Les BackgroundTasks de FastAPI sont exactement ce réflexe, mais avec une nuance critique qui n'existe pas pareil en Node.

L'analogie du serveur de restaurant. Le serveur (ton endpoint) prend ta commande, te confirme « c'est noté » (la réponse HTTP), puis va porter le ticket en cuisine (la background task). Tant qu'il ne fait que déposer le ticket, il revient prendre la commande suivante immédiatement. Mais s'il décide de rester en cuisine cuire le plat lui-même (une tâche bloquante), plus personne dans la salle n'est servi — il n'y a qu'un seul serveur (une seule boucle d'événements).

C'est là que ça diverge de Node : en Node, ton await dans un handler ne bloque jamais la boucle tant que tu restes sur de l'I/O async. En FastAPI, tu écris du Python où le synchrone bloquant existe partout (requests, psycopg2, time.sleep, du calcul lourd), et une background task async def qui appelle du code bloquant gèle la boucle pour toutes les requêtes concurrentes.

   Client                FastAPI (1 boucle asyncio)
     │   POST /signup          │
     │ ──────────────────────► │
     │                         │ crée user (await db)
     │   202 Accepted          │
     │ ◄────────────────────── │  ← réponse ENVOYÉE
     │                         │
     │                         │ ───► background task: send_welcome_email()
     │   (déjà parti)          │       s'exécute APRÈS la réponse,
     │                         │       MÊME process, MÊME boucle

Le point mental clé : la frontière n'est pas « avant/après la fonction » mais « avant/après l'envoi de la réponse ». La tâche tourne dans le cycle de vie de la requête ASGI, pas dans un worker séparé.

Le cœur : comment ça marche vraiment

Il y a deux surfaces d'API. La première, celle que tu utiliseras 95 % du temps, est le paramètre injecté BackgroundTasks.

La bonne façon : injecter BackgroundTasks

python
from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel, EmailStr

app = FastAPI()


class SignupRequest(BaseModel):
    email: EmailStr
    name: str


async def send_welcome_email(email: str, name: str) -> None:
    # I/O non bloquant : un client HTTP async (httpx), pas `requests`.
    import httpx

    async with httpx.AsyncClient(timeout=10.0) as client:
        await client.post(
            "https://api.email-provider.test/v1/send",
            json={"to": email, "template": "welcome", "vars": {"name": name}},
        )


@app.post("/signup", status_code=202)
async def signup(payload: SignupRequest, tasks: BackgroundTasks) -> dict[str, str]:
    user_id = await create_user(payload.email, payload.name)
    # On ENREGISTRE la tâche ; elle ne s'exécute pas encore.
    tasks.add_task(send_welcome_email, payload.email, payload.name)
    return {"user_id": user_id, "status": "created"}

Trois choses à intérioriser :

  1. add_task ne lance rien tout de suite. Il empile (callable, args, kwargs) dans une liste attachée à la réponse. L'exécution a lieu quand la couche ASGI a fini d'écrire le corps de la réponse sur le socket.
  2. BackgroundTasks est injecté par dépendance. Tu n'instancies rien : tu déclares le paramètre, FastAPI le fournit. Tu peux aussi le recevoir dans une dépendance (Depends) et y ajouter des tâches — elles s'agrègent toutes sur la même réponse.
  3. L'ordre est FIFO. Les tâches s'exécutent séquentiellement, dans l'ordre d'ajout, l'une après l'autre. Ce n'est pas de la concurrence : add_task(a); add_task(b) veut dire a puis b, pas a || b.

async def vs def : la décision la plus importante du fichier

FastAPI inspecte la signature de ta tâche (via Starlette) et la traite différemment :

python
import anyio


# CAS 1 — coroutine : awaitée DANS la boucle d'événements.
async def task_async(x: int) -> None:
    await some_async_io(x)        # OK : ne bloque pas la boucle


# CAS 2 — fonction sync : exécutée dans un THREAD POOL (anyio).
def task_sync(x: int) -> None:
    legacy_blocking_db_call(x)    # OK : isolée hors de la boucle
Tu écrisStarlette faitConséquence
async defawait task() directement dans la boucleDu code bloquant ici gèle tout le serveur
defawait run_in_threadpool(task)Isolé du loop, mais limité par la taille du pool (40 threads par défaut)

La règle senior : si ta tâche fait de l'I/O et que tu as une lib async (httpx, asyncpg, aioredis), écris async def. Si ta tâche est intrinsèquement bloquante (lib legacy synchrone, calcul Python pur), écris def pour que Starlette la pousse dans le thread pool. Le piège mortel est async def + appel bloquant — tu obtiens le pire des deux mondes.

python
# 🔥 LE PIÈGE — async def qui bloque la boucle
async def task_broken(report_id: str) -> None:
    import time, requests          # requests = client HTTP SYNCHRONE
    time.sleep(5)                  # gèle la boucle 5 s pour TOUT LE MONDE
    requests.post(...)             # idem, bloque jusqu'à la réponse

Pendant ces 5 secondes, aucune autre requête HTTP n'est servie par ce worker. Tes health checks timeout, ton load balancer retire l'instance, et tu débugges un « FastAPI lent » qui est en réalité une boucle gelée.

L'autre surface : BackgroundTask (singulier) sur une Response

Quand tu construis une Response à la main (souvent une StreamingResponse), tu peux attacher une tâche unique via le paramètre background. C'est le seul moyen de lancer du travail après la fin d'un stream :

python
from starlette.background import BackgroundTask
from fastapi.responses import StreamingResponse


@app.get("/export")
async def export_data() -> StreamingResponse:
    async def generate():
        async for row in stream_rows():
            yield row

    return StreamingResponse(
        generate(),
        media_type="application/x-ndjson",
        background=BackgroundTask(cleanup_temp_files, export_id="abc"),
    )

BackgroundTask (singulier, depuis starlette.background) ≠ BackgroundTasks (pluriel, le conteneur injecté). On y revient dans les exercices car c'est une question d'entretien classique.

🔌 Lien avec un agent LLM (Anthropic SDK)

Servir un agent, c'est exactement le terrain où les background tasks brillent — et où elles te trahissent si tu te trompes de couche.

Ce que tu NE fais PAS : lancer la génération du modèle dans une background task. La génération est longue, l'utilisateur veut voir les tokens arriver, et une tâche fire-and-forget ne peut rien renvoyer au client. Pour ça, tu streames (voir la leçon SSE).

Ce que tu FAIS : le post-traitement best-effort une fois la réponse partie — persister la trace, comptabiliser les tokens, envoyer la facturation interne. Voici un endpoint qui streame une réponse Claude et persiste l'usage en background une fois le stream fini :

python
import os
from anthropic import AsyncAnthropic
from starlette.background import BackgroundTask
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()
client = AsyncAnthropic()  # lit ANTHROPIC_API_KEY ; réutilisé entre requêtes


class ChatRequest(BaseModel):
    prompt: str
    conversation_id: str


@app.post("/chat")
async def chat(req: ChatRequest) -> StreamingResponse:
    # On capture l'usage pendant le stream pour le persister APRÈS.
    captured: dict[str, int] = {}

    async def token_stream():
        async with client.messages.stream(
            model="claude-opus-4-8",
            max_tokens=4096,
            thinking={"type": "adaptive"},          # adaptatif (jamais budget_tokens)
            messages=[{"role": "user", "content": req.prompt}],
        ) as stream:
            async for text in stream.text_stream:
                yield text
            final = await stream.get_final_message()
            captured["input_tokens"] = final.usage.input_tokens
            captured["output_tokens"] = final.usage.output_tokens

    async def persist_usage() -> None:
        # S'exécute une fois le stream terminé, hors du chemin critique.
        await record_usage(
            conversation_id=req.conversation_id,
            input_tokens=captured.get("input_tokens", 0),
            output_tokens=captured.get("output_tokens", 0),
        )

    return StreamingResponse(
        token_stream(),
        media_type="text/plain",
        background=BackgroundTask(persist_usage),  # singulier : post-stream
    )

Note le détail clé : captured est rempli dans le générateur (au moment du get_final_message()), et lu dans la tâche persist_usage, qui ne tourne qu'après que le générateur soit épuisé. C'est garanti par l'ordre ASGI : tâche background d'une Response = exécutée après le dernier yield.

Pour un endpoint non streamé (réponse JSON simple), le pattern fire-and-forget standard suffit :

python
@app.post("/classify")
async def classify(req: ChatRequest, tasks: BackgroundTasks) -> dict[str, str]:
    msg = await client.messages.create(
        model="claude-haiku-4-5",                  # rapide/pas cher pour de la classif
        max_tokens=256,
        messages=[{"role": "user", "content": req.prompt}],
    )
    label = msg.content[0].text if msg.content else ""
    # Trace best-effort : si ça rate, on ne casse pas la réponse au client.
    tasks.add_task(log_classification, req.conversation_id, label, msg.usage.output_tokens)
    return {"label": label}

Subtilité à connaître : avec les retries du SDK Anthropic (max_retries, exponential backoff sur 429/5xx), un messages.create peut prendre plusieurs secondes. Ne mets jamais l'appel modèle lui-même dans une add_task « pour aller plus vite » — la réponse partirait sans le résultat, et tu n'as aucun moyen de le renvoyer.

⚙️ En production

Les background tasks sont séduisantes parce qu'elles n'ont aucune dépendance. C'est précisément ce qui les rend dangereuses. Voici les modes de défaillance que tu dois connaître avant de t'appuyer dessus.

Mode de défaillance 1 — Aucune durabilité

Si le process est tué (déploiement, OOM, crash, SIGTERM pendant un rolling update) avant que la tâche ne tourne, elle est perdue silencieusement. Pas de DLQ, pas de retry, pas de trace.

Règle : une background task ne doit jamais porter une garantie métier. « L'email de bienvenue devrait partir » → OK. « Le paiement doit être capturé » → file persistante (Celery, ARQ, Dramatiq, ou un outbox en base).

Mode de défaillance 2 — Les exceptions sont avalées

Une exception levée dans une background task n'atteint jamais le client (la réponse est déjà partie) et, selon ta version, peut juste être logguée par Starlette — ou pas. Tu dois encapsuler la robustesse toi-même :

python
import logging

logger = logging.getLogger("background")


async def safe_send_email(email: str) -> None:
    try:
        await send_welcome_email(email, name="")
    except Exception:  # noqa: BLE001 — on veut TOUT attraper en fire-and-forget
        logger.exception("welcome email failed for %s", email)
        # éventuellement : pousser dans une vraie file pour retry

Ne laisse jamais une tâche planter sans try/except : au mieux tu perds l'observabilité, au pire (sur certaines stacks) tu remontes une 500 sur une réponse déjà commitée.

Mode de défaillance 3 — La famine du thread pool

Les tâches def (synchrones) passent par le thread pool AnyIO, partagé avec tous les def endpoints et toutes les dépendances synchrones. Sa taille par défaut est 40 threads. Si tu balances 200 tâches synchrones lentes en rafale, tu épuises le pool, et tes endpoints def se mettent en file d'attente derrière. Symptôme : latence p99 qui explose sous charge sans que le CPU bouge.

python
# Augmenter le pool au démarrage si tu dépends de tâches sync lourdes
import anyio
from contextlib import asynccontextmanager


@asynccontextmanager
async def lifespan(app: FastAPI):
    limiter = anyio.to_thread.current_default_thread_limiter()
    limiter.total_tokens = 100  # à mesurer, pas à deviner
    yield


app = FastAPI(lifespan=lifespan)

Mais si tu en arrives là, la vraie réponse est souvent : sors le travail du process (file dédiée).

Performance — la frontière à ne pas franchir

Caractéristique de la tâcheBon outil
Courte, I/O async, best-effortBackgroundTasks + async def
Courte, bloquante (lib legacy)BackgroundTasks + def (thread pool)
CPU-bound (image, ML, parsing lourd)ProcessPoolExecutor ou worker dédié
Longue, doit survivre au crash, retryFile persistante (Celery / ARQ / Dramatiq)
Streaming vers le clientStreamingResponse (SSE/WebSocket), pas une background task

Le CPU-bound est un piège spécifique : même en def (thread pool), le GIL fait que ton calcul Python pur ne libère pas vraiment la boucle. Pour du CPU lourd, loop.run_in_executor avec un ProcessPoolExecutor, ou carrément un worker séparé.

Sécurité

  • Données capturées par closure. Une tâche qui capture la requête ou un objet de session peut tenir des données sensibles en mémoire plus longtemps que prévu, et — pire — référencer un état dont le cycle de vie est lié à la requête. Passe des valeurs primitives (user_id: int), pas l'objet Request entier.
  • Connexions liées à la requête. Si tu utilises une session DB injectée via Depends, elle peut être fermée au moment où la background task s'exécute (les dépendances avec yield se nettoient après la réponse — l'ordre relatif aux tâches est subtil et dépend de la version). N'hérite pas d'une connexion de requête dans une tâche : ouvre la tienne, ou utilise un pool.
  • Pas de secret dans les logs de tâche. Les tâches sont l'endroit où on logge « tranquillement » — c'est exactement là que fuient les tokens et PII.

Observabilité

La tâche tourne après la réponse, donc hors de ta trace de requête par défaut. Tu perds le request_id, le span APM, le contexte de logging. Propage-les explicitement :

python
async def traced_task(request_id: str, payload: dict) -> None:
    with logging_context(request_id=request_id):  # ton helper de corrélation
        logger.info("background task start")
        await do_work(payload)

Sans ça, tes logs de background sont orphelins et indébogables en incident.

🏋️ Exercices

Exercice 1 — Fire-and-forget correct (implémenter)

Objectif. Construire un endpoint POST /orders qui crée une commande (await DB), renvoie 201 avec l'order_id, et envoie en background deux notifications dans l'ordre : un email au client, puis un webhook à l'entrepôt. Les deux doivent être async def et non bloquantes.

Indice/Solution. Injecte tasks: BackgroundTasks, fais tasks.add_task(send_email, ...) puis tasks.add_task(notify_warehouse, ...). Vérifie expérimentalement que l'ordre est FIFO en logguant un timestamp dans chaque tâche. Utilise httpx.AsyncClient pour le webhook, jamais requests. La signature doit être async def send_email(...) pour rester dans la boucle.

Exercice 2 — Détecter et corriger la boucle gelée (break-then-fix)

Objectif. On te donne cet endpoint. Reproduis le gel, prouve-le avec un second endpoint /ping, puis corrige.

python
@app.post("/process")
async def process(req: Req, tasks: BackgroundTasks):
    tasks.add_task(heavy, req.id)
    return {"queued": req.id}

async def heavy(id: str) -> None:
    import time, requests
    time.sleep(8)
    requests.post("https://hooks.test/notify", json={"id": id})

Indice/Solution. Lance le serveur avec un seul worker. Appelle /process, puis spamme /ping : tu verras /ping bloqué ~8 s. La cause : heavy est async def mais appelle time.sleep + requests (bloquants). Deux corrections valides — (a) la transformer en def heavy(...) (Starlette la pousse dans le thread pool), ou (b) garder async def et remplacer par await asyncio.sleep(8) + httpx.AsyncClient. La (b) est supérieure car elle n'épuise pas le thread pool. Mesure la latence p99 de /ping avant/après.

Exercice 3 — Post-stream cleanup avec BackgroundTask singulier (implémenter)

Objectif. Construire GET /report/{id} qui streame un gros fichier ligne par ligne et, une fois le stream fini, supprime le fichier temporaire et incrémente un compteur de téléchargements. Le cleanup ne doit s'exécuter qu'après le dernier byte envoyé.

Indice/Solution. Tu ne peux pas utiliser BackgroundTasks injecté ici de façon fiable pour du « après le stream » — utilise StreamingResponse(generate(), background=BackgroundTask(cleanup, id)) (import depuis starlette.background). Prouve l'ordre : logge dans le générateur (chaque ligne) et dans cleanup ; le log de cleanup doit apparaître après la dernière ligne. Piège classique : si tu supprimes le fichier dans le générateur après le dernier yield, ça marche aussi, mais le pattern background= est plus propre et survit à une exception de cleanup sans corrompre le stream.

Exercice 4 — Trace LLM durable (production-grade)

Objectif. Partir de l'endpoint /classify de la leçon (qui trace en background) et le rendre robuste : la trace doit survivre à un redémarrage du process. Décris (et implémente le squelette de) la migration vers un pattern outbox.

Indice/Solution. Le fire-and-forget perd la trace si le process meurt entre la réponse et l'exécution de la tâche. Pattern outbox : dans la même transaction que tout autre écrit (ou une écriture dédiée), insère une ligne outbox(event_type, payload, status='pending') — ça, c'est durable. Puis une background task (ou un worker séparé) lit les pending, les traite, passe en done. Si le process meurt, la ligne reste pending et un worker la reprend. La background task devient un optimisation de latence (traiter tout de suite), pas la garantie. Implémente : INSERT outbox synchrone dans l'endpoint + tasks.add_task(drain_outbox_row, row_id) + un cron/worker de rattrapage.

Exercice 5 — Famine du thread pool (break-then-fix)

Objectif. Saturer délibérément le thread pool avec des tâches def lentes et observer l'effondrement de la latence des endpoints def, puis corriger sans changer le code des tâches.

Indice/Solution. Crée une tâche def slow(): time.sleep(2) et un endpoint def health(): return "ok" (synchrone exprès). Balance 100 add_task(slow) en rafale via un client de charge, puis appelle /health : sa latence grimpe car les 40 threads du pool sont pris. Corrections : (a) augmenter current_default_thread_limiter().total_tokens au démarrage (rustine), (b) rendre health async def (il quitte le thread pool), (c) la vraie : déporter slow vers une file dédiée. Discute pourquoi (a) ne fait que repousser le mur.

Exercice 6 — Corrélation de logs à travers la frontière (production-grade)

Objectif. Les logs émis dans tes background tasks n'ont pas le request_id de la requête d'origine, ce qui les rend inutilisables en incident. Propage le contexte.

Indice/Solution. Capture le request_id (depuis un middleware ou un header) dans l'endpoint, passe-le en argument explicite à add_task(task, request_id=rid, ...). Dans la tâche, ré-installe-le via un contextvars.ContextVar ou ton helper de logging structuré. Vérifie que grep request_id=<x> ramène à la fois les logs de requête et de background task. Bonus : montre pourquoi capturer l'objet Request entier au lieu du seul request_id est un anti-pattern (cycle de vie + mémoire).

🎤 En entretien

Q : Quelle est la différence entre BackgroundTasks et une file de tâches comme Celery ?BackgroundTasks tourne dans le même process et la même boucle que l'app, sans persistance ni retry — best-effort, perdu au crash ; Celery/ARQ sont des workers séparés, durables, avec retry et scaling indépendant. On prend BackgroundTasks pour du court, fire-and-forget, sans garantie ; une file dès qu'il faut survivre à un redémarrage ou retenter.

Q : Une background task async def peut-elle ralentir tout le serveur ? Comment ? Oui — une async def est awaitée directement dans la boucle d'événements, donc tout appel bloquant dedans (time.sleep, requests, SQL synchrone) gèle la boucle pour toutes les requêtes concurrentes du worker. La parade : I/O async pur en async def, ou écrire la tâche en def pour qu'elle parte au thread pool.

Q : BackgroundTasks (pluriel) vs BackgroundTask (singulier) — quand chacun ?BackgroundTasks (pluriel, fastapi) est injecté dans l'endpoint et accepte plusieurs add_task exécutées FIFO ; BackgroundTask (singulier, starlette.background) s'attache à une Response/StreamingResponse via background= et porte une seule tâche — c'est le seul moyen propre de lancer du travail après la fin d'un stream.

Q : Pourquoi ne jamais réutiliser une session DB injectée par Depends dans une background task ? Parce que les dépendances avec yield se nettoient autour du cycle de la réponse, et la connexion peut être déjà fermée/recyclée quand la tâche s'exécute — tu lis sur un objet mort ou tu corromps le pool. On ouvre une connexion neuve (ou on prend une session du pool) à l'intérieur de la tâche, et on ne passe que des valeurs primitives par argument.

Bibliothèque tech perso — Achref