Event Emitter dans NestJS (@nestjs/event-emitter)
TL;DR —
@nestjs/event-emitterest un wrapper Nest autour deeventemitter2. C'est un bus in-process, synchrone par défaut, idéal pour découpler des effets de bord (envoi d'email, audit, cache invalidation) à l'intérieur d'un seul service. À ne pas confondre avec un message broker (Kafka, RabbitMQ) ni avec CQRS (commandes/queries typées). En ninja, on l'utilise pour les domain events locaux, avec une attention forte à la gestion d'erreurs, l'ordering, et l'idempotence.
🧠 Mental model — diagramme ASCII + analogie
EventEmitter2 est un répartiteur in-memory : un service publie un événement, n zéro à n listeners enregistrés sont appelés synchroniquement (ou en parallèle si async). Aucun stockage, aucune persistance, aucune garantie cross-process. Si l'application crashe entre la publication et la consommation, l'événement est perdu.
┌─────────────────────────────────────────────────────────────┐
│ Process unique (1 Node) │
│ │
│ Producer ──► EventBus (in-memory) ──► [Listener A] │
│ │ ──► [Listener B] │
│ │ ──► [Listener C] │
│ │
└─────────────────────────────────────────────────────────────┘
vs. Message broker (Kafka/RabbitMQ) :
┌─────────┐ network ┌─────────────┐ network ┌─────────────┐
│ Service │ ───────────► │ Broker │ ───────────► │ Service │
│ A │ │ (durable) │ │ B │
└─────────┘ └─────────────┘ └─────────────┘
vs. CQRS (@nestjs/cqrs) :
Command ─► CommandHandler ─► (DomainEvents) ─► EventBus ─► Sagas/ProjectionsAnalogie : un haut-parleur dans une salle de réunion. Tout le monde dans la pièce entend (les listeners). Mais personne en dehors de la pièce ne reçoit le message, et si la pièce explose pendant l'annonce, l'information est perdue. Pour atteindre une autre pièce (un autre service), il faut un téléphone (broker).
🛠️ Code minimal (ts)
// src/app.module.ts
import { Module } from '@nestjs/common';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { OrderModule } from './order/order.module';
import { MailModule } from './mail/mail.module';
@Module({
imports: [
EventEmitterModule.forRoot({
wildcard: true,
delimiter: '.',
newListener: false,
removeListener: false,
maxListeners: 20,
verboseMemoryLeak: true,
ignoreErrors: false,
}),
OrderModule,
MailModule,
],
})
export class AppModule {}// src/order/events/order-placed.event.ts
export class OrderPlacedEvent {
static readonly name = 'order.placed' as const;
constructor(
public readonly orderId: string,
public readonly customerId: string,
public readonly amountCents: number,
public readonly occurredAt: Date = new Date(),
) {}
}// src/order/order.service.ts
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { OrderPlacedEvent } from './events/order-placed.event';
@Injectable()
export class OrderService {
constructor(private readonly events: EventEmitter2) {}
async place(customerId: string, amountCents: number) {
const orderId = crypto.randomUUID();
// ... persist order in DB (transactional) ...
this.events.emit(
OrderPlacedEvent.name,
new OrderPlacedEvent(orderId, customerId, amountCents),
);
return orderId;
}
}// src/mail/order-mail.listener.ts
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { OrderPlacedEvent } from '../order/events/order-placed.event';
import { MailService } from './mail.service';
@Injectable()
export class OrderMailListener {
private readonly logger = new Logger(OrderMailListener.name);
constructor(private readonly mail: MailService) {}
@OnEvent(OrderPlacedEvent.name, { async: true, promisify: true })
async sendConfirmation(event: OrderPlacedEvent) {
try {
await this.mail.sendOrderConfirmation(event.customerId, event.orderId);
} catch (err) {
this.logger.error(
`Failed to send confirmation for ${event.orderId}`,
err as Error,
);
// Ne pas re-throw : on isole l'effet de bord
}
}
@OnEvent('order.*', { async: true })
audit(event: unknown) {
this.logger.log(`order event captured: ${JSON.stringify(event)}`);
}
}🎯 Patterns courants — 3-6 patterns
1. Domain events publiés après commit
Le piège classique : publier l'événement avant que la transaction ne soit committée. Si la transaction rollback, les listeners reçoivent un événement pour une entité qui n'existe pas. Solution : un pattern transactional outbox simplifié.
@Injectable()
export class OrderService {
constructor(
private readonly events: EventEmitter2,
@InjectDataSource() private readonly dataSource: DataSource,
) {}
async place(dto: CreateOrderDto) {
const pending: Array<{ name: string; payload: unknown }> = [];
const order = await this.dataSource.transaction(async (mgr) => {
const o = await mgr.save(Order, { ...dto });
pending.push({
name: OrderPlacedEvent.name,
payload: new OrderPlacedEvent(o.id, dto.customerId, dto.amountCents),
});
return o;
});
// Après le commit uniquement
for (const ev of pending) this.events.emit(ev.name, ev.payload);
return order;
}
}Pour aller plus loin : écrire les événements dans une table outbox dans la même transaction, puis un worker lit la table et émet vers l'event-emitter ou un broker.
2. Idempotence côté listener
Un listener peut être appelé deux fois (retry manuel, redémarrage, double publication suite à un bug). Toujours rendre le traitement idempotent.
@OnEvent(OrderPlacedEvent.name)
async onPlaced(event: OrderPlacedEvent) {
const already = await this.redis.set(
`processed:${event.orderId}`,
'1',
'EX', 86400, 'NX',
);
if (!already) return; // déjà traité
await this.mail.send(...);
}3. Wildcards et namespacing hiérarchique
Avec wildcard: true et delimiter: '.', on adopte une convention <aggregate>.<action> (ex : order.placed, order.shipped, user.registered). Un listener order.* capte tout le namespace, parfait pour l'audit.
@OnEvent('user.*', { async: true })
async onUserChange(payload: unknown, eventName?: string) {
await this.analytics.track(eventName, payload);
}Avec **, on capte toute la hiérarchie en profondeur (order.shipment.delivered). Attention : ** peut surcharger l'application si tout passe par un seul listener.
4. Debounce / throttle d'événements
Pour des événements à haute fréquence (typing, mouse move, mais aussi cache.invalidate répétés), on peut accumuler et déclencher après un délai. Implémentation simple avec lodash.debounce.
import { debounce } from 'lodash';
@Injectable()
export class CacheWarmer implements OnModuleInit {
private warm = debounce(() => this.actuallyWarm(), 500, { trailing: true });
constructor(private readonly events: EventEmitter2) {}
onModuleInit() {
this.events.on('catalog.product.updated', () => this.warm());
}
private actuallyWarm() {
// reload cache once per 500 ms regardless of burst
}
}5. Async listeners avec promisify et waitFor
Par défaut, emit() est synchrone : tous les listeners s'exécutent dans l'event loop courante. Avec { async: true, promisify: true }, le listener retourne une promesse et emitAsync() les attend.
const results = await this.events.emitAsync(OrderPlacedEvent.name, payload);
// results: tableau des valeurs retournées par chaque listenerwaitFor(event, { timeout }) permet de bloquer jusqu'à l'arrivée d'un événement spécifique (utile dans les tests ou les workflows séquentiels).
6. Distinction event-emitter vs broker
Une règle simple : si un autre processus, un autre service, ou un autre déploiement doit consommer l'événement, ce n'est pas le job de @nestjs/event-emitter. Pour cross-process, utiliser @nestjs/microservices (RabbitMQ, Kafka, NATS), bullmq pour des jobs, ou @nestjs/cqrs avec event store pour de l'event sourcing. @nestjs/event-emitter reste in-process, in-memory, sans persistence ni rejouage.
7. Bridge event-emitter vers broker (hybride)
Un pattern hybride élégant : utiliser @nestjs/event-emitter comme bus interne, puis un forwarder qui écoute les événements à publier vers l'extérieur et les pousse dans le broker. Cela maintient le code métier propre (il ne connaît qu'un bus local) et centralise la traduction DomainEvent → IntegrationEvent.
@Injectable()
export class EventForwarder implements OnModuleInit {
constructor(
private readonly bus: EventEmitter2,
@Inject('KAFKA') private readonly kafka: ClientKafka,
) {}
onModuleInit() {
this.bus.on('order.placed', async (e: OrderPlacedEvent) => {
await this.kafka.emit('integration.orders.placed', {
version: 1,
id: e.orderId,
at: e.occurredAt.toISOString(),
data: { customerId: e.customerId, amountCents: e.amountCents },
});
});
}
}Ce forwarder devient le seul point de contact avec le broker. Tests faciles : on mocke ClientKafka et on vérifie qu'à chaque emit interne, le kafka.emit est appelé avec le bon shape.
🔄 Versions — Nest 7 → 11 + libs tierces
- Nest 7 : pas de package officiel ; on utilisait
eventemitter2ou desSubjectRxJS manuellement.@nestjs/event-emitterest apparu en v1 pour Nest 8. - Nest 8 :
@nestjs/event-emitterv1 stable, support@OnEvent,EventEmitter2injectable. - Nest 9 :
@nestjs/event-emitterv2 ajouteemitAsyncet options par listener (async,promisify,prependListener). - Nest 10 :
@nestjs/event-emitterv2.x continue ; améliorations sur la collecte des listeners au démarrage (compatibility avec lazy modules). - Nest 11 :
@nestjs/event-emitterv3 (eventemitter2 v6) ;OnEventaccepte un tableau d'événements (@OnEvent(['order.placed', 'order.refunded'])). Léger breaking : la signature deverboseMemoryLeakchange. @nestjs/cqrs: à ne pas confondre. FournitEventBus,CommandBus,QueryBus,Saga. Plus typé, plus structuré, mais plus de boilerplate. Mêmes contraintes in-process sauf si on branche unEventPublishercustom (event store, Kafka).eventemitter2v6 : breaking sur les wildcards de profondeur (**désormais strictement multi-segment).
⚠️ Pitfalls — 6-10 pièges
- Émission avant commit transactionnel. Voir pattern 1. Les listeners voient une entité incohérente. Toujours
emit()après lecommit. - Erreurs avalées silencieusement. Si un listener
asyncrejette et que personne n'attache.catch(), on a ununhandledRejection. Toujours encapsuler le traitement danstry/catchou utiliseremitAsyncavecPromise.allSettled. - MaxListenersExceededWarning. Par défaut, EventEmitter limite à 10 listeners par événement. Si on enregistre dynamiquement, augmenter
maxListenersou identifier le leak (souvent unonModuleInitqui s'abonne sansOnModuleDestroypour se désabonner). - Confusion in-process / cross-service. Un développeur junior écrit un microservice qui
emit('user.created')et un autre microservice essaie d'écouter. Ça ne marchera jamais : il faut un broker. - Ordering non garanti entre listeners. L'ordre d'enregistrement détermine l'ordre d'exécution, mais avec des
asynclisteners ils s'exécutent en parallèle. Pour un ordre strict, chaîner manuellement ou utiliser un seul listener orchestrateur. - Listeners dans des modules lazy. Si un module est chargé à la demande, ses
@OnEventne sont enregistrés qu'à ce moment. Un événement émis avant le chargement est perdu. - Memory leak via closure. Stocker
this.events.on(...)avec une closure qui capture un gros objet ; si on ne fait jamaisoff(), l'objet ne sera jamais collecté. - Test en parallèle qui partage le bus.
EventEmitter2est un singleton dans le module ; deux tests qui émettent en parallèle peuvent se contaminer. UtiliserremoveAllListeners()enafterEachou recréer leTestingModule. - Couplage caché. Les événements semblent réduire le couplage, mais en pratique 12 listeners écoutant
order.placeddeviennent ingérables. Documenter les listeners, ou passer à CQRS avec sagas explicites. emitne retourne rien d'utile.emit()retournetrue/false(présence de listeners), ce qui n'indique pas le succès du traitement. Pour savoir si quelque chose a marché, utiliseremitAsync()et inspecter les résultats.
🧪 Testing — exemples concrets
Test unitaire d'un service qui émet
// src/order/order.service.spec.ts
import { Test } from '@nestjs/testing';
import { EventEmitter2, EventEmitterModule } from '@nestjs/event-emitter';
import { OrderService } from './order.service';
import { OrderPlacedEvent } from './events/order-placed.event';
describe('OrderService', () => {
let service: OrderService;
let bus: EventEmitter2;
beforeEach(async () => {
const moduleRef = await Test.createTestingModule({
imports: [EventEmitterModule.forRoot()],
providers: [OrderService],
}).compile();
service = moduleRef.get(OrderService);
bus = moduleRef.get(EventEmitter2);
});
it('emits order.placed after placement', async () => {
const spy = jest.fn();
bus.on(OrderPlacedEvent.name, spy);
await service.place('cus_1', 1999);
expect(spy).toHaveBeenCalledTimes(1);
expect(spy.mock.calls[0][0]).toBeInstanceOf(OrderPlacedEvent);
expect(spy.mock.calls[0][0].customerId).toBe('cus_1');
});
});Test d'un listener avec waitFor
import { EventEmitter2 } from '@nestjs/event-emitter';
it('listener processes within 100ms', async () => {
const bus = moduleRef.get(EventEmitter2);
const evt = new OrderPlacedEvent('o1', 'c1', 100);
bus.emit(OrderPlacedEvent.name, evt);
await bus.waitFor('order.confirmation.sent', { timeout: 100 });
// assertion sur l'état du système (DB, mail mock)
});Test avec un mock complet d'EventEmitter2
Pour des tests rapides et déterministes, on injecte un mock plutôt que le bus réel.
const emitterMock = {
emit: jest.fn(),
emitAsync: jest.fn().mockResolvedValue([]),
on: jest.fn(),
off: jest.fn(),
waitFor: jest.fn(),
};
const moduleRef = await Test.createTestingModule({
providers: [
OrderService,
{ provide: EventEmitter2, useValue: emitterMock },
],
}).compile();
await service.place('cus_1', 1999);
expect(emitterMock.emit).toHaveBeenCalledWith(
OrderPlacedEvent.name,
expect.objectContaining({ customerId: 'cus_1' }),
);Test d'idempotence
it('listener is idempotent', async () => {
const evt = new OrderPlacedEvent('o1', 'c1', 100);
await listener.onPlaced(evt);
await listener.onPlaced(evt); // second call
expect(mailMock.send).toHaveBeenCalledTimes(1);
});Test d'erreur dans listener (sans crasher)
it('does not bubble error from listener', async () => {
mailMock.send.mockRejectedValueOnce(new Error('SMTP down'));
await expect(
bus.emitAsync(OrderPlacedEvent.name, new OrderPlacedEvent('o1','c1',1)),
).resolves.toBeDefined(); // emitAsync ne re-throw pas par défaut
});Test d'audit wildcard
it('audit captures all order.* events', async () => {
const audit = jest.fn();
bus.on('order.*', audit);
bus.emit('order.placed', { id: 'o1' });
bus.emit('order.shipped', { id: 'o1' });
expect(audit).toHaveBeenCalledTimes(2);
});Test de propagation de trace
it('propagates traceparent through events', async () => {
const carrier: Record<string, string> = {};
propagation.inject(context.active(), carrier);
let captured: any;
bus.on('order.placed', (p) => { captured = p; });
bus.emit('order.placed', { id: 'o1', _trace: carrier });
expect(captured._trace.traceparent).toBe(carrier.traceparent);
});Test de timing pour debounce
import { advanceBy } from 'jest-date-mock';
it('debounces cache invalidation', () => {
jest.useFakeTimers();
const spy = jest.spyOn(warmer as any, 'actuallyWarm');
for (let i = 0; i < 10; i++) bus.emit('catalog.product.updated', {});
jest.advanceTimersByTime(500);
expect(spy).toHaveBeenCalledTimes(1);
jest.useRealTimers();
});🎬 Cas d'usage concrets
E-commerce — domain events sur la commande validée
Qui : équipe checkout d'un retailer omnicanal de 4 enseignes. Quand une commande passe le statut CONFIRMED, 6 effets de bord doivent se déclencher : envoi mail de confirmation, génération facture PDF, déclenchement préparation entrepôt, mise à jour CRM, push événement analytics, invalidation cache panier.
Problème : le service OrderService.confirm() historique enchaînait les 6 appels en synchrone. Une panne SMTP bloquait la validation de commande pour le client. Il faut découpler tout en gardant la transactionnalité de la donnée principale.
@Injectable()
export class OrderService {
constructor(
private readonly repo: OrderRepository,
private readonly emitter: EventEmitter2,
) {}
async confirm(orderId: string, paymentRef: string): Promise<Order> {
const order = await this.repo.transaction(async (tx) => {
const o = await tx.findOneOrFail(orderId);
o.confirm(paymentRef);
return tx.save(o);
});
this.emitter.emit('order.confirmed', { orderId: order.id, total: order.total, customerId: order.customerId, items: order.items });
return order;
}
}
@Injectable()
export class OrderConfirmedHandlers {
@OnEvent('order.confirmed', { async: true })
async sendMail(p: OrderConfirmedPayload) { await this.mail.sendConfirmation(p.orderId); }
@OnEvent('order.confirmed', { async: true })
async generateInvoice(p: OrderConfirmedPayload) { await this.invoices.generate(p.orderId); }
@OnEvent('order.confirmed', { async: true })
async notifyWarehouse(p: OrderConfirmedPayload) { await this.wms.dispatch(p.orderId, p.items); }
@OnEvent('order.confirmed', { async: true })
async pushToCrm(p: OrderConfirmedPayload) { await this.crm.syncOrder(p.orderId); }
}Gains : la commande confirme en 80 ms (juste DB + emit), les 6 handlers tournent en parallèle. Une panne SMTP n'empêche plus la confirmation, le handler retry tourne en BullMQ derrière. Le découplage permet à 3 squads différentes de pousser leur handler sans modifier OrderService.
SaaS RH — onboarding employé orchestré par événements
Qui : éditeur SaaS RH multi-tenant. L'arrivée d'un nouvel employé déclenche 8 actions : création compte SSO, attribution équipement, envoi welcome pack, planification entretiens RH J+30 et J+90, ouverture accès SI, génération contrat, ajout slack/teams.
Problème : chaque entreprise cliente a des règles différentes (les unes utilisent Slack, d'autres Teams, certaines exigent un accès VPN). Si on code en dur les appels, on a une explosion combinatoire.
@Injectable()
export class HiringService {
constructor(private readonly emitter: EventEmitter2, private readonly repo: EmployeeRepository) {}
async onboard(input: OnboardEmployeeInput): Promise<Employee> {
const employee = await this.repo.create(input);
this.emitter.emit('hr.employee.hired', {
employeeId: employee.id,
companyId: input.companyId,
role: input.role,
startDate: input.startDate,
});
return employee;
}
}
@Injectable()
export class SsoHiringHandler {
@OnEvent('hr.employee.hired', { async: true })
async createAccount(p: EmployeeHiredPayload) {
const company = await this.companies.findOne(p.companyId);
if (!company.features.ssoEnabled) return;
await this.sso.provisionUser(p.employeeId, p.companyId);
}
}
@Injectable()
export class SlackHiringHandler {
@OnEvent('hr.employee.hired', { async: true })
async inviteToSlack(p: EmployeeHiredPayload) {
const company = await this.companies.findOne(p.companyId);
if (company.collaborationTool !== 'slack') return;
await this.slack.invite(p.employeeId, company.slackTeamId);
}
}Gains : chaque handler vérifie ses propres préconditions feature-flag. Activer/désactiver Slack pour un client se fait par toggle, plus de cascade if/else dans le service principal. Les 8 actions tournent en 2 s wall-clock au lieu de 14 s en séquentiel.
Cabinet d'avocats — audit log événementiel
Qui : SaaS legaltech soumis aux exigences de traçabilité du conseil de l'Ordre. Toute action sensible (accès dossier, modification d'une pièce, partage avec un confrère) doit être loggée avec horodatage et signature.
Problème : ajouter auditService.log(...) dans chaque méthode pollue le code et oublie 30% des cas. On veut un mécanisme transverse où chaque service "publie" ses actions et un handler central audite.
@Injectable()
export class CaseDocumentService {
constructor(private readonly emitter: EventEmitter2) {}
async openDocument(caseId: string, docId: string, userId: string) {
const doc = await this.repo.findOne(caseId, docId);
this.emitter.emit('audit.action', {
type: 'document.opened',
actorId: userId,
resource: { kind: 'document', id: docId, caseId },
timestamp: new Date(),
ip: this.requestContext.ip,
});
return doc;
}
}
@Injectable()
export class AuditHandler {
@OnEvent('audit.action')
async record(payload: AuditPayload) {
const signed = await this.signer.sign(payload);
await this.auditRepo.append(signed);
}
}Gains : un seul point de persistence audit, signé avec horodatage Notarius pour valeur probante. Couverture audit passée de 70% à 100% car le pattern emit est devenu réflexe dans la code review. Conformité prouvée lors du dernier audit du conseil de l'Ordre.
🛠️ Exemple end-to-end
Contexte : plateforme de réservation hôtelière B2C. Quand un client annule sa réservation, on doit déclencher remboursement Stripe, libérer le stock, notifier l'hôtelier, envoyer un mail au client, mettre à jour la fidélité, et garantir que tout cela soit retryable sans double-effet. Utilisation de l'outbox pattern pour combiner transactionnalité DB et émission événementielle.
// src/booking/booking.service.ts
import { Injectable } from '@nestjs/common';
import { DataSource } from 'typeorm';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { randomUUID } from 'node:crypto';
interface BookingCancelledPayload {
bookingId: string;
customerId: string;
hotelId: string;
refundAmount: number;
paymentIntentId: string;
reason: string;
}
@Injectable()
export class BookingService {
constructor(
private readonly ds: DataSource,
private readonly outbox: OutboxService,
private readonly emitter: EventEmitter2,
) {}
async cancel(bookingId: string, customerId: string, reason: string) {
return this.ds.transaction(async (tx) => {
const booking = await tx.getRepository(Booking).findOneOrFail({ where: { id: bookingId, customerId } });
booking.cancel(reason);
await tx.save(booking);
const eventId = randomUUID();
await this.outbox.append(tx, {
id: eventId,
type: 'booking.cancelled',
aggregateId: bookingId,
payload: {
bookingId, customerId,
hotelId: booking.hotelId,
refundAmount: booking.totalCents,
paymentIntentId: booking.paymentIntentId,
reason,
} as BookingCancelledPayload,
createdAt: new Date(),
});
return { booking, eventId };
});
}
}
// src/outbox/outbox-publisher.service.ts
@Injectable()
export class OutboxPublisherService {
constructor(
private readonly ds: DataSource,
private readonly emitter: EventEmitter2,
private readonly log: Logger,
) {}
@Cron('*/2 * * * * *')
async publishPending() {
const pending = await this.ds.getRepository(OutboxEntry).find({
where: { publishedAt: IsNull() },
take: 100,
order: { createdAt: 'ASC' },
});
for (const entry of pending) {
try {
this.emitter.emit(entry.type, entry.payload, { eventId: entry.id });
entry.publishedAt = new Date();
await this.ds.getRepository(OutboxEntry).save(entry);
} catch (e) {
this.log.error(`Outbox publish failed for ${entry.id}`, e);
}
}
}
}
// src/booking/handlers/booking-cancelled.handlers.ts
@Injectable()
export class BookingCancelledHandlers {
constructor(
private readonly idempotency: IdempotencyService,
private readonly stripe: StripeService,
private readonly inventory: InventoryService,
private readonly mail: MailService,
private readonly hotelier: HotelierNotificationService,
private readonly loyalty: LoyaltyService,
) {}
@OnEvent('booking.cancelled', { async: true })
async refund(p: BookingCancelledPayload, meta: { eventId: string }) {
await this.idempotency.once(`refund:${meta.eventId}`, async () => {
await this.stripe.refund(p.paymentIntentId, p.refundAmount);
});
}
@OnEvent('booking.cancelled', { async: true })
async releaseInventory(p: BookingCancelledPayload, meta: { eventId: string }) {
await this.idempotency.once(`inventory:${meta.eventId}`, async () => {
await this.inventory.releaseForBooking(p.bookingId);
});
}
@OnEvent('booking.cancelled', { async: true })
async notifyCustomer(p: BookingCancelledPayload, meta: { eventId: string }) {
await this.idempotency.once(`mail:${meta.eventId}`, async () => {
await this.mail.sendCancellationConfirmation(p.bookingId);
});
}
@OnEvent('booking.cancelled', { async: true })
async notifyHotelier(p: BookingCancelledPayload, meta: { eventId: string }) {
await this.idempotency.once(`hotelier:${meta.eventId}`, async () => {
await this.hotelier.pushBookingCancelled(p.hotelId, p.bookingId);
});
}
@OnEvent('booking.cancelled', { async: true })
async revertLoyalty(p: BookingCancelledPayload, meta: { eventId: string }) {
await this.idempotency.once(`loyalty:${meta.eventId}`, async () => {
await this.loyalty.revertPoints(p.customerId, p.bookingId);
});
}
}
// src/idempotency/idempotency.service.ts
@Injectable()
export class IdempotencyService {
constructor(@Inject(CACHE_MANAGER) private readonly cache: Cache) {}
async once(key: string, action: () => Promise<void>): Promise<void> {
const seen = await this.cache.get(key);
if (seen) return;
await action();
await this.cache.set(key, '1', 7 * 24 * 60 * 60 * 1000);
}
}Outbox pattern garantissant que l'événement n'est émis que si la transaction DB a commité. Publisher tourne toutes les 2 secondes et marque les entrées publiées. Chaque handler s'enveloppe dans IdempotencyService.once() pour absorber les re-publish en cas de redémarrage publisher. Le flow complet (annulation, remboursement, libération stock, mails, fidélité) tourne en moins de 4 s avec garantie de cohérence éventuelle, même en cas de crash entre la transaction et l'émission.
🔁 Quand utiliser / éviter
Utiliser quand : besoin de découpler des effets de bord (mail, audit, cache invalidation, webhook sortant) dans une même application ; domain events locaux d'un aggregate vers d'autres bounded contexts qui partagent le même process ; notification interne entre modules (un module auth qui notifie un module profile quand un utilisateur s'inscrit) ; debounce/throttle de tâches de fond ; tests d'intégration où on veut observer des effets de bord.
Éviter quand : communication entre services (utiliser RabbitMQ, Kafka, NATS) ; besoin de persistence ou de rejouage (utiliser un event store ou un broker durable) ; workflow long et résilient (utiliser BullMQ ou Temporal) ; besoin de strong consistency entre la transaction et l'effet de bord (utiliser un outbox pattern persistant) ; quand vous écrivez du CQRS strict avec sagas (préférer @nestjs/cqrs).
Comparatif court :
| Besoin | Outil recommandé |
|---|---|
| In-process decoupling | @nestjs/event-emitter |
| Cross-service messaging | @nestjs/microservices + broker |
| Background jobs avec retry | @nestjs/bullmq |
| CQRS strict + sagas | @nestjs/cqrs |
| Event sourcing | EventStoreDB, marten, Axon |
| Workflow durable | Temporal.io |
🤖 Servir des agents IA : l'event-emitter comme bus interne d'un orchestrateur LLM
L'event-emitter brille quand on construit un agent serveur : une boucle d'outils (tool-use loop) génère des steps (token reçu, outil appelé, résultat d'outil, étape terminée) qu'on veut fan-out vers plusieurs consommateurs in-process — l'endpoint SSE qui streame vers le client, un logger de coût, un buffer de persistance, un métrique. Le bus découple la boucle d'inférence des canaux de sortie.
Mental model staff : la boucle agentique est le producer, le bus est le fan-out in-process, et le transport réseau (SSE/WebSocket) est un consumer parmi d'autres. Ne JAMAIS coupler la boucle LLM directement à
res.write(): sinon vous ne pouvez plus ajouter un logger de coût ou un persisteur sans toucher la boucle, et la déconnexion client tue la génération (perte de tokens facturés).
1. Le client LLM injecté via forRootAsync (jamais new Anthropic() dans un champ)
Anti-pattern junior : private anthropic = new Anthropic() dans le service. On perd la config centralisée, les mocks de test, les retries SDK, et on lit la clé d'API n'importe où. Pattern staff : un module dynamique qui fournit un client configuré et partagé.
// src/llm/llm.module.ts
import { Module, DynamicModule } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import Anthropic from '@anthropic-ai/sdk';
export const ANTHROPIC = Symbol('ANTHROPIC');
@Module({})
export class LlmModule {
static forRootAsync(): DynamicModule {
return {
module: LlmModule,
imports: [ConfigModule],
providers: [
{
provide: ANTHROPIC,
inject: [ConfigService],
useFactory: (cfg: ConfigService) =>
new Anthropic({
apiKey: cfg.getOrThrow<string>('ANTHROPIC_API_KEY'),
maxRetries: 3, // retries SDK (429/5xx) avec backoff exponentiel
timeout: 60_000,
}),
},
],
exports: [ANTHROPIC],
global: true,
};
}
}Modèles flagship (juin 2026) : claude-opus-4-8 (raisonnement profond, agents), claude-sonnet-4-6 (équilibre coût/latence, défaut de prod), claude-haiku-4-5 (classification, routing, ultra-rapide). On route le modèle par usage, pas par habitude.
2. La boucle agentique côté serveur émet des agent.* events
La boucle de tool-use ne sait rien du transport. Elle émet. C'est le découplage qui rend l'observabilité, le costing et la persistance gratuits.
// src/agent/agent.runner.ts
import { Injectable, Inject } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import type Anthropic from '@anthropic-ai/sdk';
import { ANTHROPIC } from '../llm/llm.module';
type AgentStep =
| { kind: 'text-delta'; runId: string; text: string }
| { kind: 'tool-call'; runId: string; tool: string; input: unknown }
| { kind: 'tool-result'; runId: string; tool: string; output: unknown }
| { kind: 'done'; runId: string; stopReason: string }
| { kind: 'usage'; runId: string; inputTokens: number; outputTokens: number };
@Injectable()
export class AgentRunner {
constructor(
@Inject(ANTHROPIC) private readonly anthropic: Anthropic,
private readonly bus: EventEmitter2,
private readonly tools: ToolRegistry,
) {}
async run(runId: string, prompt: string, signal: AbortSignal): Promise<void> {
const messages: Anthropic.MessageParam[] = [{ role: 'user', content: prompt }];
// Boucle agentique : on itère tant que le modèle demande des outils
for (let turn = 0; turn < 8; turn++) {
if (signal.aborted) return;
const stream = this.anthropic.messages.stream(
{
model: 'claude-sonnet-4-6',
max_tokens: 2048,
messages,
tools: this.tools.schemas(),
},
{ signal }, // propage l'annulation au SDK -> ferme la connexion HTTP sortante
);
stream.on('text', (delta) =>
this.bus.emit('agent.step', { kind: 'text-delta', runId, text: delta }),
);
const final = await stream.finalMessage();
this.bus.emit('agent.step', {
kind: 'usage',
runId,
inputTokens: final.usage.input_tokens,
outputTokens: final.usage.output_tokens,
} satisfies AgentStep);
const toolUses = final.content.filter((c) => c.type === 'tool_use');
if (toolUses.length === 0) {
this.bus.emit('agent.step', { kind: 'done', runId, stopReason: final.stop_reason ?? 'end' });
return;
}
messages.push({ role: 'assistant', content: final.content });
const results: Anthropic.ToolResultBlockParam[] = [];
for (const tu of toolUses) {
this.bus.emit('agent.step', { kind: 'tool-call', runId, tool: tu.name, input: tu.input });
const output = await this.tools.execute(tu.name, tu.input, signal);
this.bus.emit('agent.step', { kind: 'tool-result', runId, tool: tu.name, output });
results.push({ type: 'tool_result', tool_use_id: tu.id, content: JSON.stringify(output) });
}
messages.push({ role: 'user', content: results });
}
}
}3. L'endpoint SSE consomme le bus filtré par runId + AbortController sur déconnexion
Le consumer SSE s'abonne, filtre par runId, streame, et — point critique de prod — annule la génération serveur quand le client se déconnecte. Sans ça, vous facturez des tokens que personne ne lit.
// src/agent/agent.controller.ts
import { Controller, Post, Body, Res, Req } from '@nestjs/common';
import { Response, Request } from 'express';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { randomUUID } from 'node:crypto';
import { AgentRunner } from './agent.runner';
@Controller('agent')
export class AgentController {
constructor(private readonly runner: AgentRunner, private readonly bus: EventEmitter2) {}
@Post('stream')
async stream(@Body('prompt') prompt: string, @Res() res: Response, @Req() req: Request) {
const runId = randomUUID();
const ac = new AbortController();
res.set({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache, no-transform',
Connection: 'keep-alive',
});
res.flushHeaders();
const onStep = (step: { runId: string; kind: string }) => {
if (step.runId !== runId) return; // FILTRE: un seul bus, N runs concurrents
res.write(`event: ${step.kind}\ndata: ${JSON.stringify(step)}\n\n`);
if (step.kind === 'done') cleanup();
};
this.bus.on('agent.step', onStep);
const cleanup = () => {
this.bus.off('agent.step', onStep); // sinon: MEMORY LEAK + MaxListenersExceeded
res.end();
};
// Déconnexion client -> annule la boucle serveur (et la connexion Anthropic)
req.on('close', () => {
ac.abort();
cleanup();
});
try {
await this.runner.run(runId, prompt, ac.signal);
} catch (err) {
res.write(`event: error\ndata: ${JSON.stringify({ message: (err as Error).message })}\n\n`);
cleanup();
}
}
}Piège n°1 en prod : oublier le
this.bus.off()danscleanup. Chaque requête SSE attache un listener ; sans détachement, après quelques milliers de requêtes vous déclenchezMaxListenersExceededWarning, puis un OOM. LerunIdfiltré +off()symétrique est non négociable. Alternative plus robuste à grande échelle :bus.oncene suffit pas (multi-events), donc on garde le coupleon/offavec une closure capturée.
4. Jobs IA longs en BullMQ : idempotence keyée sur le generation id, retry cost-aware
Pour une génération longue (rapport, batch d'embeddings), on ne tient pas une connexion SSE ouverte 5 minutes : on passe par BullMQ. L'event-emitter sert alors de pont entre le worker et le SSE de polling.
// src/agent/agent.processor.ts
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Job } from 'bullmq';
@Processor('agent-jobs')
export class AgentProcessor extends WorkerHost {
constructor(private readonly runner: AgentRunner, private readonly idem: IdempotencyService) {
super();
}
async process(job: Job<{ generationId: string; prompt: string }>): Promise<void> {
const { generationId, prompt } = job.data;
// Idempotence KEYÉE SUR generationId, pas sur job.id (un retry réutilise la même clé)
await this.idem.once(`gen:${generationId}`, async () => {
const ac = new AbortController();
await this.runner.run(generationId, prompt, ac.signal);
});
}
}// Enqueue: retry cost-aware — on NE retry PAS une erreur 400 (prompt invalide = argent perdu)
await this.agentQueue.add(
'generate',
{ generationId, prompt },
{
jobId: generationId, // dédup BullMQ: deux enqueues du même generationId = un seul job
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 1000,
},
);Règles staff pour les jobs IA :
- Idempotence sur le
generationId(id métier stable), jamais sur lejob.idBullMQ qui change au requeue. - Retry cost-aware : un
400 invalid_requestou un refus de contenu ne doit PAS être retenté (vous payez à chaque appel) ; un429/529 overloaded/5xxoui, avec backoff. Filtrer dans unshouldRetry(err). - Partial-output handling : si le worker crashe au tour 5/8, persistez les steps déjà émis (
agent.step-> persister) pour reprendre ou afficher un résultat partiel plutôt que de tout rejouer.
5. Cost-guard et rate-limit à l'edge (avant d'émettre quoi que ce soit)
Un agent qui boucle peut brûler des milliers de dollars. Le garde-fou vit à l'edge, avant la boucle, et émet un agent.budget.exceeded consommé par l'alerting.
@Injectable()
export class CostGuard {
constructor(@Inject(CACHE_MANAGER) private readonly cache: Cache, private readonly bus: EventEmitter2) {}
async assertBudget(tenantId: string, estimatedCents: number): Promise<void> {
const key = `budget:${tenantId}:${new Date().toISOString().slice(0, 10)}`;
const spent = (await this.cache.get<number>(key)) ?? 0;
if (spent + estimatedCents > DAILY_LIMIT_CENTS) {
this.bus.emit('agent.budget.exceeded', { tenantId, spent, limit: DAILY_LIMIT_CENTS });
throw new ForbiddenException('Daily AI budget exceeded');
}
await this.cache.set(key, spent + estimatedCents, 24 * 3600 * 1000);
}
}On combine avec un rate-limit (@nestjs/throttler ou un token-bucket Redis) keyé sur le tenant. Le tracking réel du coût se fait via le step usage : on consomme agent.step{kind:'usage'} pour incrémenter le compteur avec le vrai nombre de tokens facturés (input + output), pas l'estimation.
@OnEvent('agent.step', { async: true })
async trackCost(step: { kind: string; runId: string; inputTokens?: number; outputTokens?: number }) {
if (step.kind !== 'usage') return;
const cents = this.pricer.cents('claude-sonnet-4-6', step.inputTokens!, step.outputTokens!);
await this.ledger.increment(step.runId, cents);
}Pourquoi l'event-emitter ici et pas un appel direct ? Parce que le costing, l'observabilité (OTel), la persistance des steps et le transport SSE sont quatre consumers indépendants du même flux
agent.step. Les coupler dans la boucle, c'est récrire la boucle à chaque nouveau besoin. C'est exactement le cas d'usage canonique du fan-out in-process. (Pour exposer ces capacités à d'autres process — un autre service qui veut piloter l'agent — on bascule sur un endpoint MCP ou un broker : l'event-emitter reste interne au process de l'orchestrateur.)
🧰 Aller plus loin — orchestration et observabilité
Tracer les événements avec OpenTelemetry
Un système événementiel est intrinsèquement difficile à observer : le flot de causalité passe par des callbacks et non par une stack synchronique. La parade : propager un traceparent (W3C Trace Context) à travers les payloads et instrumenter manuellement le bus.
import { trace, context, propagation } from '@opentelemetry/api';
@Injectable()
export class TracedEventBus {
constructor(private readonly bus: EventEmitter2) {}
emit<T extends object>(name: string, payload: T) {
const carrier: Record<string, string> = {};
propagation.inject(context.active(), carrier);
this.bus.emit(name, { ...payload, _trace: carrier });
}
on<T extends object>(name: string, handler: (p: T) => Promise<void> | void) {
this.bus.on(name, async (p: any) => {
const ctx = propagation.extract(context.active(), p._trace ?? {});
await context.with(ctx, async () => {
const span = trace.getTracer('event-bus').startSpan(`handle ${name}`);
try {
await handler(p);
} catch (e) {
span.recordException(e as Error);
throw e;
} finally {
span.end();
}
});
});
}
}Avec cette instrumentation, les traces remontent côté Jaeger/Datadog en montrant le lien entre producteur et listeners, même si tout est in-process.
Monitoring : métriques de listeners
Exposer des métriques Prometheus par événement et par listener permet de détecter rapidement les régressions (latence, taux d'erreur). Compteur events_emitted_total{name}, histogramme event_handler_duration_seconds{name,listener}, compteur event_handler_errors_total{name,listener,reason}.
@OnEvent('order.placed', { async: true })
async onPlaced(event: OrderPlacedEvent) {
const stop = this.metrics.startTimer('order.placed', 'OrderMailListener');
try {
await this.mail.sendOrderConfirmation(event.customerId, event.orderId);
} catch (err) {
this.metrics.error('order.placed', 'OrderMailListener', err.code ?? 'unknown');
throw err;
} finally {
stop();
}
}🔗 Liens
- Doc officielle : https://docs.nestjs.com/techniques/events
eventemitter2: https://github.com/EventEmitter2/EventEmitter2- Outbox pattern : https://microservices.io/patterns/data/transactional-outbox.html
@nestjs/cqrs: https://docs.nestjs.com/recipes/cqrs- Article « Domain events vs Integration events » : https://martinfowler.com/articles/201701-event-driven.html
- Comparaison émetteurs/brokers : https://www.confluent.io/blog/event-driven-microservices-with-kafka/
- Repo source
@nestjs/event-emitter: https://github.com/nestjs/event-emitter - W3C Trace Context : https://www.w3.org/TR/trace-context/
- BullMQ pour jobs persistants : https://docs.bullmq.io
- Temporal.io pour workflows durables : https://temporal.io
- Anthropic SDK (streaming, tool use, retries) : https://github.com/anthropics/anthropic-sdk-typescript
- MCP (Model Context Protocol) : https://modelcontextprotocol.io
🏋️ Exercices
Progression : on implémente, on durcit pour la prod, puis on casse et on répare. Faites-les dans l'ordre — chacun construit sur le précédent.
Exercice 1 — Bus typé fin (implement)
Objectif : remplacer les chaînes magiques ('order.placed') par un bus typé où le nom de l'événement détermine le type du payload à la compilation.
Construire un TypedEventBus générique au-dessus de EventEmitter2 avec une map EventMap ({ 'order.placed': OrderPlacedEvent; 'order.refunded': OrderRefundedEvent }) telle que bus.emit('order.placed', payload) refuse de compiler si payload n'est pas un OrderPlacedEvent, et bus.on('order.placed', h) infère h: (e: OrderPlacedEvent) => void.
Indice / Solution
type EventMap = {
'order.placed': OrderPlacedEvent;
'order.refunded': OrderRefundedEvent;
};
@Injectable()
class TypedEventBus {
constructor(private readonly bus: EventEmitter2) {}
emit<K extends keyof EventMap>(name: K, payload: EventMap[K]): boolean {
return this.bus.emit(name, payload);
}
on<K extends keyof EventMap>(name: K, h: (p: EventMap[K]) => void): void {
this.bus.on(name as string, h as (...a: any[]) => void);
}
}Le typage vit au-dessus ; EventEmitter2 reste string-based dessous. Le coût : les wildcards (order.*) cassent l'inférence — c'est un tradeoff assumé, on garde un onWildcard(pattern, h) non typé séparé.
Exercice 2 — Outbox transactionnel complet (production-grade)
Objectif : garantir qu'aucun order.confirmed n'est émis si la transaction DB rollback, et qu'aucun n'est perdu si le process crashe entre commit et émission.
Implémenter la table outbox (colonnes id, type, payload jsonb, created_at, published_at nullable, attempts), l'écriture dans la même transaction que l'aggregate, un publisher @Cron qui lit les non-publiés avec FOR UPDATE SKIP LOCKED, émet, marque publié. Ajouter un index partiel WHERE published_at IS NULL.
Indice / Solution
Le SKIP LOCKED est la clé pour scaler à N instances du publisher sans double-émission : chaque worker verrouille un lot disjoint. Le take: 100 + index partiel évite un full-scan sur une table qui grossit. Penser au relocking : si un worker meurt après émission mais avant save(published_at), l'entrée sera réémise — d'où l'idempotence côté listener obligatoire (l'outbox garantit at-least-once, jamais exactly-once). Purger les published_at anciens via un second cron.
Exercice 3 — Streaming agent SSE de bout en bout (production-grade)
Objectif : exposer POST /agent/stream qui streame les tokens d'une boucle tool-use, avec annulation serveur à la déconnexion client et détachement propre des listeners.
Reprendre les sections 2-3 du bloc IA. Tester : (a) une réponse normale streame jusqu'à done ; (b) un curl interrompu (Ctrl-C) déclenche req.on('close') -> ac.abort() -> la connexion Anthropic se ferme ; (c) 1000 requêtes successives ne font pas grimper le nombre de listeners du bus (bus.listenerCount('agent.step') reste stable).
Indice / Solution
Le test (c) est le révélateur de fuite : loguer bus.listenerCount('agent.step') après chaque requête. S'il croît, votre off() ne matche pas la référence passée à on() (closure recréée vs même référence). Stocker la fonction onStep dans une const et passer exactement cette référence à on et off. Pour (b), vérifier que stream(..., { signal }) propage bien — l'abort doit faire rejeter finalMessage() avec un APIUserAbortError, à catcher silencieusement (ce n'est pas une vraie erreur).
Exercice 4 — Casser puis réparer : la tempête de listeners async (break-then-fix)
Objectif : reproduire un incident de prod où 12 listeners { async: true } sur order.confirmed saturent un pool de connexions DB, puis corriger sans revenir au séquentiel naïf.
Émettre 500 order.confirmed d'affilée. Chaque listener fait une requête DB. Observer l'épuisement du pool (TimeoutError: acquiring connection) car async: true lance tout en parallèle, soit 500 × 12 = 6000 requêtes concurrentes. Réparer avec un sémaphore de concurrence (p-limit) par listener, ou un débitmètre, sans bloquer l'event loop.
Indice / Solution
Le piège : { async: true } ne sérialise rien, il fan-out en parallèle total. La cause racine est le mismatch entre la cardinalité d'émission (rafale) et la capacité aval (pool de 20 connexions). Trois réparations, du moins au plus robuste : (1) p-limit(10) autour du corps du listener ; (2) router la rafale vers BullMQ avec concurrency: 10 sur le worker (back-pressure native + retry) ; (3) batching — accumuler les events 50 ms et traiter en bulk. La leçon staff : l'event-emitter n'a pas de back-pressure. Dès qu'il y a un aval limité (DB, API tierce, LLM), il faut un mécanisme de contrôle de débit explicite. C'est la frontière exacte où l'event-emitter cède la place à une vraie queue.
Exercice 5 — Cost-guard adversarial sur un agent qui boucle (break-then-fix)
Objectif : prouver qu'un agent malicieux ou buggé ne peut pas dépasser le budget journalier d'un tenant, même sous concurrence.
Écrire un test qui lance 50 runs concurrents pour le même tenant avec un DAILY_LIMIT qui n'autorise que 10 runs. Vérifier que ≤ 10 passent. Naïvement, le get-puis-set du CostGuard a une race condition (TOCTOU) : sous concurrence, les 50 lisent spent=0 avant qu'aucun n'ait écrit. Réparer.
Indice / Solution
Le get + set non atomique laisse passer tout le monde. Réparation : opération atomique côté Redis — INCRBY puis comparer le retour (const total = await redis.incrby(key, cents); if (total > limit) { await redis.decrby(key, cents); throw }), ou un script Lua check-and-increment atomique, ou un token-bucket. Le decrby compensatoire évite de "consommer" le budget d'un run rejeté. Leçon : tout garde-fou de coût/quota sous concurrence DOIT être atomique ; le pattern read-modify-write applicatif est toujours faux ici. Le step usage (tokens réels) réconcilie ensuite l'estimation avec le coût facturé.
🎤 En entretien
Q : Quand utiliseriez-vous @nestjs/event-emitter plutôt que @nestjs/cqrs ou un broker, et où est la frontière ? R : Event-emitter pour du fan-out d'effets de bord in-process, in-memory, sans garantie de livraison (mail, audit, cache). CQRS dès qu'on veut des commandes/events typés, des sagas explicites et un EventPublisher brancheable. Broker dès qu'un autre process doit consommer ou qu'il faut de la durabilité/rejouage. La frontière dure : pas de persistance, pas de back-pressure, pas de cross-process — au premier de ces trois besoins, on sort de l'event-emitter.
Q : Un listener async order.confirmed envoie un mail mais échoue. Que se passe-t-il, et comment le rendez-vous robuste ? R : Avec emit() (sync), un rejet non catché devient un unhandledRejection qui peut tuer le process ; les autres listeners ont déjà tourné. Robustesse : try/catch dans chaque listener pour isoler l'effet de bord, ou emitAsync() + Promise.allSettled pour collecter les échecs sans propager. Pour la fiabilité réelle (retry, DLQ), le listener ne fait qu'enqueue un job BullMQ — l'event-emitter notifie, la queue garantit l'exécution.
Q : Comment garantissez-vous qu'un événement n'est émis que si la transaction a commité, sans perdre l'événement si le process crashe juste après ? R : Outbox transactionnel : écrire l'event dans une table outbox dans la même transaction que l'aggregate (atomicité garantie par la DB), puis un publisher asynchrone (cron + FOR UPDATE SKIP LOCKED) lit les non-publiés et émet. Si le process meurt après commit mais avant émission, l'entrée reste published_at IS NULL et sera reprise. C'est de l'at-least-once, donc les listeners doivent être idempotents (clé sur l'event.id).
Q : Vous streamez un agent LLM via SSE et l'event-emitter. Quels sont les deux pièges de prod et comment vous les évitez ? R : (1) Fuite de listeners : chaque requête SSE fait bus.on('agent.step', ...) ; sans off() symétrique à la déconnexion, on accumule des listeners jusqu'au MaxListenersExceeded puis OOM. On filtre par runId et on détache exactement la même référence dans le cleanup. (2) Tokens facturés non lus : si le client ferme l'onglet, sans AbortController la boucle continue d'appeler Anthropic et vous payez pour rien. On câble req.on('close') -> ac.abort() qui propage le signal au SDK (stream(..., { signal })) et ferme la connexion sortante. Bonus : le costing se branche en consumer du step usage, pas dans la boucle.