Skip to content

Streams — Readable, Writable, Duplex, Transform, backpressure

TL;DR — Les streams Node sont des pipes asynchrones avec backpressure intégré. Quatre types : Readable (source), Writable (sink), Duplex (les deux, ex. sockets), Transform (Duplex où l'output dérive de l'input, ex. gzip). Le highWaterMark (16 KB par défaut pour les bytes, 16 objets en object mode) gouverne le buffer. Utilise pipeline() plutôt que .pipe() : il gère propagation d'erreur et cleanup. Itère un Readable avec for await, et interopère avec les Web Streams (Readable.toWeb(), Readable.fromWeb()). La backpressure mal gérée = OOM en prod.

🧠 Mental model — ASCII + analogie

        ┌───────────┐    push     ┌──────────┐
        │ Readable  │ ──────────► │ Writable │
        │ (source)  │             │  (sink)  │
        └───────────┘             └──────────┘
              │                         ▲
              │                         │
              └────────► Transform ─────┘
                       (lit + écrit)

  highWaterMark = capacité du seau interne (par défaut 16 KB)
  Quand le buffer interne du Writable est plein,
    write() retourne false → le Readable doit s'arrêter de pousser
    jusqu'à l'événement 'drain'. C'est ça, la backpressure.

  Modes d'un Readable :
    paused   : tu pull avec .read()  ─► "tireur"
    flowing  : data te tombe dessus via 'data' ou .pipe() ─► "robinet ouvert"

Analogie : un stream c'est un tuyau d'arrosage entre une source d'eau (le disque, le réseau) et une cible (un fichier, une socket). Le highWaterMark, c'est la capacité d'un seau au milieu du tuyau. Si la cible boit lentement, le seau se remplit et tu dois fermer le robinet sinon ça déborde. C'est la backpressure : stream.write() te dit "non, ralentis" en retournant false, et tu attends l'événement 'drain' pour reprendre.

🛠️ Code minimal (ts/js)

pipeline() — la bonne façon

ts
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

await pipeline(
  createReadStream('input.txt'),
  createGzip({ level: 9 }),
  createWriteStream('input.txt.gz'),
);
// Erreurs propagées, descripteurs fermés automatiquement

Itérer un Readable comme un async iterator

ts
import { createReadStream } from 'node:fs';

const stream = createReadStream('huge.log', { encoding: 'utf8', highWaterMark: 64 * 1024 });
let lineCount = 0;
let leftover = '';

for await (const chunk of stream) {
  const lines = (leftover + chunk).split('\n');
  leftover = lines.pop() ?? '';
  lineCount += lines.length;
}
console.log(lineCount, 'lignes');

Custom Transform avec backpressure correcte

ts
import { Transform, TransformCallback } from 'node:stream';

class JsonLineParser extends Transform {
  private buffer = '';

  constructor() {
    super({ readableObjectMode: true }); // émet des objets, lit des bytes
  }

  _transform(chunk: Buffer, _enc: BufferEncoding, cb: TransformCallback) {
    this.buffer += chunk.toString('utf8');
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop() ?? '';
    try {
      for (const line of lines) {
        if (line.trim()) this.push(JSON.parse(line));
      }
      cb();
    } catch (err) {
      cb(err as Error);
    }
  }

  _flush(cb: TransformCallback) {
    if (this.buffer.trim()) {
      try {
        this.push(JSON.parse(this.buffer));
      } catch (err) {
        return cb(err as Error);
      }
    }
    cb();
  }
}

Écrire en respectant la backpressure (sans pipe)

ts
import { createReadStream, createWriteStream } from 'node:fs';

async function copyManual(src: string, dst: string) {
  const r = createReadStream(src);
  const w = createWriteStream(dst);

  for await (const chunk of r) {
    if (!w.write(chunk)) {
      // Buffer plein — on attend 'drain'
      await new Promise<void>((resolve, reject) => {
        w.once('drain', resolve);
        w.once('error', reject);
      });
    }
  }
  w.end();
  await new Promise<void>((resolve, reject) => {
    w.once('finish', resolve);
    w.once('error', reject);
  });
}

🎯 Patterns courants — 7

1. Streaming HTTP : ne charge jamais un body en mémoire

ts
import { createServer } from 'node:http';
import { pipeline } from 'node:stream/promises';
import { createWriteStream } from 'node:fs';

