Microservices NestJS
TL;DR —
@nestjs/microservicespropose une abstraction transport (TCP, Redis, NATS, Kafka, gRPC, RabbitMQ, MQTT) au-dessus d'un même modèle de controller. Tu choisis entre request-response (@MessagePattern) et pub/sub (@EventPattern). Une app hybrid sert HTTP + microservice dans le même process. Les vrais défis ne sont pas le code Nest, mais : idempotence, retries, ordering, exactly-once illusion, observability.
🧠 Mental model — ASCII diagram + analogy
Analogie : pense à une entreprise.
- TCP = ligne téléphonique directe (1-1, rapide, fragile).
- Redis Pub/Sub = un haut-parleur, tout le monde entend (volatile, pas de queue).
- NATS = un facteur ultra-rapide avec accusé optionnel (JetStream).
- Kafka = un journal d'événements immuable horodaté (replay possible).
- RabbitMQ = un bureau de poste classique avec files, routing, DLQ.
- gRPC = un contrat strict (Protobuf), comme un échange de formulaires signés.
┌──────────────┐
Client ───▶ │ ClientProxy │ ───▶ Transport ───▶ Broker
└──────────────┘ │
▼
┌─────────────────────────┐
│ Microservice Nest App │
│ @MessagePattern('cmd') │
│ @EventPattern('evt') │
└─────────────────────────┘Request-response (@MessagePattern) : le client attend une réponse, le broker corrèle via un correlationId + replyTo. Latence visible.
Event-based (@EventPattern) : fire-and-forget, idéal pour broadcast. Aucune réponse, plusieurs handlers possibles.
🛠️ Code minimal — realistic working snippet
Hybrid app (HTTP + microservice TCP)
// main.ts
import { NestFactory } from '@nestjs/core';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.TCP,
options: { host: '0.0.0.0', port: 4001 },
});
await app.startAllMicroservices();
await app.listen(3000); // HTTP
}
bootstrap();Controller mixte
import { Controller, Get } from '@nestjs/common';
import { MessagePattern, EventPattern, Payload, Ctx, RmqContext } from '@nestjs/microservices';
@Controller('orders')
export class OrdersController {
@Get()
list() { return this.svc.list(); }
// Request-response
@MessagePattern({ cmd: 'order.create' })
async create(@Payload() dto: CreateOrderDto) {
return this.svc.create(dto);
}
// Pub/sub event with manual ack (RabbitMQ)
@EventPattern('order.shipped')
async onShipped(@Payload() data: any, @Ctx() ctx: RmqContext) {
try {
await this.svc.markShipped(data.id);
ctx.getChannelRef().ack(ctx.getMessage());
} catch (err) {
ctx.getChannelRef().nack(ctx.getMessage(), false, false); // -> DLQ
}
}
}Client (autre service)
@Module({
imports: [
ClientsModule.register([
{
name: 'ORDERS',
transport: Transport.NATS,
options: { servers: ['nats://nats:4222'] },
},
]),
],
})
export class BillingModule {}@Injectable()
export class BillingService {
constructor(@Inject('ORDERS') private orders: ClientProxy) {}
async charge(orderId: string) {
const order = await firstValueFrom(
this.orders.send({ cmd: 'order.get' }, { id: orderId })
.pipe(timeout(2000), retry({ count: 2, delay: 200 })),
);
this.orders.emit('billing.charged', { orderId });
return order;
}
}Kafka (consumer group)
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.KAFKA,
options: {
client: { brokers: ['kafka:9092'] },
consumer: { groupId: 'orders-svc' },
},
});🎯 Patterns courants
- Hybrid HTTP + MS — un seul binaire expose une API REST et écoute des events. Pratique en early-stage. Sépare en deux process quand la charge l'exige.
- Saga choreography — pas de coordinateur central, chaque service réagit aux events. Simple mais flow implicite.
- Saga orchestration — un service "Orchestrator" envoie des commands successives (
@MessagePattern) et compense en cas d'échec. Plus debuggable, mais point central. - Idempotency key — chaque message porte un
messageId. Le consumer stocke les IDs traités (Redis SET avec TTL, ou table SQL avec PK). Rejouer = no-op. - Outbox + relay — écris l'event dans une table SQL
outboxlors du commit DB. Un worker relit et publie sur le broker. Évite le "dual write problem". - DLQ + redelivery policy — un consumer qui plante 3 fois pousse le message en Dead Letter Queue, alerte humaine, replay manuel après fix.
🔄 Versions — Nest 7 / 8 / 9 / 10 / 11
- Nest 7 : RxJS 6,
send()retourneObservable, peu de typage des payloads. - Nest 8 : passage RxJS 7. Support Kafka maturé. Génériques sur
ClientProxy.send<TResult, TInput>. - Nest 9 : nouveau client gRPC (
@grpc/grpc-js) par défaut, anciensgrpcdeprecated.Transport.NATSv2. - Nest 10 : RabbitMQ via
amqplibmis à jour,RmqContext.getChannelRef()stable, support natif deBatchingProducerKafka. - Nest 11 : amélioration du serializer/deserializer (interfaces stables), TypeScript 5 obligatoire pour certains decorators. Vérifie les types
ClientKafka(la signaturesubscribeToResponseOfn'a pas changé mais le typage des messages oui).
Library notes :
@nestjs/microservices≤ v9 : Kafka utilisaitkafkajsclassique, en v10+ pleinement compatible avec les versions récentes (>= 2.2).natsv1 → v2 : breaking change, nouveau format de connexion (servers array), preferred pour Nest 9+.amqp-connection-managerrecommandé en wrapper pour reconnect automatique RabbitMQ.
⚠️ Pitfalls
- Pas de timeout sur
send()— un broker lent fait pendre toute la requête HTTP appelante. Toujours.pipe(timeout(N)). emit()non-blocking confondu avecsend()—emitne renvoie pas de promesse utile (juste l'accusé local). Si tu attends une réponse, c'estsend.- Ordering non garanti par défaut — Kafka garantit l'ordre par partition (clé), Redis/NATS pub-sub ne garantissent rien. Si l'ordre compte, partitionne par
aggregateId. - At-least-once mal compris — tu vas recevoir le même message 2 fois un jour. Toujours idempotent. Ne suppose jamais exactly-once (mythe).
- Dual write —
db.save()puisclient.emit()= si la 2e plante, event perdu. Utilise outbox ou transactional broker. - Pas de DLQ configurée — un message poison bloque la queue et reboucle infiniment (CPU 100 %, alerte 3 h du mat). Définis maxRetries + DLQ dès le jour 1.
- gRPC : oublier de
subscribeToResponseOf— sur Kafka avant unsend, il faut s'abonner aux topics de réponse dansonModuleInit. Sinon timeout silencieux. - Serializer custom non symétrique — un côté JSON, l'autre Avro, debug atroce. Garde le même serializer des deux côtés ou rends-le explicite.
🧪 Testing
Unit : teste les handlers comme des méthodes normales, pas besoin du transport.
const ctrl = new OrdersController(svc);
await ctrl.create({ id: 'o1' } as any);
expect(svc.create).toHaveBeenCalled();Integration — démarre un transport en mémoire (TCP sur port aléatoire) :
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.TCP,
options: { port: 0 },
});
await app.listen();
const port = (app as any).server.server.address().port;
const client = ClientProxyFactory.create({
transport: Transport.TCP,
options: { port },
});
await client.connect();
const res = await firstValueFrom(client.send({ cmd: 'order.create' }, dto));
expect(res).toBeDefined();
await client.close();
await app.close();Contract testing — pour Kafka/gRPC, utilise des fichiers .proto ou Avro schemas en source de vérité, et Pact pour les contrats entre services.
E2E avec Testcontainers — pour Redis/Kafka/RabbitMQ, démarre le broker dans un container Docker à chaque suite (lent mais réaliste).
🎬 Cas d'usage concrets
Industrie — ERP modulaire
Qui — Industriel français (fabrication de composants automobiles, 12 sites) qui remplace son ERP monolithique par des modules NestJS indépendants : production, inventory, quality, maintenance. Problème — Les équipes sont colocalisées par usine et veulent déployer à leur cadence. Le monolithe imposait une release tous les 3 mois. Les modules doivent communiquer via Kafka (réseau industriel non fiable, besoin de retry). Comment — Chaque module est un service NestJS exposant gRPC pour les requêtes synchrones et publiant des events Kafka pour les changements d'état.
// production/main.ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(ProductionModule, {
transport: Transport.KAFKA,
options: {
client: { brokers: ['kafka-1:9092', 'kafka-2:9092'] },
consumer: { groupId: 'production-service' },
},
});
@Controller()
export class ProductionEventsController {
@EventPattern('inventory.shortage_detected')
async onShortage(@Payload() event: ShortageEvent) {
await this.production.pauseLine(event.lineId, event.sku);
}
}Gains — Release indépendante par module (déploiement 5x/semaine vs 4x/an), tolérance aux pannes réseau via le replay Kafka, équipes autonomes.
E-commerce — Marketplace type Mirakl
Qui — Marketplace française B2C qui agrège des milliers de vendeurs avec des SLA différents (catalogue, commande, paiement, expédition). Problème — Le service catalog est massif en lecture (cache CDN), order est massif en écriture, payment doit être isolé pour la PCI-DSS. Un seul monolithe imposait du sur-provisioning. Comment — Microservices NestJS par domaine, communication NATS pour les commandes/queries cross-services, Redis pour le pub/sub local.
// order/order.service.ts
@Injectable()
export class OrderService {
constructor(
@Inject('CATALOG_SVC') private catalog: ClientProxy,
@Inject('PAYMENT_SVC') private payment: ClientProxy,
) {}
async placeOrder(dto: PlaceOrderDto) {
const prices = await firstValueFrom(
this.catalog.send<PriceMap>('pricing.compute', dto.items).pipe(timeout(2000)),
);
const charge = await firstValueFrom(
this.payment.send<ChargeResult>('payment.authorize', {
amount: prices.total, method: dto.paymentMethod,
}).pipe(timeout(5000)),
);
return this.repo.create({ ...dto, total: prices.total, chargeId: charge.id });
}
}Gains — Scaling indépendant (catalogue 20 pods, paiement 4 pods), isolation PCI-DSS, p95 commande < 400 ms.
Banque — Core bancaire + services périphériques
Qui — Banque européenne avec un core mainframe COBOL intouchable et une ceinture de services modernes (mobile, KYC, agrégation budgétaire). Problème — Le core ne peut absorber qu'un trafic limité. Les services périphériques doivent appeler le core via une queue MQ et fonctionner en mode dégradé si le core est lent. Comment — Façades NestJS qui exposent du REST/gRPC aux apps front et parlent au core via RabbitMQ avec circuit breaker.
@Injectable()
export class CoreBankFacade {
constructor(
@Inject('CORE_MQ') private core: ClientProxy,
private breaker: CircuitBreakerService,
) {}
async getBalance(accountId: string): Promise<number> {
return this.breaker.exec('core.balance', async () => {
return firstValueFrom(
this.core.send<number>('balance.query', { accountId }).pipe(timeout(800)),
);
}, async () => this.cache.getStale(accountId));
}
}Gains — Le mobile reste réactif même si le core sature (fallback cache), évolution des services périphériques sans toucher COBOL, observabilité moderne (OpenTelemetry).
🛠️ Exemple end-to-end
Contexte — L'ERP industriel ci-dessus déploie un module quality qui détecte des défauts via vision IA et publie un event defect.detected. Le module production réagit en stoppant la ligne, le module maintenance planifie un contrôle, et un dashboard agrège tout via SSE. On utilise NATS comme transport, schéma Avro pour les events, et un health check par module.
// quality/src/main.ts
async function bootstrap() {
const app = await NestFactory.create(QualityModule);
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.NATS,
options: {
servers: ['nats://nats:4222'],
queue: 'quality-service',
},
});
await app.startAllMicroservices();
await app.listen(3001);
}
bootstrap();
// quality/src/quality.controller.ts
@Controller('quality')
export class QualityController {
constructor(
@Inject('EVENTS') private events: ClientProxy,
private detector: DefectDetector,
) {}
@Post('inspect')
async inspect(@Body() dto: InspectDto): Promise<InspectResult> {
const result = await this.detector.analyze(dto.imageUrl);
if (result.defects.length > 0) {
this.events.emit('defect.detected', {
lineId: dto.lineId,
partRef: dto.partRef,
defects: result.defects,
severity: result.maxSeverity,
detectedAt: new Date().toISOString(),
eventId: randomUUID(),
});
}
return result;
}
@MessagePattern('quality.status')
status() {
return { status: 'ok', queueDepth: this.detector.pendingJobs() };
}
}// production/src/production.controller.ts
@Controller()
export class ProductionEventsController {
private readonly logger = new Logger(ProductionEventsController.name);
constructor(
private lines: LineService,
@Inject('EVENTS') private events: ClientProxy,
) {}
@EventPattern('defect.detected')
async onDefect(@Payload() event: DefectDetectedEvent, @Ctx() ctx: NatsContext) {
this.logger.log(`defect ${event.eventId} on line ${event.lineId}`);
if (event.severity === 'critical') {
await this.lines.pause(event.lineId, `defect:${event.partRef}`);
this.events.emit('line.paused', {
lineId: event.lineId,
reason: 'critical_defect',
triggerEventId: event.eventId,
});
}
}
}// maintenance/src/maintenance.controller.ts
@Controller()
export class MaintenanceEventsController {
constructor(private scheduler: WorkOrderScheduler) {}
@EventPattern('defect.detected')
async onDefect(@Payload() event: DefectDetectedEvent) {
await this.scheduler.create({
lineId: event.lineId,
type: 'inspection',
priority: event.severity === 'critical' ? 'P1' : 'P3',
triggerEventId: event.eventId,
dueAt: new Date(Date.now() + (event.severity === 'critical' ? 3_600_000 : 86_400_000)),
});
}
@MessagePattern('maintenance.workorders')
list(@Payload() filter: { lineId?: string }) {
return this.scheduler.list(filter);
}
}// dashboard/src/dashboard.controller.ts
@Controller('dashboard')
export class DashboardController {
private subject = new Subject<MessageEvent>();
constructor(@Inject('EVENTS') private events: ClientProxy) {}
// ⚠️ Piège : empiler deux @EventPattern sur la même méthode NE marche PAS —
// Nest ne retient que le dernier décorateur. Il faut une méthode par pattern.
@EventPattern('defect.detected')
onDefect(@Payload() event: unknown, @Ctx() ctx: NatsContext) {
this.subject.next({ data: { topic: ctx.getSubject(), payload: event } });
}
@EventPattern('line.paused')
onLinePaused(@Payload() event: unknown, @Ctx() ctx: NatsContext) {
this.subject.next({ data: { topic: ctx.getSubject(), payload: event } });
}
@Sse('stream')
stream(): Observable<MessageEvent> {
return this.subject.asObservable();
}
}// shared/src/clients.module.ts
@Module({
imports: [
ClientsModule.registerAsync([{
name: 'EVENTS',
useFactory: () => ({
transport: Transport.NATS,
options: { servers: [process.env.NATS_URL!] },
}),
}]),
],
exports: [ClientsModule],
})
export class ClientsSharedModule {}L'@EventPattern garantit le fanout (chaque consumer reçoit l'event), le queue group NATS distribue la charge entre instances du même service, et l'eventId permet l'idempotence côté consumer (rejouer NATS lors d'un déploiement ne duplique pas les work orders). Les 3 services peuvent monter en version indépendamment tant qu'ils respectent le schéma Avro de defect.detected.
🤖 Servir et orchestrer des agents IA depuis NestJS
C'est ici que ton stack (Python + NestJS + Angular) rencontre les LLM. Un microservice NestJS qui sert un agent IA n'est pas un controller @Post qui fait new Anthropic() dans une méthode. C'est un système distribué avec ses propres défis : tokens en streaming, boucle agentique (tool-use) côté serveur, annulation propre (client disconnect → abort modèle + serveur), jobs longs et coûteux (BullMQ : idempotence par generationId, retry conscient du coût, sortie partielle), et une DI propre du client LLM (jamais new Anthropic() en dur).
Anthropic facts (à jour) : modèle phare
claude-opus-4-8,claude-sonnet-4-6(équilibré),claude-haiku-4-5(rapide/économe). Utilise le SDK officiel@anthropic-ai/sdk(jamais un fetch manuel), en streaming par défaut pour tout output long, avec les retries SDK (maxRetries, 429/5xx en backoff exponentiel). Pas debudget_tokensnitemperaturesur Opus 4.7/4.8 →thinking: { type: 'adaptive' }+output_config: { effort: 'high' }.
Mental model — où vit l'IA dans une archi microservices
HTTP/SSE (token stream) Queue (BullMQ / Redis)
Angular ◀────────────────────────────── NestJS edge (BFF) ──────▶ ai-worker (NestJS MS)
(EventSource / ▲ │ │
fetch reader) │ AbortController │ @MessagePattern │ boucle agentique
│ (Stop button) │ / SSE │ + tool-use
└──────── annule ──────────┘ ▼
Anthropic SDK (stream)Deux topologies, deux usages :
- Synchrone interactif (chat) : l'utilisateur attend les tokens. Edge NestJS ouvre un
streamAnthropic et relaie en SSE vers Angular. Pas de queue — la latence visible est le produit. - Asynchrone (génération longue, batch, agent autonome) : l'edge enqueue un job (
generationId), répond202 Accepted, et un worker exécute la boucle agentique. Le résultat (ou ses tokens) revient via SSE/WebSocket ou webhook.
1. Client LLM injecté via forRootAsync (jamais new Anthropic() en champ)
Un new Anthropic() en dur dans un service casse les tests (pas de mock), fuit la clé API dans le code, et empêche la config par environnement. On en fait un provider DI configurable de façon asynchrone.
// llm/llm.module.ts
import { Module, DynamicModule } from '@nestjs/common';
import Anthropic from '@anthropic-ai/sdk';
import { ConfigModule, ConfigService } from '@nestjs/config';
export const ANTHROPIC = Symbol('ANTHROPIC');
@Module({})
export class LlmModule {
static forRootAsync(): DynamicModule {
return {
module: LlmModule,
imports: [ConfigModule],
providers: [
{
provide: ANTHROPIC,
inject: [ConfigService],
useFactory: (config: ConfigService) =>
new Anthropic({
apiKey: config.getOrThrow<string>('ANTHROPIC_API_KEY'),
maxRetries: 4, // retries SDK : 429 + 5xx en backoff exponentiel
timeout: 60_000, // hard timeout par requête HTTP
}),
},
],
exports: [ANTHROPIC],
global: true,
};
}
}// usage : on injecte le client typé, testable, mockable
@Injectable()
export class CompletionService {
constructor(@Inject(ANTHROPIC) private readonly anthropic: Anthropic) {}
}En test : { provide: ANTHROPIC, useValue: mockAnthropic }. Zéro réseau, zéro clé.
2. Streaming de tokens en SSE (edge interactif)
L'edge ouvre un stream Anthropic et le transforme en Observable<MessageEvent> pour le décorateur @Sse. Point critique : propager l'AbortController pour que la déconnexion du client tue la requête modèle (sinon tu paies des tokens dans le vide).
// chat/chat.controller.ts
import { Controller, Post, Body, Req, Sse, MessageEvent } from '@nestjs/common';
import { Observable } from 'rxjs';
import type { Request } from 'express';
@Controller('chat')
export class ChatController {
constructor(private readonly chat: ChatService) {}
@Sse('stream')
stream(@Body() dto: ChatDto, @Req() req: Request): Observable<MessageEvent> {
const ac = new AbortController();
// Déconnexion HTTP du client → on abort le stream Anthropic (stop de facturation)
req.on('close', () => ac.abort());
return this.chat.streamTokens(dto, ac.signal);
}
}// chat/chat.service.ts
@Injectable()
export class ChatService {
constructor(@Inject(ANTHROPIC) private readonly anthropic: Anthropic) {}
streamTokens(dto: ChatDto, signal: AbortSignal): Observable<MessageEvent> {
return new Observable<MessageEvent>((subscriber) => {
const stream = this.anthropic.messages.stream(
{
model: 'claude-opus-4-8',
max_tokens: 4096,
messages: dto.messages,
},
{ signal }, // le SDK propage l'abort jusqu'à la connexion HTTP sortante
);
stream.on('text', (delta) => subscriber.next({ data: { type: 'token', text: delta } }));
stream.on('error', (err) => subscriber.error(err));
stream.on('end', () => subscriber.complete());
// teardown RxJS : si l'Observable est unsubscribe (client parti), on abort
return () => stream.abort();
});
}
}Côté Angular, on consomme avec EventSource (ou fetch + getReader() si on a besoin de POST + headers), un buffer append-only coalescé par requestAnimationFrame sous zoneless, et un bouton Stop câblé sur un AbortController qui annule client ET serveur (l'abort HTTP déclenche le req.on('close') ci-dessus). Voir le fichier Angular sur les UIs d'agents pour le détail du rendu.
3. La boucle agentique (tool-use) côté serveur
Un agent = un LLM qui peut appeler tes fonctions (chercher en base, appeler un autre microservice, lire un fichier). La boucle : le modèle renvoie stop_reason: 'tool_use' → tu exécutes l'outil → tu renvoies le tool_result → tu reboucles, jusqu'à end_turn. C'est un point d'intégration microservices naturel : un outil peut être un client.send({ cmd: 'inventory.check' }, ...).
// agent/agent.service.ts
@Injectable()
export class AgentService {
constructor(
@Inject(ANTHROPIC) private readonly anthropic: Anthropic,
@Inject('INVENTORY') private readonly inventory: ClientProxy,
) {}
private readonly tools: Anthropic.Tool[] = [
{
name: 'check_stock',
description: 'Vérifie le stock disponible pour un SKU. Appelle ceci quand on demande la disponibilité.',
input_schema: {
type: 'object',
properties: { sku: { type: 'string' } },
required: ['sku'],
},
},
];
async run(prompt: string, signal: AbortSignal): Promise<string> {
const messages: Anthropic.MessageParam[] = [{ role: 'user', content: prompt }];
for (let step = 0; step < 8; step++) { // garde-fou : jamais de boucle infinie
const res = await this.anthropic.messages.create(
{ model: 'claude-opus-4-8', max_tokens: 4096, tools: this.tools, messages },
{ signal },
);
messages.push({ role: 'assistant', content: res.content });
if (res.stop_reason !== 'tool_use') {
return res.content.find((b) => b.type === 'text')?.text ?? '';
}
// Exécute chaque tool_use → renvoie les tool_result dans UN seul message user
const toolResults: Anthropic.ToolResultBlockParam[] = [];
for (const block of res.content) {
if (block.type !== 'tool_use') continue;
try {
const out = await this.dispatchTool(block.name, block.input, signal);
toolResults.push({ type: 'tool_result', tool_use_id: block.id, content: out });
} catch (err) {
// erreur outil : on le DIT au modèle, il s'adapte (≠ crash de la boucle)
toolResults.push({
type: 'tool_result', tool_use_id: block.id, is_error: true,
content: (err as Error).message,
});
}
}
messages.push({ role: 'user', content: toolResults });
}
throw new Error('Agent: profondeur max atteinte sans réponse finale');
}
private async dispatchTool(name: string, input: any, signal: AbortSignal): Promise<string> {
if (name === 'check_stock') {
const stock = await firstValueFrom(
this.inventory.send({ cmd: 'inventory.check' }, input).pipe(timeout(2000)),
);
return JSON.stringify(stock);
}
throw new Error(`Outil inconnu : ${name}`);
}
}Failure modes à connaître en entretien : boucle infinie (toujours un maxSteps), erreur d'outil non catchée (renvoie is_error: true, ne crash pas la boucle), pause_turn sur les outils server-side (re-send pour reprendre), abort en plein milieu (le signal doit traverser et messages.create ET les client.send d'outils).
4. Jobs IA dans BullMQ — idempotence, retry conscient du coût, sortie partielle
Pour les générations longues/coûteuses, on enqueue au lieu de bloquer. Les pièges sont exactement ceux des microservices (at-least-once, idempotence) mais le retry coûte de l'argent réel.
// edge : enqueue idempotent — la jobId EST la generationId
await this.queue.add(
'generate',
{ generationId, prompt },
{
jobId: generationId, // dédup : un même generationId = un seul job
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
},
);// ai-worker.processor.ts
@Processor('ai')
export class AiWorker extends WorkerHost {
constructor(
@Inject(ANTHROPIC) private readonly anthropic: Anthropic,
private readonly store: GenerationStore, // Redis/SQL : statut + sortie partielle
) { super(); }
async process(job: Job<{ generationId: string; prompt: string }>): Promise<void> {
const { generationId, prompt } = job.data;
// 1) IDEMPOTENCE : déjà terminé ? (rejeu BullMQ après un déploiement) → no-op
if (await this.store.isDone(generationId)) return;
// 2) Retry conscient du coût : on reprend la sortie partielle, on ne régénère pas tout
const partial = await this.store.getPartial(generationId);
const stream = this.anthropic.messages.stream({
model: 'claude-sonnet-4-6', // batch : on descend en gamme, coût/token plus bas
max_tokens: 8192,
messages: [
{ role: 'user', content: prompt },
...(partial ? [{ role: 'assistant' as const, content: partial }] : []),
],
});
stream.on('text', (delta) => this.store.appendPartial(generationId, delta)); // checkpoint
const final = await stream.finalMessage();
await this.store.markDone(generationId, final);
}
}Décisions de staff : la jobId = generationId (dédup native BullMQ, pas de table d'idempotence custom). attempts borné — un 400 (mauvais prompt) ne doit jamais retry (gaspillage), seul un 429/5xx le mérite ; câble le retry sur le type d'erreur SDK (Anthropic.RateLimitError / InternalServerError), pas sur attempts aveugle. La sortie partielle transforme un retry coûteux en reprise quasi-gratuite (et cache-read côté Anthropic). Un cost-guard au edge (tokens/jour par tenant) évite qu'un client en boucle te ruine.
5. Exposer un endpoint MCP / agent
Pour qu'un autre agent (Claude, un orchestrateur) consomme ton microservice comme un outil, expose un serveur MCP (Model Context Protocol) — typiquement un endpoint HTTP streamable. Ton microservice NestJS devient alors un outil découvrable : chaque @MessagePattern existant peut se mapper à un tool MCP (name, description, input_schema). Le contrat est le même problème que tes events Avro/Protobuf : versionne le schéma des tools, sinon un agent appelant casse silencieusement.
Edge concerns (résumé staff)
| Préoccupation | Au edge (BFF) | Dans le worker |
|---|---|---|
| Idempotence | jobId = generationId | store.isDone() avant tout appel |
| Rate-limit | par tenant (Redis token bucket) | maxRetries SDK sur 429 |
| Cost-guard | budget tokens/jour, refus 429 | modèle moins cher pour le batch |
| Annulation | req.on('close') → abort | signal propagé à create/stream |
| Observabilité | trace + generationId en baggage | tokens in/out + stop_reason loggés |
| Sortie partielle | SSE token-par-token | checkpoint append-only par delta |
🔁 Quand utiliser / éviter
Utilise :
- équipes multiples, bounded contexts clairs
- scaling indépendant (write-heavy vs read-heavy)
- isolation des pannes (un service down ≠ tout down)
- event sourcing, audit, integration multi-langages
Évite :
- 1 équipe, 1 produit, < 10 endpoints → monolithe modulaire
- besoin de transactions ACID multi-services → revois le découpage
- équipe sans observability stack (logs centralisés, tracing, metrics)
- prototype / MVP : la complexité opérationnelle te tuera
Choix de transport :
- TCP : démos, intra-cluster simple, jamais en prod sérieuse (pas de queue, pas de durabilité).
- Redis : pub/sub léger, idéal pour notifications volatiles.
- NATS : ultra-rapide, JetStream pour persistence. Bon défaut moderne.
- Kafka : event streaming, replay, gros volume. Overkill < 1000 msg/s.
- RabbitMQ : routing complexe, DLQ natives, mature. Excellent pour task queues.
- gRPC : RPC synchrone typé, polyglotte. Pas un broker.
- MQTT : IoT.
🧭 Décision rapide — quel transport ?
Besoin sync request-response strict, typé ? ──▶ gRPC
Besoin event streaming + replay ? ──▶ Kafka
Besoin routing complexe + DLQ natif ? ──▶ RabbitMQ
Besoin perf + simplicité + JetStream ? ──▶ NATS
Notifications volatiles, sans persistence ? ──▶ Redis Pub/Sub
Démo / intra-cluster simple ? ──▶ TCP
IoT (capteurs, faible bande passante) ? ──▶ MQTT🪤 Retries + idempotence : recette concrète
// 1. Producteur : ajoute un messageId stable
this.client.emit('order.created', {
messageId: order.id + ':created', // déterministe
payload: { ... },
});
// 2. Consumer : check + ack
@EventPattern('order.created')
async onCreated(@Payload() msg: { messageId: string; payload: any }) {
const seen = await this.idem.tryReserve(msg.messageId, 86400);
if (!seen) return; // déjà traité, on ack et on sort
try {
await this.handle(msg.payload);
} catch (err) {
await this.idem.release(msg.messageId); // re-essayable
throw err;
}
}Le store d'idempotency : Redis SET key 1 NX EX 86400 est suffisant pour 99 % des cas. Pour des garanties fortes (paiements), table SQL avec PK unique + transaction.
🏋️ Exercices
Progression : implémenter → rendre production-grade → casser puis réparer. Chaque exercice suppose un cluster local (Docker Compose : NATS ou RabbitMQ + Redis).
Exercice 1 — Request-response typé avec timeout et fallback
Objectif — Un service gateway appelle order.get sur un service orders via NATS, avec timeout 1s, 2 retries, et un fallback cache si le broker est lent.
Indice/Solution — this.client.send<Order, { id: string }>({ cmd: 'order.get' }, { id }).pipe(timeout(1000), retry({ count: 2, delay: 200 }), catchError(() => of(this.cache.getStale(id)))). Vérifie que sans le timeout, couper le service orders fait pendre la requête HTTP appelante indéfiniment.
Exercice 2 — Idempotence + outbox contre le dual-write
Objectif — Le service orders écrit en DB ET publie order.created. Garantir qu'aucun event n'est perdu si le process crash entre les deux, ET qu'un consumer qui reçoit l'event 2× ne crée pas 2 facturations.
Indice/Solution — Producteur : table outbox écrite dans la même transaction SQL que l'order ; un relay (cron ou CDC) lit l'outbox et publie. Consumer : messageId = order.id + ':created' déterministe + SET key 1 NX EX 86400 Redis avant traitement. Teste le crash : kill le process juste après le commit DB, avant le publish — l'event doit partir au prochain tour du relay.
Exercice 3 — DLQ et message poison
Objectif — Un consumer RabbitMQ qui plante 3× sur un message doit l'envoyer en Dead Letter Queue et continuer à traiter les suivants, sans boucler à 100% CPU.
Indice/Solution — x-dead-letter-exchange + x-delivery-limit (quorum queues) ou compteur x-death. Sur la 3e tentative : ctx.getChannelRef().nack(msg, false, false) (requeue=false → DLQ). Casse-le d'abord : avec requeue=true, observe la boucle infinie et le CPU qui sature. Puis répare avec la DLQ.
Exercice 4 — Streaming LLM en SSE avec annulation propre
Objectif — Endpoint @Sse qui stream les tokens d'claude-opus-4-8 ; quand le client ferme l'onglet, la requête Anthropic est réellement abortée (vérifiable : plus de tokens facturés après close).
Indice/Solution — AbortController créé dans le handler, req.on('close', () => ac.abort()), signal passé à anthropic.messages.stream(params, { signal }), et teardown RxJS return () => stream.abort(). Casse-le : oublie le req.on('close') et observe (logs stop_reason / usage) que le modèle continue à générer après la déconnexion. Répare.
Exercice 5 — Boucle agentique server-side avec outil = microservice
Objectif — Un agent expose un tool check_stock qui, en interne, fait un client.send({ cmd: 'inventory.check' }). La boucle doit gérer : erreur d'outil (→ is_error), profondeur max (→ throw borné), et abort en plein milieu.
Indice/Solution — Boucle for (step < 8), accumule les tool_result dans un seul message user, propage signal à messages.create ET aux client.send. Casse-le : fais throw l'outil sans catch → la boucle crash au lieu de laisser le modèle se rattraper. Répare avec is_error: true.
Exercice 6 (hard) — Job BullMQ IA idempotent avec reprise sur sortie partielle
Objectif — Worker qui génère un long document. Sur retry après crash, il reprend la sortie partielle au lieu de tout régénérer, ne retry jamais sur une erreur 400, et est dédupliqué par generationId.
Indice/Solution — jobId = generationId (dédup), store.isDone() en garde, store.appendPartial() sur chaque text delta (checkpoint), reprise via un message assistant contenant le partiel. Retry sélectif : dans le process, catch et throw uniquement si err instanceof Anthropic.RateLimitError || err instanceof Anthropic.InternalServerError ; sur BadRequestError, marque le job failed sans relance. Casse-le : laisse attempts: 3 retry aveuglément un prompt invalide → 3× le coût pour 3× le même 400. Répare avec le retry conditionnel par type d'erreur.
🎤 En entretien
Q : « exactly-once existe-t-il dans un système de messaging ? » Non, c'est un mythe au niveau transport. On obtient at-least-once (le broker peut redélivrer) et on simule l'effet exactly-once par idempotence côté consumer (messageId déterministe + store des IDs traités). « At-most-once » existe (fire-and-forget sans ack) mais perd des messages. Le combo réel : at-least-once + idempotence.
Q : « comment garantir qu'un event est publié si et seulement si la transaction DB commit ? » (dual-write) Le pattern Outbox : on écrit l'event dans une table outbox dans la même transaction que la donnée métier. Un relay (polling ou CDC type Debezium) lit l'outbox et publie ensuite. Le client.emit() direct après db.save() est cassé : si le publish échoue, l'event est perdu et la DB a déjà commit.
Q : « @MessagePattern vs @EventPattern ? quand un timeout est-il obligatoire ? »@MessagePattern = request-response (le client send() attend une réponse corrélée par correlationId/replyTo) ; @EventPattern = pub/sub fire-and-forget (emit(), plusieurs handlers, pas de réponse). Le timeout() est obligatoire sur tout send() : sans lui, un broker ou un consumer lent fait pendre la requête HTTP appelante, qui propage le blocage en cascade (thread/connexion épuisés).
Q : « comment annuler proprement une génération LLM en streaming quand le client se déconnecte ? » Un AbortController par requête, câblé sur req.on('close') côté serveur (déclenché par la fermeture HTTP), avec le signal propagé jusqu'à anthropic.messages.stream(..., { signal }). L'abort coupe la connexion sortante vers Anthropic → arrêt de la facturation des tokens. Côté Angular, le bouton Stop appelle son propre AbortController qui ferme le fetch/EventSource, ce qui déclenche le close serveur. Annulation des deux côtés : sans propagation serveur, le modèle continue à générer (et à coûter) dans le vide.
🔗 Liens
- Docs Nest : https://docs.nestjs.com/microservices/basics
- Kafka transport : https://docs.nestjs.com/microservices/kafka
- gRPC : https://docs.nestjs.com/microservices/grpc
- Outbox pattern — https://microservices.io/patterns/data/transactional-outbox.html
- Sam Newman — Building Microservices (2e éd.)
- Chris Richardson — Microservices Patterns
- NATS JetStream docs : https://docs.nats.io/nats-concepts/jetstream
- KafkaJS : https://kafka.js.org/