Skip to content

Les opérateurs RxJS qu'il faut vraiment connaître en 2026

TL;DR — Sur les 100+ opérateurs de la lib, vous utiliserez 25 au quotidien. Le vrai art n'est pas de tous les connaître mais de choisir le bon : switchMap annule la requête précédente, mergeMap lance tout en parallèle, concatMap met en file, exhaustMap ignore tant que le courant tourne. BehaviorSubject stocke une valeur initiale et la rejoue au subscribe, ReplaySubject(n) rejoue les n dernières, Subject ne rejoue rien, AsyncSubject n'émet qu'à la complétion. Cancellation = unsubscribe automatique via takeUntilDestroyed(DestroyRef). Et retryWhen est déprécié depuis RxJS 7.8 : utilisez retry({ delay, count, resetOnSuccess }).

🧠 Mental model — ASCII + analogie

Pensez à un Observable comme à un tapis roulant d'usine. Les valeurs sont des paquets qui défilent. Les opérateurs sont des postes de travail alignés le long du tapis : chacun lit, transforme, filtre ou route. Le subscribe est le carton final qui reçoit ce qui sort du dernier poste.

                        ┌─────────────────────────────────────┐
   Observable           │     Pipeline (lazy, pull-push)      │           Observer
   (source)             │                                     │           (sink)
                        │                                     │
   ──●─●──●─●──●──>     │  map → filter → switchMap → ...     │  ──▲─▲──▲──>  next()
                        │                                     │                   error()
                        │  Rien ne coule tant que personne    │                   complete()
                        │  ne subscribe (cold) — ou tout      │
                        │  coule en parallèle (hot)            │
                        └─────────────────────────────────────┘

   Création        Combinaison       Transformation     Filtrage         Erreur
   ─────────       ─────────────     ──────────────     ────────         ──────
   of              combineLatest     map                filter            catchError
   from            withLatestFrom    switchMap          distinctUntil…    retry({...})
   fromEvent       forkJoin          mergeMap           debounceTime
   interval        zip               concatMap          throttleTime
   timer           merge             exhaustMap         audit / sample
   defer           concat                               take / skip

Analogie SwitchMap vs MergeMap vs ConcatMap vs ExhaustMap — vous êtes serveur dans un café :

  • switchMap — un client commande, vous courez à la cuisine ; il change d'avis, vous annulez la commande en cours et lancez la nouvelle. C'est l'autocomplete d'une barre de recherche.
  • mergeMap — plusieurs clients commandent, vous lancez tout en parallèle, le premier prêt sort en premier. C'est l'upload simultané de fichiers indépendants.
  • concatMap — vous prenez la commande, vous attendez qu'elle soit servie, puis vous passez à la suivante. Ordre garanti, pas de chevauchement. C'est une file de messages chat.
  • exhaustMap — vous ignorez les nouvelles commandes tant que la courante n'est pas finie. C'est un bouton "Login" qu'on ne veut pas spammer.

Le contrat d'abonnement — ce qu'un senior comprend que le débutant ignore

Un Observable n'est pas une Promise. Trois différences que vous devez pouvoir réciter en entretien :

  1. Lazy & multi-valeurs — rien ne s'exécute tant que subscribe() n'est pas appelé (une Promise s'exécute à la création, eager). Un Observable peut émettre 0, 1, N ou ∞ valeurs ; une Promise résout exactement une fois.
  2. Cancellablesubscribe() retourne une Subscription. unsubscribe() déclenche le teardown (la fonction retournée par le constructeur d'Observable), qui libère timers, sockets, listeners. Une Promise ne s'annule pas — d'où AbortController greffé par-dessus fetch. C'est la raison pour laquelle switchMap sur un HTTP annule réellement la requête en vol : son unsubscribe propage le teardown qui appelle xhr.abort().
  3. Synchrone OU asynchroneof(1,2,3) émet synchroniquement dans le thread courant, avant la ligne suivante. fromEvent, timer, http.get() sont async. Ne supposez jamais qu'un subscribe est async : un of().subscribe() a déjà fini quand la ligne d'après s'exécute. Cette ambiguïté est la source de bugs subtils (startWith qui « manque » un set de signal parce qu'il a tiré trop tôt).
ts
// La structure d'un Observable « fait main » révèle tout le contrat :
new Observable<number>((subscriber) => {
  subscriber.next(1);              // push une valeur
  const id = setInterval(() => subscriber.next(Date.now()), 1000);
  // subscriber.error(e) / subscriber.complete() terminent le flux (mutuellement exclusifs, 1 seule fois)
  return () => clearInterval(id);  // TEARDOWN : appelé à unsubscribe / complete / error
});

Corollaire mémoire : tout ce qui ouvre une ressource (interval, socket, listener) doit la fermer dans le teardown, sinon unsubscribe ne suffit pas et vous fuitez. Les opérateurs built-in respectent ce contrat ; vos Observables custom doivent le respecter aussi.

Cold vs Hot vs schedulers — le modèle complet

COLD (unicast)                         HOT (multicast)
chaque subscribe = nouvelle exécution   les subscribers partagent UNE exécution
http.get(), of(), from(), interval()    Subject, fromEvent, share()/shareReplay()
   sub A ──[req 1]──>                       producteur ──┬──> sub A (rejoint en cours de route)
   sub B ──[req 2]──>  (2 requêtes!)                     └──> sub B

Un http.get() est cold : 3 subscribe = 3 requêtes HTTP. C'est la cause #1 des doublons de requêtes en Angular (un | async dans le template + un subscribe en TS sur le même observable = 2 appels). La conversion cold→hot se fait par share/shareReplay. Les schedulers (asyncScheduler, asapScheduler, animationFrameScheduler, queueScheduler) contrôlent quand et sur quelle boucle les émissions sont planifiées — vous les croiserez surtout en testing (TestScheduler) et en perf (observeOn(animationFrameScheduler) pour aligner le rendu sur la peinture).

🛠️ Code minimal (ts + html)

Création — les 6 fondamentaux

ts
import {
  of, from, fromEvent, interval, timer, defer, EMPTY, NEVER, throwError,
} from 'rxjs';

// of : émet ses arguments puis complete. Synchrone.
of(1, 2, 3); // 1, 2, 3, complete

// from : convertit Promise / Iterable / array-like / observable-like
from([1, 2, 3]);
from(fetch('/api/users')); // Promise -> Observable

// fromEvent : DOM events, Node EventEmitter, etc.
fromEvent<KeyboardEvent>(input, 'input');

// interval : émet 0,1,2... toutes les N ms (jamais complete)
interval(1000);

// timer(initialDelay, period?) : plus flexible qu'interval
timer(2000);          // 1 émission à 2s puis complete
timer(2000, 500);     // 1ère à 2s, puis toutes les 500ms

// defer : crée une factory — chaque subscribe rejoue from scratch
const now$ = defer(() => of(Date.now()));

defer est précieux : il permet de différer la création de l'Observable jusqu'au subscribe, donc d'avoir un comportement par-souscripteur (utile pour les retries qui doivent relire Date.now() à chaque tentative).

Combinaison — combineLatest, withLatestFrom, forkJoin, zip, merge, concat

ts
import { combineLatest, withLatestFrom, forkJoin, zip, merge, concat } from 'rxjs';

// combineLatest : émet dès que chaque source a émis au moins 1 fois,
// puis à chaque émission d'une source en re-combinant la dernière valeur des autres.
combineLatest([user$, prefs$, theme$]).subscribe(([u, p, t]) => render(u, p, t));

// withLatestFrom : un Observable principal "tire" la dernière valeur des autres
search$.pipe(withLatestFrom(filters$)).subscribe(([q, f]) => /* ... */);

// forkJoin : attend que chaque source COMPLETE, puis émet le tuple final.
// Parfait pour les appels HTTP en parallèle (1-shot).
forkJoin({
  user: http.get('/me'),
  perms: http.get('/perms'),
}).subscribe(({ user, perms }) => /* ... */);

// zip : émet par index — la 1ère valeur de chaque, puis la 2e, etc.
// Avance au rythme du plus lent.
zip(clicks$, ticks$);

// merge : aplatit tout, fire & forget, ordre par arrivée
merge(saveSuccess$, autoSaveSuccess$);

// concat : enchaîne — attend qu'une source complete avant de subscribe à la suivante
concat(loadCache$, fetchFresh$);

Erreur fréquente : combineLatest([a$, b$]) n'émet rien tant que a$ ou b$ n'a pas émis au moins une fois. Si b$ est un Subject (pas un BehaviorSubject), vous bloquez votre pipeline jusqu'à la 1ère valeur.

Transformation — l'arbre de décision

ts
import { map, switchMap, mergeMap, concatMap, exhaustMap, scan } from 'rxjs';

// map : 1-to-1
of(1, 2, 3).pipe(map(x => x * 2)); // 2, 4, 6

// scan : reducer (comme Array.reduce mais en stream)
clicks$.pipe(scan((acc) => acc + 1, 0));

// switchMap : annule l'inner précédent (autocomplete, route params -> HTTP)
search$.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(q => http.get(`/search?q=${q}`)),
);

