File uploads dans NestJS
TL;DR — NestJS s'appuie sur Multer (Express) ou
@fastify/multipart(Fastify) viaMulterModule. Pour les fichiers en sortie,StreamableFilecouvre downloads, range requests et streaming. Le vrai défi senior n'est pas l'upload basique : c'est la validation par magic bytes, la prévention DoS, le scan antivirus, le streaming vers S3 sans buffer mémoire, et les uploads multipart pour les gros fichiers.
🧠 Mental model
Un upload, c'est un flux d'octets qui traverse 5 couches :
client ---multipart/form-data---> reverse proxy ---> Node.js HTTP ---> Multer/Busboy ---> handler
|
v
limites nginx/ALB
|
v
limites Multer (size/count)
|
v
validation (mime, magic, antivirus)
|
v
storage (disk / S3 / MinIO)
|
v
indexation DB (URL, owner, size, hash)Analogie : un upload est un colis postal. Le client emballe (multipart), le transporteur livre (HTTP), le quai de réception vérifie (validation), le magasin range (storage), le registre note (DB). Chaque étape peut refuser le colis. Si tu ne valides qu'à la fin, tu as déjà payé le coût réseau et disque.
Principes :
- Stream by default, buffer seulement si tu n'as pas le choix (transformations CPU-bound qui exigent l'intégralité).
- Valide tôt et plusieurs fois : taille via nginx, mimetype via Multer, magic bytes via
file-type, contenu via antivirus. - Sépare upload et traitement. L'API stocke le fichier brut et délègue conversion/scan à une queue.
🛠️ Code minimal
Installation :
npm i @nestjs/platform-express multer
npm i -D @types/multer
npm i @aws-sdk/client-s3 @aws-sdk/lib-storage # pour S3/MinIO
npm i file-type # detection magic bytesMulterModule configuré globalement avec limites strictes :
// src/uploads/multer.config.ts
import { Module } from '@nestjs/common';
import { MulterModule } from '@nestjs/platform-express';
import { memoryStorage } from 'multer';
@Module({
imports: [
MulterModule.register({
storage: memoryStorage(), // ou diskStorage / multerS3
limits: {
fileSize: 5 * 1024 * 1024, // 5 MiB
files: 5,
fields: 20,
fieldNameSize: 100,
fieldSize: 1024 * 1024,
},
fileFilter: (_req, file, cb) => {
const ok = /^(image\/(png|jpeg|webp)|application\/pdf)$/.test(file.mimetype);
cb(ok ? null : new Error('Unsupported mime type'), ok);
},
}),
],
exports: [MulterModule],
})
export class UploadModule {}Endpoint upload single :
// src/avatars/avatar.controller.ts
import {
Controller,
Post,
UploadedFile,
UseInterceptors,
ParseFilePipe,
MaxFileSizeValidator,
FileTypeValidator,
} from '@nestjs/common';
import { FileInterceptor } from '@nestjs/platform-express';
import type { Express } from 'express';
@Controller('avatars')
export class AvatarController {
constructor(private readonly service: AvatarService) {}
@Post()
@UseInterceptors(FileInterceptor('file'))
async upload(
@UploadedFile(
new ParseFilePipe({
validators: [
new MaxFileSizeValidator({ maxSize: 2 * 1024 * 1024 }),
new FileTypeValidator({ fileType: /image\/(png|jpe?g|webp)/ }),
],
}),
)
file: Express.Multer.File,
) {
return this.service.persist(file);
}
}Endpoint upload multiple :
import { FilesInterceptor } from '@nestjs/platform-express';
@Post('batch')
@UseInterceptors(FilesInterceptor('files', 5))
uploadMany(@UploadedFiles() files: Express.Multer.File[]) {
return this.service.persistMany(files);
}Mixed (champs nommés différents) :
import { FileFieldsInterceptor } from '@nestjs/platform-express';
@Post('product')
@UseInterceptors(FileFieldsInterceptor([
{ name: 'thumbnail', maxCount: 1 },
{ name: 'gallery', maxCount: 10 },
]))
uploadProductAssets(@UploadedFiles() files: { thumbnail?: Express.Multer.File[]; gallery?: Express.Multer.File[] }) {
// ...
}🎯 Patterns courants
1. Validation par magic bytes (et pas seulement mimetype)
Le Content-Type envoyé par le client est purement déclaratif. Un attaquant peut envoyer un binaire .exe en image/png. La bibliothèque file-type lit les premiers octets et déduit le vrai type.
import { fileTypeFromBuffer } from 'file-type';
@Injectable()
export class MagicBytesValidator {
async assertImage(buffer: Buffer): Promise<string> {
const detected = await fileTypeFromBuffer(buffer);
if (!detected) throw new BadRequestException('Unknown file type');
if (!['image/png', 'image/jpeg', 'image/webp'].includes(detected.mime)) {
throw new BadRequestException(`Type ${detected.mime} not allowed`);
}
return detected.ext;
}
}À brancher dans le pipe ou le service après ParseFilePipe. Note : file-type ne couvre que les binaires avec une signature. Pour CSV/JSON/SVG, il faut parser et valider sémantiquement.
2. Streaming direct vers S3 (sans buffer mémoire)
@aws-sdk/lib-storage (Upload helper) gère multipart S3 sous le capot. On combine avec multer en memoryStorage ou un parsing en streaming via Busboy.
// src/uploads/s3.service.ts
import { Injectable } from '@nestjs/common';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import { Readable } from 'node:stream';
@Injectable()
export class S3UploadService {
private readonly s3 = new S3Client({
region: process.env.AWS_REGION!,
endpoint: process.env.S3_ENDPOINT, // utile pour MinIO
forcePathStyle: !!process.env.S3_ENDPOINT,
});
async streamUpload(key: string, body: Readable, contentType: string) {
const upload = new Upload({
client: this.s3,
params: {
Bucket: process.env.S3_BUCKET!,
Key: key,
Body: body,
ContentType: contentType,
},
queueSize: 4,
partSize: 5 * 1024 * 1024,
leavePartsOnError: false,
});
upload.on('httpUploadProgress', (p) => {
// hooks pour métriques / progression websocket
});
return upload.done();
}
}3. Parsing streaming custom avec Busboy
Pour ne jamais charger en mémoire, on by-passe Multer et on consomme directement le multipart :
// src/uploads/raw-upload.controller.ts
import { Controller, Post, Req } from '@nestjs/common';
import type { Request } from 'express';
import * as Busboy from 'busboy';
@Controller('raw-upload')
export class RawUploadController {
constructor(private readonly s3: S3UploadService) {}
@Post()
async upload(@Req() req: Request) {
const bb = Busboy({ headers: req.headers, limits: { fileSize: 500 * 1024 * 1024 } });
const uploads: Promise<unknown>[] = [];
return new Promise((resolve, reject) => {
bb.on('file', (fieldname, stream, info) => {
const key = `raw/${Date.now()}-${info.filename}`;
uploads.push(this.s3.streamUpload(key, stream, info.mimeType));
});
bb.on('finish', async () => {
try {
const results = await Promise.all(uploads);
resolve({ uploaded: results.length });
} catch (e) { reject(e); }
});
bb.on('error', reject);
req.pipe(bb);
});
}
}4. Signed URLs (presigned PUT/POST)
Quand le fichier dépasse 50-100 MiB ou que tu veux décharger ton API, l'idéal est que le client uploade directement vers S3/MinIO via une URL signée.
import { PutObjectCommand } from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
async createPresignedUploadUrl(key: string, contentType: string) {
const command = new PutObjectCommand({
Bucket: process.env.S3_BUCKET!,
Key: key,
ContentType: contentType,
});
const url = await getSignedUrl(this.s3, command, { expiresIn: 60 * 5 });
return { url, key };
}Le serveur ne reçoit jamais le binaire ; seulement la confirmation via S3 Events ou un endpoint POST /uploads/confirm.
Durcir une presigned URL (sinon tu signes un trou de sécurité) :
async createPresignedUploadUrl(key: string, contentType: string, maxBytes: number) {
const command = new PutObjectCommand({
Bucket: process.env.S3_BUCKET!,
Key: key,
ContentType: contentType,
// borne la taille côté S3 : le client ne peut pas dépasser maxBytes
ContentLength: maxBytes,
// chiffrement at-rest imposé par la policy, pas optionnel
ServerSideEncryption: 'aws:kms',
SSEKMSKeyId: process.env.KMS_KEY_ID,
});
// TTL court : une URL signée 5 min ne traîne pas dans les logs/Slack 3 jours
const url = await getSignedUrl(this.s3, command, { expiresIn: 60 * 5 });
return { url, key };
}Pièges presigned que tu dois connaître en tant que senior :
| Risque | Mitigation |
|---|---|
Le client peut uploader n'importe quel Content-Type | Signer ContentType exact + le re-valider au confirm via HeadObject |
| Le client peut uploader 10 Go | ContentLength signé + bucket policy s3:content-length-range (presigned POST) |
Confirmation jamais envoyée → orphelins en PENDING | Lifecycle rule S3 (expire les non-confirmés) + GC job sur la table |
| TOCTOU : le contenu signé ≠ contenu uploadé | Le confirm lit ETag/taille via HeadObject avant de marquer READY |
| Presigned POST vs PUT | PUT simple pour un blob ; POST (policy form) si tu veux contraindre content-length-range et préfixe de clé |
5. StreamableFile pour les downloads
Côté sortie, Nest fournit StreamableFile qui prend un Readable ou un Buffer et set les headers proprement.
import { Controller, Get, Param, Res, StreamableFile, Header } from '@nestjs/common';
import { createReadStream } from 'node:fs';
import { join } from 'node:path';
@Controller('files')
export class DownloadController {
@Get(':id')
@Header('Content-Type', 'application/pdf')
@Header('Content-Disposition', 'attachment; filename="report.pdf"')
download(@Param('id') id: string): StreamableFile {
const stream = createReadStream(join('/var/data', `${id}.pdf`));
return new StreamableFile(stream);
}
}6. Range requests (video, audio, gros PDF)
Pour supporter le seeking dans une vidéo HTML5, tu dois honorer le header Range.
@Get('video/:id')
async streamVideo(@Param('id') id: string, @Req() req: Request, @Res({ passthrough: true }) res: Response) {
const { size } = await stat(`/var/data/${id}.mp4`);
const range = req.headers.range;
if (!range) {
res.set({ 'Content-Length': size, 'Content-Type': 'video/mp4' });
return new StreamableFile(createReadStream(`/var/data/${id}.mp4`));
}
const [startStr, endStr] = range.replace(/bytes=/, '').split('-');
const start = Number(startStr);
const end = endStr ? Number(endStr) : Math.min(start + 1024 * 1024, size - 1);
res.status(206).set({
'Content-Range': `bytes ${start}-${end}/${size}`,
'Accept-Ranges': 'bytes',
'Content-Length': end - start + 1,
'Content-Type': 'video/mp4',
});
return new StreamableFile(createReadStream(`/var/data/${id}.mp4`, { start, end }));
}🔄 Versions — Nest 7 / 8 / 9 / 10 / 11
- Nest 7 :
FileInterceptoretMulterModulevia@nestjs/platform-express(déjà présent).multerv1.x. Pas deParseFilePipe; validation manuelle. - Nest 8 : Introduction de
StreamableFile(@nestjs/common) qui simplifie l'envoi de streams sans bricoler avec@Res().multertoujours en v1. - Nest 9 : Apparition de
ParseFilePipeavecFileValidator,MaxFileSizeValidator,FileTypeValidator. Avant Nest 9, il fallait écrire ses propres pipes. Attention :FileTypeValidatorv9 se base uniquement surfile.mimetype(déclaratif, peu sûr). - Nest 10 : Améliorations de
ParseFilePipe(optionfileIsRequired,errorHttpStatusCode). Support officiel de Fastify avec@fastify/multipartcôté@nestjs/platform-fastify.multerv1 reste la lib sous-jacente côté Express. - Nest 11 : Pas de breaking côté upload.
FileTypeValidatorreste mimetype-based : pour du contrôle sérieux, continuer à utiliserfile-typetoi-même.@aws-sdk/client-s3v3 est le standard (le v2 SDK n'est plus maintenu). Côté Fastify,@fastify/multipartv8 introduitattachFieldsToBody: 'keyValues'.
Note libs :
multerv1.x : stable, mais vulnérabilités historiques en CVE. Verrouille la version.busboyv1 : maintenu, lower-level que Multer, parfait pour streaming.@aws-sdk/client-s3v3 +@aws-sdk/lib-storagev3 : standard.file-typev19+ : ESM-only depuis v17, parfois pénible. Sur CommonJS, considèrefile-type-cjsou un dynamic import.
⚠️ Pitfalls
- Faire confiance au mimetype envoyé par le client.
FileTypeValidatorde Nest ne fait que comparer la string. Toujours valider via magic bytes (file-type). memoryStoragesur des gros fichiers. Tu charges 500 MiB en RAM, et 10 uploads concurrents écroulent ton pod. UsediskStorageou streaming direct vers S3.- Doublon de limites. Tu set
limits.fileSize: 5MBdans Multer mais nginx accepte 100MB. L'utilisateur upload 50MB, attend 30s, et reçoit413à la fin. Aligne nginx/ALB/Multer. - Ne pas nettoyer après une erreur. En
diskStorage, le fichier est écrit sur disque même si la validation rejette ensuite. Use unfinallyqui supprime le fichier orphelin ou un job de garbage collection. - Path traversal sur le nom de fichier. Si tu utilises
file.originalnamedirectement comme key S3 ou nom disque, un nom../../etc/passwdpeut faire des dégâts. Toujours générer un UUID et stocker le nom original en DB. - Polyglot files. Un fichier valide à la fois en GIF et en HTML peut servir un XSS quand servi par ton CDN. Forcer
Content-Disposition: attachmentet un mimetype sûr (application/octet-stream). - Antivirus non bloquant qui scanne après le storage. Tu sers le malware au user suivant pendant 30s entre l'upload et le scan. Soit tu mets en quarantaine puis publish après scan OK, soit tu scannes en streaming via ClamAV en parallèle de l'écriture.
- Bloquer l'event loop avec sharp/imagemin sur un upload synchrone. Toute transformation lourde doit aller en queue (BullMQ).
- SVG accepté comme image. Un SVG peut contenir du JS embarqué. Soit tu interdis, soit tu sanitizes avec
dompurifycôté serveur avant stockage. - Range requests sans validation de borne. Un client envoie
Range: bytes=0-99999999999. Si tu ne clampes pasendàsize - 1, tu réponds 416 ou pire un payload corrompu.
🧪 Testing
Test d'un endpoint upload avec supertest :
// avatar.e2e-spec.ts
import { Test } from '@nestjs/testing';
import { INestApplication } from '@nestjs/common';
import * as request from 'supertest';
import { AppModule } from '../src/app.module';
import { readFileSync } from 'node:fs';
import { join } from 'node:path';
describe('AvatarController (e2e)', () => {
let app: INestApplication;
beforeAll(async () => {
const moduleRef = await Test.createTestingModule({ imports: [AppModule] }).compile();
app = moduleRef.createNestApplication();
await app.init();
});
afterAll(() => app.close());
it('uploads a valid png', async () => {
const png = readFileSync(join(__dirname, 'fixtures/1x1.png'));
const res = await request(app.getHttpServer())
.post('/avatars')
.attach('file', png, { filename: 'a.png', contentType: 'image/png' })
.expect(201);
expect(res.body.id).toBeDefined();
});
it('rejects exe disguised as png', async () => {
const fakePng = Buffer.from('MZ\x90\x00fakeexe');
await request(app.getHttpServer())
.post('/avatars')
.attach('file', fakePng, { filename: 'a.png', contentType: 'image/png' })
.expect(400);
});
it('rejects oversized payload', async () => {
const big = Buffer.alloc(10 * 1024 * 1024); // 10 MiB
await request(app.getHttpServer())
.post('/avatars')
.attach('file', big, { filename: 'big.png', contentType: 'image/png' })
.expect(413);
});
});Unit test avec Express.Multer.File mocké :
import { Readable } from 'node:stream';
const fakeFile = (overrides: Partial<Express.Multer.File> = {}): Express.Multer.File => ({
fieldname: 'file',
originalname: 'test.png',
encoding: '7bit',
mimetype: 'image/png',
size: 123,
buffer: Buffer.from([0x89, 0x50, 0x4e, 0x47]),
destination: '',
filename: '',
path: '',
stream: Readable.from(Buffer.from([])),
...overrides,
});
it('rejects non-image', async () => {
await expect(service.persist(fakeFile({ mimetype: 'text/plain' })))
.rejects.toThrow(BadRequestException);
});Virus scanning avec ClamAV
ClamAV expose un daemon clamd qui scanne via TCP/UNIX socket. La lib clamscan (npm) wrap proprement.
npm i clamscan// src/uploads/antivirus.service.ts
import { Injectable, OnModuleInit, BadRequestException } from '@nestjs/common';
import NodeClam from 'clamscan';
import { Readable } from 'node:stream';
@Injectable()
export class AntivirusService implements OnModuleInit {
private clam!: NodeClam;
async onModuleInit() {
this.clam = await new NodeClam().init({
clamdscan: {
host: process.env.CLAMD_HOST ?? '127.0.0.1',
port: Number(process.env.CLAMD_PORT ?? 3310),
timeout: 60_000,
},
preference: 'clamdscan',
});
}
async assertClean(stream: Readable) {
const { isInfected, viruses } = await this.clam.scanStream(stream);
if (isInfected) {
throw new BadRequestException(`Infected: ${viruses.join(',')}`);
}
}
}Pattern recommandé : upload -> quarantaine S3 (bucket privé) -> scan async via job -> si clean, déplacer vers bucket public, sinon supprimer et notifier le user.
@Processor('antivirus')
export class AntivirusProcessor extends WorkerHost {
constructor(private av: AntivirusService, private s3: S3Service) { super(); }
async process(job: Job<{ key: string }>) {
const stream = await this.s3.getObjectStream('quarantine', job.data.key);
await this.av.assertClean(stream);
await this.s3.copy('quarantine', job.data.key, 'public', job.data.key);
await this.s3.delete('quarantine', job.data.key);
}
}Multipart S3 pour gros fichiers
Pour un upload > 100 MiB, le SDK v3 gère ça via Upload (déjà montré). Côté API tierce avec presigned, il faut signer chaque part. Le flow client est :
POST /uploads/init-> serveur appelleCreateMultipartUploadCommandet renvoie unuploadId.- Client demande N URLs presigned (
UploadPartCommand). - Client uploade les N parts en parallèle directement vers S3.
- Client envoie la liste
{PartNumber, ETag}au serveur. - Serveur appelle
CompleteMultipartUploadCommand.
async initMultipart(key: string, contentType: string) {
const { UploadId } = await this.s3.send(new CreateMultipartUploadCommand({
Bucket: 'public', Key: key, ContentType: contentType,
}));
return { uploadId: UploadId };
}
async signPart(key: string, uploadId: string, partNumber: number) {
const cmd = new UploadPartCommand({
Bucket: 'public', Key: key, UploadId: uploadId, PartNumber: partNumber,
});
return getSignedUrl(this.s3, cmd, { expiresIn: 60 * 60 });
}
async completeMultipart(key: string, uploadId: string, parts: { PartNumber: number; ETag: string }[]) {
return this.s3.send(new CompleteMultipartUploadCommand({
Bucket: 'public', Key: key, UploadId: uploadId,
MultipartUpload: { Parts: parts.sort((a, b) => a.PartNumber - b.PartNumber) },
}));
}Pipelines antivirus + transformation
Le bon design est asynchrone, par étapes, idempotent :
upload -> quarantine bucket (sync, fast)
|
v
[job] scan ClamAV
|
+----+----+
OK KO
| |
v v
[job] generate delete + notify user
thumbnails
|
v
[job] extract metadata (sharp / ffmpeg / pdfjs)
|
v
DB update + webhookChaque étape est un job BullMQ, retryable, observable. L'API HTTP rend la main en quelques millisecondes après l'écriture en quarantaine.
🏭 Production : idempotence, observabilité, coût
Idempotence du pipeline
Un upload qui déclenche une chaîne de jobs DOIT être idempotent : BullMQ peut rejouer un job (retry après crash worker, redéploiement, ack perdu). Si process() n'est pas idempotent, tu génères deux fois les thumbnails, tu factures deux fois la transformation, tu envoies deux webhooks.
Règles :
- Clé d'idempotence = hash du contenu (SHA-256), pas un timestamp ni un UUID généré à chaque retry. Le
jobIdBullMQ doit dériver du hash :queue.add('scan', data, { jobId: sha256 }). BullMQ dédupe les jobs de mêmejobId. - Opérations naturellement idempotentes :
s3.copy(PUT même clé),repo.markReady(UPDATE conditionnelWHERE state = 'PENDING_SCAN'). Évite lesINSERTnon gardés. - Effets de bord non idempotents (webhook, email, facturation) : guarder avec une table
processed_events(event_id PRIMARY KEY)ou unSETNXRedis avant d'émettre.
async process(job: Job<{ contractId: string; key: string; sha256: string }>) {
// garde de réentrance : si déjà traité, no-op
const claimed = await this.redis.set(`av:done:${job.data.sha256}`, '1', 'NX', 'EX', 86400);
if (!claimed) return; // job rejoué, on sort proprement
// ... scan + copy + mark
}Observabilité
Ce que tu instrumentes sur un système d'upload (sinon tu débogues à l'aveugle en prod) :
| Signal | Métrique (Prometheus / OTel) | Pourquoi |
|---|---|---|
| Volume | uploads_total{result=accepted|rejected_mime|rejected_size|infected} | Détecter une attaque / un client buggé |
| Latence storage | histogram upload_s3_duration_seconds | Repérer la dégradation réseau S3 |
| Taille | histogram upload_bytes | Capacity planning, tuning des limites |
| File de scan | gauge bullmq_queue_depth{queue=av} | Backpressure : la quarantaine se remplit plus vite qu'elle ne se vide |
| Échecs scan | counter av_scan_failures_total | ClamAV down ≠ fichier sain — ne JAMAIS marquer clean sur erreur |
| Coût | counter s3_egress_bytes, transform_jobs_total | Le coût d'un upload est dominé par l'egress et la transformation |
Trace distribuée : propage un traceId du controller HTTP jusqu'au job BullMQ (mets-le dans job.data) pour relier POST /upload → scan → thumbnail → webhook dans une seule trace OTel. Sans ça, un upload bloqué en PENDING est impossible à diagnostiquer.
Coût — le modèle mental
Le coût d'un upload n'est presque jamais le stockage (S3 ~0,023 $/Go/mois). Il est dominé par :
- Egress quand tu sers les fichiers (S3 → internet ~0,09 $/Go). D'où le CDN obligatoire (CloudFront/Cloudflare) qui cache et facture l'egress moins cher.
- Compute de transformation (sharp, ffmpeg, LLM extraction). 50 000 images/jour × 5 variantes = 250 000 jobs CPU. C'est là que part la facture → batcher, cacher par hash, déléguer (Cloudinary/Lambda).
- Requests S3 : 1000 petits fichiers = 1000 PUT. Préférer peu de gros objets quand possible.
Règle staff : dédupe par hash avant de payer quoi que ce soit. Si findByHash matche, tu sautes le storage ET la transformation ET le scan — c'est le levier de coût n°1 sur un système multi-tenant où les users ré-uploadent les mêmes PDF.
🎬 Cas d'usage concrets
Cabinet d'avocats — pièces de dossier juridique sur S3 privé
Qui : éditeur de plateforme de gestion de dossiers servant 200 cabinets. Chaque dossier accumule 50 à 500 pièces (PDF, scans courrier, photos preuves). Confidentialité absolue, exigence de traçabilité.
Problème : les associés veulent un drag & drop simple mais l'équipe sécurité impose chiffrement at-rest, audit log, bucket privé sans accès public. Les fichiers font jusqu'à 200 Mo (scans haute résolution).
@Controller('cases/:caseId/documents')
export class CaseDocumentController {
constructor(private readonly docs: CaseDocumentService) {}
@Post('presign')
async presign(@Param('caseId') caseId: string, @Body() dto: PresignDto, @CurrentUser() user: User) {
await this.docs.assertCanWrite(caseId, user.id);
const key = `cases/${caseId}/${randomUUID()}-${sanitize(dto.filename)}`;
const url = await this.docs.createPresignedPut(key, dto.contentType, dto.size);
await this.docs.recordPending(caseId, key, user.id);
return { url, key };
}
@Post('confirm')
async confirm(@Body() dto: ConfirmDto, @CurrentUser() user: User) {
return this.docs.markUploaded(dto.key, user.id);
}
}Gains : zéro octet transite par l'API (presigned PUT direct S3 avec SSE-KMS), aucun risque de saturation pod sur des scans 200 Mo, audit log natif via CloudTrail. Les avocats uploadent 4x plus vite, le service tient 500 uploads concurrents sans broncher.
Banque retail — KYC pièces justificatives avec scan antivirus
Qui : néobanque française, 800 nouveaux comptes/jour. Chaque ouverture exige 3 documents (CNI recto-verso, justif domicile, RIB) avec contrôle ACPR de la qualité et de l'authenticité.
Problème : impossible de faire confiance aux fichiers clients (PDF malveillants, EXE déguisés). Aucun fichier infecté ne doit atteindre les analystes KYC. Latence acceptable de quelques secondes.
@Controller('kyc/documents')
export class KycDocumentController {
@Post()
@UseInterceptors(FileInterceptor('file', { limits: { fileSize: 10 * 1024 * 1024 } }))
async upload(@UploadedFile() file: Express.Multer.File, @CurrentUser() user: User) {
const detected = await fileTypeFromBuffer(file.buffer);
if (!detected || !['application/pdf', 'image/jpeg', 'image/png'].includes(detected.mime)) {
throw new BadRequestException('Type non autorisé');
}
const key = `kyc/quarantine/${user.id}/${randomUUID()}.${detected.ext}`;
await this.s3.put('quarantine', key, file.buffer, detected.mime);
await this.queue.add('av-scan', { key, userId: user.id, kind: 'kyc' });
return { status: 'scanning', key };
}
}
@Processor('av-scan')
export class KycAvProcessor extends WorkerHost {
async process(job: Job<{ key: string; userId: string }>) {
const stream = await this.s3.getStream('quarantine', job.data.key);
const { isInfected } = await this.clam.scanStream(stream);
if (isInfected) {
await this.s3.delete('quarantine', job.data.key);
return this.notify.alertCompliance(job.data.userId, job.data.key);
}
await this.s3.copy('quarantine', job.data.key, 'kyc-clean', job.data.key);
await this.s3.delete('quarantine', job.data.key);
await this.kyc.markReady(job.data.userId, job.data.key);
}
}Gains : 14 PDF malveillants bloqués le premier mois, zéro fichier infecté côté analyste. Le pattern quarantaine + queue libère l'API en 200 ms même si ClamAV met 5 s à scanner.
Plateforme immobilière — photos d'annonces vers Cloudinary
Qui : portail d'annonces immobilières, 3 000 agents inscrits, 50 000 nouvelles photos par jour. Chaque annonce contient 8 à 30 photos haute résolution.
Problème : il faut générer 5 variantes (miniature, mobile, desktop, retina, watermark) avec optimisation WebP/AVIF. Faire ça sur l'API Nest fait exploser la RAM et bloque l'event loop.
@Controller('listings/:id/photos')
export class ListingPhotoController {
@Post()
@UseInterceptors(FileInterceptor('photo', { limits: { fileSize: 15 * 1024 * 1024 } }))
async upload(@Param('id') listingId: string, @UploadedFile() file: Express.Multer.File) {
const detected = await fileTypeFromBuffer(file.buffer);
if (!detected?.mime.startsWith('image/')) {
throw new BadRequestException('Image only');
}
// Le SDK Cloudinary expose upload_stream(options, cb) qui rend un Writable.
// On le promisifie et on y pipe le buffer via Readable.from.
const result = await new Promise<UploadApiResponse>((resolve, reject) => {
const stream = this.cloudinary.uploader.upload_stream(
{
folder: `listings/${listingId}`,
transformation: [{ quality: 'auto:good', fetch_format: 'auto' }],
eager: [{ width: 320 }, { width: 800 }, { width: 1600 }, { width: 2400 }],
eager_async: true,
notification_url: `${process.env.API_URL}/webhooks/cloudinary`,
},
(err, res) => (err || !res ? reject(err) : resolve(res)),
);
Readable.from(file.buffer).pipe(stream);
});
return this.photos.attach(listingId, result.public_id, result.secure_url);
}
}Gains : transformation déléguée à Cloudinary, l'API Nest ne fait que router le binaire. Mémoire pod stable même à 200 uploads concurrents, conversion AVIF gratuite, CDN global inclus. Coût Cloudinary : 380 €/mois pour 1,5M variantes générées.
🛠️ Exemple end-to-end
Contexte : plateforme SaaS de signature électronique pour PME. Le client uploade un PDF de contrat, on vérifie le type, on scanne, on extrait les métadonnées, on stocke en bucket privé chiffré, puis on génère une URL signée pour la signature mobile.
// src/contracts/contract-upload.controller.ts
import {
BadRequestException,
Controller,
Param,
Post,
UploadedFile,
UseInterceptors,
} from '@nestjs/common';
import { FileInterceptor } from '@nestjs/platform-express';
import { fileTypeFromBuffer } from 'file-type';
import { PDFDocument } from 'pdf-lib';
import { randomUUID, createHash } from 'node:crypto';
@Controller('contracts/:contractId/upload')
export class ContractUploadController {
constructor(
private readonly s3: S3UploadService,
private readonly queue: AntivirusQueue,
private readonly repo: ContractRepository,
private readonly audit: AuditLogService,
) {}
@Post()
@UseInterceptors(FileInterceptor('contract', {
limits: { fileSize: 25 * 1024 * 1024, files: 1 },
}))
async upload(
@Param('contractId') contractId: string,
@UploadedFile() file: Express.Multer.File,
@CurrentUser() user: User,
) {
if (!file) throw new BadRequestException('Missing contract file');
// 1. Validate magic bytes
const detected = await fileTypeFromBuffer(file.buffer);
if (detected?.mime !== 'application/pdf') {
throw new BadRequestException('Only PDF accepted');
}
// 2. Validate PDF structure (not just signature)
let pageCount: number;
try {
const pdf = await PDFDocument.load(file.buffer, { ignoreEncryption: false });
pageCount = pdf.getPageCount();
if (pageCount > 200) throw new Error('Too many pages');
} catch (e) {
throw new BadRequestException(`Invalid PDF: ${(e as Error).message}`);
}
// 3. Hash for deduplication + integrity
const sha256 = createHash('sha256').update(file.buffer).digest('hex');
const existing = await this.repo.findByHash(sha256);
if (existing) {
await this.audit.log('contract.dedup', { contractId, sha256, userId: user.id });
return { reused: true, key: existing.s3Key };
}
// 4. Stream to quarantine bucket (SSE-KMS)
const key = `quarantine/${contractId}/${randomUUID()}.pdf`;
await this.s3.streamUpload({
bucket: process.env.S3_BUCKET_QUARANTINE!,
key,
body: file.buffer,
contentType: 'application/pdf',
serverSideEncryption: 'aws:kms',
kmsKeyId: process.env.KMS_KEY_ID,
metadata: {
contractId,
uploaderId: user.id,
sha256,
pageCount: String(pageCount),
},
});
// 5. Persist pending state
await this.repo.createPending({
id: contractId,
s3Key: key,
sha256,
pageCount,
sizeBytes: file.size,
uploaderId: user.id,
state: 'PENDING_SCAN',
});
// 6. Trigger async pipeline (AV + thumbnail + metadata)
await this.queue.enqueueScan({ contractId, key, sha256 });
// 7. Audit
await this.audit.log('contract.uploaded', {
contractId, sha256, sizeBytes: file.size, pageCount, userId: user.id,
});
return {
contractId,
state: 'PENDING_SCAN',
pageCount,
sha256,
pollUrl: `/contracts/${contractId}/status`,
};
}
}
// src/contracts/antivirus.processor.ts
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
@Processor('contract-av')
export class ContractAvProcessor extends WorkerHost {
constructor(
private readonly s3: S3UploadService,
private readonly clam: AntivirusService,
private readonly repo: ContractRepository,
private readonly notify: NotificationService,
) { super(); }
async process(job: Job<{ contractId: string; key: string; sha256: string }>) {
const { contractId, key } = job.data;
const stream = await this.s3.getStream(process.env.S3_BUCKET_QUARANTINE!, key);
const { isInfected, viruses } = await this.clam.scanStream(stream);
if (isInfected) {
await this.s3.delete(process.env.S3_BUCKET_QUARANTINE!, key);
await this.repo.markRejected(contractId, viruses.join(','));
await this.notify.contractRejected(contractId, viruses);
return;
}
const cleanKey = key.replace('quarantine/', 'contracts/');
await this.s3.copy(
process.env.S3_BUCKET_QUARANTINE!, key,
process.env.S3_BUCKET_CLEAN!, cleanKey,
);
await this.s3.delete(process.env.S3_BUCKET_QUARANTINE!, key);
await this.repo.markReady(contractId, cleanKey);
await this.notify.contractReadyToSign(contractId);
}
}Validation magic-bytes + structure PDF, déduplication par SHA-256, bucket quarantaine séparé, scan async via BullMQ, copie atomique vers bucket signé, chiffrement KMS, audit log complet. L'utilisateur reçoit une réponse en moins de 600 ms même quand ClamAV met 5 secondes à scanner.
🤖 Servir un agent IA qui consomme les fichiers uploadés
Le cas réel de cette stack : un user uploade un PDF/une image, et tu veux qu'un LLM (Claude) l'analyse — extraire les clauses d'un contrat, résumer un courrier, classifier un justificatif KYC — puis streamer le résultat token par token vers le front Angular. Voici comment un staff engineer câble ça côté NestJS.
Architecture : ne JAMAIS appeler le LLM dans le handler d'upload
L'upload synchrone et l'inférence LLM ont des SLA opposés : l'upload doit rendre la main en < 1 s, l'inférence peut prendre 10–60 s. On les découple. L'upload stocke + enfile un job ; l'analyse streame via une connexion SSE séparée.
POST /documents (upload) ──► stocke S3 + enqueue ──► 202 { documentId, streamUrl }
GET /documents/:id/analyze (SSE) ──► consomme le stream LLM, relaie les tokensLe client LLM est injecté (DI), pas un new Anthropic() dans un champ
Un new Anthropic() en propriété casse les tests (pas mockable), duplique les pools de connexions, et hardcode la config. On le provide via forRootAsync.
// src/llm/llm.module.ts
import { Module, Global } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import Anthropic from '@anthropic-ai/sdk';
export const ANTHROPIC = Symbol('ANTHROPIC');
@Global()
@Module({
providers: [
{
provide: ANTHROPIC,
inject: [ConfigService],
useFactory: (config: ConfigService) =>
new Anthropic({
apiKey: config.getOrThrow('ANTHROPIC_API_KEY'),
maxRetries: 3, // le SDK retry les 429/5xx avec backoff
timeout: 60_000,
}),
},
],
exports: [ANTHROPIC],
})
export class LlmModule {}Streaming des tokens vers le front via SSE + AbortController
Le point senior : le client peut fermer l'onglet. Sans AbortController câblé sur la déconnexion HTTP, tu continues à payer des tokens Claude pour un stream que personne ne lit. On annule l'appel LLM dès que la réponse se ferme.
// src/documents/document-analysis.controller.ts
import { Controller, Get, Param, Res, Req, Inject } from '@nestjs/common';
import type { Response, Request } from 'express';
import type Anthropic from '@anthropic-ai/sdk';
import { ANTHROPIC } from '../llm/llm.module';
@Controller('documents')
export class DocumentAnalysisController {
constructor(
@Inject(ANTHROPIC) private readonly anthropic: Anthropic,
private readonly docs: DocumentService,
) {}
@Get(':id/analyze')
async analyze(@Param('id') id: string, @Req() req: Request, @Res() res: Response) {
res.set({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no', // désactive le buffering nginx, sinon le SSE est bloqué
});
res.flushHeaders();
// 1. Charge le fichier déjà uploadé (PDF en base64 pour l'API document)
const { base64, mediaType } = await this.docs.loadForLlm(id);
// 2. Annulation : si le client coupe, on abort l'appel Anthropic
const ac = new AbortController();
req.on('close', () => ac.abort());
try {
const stream = this.anthropic.messages.stream(
{
model: 'claude-sonnet-4-6', // bon ratio coût/qualité pour de l'extraction
max_tokens: 2048,
messages: [
{
role: 'user',
content: [
{ type: 'document', source: { type: 'base64', media_type: mediaType, data: base64 } },
{ type: 'text', text: 'Extrais les parties, le montant et la date d\'échéance. Réponds en JSON.' },
],
},
],
},
{ signal: ac.signal },
);
for await (const event of stream) {
if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
res.write(`data: ${JSON.stringify({ token: event.delta.text })}\n\n`);
}
}
const final = await stream.finalMessage();
res.write(`event: done\ndata: ${JSON.stringify({ usage: final.usage })}\n\n`);
} catch (e) {
if (ac.signal.aborted) return res.end(); // client parti, normal, on ne log pas une erreur
res.write(`event: error\ndata: ${JSON.stringify({ message: (e as Error).message })}\n\n`);
} finally {
res.end();
}
}
}Modèles Anthropic à connaître (mi-2026) : claude-opus-4-8 (flagship, raisonnement lourd), claude-sonnet-4-6 (workhorse coût/qualité — défaut pour l'extraction de documents), claude-haiku-4-5 (rapide/pas cher — classification simple, routing). Pour de la classification de justificatif KYC, Haiku suffit ; pour de l'analyse juridique fine, Opus.
Analyse lourde en job BullMQ (cost-aware, idempotent)
Quand l'analyse n'est pas interactive (batch, post-upload), elle va en queue. Les règles propres aux jobs LLM :
@Processor('doc-analysis')
export class DocAnalysisProcessor extends WorkerHost {
constructor(
@Inject(ANTHROPIC) private readonly anthropic: Anthropic,
private readonly docs: DocumentService,
) { super(); }
async process(job: Job<{ documentId: string; sha256: string }>) {
// 1. Idempotence : clé = hash du contenu. Si déjà analysé, on retourne le cache.
const cached = await this.docs.findAnalysis(job.data.sha256);
if (cached) return cached;
// 2. Cost guard : refuse les documents absurdement gros avant de payer
const tokens = await this.docs.estimateTokens(job.data.documentId);
if (tokens > 150_000) throw new Error('Document too large for single-pass analysis');
const { base64, mediaType } = await this.docs.loadForLlm(job.data.documentId);
const msg = await this.anthropic.messages.create({
model: 'claude-sonnet-4-6',
max_tokens: 4096,
messages: [{
role: 'user',
content: [
{ type: 'document', source: { type: 'base64', media_type: mediaType, data: base64 } },
{ type: 'text', text: 'Analyse ce document. JSON strict.' },
],
}],
});
// 3. Persiste keyé par hash → idempotent + cache pour les ré-uploads identiques
return this.docs.saveAnalysis(job.data.sha256, msg.content, msg.usage);
}
}Points senior pour les jobs LLM :
- Idempotence keyée sur la generation id / le hash de contenu, pas sur le
jobIdaléatoire — un retry après timeout ne doit pas re-facturer un appel Claude réussi mais dont l'ack s'est perdu. Persiste l'usage et le résultat de façon atomique. - Retry cost-aware : laisse le SDK retry les 429/overload (il backoff). Mais ne retry PAS une erreur de validation (400) ou un refus — c'est gaspiller. Configure
attemptsBullMQ bas (2–3) avecbackoffexponentiel, et distingue erreurs transitoires vs définitives. - Partial-output handling : si le stream est coupé en plein milieu (réseau, abort), tu as des tokens partiels. Soit tu les jettes (analyse atomique), soit tu persistes l'offset et tu reprends — pour de l'extraction JSON, on jette et on rejoue ; pour une longue génération, le checkpoint vaut le coup.
- Idempotency / rate-limit / cost-guard à l'edge : un interceptor en amont qui (a) dédupe par
Idempotency-Keyheader, (b) applique un rate-limit par tenant (token budget, pas juste req/s), (c) rejette en402/429si le tenant a explosé son quota mensuel — avant d'appeler Claude.
Exposer un endpoint MCP (agent qui pilote l'upload)
Si tu veux qu'un agent externe (ou Claude via MCP) déclenche des analyses, expose un outil analyze_document avec un schéma d'entrée strict (le documentId, pas le binaire), une réponse bornée, et une authz par tenant. Le binaire reste derrière une presigned URL ; l'agent manipule des références, jamais des octets — c'est la même discipline que les presigned uploads, appliquée au plan de contrôle.
🔁 Quand utiliser / éviter
| Cas | Recommandation |
|---|---|
| Fichier < 5 MiB, faible volume | Multer + memoryStorage, validation pipe |
| Fichier 5-100 MiB | Multer + diskStorage temporaire, push S3 en background |
| Fichier > 100 MiB | Presigned multipart S3 direct (skip API) |
| Vidéo / audio | Range requests + CDN + signed URL TTL court |
| Stream input -> stream output (transcoding) | Busboy + pipe direct, jamais buffer |
| Documents sensibles | Bucket privé + presigned download court (1-5min) |
| Cas où l'API doit lire le contenu (CSV import) | Multer streaming + parser stream (fast-csv) |
Évite :
- Multer en mémoire pour des uploads multi-MiB sans limite stricte.
- Stocker des fichiers en colonne BLOB MySQL/Postgres au-delà de quelques centaines de Ko.
- Servir des fichiers user-generated depuis le même domaine que ton API sans
Content-Disposition: attachment.
🏋️ Exercices
1. Validateur magic-bytes en FileValidator natif
Objectif : remplacer le FileTypeValidator mimetype-based de Nest par un vrai validateur qui lit la signature, intégrable dans ParseFilePipe.
Étends FileValidator de @nestjs/common, implémente isValid(file) async avec fileTypeFromBuffer(file.buffer), et buildErrorMessage(). Branche-le : new ParseFilePipe({ validators: [new MagicBytesValidator(['image/png','image/jpeg'])] }).
Indice/Solution : isValid peut être async depuis Nest 10. Attention : file.buffer n'existe qu'en memoryStorage ; en diskStorage, lis les premiers 4100 octets via createReadStream(path, { end: 4100 }) puis fileTypeFromStream.
2. Upload streaming Busboy → S3 avec backpressure et limite dure
Objectif : un endpoint qui pipe le multipart directement vers S3 sans jamais bufferiser, et qui coupe le stream si le fichier dépasse la limite (sans avoir tout lu).
Pars du RawUploadController du cours. Ajoute : (a) abort l'Upload S3 si Busboy émet limit sur le file stream, (b) propage l'erreur en 413, (c) supprime la part S3 déjà commencée (leavePartsOnError: false).
Indice/Solution : écoute stream.on('limit', ...) → appelle upload.abort() et req.destroy(). Le piège : si tu ne destroy() pas la requête, le client continue d'envoyer des octets dans le vide. Teste avec un fichier de 600 MiB et une limite à 500 MiB.
3. Pipeline quarantaine idempotent (production-grade)
Objectif : rendre le pipeline AV rejouable sans double effet de bord.
Implémente : dédup par SHA-256 (skip storage si hash connu), jobId = sha256 dans BullMQ, garde Redis SETNX avant le webhook final, et marquage DB conditionnel (UPDATE ... WHERE state = 'PENDING_SCAN'). Écris un test qui appelle process() deux fois sur le même job et asserte un seul webhook envoyé.
Indice/Solution : la vérité = aucune assertion sur l'ordre, seulement sur l'effet net. Mock le webhook et asserte toHaveBeenCalledTimes(1) après double process().
4. Casse-le puis répare : la fuite mémoire à 200 uploads concurrents
Objectif : reproduire l'OOM, le diagnostiquer, le corriger.
Configure memoryStorage + limite à 50 MiB, lance 200 uploads concurrents de 40 MiB (k6 ou un script Promise.all). Observe la RSS du pod exploser. Puis migre vers streaming Busboy → S3 et re-mesure.
Indice/Solution : 200 × 40 MiB = 8 Go buffer simultané → OOM kill. La correction n'est pas « augmenter la RAM » mais ne jamais matérialiser le buffer. Bonus : ajoute un sémaphore (p-limit) pour borner la concurrence storage indépendamment de la concurrence HTTP.
5. Streaming LLM annulable bout-en-bout
Objectif : un upload de PDF → analyse Claude streamée en SSE, avec un bouton Stop qui annule et côté client ET côté serveur (plus de tokens facturés).
Câble le DocumentAnalysisController SSE du cours. Côté serveur : req.on('close') → AbortController.abort(). Vérifie via les logs Anthropic que l'appel est bien coupé (l'usage final ne grimpe plus). Bonus : ajoute un cost-guard qui refuse en 402 si le tenant a dépassé son budget tokens mensuel avant d'ouvrir le stream.
Indice/Solution : le test décisif — démarre le stream, ferme la connexion à mi-parcours, et asserte que stream.controller.abort a été appelé. Sans le req.on('close'), tu paies l'intégralité de max_tokens pour rien.
6 (boss). Multipart presigned resumable
Objectif : un upload de 2 Go reprenable après coupure réseau, sans que l'API ne voie un octet.
Implémente le flow init → signPart → completeMultipart du cours côté serveur, et côté client (ou test) : upload des parts en parallèle, simule l'échec de la part 7, retry uniquement la part 7 (les ETags des autres sont déjà connus), puis complete.
Indice/Solution : l'état (uploadId + ETags des parts réussies) vit côté client ou dans une table upload_sessions. Le piège : CompleteMultipartUpload exige les parts triées par PartNumber ; un ETag manquant ou un tri faux → InvalidPart. Lifecycle rule S3 pour abort les multipart inachevés (sinon tu paies le stockage des parts orphelines indéfiniment).
🎤 En entretien
Q : Le FileTypeValidator de NestJS suffit-il à valider qu'un fichier est bien une image ? Non. Il ne compare que file.mimetype, une string déclarée par le client et trivialement falsifiable. Un .exe envoyé en image/png passe. La validation sérieuse lit les magic bytes (file-type), et pour les formats texte (SVG, CSV) parse sémantiquement — un SVG peut embarquer du JS (XSS au service).
Q : Un user uploade un fichier de 500 MiB. Décris le chemin qui ne fait PAS tomber ton pod. Jamais memoryStorage à cette taille. Soit streaming direct multipart → S3 (Busboy + @aws-sdk/lib-storage, l'API ne bufferise rien), soit — mieux — presigned PUT/multipart où le binaire ne touche jamais l'API. Aligner les limites nginx/ALB/Multer pour rejeter tôt (un 413 après 30 s d'upload est un bug UX). Borner la concurrence storage avec un sémaphore.
Q : Comment garantis-tu qu'un fichier infecté n'atteint jamais un autre utilisateur ? Pattern quarantaine : upload → bucket privé quarantine → job de scan async (ClamAV) → si clean, copie atomique vers le bucket servi, sinon delete + alerte. On ne sert jamais depuis la quarantaine. Point critique : une erreur de scan (ClamAV down) n'est pas un « clean » — on garde en quarantaine et on alerte, jamais de fail-open.
Q : Ton pipeline d'analyse LLM post-upload est rejoué par BullMQ après un crash. Quel est le risque et comment tu le neutralises ? Double facturation Claude + double effet de bord (webhook, email). On rend le job idempotent : clé = SHA-256 du contenu, résultat persisté keyé par ce hash (un re-run retourne le cache, ne ré-appelle pas l'API), garde Redis SETNX avant tout effet de bord externe, marquage DB conditionnel. On laisse le SDK retry les 429/overload (transitoires) mais pas les 400 (définitifs).
🔗 Liens
- Doc officielle : https://docs.nestjs.com/techniques/file-upload
- Multer : https://github.com/expressjs/multer
- Busboy : https://github.com/mscdex/busboy
@fastify/multipart: https://github.com/fastify/fastify-multipart@aws-sdk/lib-storage: https://github.com/aws/aws-sdk-js-v3/tree/main/lib/lib-storagefile-type: https://github.com/sindresorhus/file-type- ClamAV : https://www.clamav.net
clamscannode : https://github.com/kylefarris/clamscan- MinIO : https://min.io
- "OWASP File Upload Cheatsheet" : https://cheatsheetseries.owasp.org/cheatsheets/File_Upload_Cheat_Sheet.html