Skip to content

CQRS dans NestJS (@nestjs/cqrs)

TL;DR — CQRS sépare les écritures (Commands) des lectures (Queries) et propage les changements via des Events. @nestjs/cqrs fournit des bus typés, des handlers découverts par décorateur, et des Sagas RxJS. C'est puissant pour les domaines complexes (DDD, event sourcing), mais catastrophique en sur-architecture sur des CRUD. Règle d'or : pas de CQRS tant qu'un service injecté ne suffit plus.

🧠 Mental model — ASCII diagram + analogy

Analogie : pense à un restaurant. Le serveur (Controller) prend la commande (Command) et la passe en cuisine (CommandBus → Handler). Une fois le plat préparé, un appel vocal "service !" (Event) avertit les autres postes (notifications, stats, audit). Pour consulter le menu ou l'addition (Query), on n'appelle pas la cuisine, on lit un tableau pré-calculé (read model).

              ┌────────────────┐
   HTTP  ──▶  │   Controller   │
              └───┬────────┬───┘
                  │        │
        Command   │        │   Query
                  ▼        ▼
            ┌──────────┐ ┌──────────┐
            │ CommandBus│ │ QueryBus │
            └────┬─────┘ └────┬─────┘
                 │            │
                 ▼            ▼
        ┌────────────────┐ ┌──────────────┐
        │ CommandHandler │ │ QueryHandler │
        └───────┬────────┘ └──────────────┘
                │ mutates aggregate / publishes

            ┌─────────┐         ┌──────────┐
            │ EventBus│────────▶│   Saga   │
            └────┬────┘         └────┬─────┘
                 │                   │
                 ▼                   ▼
         ┌──────────────┐     dispatches new commands
         │ EventHandler │
         └──────────────┘

Le write side maintient la cohérence transactionnelle. Le read side est dénormalisé, optimisé pour les requêtes (souvent dans un autre store : ES, Redis, table SQL aplatie).

🛠️ Code minimal — realistic working snippet

ts
// users.module.ts
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { CreateUserHandler } from './commands/create-user.handler';
import { GetUserHandler } from './queries/get-user.handler';
import { UserCreatedHandler } from './events/user-created.handler';
import { UserSagas } from './sagas/user.sagas';

@Module({
  imports: [CqrsModule],
  providers: [
    CreateUserHandler,
    GetUserHandler,
    UserCreatedHandler,
    UserSagas,
  ],
})
export class UsersModule {}
ts
// commands/create-user.command.ts
export class CreateUserCommand {
  constructor(
    public readonly id: string,
    public readonly email: string,
  ) {}
}

// commands/create-user.handler.ts
import { CommandHandler, ICommandHandler, EventBus } from '@nestjs/cqrs';
import { CreateUserCommand } from './create-user.command';
import { UserCreatedEvent } from '../events/user-created.event';

@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
  constructor(
    private readonly repo: UserRepository,
    private readonly eventBus: EventBus,
  ) {}

  async execute(cmd: CreateUserCommand): Promise<void> {
    const user = User.create(cmd.id, cmd.email); // domain factory
    await this.repo.save(user);
    // ⚠️ Nest 10/11 : publish/publishAll renvoient une Promise — await pour propager
    // les erreurs des EventHandlers SYNCHRONES (in-process). Ne pas await garde le
    // fire-and-forget historique mais avale les exceptions de projection.
    await this.eventBus.publish(new UserCreatedEvent(cmd.id, cmd.email));
  }
}
ts
// queries/get-user.query.ts
export class GetUserQuery {
  constructor(public readonly id: string) {}
}

// queries/get-user.handler.ts
@QueryHandler(GetUserQuery)
export class GetUserHandler implements IQueryHandler<GetUserQuery, UserDto> {
  constructor(private readonly readModel: UserReadModel) {}
  async execute(q: GetUserQuery): Promise<UserDto> {
    return this.readModel.findById(q.id);
  }
}
ts
// sagas/user.sagas.ts
import { Injectable } from '@nestjs/common';
import { ICommand, Saga, ofType } from '@nestjs/cqrs';
import { Observable, map, delay } from 'rxjs';
import { UserCreatedEvent } from '../events/user-created.event';
import { SendWelcomeEmailCommand } from '../commands/send-welcome-email.command';

@Injectable()
export class UserSagas {
  @Saga()
  welcomeEmail = (events$: Observable<any>): Observable<ICommand> =>
    events$.pipe(
      ofType(UserCreatedEvent),
      delay(2000),
      map((e) => new SendWelcomeEmailCommand(e.email)),
    );
}
ts
// controller usage
@Post()
create(@Body() dto: CreateUserDto) {
  return this.commandBus.execute(new CreateUserCommand(dto.id, dto.email));
}

@Get(':id')
get(@Param('id') id: string) {
  return this.queryBus.execute(new GetUserQuery(id));
}