// mergeMap : parallèle, ordre non garanti (uploads, fire-and-forget)
files$.pipe(mergeMap(f => upload(f), /* concurrent= */ 3));

// concatMap : sériel, ordre garanti (commandes API ordonnées)
commands$.pipe(concatMap(cmd => http.post('/cmd', cmd)));

// exhaustMap : ignore tant que l'inner tourne (login, refresh token)
loginClicks$.pipe(exhaustMap(creds => auth.login(creds)));

Arbre de décision :

Besoin de la dernière émission seulement ?
├── OUI → switchMap          (autocomplete, search, route change)
└── NON → Importe l'ordre ?
          ├── OUI, séquentiel → concatMap   (commandes, chat msgs)
          ├── OUI, mais éviter les doublons en vol → exhaustMap (login)
          └── NON, parallèle → mergeMap    (uploads, batch)

Tableau de tradeoffs — la matrice complète à mémoriser :

OpérateurÉmissions en attenteInner précédentConcurrenceBackpressure (file)Risque mémoire
switchMapécrase la précédenteannulé1aucune (drop implicite)nul — un seul inner vivant
mergeMaptoutes lancéesconservé∞ (ou n)aucune ⇒ explosionélevé si source rapide + inner lent
concatMapmises en fileconservé1file non bornéeélevé si la file grossit sans fin
exhaustMapignoréesconservé1aucune (drop des nouvelles)nul — un seul inner vivant

Le piège staff sur mergeMap : sans cap de concurrence (mergeMap(fn, concurrent)), une source qui émet plus vite que les inners ne complètent crée un nombre non borné de souscriptions en vol — c'est un memory leak ET un DDoS sur votre propre backend. Sur tout mergeMap qui tape le réseau, posez un cap explicite (mergeMap(f => upload(f), 4)). Symétriquement, un concatMap sur une source rapide accumule une file qui grandit indéfiniment si l'inner est plus lent que le rythme d'arrivée : à ce stade vous voulez probablement switchMap (garder le dernier) ou exhaustMap (garder le premier), pas une file qui prend du retard pour toujours.

Filtrage — discipline du flux

ts
import {
  filter, distinctUntilChanged, debounceTime, throttleTime,
  audit, sample, take, skip, takeWhile, takeUntil, first, last,
} from 'rxjs';

input$.pipe(
  filter(q => q.length >= 2),
  debounceTime(300),                                  // attend 300ms de silence
  distinctUntilChanged(),                             // ignore si identique au précédent
);

scroll$.pipe(throttleTime(100, undefined, { leading: true, trailing: true }));

// audit vs sample vs throttle (les 3 cousins, à ne PAS confondre) :
//   audit(d)    : à la 1ère valeur, démarre une durée d ; émet la DERNIÈRE reçue à la fin de d (trailing)
//   throttle(d) : à la 1ère valeur, émet IMMÉDIATEMENT (leading), puis ignore pendant d
//   sample(t)   : émet la dernière valeur de source à chaque émission de l'Observable t (échantillonnage piloté par t)
// audit avec animationFrames() comme durée ⇒ au plus 1 émission par frame de peinture (coalescing rAF).
import { animationFrames } from 'rxjs';
scroll$.pipe(audit(() => animationFrames().pipe(take(1)))); // 1 valeur max par frame (~60fps)

source$.pipe(take(5));                               // 5 valeurs puis complete
source$.pipe(takeUntil(destroy$));                   // pattern classique pré-DestroyRef
source$.pipe(takeWhile(x => x.status === 'ok'));     // tant que la condition

distinctUntilChanged accepte un comparateur custom : par défaut ===, ce qui ne fonctionne pas sur les objets. Passez un selector ou isEqual de lodash.

Erreur — catchError, retry, le retour de la complétion

ts
import { catchError, retry, timeout, EMPTY, of, throwError, timer } from 'rxjs';

http.get('/api/x').pipe(
  timeout(5000),
  retry({
    count: 3,
    delay: (err, retryCount) => {
      // backoff exponentiel + jitter
      if (err.status === 401) return throwError(() => err); // pas de retry sur 401
      const wait = Math.min(1000 * 2 ** retryCount, 10_000) + Math.random() * 300;
      return timer(wait);
    },
    resetOnSuccess: true,
  }),
  catchError(err => {
    log.error(err);
    return of(fallback); // ou EMPTY pour ne rien émettre, ou throwError(...)
  }),
);

retryWhen est déprécié depuis RxJS 7.8. La nouvelle API retry({ count, delay, resetOnSuccess }) couvre tous les cas avec un delay-callback. Si vous voyez encore retryWhen dans du code legacy, migrez : la sémantique est strictement équivalente mais le typage est plus propre.

Subjects — la matrice à connaître par cœur

ts
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';

// Subject : hot, pas de valeur courante, late subscriber RIEN
const s = new Subject<number>();
s.next(1); s.next(2);
s.subscribe(console.log); // n'imprime rien
s.next(3);                 // imprime 3

// BehaviorSubject : initial value obligatoire, late subscriber reçoit la dernière
const b = new BehaviorSubject<number>(0);
b.next(1); b.next(2);
b.subscribe(console.log); // imprime 2
console.log(b.value);     // 2 — accès synchrone

// ReplaySubject(n) : rejoue les n dernières (n = Infinity par défaut)
const r = new ReplaySubject<number>(2);
r.next(1); r.next(2); r.next(3);
r.subscribe(console.log); // imprime 2, 3

// AsyncSubject : n'émet QUE la dernière valeur, et uniquement au complete()
const a = new AsyncSubject<number>();
a.next(1); a.next(2);
a.subscribe(console.log); // rien
a.next(3); a.complete();  // imprime 3
SubjectValeur initialeLate subscriberCas d'usage
Subjectnonrienevent bus, EventEmitter Angular
BehaviorSubjectouidernièrestate container, form value
ReplaySubject(n)nonn dernièreslogs récents, historique input
AsyncSubjectnonla dernière au completecache d'une 1-shot HTTP request

Multicasting — share, shareReplay

ts
import { share, shareReplay } from 'rxjs';

// Sans multicast, chaque subscribe rejoue le HTTP
const fragile$ = http.get('/me'); // 3 subscribes -> 3 requêtes

// shareReplay({ bufferSize: 1, refCount: true }) — singleton cache
const user$ = http.get('/me').pipe(
  shareReplay({ bufferSize: 1, refCount: true }),
);

// share() — pas de replay, juste partager les émissions futures
const tick$ = interval(1000).pipe(share());

refCount: true (recommandé) ferme l'observable upstream quand tous les subscribers se désabonnent, sinon le cache vit pour l'éternité — fuite mémoire silencieuse.

🎯 Patterns courants

1. Autocomplete production-grade

ts
@Component({ /* ... */ })
export class SearchComponent {
  private http = inject(HttpClient);
  private destroyRef = inject(DestroyRef);