const server = createServer(async (req, res) => {
  if (req.method === 'POST' && req.url === '/upload') {
    try {
      await pipeline(req, createWriteStream('/tmp/upload.bin'));
      res.writeHead(200).end('ok');
    } catch (err) {
      res.writeHead(500).end((err as Error).message);
    }
  } else {
    res.writeHead(404).end();
  }
});
server.listen(3000);

Une lib comme formidable ou busboy fait du parsing multipart en streaming, sans jamais matérialiser le fichier entier.

2. CSV streaming avec transform

ts
import { pipeline } from 'node:stream/promises';
import { createReadStream } from 'node:fs';
import { Transform } from 'node:stream';

const toJson = new Transform({
  writableObjectMode: false,
  readableObjectMode: true,
  transform(chunk, _enc, cb) {
    // Hypothèse : chunk est une ligne complète (en pratique, utilise un splitter en amont)
    const [id, name, email] = chunk.toString().split(',');
    this.push({ id, name, email });
    cb();
  },
});

const collect = new Transform({
  writableObjectMode: true,
  transform(row, _enc, cb) {
    console.log(row);
    cb();
  },
});

await pipeline(createReadStream('users.csv'), toJson, collect);

3. Combiner plusieurs sources avec Readable.from()

ts
import { Readable } from 'node:stream';

async function* generateNumbers() {
  for (let i = 0; i < 1_000_000; i++) {
    yield Buffer.from(`${i}\n`);
    if (i % 10_000 === 0) await new Promise((r) => setImmediate(r));
  }
}

const stream = Readable.from(generateNumbers());
// Compatible avec pipeline, .pipe(), etc.

4. Web Streams interop (Fetch API, Service Workers, Edge runtimes)

ts
import { Readable } from 'node:stream';

// Node Readable → Web ReadableStream
const nodeStream = Readable.from(['hello\n', 'world\n']);
const webStream: ReadableStream = Readable.toWeb(nodeStream);

// Web → Node
const fromWeb = Readable.fromWeb(webStream);

// Utiliser fetch() qui renvoie un Web ReadableStream
const res = await fetch('https://example.com/big.json');
const asNodeStream = Readable.fromWeb(res.body!);
for await (const chunk of asNodeStream) {
  process.stdout.write(chunk);
}

5. Limiter le débit (rate limit) avec un transform

ts
import { Transform, TransformCallback } from 'node:stream';
import { setTimeout as sleep } from 'node:timers/promises';

class ThrottleStream extends Transform {
  constructor(private readonly bytesPerSecond: number) {
    super();
  }

  // async _transform : Node attend la promesse avant le prochain chunk.
  // La backpressure du Transform fait le reste — le débit amont est borné.
  async _transform(chunk: Buffer, _enc: BufferEncoding, cb: TransformCallback) {
    try {
      const delayMs = (chunk.length / this.bytesPerSecond) * 1000;
      if (delayMs > 0) await sleep(delayMs);
      cb(null, chunk);
    } catch (err) {
      cb(err as Error);
    }
  }
}

Le setTimeout callback "fire-and-forget" du pattern naïf est fragile : si la source pousse 10 000 chunks d'un coup, tu programmes 10 000 timers en parallèle (chacun appelle cb() plus tard, mais ils sont tous déjà entrés dans _transform ? non — justement, le cb() retardé bloque le chunk suivant). La version async ci-dessus est plus lisible et sérialise naturellement. Pour un throttle précis (token bucket, lissage des rafales), ne te repose pas sur chunk.length seul : accumule un budget temporel (expectedTime += len / rate) et dors expectedTime - now, sinon la dérive d'horloge te fait throttler trop fort.

6. Tee — broadcast un Readable vers N destinations

ts
import { PassThrough } from 'node:stream';

function tee(readable: NodeJS.ReadableStream, n: number) {
  const outs = Array.from({ length: n }, () => new PassThrough());
  readable.on('data', (chunk) => outs.forEach((o) => o.write(chunk)));
  readable.on('end', () => outs.forEach((o) => o.end()));
  readable.on('error', (err) => outs.forEach((o) => o.destroy(err)));
  return outs;
}

// Attention : backpressure non gérée — utilise une lib comme `stream-tee` en prod

7. Compter / observer sans modifier (passthrough)

ts
import { PassThrough } from 'node:stream';

const counter = new PassThrough();
let total = 0;
counter.on('data', (chunk) => (total += chunk.length));
// Insère counter dans un pipeline pour mesurer le débit sans transformer.