🎯 Patterns courants

  1. Aggregate Root + EventPublisher — fais hériter ton agrégat de AggregateRoot, capture les events via this.apply(event), puis mergeObjectContext(aggregate) et aggregate.commit() à la fin du handler. Cela garde les events dans le domaine, le bus ne les reçoit qu'après persistance.
  2. Read model dédié — un projecteur (@EventsHandler) écoute les events et met à jour une table aplatie (Postgres, Mongo, Redis). Les queries lisent uniquement ce modèle.
  3. Saga = orchestrateur réactif — un Saga ne mute rien, il observe les events et dispatche d'autres commands. Idéal pour des process longs (Order → Payment → Shipment).
  4. Outbox pattern — au lieu de eventBus.publish direct, écris l'event dans une table outbox dans la même transaction que l'agrégat. Un worker relit cette table et publie sur le bus / broker. Garantit l'at-least-once.
  5. Versioning d'events — chaque event a un version (ex. UserCreatedV2). Les handlers gèrent les anciennes versions via upcasting.
  6. CQRS sans event sourcing — beaucoup de projets utilisent les commands/queries mais persistent l'état mutable classique. Parfaitement valide. Ne te force pas à l'ES.

🔄 Versions — Nest 7 / 8 / 9 / 10 / 11

  • Nest 7 / @nestjs/cqrs v7 : API executeCommand / executeQuery, peu de typage générique.
  • Nest 8 / v8 : meilleurs génériques (ICommandHandler<TCommand, TResult>), QueryBus.execute<TQuery, TResult>().
  • Nest 9 / v9 : refactor interne, support natif des unhandledExceptionBus pour capturer les erreurs des handlers.
  • Nest 10 / v10 : décorateurs standardisés, support de @nestjs/cqrs avec ESM. EventBus.publishAll typé.
  • Nest 11 / v11 : amélioration du typage des Sagas, support officiel d'AsyncLocalStorage pour la corrélation des commandes (request-id propagation). EventBus.publish / publishAll renvoient des Promise (await-les pour propager les erreurs des handlers in-process). CqrsModule.forRoot() accepte désormais des options (custom command/query/event bus). Vérifie le changelog : certaines signatures de IEventPublisher ont bougé.

Mental model staff — le CommandBus/QueryBus/EventBus de Nest sont in-process par défaut (de simples Subject RxJS). Ils ne traversent pas le réseau, ne survivent pas à un crash, n'ont aucune garantie de livraison. Tout ce qui ressemble à du « messaging distribué » (Kafka, RabbitMQ, SQS) doit être branché explicitement derrière l'EventBus via un IEventPublisher custom + outbox. Confondre les deux est l'erreur d'architecture #1 sur ce module.

Notes de migration importantes :

  • v8 → v9 : ModuleRef.get(EventBus) n'est plus implicite dans certains tests, injecte explicitement.
  • v9 → v10 : RxJS 7+ requis, les Sagas qui utilisent encore les opérateurs en chaîne .pipe().pipe() avec les anciens types doivent passer aux opérateurs purs.

⚠️ Pitfalls

  1. CQRS sur du CRUD pur — créer 12 fichiers par feature pour faire un findById est un anti-pattern. Si le service unique fait le job, garde-le.
  2. Event spaghetti — quand chaque event déclenche 3 autres events qui en déclenchent 5, le flow devient indéboguable. Documente avec un diagramme et limite la profondeur (jamais plus de 2 niveaux).
  3. Frontières fuyantes — un EventHandler qui appelle un autre CommandBus.execute de manière synchrone réintroduit le couplage. Préfère un Saga explicite.
  4. Pas d'idempotence — les events peuvent être rejoués (replay, crash, broker at-least-once). Tout handler doit être idempotent : if (alreadyProcessed(eventId)) return;.
  5. Publication AVANT persistanceeventBus.publish puis repo.save qui échoue = ghost event. Toujours persister d'abord (ou utiliser outbox).
  6. Read model stale silencieux — si la projection plante, les queries renvoient des données obsolètes sans alerter. Mets un lag monitoring (last_processed_event_at).
  7. Confondre CommandHandler et EventHandler — un Command doit réussir ou échouer immédiatement et a un seul handler. Un Event peut avoir 0..N handlers et est fire-and-forget.
  8. Sagas qui maintiennent de l'état mémoire — un Saga doit être stateless ou persister son état. Sinon : crash = état perdu.

🧪 Testing

ts
import { Test } from '@nestjs/testing';
import { CqrsModule, CommandBus, EventBus } from '@nestjs/cqrs';