  query = new FormControl('', { nonNullable: true });

  results$ = this.query.valueChanges.pipe(
    startWith(this.query.value),
    debounceTime(300),
    distinctUntilChanged(),
    filter(q => q.length >= 2),
    switchMap(q => this.http.get<Result[]>(`/search?q=${q}`).pipe(
      catchError(() => of([])),
    )),
    shareReplay({ bufferSize: 1, refCount: true }),
    takeUntilDestroyed(this.destroyRef),
  );
}

2. Polling avec backoff sur erreur

ts
const polled$ = timer(0, 5000).pipe(
  switchMap(() => http.get('/status').pipe(
    retry({ count: 3, delay: (_, i) => timer(1000 * 2 ** i) }),
    catchError(() => of(null)),
  )),
  filter(Boolean),
  distinctUntilChanged((a, b) => a.version === b.version),
);

3. Cancellation explicite via takeUntilDestroyed

ts
@Component({ /* ... */ })
export class LiveFeed {
  private destroyRef = inject(DestroyRef);

  constructor() {
    interval(1000).pipe(
      takeUntilDestroyed(this.destroyRef),
    ).subscribe(t => console.log(t));
  }
}

takeUntilDestroyed() sans argument fonctionne aussi, mais uniquement dans un injection context (constructor, field initializer). Dans un ngOnInit, passez explicitement le DestroyRef.

4. Cache + refresh manuel

ts
const refresh$ = new Subject<void>();

const data$ = refresh$.pipe(
  startWith(undefined),
  switchMap(() => http.get('/data')),
  shareReplay({ bufferSize: 1, refCount: true }),
);

// Pour forcer un refresh : refresh$.next()

🔄 Versions — Angular 16 → 20

  • Angular 16 / RxJS 7.8 — Introduction de takeUntilDestroyed, toSignal, toObservable. retryWhen officiellement déprécié au profit de retry({ delay }).
  • Angular 17 / RxJS 7.8 — Stabilisation des Signals. Les Observables restent first-class pour le HTTP, les Router params, les Forms.
  • Angular 18outputFromObservable, outputToObservable pour ponter output() ↔ Observable. httpResource (preview) en signal-first mais conserve un côté Observable via toObservable.
  • Angular 19 — Zoneless preview stable. RxJS reste pertinent mais perd du terrain pour le state synchrone au profit des Signals.
  • Angular 20 — Zoneless GA. Recommandation officielle : RxJS pour les flux async (HTTP, WebSocket, events DOM complexes), Signals pour le state synchrone et la dérivation. Pas de migration forcée.

Conséquence pratique : en 2026, vous écrivez moins de RxJS mais celui que vous écrivez est plus pointu (combinaisons HTTP, debounce search, WebSocket multiplexing).

⚠️ Pitfalls — 8 erreurs qui mordent

  1. Oublier unsubscribe sur un interval ou fromEvent — fuite mémoire garantie. Solution : takeUntilDestroyed(destroyRef) partout, ou async pipe dans le template.

  2. subscribe imbriqué — symptôme du cerveau en mode Promise. Toujours remplacer par un *Map :

    ts
    // BAD
    user$.subscribe(u => permissions$(u.id).subscribe(p => /* ... */));
    // GOOD
    user$.pipe(switchMap(u => permissions$(u.id))).subscribe(p => /* ... */);
  3. combineLatest avec un Subject — bloque jusqu'à la 1ère émission. Utilisez BehaviorSubject ou startWith().

  4. shareReplay sans refCount: true — le cache ne meurt jamais, l'upstream non plus. Toujours { bufferSize: 1, refCount: true }.

  5. switchMap sur un concatMap souhaité — l'utilisateur clique 3 fois sur "envoyer", switchMap annule les 2 premières requêtes silencieusement. Si chaque clic doit envoyer, c'est concatMap (ou mergeMap).

  6. distinctUntilChanged sur objet — par défaut compare par référence, donc inutile dès qu'on émet {...} à chaque tick. Passez un comparateur custom ou un keySelector : distinctUntilChanged((a, b) => a.id === b.id).

  7. Hot vs cold confusionhttp.get() est cold : chaque subscribe relance la requête. Subject est hot : émet pour tout le monde. Mélanger les deux sans shareReplay = doublons de requêtes.

  8. tap qui mute le payloadtap est pour les side effects (log, analytics). Modifier le payload dedans casse la pureté du pipeline. Pour transformer, utilisez map.

  9. forkJoin sur un Observable qui ne complete pasforkJoin attend la complétion. Un BehaviorSubject qui ne complete jamais bloque tout. Préférez combineLatest + take(1) si vous voulez "la 1ère valeur de chaque".

  10. async pipe dans *ngFor avec un Observable recréé à chaque CD*ngFor="let item of items$ | async" mais items$ est un getter qui crée un nouvel observable à chaque appel. Boucle infinie de subscribe/unsubscribe. Stockez items$ une seule fois en field.

🧪 Testing — fakeAsync, marble, TestBed.runInInjectionContext

Marble testing avec TestScheduler

ts
import { TestScheduler } from 'rxjs/testing';

describe('search pipeline', () => {
  let scheduler: TestScheduler;

  beforeEach(() => {
    scheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });

  it('debounce + distinct', () => {
    scheduler.run(({ cold, expectObservable }) => {
      //                  frames:  0    5   9
      const input    = cold('-a-b---b-c|');
      // 'a' (f1) puis 'b' (f3) dans la fenêtre de 3 frames -> seul 'b' (f3) survit, émis à f6.
      // 'b' (f7) est un doublon de la valeur déjà émise -> distinctUntilChanged le mange.
      // 'c' (f9) émis 3 frames plus tard, à f12 ; complete à f12 propagé après le delay.
      const expected =       '------b---(c|)';
      const result$ = input.pipe(
        debounceTime(3, scheduler), // 3 frames "virtuels" — le scheduler injecté contrôle le temps
        distinctUntilChanged(),
      );
      expectObservable(result$).toBe(expected);
    });
  });
});

Syntaxe marble : - = 1 frame de silence, a = émission, | = complete, # = error, () = émissions groupées sur le même frame, ^ = subscribe (sur les hot observables). Dans scheduler.run(...), 1 caractère = 1 frame et debounceTime(n, scheduler) compte en frames — d'où l'injection du scheduler en 2e argument (sinon le temps réel s'applique et le test devient flaky). Règle d'or : toujours injecter le TestScheduler dans les opérateurs temporels (debounceTime, throttleTime, delay, timer, interval) en marble testing.

fakeAsync pour les tests Angular

ts
import { fakeAsync, tick, discardPeriodicTasks } from '@angular/core/testing';

it('autocomplete debounces', fakeAsync(() => {
  const fixture = TestBed.createComponent(SearchComponent);
  const cmp = fixture.componentInstance;
  let last: Result[] = [];
  cmp.results$.subscribe(r => last = r);

  cmp.query.setValue('an');
  tick(100);
  expect(last).toEqual([]);   // pas encore
  tick(250);                  // 350ms total > 300
  expect(last.length).toBeGreaterThan(0);

  discardPeriodicTasks();
}));

TestBed.runInInjectionContext pour takeUntilDestroyed

ts
it('cancels on destroy', () => {
  TestBed.runInInjectionContext(() => {
    const destroyRef = inject(DestroyRef);
    let count = 0;
    interval(10).pipe(takeUntilDestroyed(destroyRef)).subscribe(() => count++);
    // ... destroy le contexte, vérifier count
  });
});

🎬 Cas d'usage concrets

Scénario 1 — SaaS RH, autocomplete sur recherche de candidats avec switchMap

Un ATS d'une plateforme SaaS RH a un champ de recherche global (« John Doe », « product manager Paris »...) qui interroge un endpoint Elasticsearch. L'utilisateur tape vite, change d'avis, efface, retape — il faut annuler les requêtes obsolètes pour ne montrer que les résultats de la dernière frappe.