🔄 Versions — Node 18 / 20 / 22 / 24

  • Node 18stream/promises (pipeline, finished) stable. Async iterators sur Readable stable. Web Streams expérimentaux (ReadableStream, WritableStream, TransformStream exposés globaux).

  • Node 20 — Web Streams stables. Readable.toWeb() / Readable.fromWeb() / Writable.toWeb() / Writable.fromWeb() stables. stream.compose() pour composer plusieurs streams en un Duplex.

  • Node 22stream/consumers (text(), json(), buffer(), arrayBuffer(), blob()) stable. Performance des Web Streams améliorée (~30 % sur certains scénarios). ReadableStream.from(iterable) standard.

  • Node 24 — Web Streams encore plus rapides (réécriture interne). pipeline() supporte AbortSignal plus finement. node:stream reste l'API canonique pour la perf maximale sur Node.

💡 En 2026, sur Node 22/24, le Web Streams est la voie à privilégier pour le code portable (Cloudflare Workers, Deno, Bun, Edge runtimes). Le Node streams reste plus performant et plus ergonomique sur Node pur. Readable.toWeb() / fromWeb() font le pont.

⚠️ Pitfalls — 11

  1. .pipe() sans handler d'erreurr.pipe(w) ne propage pas les erreurs et ne ferme pas la chaîne. Si r émet 'error', w reste ouvert. Utilise pipeline().

  2. Buffer mémoire qui explose — si tu fais chunks.push(chunk) puis Buffer.concat(chunks) à la fin, tu matérialises tout le fichier. Sur 4 GB de logs, tu OOM. Stream les chunks directement vers la sortie.

  3. Backpressure ignoréefor await (const c of req) await someSlowOp(c) est OK (chaque iteration attend). Mais req.on('data', async (c) => await someSlowOp(c)) ne respecte pas la backpressure : les chunks s'accumulent en mémoire car le Readable continue à les pousser.

  4. 'data' event après .pause() — passer en mode flowing avec 'data' est irréversible côté .pause() si tu n'as pas le bon état interne. Préfère for await.

  5. highWaterMark mal réglé — trop bas = beaucoup de context switches CPU ; trop haut = pic mémoire. Pour du HTTP, 64 * 1024 est souvent un bon compromis ; pour des objets lourds, ajuste l'object mode (10-100 typiquement).

  6. end() non appelé — un Writable qui n'a pas reçu .end() ne fermera jamais le fichier. Le contenu peut ne pas être flushé. pipeline() le fait pour toi.

  7. Object mode mélangé avec bytes — un Writable créé avec objectMode: true ne sait pas écrire des Buffers vers un fichier. Tu dois sérialiser avant.

  8. Lire req.body deux fois — un Readable HTTP ne peut être consommé qu'une seule fois. Si tu veux le re-lire, buffer-le explicitement (en mémoire ou sur disque).

  9. new Stream au lieu de ReadableStream (sans suffixe) est la base abstraite, pas une vraie classe utilisable. Utilise toujours Readable/Writable/Duplex/Transform/PassThrough.

  10. Mélanger callback cb() et this.push() mal compris — dans un _transform, cb(null, value) est équivalent à this.push(value); cb(). Tu ne dois jamais appeler cb deux fois.

  11. Streams ne survivent pas au process exit — un pipeline() interrompu par process.exit() ne nettoie pas. Si tu veux du write atomique, écris dans un fichier temporaire puis fs.rename à la fin (atomique au niveau filesystem).

🧪 Testing — node --test

