Skip to content

File uploads

TL;DR — En FastAPI, un fichier n'arrive jamais d'un coup en RAM : c'est un flux multipart/form-data que Starlette spille sur disque au-delà d'un seuil. Le bon réflexe senior, c'est de streamer par chunks (await file.read(size)) plutôt que de gober tout le fichier, de valider taille + type via les magic bytes (jamais via l'extension ni le Content-Type que le client contrôle), et de déléguer le travail lourd (antivirus, upload S3, transcodage) à une tâche d'arrière-plan. Côté IA, un upload se transforme soit en base64 inline (petite image/PDF envoyés directement à messages.create), soit en file_id via la Files API d'Anthropic quand le même document sert plusieurs requêtes. On code tout en async, typé Python 3.12, avec UploadFile + httpx.AsyncClient + AsyncAnthropic.

Tu viens de PHP/TS : en PHP tu avais $_FILES['x']['tmp_name'] (un fichier déjà écrit sur disque par le SAPI) ; en NestJS tu collais @UseInterceptors(FileInterceptor('file')) avec Multer et tu récupérais un Express.Multer.File (buffer ou disque selon la config). FastAPI est plus proche de NestJS, mais le modèle de streaming est explicite et asynchrone — et c'est exactement là que se jouent les bugs de prod (OOM, fichiers tronqués, blocage de l'event loop).

🧠 Mental model

Un upload HTTP n'est pas « un fichier ». C'est un corps de requête multipart/form-data : une suite de parts, chacune avec ses headers (Content-Disposition, Content-Type) et son contenu binaire, séparés par un boundary. Le serveur lit ce corps au fil de l'eau, octet par octet.

L'analogie : pense à un upload comme à un tapis roulant à la douane, pas comme à un colis posé sur ton bureau.

  • Tu ne reçois pas le colis fermé ; tu vois les objets défiler.
  • Tu peux inspecter chaque morceau au passage (taille cumulée, premiers octets).
  • Si tu attends d'avoir tout empilé sur ton bureau pour regarder, tu croules (OOM) dès qu'un client envoie 4 Go.

Concrètement, UploadFile de FastAPI enveloppe un SpooledTemporaryFile Python : le contenu reste en mémoire sous un seuil (~1 Mo par défaut côté Starlette), puis est spillé sur disque automatiquement. Tu manipules donc un objet file-like asynchrone, pas un bytes en RAM.

Client                         FastAPI / Starlette                Toi (handler)
  │  multipart/form-data            │                                  │
  ├───── part: "file" ────────────►│  parse boundaries                 │
  │      (chunk 1)                  │  écrit dans SpooledTemporaryFile  │
  ├───── part: "file" ────────────►│   ┌──────────────┐                │
  │      (chunk 2)                  │   │  < seuil ?   │── RAM ───┐     │
  ├───── ...                        │   │  > seuil ?   │── disque ┤     │
  │                                 │   └──────────────┘          ▼     │
  │                                 │                       UploadFile  │
  │                                 │                       .read(n) ───┤ await chunk par chunk

Deux objets distincts à ne pas confondre :

OutilPour quoiType
UploadFileun vrai fichier binaire (image, PDF, CSV)flux file-like async
Form(...)un champ texte du même formulaire multipartstr / int
Body(...) / modèle Pydanticun corps JSON (pas de fichier)modèle typé

Règle d'or : dès qu'il y a un fichier dans la requête, tout le corps est multipart — tu ne peux plus utiliser un modèle Pydantic JSON pour les autres champs, il faut passer par Form(...). C'est le piège n°1 quand on vient d'une API JSON.

Le cœur du sujet

Mise en place

bash
pip install "fastapi[standard]" python-multipart httpx "anthropic>=0.92.0" uvicorn

python-multipart est obligatoire : sans lui, FastAPI lève une erreur claire au démarrage dès qu'un endpoint déclare un UploadFile. (FastAPI 0.115+ le réclame explicitement.)

La façon idiomatique : streamer, valider, borner

python
from __future__ import annotations

import hashlib
from pathlib import Path
from typing import Final

import aiofiles
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, status

app = FastAPI()