L'équipe utilise searchControl.valueChanges.pipe(debounceTime(250), distinctUntilChanged(), switchMap(q => api.search(q))). Le switchMap annule l'HTTP en cours dès qu'une nouvelle frappe arrive — exactement le comportement souhaité. Sans switchMap, on aurait des courses : la réponse à « jo » pouvait arriver après celle de « john » et écraser l'affichage. Le debounceTime économise ~80 % des requêtes (un utilisateur tape rarement à intervalle inférieur à 250 ms en moyenne sur ce produit), et distinctUntilChanged filtre les re-frappes identiques (collage répété).

Le tech lead a documenté la règle : HTTP cancellable = switchMap. La PR review rejette systématiquement un mergeMap sur un HTTP de recherche.

Scénario 2 — E-commerce, recherche produit avec debounce + cache local

Un site e-commerce propose une recherche produit qui supporte des filtres facettés (catégorie, marque, prix). Chaque interaction recharge la liste. L'équipe ajoute un cache local en mémoire : si l'utilisateur fait A → B → A, on ne re-requête pas pour A.

Le pipeline : combineLatest([query$, filters$]).pipe(debounceTime(200), distinctUntilChanged(deepEqual), switchMap(([q, f]) => cache.get(key(q, f)) ?? api.search(q, f).pipe(tap(r => cache.set(key(q, f), of(r)))))). Le cache est un Map<string, Observable<Result>> (vidé après 5 min). Avantage : navigation arrière → instant. Plus de spinner sur des recherches déjà vues.

Le piège évité : ne pas utiliser shareReplay global sur l'observable de recherche. shareReplay partage un seul observable, alors qu'on veut un cache par clé. Le Map est la bonne structure.

Scénario 3 — Dashboard banque, ordre live de transactions avec concatMap

Un poste de travail trader doit traiter une file d'ordres émise par l'utilisateur (passer un ordre, annuler, modifier). Les opérations doivent s'exécuter dans l'ordre : si l'utilisateur passe O1 puis O2, le serveur doit recevoir O1 avant O2. Une parallélisation (mergeMap) introduit un risque où O2 part avant O1 si O1 est plus lente.

L'équipe utilise orderActions$.pipe(concatMap(action => api.execute(action))). Le concatMap sérialise : il attend la complétion de la requête précédente avant de lancer la suivante. Combiné à retry({ count: 2, delay: 500 }) pour les erreurs réseau transitoires, le flux est robuste. Et grâce à tap, on enregistre dans l'audit log à chaque étape (tap({ next: log, error: logError })).

Côté UI, un scan accumule l'historique pour l'afficher en temps réel : orderResults$.pipe(scan((acc, r) => [...acc, r], [] as OrderResult[])).


🛠️ Exemple end-to-end

Use case : autocomplete d'un ATS avec debounceTime, distinctUntilChanged, switchMap, gestion d'erreur via catchError, désabonnement via takeUntilDestroyed, et indicateur de loading dérivé du flux.

ts
// candidate-search.api.ts
import { Injectable, inject } from '@angular/core';
import { HttpClient, HttpParams } from '@angular/common/http';
import { Observable, delay } from 'rxjs';

export interface CandidateHit {
  id: string;
  fullName: string;
  headline: string;
}

@Injectable({ providedIn: 'root' })
export class CandidateSearchApi {
  private readonly http = inject(HttpClient);

  search(query: string): Observable<CandidateHit[]> {
    return this.http
      .get<CandidateHit[]>('/api/candidates/search', {
        params: new HttpParams().set('q', query),
      })
      .pipe(delay(0)); // assure microtask, utile en tests
  }
}
ts
// candidate-search.component.ts
import { ChangeDetectionStrategy, Component, DestroyRef, inject, signal } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { FormControl, ReactiveFormsModule } from '@angular/forms';
import {
  catchError,
  debounceTime,
  distinctUntilChanged,
  filter,
  finalize,
  of,
  switchMap,
  tap,
} from 'rxjs';
import { CandidateHit, CandidateSearchApi } from './candidate-search.api';

@Component({
  selector: 'app-candidate-search',
  standalone: true,
  changeDetection: ChangeDetectionStrategy.OnPush,
  imports: [ReactiveFormsModule],
  template: `
    <input
      type="search"
      [formControl]="query"
      placeholder="Rechercher un candidat…"
      aria-label="Rechercher"
    />

    @if (loading()) {
      <p>Recherche en cours…</p>
    } @else if (error()) {
      <p class="error">{{ error() }}</p>
    } @else if (results().length === 0 && (query.value ?? '').length >= 2) {
      <p>Aucun résultat.</p>
    } @else {
      <ul>
        @for (hit of results(); track hit.id) {
          <li>
            <strong>{{ hit.fullName }}</strong> — {{ hit.headline }}
          </li>
        }
      </ul>
    }
  `,
})
export class CandidateSearchComponent {
  private readonly api = inject(CandidateSearchApi);

  protected readonly query = new FormControl('', { nonNullable: true });
  protected readonly loading = signal(false);
  protected readonly error = signal<string | null>(null);
  protected readonly results = signal<CandidateHit[]>([]);

  constructor() {
    this.query.valueChanges
      .pipe(
        debounceTime(250),
        distinctUntilChanged(),
        filter((q) => q.trim().length >= 2),
        tap(() => {
          this.loading.set(true);
          this.error.set(null);
        }),
        switchMap((q) =>
          this.api.search(q).pipe(
            catchError(() => {
              this.error.set('Erreur lors de la recherche.');
              return of<CandidateHit[]>([]);
            }),
            finalize(() => this.loading.set(false)),
          ),
        ),
        takeUntilDestroyed(),
      )
      .subscribe((hits) => this.results.set(hits));
  }
}
ts
// candidate-search.component.spec.ts
import { TestBed, fakeAsync, tick } from '@angular/core/testing';
import { provideHttpClient } from '@angular/common/http';
import { provideHttpClientTesting, HttpTestingController } from '@angular/common/http/testing';
import { CandidateSearchComponent } from './candidate-search.component';

it('cancels obsolete searches (switchMap)', fakeAsync(() => {
  const fixture = TestBed.configureTestingModule({
    imports: [CandidateSearchComponent],
    providers: [provideHttpClient(), provideHttpClientTesting()],
  }).createComponent(CandidateSearchComponent);
  fixture.detectChanges();

  const ctrl = TestBed.inject(HttpTestingController);
  const cmp = fixture.componentInstance;

  cmp['query'].setValue('jo');
  tick(250);
  const req1 = ctrl.expectOne((r) => r.params.get('q') === 'jo');

  cmp['query'].setValue('joh');
  tick(250);
  const req2 = ctrl.expectOne((r) => r.params.get('q') === 'joh');

  req1.flush([]); // résolu après req2, mais switchMap a annulé l'abonnement
  req2.flush([{ id: '1', fullName: 'John Doe', headline: 'PM' }]);

  fixture.detectChanges();
  expect(cmp['results']().length).toBe(1);
}));

Le pipeline est complet : debounce 250 ms, dédup, filtre seuil 2 caractères, switchMap cancellable, gestion d'erreur isolée dans le catchError interne (sinon le stream meurt), finalize pour le loading, takeUntilDestroyed pour le cleanup automatique. Les résultats sont poussés dans un signal pour OnPush.


🔁 Quand utiliser / éviter

Utilisez RxJS quand :

  • Vous avez un flux async multi-valeurs (WebSocket, événements DOM agrégés, polling).
  • Vous combinez plusieurs sources async indépendantes (combineLatest, forkJoin).
  • Vous avez besoin de cancellation propre (switchMap sur HTTP).
  • Vous voulez du backpressure (debounce, throttle, audit).

Évitez RxJS quand :

  • Vous avez juste un state synchrone local (formulaire, toggle, compteur) — Signals.
  • C'est une Promise unique sans cancellation utile — async/await ou firstValueFrom(...).
  • Vous dérivez une valeur d'une autre — computed().