describe('CreateUserHandler', () => {
  let handler: CreateUserHandler;
  let eventBus: EventBus;
  let repo: jest.Mocked<UserRepository>;

  beforeEach(async () => {
    const moduleRef = await Test.createTestingModule({
      imports: [CqrsModule],
      providers: [
        CreateUserHandler,
        { provide: UserRepository, useValue: { save: jest.fn() } },
      ],
    }).compile();
    await moduleRef.init(); // important: discovers handlers

    handler = moduleRef.get(CreateUserHandler);
    eventBus = moduleRef.get(EventBus);
    repo = moduleRef.get(UserRepository) as any;
    jest.spyOn(eventBus, 'publish');
  });

  it('saves and publishes UserCreatedEvent', async () => {
    await handler.execute(new CreateUserCommand('u1', '[email protected]'));
    expect(repo.save).toHaveBeenCalled();
    expect(eventBus.publish).toHaveBeenCalledWith(
      expect.objectContaining({ id: 'u1', email: '[email protected]' }),
    );
  });
});

Pour tester un Saga, on instancie le UserSagas, on lui passe un Observable de fixtures (via of(new UserCreatedEvent(...))) et on assert sur les commandes émises (toArray + lastValueFrom).

Pour les EventHandlers, teste-les en isolation comme n'importe quel provider : instancie, appelle .handle(event), vérifie les side-effects.

Pour un test end-to-end : compile le module, utilise commandBus.execute(...), puis assert sur le read model (table/Mongo) après un petit await new Promise(r => setTimeout(r, 50)) pour laisser les events se propager (ou utilise un eventBus.subscribe jeté pour await).

🎬 Cas d'usage concrets

Banque — Ledger event-sourced