ts
// streams.test.ts
import { test } from 'node:test';
import assert from 'node:assert/strict';
import { Readable, Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';

test('pipeline propage les erreurs', async () => {
  const source = Readable.from(['a', 'b', 'c']);
  const fail = new Transform({
    transform(_chunk, _enc, cb) {
      cb(new Error('boom'));
    },
  });
  await assert.rejects(pipeline(source, fail), /boom/);
});

test('Transform respecte la backpressure', async () => {
  const chunks: Buffer[] = [];
  const collect = new Transform({
    writableObjectMode: true,
    transform(chunk, _enc, cb) {
      chunks.push(chunk);
      cb();
    },
  });

  const source = Readable.from((function* () {
    for (let i = 0; i < 100; i++) yield Buffer.from(`item-${i}`);
  })());

  await pipeline(source, collect);
  assert.equal(chunks.length, 100);
});

test('Web stream interop', async () => {
  const node = Readable.from(['hello']);
  const web = Readable.toWeb(node);
  const reader = web.getReader();
  const { value } = await reader.read();
  assert.equal(new TextDecoder().decode(value as Uint8Array), 'hello');
});

🎬 Cas d'usage concrets

Scénario 1 — Streaming CSV factures Pennylane

Intégration comptable : import quotidien de 200 000 lignes de factures depuis l'export CSV Pennylane vers Postgres (table invoices). Le fichier fait 350 MB. Chargement naïf (readFile + csv-parse/sync) = ~1.8 GB RSS et OOM sur le worker 1 GB.

Pipeline streaming : createReadStream(file) → décompression zlib.createGunzip (le CSV arrive en .gz) → csv-parse (transform stream) → validation Zod par batch de 500 (transform) → pg-copy-streams writer pour bulk insert. Tout est connecté avec pipeline() qui propage erreurs et nettoie les ressources. Backpressure naturel : si Postgres ralentit, le gunzip pause la lecture S3.

Mesure : 200 k lignes ingérées en 38 s, RSS stable à 95 MB, débit Postgres 5 200 rows/s grâce à COPY. Le passage en streaming a aussi permis d'ajouter une retry par batch sans tout recharger.

Scénario 2 — Export documents cabinet juridique gros volumes

Un cabinet exporte les dossiers d'un client (audit GED) : 12 000 PDF totalisant 8 GB, livrés en un ZIP signé. Impossible de tout buffer en mémoire.

Pipeline : Readable.from(asyncIteratorDeFichiers) (yield les fichiers depuis Postgres avec leur chemin S3) → transform qui ouvre un read stream S3 par fichier → archiver (transform-like) qui agrège en ZIP streaming → write stream HTTP response. Le client télécharge progressivement, le serveur ne charge jamais plus que ~10 MB à la fois (buffer ZIP interne).

Astuce qui a sauvé : highWaterMark: 256 * 1024 sur le read S3 pour ne pas saturer l'event loop avec des micro-chunks de 16 KB. Et pipeline() avec un finally qui logge bytesWritten pour audit (RGPD trace export).

Scénario 3 — Streaming réponse LLM e-commerce (assistant shopping)

Site e-commerce avec assistant conversationnel branché sur Claude API. Le user pose une question, on stream les tokens en SSE vers le navigateur (UX critique : premier token < 600 ms).

Pipeline Web Streams (interopérable avec fetch) : fetch(claudeUrl).body (ReadableStream) → pipeThrough(TextDecoderStream()) → transform SSE parser (extrait data: {...}) → transform qui re-encode en SSE pour le client → écriture sur res (Node). Conversion via Readable.fromWeb / Readable.toWeb car le HTTP server Node attend des Node streams. AbortController propagé du browser (req.on('close')) jusqu'à fetch upstream — si l'utilisateur ferme l'onglet, on coupe la facturation immédiatement.

Bénéfice perçu : TTFT 420 ms, perception conversationnelle, et coût LLM réduit de 15 % (sessions abandonnées coupées net).

🛠️ Exemple end-to-end

Pipeline streaming complet pour ingestion CSV facturation : décompression + parsing + validation + COPY Postgres avec backpressure et reporting de progression.

ts
import { createReadStream } from "node:fs";
import { pipeline } from "node:stream/promises";
import { Transform } from "node:stream";
import { createGunzip } from "node:zlib";
import { parse } from "csv-parse";
import { from as copyFrom } from "pg-copy-streams";
import pg from "pg";
import { z } from "zod";

const InvoiceRow = z.object({
  external_id: z.string().min(1),
  customer_siren: z.string().regex(/^\d{9}$/),
  amount_cents: z.coerce.number().int().nonnegative(),
  issued_at: z.coerce.date(),
});
type InvoiceRow = z.infer<typeof InvoiceRow>;

function validateBatch(opts: { onProgress?: (n: number) => void; batchSize?: number } = {}) {
  const batchSize = opts.batchSize ?? 500;
  let buffer: InvoiceRow[] = [];
  let total = 0;
  let rejected = 0;

  return new Transform({
    objectMode: true,
    transform(chunk: Record<string, string>, _enc, cb) {
      const res = InvoiceRow.safeParse(chunk);
      if (!res.success) {
        rejected++;
        return cb();
      }
      buffer.push(res.data);
      total++;
      if (buffer.length >= batchSize) {
        const out = buffer.map(rowToCopyLine).join("");
        buffer = [];
        opts.onProgress?.(total);
        return cb(null, out);
      }
      cb();
    },
    flush(cb) {
      if (buffer.length) {
        const out = buffer.map(rowToCopyLine).join("");
        buffer = [];
        return cb(null, out);
      }
      console.log(`ingest done: total=${total} rejected=${rejected}`);
      cb();
    },
  });
}

function rowToCopyLine(r: InvoiceRow) {
  // tab-separated, no quoting needed since we validated with zod
  return `${r.external_id}\t${r.customer_siren}\t${r.amount_cents}\t${r.issued_at.toISOString()}\n`;
}

async function ingest(file: string) {
  const client = new pg.Client({ connectionString: process.env.DATABASE_URL });
  await client.connect();
  const copyStream = client.query(
    copyFrom(`COPY invoices (external_id, customer_siren, amount_cents, issued_at) FROM STDIN`)
  );

  try {
    await pipeline(
      createReadStream(file),
      createGunzip(),
      parse({ columns: true, skip_empty_lines: true, trim: true }),
      validateBatch({
        batchSize: 500,
        onProgress: (n) => {
          if (n % 10_000 === 0) console.log(`progress: ${n} rows`);
        },
      }),
      copyStream
    );
  } finally {
    await client.end();
  }
}

await ingest(process.argv[2]);

Points clés : pipeline() propage les erreurs et clôt chaque étage proprement, transform en objectMode côté parser puis bascule en bytes pour COPY, batch interne dans le transform pour amortir le coût syscall, backpressure naturelle (si Postgres rame, gunzip ralentit la lecture disque), pas d'accumulation mémoire.


🔁 Quand utiliser / éviter

OutilUtiliser quandÉviter quand
pipeline()Tout enchaînement de streamsQuand tu as besoin de contrôle fin (split, broadcast)
.pipe()Demo, scripts one-shotProduction (pas d'error propagation)
for awaitItérer un Readable, traiter ligne par ligneQuand tu veux gérer la backpressure manuellement vers un Writable
Custom TransformLogique métier de transformationQuand un PassThrough + listener suffit
Web StreamsCode portable (Workers, Edge)Hot path Node-only (Node streams plus rapides)
Readable.from(iterable)Adapter une source itérableSi tu peux directement utiliser un async generator
Object modePipeline de structures (rows DB, events)Streaming de bytes (overhead inutile)

🧬 Aller plus loin

Anatomie de la backpressure — le "vrai" mécanisme

Quand tu fais pipe() ou pipeline(), il y a une boucle interne qui fait grossièrement ça :

ts
function pipe(src, dst) {
  src.on('data', (chunk) => {
    const ok = dst.write(chunk);
    if (!ok) src.pause();        // ← buffer dst plein, on freeze la source
  });
  dst.on('drain', () => src.resume()); // ← buffer dst vidé, on reprend
}

C'est ce ping-pong pause / resume qui implémente la backpressure. Si tu fais dst.write manuellement, c'est à toi de regarder le retour et d'attendre drain. Le pattern for await + await drain simule la même mécanique.

Composer un Duplex avec stream.compose

ts
import { compose } from 'node:stream';
import { createGzip, createGunzip } from 'node:zlib';
import { Transform } from 'node:stream';

const toUpper = new Transform({
  transform(chunk, _enc, cb) {
    cb(null, chunk.toString().toUpperCase());
  },
});

const upperThenGzip = compose(toUpper, createGzip());
// upperThenGzip est un Duplex : tu peux .write() dedans, et lire le résultat gzippé en uppercase

⚠️ N'appelle pas ta variable pipeline : tu masquerais l'import node:stream/promises et un futur lecteur croira composer alors que compose ne consomme rien tant que personne ne lit le Duplex retourné. compose est lazy ; pipeline est eager (il démarre le flux et résout/rejette une promesse).

Très pratique pour exposer une lib qui transforme un stream en N étapes mais retourne un seul Duplex au consommateur.

stream/consumers — collecter rapidement

ts
import { json, text, buffer, arrayBuffer, blob } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const stream = Readable.from(['{"hello":', '"world"}']);
const obj = await json(stream);
console.log(obj); // { hello: "world" }

C'est l'équivalent côté Node de await response.json() du fetch.

Pourquoi req.body HTTP/1 est un Readable, mais HTTP/2 est plus complexe

En HTTP/1, une requête a un body séquentiel : req est directement un IncomingMessage qui est un Readable. En HTTP/2, plusieurs streams se multiplexent sur une même connexion TCP : chaque request a son propre Http2Stream (Duplex). Côté code applicatif, l'API ressemble, mais les implications sur le backpressure sont différentes (le contrôle de flux HTTP/2 est plus granulaire — flow control par stream).

Detect un memory leak via streams en prod

Symptôme : RSS qui grimpe linéairement sans plafond. Cause fréquente : un .pipe() orphelin qui retient des chunks parce que la cible est lente et que personne ne gère la backpressure. Outils :

  • --inspect + Chrome DevTools Memory tab → snapshots successifs, cherche les Buffer rétenus.
  • process.memoryUsage() exposé en métrique : heapUsed, external, arrayBuffers.
  • node:diagnostics_channel pour tracer les pipelines.

Readable à partir d'une queue interne

Pattern utile pour exposer un Readable depuis un event emitter custom :

ts
import { Readable } from 'node:stream';

class EventBuffer extends Readable {
  private queue: Buffer[] = [];
  private flowing = false;

  push2(data: Buffer) {
    this.queue.push(data);
    if (this.flowing) this.flush();
  }

  _read() {
    this.flowing = true;
    this.flush();
  }

  private flush() {
    while (this.queue.length) {
      if (!this.push(this.queue.shift()!)) {
        this.flowing = false;
        return;
      }
    }
  }
}

L'invariant clé : this.push(chunk) retourne false quand le buffer interne est plein → on doit arrêter de pousser et attendre le prochain _read.

Streams et performances — chiffres réels

Sur un MacBook M2 (Node 22) :

  • fs.readFileSync 1 GB : ~600 ms (et OOM si > RAM dispo).
  • pipeline(fs.createReadStream, fs.createWriteStream) 1 GB : ~400 ms et < 100 MB RSS.
  • pipeline(fs.createReadStream, zlib.createGzip, fs.createWriteStream) 1 GB → ~50 MB gzippé : ~8 s, le bottleneck est CPU (zlib).

Le streaming n'est pas plus rapide en absolu (parfois même un peu plus lent), mais il borne la mémoire et permet de traiter des données > RAM.

Comment un staff engineer raisonne sur un pipeline en prod

La question n'est jamais "est-ce que ça marche sur mon laptop" mais "qu'est-ce qui se passe quand un étage ralentit ou meurt". Trois axes :

  1. Où est le goulot ? Dans A → B → C, le débit du pipeline = débit de l'étage le plus lent. La backpressure remonte automatiquement, donc si Postgres rame, c'est le createReadStream en tête qui se met en pause (et pas l'inverse). Instrumente chaque étage : stream.readableLength / stream.writableLength te disent combien d'octets/objets sont bloqués dans le buffer interne d'un étage — un buffer qui reste plein en permanence = cet étage est en aval du goulot.

  2. Qu'est-ce qui fuit en cas d'erreur ? Avec pipeline(), une erreur sur n'importe quel étage destroy() tous les autres. Avec .pipe(), l'étage en erreur meurt mais les autres restent ouverts → file descriptors et connexions DB qui fuient. Sur un service long-running, c'est ça qui provoque le EMFILE: too many open files à 3h du matin.

  3. Comment ça s'annule ? Un AbortSignal passé à pipeline() est le seul mécanisme propre. Sans lui, une requête HTTP cliente abandonnée (req.on('close')) laisse le pipeline tourner — tu continues à lire S3 et à payer le LLM pour un client parti. Relie toujours la déconnexion cliente à l'annulation amont.

Matrice failure-mode → symptôme → fix

Mode de défaillanceSymptôme observableCause racineFix
Backpressure ignorée (on('data', async …))RSS monte linéairement, pas de plafondLe handler async ne freine pas la sourcefor await ou pipeline()
.pipe() sans error handlingEMFILE, connexions DB orphelinesErreur non propagée, étages non ferméspipeline()
highWaterMark trop haut en object modePic RSS, GC pausesN objets × taille objet > RAMBorne highWaterMark, calcule la mémoire max
cb() jamais appelé dans _transformPipeline figé, aucun chunk après le N-èmeBranche d'erreur qui return sans cbcb() dans tous les chemins, y compris catch
cb() appelé deux foisERR_MULTIPLE_CALLBACK, crashcb() puis this.push()+cb()Un seul cb par appel
Annulation cliente non propagéeCoût upstream continue, event loop occupéPas d'AbortSignal relié à req.closepipeline(..., { signal }) + req.on('close', () => ctrl.abort())
Write non-atomique interrompuFichier corrompu/tronqué à mi-cheminprocess.exit ou crash en plein fluxÉcrire .tmp puis fs.rename (atomique POSIX)

Observabilité minimale d'un pipeline

ts
import { pipeline } from 'node:stream/promises';
import { PassThrough } from 'node:stream';
import { performance } from 'node:perf_hooks';

function metered(label: string) {
  const tap = new PassThrough();
  let bytes = 0;
  const start = performance.now();
  tap.on('data', (c) => { bytes += c.length; });
  tap.on('end', () => {
    const ms = performance.now() - start;
    console.log(`[${label}] ${bytes} bytes en ${ms.toFixed(0)}ms = ${(bytes / 1e6 / (ms / 1000)).toFixed(1)} MB/s`);
  });
  return tap;
}

await pipeline(source, metered('après-source'), transform, metered('après-transform'), sink);

Un PassThrough instrumenté à chaque jonction te donne le débit par segment — c'est ainsi qu'on localise le goulot sans profiler. En prod, remplace les console.log par des histogrammes Prometheus (stream_segment_throughput_bytes).

🔗 Liens

Streams + AbortSignal — annulation propre

ts
import { pipeline } from 'node:stream/promises';
import { createReadStream } from 'node:fs';
import { Writable } from 'node:stream';

const ctrl = new AbortController();
setTimeout(() => ctrl.abort(), 5000);

try {
  await pipeline(
    createReadStream('huge.bin'),
    new Writable({ write(_c, _e, cb) { cb(); } }),
    { signal: ctrl.signal },
  );
} catch (err) {
  if ((err as any).code === 'ABORT_ERR') {
    console.log('annulé proprement');
  }
}

pipeline() propage le signal à toutes les étapes, ferme les handles, libère les ressources.

Object mode et highWaterMark

En object mode, highWaterMark compte des objets, pas des bytes. Le défaut est 16. Pour un pipeline de logs DB :

ts
const stream = new Transform({
  objectMode: true,
  highWaterMark: 1000, // 1000 rows en buffer
  transform(row, _enc, cb) { /* ... */ cb(); },
});

Si tu utilises object mode mais que tes "objets" sont gros (chacun 10 MB par ex.), tu peux saturer la RAM avec un highWaterMark haut. Calcule la taille mémoire totale potentielle.

Web Streams + Node Streams — petit piège

Readable.toWeb(nodeStream) consomme le node stream une fois : tu ne peux pas l'utiliser à nouveau en parallèle. Idem fromWeb consume le Web stream. Si tu veux les deux côtés, tee : nodeStream.pipe(passThrough1) + nodeStream.pipe(passThrough2) (et n'oublie pas la backpressure).

Comparaison rapide des frameworks streaming

LibCas d'usageParticularité
node:stream natifPerformance Node, API richeLe plus rapide sur Node
Web StreamsCode portable Edge/WorkersAPI plus simple, moins d'options
RxJSLogique réactive complexe, hot/coldPas backpressure native, API observables
Highland.jsStreaming fonctionnelMaintenance limitée
mississippi/pumpHelpers legacyPré-stream/promises, à éviter en 2026

🏋️ Exercices

Progression : implémenter → production-grade → casser puis réparer. Fais-les sur Node 22+ avec node --test.

1. Line splitter robuste (implémenter)

Objectif : écrire un Transform splitLines() qui prend des bytes arbitraires (chunks coupés n'importe où) et émet une ligne complète par push, en object mode côté lecture.

Indice/Solution : garde un leftover string entre les chunks. _transform : const parts = (leftover + chunk).split('\n'); leftover = parts.pop()!; for (const l of parts) this.push(l);. Le piège : le _flush doit émettre leftover s'il est non vide (dernière ligne sans \n final). Teste avec des chunks d'un seul octet (Readable.from([...buf].map(b => Buffer.from([b])))) pour prouver la robustesse aux coupures.

2. Bornage mémoire prouvé (production-grade)

Objectif : copier un fichier de 2 GB avec un Transform qui uppercase, en gardant le RSS sous 150 MB, et le prouver en loggant process.memoryUsage().rss toutes les secondes pendant la copie.

Indice/Solution : pipeline(createReadStream(src), upperTransform, createWriteStream(dst), { signal }). Démarre un setInterval qui logge le RSS, clearInterval dans le finally. Si le RSS grimpe, c'est que tu as cassé la backpressure quelque part (ex. un this.push en boucle sans respecter sa valeur de retour, ou un buffer interne accumulé). Compare avec une version readFileSync+writeFileSync pour voir l'OOM/pic.

3. Annulation propagée bout-en-bout (production-grade)

Objectif : un serveur HTTP qui stream un gros fichier vers le client ; si le client ferme la connexion (req.on('close')), le pipeline s'arrête en < 50 ms et libère le file descriptor.

Indice/Solution : const ctrl = new AbortController(); res.on('close', () => ctrl.abort()); puis pipeline(createReadStream(file), res, { signal: ctrl.signal }). Catch l'ABORT_ERR sans le logguer comme une vraie erreur. Vérifie avec lsof -p <pid> que le fd du fichier disparaît immédiatement après un curl interrompu (curl ... | head -c 100).

4. Fan-out avec backpressure correcte (production-grade)

Objectif : broadcaster un Readable vers N Writable lents sans faire exploser la mémoire — contrairement au pattern tee() naïf du doc qui ignore la backpressure.

Indice/Solution : le débit du fan-out doit être celui du consommateur le plus lent. Dans le for await (const chunk of source), fais await Promise.all(outs.map(o => o.write(chunk) ? Promise.resolve() : once(o, 'drain'))). Ainsi la source ne reprend que quand tous les sinks ont du budget. Teste avec un sink rapide + un sink lent (un Writable qui setTimeout(cb, 50)) et vérifie que le readableLength de la source ne diverge pas.

5. Casser puis réparer — le cb() manquant (break-then-fix)

Objectif : on te donne un Transform qui se fige silencieusement après ~16 KB. Diagnostique et répare.

ts
class Buggy extends Transform {
  _transform(chunk, _enc, cb) {
    if (chunk.includes(0x00)) return;        // bug
    this.push(chunk.toString().toUpperCase());
    cb();
  }
}

Indice/Solution : la branche return sur un octet nul ne rappelle jamais cb(). Node attend que cb soit appelé avant de pousser le chunk suivant ; passé le highWaterMark (16 KB), tout se fige sans erreur. Fix : if (chunk.includes(0x00)) return cb(); (skip) ou return cb(new Error('null byte')) selon la sémantique voulue. Leçon : chaque chemin de _transform/_flush doit terminer par exactement un cb().

6. Token-bucket throttle précis (break-then-fix)

Objectif : le ThrottleStream simple dérive (throttle trop fort sur des chunks irréguliers). Réécris-le en token-bucket pour un débit moyen exact sur une rafale.

Indice/Solution : maintiens let scheduledTime = performance.now(). Dans _transform : scheduledTime += (chunk.length / rate) * 1000; const wait = scheduledTime - performance.now(); if (wait > 0) await sleep(wait);. Le secret : on accumule le temps théorique plutôt que de dormir un délai par chunk indépendamment — les chunks courts qui arrivent en retard "rattrapent" et le débit moyen converge vers rate. Mesure le débit réel sur 1000 chunks de tailles aléatoires et vérifie l'écart < 2 %.

🎤 En entretien

Q : Pourquoi pipeline() plutôt que .pipe() ? Donne le scénario de prod qui casse. R : .pipe() ne propage pas les erreurs et ne ferme pas les étages en amont/aval quand l'un meurt → file descriptors et connexions DB qui fuient, et à terme EMFILE: too many open files sur un service long-running. pipeline() destroy() toute la chaîne sur erreur et supporte AbortSignal. .pipe() reste acceptable pour un script one-shot.

Q : Explique la backpressure au niveau octet. Que retourne write() et qu'attends-tu ? R : writable.write(chunk) retourne false quand le buffer interne dépasse le highWaterMark (16 KB par défaut, 16 objets en object mode). Ce false est un signal "ralentis", pas un échec : le chunk est quand même accepté. La source doit alors arrêter de pousser jusqu'à l'événement 'drain'. .pipe()/pipeline() câblent ce pause/resume automatiquement ; en manuel c'est à toi de l'attendre. L'ignorer = accumulation mémoire non bornée = OOM.

Q : for await (const c of stream) respecte-t-il la backpressure ? Et stream.on('data', async …) ? R : for await la respecte — chaque itération await avant de tirer le chunk suivant, donc un traitement lent freine la source. on('data', async cb) ne la respecte pas : l'event emitter ne await pas ton callback, le Readable continue à émettre et les chunks s'accumulent. C'est le piège classique qui transforme un service en bombe mémoire.

Q : Node streams ou Web Streams en 2026 ? Comment décides-tu ? R : Web Streams pour le code portable (Cloudflare Workers, Deno, Bun, Edge, interop fetch). Node streams pour le hot path Node-only : plus rapides, API plus riche (compose, pipeline avec signal, object mode ergonomique). Le pont est Readable.toWeb() / Readable.fromWeb() — mais attention, chaque conversion consomme le stream une fois. En pratique : Web Streams aux frontières portables, Node streams au cœur perf.

Bibliothèque tech perso — Achref