Règle pragmatique : si vous écrivez BehaviorSubject + getter + next(), c'est probablement un signal qui s'ignore.

toSignal / toObservable — la frontière, et ses pièges

C'est le pont quotidien en Angular 16-20. Quatre détails qui distinguent un senior :

ts
import { toSignal, toObservable } from '@angular/core/rxjs-interop';

// 1. toSignal subscribe IMMÉDIATEMENT (eager) et unsubscribe automatiquement à la destruction
//    du contexte d'injection — pas de takeUntilDestroyed nécessaire.
const user = toSignal(this.user$, { initialValue: null });

// 2. Sans initialValue, le type est T | undefined (la 1ère lecture avant toute émission = undefined).
const u2 = toSignal(this.user$);                 // Signal<User | undefined>

// 3. requireSync: true => assume une émission SYNCHRONE au subscribe (BehaviorSubject, shareReplay
//    chaud). Le type perd le | undefined, MAIS lève une erreur runtime si rien n'émet sync.
const u3 = toSignal(this.behaviorSubject$, { requireSync: true }); // Signal<User>

// 4. toObservable se base sur un effect() : il émet la valeur COURANTE puis les changements,
//    coalescés (pas une émission par set intermédiaire). Doit tourner en injection context.
const query$ = toObservable(this.querySignal);

Pièges classiques : (a) toSignal lève si appelé hors injection context — passez { injector } si vous l'utilisez dans une méthode tardive ; (b) toObservable ne rejoue pas chaque set() synchrone, il coalesce via le scheduler de signaux — ne comptez pas dessus pour capturer des transitions intermédiaires ; (c) ne faites pas toSignal(toObservable(s)) en rond, vous créez une boucle d'effets ; (d) toSignal rend le flux hot et partagé (un seul subscribe interne), donc plus besoin de shareReplay en aval.

🧰 Annexe — Opérateurs moins connus mais salvateurs

groupBy + mergeMap — partitionner un flux

Imaginez un flux d'événements { userId, action }. Vous voulez un sous-flux par utilisateur, chacun avec sa propre logique de debounce.

ts
import { groupBy, mergeMap, debounceTime } from 'rxjs';

events$.pipe(
  groupBy(e => e.userId),
  mergeMap(group$ => group$.pipe(
    debounceTime(500),               // debounce PAR utilisateur
    map(latest => ({ userId: group$.key, latest })),
  )),
).subscribe(console.log);

C'est l'équivalent d'un Map<userId, Subject> mais déclaratif. Attention : groupBy ne complete pas les groupes inactifs par défaut — utilisez groupBy(keyFn, { duration: g => g.pipe(timeout(60_000)) }) pour les fermer après inactivité.

expand — récursion sur flux (pagination)

ts
import { expand, reduce, EMPTY } from 'rxjs';

function loadAllPages<T>(url: string) {
  return http.get<{ items: T[]; next: string | null }>(url).pipe(
    expand(r => r.next ? http.get<typeof r>(r.next) : EMPTY),
    reduce((acc, r) => [...acc, ...r.items], [] as T[]),
  );
}

expand ré-injecte la sortie en entrée. Tant que next n'est pas null, on continue. Élégant pour la pagination cursor-based.

bufferTime / bufferCount — fenêtres glissantes

ts
// Émet un array de toutes les valeurs reçues dans la dernière seconde
clicks$.pipe(bufferTime(1000)).subscribe(arr => log('clicks/s:', arr.length));

// Émet par paquet de 10
events$.pipe(bufferCount(10)).subscribe(batch => http.post('/bulk', batch));

Très utile pour batcher des événements analytics ou des metrics.

pairwise — comparer N et N-1

ts
import { pairwise, startWith } from 'rxjs';

scroll$.pipe(
  startWith(0),
  pairwise(),
  map(([prev, curr]) => curr > prev ? 'down' : 'up'),
);

connect (RxJS 7.5+) — remplacement moderne de publish

ts
import { connect } from 'rxjs';

source$.pipe(
  connect(shared => merge(
    shared.pipe(filter(x => x > 0), map(x => `pos: ${x}`)),
    shared.pipe(filter(x => x < 0), map(x => `neg: ${x}`)),
  )),
);

Permet d'utiliser un même observable plusieurs fois dans une combinaison sans le réabonner plusieurs fois (un seul subscribe upstream). Replace publish().refCount() etc.

🧰 Cookbook avancé

Recipe : circuit breaker

ts
function withCircuitBreaker<T>(threshold = 5, cooldownMs = 30_000) {
  let failures = 0;
  let openUntil = 0;

  return (source$: Observable<T>) => defer(() => {
    if (Date.now() < openUntil) {
      return throwError(() => new Error('circuit open'));
    }
    return source$.pipe(
      tap({
        next: () => { failures = 0; },
        error: () => {
          failures++;
          if (failures >= threshold) openUntil = Date.now() + cooldownMs;
        },
      }),
    );
  });
}

// usage
http.get('/api/x').pipe(withCircuitBreaker(3, 10_000));

Recipe : retry avec jitter exponentiel

ts
function retryWithBackoff<T>(maxRetries = 5, baseDelay = 500) {
  return (source$: Observable<T>) => source$.pipe(
    retry({
      count: maxRetries,
      delay: (_err, attempt) => {
        const exp = baseDelay * 2 ** attempt;
        const jitter = Math.random() * exp * 0.3;
        return timer(exp + jitter);
      },
      resetOnSuccess: true,
    }),
  );
}

Recipe : long-polling avec annulation propre

ts
function longPoll<T>(url: string, intervalMs = 5000) {
  return timer(0, intervalMs).pipe(
    switchMap(() => http.get<T>(url)),
    distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)),
    share({ resetOnRefCountZero: true }),
  );
}

Recipe : WebSocket auto-reconnect

ts
function wsStream<T>(url: string) {
  return defer(() => new Observable<T>(subscriber => {
    const ws = new WebSocket(url);
    ws.onmessage = e => subscriber.next(JSON.parse(e.data));
    ws.onerror = e => subscriber.error(e);
    ws.onclose = () => subscriber.complete();
    return () => ws.close();
  })).pipe(
    retry({ delay: 2000 }),
    share({ resetOnRefCountZero: true }),
  );
}

🔭 Observabilité & debugging — comment un senior débugge un pipeline en prod

Le drame de RxJS : une stack trace pointe sur Subscriber.next dans les entrailles de la lib, jamais sur votre code. Outillage à connaître :

ts
// 1. tap nommé pour tracer chaque phase d'un pipeline (à retirer ou gater en prod)
function trace<T>(label: string) {
  return tap<T>({
    subscribe: () => console.log(`[${label}] subscribe`),
    next: (v) => console.log(`[${label}] next`, v),
    error: (e) => console.log(`[${label}] error`, e),
    complete: () => console.log(`[${label}] complete`),
    finalize: () => console.log(`[${label}] teardown`),
  });
}

source$.pipe(trace('raw'), debounceTime(300), trace('debounced'), switchMap(/*…*/), trace('result'));

Règles de diagnostic terrain :

  • « Mon observable n'émet rien » → 99% du temps il manque un subscribe (lazy) ou une source combineLatest/forkJoin qui n'a jamais émis/complété. Mettez un trace('subscribe') : si vous ne voyez même pas la ligne subscribe, personne n'écoute.
  • « Ça émet 2 fois » → cold observable abonné deux fois (template | async + subscribe TS). Fix : shareReplay({ bufferSize: 1, refCount: true }) une seule fois, ou toSignal.
  • « Ça ne s'arrête jamais / la conso monte » → unsubscribe manquant. Auditez : tout subscribe() manuel doit avoir takeUntilDestroyed. En dev, les RxJS DevTools (extension navigateur) et rxjs-spy (tag('search') + spy.show()) visualisent les souscriptions actives.
  • Sourcemaps & traces : configurez Error.stackTraceLimit haut en dev et intégrez Sentry avec le contexte du tag trace. En prod, on ne log pas chaque next (coût + bruit) ; on instrumente les error et les transitions d'état métier (tap({ error })), pas le débit.