Qui — Néobanque française qui veut reconstruire à tout instant le solde d'un compte depuis l'historique brut (exigence ACPR + audit interne). Problème — Stocker uniquement le solde courant interdit la rétro-correction (annulation d'une opération frauduleuse). Stocker l'historique en SQL classique ne donne pas la rejouabilité. Comment — Chaque opération est un événement de domaine (MoneyDeposited, MoneyTransferred, FeeApplied) appliqué à un agrégat Account, l'EventStore est la source de vérité, les soldes sont des projections.

ts
@CommandHandler(TransferCommand)
export class TransferHandler implements ICommandHandler<TransferCommand> {
  constructor(private publisher: EventPublisher, private repo: AccountEventStore) {}
  async execute(cmd: TransferCommand) {
    const fromHistory = await this.repo.load(cmd.fromId);
    const toHistory = await this.repo.load(cmd.toId);
    const from = this.publisher.mergeObjectContext(Account.from(fromHistory));
    const to = this.publisher.mergeObjectContext(Account.from(toHistory));
    from.transferOut(cmd.amount, cmd.toId);
    to.transferIn(cmd.amount, cmd.fromId);
    await this.repo.save(from); await this.repo.save(to);
    from.commit(); to.commit();
  }
}

Gains — Reconstruction de solde à n'importe quelle date passée en 200 ms, projection FEC générée par replay, anti-fraude branchée via EventHandler.

E-commerce — Order saga

Qui — Marketplace française B2C qui orchestre stock + paiement + expédition sur 3 services distincts. Problème — Aucun service n'a la vue globale, et une transaction distribuée 2PC est exclue. CommentSaga RxJS qui réagit aux events (OrderPlacedReserveStockCommandStockReservedChargePaymentCommand...), avec compensations explicites.

ts
@Injectable()
export class OrderSaga {
  @Saga()
  orderPlaced = (events$: Observable<any>): Observable<ICommand> =>
    events$.pipe(
      ofType(OrderPlacedEvent),
      map((e) => new ReserveStockCommand(e.orderId, e.items)),
    );

  @Saga()
  stockReserved = (events$: Observable<any>): Observable<ICommand> =>
    events$.pipe(
      ofType(StockReservedEvent),
      map((e) => new ChargePaymentCommand(e.orderId, e.total)),
    );

  @Saga()
  paymentFailed = (events$: Observable<any>): Observable<ICommand> =>
    events$.pipe(
      ofType(PaymentFailedEvent),
      map((e) => new ReleaseStockCommand(e.orderId)),
    );
}

Gains — Workflow visible dans une seule classe, compensations testées unitairement, retry automatique sur événement remis.

Immobilier — Syndic, gestion locative

Qui — Syndic français gérant 8 000 lots avec workflows complexes (préavis, état des lieux, restitution caution). Problème — Les écrans "tableau de bord locataires" agrègent 6 tables, et les pages métier ont besoin d'écrire vite (encaissement loyer). Comment — Commandes pour écrire (loyer encaissé, dépôt enregistré), QueryBus + read model dénormalisé pour les vues (TenantDashboardView rebuild via EventHandler).

ts
@EventsHandler(RentPaidEvent, LateFeeAppliedEvent)
export class TenantDashboardProjector implements IEventHandler {
  constructor(@InjectModel('tenant_dashboard') private model: Model<any>) {}
  async handle(event: RentPaidEvent | LateFeeAppliedEvent) {
    if (event instanceof RentPaidEvent) {
      await this.model.updateOne(
        { tenantId: event.tenantId },
        { $inc: { totalPaid: event.amount, paymentsCount: 1 },
          $set: { lastPaymentAt: event.paidAt, balance: event.newBalance } },
        { upsert: true },
      );
    } else {
      await this.model.updateOne(
        { tenantId: event.tenantId },
        { $inc: { lateFees: event.fee, balance: event.fee } },
      );
    }
  }
}

Gains — Dashboards p95 < 50 ms (1 lecture Mongo vs 6 JOINs SQL), commandes < 80 ms, projections rebuildables en cas d'évolution schéma.

🛠️ Exemple end-to-end

Contexte — La marketplace B2C lance une saga de commande complète : PlaceOrderCommand → vérification stock → paiement → expédition. Chaque étape est event-sourcée, les échecs déclenchent des commandes de compensation, et un read model OrderTimeline permet au front d'afficher la progression en temps réel.

ts
// src/order/domain/order.aggregate.ts
export class Order extends AggregateRoot {
  private id!: string;
  private items: OrderItem[] = [];
  private status: 'placed' | 'stock_reserved' | 'paid' | 'shipped' | 'failed' = 'placed';
  private total!: number;
  private failureReason?: string;

  static place(id: string, items: OrderItem[], total: number): Order {
    const order = new Order();
    order.apply(new OrderPlacedEvent(id, items, total));
    return order;
  }

  reserveStock() {
    if (this.status !== 'placed') throw new InvalidStateError();
    this.apply(new StockReservedEvent(this.id));
  }

  capturePayment(chargeId: string) {
    if (this.status !== 'stock_reserved') throw new InvalidStateError();
    this.apply(new PaymentCapturedEvent(this.id, chargeId, this.total));
  }

  ship(carrier: string, tracking: string) {
    if (this.status !== 'paid') throw new InvalidStateError();
    this.apply(new OrderShippedEvent(this.id, carrier, tracking));
  }

  fail(reason: string, stage: string) {
    this.apply(new OrderFailedEvent(this.id, reason, stage));
  }

  // Event handlers (state mutators)
  onOrderPlacedEvent(e: OrderPlacedEvent)        { this.id = e.id; this.items = e.items; this.total = e.total; this.status = 'placed'; }
  onStockReservedEvent(_e: StockReservedEvent)   { this.status = 'stock_reserved'; }
  onPaymentCapturedEvent(_e: PaymentCapturedEvent){ this.status = 'paid'; }
  onOrderShippedEvent(_e: OrderShippedEvent)     { this.status = 'shipped'; }
  onOrderFailedEvent(e: OrderFailedEvent)        { this.status = 'failed'; this.failureReason = e.reason; }
}
ts
// src/order/application/place-order.handler.ts
@CommandHandler(PlaceOrderCommand)
export class PlaceOrderHandler implements ICommandHandler<PlaceOrderCommand> {
  constructor(private publisher: EventPublisher, private repo: OrderEventStore) {}

  async execute(cmd: PlaceOrderCommand) {
    const order = this.publisher.mergeObjectContext(
      Order.place(cmd.id, cmd.items, cmd.total),
    );
    await this.repo.save(order);
    order.commit();
    return { orderId: cmd.id };
  }
}

@CommandHandler(ReserveStockCommand)
export class ReserveStockHandler implements ICommandHandler<ReserveStockCommand> {
  constructor(
    private publisher: EventPublisher,
    private repo: OrderEventStore,
    private stock: StockApiClient,
  ) {}

  async execute(cmd: ReserveStockCommand) {
    const order = this.publisher.mergeObjectContext(
      Order.from(await this.repo.load(cmd.orderId)),
    );
    try {
      await this.stock.reserve(cmd.orderId, cmd.items);
      order.reserveStock();
    } catch (e) {
      order.fail((e as Error).message, 'stock');
    }
    await this.repo.save(order);
    order.commit();
  }
}

// Similar handlers: CapturePaymentHandler, ShipOrderHandler, CompensateStockHandler
ts
// src/order/application/order.saga.ts
@Injectable()
export class OrderSaga {
  @Saga()
  orderPlaced = (events$: Observable<any>): Observable<ICommand> =>
    events$.pipe(
      ofType(OrderPlacedEvent),
      map((e) => new ReserveStockCommand(e.id, e.items)),
    );

  @Saga()
  stockReserved = (events$: Observable<any>): Observable<ICommand> =>
    events$.pipe(
      ofType(StockReservedEvent),
      map((e) => new CapturePaymentCommand(e.orderId)),
    );

  @Saga()
  paymentCaptured = (events$: Observable<any>): Observable<ICommand> =>
    events$.pipe(
      ofType(PaymentCapturedEvent),
      delay(100),                                  // throttle to the shipping API
      map((e) => new ShipOrderCommand(e.orderId)),
    );

  @Saga()
  orderFailed = (events$: Observable<any>): Observable<ICommand> =>
    events$.pipe(
      ofType(OrderFailedEvent),
      filter((e) => e.stage !== 'stock'),
      map((e) => new CompensateStockCommand(e.orderId)),
    );
}
ts
// src/order/application/order-timeline.projector.ts
@EventsHandler(
  OrderPlacedEvent, StockReservedEvent, PaymentCapturedEvent,
  OrderShippedEvent, OrderFailedEvent,
)
export class OrderTimelineProjector implements IEventHandler {
  constructor(@InjectRepository(OrderTimelineView) private view: Repository<OrderTimelineView>) {}

  async handle(event: any) {
    const orderId = event.orderId ?? event.id;
    const step = this.toStep(event);
    await this.view.save({
      orderId,
      step,
      at: new Date(),
      payload: JSON.stringify(event),
    });
  }

  private toStep(event: any): string {
    return {
      OrderPlacedEvent: 'placed', StockReservedEvent: 'stock_reserved',
      PaymentCapturedEvent: 'paid', OrderShippedEvent: 'shipped',
      OrderFailedEvent: 'failed',
    }[event.constructor.name] ?? 'unknown';
  }
}
ts
// src/order/api/order.controller.ts
@Controller('orders')
export class OrderController {
  constructor(private bus: CommandBus, private query: QueryBus) {}

  @Post()
  place(@Body() dto: PlaceOrderDto) {
    return this.bus.execute(new PlaceOrderCommand(dto.id, dto.items, dto.total));
  }

  @Get(':id/timeline')
  timeline(@Param('id') id: string) {
    return this.query.execute(new GetOrderTimelineQuery(id));
  }
}

L'EventPublisher.mergeObjectContext attache l'agrégat à la machinerie d'événements (sinon commit() ne diffuserait rien), la saga découple les étapes (changer le provider de paiement = changer 1 handler), et le projector OrderTimelineView est rebuildable en rejouant l'EventStore — précieux quand le format du front évolue.

🔁 Quand utiliser / éviter

Utilise CQRS quand :

  • domaine métier riche, règles complexes, beaucoup d'invariants
  • lectures et écritures ont des charges asymétriques (write rare, read massif)
  • audit trail / event sourcing requis (réglementaire, finance, médical)
  • équipe à l'aise avec DDD et asynchrone

Évite CQRS quand :

  • CRUD simple, 80 % du temps tu fais juste findOne/update
  • petite équipe, MVP, time-to-market court
  • le read model et le write model sont identiques à 99 %
  • tu n'as pas encore le besoin d'une projection séparée

Alternative légère : un service Nest classique avec EventEmitter2 (@nestjs/event-emitter) pour les side-effects. Tu gardes 90 % du bénéfice (découplage) sans la surcharge structurelle.

🧬 Comparaison avec un simple service Nest

AspectService classiqueCQRS
Fichiers par feature1-25-10
Latence ajoutée~0légère (bus dispatch)
Testabilité unitaireBonneExcellente (handlers isolés)
AuditabilitéFaibleForte (events)
Courbe d'apprentissagePlateRaide
Refactor en milieu de projetFacileCoûteux

Un service Nest classique avec une méthode create() et une méthode findById() est parfait jusqu'à ce que tu aies :

  • besoin de plusieurs side-effects découplés (envoyer email + invalider cache + métrique + webhook),
  • besoin d'un audit trail réglementaire,
  • besoin de scaler les lectures séparément.

Quand tu introduis CQRS, fais-le progressivement : commence par un seul bounded context (la partie la plus complexe du domaine), garde les CRUD simples en service classique. Mixer les styles est OK, le purisme est contre-productif.

🏭 Production — perf, scale, observability, sécurité

Ce que personne ne te dit avant le premier incident en prod.

Latence & cohérence (le piège de l'eventual consistency)

Le read model est mis à jour après la command, de façon asynchrone. Conséquence concrète : un POST /orders suivi immédiatement d'un GET /orders/:id peut renvoyer un 404 ou un état stale. C'est le read-your-own-writes problem.

StratégieMécanismeCoût
Read-your-writes optimisteLe front affiche l'état attendu localement après la commandUI tolère le rollback
Synchronous projectionLe CommandHandler met à jour le read model dans la même transactionPerd le découplage, recouple write/read
Version tokenLa command renvoie une version, le GET attend que la projection ait atteint cette version (poll/long-poll)Latence ajoutée
Accept-and-redirect202 Accepted + Location: /orders/:id/timelineForce le client à poller — honnête

Règle staff : assume eventual consistency par défaut, et ne paie le coût de la cohérence forte que sur les rares chemins qui l'exigent (solde bancaire affiché juste après virement).

Observabilité — un bus async est un trou noir sans corrélation

  • Correlation ID : propage un correlationId (et causationId = id de l'event/command parent) sur chaque command/event. Utilise AsyncLocalStorage (officiellement supporté Nest 11) pour ne pas le passer à la main partout. Sans ça, tu ne peux pas reconstituer la chaîne OrderPlaced → ReserveStock → StockReserved → ChargePayment dans tes logs.
  • Metrics : compteur par type de command/event, histogramme de latence par handler, gauge de projection lag (now - last_processed_event_at). Le projection lag qui grimpe = alerte avant que les utilisateurs voient des données stale.
  • Tracing : un span OpenTelemetry par handler, parenté via le causationId. La saga devient lisible comme un waterfall.
  • Dead-letter : un event qui throw N fois doit partir en DLQ + alerte, pas boucler en silence.

Sécurité

  • Validation au bord : valide le DTO dans le controller (class-validator), pas dans le handler. Une command qui atteint le bus est supposée déjà bien formée.
  • Autorisation = command-level : l'authz se fait avant le dispatch (guard sur le controller) ou en début de handler, jamais dans un EventHandler (trop tard, l'écriture est faite).
  • PII dans les events : les events sont souvent persistés (event store) pour toujours. Ne mets pas de PII brute dedans si tu dois pouvoir effacer (RGPD droit à l'oubli) → crypto-shredding (chiffre la PII, jette la clé pour « effacer »).

Scale

  • Le read side scale horizontalement trivialement (réplicas de lecture, cache). C'est tout l'intérêt.
  • Le write side scale par partitionnement par aggregate id (un agrégat = une frontière de concurrence). L'optimistic concurrency (expectedVersion à l'append dans l'event store) est ta protection contre les writes concurrents sur le même agrégat.
  • Les projecteurs : un seul consumer par projection pour garder l'ordre, OU partitionner par clé. Le rebuild de projection (replay depuis l'event 0) doit être un job offline qui construit une nouvelle table en parallèle, puis bascule (zero-downtime reprojection).

🤖 Senior — servir/orchestrer des agents IA depuis CQRS

CQRS est un fit naturel pour l'IA agentique : une génération LLM est exactement une « command longue, asynchrone, à side-effects, observable » — pas une lecture instantanée. La séparation command/query + events + saga modélise proprement la boucle agentique.

Mental model : StartGenerationCommand (write) déclenche un job ; les tokens streamés sont des events (TokenGeneratedEvent) projetés dans un read model GenerationStream que le client lit via SSE ; la boucle tool-use (LLM demande un outil → on l'exécute → on renvoie le résultat) est une saga (ToolCallRequestedEventExecuteToolCommandToolResultEvent → relance génération).

Le client LLM est un provider DI, jamais un new Anthropic() dans un champ

ts
// llm/llm.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import Anthropic from '@anthropic-ai/sdk';

export const LLM_CLIENT = Symbol('LLM_CLIENT');

@Module({
  imports: [ConfigModule],
  providers: [
    {
      provide: LLM_CLIENT,
      inject: [ConfigService],
      useFactory: (config: ConfigService) =>
        new Anthropic({
          apiKey: config.getOrThrow('ANTHROPIC_API_KEY'),
          maxRetries: 3,            // retries SDK natifs (429/5xx, backoff exponentiel)
          timeout: 60_000,
        }),
    },
  ],
  exports: [LLM_CLIENT],
})
export class LlmModule {}

Pourquoi DI : testabilité (mock le LLM_CLIENT), config centralisée, un seul keep-alive HTTP agent, et tu peux swapper claude-opus-4-8claude-sonnet-4-6claude-haiku-4-5 par config sans toucher les handlers.

Command de génération — streaming des tokens en events + AbortController

ts
// generation/start-generation.handler.ts
import { CommandHandler, ICommandHandler, EventBus } from '@nestjs/cqrs';
import { Inject } from '@nestjs/common';
import type Anthropic from '@anthropic-ai/sdk';
import { LLM_CLIENT } from '../llm/llm.module';

@CommandHandler(StartGenerationCommand)
export class StartGenerationHandler
  implements ICommandHandler<StartGenerationCommand>
{
  constructor(
    @Inject(LLM_CLIENT) private readonly llm: Anthropic,
    private readonly eventBus: EventBus,
    private readonly aborts: AbortRegistry, // map generationId -> AbortController
  ) {}

  async execute(cmd: StartGenerationCommand): Promise<void> {
    const controller = this.aborts.register(cmd.generationId);
    const stream = this.llm.messages.stream(
      {
        model: 'claude-sonnet-4-6',
        max_tokens: 1024,
        messages: cmd.messages,
      },
      { signal: controller.signal },
    );

    try {
      for await (const event of stream) {
        if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
          // chaque token = un event de domaine, projeté dans le read model SSE
          await this.eventBus.publish(
            new TokenGeneratedEvent(cmd.generationId, event.delta.text),
          );
        }
      }
      await this.eventBus.publish(new GenerationCompletedEvent(cmd.generationId));
    } catch (err) {
      if (controller.signal.aborted) {
        await this.eventBus.publish(new GenerationCancelledEvent(cmd.generationId));
      } else {
        await this.eventBus.publish(
          new GenerationFailedEvent(cmd.generationId, (err as Error).message),
        );
      }
    } finally {
      this.aborts.release(cmd.generationId);
    }
  }
}

Exposer le flux au client — SSE qui rejoue les TokenGeneratedEvent

ts
// generation/generation.controller.ts
import { Controller, Post, Body, Sse, Param, Delete } from '@nestjs/common';
import { CommandBus } from '@nestjs/cqrs';
import { Observable, filter, map, takeWhile } from 'rxjs';

@Controller('generations')
export class GenerationController {
  constructor(
    private readonly bus: CommandBus,
    private readonly streamHub: GenerationStreamHub, // Subject par generationId
  ) {}

  @Post()
  async start(@Body() dto: StartGenerationDto) {
    // 202 + on ne bloque pas : la génération vit dans la command async
    await this.bus.execute(
      new StartGenerationCommand(dto.generationId, dto.messages),
    );
    return { generationId: dto.generationId };
  }

  @Sse(':id/stream')
  stream(@Param('id') id: string): Observable<MessageEvent> {
    return this.streamHub.for(id).pipe(
      takeWhile((e) => e.type !== 'done', true),
      map((e) => ({ data: e } as MessageEvent)),
    );
  }

  @Delete(':id')
  cancel(@Param('id') id: string) {
    // Stop côté serveur : déclenche l'AbortController du handler en cours
    return this.bus.execute(new CancelGenerationCommand(id));
  }
}

Point staff : le client disconnect (onglet fermé, navigation) doit annuler la génération côté serveur (sinon tu paies des tokens dans le vide). Avec @Sse, abonne-toi à request.on('close') (ou la cleanup de l'Observable) pour dispatcher CancelGenerationCommand. Le Stop button du front ET le disconnect réseau pointent vers le même AbortController.

La boucle tool-use agentique = une Saga

ts
// agent/agent.saga.ts
@Injectable()
export class AgentSaga {
  // Le LLM a demandé un outil → on l'exécute (côté serveur, jamais le LLM)
  @Saga()
  toolRequested = (events$: Observable<any>): Observable<ICommand> =>
    events$.pipe(
      ofType(ToolCallRequestedEvent),
      map((e) => new ExecuteToolCommand(e.generationId, e.toolName, e.input)),
    );

  // L'outil a répondu → on renvoie le résultat au LLM et on relance la génération
  @Saga()
  toolResolved = (events$: Observable<any>): Observable<ICommand> =>
    events$.pipe(
      ofType(ToolResultEvent),
      map((e) => new ContinueGenerationCommand(e.generationId, e.toolUseId, e.result)),
    );
}

La saga rend la boucle agentique inspectable (chaque tour est un event horodaté et corrélé) et rejouable en test. Mets une garde de profondeur (maxToolTurns) pour éviter une boucle infinie LLM↔outil.

Jobs IA via BullMQ — idempotence, retry cost-aware, partial output

Pour les générations longues/coûteuses (batch, RAG d'un gros doc), passe par une file plutôt que de tenir la requête HTTP ouverte :

  • Idempotence keyée sur le generationId : jobId: generationId → BullMQ déduplique. Un retry réseau du client ne relance pas une génération payante.
  • Retry cost-aware : ne retry que les erreurs transitoires (429, 5xx, timeout réseau). Une erreur 400 (prompt invalide) ou un refus de contenu ne doit jamais retry — tu brûles du budget. Le SDK Anthropic gère déjà 429/5xx avec backoff ; au niveau BullMQ, configure un backoff plus long et un attempts bas (2-3).
  • Partial output : persiste les tokens déjà streamés (TokenGeneratedEvent projetés). Sur retry, tu peux soit reprendre, soit au moins ne pas reperdre le travail affiché à l'utilisateur.
  • Cost guard au bord : avant de dispatcher la command, un guard/intercepteur vérifie le quota tokens de l'utilisateur (rate-limit par user, budget mensuel). Le cost-guard est une préoccupation de l'edge, pas du handler.
ts
await this.queue.add(
  'generate',
  { generationId, messages },
  {
    jobId: generationId,                 // idempotence
    attempts: 3,
    backoff: { type: 'exponential', delay: 2_000 },
    removeOnComplete: 1000,
  },
);

Le pendant Angular de ce flux (rendu token-par-token sous zoneless, timeline de tool-calls en union discriminée, bouton Stop câblé à l'AbortController) est traité dans le fichier Angular dédié au streaming IA.

🏋️ Exercices

Progressifs : implémenter → durcir pour la prod → casser puis réparer. Fais-les dans un module Nest 11 réel avec @nestjs/cqrs v11.

1. Le bus minimal (implémenter)

Objectif — Câbler un CreateArticleCommand + GetArticleQuery + ArticleCreatedEvent avec un EventHandler qui logge, le tout testé. Indice/Solution — Décore les handlers, déclare-les en providers, await moduleRef.init() dans le test pour que la découverte par décorateur ait lieu, spy sur eventBus.publish.

2. Read model + projecteur dénormalisé (implémenter)

Objectif — Un ArticleListProjector (@EventsHandler) maintient une table aplatie article_list_view ; GetArticleListQuery lit uniquement cette table. Démontre que write store et read store divergent. Indice/Solution — Le projecteur upsert sur articleId. Écris un test e2e qui crée 3 articles via le CommandBus puis assert sur la view après un tick — observe l'eventual consistency (le GET juste après le POST peut être vide).

3. Saga + compensation (production-grade)

Objectif — Implémente la saga PublishArticle : ArticlePublishedIndexInSearchCommand → si IndexFailedEvent, compense par UnpublishArticleCommand. Ajoute correlationId + projection lag metric. Indice/Solutionfilter sur le stage d'échec pour ne compenser que les bonnes branches. Propage le correlationId via AsyncLocalStorage. Expose une gauge projection_lag_seconds.

4. Outbox at-least-once (production-grade)

Objectif — Remplace eventBus.publish direct par une écriture en table outbox dans la même transaction que l'agrégat ; un worker relit l'outbox et publie. Garantis qu'un crash entre save et publish ne perd aucun event. Indice/Solution — Transaction unique { save(aggregate); insert(outbox) }. Worker avec SELECT ... FOR UPDATE SKIP LOCKED, marque published_at, publie sur le bus/broker. Idempotence côté consumer (table processed_event_id).

5. Casse-le puis répare (break it then fix it)

Objectif — Introduis sciemment 3 bugs classiques, observe le symptôme, corrige : (a) publish AVANT save → ghost event ; (b) EventHandler non idempotent + event rejoué → double effet ; (c) saga qui dispatche une command qui re-déclenche la même saga → boucle infinie. Indice/Solution — (a) inverse l'ordre, ou outbox. (b) garde if (await alreadyProcessed(eventId)) return. (c) garde de profondeur (causationId chain length) + ne fais jamais qu'une saga dispatche une command dont l'event de succès la re-trigger.

6. Génération LLM en CQRS (intégration stack, hard)

ObjectifStartGenerationCommand streame les tokens d'Anthropic en TokenGeneratedEvent, exposés via SSE. Un Stop (client) ET un disconnect réseau annulent la génération côté serveur via AbortController. Boucle tool-use modélisée en saga avec garde de profondeur. Indice/SolutionAbortRegistry map generationId → AbortController. request.on('close')CancelGenerationCommand. Le SDK gère les retries 429/5xx ; n'await publish que si tu veux propager les erreurs de projection. Garde maxToolTurns dans la saga.

🎤 En entretien

Q : CQRS implique-t-il forcément l'event sourcing ? Non. CQRS = séparer le modèle d'écriture du modèle de lecture. L'event sourcing = stocker l'état comme une séquence d'events. On peut faire du CQRS sur un état mutable classique (deux modèles, un seul store SQL). On peut faire de l'ES sans vraie séparation read/write. Les confondre est un red flag.

Q : Le bus @nestjs/cqrs garantit-il la livraison des events ? Non. C'est un bus in-process (RxJS Subject) : pas de persistance, pas de retry, pas de survie au crash. Pour de l'at-least-once il faut un outbox + un broker (Kafka/Rabbit/SQS) derrière un IEventPublisher custom. Croire que eventBus.publish est durable est l'erreur d'archi #1 sur ce module.

Q : Comment gères-tu le « read-your-own-writes » avec un read model asynchrone ? J'assume l'eventual consistency par défaut et je choisis par chemin : 202 Accepted + polling honnête, ou version token (la command renvoie une version, le GET attend qu'elle soit projetée), ou mise à jour optimiste côté front, ou — seulement si la cohérence forte est exigée (solde bancaire) — projection synchrone dans la transaction au prix du recouplage.

Q : Comment rendre un EventHandler / une projection sûr face au replay et aux crashes ? Idempotence (processed_event_id keyé sur l'event id), persistance avant publication (ou outbox), ordering par aggregate id, projection lag monitoré, et reprojection zero-downtime (rebuild en parallèle puis bascule). Un handler qui n'est pas idempotent finira par double-compter le jour d'un replay.

🔗 Liens

Bibliothèque tech perso — Achref