UPLOAD_DIR: Final = Path("/var/uploads")
MAX_BYTES: Final = 25 * 1024 * 1024          # 25 Mo — limite métier explicite
CHUNK_SIZE: Final = 1024 * 1024              # 1 Mo lu à la fois
# Magic bytes : on valide le VRAI type, pas l'extension ni le Content-Type client
MAGIC: Final[dict[bytes, str]] = {
    b"\x89PNG\r\n\x1a\n": "image/png",
    b"\xff\xd8\xff": "image/jpeg",
    b"%PDF-": "application/pdf",
}


def sniff_media_type(head: bytes) -> str | None:
    return next((mt for sig, mt in MAGIC.items() if head.startswith(sig)), None)


@app.post("/uploads", status_code=status.HTTP_201_CREATED)
async def upload(
    file: UploadFile = File(...),
    label: str = Form(...),                  # champ texte du même multipart
) -> dict[str, str | int]:
    # 1) Lire les premiers octets pour sniffer le type AVANT de tout écrire
    head = await file.read(8)
    media_type = sniff_media_type(head)
    if media_type is None:
        raise HTTPException(
            status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
            detail="Type de fichier non supporté (png/jpeg/pdf attendus).",
        )

    # 2) Streamer le reste vers le disque, en bornant la taille au fil de l'eau
    UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
    dest = UPLOAD_DIR / f"{label}-{file.filename}"
    digest = hashlib.sha256(head)
    total = len(head)

    async with aiofiles.open(dest, "wb") as out:
        await out.write(head)
        while chunk := await file.read(CHUNK_SIZE):
            total += len(chunk)
            if total > MAX_BYTES:
                await out.close()
                dest.unlink(missing_ok=True)
                raise HTTPException(
                    status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
                    detail=f"Fichier > {MAX_BYTES} octets.",
                )
            digest.update(chunk)
            await out.write(chunk)

    return {"path": str(dest), "media_type": media_type, "bytes": total, "sha256": digest.hexdigest()}

Points senior dans ce code :

  • On lit par chunks (await file.read(CHUNK_SIZE)) — la RAM consommée est bornée à CHUNK_SIZE, peu importe la taille du fichier.
  • On sniffe les magic bytes sur les premiers octets : file.content_type et file.filename sont fournis par le client et donc non fiables. Un attaquant renomme payload.exe en cv.pdf et met Content-Type: application/pdf — seul le contenu réel ne ment pas.
  • On borne la taille pendant l'écriture, pas après. Vérifier file.size ne suffit pas : il peut être None, et un client malveillant peut mentir sur Content-Length.
  • aiofiles garde l'écriture disque hors de l'event loop. Sans lui, un open(...).write() synchrone bloque la boucle pour tous les autres clients.

La façon naïve (le piège classique)

python
# ❌ NE FAIS PAS ÇA EN PROD
@app.post("/uploads-bad")
async def upload_bad(file: UploadFile = File(...)) -> dict[str, int]:
    content = await file.read()        # tout le fichier en RAM, d'un coup
    Path(f"/tmp/{file.filename}").write_bytes(content)  # write_bytes SYNC → bloque l'event loop
    return {"bytes": len(content)}

Trois bombes à retardement :

  1. await file.read() sans argument charge l'intégralité en mémoire. 100 clients × 200 Mo = 20 Go de RAM → OOM kill.
  2. write_bytes est synchrone : pendant l'écriture, l'event loop est gelé, donc toutes les autres requêtes async attendent. Tu viens de transformer ton serveur async en serveur mono-thread bloquant.
  3. file.filename brut dans un chemin → path traversal (../../etc/cron.d/x). Toujours Path(file.filename).name ou un nom généré.