Observabilité au sens SRE : exposez des métriques depuis vos pipelines critiques — taux d'erreur après retry épuisé, latence du switchMap HTTP (timestamp à l'émission source vs à l'inner complete), profondeur de file d'un concatMap. Un scan qui accumule { inFlight, completed, failed } branché sur un signal alimente un panneau de debug interne.

🤖 RxJS pour les UIs d'agents IA — streaming de tokens, trace d'outils, Stop

C'est ici que RxJS gagne en 2026 : une UI de chat agentique (consommant un endpoint NestJS qui stream du Claude — claude-opus-4-8, claude-sonnet-4-6, claude-haiku-4-5 — en SSE) est par essence un flux multi-valeurs async, cancellable, à fenêtres. Signals gèrent le rendu, RxJS gère le transport et l'orchestration temporelle.

Mental model — la frontière RxJS / Signals

  fetch ReadableStream            Observable<ChatEvent>           Signal<Message[]>
  (transport SSE)        ─────►   (parse, buffer, coalesce)  ──►  (rendu OnPush/zoneless)
  TextDecoder + getReader         scan / bufferTime(rAF)          toSignal()

Règle d'architecte : un seul Observable possède le transport et la cancellation ; le template ne voit qu'un signal. Ne jamais subscribe dans le template pour ça.

1. Source : SSE via fetch + ReadableStream (et non EventSource)

EventSource ne supporte que GET et n'envoie pas de headers (donc pas de Authorization propre, pas de body POST). Pour un agent on poste un payload → fetch + getReader + AbortController.

ts
import { Observable } from 'rxjs';

export type AgentEvent =
  | { type: 'token'; text: string }
  | { type: 'tool_call'; id: string; name: string; args: unknown }
  | { type: 'tool_result'; id: string; ok: boolean }
  | { type: 'done' }
  | { type: 'error'; message: string };

// Observable cold : chaque subscribe ouvre une connexion + un AbortController dédié.
function streamAgent(body: unknown, signal: AbortSignal): Observable<AgentEvent> {
  return new Observable<AgentEvent>((subscriber) => {
    const ctrl = new AbortController();
    // Relier l'abort externe (Stop) à l'abort interne (fetch).
    signal.addEventListener('abort', () => ctrl.abort(), { once: true });

    (async () => {
      try {
        const res = await fetch('/api/agent/stream', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json', Accept: 'text/event-stream' },
          body: JSON.stringify(body),
          signal: ctrl.signal,
        });
        if (!res.ok || !res.body) throw new Error(`HTTP ${res.status}`);

        const reader = res.body.getReader();
        const decoder = new TextDecoder();
        let buf = '';

        for (;;) {
          const { value, done } = await reader.read();
          if (done) break;
          buf += decoder.decode(value, { stream: true });

          // Frames SSE séparées par double newline. On garde le reliquat partiel.
          const frames = buf.split('\n\n');
          buf = frames.pop() ?? '';
          for (const frame of frames) {
            const line = frame.split('\n').find((l) => l.startsWith('data:'));
            if (!line) continue;
            const payload = line.slice(5).trim();
            if (payload === '[DONE]') { subscriber.next({ type: 'done' }); continue; }
            subscriber.next(JSON.parse(payload) as AgentEvent);
          }
        }
        subscriber.complete();
      } catch (err) {
        // AbortError n'est PAS une erreur applicative : Stop ⇒ complete propre.
        if ((err as Error).name === 'AbortError') subscriber.complete();
        else subscriber.error(err);
      }
    })();

    // Teardown = unsubscribe ⇒ on coupe le réseau. C'est le cœur de la cancellation.
    return () => ctrl.abort();
  });
}

Le point staff : le return () => ctrl.abort() du constructeur d'Observable câble unsubscribe → annulation réseau. Combiné à switchMap, renvoyer un nouveau prompt annule automatiquement le stream précédent — côté client et côté serveur (le serveur NestJS détecte le req.on('close') et abort() l'appel SDK Anthropic en cours).

2. Accumulation : scan append-only + coalescing rAF sous zoneless

Sous zoneless, chaque signal.set() schedule un CD. Un LLM rapide émet 50-100 tokens/s : set par token = thrash. On coalesce par frame d'animation avec bufferTime(0, animationFrameScheduler) et on n'aplatit qu'une fois par frame.

ts
import {
  scan, bufferTime, filter, map, animationFrameScheduler, EMPTY, of,
} from 'rxjs';
import { catchError } from 'rxjs';

interface AgentState {
  text: string;
  tools: ToolTrace[];
  status: 'streaming' | 'done' | 'error';
  error?: string;
}

type ToolTrace = {
  id: string;
  name: string;
  phase: 'pending' | 'running' | 'done' | 'error';
};

const initial: AgentState = { text: '', tools: [], status: 'streaming' };

const state$ = this.prompt$.pipe(
  // Nouveau prompt ⇒ switchMap annule le stream précédent (client + serveur).
  switchMap((p) =>
    streamAgent({ prompt: p }, this.stopSignal()).pipe(
      // Regroupe tous les events arrivés dans la même frame (~16ms).
      bufferTime(0, animationFrameScheduler),
      filter((batch) => batch.length > 0),
      // Un seul recalcul d'état par frame ⇒ un seul CD par frame.
      scan((s, batch) => reduceBatch(s, batch), initial),
      catchError((e) => of<AgentState>({ ...initial, status: 'error', error: String(e) })),
    ),
  ),
);

function reduceBatch(s: AgentState, batch: AgentEvent[]): AgentState {
  let { text, tools, status, error } = s;
  for (const e of batch) {
    switch (e.type) {
      case 'token':       text += e.text; break;
      case 'tool_call':   tools = [...tools, { id: e.id, name: e.name, phase: 'running' }]; break;
      case 'tool_result': tools = tools.map((t) =>
                            t.id === e.id ? { ...t, phase: e.ok ? 'done' : 'error' } : t); break;
      case 'done':        status = 'done'; break;
      case 'error':       status = 'error'; error = e.message; break;
    }
  }
  return { text, tools, status, error };
}

Pourquoi bufferTime(0, animationFrameScheduler) et pas auditTime(16) ? Les deux marchent ; animationFrameScheduler s'aligne exactement sur le rythme de peinture du navigateur (jamais plus d'un recalcul par frame, et zéro frame perdue à attendre un timer qui drift). scan est append-only : on ne mute jamais l'état précédent, ce qui rend le rendu OnPush/zoneless trivialement correct (nouvelle référence ⇒ CD ciblé).

3. Composant zoneless : signal + Stop câblé sur AbortController

ts
import {
  ChangeDetectionStrategy, Component, DestroyRef, inject, signal,
} from '@angular/core';
import { takeUntilDestroyed, toSignal } from '@angular/core/rxjs-interop';
import { Subject } from 'rxjs';

@Component({
  selector: 'app-agent-chat',
  standalone: true,
  changeDetection: ChangeDetectionStrategy.OnPush,
  template: `
    <article [class.streaming]="state().status === 'streaming'">
      <pre>{{ state().text }}</pre>

      <ol class="trace">
        @for (t of state().tools; track t.id) {
          <li [attr.data-phase]="t.phase">{{ t.name }} — {{ t.phase }}</li>
        }
      </ol>

      @if (state().status === 'streaming') {
        <button type="button" (click)="stop()">Stop</button>
      }
      @if (state().status === 'error') {
        <p class="error">{{ state().error }}</p>
      }
    </article>
  `,
})
export class AgentChatComponent {
  private readonly destroyRef = inject(DestroyRef);
  private readonly prompt$ = new Subject<string>();

  // Un AbortController par run ; stop() l'abort, ce qui coupe fetch ET le SDK serveur.
  private abort = new AbortController();
  protected readonly stopSignal = signal<AbortSignal>(this.abort.signal);

