Streams — Readable, Writable, Duplex, Transform, backpressure
TL;DR — Les streams Node sont des pipes asynchrones avec backpressure intégré. Quatre types :
Readable(source),Writable(sink),Duplex(les deux, ex. sockets),Transform(Duplex où l'output dérive de l'input, ex. gzip). LehighWaterMark(16 KB par défaut pour les bytes, 16 objets en object mode) gouverne le buffer. Utilisepipeline()plutôt que.pipe(): il gère propagation d'erreur et cleanup. Itère un Readable avecfor await, et interopère avec les Web Streams (Readable.toWeb(),Readable.fromWeb()). La backpressure mal gérée = OOM en prod.
🧠 Mental model — ASCII + analogie
┌───────────┐ push ┌──────────┐
│ Readable │ ──────────► │ Writable │
│ (source) │ │ (sink) │
└───────────┘ └──────────┘
│ ▲
│ │
└────────► Transform ─────┘
(lit + écrit)
highWaterMark = capacité du seau interne (par défaut 16 KB)
Quand le buffer interne du Writable est plein,
write() retourne false → le Readable doit s'arrêter de pousser
jusqu'à l'événement 'drain'. C'est ça, la backpressure.
Modes d'un Readable :
paused : tu pull avec .read() ─► "tireur"
flowing : data te tombe dessus via 'data' ou .pipe() ─► "robinet ouvert"Analogie : un stream c'est un tuyau d'arrosage entre une source d'eau (le disque, le réseau) et une cible (un fichier, une socket). Le highWaterMark, c'est la capacité d'un seau au milieu du tuyau. Si la cible boit lentement, le seau se remplit et tu dois fermer le robinet sinon ça déborde. C'est la backpressure : stream.write() te dit "non, ralentis" en retournant false, et tu attends l'événement 'drain' pour reprendre.
🛠️ Code minimal (ts/js)
pipeline() — la bonne façon
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
await pipeline(
createReadStream('input.txt'),
createGzip({ level: 9 }),
createWriteStream('input.txt.gz'),
);
// Erreurs propagées, descripteurs fermés automatiquementItérer un Readable comme un async iterator
import { createReadStream } from 'node:fs';
const stream = createReadStream('huge.log', { encoding: 'utf8', highWaterMark: 64 * 1024 });
let lineCount = 0;
let leftover = '';
for await (const chunk of stream) {
const lines = (leftover + chunk).split('\n');
leftover = lines.pop() ?? '';
lineCount += lines.length;
}
console.log(lineCount, 'lignes');Custom Transform avec backpressure correcte
import { Transform, TransformCallback } from 'node:stream';
class JsonLineParser extends Transform {
private buffer = '';
constructor() {
super({ readableObjectMode: true }); // émet des objets, lit des bytes
}
_transform(chunk: Buffer, _enc: BufferEncoding, cb: TransformCallback) {
this.buffer += chunk.toString('utf8');
const lines = this.buffer.split('\n');
this.buffer = lines.pop() ?? '';
try {
for (const line of lines) {
if (line.trim()) this.push(JSON.parse(line));
}
cb();
} catch (err) {
cb(err as Error);
}
}
_flush(cb: TransformCallback) {
if (this.buffer.trim()) {
try {
this.push(JSON.parse(this.buffer));
} catch (err) {
return cb(err as Error);
}
}
cb();
}
}Écrire en respectant la backpressure (sans pipe)
import { createReadStream, createWriteStream } from 'node:fs';
async function copyManual(src: string, dst: string) {
const r = createReadStream(src);
const w = createWriteStream(dst);
for await (const chunk of r) {
if (!w.write(chunk)) {
// Buffer plein — on attend 'drain'
await new Promise<void>((resolve, reject) => {
w.once('drain', resolve);
w.once('error', reject);
});
}
}
w.end();
await new Promise<void>((resolve, reject) => {
w.once('finish', resolve);
w.once('error', reject);
});
}🎯 Patterns courants — 7
1. Streaming HTTP : ne charge jamais un body en mémoire
import { createServer } from 'node:http';
import { pipeline } from 'node:stream/promises';
import { createWriteStream } from 'node:fs';
const server = createServer(async (req, res) => {
if (req.method === 'POST' && req.url === '/upload') {
try {
await pipeline(req, createWriteStream('/tmp/upload.bin'));
res.writeHead(200).end('ok');
} catch (err) {
res.writeHead(500).end((err as Error).message);
}
} else {
res.writeHead(404).end();
}
});
server.listen(3000);Une lib comme formidable ou busboy fait du parsing multipart en streaming, sans jamais matérialiser le fichier entier.
2. CSV streaming avec transform
import { pipeline } from 'node:stream/promises';
import { createReadStream } from 'node:fs';
import { Transform } from 'node:stream';
const toJson = new Transform({
writableObjectMode: false,
readableObjectMode: true,
transform(chunk, _enc, cb) {
// Hypothèse : chunk est une ligne complète (en pratique, utilise un splitter en amont)
const [id, name, email] = chunk.toString().split(',');
this.push({ id, name, email });
cb();
},
});
const collect = new Transform({
writableObjectMode: true,
transform(row, _enc, cb) {
console.log(row);
cb();
},
});
await pipeline(createReadStream('users.csv'), toJson, collect);3. Combiner plusieurs sources avec Readable.from()
import { Readable } from 'node:stream';
async function* generateNumbers() {
for (let i = 0; i < 1_000_000; i++) {
yield Buffer.from(`${i}\n`);
if (i % 10_000 === 0) await new Promise((r) => setImmediate(r));
}
}
const stream = Readable.from(generateNumbers());
// Compatible avec pipeline, .pipe(), etc.4. Web Streams interop (Fetch API, Service Workers, Edge runtimes)
import { Readable } from 'node:stream';
// Node Readable → Web ReadableStream
const nodeStream = Readable.from(['hello\n', 'world\n']);
const webStream: ReadableStream = Readable.toWeb(nodeStream);
// Web → Node
const fromWeb = Readable.fromWeb(webStream);
// Utiliser fetch() qui renvoie un Web ReadableStream
const res = await fetch('https://example.com/big.json');
const asNodeStream = Readable.fromWeb(res.body!);
for await (const chunk of asNodeStream) {
process.stdout.write(chunk);
}5. Limiter le débit (rate limit) avec un transform
import { Transform, TransformCallback } from 'node:stream';
import { setTimeout as sleep } from 'node:timers/promises';
class ThrottleStream extends Transform {
constructor(private readonly bytesPerSecond: number) {
super();
}
// async _transform : Node attend la promesse avant le prochain chunk.
// La backpressure du Transform fait le reste — le débit amont est borné.
async _transform(chunk: Buffer, _enc: BufferEncoding, cb: TransformCallback) {
try {
const delayMs = (chunk.length / this.bytesPerSecond) * 1000;
if (delayMs > 0) await sleep(delayMs);
cb(null, chunk);
} catch (err) {
cb(err as Error);
}
}
}Le
setTimeoutcallback "fire-and-forget" du pattern naïf est fragile : si la source pousse 10 000 chunks d'un coup, tu programmes 10 000 timers en parallèle (chacun appellecb()plus tard, mais ils sont tous déjà entrés dans_transform? non — justement, lecb()retardé bloque le chunk suivant). La versionasyncci-dessus est plus lisible et sérialise naturellement. Pour un throttle précis (token bucket, lissage des rafales), ne te repose pas surchunk.lengthseul : accumule un budget temporel (expectedTime += len / rate) et dorsexpectedTime - now, sinon la dérive d'horloge te fait throttler trop fort.
6. Tee — broadcast un Readable vers N destinations
import { PassThrough } from 'node:stream';
function tee(readable: NodeJS.ReadableStream, n: number) {
const outs = Array.from({ length: n }, () => new PassThrough());
readable.on('data', (chunk) => outs.forEach((o) => o.write(chunk)));
readable.on('end', () => outs.forEach((o) => o.end()));
readable.on('error', (err) => outs.forEach((o) => o.destroy(err)));
return outs;
}
// Attention : backpressure non gérée — utilise une lib comme `stream-tee` en prod7. Compter / observer sans modifier (passthrough)
import { PassThrough } from 'node:stream';
const counter = new PassThrough();
let total = 0;
counter.on('data', (chunk) => (total += chunk.length));
// Insère counter dans un pipeline pour mesurer le débit sans transformer.🔄 Versions — Node 18 / 20 / 22 / 24
Node 18 —
stream/promises(pipeline,finished) stable. Async iterators sur Readable stable. Web Streams expérimentaux (ReadableStream,WritableStream,TransformStreamexposés globaux).Node 20 — Web Streams stables.
Readable.toWeb()/Readable.fromWeb()/Writable.toWeb()/Writable.fromWeb()stables.stream.compose()pour composer plusieurs streams en un Duplex.Node 22 —
stream/consumers(text(),json(),buffer(),arrayBuffer(),blob()) stable. Performance des Web Streams améliorée (~30 % sur certains scénarios).ReadableStream.from(iterable)standard.Node 24 — Web Streams encore plus rapides (réécriture interne).
pipeline()supporteAbortSignalplus finement.node:streamreste l'API canonique pour la perf maximale sur Node.
💡 En 2026, sur Node 22/24, le Web Streams est la voie à privilégier pour le code portable (Cloudflare Workers, Deno, Bun, Edge runtimes). Le Node streams reste plus performant et plus ergonomique sur Node pur.
Readable.toWeb()/fromWeb()font le pont.
⚠️ Pitfalls — 11
.pipe()sans handler d'erreur —r.pipe(w)ne propage pas les erreurs et ne ferme pas la chaîne. Sirémet'error',wreste ouvert. Utilisepipeline().Buffer mémoire qui explose — si tu fais
chunks.push(chunk)puisBuffer.concat(chunks)à la fin, tu matérialises tout le fichier. Sur 4 GB de logs, tu OOM. Stream les chunks directement vers la sortie.Backpressure ignorée —
for await (const c of req) await someSlowOp(c)est OK (chaque iteration attend). Maisreq.on('data', async (c) => await someSlowOp(c))ne respecte pas la backpressure : les chunks s'accumulent en mémoire car le Readable continue à les pousser.'data'event après.pause()— passer en mode flowing avec'data'est irréversible côté.pause()si tu n'as pas le bon état interne. Préfèrefor await.highWaterMarkmal réglé — trop bas = beaucoup de context switches CPU ; trop haut = pic mémoire. Pour du HTTP,64 * 1024est souvent un bon compromis ; pour des objets lourds, ajuste l'object mode (10-100 typiquement).end()non appelé — un Writable qui n'a pas reçu.end()ne fermera jamais le fichier. Le contenu peut ne pas être flushé.pipeline()le fait pour toi.Object mode mélangé avec bytes — un Writable créé avec
objectMode: truene sait pas écrire des Buffers vers un fichier. Tu dois sérialiser avant.Lire
req.bodydeux fois — un Readable HTTP ne peut être consommé qu'une seule fois. Si tu veux le re-lire, buffer-le explicitement (en mémoire ou sur disque).new Streamau lieu deReadable—Stream(sans suffixe) est la base abstraite, pas une vraie classe utilisable. Utilise toujoursReadable/Writable/Duplex/Transform/PassThrough.Mélanger callback
cb()etthis.push()mal compris — dans un_transform,cb(null, value)est équivalent àthis.push(value); cb(). Tu ne dois jamais appelercbdeux fois.Streams ne survivent pas au process exit — un
pipeline()interrompu parprocess.exit()ne nettoie pas. Si tu veux du write atomique, écris dans un fichier temporaire puisfs.renameà la fin (atomique au niveau filesystem).
🧪 Testing — node --test
// streams.test.ts
import { test } from 'node:test';
import assert from 'node:assert/strict';
import { Readable, Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
test('pipeline propage les erreurs', async () => {
const source = Readable.from(['a', 'b', 'c']);
const fail = new Transform({
transform(_chunk, _enc, cb) {
cb(new Error('boom'));
},
});
await assert.rejects(pipeline(source, fail), /boom/);
});
test('Transform respecte la backpressure', async () => {
const chunks: Buffer[] = [];
const collect = new Transform({
writableObjectMode: true,
transform(chunk, _enc, cb) {
chunks.push(chunk);
cb();
},
});
const source = Readable.from((function* () {
for (let i = 0; i < 100; i++) yield Buffer.from(`item-${i}`);
})());
await pipeline(source, collect);
assert.equal(chunks.length, 100);
});
test('Web stream interop', async () => {
const node = Readable.from(['hello']);
const web = Readable.toWeb(node);
const reader = web.getReader();
const { value } = await reader.read();
assert.equal(new TextDecoder().decode(value as Uint8Array), 'hello');
});🎬 Cas d'usage concrets
Scénario 1 — Streaming CSV factures Pennylane
Intégration comptable : import quotidien de 200 000 lignes de factures depuis l'export CSV Pennylane vers Postgres (table invoices). Le fichier fait 350 MB. Chargement naïf (readFile + csv-parse/sync) = ~1.8 GB RSS et OOM sur le worker 1 GB.
Pipeline streaming : createReadStream(file) → décompression zlib.createGunzip (le CSV arrive en .gz) → csv-parse (transform stream) → validation Zod par batch de 500 (transform) → pg-copy-streams writer pour bulk insert. Tout est connecté avec pipeline() qui propage erreurs et nettoie les ressources. Backpressure naturel : si Postgres ralentit, le gunzip pause la lecture S3.
Mesure : 200 k lignes ingérées en 38 s, RSS stable à 95 MB, débit Postgres 5 200 rows/s grâce à COPY. Le passage en streaming a aussi permis d'ajouter une retry par batch sans tout recharger.
Scénario 2 — Export documents cabinet juridique gros volumes
Un cabinet exporte les dossiers d'un client (audit GED) : 12 000 PDF totalisant 8 GB, livrés en un ZIP signé. Impossible de tout buffer en mémoire.
Pipeline : Readable.from(asyncIteratorDeFichiers) (yield les fichiers depuis Postgres avec leur chemin S3) → transform qui ouvre un read stream S3 par fichier → archiver (transform-like) qui agrège en ZIP streaming → write stream HTTP response. Le client télécharge progressivement, le serveur ne charge jamais plus que ~10 MB à la fois (buffer ZIP interne).
Astuce qui a sauvé : highWaterMark: 256 * 1024 sur le read S3 pour ne pas saturer l'event loop avec des micro-chunks de 16 KB. Et pipeline() avec un finally qui logge bytesWritten pour audit (RGPD trace export).
Scénario 3 — Streaming réponse LLM e-commerce (assistant shopping)
Site e-commerce avec assistant conversationnel branché sur Claude API. Le user pose une question, on stream les tokens en SSE vers le navigateur (UX critique : premier token < 600 ms).
Pipeline Web Streams (interopérable avec fetch) : fetch(claudeUrl).body (ReadableStream) → pipeThrough(TextDecoderStream()) → transform SSE parser (extrait data: {...}) → transform qui re-encode en SSE pour le client → écriture sur res (Node). Conversion via Readable.fromWeb / Readable.toWeb car le HTTP server Node attend des Node streams. AbortController propagé du browser (req.on('close')) jusqu'à fetch upstream — si l'utilisateur ferme l'onglet, on coupe la facturation immédiatement.
Bénéfice perçu : TTFT 420 ms, perception conversationnelle, et coût LLM réduit de 15 % (sessions abandonnées coupées net).
🛠️ Exemple end-to-end
Pipeline streaming complet pour ingestion CSV facturation : décompression + parsing + validation + COPY Postgres avec backpressure et reporting de progression.
import { createReadStream } from "node:fs";
import { pipeline } from "node:stream/promises";
import { Transform } from "node:stream";
import { createGunzip } from "node:zlib";
import { parse } from "csv-parse";
import { from as copyFrom } from "pg-copy-streams";
import pg from "pg";
import { z } from "zod";
const InvoiceRow = z.object({
external_id: z.string().min(1),
customer_siren: z.string().regex(/^\d{9}$/),
amount_cents: z.coerce.number().int().nonnegative(),
issued_at: z.coerce.date(),
});
type InvoiceRow = z.infer<typeof InvoiceRow>;
function validateBatch(opts: { onProgress?: (n: number) => void; batchSize?: number } = {}) {
const batchSize = opts.batchSize ?? 500;
let buffer: InvoiceRow[] = [];
let total = 0;
let rejected = 0;
return new Transform({
objectMode: true,
transform(chunk: Record<string, string>, _enc, cb) {
const res = InvoiceRow.safeParse(chunk);
if (!res.success) {
rejected++;
return cb();
}
buffer.push(res.data);
total++;
if (buffer.length >= batchSize) {
const out = buffer.map(rowToCopyLine).join("");
buffer = [];
opts.onProgress?.(total);
return cb(null, out);
}
cb();
},
flush(cb) {
if (buffer.length) {
const out = buffer.map(rowToCopyLine).join("");
buffer = [];
return cb(null, out);
}
console.log(`ingest done: total=${total} rejected=${rejected}`);
cb();
},
});
}
function rowToCopyLine(r: InvoiceRow) {
// tab-separated, no quoting needed since we validated with zod
return `${r.external_id}\t${r.customer_siren}\t${r.amount_cents}\t${r.issued_at.toISOString()}\n`;
}
async function ingest(file: string) {
const client = new pg.Client({ connectionString: process.env.DATABASE_URL });
await client.connect();
const copyStream = client.query(
copyFrom(`COPY invoices (external_id, customer_siren, amount_cents, issued_at) FROM STDIN`)
);
try {
await pipeline(
createReadStream(file),
createGunzip(),
parse({ columns: true, skip_empty_lines: true, trim: true }),
validateBatch({
batchSize: 500,
onProgress: (n) => {
if (n % 10_000 === 0) console.log(`progress: ${n} rows`);
},
}),
copyStream
);
} finally {
await client.end();
}
}
await ingest(process.argv[2]);Points clés : pipeline() propage les erreurs et clôt chaque étage proprement, transform en objectMode côté parser puis bascule en bytes pour COPY, batch interne dans le transform pour amortir le coût syscall, backpressure naturelle (si Postgres rame, gunzip ralentit la lecture disque), pas d'accumulation mémoire.
🔁 Quand utiliser / éviter
| Outil | Utiliser quand | Éviter quand |
|---|---|---|
pipeline() | Tout enchaînement de streams | Quand tu as besoin de contrôle fin (split, broadcast) |
.pipe() | Demo, scripts one-shot | Production (pas d'error propagation) |
for await | Itérer un Readable, traiter ligne par ligne | Quand tu veux gérer la backpressure manuellement vers un Writable |
Custom Transform | Logique métier de transformation | Quand un PassThrough + listener suffit |
| Web Streams | Code portable (Workers, Edge) | Hot path Node-only (Node streams plus rapides) |
Readable.from(iterable) | Adapter une source itérable | Si tu peux directement utiliser un async generator |
| Object mode | Pipeline de structures (rows DB, events) | Streaming de bytes (overhead inutile) |
🧬 Aller plus loin
Anatomie de la backpressure — le "vrai" mécanisme
Quand tu fais pipe() ou pipeline(), il y a une boucle interne qui fait grossièrement ça :
function pipe(src, dst) {
src.on('data', (chunk) => {
const ok = dst.write(chunk);
if (!ok) src.pause(); // ← buffer dst plein, on freeze la source
});
dst.on('drain', () => src.resume()); // ← buffer dst vidé, on reprend
}C'est ce ping-pong pause / resume qui implémente la backpressure. Si tu fais dst.write manuellement, c'est à toi de regarder le retour et d'attendre drain. Le pattern for await + await drain simule la même mécanique.
Composer un Duplex avec stream.compose
import { compose } from 'node:stream';
import { createGzip, createGunzip } from 'node:zlib';
import { Transform } from 'node:stream';
const toUpper = new Transform({
transform(chunk, _enc, cb) {
cb(null, chunk.toString().toUpperCase());
},
});
const upperThenGzip = compose(toUpper, createGzip());
// upperThenGzip est un Duplex : tu peux .write() dedans, et lire le résultat gzippé en uppercase⚠️ N'appelle pas ta variable
pipeline: tu masquerais l'importnode:stream/promiseset un futur lecteur croira composer alors quecomposene consomme rien tant que personne ne lit le Duplex retourné.composeest lazy ;pipelineest eager (il démarre le flux et résout/rejette une promesse).
Très pratique pour exposer une lib qui transforme un stream en N étapes mais retourne un seul Duplex au consommateur.
stream/consumers — collecter rapidement
import { json, text, buffer, arrayBuffer, blob } from 'node:stream/consumers';
import { Readable } from 'node:stream';
const stream = Readable.from(['{"hello":', '"world"}']);
const obj = await json(stream);
console.log(obj); // { hello: "world" }C'est l'équivalent côté Node de await response.json() du fetch.
Pourquoi req.body HTTP/1 est un Readable, mais HTTP/2 est plus complexe
En HTTP/1, une requête a un body séquentiel : req est directement un IncomingMessage qui est un Readable. En HTTP/2, plusieurs streams se multiplexent sur une même connexion TCP : chaque request a son propre Http2Stream (Duplex). Côté code applicatif, l'API ressemble, mais les implications sur le backpressure sont différentes (le contrôle de flux HTTP/2 est plus granulaire — flow control par stream).
Detect un memory leak via streams en prod
Symptôme : RSS qui grimpe linéairement sans plafond. Cause fréquente : un .pipe() orphelin qui retient des chunks parce que la cible est lente et que personne ne gère la backpressure. Outils :
--inspect+ Chrome DevTools Memory tab → snapshots successifs, cherche lesBufferrétenus.process.memoryUsage()exposé en métrique :heapUsed,external,arrayBuffers.node:diagnostics_channelpour tracer les pipelines.
Readable à partir d'une queue interne
Pattern utile pour exposer un Readable depuis un event emitter custom :
import { Readable } from 'node:stream';
class EventBuffer extends Readable {
private queue: Buffer[] = [];
private flowing = false;
push2(data: Buffer) {
this.queue.push(data);
if (this.flowing) this.flush();
}
_read() {
this.flowing = true;
this.flush();
}
private flush() {
while (this.queue.length) {
if (!this.push(this.queue.shift()!)) {
this.flowing = false;
return;
}
}
}
}L'invariant clé : this.push(chunk) retourne false quand le buffer interne est plein → on doit arrêter de pousser et attendre le prochain _read.
Streams et performances — chiffres réels
Sur un MacBook M2 (Node 22) :
fs.readFileSync1 GB : ~600 ms (et OOM si > RAM dispo).pipeline(fs.createReadStream, fs.createWriteStream)1 GB : ~400 ms et < 100 MB RSS.pipeline(fs.createReadStream, zlib.createGzip, fs.createWriteStream)1 GB → ~50 MB gzippé : ~8 s, le bottleneck est CPU (zlib).
Le streaming n'est pas plus rapide en absolu (parfois même un peu plus lent), mais il borne la mémoire et permet de traiter des données > RAM.
Comment un staff engineer raisonne sur un pipeline en prod
La question n'est jamais "est-ce que ça marche sur mon laptop" mais "qu'est-ce qui se passe quand un étage ralentit ou meurt". Trois axes :
Où est le goulot ? Dans
A → B → C, le débit du pipeline = débit de l'étage le plus lent. La backpressure remonte automatiquement, donc si Postgres rame, c'est lecreateReadStreamen tête qui se met en pause (et pas l'inverse). Instrumente chaque étage :stream.readableLength/stream.writableLengthte disent combien d'octets/objets sont bloqués dans le buffer interne d'un étage — un buffer qui reste plein en permanence = cet étage est en aval du goulot.Qu'est-ce qui fuit en cas d'erreur ? Avec
pipeline(), une erreur sur n'importe quel étagedestroy()tous les autres. Avec.pipe(), l'étage en erreur meurt mais les autres restent ouverts → file descriptors et connexions DB qui fuient. Sur un service long-running, c'est ça qui provoque leEMFILE: too many open filesà 3h du matin.Comment ça s'annule ? Un
AbortSignalpassé àpipeline()est le seul mécanisme propre. Sans lui, une requête HTTP cliente abandonnée (req.on('close')) laisse le pipeline tourner — tu continues à lire S3 et à payer le LLM pour un client parti. Relie toujours la déconnexion cliente à l'annulation amont.
Matrice failure-mode → symptôme → fix
| Mode de défaillance | Symptôme observable | Cause racine | Fix |
|---|---|---|---|
Backpressure ignorée (on('data', async …)) | RSS monte linéairement, pas de plafond | Le handler async ne freine pas la source | for await ou pipeline() |
.pipe() sans error handling | EMFILE, connexions DB orphelines | Erreur non propagée, étages non fermés | pipeline() |
highWaterMark trop haut en object mode | Pic RSS, GC pauses | N objets × taille objet > RAM | Borne highWaterMark, calcule la mémoire max |
cb() jamais appelé dans _transform | Pipeline figé, aucun chunk après le N-ème | Branche d'erreur qui return sans cb | cb() dans tous les chemins, y compris catch |
cb() appelé deux fois | ERR_MULTIPLE_CALLBACK, crash | cb() puis this.push()+cb() | Un seul cb par appel |
| Annulation cliente non propagée | Coût upstream continue, event loop occupé | Pas d'AbortSignal relié à req.close | pipeline(..., { signal }) + req.on('close', () => ctrl.abort()) |
| Write non-atomique interrompu | Fichier corrompu/tronqué à mi-chemin | process.exit ou crash en plein flux | Écrire .tmp puis fs.rename (atomique POSIX) |
Observabilité minimale d'un pipeline
import { pipeline } from 'node:stream/promises';
import { PassThrough } from 'node:stream';
import { performance } from 'node:perf_hooks';
function metered(label: string) {
const tap = new PassThrough();
let bytes = 0;
const start = performance.now();
tap.on('data', (c) => { bytes += c.length; });
tap.on('end', () => {
const ms = performance.now() - start;
console.log(`[${label}] ${bytes} bytes en ${ms.toFixed(0)}ms = ${(bytes / 1e6 / (ms / 1000)).toFixed(1)} MB/s`);
});
return tap;
}
await pipeline(source, metered('après-source'), transform, metered('après-transform'), sink);Un PassThrough instrumenté à chaque jonction te donne le débit par segment — c'est ainsi qu'on localise le goulot sans profiler. En prod, remplace les console.log par des histogrammes Prometheus (stream_segment_throughput_bytes).
🔗 Liens
- Doc officielle : https://nodejs.org/api/stream.html
stream/promises: https://nodejs.org/api/stream.html#streamspromisesmodule- Web Streams interop : https://nodejs.org/api/webstreams.html
- "Backpressuring in Streams" : https://nodejs.org/en/learn/modules/backpressuring-in-streams
- Matteo Collina — talks sur streams et perf (search "Matteo Collina streams")
stream.compose: https://nodejs.org/api/stream.html#streamcomposestreamsstream/consumers: https://nodejs.org/api/stream.html#streamconsumersmodule- Article Sam Roberts — "Streams and you" (Node core team)
- Book "Distributed Systems with Node.js" (chapitre streams) — Thomas Hunter II
Streams + AbortSignal — annulation propre
import { pipeline } from 'node:stream/promises';
import { createReadStream } from 'node:fs';
import { Writable } from 'node:stream';
const ctrl = new AbortController();
setTimeout(() => ctrl.abort(), 5000);
try {
await pipeline(
createReadStream('huge.bin'),
new Writable({ write(_c, _e, cb) { cb(); } }),
{ signal: ctrl.signal },
);
} catch (err) {
if ((err as any).code === 'ABORT_ERR') {
console.log('annulé proprement');
}
}pipeline() propage le signal à toutes les étapes, ferme les handles, libère les ressources.
Object mode et highWaterMark
En object mode, highWaterMark compte des objets, pas des bytes. Le défaut est 16. Pour un pipeline de logs DB :
const stream = new Transform({
objectMode: true,
highWaterMark: 1000, // 1000 rows en buffer
transform(row, _enc, cb) { /* ... */ cb(); },
});Si tu utilises object mode mais que tes "objets" sont gros (chacun 10 MB par ex.), tu peux saturer la RAM avec un highWaterMark haut. Calcule la taille mémoire totale potentielle.
Web Streams + Node Streams — petit piège
Readable.toWeb(nodeStream) consomme le node stream une fois : tu ne peux pas l'utiliser à nouveau en parallèle. Idem fromWeb consume le Web stream. Si tu veux les deux côtés, tee : nodeStream.pipe(passThrough1) + nodeStream.pipe(passThrough2) (et n'oublie pas la backpressure).
Comparaison rapide des frameworks streaming
| Lib | Cas d'usage | Particularité |
|---|---|---|
node:stream natif | Performance Node, API riche | Le plus rapide sur Node |
| Web Streams | Code portable Edge/Workers | API plus simple, moins d'options |
RxJS | Logique réactive complexe, hot/cold | Pas backpressure native, API observables |
Highland.js | Streaming fonctionnel | Maintenance limitée |
mississippi/pump | Helpers legacy | Pré-stream/promises, à éviter en 2026 |
🏋️ Exercices
Progression : implémenter → production-grade → casser puis réparer. Fais-les sur Node 22+ avec node --test.
1. Line splitter robuste (implémenter)
Objectif : écrire un Transform splitLines() qui prend des bytes arbitraires (chunks coupés n'importe où) et émet une ligne complète par push, en object mode côté lecture.
Indice/Solution : garde un leftover string entre les chunks. _transform : const parts = (leftover + chunk).split('\n'); leftover = parts.pop()!; for (const l of parts) this.push(l);. Le piège : le _flush doit émettre leftover s'il est non vide (dernière ligne sans \n final). Teste avec des chunks d'un seul octet (Readable.from([...buf].map(b => Buffer.from([b])))) pour prouver la robustesse aux coupures.
2. Bornage mémoire prouvé (production-grade)
Objectif : copier un fichier de 2 GB avec un Transform qui uppercase, en gardant le RSS sous 150 MB, et le prouver en loggant process.memoryUsage().rss toutes les secondes pendant la copie.
Indice/Solution : pipeline(createReadStream(src), upperTransform, createWriteStream(dst), { signal }). Démarre un setInterval qui logge le RSS, clearInterval dans le finally. Si le RSS grimpe, c'est que tu as cassé la backpressure quelque part (ex. un this.push en boucle sans respecter sa valeur de retour, ou un buffer interne accumulé). Compare avec une version readFileSync+writeFileSync pour voir l'OOM/pic.
3. Annulation propagée bout-en-bout (production-grade)
Objectif : un serveur HTTP qui stream un gros fichier vers le client ; si le client ferme la connexion (req.on('close')), le pipeline s'arrête en < 50 ms et libère le file descriptor.
Indice/Solution : const ctrl = new AbortController(); res.on('close', () => ctrl.abort()); puis pipeline(createReadStream(file), res, { signal: ctrl.signal }). Catch l'ABORT_ERR sans le logguer comme une vraie erreur. Vérifie avec lsof -p <pid> que le fd du fichier disparaît immédiatement après un curl interrompu (curl ... | head -c 100).
4. Fan-out avec backpressure correcte (production-grade)
Objectif : broadcaster un Readable vers N Writable lents sans faire exploser la mémoire — contrairement au pattern tee() naïf du doc qui ignore la backpressure.
Indice/Solution : le débit du fan-out doit être celui du consommateur le plus lent. Dans le for await (const chunk of source), fais await Promise.all(outs.map(o => o.write(chunk) ? Promise.resolve() : once(o, 'drain'))). Ainsi la source ne reprend que quand tous les sinks ont du budget. Teste avec un sink rapide + un sink lent (un Writable qui setTimeout(cb, 50)) et vérifie que le readableLength de la source ne diverge pas.
5. Casser puis réparer — le cb() manquant (break-then-fix)
Objectif : on te donne un Transform qui se fige silencieusement après ~16 KB. Diagnostique et répare.
class Buggy extends Transform {
_transform(chunk, _enc, cb) {
if (chunk.includes(0x00)) return; // bug
this.push(chunk.toString().toUpperCase());
cb();
}
}Indice/Solution : la branche return sur un octet nul ne rappelle jamais cb(). Node attend que cb soit appelé avant de pousser le chunk suivant ; passé le highWaterMark (16 KB), tout se fige sans erreur. Fix : if (chunk.includes(0x00)) return cb(); (skip) ou return cb(new Error('null byte')) selon la sémantique voulue. Leçon : chaque chemin de _transform/_flush doit terminer par exactement un cb().
6. Token-bucket throttle précis (break-then-fix)
Objectif : le ThrottleStream simple dérive (throttle trop fort sur des chunks irréguliers). Réécris-le en token-bucket pour un débit moyen exact sur une rafale.
Indice/Solution : maintiens let scheduledTime = performance.now(). Dans _transform : scheduledTime += (chunk.length / rate) * 1000; const wait = scheduledTime - performance.now(); if (wait > 0) await sleep(wait);. Le secret : on accumule le temps théorique plutôt que de dormir un délai par chunk indépendamment — les chunks courts qui arrivent en retard "rattrapent" et le débit moyen converge vers rate. Mesure le débit réel sur 1000 chunks de tailles aléatoires et vérifie l'écart < 2 %.
🎤 En entretien
Q : Pourquoi pipeline() plutôt que .pipe() ? Donne le scénario de prod qui casse. R : .pipe() ne propage pas les erreurs et ne ferme pas les étages en amont/aval quand l'un meurt → file descriptors et connexions DB qui fuient, et à terme EMFILE: too many open files sur un service long-running. pipeline() destroy() toute la chaîne sur erreur et supporte AbortSignal. .pipe() reste acceptable pour un script one-shot.
Q : Explique la backpressure au niveau octet. Que retourne write() et qu'attends-tu ? R : writable.write(chunk) retourne false quand le buffer interne dépasse le highWaterMark (16 KB par défaut, 16 objets en object mode). Ce false est un signal "ralentis", pas un échec : le chunk est quand même accepté. La source doit alors arrêter de pousser jusqu'à l'événement 'drain'. .pipe()/pipeline() câblent ce pause/resume automatiquement ; en manuel c'est à toi de l'attendre. L'ignorer = accumulation mémoire non bornée = OOM.
Q : for await (const c of stream) respecte-t-il la backpressure ? Et stream.on('data', async …) ? R : for await la respecte — chaque itération await avant de tirer le chunk suivant, donc un traitement lent freine la source. on('data', async cb) ne la respecte pas : l'event emitter ne await pas ton callback, le Readable continue à émettre et les chunks s'accumulent. C'est le piège classique qui transforme un service en bombe mémoire.
Q : Node streams ou Web Streams en 2026 ? Comment décides-tu ? R : Web Streams pour le code portable (Cloudflare Workers, Deno, Bun, Edge, interop fetch). Node streams pour le hot path Node-only : plus rapides, API plus riche (compose, pipeline avec signal, object mode ergonomique). Le pont est Readable.toWeb() / Readable.fromWeb() — mais attention, chaque conversion consomme le stream une fois. En pratique : Web Streams aux frontières portables, Node streams au cœur perf.