Si tu fais du travail purement bloquant et inévitable (lib qui ne propose pas d'API async), pousse-le dans un thread avec await anyio.to_thread.run_sync(...) plutôt que de bloquer la boucle.

Plusieurs fichiers et def vs async def

python
@app.post("/batch")
async def upload_batch(files: list[UploadFile] = File(...)) -> list[str]:
    return [f.filename or "(anonyme)" for f in files]

Subtilité que beaucoup ratent : un handler def (synchrone) est exécuté par FastAPI dans un threadpool, donc un file.read() y est en réalité l'API synchrone. Un handler async def doit utiliser await file.read(). Mélanger les deux (faire un file.read() sans await dans un async def) te renvoie une coroutine, pas des octets — bug silencieux garanti.

Lien IA : servir un agent qui reçoit des fichiers

C'est le scénario réel pour toi : un endpoint FastAPI reçoit une image ou un PDF de ton front Angular, et tu veux que Claude l'analyse. Deux stratégies, et le choix est un tradeoff senior.

Stratégie A — base64 inline (one-shot, petit fichier)

Le fichier ne sert qu'une fois → on l'encode en base64 et on l'envoie directement dans messages.create. Simple, zéro état serveur, mais le payload est ré-uploadé à chaque requête et compte dans les tokens d'entrée à chaque fois.

python
import base64
from anthropic import AsyncAnthropic, APIStatusError
from fastapi import FastAPI, UploadFile, File, HTTPException, status

app = FastAPI()
claude = AsyncAnthropic()   # lit ANTHROPIC_API_KEY ; réutilise UNE instance (pool httpx)

MAX_INLINE: int = 5 * 1024 * 1024   # 5 Mo : au-delà, passe par la Files API (stratégie B)


@app.post("/analyze")
async def analyze_image(file: UploadFile = File(...), prompt: str = "Décris cette image.") -> dict[str, str]:
    data = bytearray()
    while chunk := await file.read(1024 * 1024):
        data.extend(chunk)
        if len(data) > MAX_INLINE:
            raise HTTPException(status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, "Image trop lourde pour l'inline.")

    media_type = file.content_type or "image/png"
    b64 = base64.standard_b64encode(bytes(data)).decode()

    try:
        msg = await claude.messages.create(
            model="claude-opus-4-8",
            max_tokens=1024,
            thinking={"type": "adaptive"},   # adaptive = recommandé sur Opus 4.8
            messages=[{
                "role": "user",
                "content": [
                    {"type": "image", "source": {"type": "base64", "media_type": media_type, "data": b64}},
                    {"type": "text", "text": prompt},
                ],
            }],
        )
    except APIStatusError as exc:
        # On NE laisse PAS une 429/529 d'Anthropic fuiter en 500 opaque
        raise HTTPException(status.HTTP_502_BAD_GATEWAY, f"Erreur upstream Claude: {exc.type}") from exc

    text = next((b.text for b in msg.content if b.type == "text"), "")
    return {"analysis": text}

Pour un PDF, remplace le bloc image par {"type": "document", "source": {"type": "base64", "media_type": "application/pdf", "data": b64}}.

Stratégie B — Files API (document réutilisé sur plusieurs tours)

Si le même document est interrogé plusieurs fois (chat multi-tours sur un contrat de 80 pages), tu l'uploades une fois vers Anthropic, tu récupères un file_id, et tu le référenceras ensuite par ID — sans re-transmettre les octets.

python
from anthropic import AsyncAnthropic
from fastapi import FastAPI, UploadFile, File

app = FastAPI()
claude = AsyncAnthropic()
FILES_BETA = ["files-api-2025-04-14"]


@app.post("/documents")
async def register_document(file: UploadFile = File(...)) -> dict[str, str]:
    # On passe le flux directement — pas besoin de tout charger en RAM côté nous
    uploaded = await claude.beta.files.upload(
        file=(file.filename, await file.read(), file.content_type),
        betas=FILES_BETA,
    )
    return {"file_id": uploaded.id}   # à persister côté toi, réutilisable N fois


@app.post("/documents/{file_id}/ask")
async def ask_document(file_id: str, question: str) -> dict[str, str]:
    msg = await claude.beta.messages.create(
        model="claude-opus-4-8",
        max_tokens=1024,
        thinking={"type": "adaptive"},
        betas=FILES_BETA,
        messages=[{
            "role": "user",
            "content": [
                {"type": "document", "source": {"type": "file", "file_id": file_id}},
                {"type": "text", "text": question},
            ],
        }],
    )
    text = next((b.text for b in msg.content if b.type == "text"), "")
    return {"answer": text}

Le tradeoff :

Base64 inline (A)Files API (B)
État serveuraucunun file_id à persister
Coût si N requêtesN × ré-upload + N × tokens du doc1 upload, doc compté en entrée à chaque tour (mais pas de ré-transmission réseau)
Bon pouranalyse one-shot, petite imageRAG/chat multi-tours sur gros PDF
Avec prompt cachingcombinablecombinable — mets le document tôt dans le prompt, derrière un cache_control

Senior move : pour un chat multi-tours sur un gros document, Files API + prompt caching. Tu places le bloc document en tête du prompt avec un breakpoint cache_control, et les tours suivants lisent le document depuis le cache (~0,1× du prix d'entrée) au lieu de le re-payer plein tarif.

⚙️ En production

Modes de défaillance

  • OOM par accumulation. Le danger n'est pas un gros fichier, c'est la concurrence : 50 uploads simultanés de 200 Mo lus en RAM = 10 Go. Streame par chunks, et limite la concurrence (worker count, ou un asyncio.Semaphore autour du travail lourd).
  • Fichier tronqué / client qui coupe. Si le client ferme la connexion en plein upload, ton while file.read() lève une exception réseau. Écris dans un fichier temporaire et fais un os.replace() atomique seulement après succès complet — jamais de fichier à moitié écrit visible.
  • Path traversal. file.filename contient ../ → tu écris hors du dossier prévu. Toujours Path(file.filename).name, ou mieux : un nom généré (UUID, hash du contenu).
  • Reverse proxy. Nginx a client_max_body_size (défaut 1 Mo) qui rejettera l'upload en 413 avant même que FastAPI le voie. Aligne cette limite avec MAX_BYTES côté app, sinon tu débogueras une 413 fantôme qui ne vient pas de ton code.

Performance

  • L'I/O disque/réseau doit être async (aiofiles, httpx.AsyncClient). Une seule écriture synchrone bloque l'event loop pour tout le monde.
  • Le travail CPU lourd (resize d'image, OCR, hashing de gros fichiers) bloque aussi la boucle → anyio.to_thread.run_sync(...) ou un vrai worker (Celery/ARQ).
  • Pour un fichier qui part vers S3/MinIO et vers Claude, ne lis pas deux fois : streame une fois, calcule le hash et bufferise au besoin.

Sécurité

  • Valide par magic bytes, jamais par extension ni Content-Type.
  • Borne la taille pendant la lecture (limite stricte côté app + côté proxy).
  • Scanne à l'antivirus (ClamAV) les fichiers d'origine externe avant stockage/traitement, en tâche d'arrière-plan.
  • Stocke hors du document root web ; sers via URL signée, jamais via chemin direct.
  • Pour les uploads transmis à un LLM : un PDF/une image peut contenir des instructions d'injection (texte caché, prompt embarqué). Traite la sortie du modèle comme non fiable — ne l'exécute pas, ne la passe pas à un outil sensible sans garde-fou.

Observabilité

  • Logge : taille, type sniffé, durée d'upload, request_id. Pour les appels Claude, logge usage.input_tokens / output_tokens (et cache_read_input_tokens si tu caches) — c'est ta facture.
  • Émets une métrique « bytes uploadés » et « uploads rejetés par raison » (413/415) pour repérer abus et erreurs de config proxy.

Le tradeoff senior : le réflexe junior est « je lis le fichier, je le traite, je réponds ». Le réflexe senior est « je stream le fichier, je valide tôt, je borne, et je délègue le lourd hors de la requête ». L'upload synchrone qui bloque jusqu'à la fin du traitement (antivirus + S3 + appel LLM dans le handler) tient en démo et explose sous charge. Réponds 202 Accepted avec un id de job, fais le lourd en arrière-plan, et expose un endpoint de statut.

🏋️ Exercices

1. Upload streamé borné (implémenter)

Objectif : écris un endpoint POST /files qui accepte n'importe quel fichier, le streame sur disque par chunks de 1 Mo, refuse au-delà de 10 Mo avec une 413, et renvoie {filename, bytes, sha256}. Aucune lecture intégrale en RAM autorisée. Indice/Solution : while chunk := await file.read(1024*1024), accumule total, lève HTTPException(413) et dest.unlink(missing_ok=True) dès dépassement, hashlib.sha256 mis à jour à chaque chunk, écriture via aiofiles.

2. Validation par magic bytes (production-grade)

Objectif : n'accepte que PNG, JPEG, PDF. Le test d'acceptation : renomme un .exe en cv.pdf avec Content-Type: application/pdf — ton endpoint doit le rejeter en 415. Indice/Solution : head = await file.read(8) avant tout, table de signatures {b"%PDF-": "application/pdf", ...}, head.startswith(sig). Ne fais jamais confiance à file.content_type. N'oublie pas d'écrire head en premier dans le fichier de sortie (tu l'as déjà consommé du flux).

3. Champs texte + fichier dans le même formulaire (implémenter)

Objectif : un endpoint qui reçoit file: UploadFile et metadata (un JSON {owner, tags[]}). Reproduis le piège : pourquoi un modèle Pydantic en Body(...) échoue-t-il ici, et comment le contourner proprement ? Indice/Solution : dès qu'il y a un fichier, le corps est multipart, donc pas de JSON natif. Reçois metadata: str = Form(...) puis Meta.model_validate_json(metadata) (Pydantic v2). Documente que le client doit envoyer le JSON dans un champ de formulaire, pas en body.

4. Image → Claude, avec gestion d'erreur upstream (IA, production-grade)

Objectif : POST /vision reçoit une image, l'envoie en base64 à claude-opus-4-8, renvoie la description. Le serveur ne doit jamais renvoyer une 500 opaque sur une 429/529 d'Anthropic. Indice/Solution : AsyncAnthropic en instance unique, thinking={"type": "adaptive"}, max_tokens=1024. Enveloppe l'appel dans try/except anthropic.APIStatusError → mappe en 502 Bad Gateway avec exc.type. Le SDK retry déjà les 429/5xx (backoff exponentiel, max_retries=2) ; ne ré-implémente pas le retry à la main.

5. Files API + chat multi-tours (IA, implémenter)

Objectif : POST /docs uploade un PDF vers la Files API et renvoie un file_id ; POST /docs/{file_id}/ask répond à une question en référençant le doc par ID sur plusieurs tours, sans jamais re-transmettre les octets. Indice/Solution : await claude.beta.files.upload(file=(name, bytes, content_type), betas=["files-api-2025-04-14"]). Côté question : bloc {"type": "document", "source": {"type": "file", "file_id": ...}}. Bonus : place le bloc document en tête de prompt avec cache_control: {"type": "ephemeral"} et vérifie usage.cache_read_input_tokens > 0 au 2ᵉ tour.

6. Casser puis réparer : OOM sous charge (break-then-fix)

Objectif : pars de la version naïve (await file.read() sans argument + write_bytes sync). Lance 30 uploads concurrents de 150 Mo (script httpx), observe la RAM exploser et l'event loop se figer. Puis répare. Indice/Solution : le diagnostic — lecture intégrale en RAM × concurrence + écriture synchrone bloquante. Le fix — streaming par chunks, aiofiles, et un asyncio.Semaphore(N) pour borner la concurrence du travail lourd. Mesure la RAM avant/après pour prouver le gain.

🎤 En entretien

Q : Pourquoi ne pas faire await file.read() puis traiter le résultat ? Parce que ça charge tout le fichier en RAM ; sous concurrence ça mène à l'OOM. On streame par chunks (await file.read(size)) pour borner la mémoire à la taille d'un chunk, indépendamment de la taille du fichier.

Q : Comment valider qu'un upload est bien un PDF ? Par les magic bytes du contenu (%PDF-), jamais par l'extension ni le Content-Type, tous deux contrôlés par le client et donc falsifiables.

Q : Pourquoi aiofiles plutôt que open() dans un endpoint async def ?open().write() est une I/O synchrone bloquante : elle gèle l'event loop et donc toutes les autres requêtes async pendant l'écriture. aiofiles (ou anyio.to_thread) garde la boucle libre.

Q : Un client uploade un PDF de 80 pages qu'on va interroger 20 fois via Claude. Inline base64 ou Files API ? Files API : on uploade une fois, on obtient un file_id, et on référence le doc par ID sur les 20 tours sans re-transmettre les octets. On combine avec le prompt caching (bloc document en tête, cache_control) pour que les tours suivants lisent le doc depuis le cache à ~0,1× du prix d'entrée.

Bibliothèque tech perso — Achref