  protected readonly state = toSignal(
    this.prompt$.pipe(/* … pipeline state$ ci-dessus … */ takeUntilDestroyed(this.destroyRef)),
    { initialValue: { text: '', tools: [], status: 'done' } as AgentState },
  );

  send(prompt: string) {
    this.abort = new AbortController();
    this.stopSignal.set(this.abort.signal);
    this.prompt$.next(prompt);
  }

  stop() {
    this.abort.abort(); // client cancel ⇒ teardown ⇒ ctrl.abort() ⇒ TCP close ⇒ serveur abort
  }
}

Le contrat de cancellation complet en 2026 : Stop UI → AbortController.abort() → teardown de l'Observable → fetch abort → fermeture TCP → req.on('close') côté NestJS → AbortController serveur → messages.stream({ signal }) du SDK Anthropic interrompu → tokens non facturés. Un seul maillon manquant et vous payez des tokens pour un onglet déjà fermé. C'est exactement le genre de détail qu'un staff engineer vérifie de bout en bout.

Markdown : le <pre> ci-dessus affiche du texte brut. Pour rendre le markdown du LLM en HTML, passez la sortie de scan dans un marked.parse() puis DomSanitizer.sanitize(SecurityContext.HTML, …)jamais bypassSecurityTrustHtml sur de la sortie LLM (injection de prompt → XSS).

5. Rendu markdown streamé — computed + DomSanitizer (jamais bypass…)

Le markdown se rend depuis le signal de texte, pas dans le pipeline RxJS — la dérivation synchrone est le job de computed. On re-parse à chaque frame (le texte change ~60x/s) ; marked est assez rapide pour ça, et on sanitize systématiquement car le contenu vient d'un LLM (donc potentiellement empoisonné par injection de prompt).

ts
import { Component, computed, inject, signal, SecurityContext } from '@angular/core';
import { DomSanitizer } from '@angular/platform-browser';
import { marked } from 'marked';

@Component({
  selector: 'app-markdown-view',
  standalone: true,
  template: `<div [innerHTML]="safeHtml()"></div>`,
})
export class MarkdownViewComponent {
  private readonly sanitizer = inject(DomSanitizer);
  readonly markdown = signal('');

  // sanitize(HTML, …) RETIRE le dangereux et renvoie une string sûre.
  // bypassSecurityTrustHtml() ferait CONFIANCE au contenu — interdit sur de la sortie LLM.
  readonly safeHtml = computed(() =>
    this.sanitizer.sanitize(SecurityContext.HTML, marked.parse(this.markdown(), { async: false }) as string) ?? '',
  );
}

Subtilité streaming : tant que les tokens arrivent, le markdown est incomplet (une code-fence non fermée, une [ de lien sans ]). marked gère gracieusement le markdown partiel — mais pour éviter le flicker d'un bloc qui se reconstruit, beaucoup d'équipes ne re-rendent le markdown qu'au passage status: 'done' et affichent du texte brut (<pre>) pendant le streaming. Tradeoff : fidélité finale vs fluidité perçue.

4. Côté NestJS — rappel des points de jonction

Le pipeline RxJS ci-dessus suppose un endpoint qui se comporte bien. Côté serveur (détaillé dans les fiches NestJS) : client Anthropic DI'd via forRootAsync (jamais new Anthropic() en field), streaming SSE qui relaie messages.stream(), AbortController lié à req.on('close'), idempotency keyée sur un generationId (un retry BullMQ ne doit pas relancer une génération déjà partiellement facturée), rate-limit + cost-guard à l'edge, et la boucle agentique tool-use (émettre tool_call, exécuter l'outil, ré-injecter tool_result, reprendre le stream). Le SDK gère les retries réseau ; ne les réimplémentez pas au-dessus.

📚 Cheat-sheet condensée à imprimer

CRÉATION         COMBINE              FLATTEN            FILTER          ERROR
of               combineLatest        switchMap          filter          catchError
from             withLatestFrom       mergeMap           debounceTime    retry({delay})
fromEvent        forkJoin             concatMap          throttleTime    timeout
interval         zip                  exhaustMap         distinctUntil   throwError
timer            merge                                   take/skip       EMPTY
defer            concat               TRANSFORM          first/last
EMPTY            startWith            map                takeUntil       MULTICAST
NEVER            endWith              scan               takeWhile       share
                 race                 pluck (deprec.)    pairwise        shareReplay
                                      pipe                                connect

SUBJECT VS
Subject : pas de valeur courante, late = rien
BehaviorSubject(init) : valeur courante, late = last
ReplaySubject(n) : pas d'init, late = n last
AsyncSubject : émet UNIQUEMENT au complete()

🔗 Liens

📖 Glossaire

  • Cold Observable : factory — chaque subscribe rejoue depuis le début. http.get(), of(), from().
  • Hot Observable : émet indépendamment des subscribers. Subject, fromEvent.
  • Multicasting : transformer un cold en hot pour partager. share, shareReplay.
  • Backpressure : ralentir/agréger un producteur trop rapide. debounceTime, throttleTime, audit, sample.
  • Glitch : état intermédiaire incohérent observable. RxJS n'est pas glitch-free par défaut (contrairement aux Signals).
  • Marble : représentation visuelle d'un flux dans le temps pour les tests.
  • Higher-order Observable : un Observable d'Observable. Aplati par *Map, switchAll, mergeAll.

🏋️ Exercices

Progression : implémenter → rendre production-grade → casser puis réparer. Faites-les dans l'ordre, chacun s'appuie sur le précédent.

Exercice 1 — Typeahead cancellable (implémenter)

Objectif : construire un autocomplete valueChanges → debounce → distinct → switchMap(http) qui n'affiche jamais un résultat obsolète.

Contraintes : seuil de 2 caractères, debounce 250 ms, erreur HTTP isolée dans un catchError interne (le stream ne doit jamais mourir), résultats poussés dans un signal. Écrivez un test fakeAsync qui prouve qu'une réponse à « jo » arrivant après la requête « john » n'écrase pas l'affichage.

Indice / Solution

catchError doit être à l'intérieur du switchMap, pas après — sinon la 1ère erreur tue le stream parent et l'input devient mort. Le test : setValue('jo'); tick(250); setValue('john'); tick(250); puis flush req1 en dernier ; switchMap a déjà unsubscribe req1, donc req1.flush() n'a aucun effet sur le signal.

Exercice 2 — shareReplay qui fuit (casser puis réparer)

Objectif : reproduire une fuite mémoire avec shareReplay, l'observer, puis la corriger.

Créez data$ = interval(1000).pipe(tap(() => console.count('upstream')), shareReplay(1)). Subscribe puis unsubscribe. Constatez que console.count continue de grimper. Réparez et expliquez la sémantique exacte du fix.

Indice / Solution

shareReplay(1) (forme nombre) = refCount: false ⇒ l'upstream interval ne s'arrête jamais même après unsubscribe de tous. Fix : shareReplay({ bufferSize: 1, refCount: true }). Piège bonus : refCount: true re-souscrit l'upstream si on re-subscribe après être tombé à zéro — acceptable pour interval, catastrophique pour un http.get() qu'on voulait cacher. Pour un cache 1-shot persistant, gardez refCount: false mais avec un takeUntilDestroyed qui borne sa vie.

Exercice 3 — Le bon flatten sous charge (raisonner + mesurer)

Objectif : démontrer empiriquement la différence switchMap / mergeMap / concatMap / exhaustMap sur un même flux de clics rapides.

Construisez un bouton qui, à chaque clic, lance un timer(800) simulant un POST. Logguez le start et le end de chaque inner avec un id. Cliquez 5 fois en < 800 ms. Comptez combien d'inners démarrent, combien complètent, dans quel ordre, pour chacun des 4 opérateurs. Rédigez la règle de choix en une phrase par opérateur.

Indice / Solution

Attendu : switchMap ⇒ 5 starts, 1 complete (les 4 premiers annulés). mergeMap ⇒ 5 starts, 5 completes, ordre d'arrivée. concatMap ⇒ 5 starts décalés (file), 5 completes ordonnés, durée totale ~4 s. exhaustMap ⇒ 1 start, 4 clics ignorés. La règle vit dans l'arbre de décision plus haut ; cet exo en fait une intuition musculaire.

Exercice 4 — Custom operator : retry avec budget de temps (production-grade)

Objectif : écrire un opérateur retryWithBudget(maxMs) qui retry avec backoff exponentiel + jitter mais abandonne dès que le temps total dépasse maxMs, et ne retry jamais un 4xx (sauf 429).

Signature : retryWithBudget<T>(maxMs: number): MonoTypeOperatorFunction<T>. Doit s'utiliser dans un .pipe() standard et fonctionner avec forkJoin.

Indice / Solution

Capturez const start = Date.now() dans un defer(() => …) pour que le budget se réinitialise par souscription (sinon un 2e subscribe hérite d'un budget déjà épuisé). Dans retry({ delay }) : if (Date.now() - start > maxMs) return throwError(() => err); ; if (err.status >= 400 && err.status < 500 && err.status !== 429) return throwError(() => err); ; sinon timer(Math.min(base * 2 ** i, cap) + Math.random() * jitter). resetOnSuccess: true.

Exercice 5 — Stream d'agent IA : Stop de bout en bout (intégration stack)

Objectif : câbler le pipeline de la section IA et prouver que cliquer « Stop » coupe réellement le réseau, pas juste l'affichage.

Implémentez streamAgent + le scan/bufferTime(0, animationFrameScheduler) + un bouton Stop. Branchez un faux endpoint SSE qui émet un token toutes les 100 ms à l'infini. Test : démarrez, attendez 500 ms, cliquez Stop, vérifiez via un spy que AbortController.abort() a été appelé et que le ReadableStream.getReader().cancel() (ou l'AbortError) s'est propagé. Le compteur de tokens doit se figer.

Indice / Solution

Le test clé n'est pas « le texte arrête de grandir » (ça, switchMap/takeUntilDestroyed le donnent gratuitement) mais « le fetch reçoit le signal ». Espionnez AbortController.prototype.abort. Le piège : si vous mettez le catchError au mauvais endroit, l'AbortError remonte comme une vraie erreur et passe status: 'error' au lieu de 'done' — votre teardown doit traiter AbortError comme une complétion propre (voir le if (err.name === 'AbortError') subscriber.complete()).

Exercice 6 — Multiplexer un WebSocket par topic (architecte)

Objectif : un seul WebSocket, plusieurs composants qui s'abonnent à des topic différents, sans rouvrir de socket et sans fuite.

À partir de wsStream (recipe plus haut), exposez messagesFor(topic: string): Observable<Msg> qui filtre le flux partagé. Garantissez : (a) une seule connexion réelle quel que soit le nombre d'abonnés, (b) reconnexion auto, (c) socket fermé quand le dernier abonné part.

Indice / Solution

share({ resetOnRefCountZero: true }) sur le wsStream donne (a) et (c). messagesFor = socket$.pipe(filter(m => m.topic === topic)). Pour (b), le retry({ delay: 2000 }) est à l'intérieur du defer, donc avant le share — sinon chaque abonné déclenche sa propre logique de retry. Piège : si un composant fait take(1) et descend à refCount 0 entre deux messages, resetOnRefCountZero ferme la socket ; ajoutez un delay de grâce via share({ resetOnRefCountZero: () => timer(5000) }) si vous voulez un keep-alive.

🎤 En entretien

Q — switchMap sur un HTTP POST de paiement : quel bug ? R — switchMap annule la requête précédente. Si l'utilisateur double-clique « Payer », le 1er POST est annulé côté client mais a peut-être déjà atteint le serveur → débit ambigu, ou paiement perdu. Pour une action non-idempotente déclenchée par clic, c'est exhaustMap (ignore le 2e clic tant que le 1er tourne) ou concatMap (sérialise) — jamais switchMap. Côté serveur on double avec une clé d'idempotence.

Q — shareReplay(1) vs shareReplay({ bufferSize: 1, refCount: true }) ? R — La forme nombre a refCount: false : l'upstream ne se ferme jamais, même quand plus personne n'écoute → fuite (et requête/socket qui survit). Avec refCount: true, l'upstream se ferme à zéro abonné. Le tradeoff : refCount: true re-déclenche l'upstream si on re-subscribe après être tombé à zéro — bien pour un flux live, mais peut re-fetcher un cache 1-shot. Le choix dépend de la sémantique « cache éternel » vs « ressource liée au cycle de vie ».

Q — Pourquoi RxJS n'est-il pas glitch-free, et qu'est-ce que ça change ? R — Avec combineLatest([a$, b$])a$ et b$ dérivent d'une même source, une émission amont peut propager a$ puis b$ séquentiellement, exposant un état intermédiaire incohérent (le « glitch ») avant la valeur finale. Les Signals, eux, sont glitch-free (graphe pull, recalcul cohérent). Pratique : si vous voyez un flash de valeur intermédiaire dans un combineLatest de dérivées, c'est un glitch — migrez la dérivation synchrone vers computed(), gardez RxJS pour l'async.

Q — Comment annulez-vous un stream LLM de bout en bout depuis un bouton Stop ? R — Un AbortController par run, partagé au transport fetch ; le teardown de l'Observable (return () => ctrl.abort()) câble unsubscribe → abort réseau ; switchMap/takeUntilDestroyed déclenchent ce teardown automatiquement. Le AbortError doit être traité comme une complétion propre, pas une erreur. Et la chaîne ne vaut que si le serveur écoute req.on('close') pour abort() l'appel SDK Anthropic — sinon vous facturez des tokens pour un client déjà parti.

Q — Différence entre concatMap et un mergeMap avec concurrent: 1 ? R — Sémantiquement identiques (les deux sérialisent et mettent en file). concatMap est juste l'alias lisible de mergeMap(fn, 1) — préférez-le pour l'intention. Le vrai sujet est la file non bornée : les deux accumulent les émissions en attente si l'inner est plus lent que la source. Si ce n'est pas voulu, c'est exhaustMap (jeter les nouvelles) ou switchMap (jeter les anciennes).

Q — Comment garantissez-vous un test RxJS temporel déterministe (pas flaky) ? R — TestScheduler + marble syntax dans scheduler.run(), en injectant le scheduler dans tout opérateur temporel (debounceTime(n, scheduler)). Le temps devient virtuel : 1 caractère = 1 frame, aucune horloge réelle, donc zéro flakiness. Côté composant Angular, fakeAsync + tick() joue le même rôle pour la zone. Le piège classique : oublier d'injecter le scheduler ⇒ l'opérateur utilise asyncScheduler réel ⇒ le test passe en local et casse en CI lent.

Q — Vous voyez combineLatest qui ne déclenche jamais. Diagnostic ? R — combineLatest n'émet qu'après que chaque source ait émis au moins une fois. Une source qui est un Subject nu (pas de valeur initiale) ou un Observable qui ne fait que complete sans émettre bloque tout le tuple. Fix : startWith(seed) sur la source paresseuse, ou un BehaviorSubject, ou repenser avec withLatestFrom si une source doit seulement « fournir » et non « déclencher ».

Récap final

RxJS en 2026 = moins, mais mieux. Maîtrisez les 25 opérateurs cités, l'arbre switchMap/mergeMap/concatMap/exhaustMap, la matrice des Subjects, et takeUntilDestroyed. Le reste est de la culture générale — Google vous dépannera. Le piège n'est jamais "je ne connais pas l'opérateur", c'est "j'ai pris le mauvais flatten" ou "j'ai oublié de shareReplay". Les Signals prennent une partie du gâteau pour le state synchrone, mais pour tout ce qui est flux temporel async, RxJS reste irremplaçable. Investissez dans la maîtrise de switchMap (90% des cas HTTP) et de shareReplay({ bufferSize: 1, refCount: true }) (90% des cas cache) — vous serez déjà au-dessus de la moyenne.

Bibliothèque tech perso — Achref