Queues avec BullMQ (@nestjs/bullmq)
TL;DR — Les jobs asynchrones décorrèlent les pics de charge et les tâches lentes (envoi email, traitement image, webhook, appel LLM).
@nestjs/bullmq(Redis Streams) remplace l'ancien@nestjs/bull(Redis Lists). Tu déclares des queues, des workers (processors), tu configures concurrence, retries, backoff, et tu rends tes processors idempotents car Redis garantit at-least-once. La récurrence passe par les Job Schedulers (upsertJobScheduler, qui remplacent les repeatable jobs dépréciés). Observability via Bull Board (à protéger derrière un guard). Distinction clé : queues = travail à exécuter, scheduler = quand l'exécuter.
🧠 Mental model — ASCII diagram + analogy
Analogie : un restaurant avec un comptoir de commandes (Producer), une file d'attente (Queue Redis), des cuisiniers (Workers), et un manager qui vérifie les plats ratés (DLQ + retries).
Producer ──add(job)──▶ ┌──────────────┐ ──fetch──▶ Worker (concurrency: 5)
│ Redis Queue │
Producer ──add(job)──▶ │ (BullMQ) │ ──fetch──▶ Worker
└──────────────┘
│
│ retries with backoff
▼
┌──────────────┐
│ Failed jobs │ ──manual replay──▶
│ (DLQ-like) │
└──────────────┘
Bull Board ◀────────── observe queues, jobs, latenciesUn job a un cycle de vie : waiting → active → completed | failed → (retry) | delayed. Les Job Schedulers (cron-like) ont en plus un planificateur interne qui réinjecte des copies du job template à chaque tick.
États précis (à connaître en entretien)
| État | Signification | Sortie possible |
|---|---|---|
waiting | en file, prêt à être pris | → active |
prioritized | en file mais classé par priority (ZSET séparé) | → active |
delayed | planifié pour plus tard (delay) ou backoff en cours | → waiting quand l'heure arrive |
active | un worker l'exécute (lock détenu) | → completed / failed |
waiting-children | bloqué tant que ses enfants (Flow) ne sont pas finis | → waiting |
completed | terminé OK (gardé selon removeOnComplete) | terminal |
failed | toutes les tentatives épuisées (gardé selon removeOnFail) | → waiting via retry() manuel |
Mental model du lock : un worker actif détient un lock (clé Redis avec TTL =
lockDuration, 30 s par défaut) qu'il renouvelle (renewLock) tant queprocess()tourne. Si le process crash sans libérer le lock, BullMQ détecte le stalled job après expiration et le réinjecte — c'est la source N°1 de double exécution silencieuse. Unprocess()qui bloque l'event loop (CPU sync) ne renouvelle pas le lock → le job est considéré stalled et relancé alors qu'il tourne encore. D'où l'obligation d'idempotence et de jobs non-bloquants.
🛠️ Code minimal — realistic working snippet
Setup
// app.module.ts
import { BullModule } from '@nestjs/bullmq';
@Module({
imports: [
BullModule.forRoot({
connection: {
host: process.env.REDIS_HOST,
port: 6379,
},
defaultJobOptions: {
attempts: 5,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: { age: 3600, count: 1000 },
removeOnFail: { age: 86400 },
},
}),
BullModule.registerQueue({ name: 'emails' }),
],
})
export class AppModule {}Producer
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
@Injectable()
export class EmailsService {
constructor(@InjectQueue('emails') private q: Queue) {}
async sendWelcome(userId: string) {
await this.q.add(
'welcome',
{ userId },
{
jobId: `welcome:${userId}`, // déduplication
attempts: 3,
backoff: { type: 'exponential', delay: 5000 },
},
);
}
async scheduleDigest() {
// BullMQ v5.16+ : `upsertJobScheduler` remplace l'ancien `repeat` (déprécié).
// Idempotent par schedulerId : ré-appeler au boot ne crée PAS de doublon.
await this.q.upsertJobScheduler(
'daily-digest', // schedulerId stable (clé d'idempotence)
{ pattern: '0 7 * * *', tz: 'Europe/Paris' },
{ name: 'daily-digest', data: {} }, // template du job répété
);
}
}Processor (Worker)
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Job } from 'bullmq';
@Processor('emails', { concurrency: 5, limiter: { max: 100, duration: 60_000 } })
export class EmailsProcessor extends WorkerHost {
constructor(private mailer: MailerService, private idem: IdempotencyStore) {
super();
}
async process(job: Job): Promise<any> {
const key = `processed:${job.queueName}:${job.id}`;
if (await this.idem.has(key)) return; // déjà traité, no-op
switch (job.name) {
case 'welcome':
await this.mailer.sendWelcome(job.data.userId);
break;
case 'daily-digest':
await this.mailer.sendDigest();
break;
default:
throw new Error(`unknown job ${job.name}`);
}
await this.idem.set(key, true, 86400);
}
@OnWorkerEvent('failed')
onFailed(job: Job, err: Error) {
this.logger.error(`Job ${job.id} failed`, { attempt: job.attemptsMade, err });
}
}Bull Board (UI)
Deux options. Préfère le module Nest first-class (@bull-board/nestjs, GA) qui enregistre les queues via DI plutôt que de bricoler l'adapter Express à la main :
// app.module.ts — npm i @bull-board/nestjs @bull-board/api @bull-board/express
import { BullBoardModule } from '@bull-board/nestjs';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
@Module({
imports: [
BullBoardModule.forRoot({
route: '/admin/queues',
adapter: ExpressAdapter, // ou FastifyAdapter
}),
BullBoardModule.forFeature({
name: 'emails',
adapter: BullMQAdapter, // chaque queue enregistrée déclarativement
}),
],
})
export class AppModule {}⚠️ Sécurité prod :
/admin/queuesexpose le contenu des jobs (PII, tokens…) et permet de rejouer/supprimer des jobs. Mets-le derrière un guard d'auth (@UseGuards), un reverse-proxy avec basic-auth, ou un réseau interne — jamais public. C'est un panneau d'admin, pas un dashboard read-only.
Version manuelle (si tu n'utilises pas le module Nest), à brancher dans main.ts après app.init() :
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [new BullMQAdapter(emailsQueue)],
serverAdapter,
});
app.use('/admin/queues', authMiddleware, serverAdapter.getRouter());🎯 Patterns courants
- Idempotence via
jobId—jobId: 'welcome:userId'empêche d'enqueuer le même job 2x. Combiné à un store d'idempotency dans le processor, tu couvres les rejeux Redis. - Fan-out — un job "split" génère N jobs enfants via
Flow/addBulk. Permet parallélisation massive. Ex. envoyer une newsletter à 1M users = 1 job parent → 1000 jobs de 1000 users. - Rate limiting —
limiter: { max: 100, duration: 60_000 }sur le worker ou la queue. Indispensable pour respecter les rate limits d'APIs externes (Stripe, SendGrid). - Priorités —
priority: 1(haute) àNumber.MAX_SAFE_INTEGER(basse). À utiliser avec parcimonie (impact perf Redis), préfère des queues séparées pour des priorités très différentes. - Sandboxed processors — pour des jobs CPU-bound (image processing, PDF), exécute le processor dans un child process séparé pour ne pas bloquer l'event loop principal. BullMQ supporte cela via path-based processors.
- Graceful shutdown — sur SIGTERM, appelle
worker.close()qui attend les jobs en cours.OnModuleDestroydu Nest gère ça si bien configuré.
🔄 Versions — Nest 7 / 8 / 9 / 10 / 11
- Nest 7-8 :
@nestjs/bull(basé surbullv3/v4, Redis Lists). Décorateur@Process(), classes avec méthodes décorées. - Nest 9 : introduction de
@nestjs/bullmq. Coexistence des deux.bulllegacy considéré en maintenance. - Nest 10 :
@nestjs/bullmqrecommandé.WorkerHost+@Processorstyle.bullmqv3/v4. - Nest 11 :
@nestjs/bullmqv11,bullmqv5. API stable. Job Schedulers (upsertJobScheduler) remplacent les repeatable jobs (dépréciés depuis bullmq 5.16). Propagation de contexte (request-id) possible via AsyncLocalStorage.
Migration bull → bullmq (clé) :
- Redis Streams au lieu de Lists → meilleure perf, ordering strict, moins de polling.
- API entièrement Promise-based, plus de callbacks.
Queue.add(jobName, data, opts)au lieu deQueue.add(data, opts)(le nom devient obligatoire).eventsséparés en classeQueueEvents(un worker ne reçoit que ses propres events ; les events globaux —completed,failedcross-worker — passent parQueueEvents).- Repeatable jobs : la clé interne change → ne pas mélanger les deux libs sur la même queue.
Migration repeatable → Job Scheduler (bullmq 5.16+) :
queue.add(name, data, { repeat })→queue.upsertJobScheduler(schedulerId, repeatOpts, { name, data }).- Inspection/purge :
getJobSchedulers(),removeJobScheduler(schedulerId)(remplacentgetRepeatableJobs/removeRepeatable). - L'option
immediatelyest dépréciée (5.19+) : la première exécution d'un nouveau scheduler est toujours immédiate, puis suit l'intervalle.
⚠️ Pitfalls
- Processor non idempotent — at-least-once = ton job WILL run twice un jour. Si tu charges la carte Stripe sans idempotency key, le client paye 2x.
- Pas de
removeOnComplete— Redis sature. Tes 10M jobs complétés mangent 50 GB. ToujoursremoveOnComplete: { age, count }. - Concurrency trop élevée — 100 workers concurrent qui hit la même DB = connection pool exhausted. Adapte à la capacité downstream.
- Backoff fixed sur erreur transitoire — un service externe down 5 min, ton job retry chaque 30 s = 600 tentatives ratées. Utilise
exponentialavec un max raisonnable (attempts: 5). - Repeatable jobs dupliqués — l'ancien
queue.add(..., { repeat })cumulait une planification de plus à chaque boot si la clé interne changeait (le hash durepeatinclut l'heure de prochaine exécution).upsertJobScheduler(schedulerId, repeat, template)corrige ça par design : leschedulerIdest la clé d'idempotence, ré-appeler ne fait qu'upsert. Si tu es encore sur l'ancienne API, set unjobIdstable ou nettoie viaremoveRepeatableau boot. Migration :getJobSchedulers()/removeJobScheduler(id)pour inspecter/purger. - Pas d'observabilité — sans Bull Board ou metrics Prometheus (
bullmq-otel), tu découvres une queue de 500k jobs en retard quand le client appelle. Monitoring obligatoire. - Faire du long-polling DB dans le processor — bloque le worker. Toujours async/await proprement, et timeout les appels externes.
- Confondre
delayet un scheduler —delay: 60000= un job unique à exécuter dans 1 min.upsertJobScheduler(id, { every: 60000 }, ...)= un job récurrent toutes les minutes.every(intervalle fixe en ms) ne se confond pas avecpattern(expression cron, sensible au fuseau viatz).
🧪 Testing
Unit du processor — instancie directement, appelle process(job) :
const proc = new EmailsProcessor(mailer, idemMock);
await proc.process({
id: '1', name: 'welcome', data: { userId: 'u1' }, queueName: 'emails',
} as any);
expect(mailer.sendWelcome).toHaveBeenCalledWith('u1');Integration — utilise un Redis réel (Testcontainers ou redis-memory-server) :
beforeAll(async () => {
app = await Test.createTestingModule({
imports: [
BullModule.forRoot({ connection: { host: 'localhost', port: redisPort } }),
BullModule.registerQueue({ name: 'emails' }),
],
providers: [EmailsService, EmailsProcessor],
}).compile().then((m) => m.createNestApplication());
await app.init();
});
it('processes a job end to end', async () => {
const svc = app.get(EmailsService);
await svc.sendWelcome('u1');
await new Promise((r) => setTimeout(r, 500)); // wait for worker
// assert side-effect (mailer called, DB row, etc.)
});Mock du Queue — pour les services qui dispatchent, mock getQueueToken('emails') :
const queueMock = { add: jest.fn() };
const moduleRef = await Test.createTestingModule({
providers: [EmailsService, { provide: getQueueToken('emails'), useValue: queueMock }],
}).compile();🎬 Cas d'usage concrets
FinTech — Ingestion factures nocturne
Qui — Cabinet comptable en ligne français (40 K clients) qui ingère les factures fournisseurs envoyées dans la journée et les classe via OCR + NLP. Problème — 300 K factures arrivent entre 18h et 23h, l'OCR coûte 800 ms par doc. Traiter en synchrone explose le tunnel d'upload. Il faut traiter pendant la nuit, prioriser les clients premium et limiter le débit vers le fournisseur OCR (1000 req/min). Comment — Queue invoice-ingest avec priorité, rate limiter BullMQ, retry exponentiel et DLQ.
@Processor('invoice-ingest', { concurrency: 10, limiter: { max: 1000, duration: 60_000 } })
export class InvoiceIngestProcessor extends WorkerHost {
constructor(private ocr: OcrClient, private nlp: NlpClient, private repo: InvoiceRepo) { super(); }
async process(job: Job<{ invoiceId: string; tenantId: string }>) {
const text = await this.ocr.extract(job.data.invoiceId);
const parsed = await this.nlp.classify(text);
await this.repo.markIngested(job.data.invoiceId, parsed);
return { lines: parsed.lines.length };
}
}
await queue.add('ingest', { invoiceId, tenantId }, {
priority: tier === 'premium' ? 1 : 10,
attempts: 5,
backoff: { type: 'exponential', delay: 30_000 },
removeOnComplete: { age: 86_400, count: 10_000 },
removeOnFail: false, // keep failures for inspection
});Gains — Traitement nocturne terminé avant 7h, respect du quota OCR, premium ingéré en moins d'une heure même en pic.
LegalTech — Contrats batch
Qui — Plateforme de signature électronique française qui doit générer 50 K bordereaux PDF récapitulatifs en fin de mois pour ses clients corporate. Problème — La génération PDF (Puppeteer) consomme 1 Go de RAM par worker. Faire ça en API HTTP fait sauter l'instance. Le pic est concentré sur 4 heures. Comment — Queue avec workers dédiés sur machines mémoire, FlowProducer pour orchestrer "génère PDF" → "signe via HSM" → "envoie email".
const flow = new FlowProducer({ connection: redis });
await flow.add({
name: 'finalize',
queueName: 'pdf-flow',
data: { contractId, recipientEmail },
children: [
{ name: 'render', queueName: 'pdf-render', data: { contractId } },
{ name: 'sign', queueName: 'pdf-sign', data: { contractId } },
],
});
@Processor('pdf-render', { concurrency: 2 }) // memory-bound
export class RenderProcessor extends WorkerHost {
async process(job: Job<{ contractId: string }>) {
const html = await this.tpl.render(job.data.contractId);
return this.pdf.toPdf(html, { format: 'A4' });
}
}Gains — Workers scalent indépendamment (10 pour render mémoire-bound, 50 pour email IO-bound), reprise sur crash sans regénération depuis le début.
E-commerce — Refresh stock multi-entrepôts
Qui — Retailer omnicanal français qui synchronise le stock entre 80 magasins, 4 entrepôts et 6 marketplaces partenaires. Problème — Toutes les 5 min, on doit pousser les niveaux de stock vers Amazon, Cdiscount, FNAC, etc. Chacun a son rate-limit. Une erreur sur un endpoint ne doit pas bloquer les autres. Comment — Queue par marketplace + repeatable job toutes les 5 min, idempotency via clé sku + warehouse + version.
@Injectable()
export class StockSyncScheduler implements OnModuleInit {
constructor(@InjectQueue('amazon-sync') private q: Queue) {}
async onModuleInit() {
// upsertJobScheduler est idempotent : un seul scheduler malgré N boots / N replicas.
await this.q.upsertJobScheduler(
'amazon-sync-cron', // schedulerId stable
{ pattern: '*/5 * * * *' },
{ name: 'sync-batch', data: {} },
);
}
}
@Processor('amazon-sync', { concurrency: 4, limiter: { max: 200, duration: 60_000 } })
export class AmazonSyncProcessor extends WorkerHost {
async process(_job: Job) {
const updates = await this.stock.diffSinceLastSync('amazon');
for (const chunk of chunkArray(updates, 50)) {
await this.amazon.bulkUpdate(chunk);
}
return { synced: updates.length };
}
}Gains — Pannes marketplace isolées (une queue à l'arrêt n'impacte pas les autres), pas de duplication grâce à jobId, observabilité via Bull Board.
🛠️ Exemple end-to-end
Contexte — Le cabinet comptable ci-dessus déploie le pipeline complet : un upload utilisateur déclenche une job d'ingestion, qui chaîne OCR + NLP + validation comptable + indexation Elasticsearch. Chaque étape a sa queue dédiée pour permettre un scaling indépendant, et un dashboard remonte la progression.
// src/invoice/invoice.queues.ts
@Module({
imports: [
BullModule.forRootAsync({
useFactory: (cfg: ConfigService) => ({
connection: { url: cfg.get('REDIS_URL') },
defaultJobOptions: {
attempts: 5,
backoff: { type: 'exponential', delay: 30_000 },
removeOnComplete: { age: 86_400, count: 10_000 },
},
}),
inject: [ConfigService],
}),
BullModule.registerQueue(
{ name: 'invoice-ingest' },
{ name: 'invoice-ocr' },
{ name: 'invoice-nlp' },
{ name: 'invoice-validate' },
{ name: 'invoice-index' },
{ name: 'invoice-dlq' },
),
],
exports: [BullModule],
})
export class InvoiceQueuesModule {}// src/invoice/invoice.orchestrator.ts
@Injectable()
export class InvoiceOrchestrator {
constructor(private flow: FlowProducer) {}
async enqueue(invoiceId: string, tenantId: string, priority: 'premium' | 'standard') {
return this.flow.add({
name: 'index',
queueName: 'invoice-index',
data: { invoiceId, tenantId },
opts: { priority: priority === 'premium' ? 1 : 10 },
children: [{
name: 'validate',
queueName: 'invoice-validate',
data: { invoiceId, tenantId },
children: [{
name: 'nlp',
queueName: 'invoice-nlp',
data: { invoiceId, tenantId },
children: [{
name: 'ocr',
queueName: 'invoice-ocr',
data: { invoiceId, tenantId },
}],
}],
}],
});
}
}// src/invoice/processors/ocr.processor.ts
@Processor('invoice-ocr', {
concurrency: 10,
limiter: { max: 1000, duration: 60_000 }, // OCR vendor quota
})
export class OcrProcessor extends WorkerHost {
private readonly logger = new Logger(OcrProcessor.name);
constructor(private ocr: OcrClient, private repo: InvoiceRepo) { super(); }
async process(job: Job<{ invoiceId: string }>) {
this.logger.log(`OCR ${job.data.invoiceId} (attempt ${job.attemptsMade + 1})`);
const pdf = await this.repo.downloadPdf(job.data.invoiceId);
const text = await this.ocr.extract(pdf);
await this.repo.update(job.data.invoiceId, { ocrText: text, ocrAt: new Date() });
return { textLength: text.length };
}
}
// src/invoice/processors/nlp.processor.ts
@Processor('invoice-nlp', { concurrency: 20 })
export class NlpProcessor extends WorkerHost {
constructor(private nlp: NlpClient, private repo: InvoiceRepo) { super(); }
async process(job: Job<{ invoiceId: string }>) {
const invoice = await this.repo.findById(job.data.invoiceId);
const classification = await this.nlp.classify(invoice.ocrText);
await this.repo.update(job.data.invoiceId, {
supplierId: classification.supplierId,
vatRate: classification.vatRate,
lines: classification.lines,
categoryCode: classification.categoryCode,
});
return { confidence: classification.confidence };
}
}
// src/invoice/processors/validate.processor.ts
@Processor('invoice-validate', { concurrency: 30 })
export class ValidateProcessor extends WorkerHost {
constructor(private repo: InvoiceRepo, private rules: AccountingRulesEngine) { super(); }
async process(job: Job<{ invoiceId: string }>) {
const invoice = await this.repo.findById(job.data.invoiceId);
const report = this.rules.validate(invoice);
if (!report.valid) {
await this.repo.update(job.data.invoiceId, { status: 'needs_review', issues: report.issues });
throw new UnrecoverableError(`Invalid invoice: ${report.issues.join(', ')}`);
}
await this.repo.update(job.data.invoiceId, { status: 'validated' });
return { rulesApplied: report.appliedRules.length };
}
}
// src/invoice/processors/index.processor.ts
@Processor('invoice-index', { concurrency: 15 })
export class IndexProcessor extends WorkerHost {
constructor(private es: ElasticsearchService, private repo: InvoiceRepo) { super(); }
async process(job: Job<{ invoiceId: string; tenantId: string }>) {
const invoice = await this.repo.findById(job.data.invoiceId);
await this.es.index({
index: `invoices-${job.data.tenantId}`,
id: invoice.id,
document: {
supplierId: invoice.supplierId,
total: invoice.total,
date: invoice.date,
categoryCode: invoice.categoryCode,
text: invoice.ocrText,
},
});
await this.repo.update(invoice.id, { status: 'indexed', indexedAt: new Date() });
}
}// src/invoice/processors/dlq.listener.ts
// ⚠️ @QueueEventsListener lie UNE classe à UNE seule queue (la metadata est écrasée
// si tu empiles plusieurs décorateurs). Pour écouter N queues, fabrique N listeners —
// ici une factory partage la même logique de routage DLQ.
function makeDlqListener(stage: string) {
@QueueEventsListener(stage)
class DlqListener {
constructor(
@InjectQueue('invoice-dlq') private dlq: Queue,
private alerts: AlertService,
) {}
@OnQueueEvent('failed')
async onFailed(ev: { jobId: string; failedReason: string; prev: string }) {
await this.dlq.add('dead', {
originJobId: ev.jobId,
reason: ev.failedReason,
stage, // la queue d'origine (OCR / NLP / validate)
at: new Date().toISOString(),
});
await this.alerts.notify('ops', `Job ${ev.jobId} dead in ${stage}: ${ev.failedReason}`);
}
}
return DlqListener;
}
// providers: ['invoice-ocr', 'invoice-nlp', 'invoice-validate'].map(makeDlqListener)
export const DlqOcrListener = makeDlqListener('invoice-ocr');
export const DlqNlpListener = makeDlqListener('invoice-nlp');
export const DlqValidateListener = makeDlqListener('invoice-validate');Pourquoi pas trois décorateurs empilés ?
@QueueEventsListener('q')pose une metadataqueueNameunique sur la classe ; empiler@QueueEventsListener('a') @QueueEventsListener('b')ne garde que la dernière valeur — les events des autres queues sont silencieusement ignorés (les jobs morts d'OCR/NLP ne tomberaient jamais en DLQ). Une classe = une queue ; pour N queues, N classes (factory ci-dessus) ou unQueueEventsinstancié manuellement par queue.
// src/invoice/invoice.controller.ts
@Controller('invoices')
export class InvoiceController {
constructor(
private uploader: InvoiceUploader,
private orchestrator: InvoiceOrchestrator,
) {}
@Post()
@UseInterceptors(FileInterceptor('file'))
async upload(
@UploadedFile() file: Express.Multer.File,
@CurrentUser() user: AuthUser,
) {
const invoiceId = await this.uploader.store(file, user.tenantId);
await this.orchestrator.enqueue(invoiceId, user.tenantId, user.tier);
return { invoiceId, status: 'queued' };
}
}Le FlowProducer garantit l'ordre OCR → NLP → validate → index sans coupler les processors entre eux, chaque queue scale indépendamment (OCR limité au quota fournisseur, NLP CPU-bound, validate et index IO-bound), UnrecoverableError empêche le retry sur les invalidations métier (la facture est définitivement non valide), et le DlqListener route les jobs morts vers une queue d'inspection plutôt que de les perdre.
🔁 Quand utiliser / éviter
Utilise une queue :
- tâches > 100 ms qui peuvent attendre (email, génération de rapport, webhook sortant)
- pics de charge à lisser (Black Friday, campagne marketing)
- besoin de retry, de DLQ, d'audit
- découplage entre services
Évite :
- opérations < 50 ms et critiques temps-réel (auth, paiement synchrone) → garde inline
- besoin de transactions ACID avec la même DB → reste dans la même transaction
- volume très faible (< 10/jour) → un simple
setTimeoutou cron suffit
Queue vs Scheduling vs Event Bus :
- Queue = travail à exécuter, FIFO + retries, 1 consumer par job.
- Scheduling (
@nestjs/schedule) = quand exécuter (cron, interval). Souvent combiné : un cron ajoute des jobs à une queue. - Event bus (Nest CQRS, Kafka, NATS) = notification 1-to-N, pas de garantie de traitement par tous.
BullMQ vs alternatives :
- BullMQ : excellent défaut Node.js + Redis, écosystème mature, Bull Board.
- RabbitMQ +
@nestjs/microservices: si tu as déjà RabbitMQ pour autre chose. - AWS SQS : managé, parfait sur AWS, intégration via
@ssut/nestjs-sqs. - Temporal / Inngest : workflows complexes, état durable, retries longs, hors scope BullMQ.
🤖 Servir des jobs IA depuis NestJS (Anthropic / agents)
C'est le cas d'usage qui justifie une queue dans une app IA : un appel LLM coûte 2–60 s, consomme un budget tokens, peut échouer (rate limit, 529 overloaded), et tu veux le découpler du cycle requête HTTP. Mais l'IA casse les hypothèses naïves de BullMQ — un retour ici est rempli de pièges seniors.
1. Client LLM injecté via DI (forRootAsync), jamais new Anthropic() dans un champ
Instancier le SDK dans un champ du processor casse les tests (impossible à mocker), duplique la config, et empêche le partage du pool de connexions. Fournis-le comme provider :
// anthropic.module.ts
import { Module } from '@nestjs/common';
import Anthropic from '@anthropic-ai/sdk';
export const ANTHROPIC = Symbol('ANTHROPIC');
@Module({
providers: [
{
provide: ANTHROPIC,
useFactory: (cfg: ConfigService) =>
new Anthropic({
apiKey: cfg.getOrThrow('ANTHROPIC_API_KEY'),
maxRetries: 4, // le SDK retry déjà 429/5xx/529 en backoff exponentiel
timeout: 120_000, // un appel long ne doit pas pendre indéfiniment
}),
inject: [ConfigService],
},
],
exports: [ANTHROPIC],
})
export class AnthropicModule {}Modèles (juin 2026) : flagship
claude-opus-4-8(long-horizon agentique, raisonnement le plus capable), équilibréclaude-sonnet-4-6(le meilleur rapport vitesse/intelligence, défaut pour la plupart des jobs LLM en queue), rapide/écoclaude-haiku-4-5(classification, extraction, routage). Utilise les IDs exacts sans suffixe de date (claude-sonnet-4-6, jamaisclaude-sonnet-4-6-20251114). Le SDK gère déjà les retries 429/5xx/529 — ne double pas avec le backoff BullMQ sur ces erreurs (voir §4).Comment un staff choisit le modèle par job : ne hardcode pas un modèle unique. Un job de génération longue (rapport, refacto agentique) part sur
claude-opus-4-8; un résumé/classification de masse part surclaude-haiku-4-5(10× moins cher, et la queue lisse le débit). Le modèle est une donnée du job (job.data.model), pas une constante du processor — ça permet de router par tier client, de A/B tester, et d'ajuster le coût sans redéployer. Pense aussi àthinking: { type: 'adaptive' }+output_config: { effort: 'high' }sur Opus 4.8/Sonnet 4.6 pour les jobs qui demandent du raisonnement (l'ancienbudget_tokensrenvoie un 400 sur ces modèles).
2. Idempotence keyée sur un generationId, pas sur le job.id
At-least-once + appel LLM = double génération = double facturation tokens. La clé d'idempotence doit être métier (l'output qu'on veut produire), pas l'ID technique du job (qui change au retry si tu re-enqueues) :
@Processor('llm-jobs', { concurrency: 8, limiter: { max: 50, duration: 60_000 } })
export class LlmProcessor extends WorkerHost {
private readonly logger = new Logger(LlmProcessor.name);
constructor(
@Inject(ANTHROPIC) private llm: Anthropic,
private store: GenerationStore, // persiste l'état par generationId
) { super(); }
async process(job: Job<{ generationId: string; prompt: string }>) {
const { generationId, prompt } = job.data;
// 1. Sortie déjà produite ? no-op (rejeu Redis ou retry après succès partiel)
const existing = await this.store.getCompleted(generationId);
if (existing) return { reused: true, generationId };
// 2. Appel LLM (streaming pour éviter les timeouts sur long output)
const stream = this.llm.messages.stream({
model: 'claude-sonnet-4-6',
max_tokens: 4096,
messages: [{ role: 'user', content: prompt }],
});
// 3. Persiste l'output partiel au fil de l'eau (clé du §5)
let text = '';
for await (const ev of stream) {
if (ev.type === 'content_block_delta' && ev.delta.type === 'text_delta') {
text += ev.delta.text;
await this.store.appendPartial(generationId, ev.delta.text);
}
}
const final = await stream.finalMessage();
await this.store.markCompleted(generationId, {
text,
usage: final.usage, // input/output tokens → pour le cost guard
});
return { generationId, outputTokens: final.usage.output_tokens };
}
}3. Cost guard et rate-limit au bord (avant d'enqueuer)
Le pire anti-pattern : enqueuer 100 000 jobs LLM puis découvrir la facture. Mets le garde-fou à l'entrée, pas dans le worker :
@Injectable()
export class LlmDispatcher {
constructor(
@InjectQueue('llm-jobs') private q: Queue,
private budget: CostGuard, // compteur tokens/€ par tenant, fenêtre glissante
) {}
async enqueue(tenantId: string, generationId: string, prompt: string) {
if (await this.budget.wouldExceed(tenantId, estimateTokens(prompt))) {
throw new TooManyRequestsException('Budget IA du tenant épuisé');
}
await this.q.add('generate', { generationId, prompt }, {
jobId: `gen:${generationId}`, // dédup à l'enqueue
attempts: 3,
backoff: { type: 'exponential', delay: 10_000 },
removeOnComplete: { age: 3600, count: 1000 },
removeOnFail: false, // garder pour audit/replay
});
}
}Le limiter sur le worker ({ max: 50, duration: 60_000 }) protège ton quota API Anthropic (RPM/TPM) ; le cost guard à l'enqueue protège ton budget € par tenant. Les deux sont nécessaires — ils gardent des ressources différentes.
4. Retry cost-aware : ne paie pas deux fois une erreur permanente
Une 400 (prompt invalide), une stop_reason: "refusal", ou un dépassement de budget ne doivent jamais retry — chaque tentative re-paie l'input. Distingue transitoire (retry) de permanent (UnrecoverableError) :
import { UnrecoverableError } from 'bullmq';
try {
// ... appel LLM
} catch (err) {
if (err instanceof Anthropic.RateLimitError || err instanceof Anthropic.InternalServerError) {
throw err; // transitoire → BullMQ retry en backoff
}
if (err instanceof Anthropic.BadRequestError) {
throw new UnrecoverableError(`Prompt invalide: ${err.message}`); // permanent → DLQ direct
}
throw err;
}⚠️ Le SDK Anthropic retry déjà 429/5xx/529 (
maxRetries). Si tu laissesattempts: 5côté BullMQ etmaxRetries: 4côté SDK, une 529 prolongée déclenche 4×5 = 20 appels. Choisis : soit le SDK gère les transitoires courts (recommandé) et BullMQ gère le crash worker / timeout, soit l'inverse. Ne combine pas les deux backoffs aveuglément.
5. Disconnect client → annuler le job (AbortController de bout en bout)
Si l'utilisateur ferme l'onglet pendant une génération, continuer à brûler des tokens est un gaspillage pur. Propage l'annulation : disconnect HTTP → annulation du stream Anthropic (AbortController), et côté worker, vérifie périodiquement un flag d'annulation persisté.
// Côté SSE (controller) : annule le stream si le client se déconnecte
@Sse('generations/:id/stream')
stream(@Param('id') id: string, @Req() req: Request): Observable<MessageEvent> {
const ac = new AbortController();
req.on('close', () => ac.signal.aborted || ac.abort());
// ... passe ac.signal au client LLM si la génération est synchrone ;
// si elle est en queue, écris un flag d'annulation que le worker lit (voir ci-dessous)
}// Côté worker : vérifier l'annulation au début et entre les chunks
async process(job: Job) {
if (await this.store.isCancelled(job.data.generationId)) return { cancelled: true };
const ac = new AbortController();
const stream = this.llm.messages.stream(
{ model: 'claude-sonnet-4-6', max_tokens: 4096, messages: [...] },
{ signal: ac.signal },
);
for await (const ev of stream) {
if (await this.store.isCancelled(job.data.generationId)) { ac.abort(); break; }
// ... append partial
}
}6. Streamer les tokens au client : worker → Redis Pub/Sub → SSE
Le worker ne tient pas la connexion HTTP du client (autre process, souvent autre machine). Pattern : le worker publie les deltas sur un canal Redis Pub/Sub par generationId ; le controller SSE s'y abonne et relaie au navigateur. Le store appendPartial du §2 sert de buffer de rattrapage : si le client se (re)connecte en cours de route, on rejoue l'output déjà produit avant de basculer sur le live — sinon une reconnexion réseau perd tous les tokens émis pendant la coupure (Redis Pub/Sub est fire-and-forget, aucun replay).
// Côté worker : publier chaque delta sur un canal dédié
async process(job: Job<{ generationId: string; prompt: string; model?: string }>) {
const { generationId, prompt } = job.data;
const pub = this.redis.duplicate(); // un client Pub/Sub dédié (pas le client BullMQ)
const channel = `gen:${generationId}`;
const stream = this.llm.messages.stream({
model: job.data.model ?? 'claude-sonnet-4-6',
max_tokens: 4096,
messages: [{ role: 'user', content: prompt }],
});
try {
for await (const ev of stream) {
if (ev.type === 'content_block_delta' && ev.delta.type === 'text_delta') {
await this.store.appendPartial(generationId, ev.delta.text); // buffer de rattrapage
await pub.publish(channel, JSON.stringify({ t: 'delta', text: ev.delta.text }));
}
}
const final = await stream.finalMessage();
await this.store.markCompleted(generationId, { usage: final.usage });
await pub.publish(channel, JSON.stringify({ t: 'done', usage: final.usage }));
} finally {
await pub.quit();
}
}// Côté controller SSE : rejouer le buffer, puis basculer sur le live
@Sse('generations/:id/stream')
async stream(@Param('id') id: string): Promise<Observable<MessageEvent>> {
const sub = this.redis.duplicate();
await sub.subscribe(`gen:${id}`);
return new Observable<MessageEvent>((subscriber) => {
let live = false;
const queued: string[] = [];
// 1. Live d'abord (mais on bufferise tant que le rattrapage n'est pas fini → ordre garanti)
sub.on('message', (_ch, raw) => {
if (!live) { queued.push(raw); return; }
subscriber.next({ data: raw } as MessageEvent);
});
// 2. Rejouer ce qui a déjà été produit, PUIS vider la file live bufferisée
this.store.getPartial(id).then((partial) => {
if (partial) subscriber.next({ data: JSON.stringify({ t: 'replay', text: partial }) } as MessageEvent);
for (const raw of queued) subscriber.next({ data: raw } as MessageEvent);
queued.length = 0;
live = true;
});
// 3. Cleanup au disconnect (sinon fuite de connexions Redis sous charge)
return () => { sub.unsubscribe(`gen:${id}`).then(() => sub.quit()); };
});
}Pièges seniors de ce pattern :
- Ordre live/replay : si tu envoies le live avant d'avoir fini de rejouer le buffer, le client reçoit les tokens dans le désordre. D'où le
queued[]qui retient le live le temps du rattrapage. Alternative plus robuste à grande échelle : un Redis Stream (XADD/XREADavec unlastId) qui donne le replay nativement et remplace le couple Pub/Sub + buffer.- Un client Redis par connexion SSE :
subscribemet le client en mode abonnement (il ne peut plus faire de commandes normales). Utilise.duplicate()et ferme-le au disconnect, sinon tu épuises le pool de connexions Redis (un onglet ouvert = une connexion bloquée).- Backpressure / heartbeat : un client lent ne doit pas faire gonfler la mémoire du process Node. Garde des deltas petits, et envoie un commentaire SSE keep-alive (
:\n\n) toutes les ~15 s pour que les proxies (nginx, ALB) ne coupent pas la connexion idle.- Multi-réplicas : Pub/Sub diffuse à tous les abonnés sur tous les pods — c'est exactement ce qu'on veut ici (le client peut être connecté à un pod différent de celui qui a lancé le job). Mais ça veut dire que chaque pod reçoit chaque message ; à très grand volume, partitionne par canal (déjà le cas : un canal par
generationId).
7. Le loop agentique server-side modélisé en queue
Pour un loop agentique (Claude demande un tool, tu l'exécutes, tu renvoies le résultat, Claude continue), chaque tour est un appel LLM → un job. Au lieu de tenir le loop dans une seule requête HTTP de plusieurs minutes (qui timeout, ne survit pas à un redéploiement, et brûle un worker entier en attente I/O), modélise-le comme une queue où chaque job ré-enqueue le tour suivant tant que stop_reason === 'tool_use'.
@Processor('agent-loop', { concurrency: 6 })
export class AgentLoopProcessor extends WorkerHost {
constructor(
@Inject(ANTHROPIC) private llm: Anthropic,
@InjectQueue('agent-loop') private q: Queue,
private tools: ToolRegistry, // tes tools exécutables côté serveur
private store: AgentRunStore, // historique des messages persisté par runId
) { super(); }
async process(job: Job<{ runId: string; turn: number }>) {
const { runId, turn } = job.data;
if (turn > 25) throw new UnrecoverableError(`runaway agent: ${runId}`); // garde-fou anti-boucle
const messages = await this.store.getMessages(runId); // contexte reconstruit depuis Redis/DB
const res = await this.llm.messages.create({
model: 'claude-opus-4-8',
max_tokens: 4096,
tools: this.tools.schemas(),
messages,
});
await this.store.appendAssistant(runId, res.content); // persiste AVANT d'exécuter (idempotence)
if (res.stop_reason !== 'tool_use') {
await this.store.markDone(runId, res);
return { done: true, turn };
}
// Exécute les tool_use demandés, persiste les tool_result
const toolResults = [];
for (const block of res.content) {
if (block.type !== 'tool_use') continue;
const out = await this.tools.run(block.name, block.input); // tes tools : DB, API interne…
toolResults.push({ type: 'tool_result', tool_use_id: block.id, content: out });
}
await this.store.appendUser(runId, toolResults);
// Ré-enqueue le tour suivant — jobId déterministe = idempotence sur le rejeu Redis
await this.q.add('step', { runId, turn: turn + 1 }, { jobId: `${runId}:${turn + 1}` });
return { done: false, turn };
}
}Ce que la queue te donne gratuitement, et qu'un loop in-request n'a pas :
- Retry par tour : si le tour 7 échoue (rate limit, tool down), seul le tour 7 retry — les 6 premiers sont déjà persistés, pas re-payés.
- Reprise sur crash : un redéploiement mid-run réinjecte le job courant ; le contexte est reconstruit depuis le store (
getMessages), pas perdu. - Observabilité du trace : chaque tour est un job inspectable dans Bull Board ; le
turnet lestop_reasonsont visibles. - Cost guard cumulé : tu peux sommer les
usageparrunIdet avorter le run (annuler les jobs suivants) quand le budget tenant est dépassé — impossible à faire proprement dans une boucle synchrone. - Annulation : même mécanisme qu'au §5 — le worker vérifie un flag
isCancelled(runId)au début de chaque tour ; un Stop client n'a pas à attendre la fin du run.
Flow vs ré-enqueue : un
FlowProducer(parent « run » + enfants « tool call ») est tentant, mais le nombre de tours est dynamique (on ne sait pas à l'avance combien d'allers-retours Claude fera) — or un Flow a son arbre fixé à la création. Le ré-enqueue (« chaque job crée le suivant ») est le bon modèle pour un loop de longueur inconnue. Garde le Flow pour les pipelines statiques (OCR → NLP → index du cas comptable plus haut).
Exposer un endpoint MCP/agent : si tu sers ce loop à d'autres systèmes, le contrôleur ne lance que le run (
POST /agents/runs→add('step', { runId, turn: 0 })→ 202 +runId) et expose le trace en SSE (§6, en s'abonnant àagent:${runId}). Le client ne tient jamais une connexion ouverte pendant tout le run — il poll le statut ou écoute le stream. C'est le même découplage requête/exécution que pour une génération simple, étendu au multi-tours.
📈 Observabilité & production (niveau staff)
Une queue sans métriques est une bombe à retardement. Ce qu'un staff surveille :
| Signal | Pourquoi | Comment |
|---|---|---|
Queue depth (waiting + delayed) | détecte un retard avant que le client n'appelle | queue.getJobCounts() exporté en gauge Prometheus, alerte si > seuil |
| Job latency (enqueue→completed) | SLA métier | histogramme via events QueueEvents |
| Failure rate + DLQ size | régression downstream | compteur sur failed, alerte sur croissance DLQ |
| Active vs concurrency | workers saturés ou idle | ratio actifs / capacité |
| Stalled count | crashs / locks expirés | métrique stalled de BullMQ |
| Redis memory | jobs non purgés saturent Redis | INFO memory, removeOnComplete obligatoire |
// Tracing distribué : propager le trace-id du HTTP jusque dans le job
await queue.add('task', { ...data, traceId: trace.getActiveSpan()?.spanContext().traceId });
// + bullmq-otel pour instrumenter automatiquement add/process en spans OpenTelemetryCapacity planning — règle de pouce : concurrency × nb_workers ≤ capacité downstream (pool DB, RPM de l'API externe). 50 workers × concurrency 10 = 500 appels concurrents : si ton pool Postgres fait 20 connexions, tu satures. Dimensionne par le maillon le plus faible, pas par Redis.
HA & graceful shutdown — sur SIGTERM, worker.close() attend les jobs en cours (Nest le fait via OnModuleDestroy si enableShutdownHooks() est appelé). Sans ça, un déploiement kill des jobs active mid-flight → stalled → rejeu → double exécution. Toujours : un store d'idempotence + un grace period de shutdown > durée max d'un job.
🏋️ Exercices
Progression : implémenter → rendre production-grade → casser puis réparer. Fais-les dans l'ordre, chacun construit sur le précédent.
1. Pipeline d'email transactionnel idempotent
Objectif : enqueuer un email de bienvenue qui ne part jamais deux fois, même sous retry et redéploiement. Indice/Solution : queue emails, jobId: welcome:${userId} (dédup à l'enqueue) + store d'idempotence (processed:${queueName}:${jobId} dans Redis avec TTL) vérifié au début de process(). Teste en relançant le même add() 3× et en killant le worker mid-process : l'email ne doit partir qu'une fois. Vérifie que le removeOnComplete est bien configuré.
2. Scheduler récurrent sans doublons (migration Job Scheduler)
Objectif : un digest quotidien à 7h Europe/Paris qui ne se duplique pas après N boots ni avec 3 replicas. Indice/Solution : upsertJobScheduler('daily-digest', { pattern: '0 7 * * *', tz: 'Europe/Paris' }, { name: 'digest', data: {} }) dans OnModuleInit. Démarre 3 instances de l'app → getJobSchedulers() doit retourner un seul scheduler. Bonus : écris la version cassée avec l'ancien add(..., { repeat }) sans jobId stable, observe l'accumulation, puis migre.
3. Fan-out d'une newsletter à 1M users avec rate limiting
Objectif : envoyer à 1M users sans dépasser 100 req/min vers le provider (SendGrid) ni faire exploser la mémoire. Indice/Solution : un job « split » qui addBulk des jobs de 1000 destinataires (jamais 1M jobs d'un coup — pic mémoire Redis). limiter: { max: 100, duration: 60_000 } sur le worker. Mesure le temps total et la mémoire Redis (INFO memory). Bonus : ajoute une priorité pour que les users premium passent devant via une queue séparée (pas via priority, qui dégrade les perfs Redis à ce volume).
4. Génération LLM streamée, annulable et cost-guarded (production-grade)
Objectif : exposer POST /generations qui enqueue un appel Claude, streame les tokens en SSE, s'annule au disconnect client, et respecte un budget tokens par tenant. Indice/Solution : client @anthropic-ai/sdk injecté via forRootAsync ; cost guard à l'enqueue (rejette en 429 si budget dépassé) ; worker qui streame (messages.stream), publie les deltas sur Redis Pub/Sub par generationId, persiste l'output partiel ; SSE qui s'abonne au canal et abort sur req.on('close') ; idempotence sur generationId. Distingue les erreurs : BadRequestError → UnrecoverableError, RateLimitError → retry. Vérifie qu'un onglet fermé arrête bien de consommer des tokens.
5. Casse-le : le job fantôme qui double-facture (break then fix)
Objectif : reproduire une double exécution, comprendre pourquoi, puis la rendre impossible. Indice/Solution : écris un process() qui (a) charge une carte Stripe sans clé d'idempotence et (b) fait un sleep synchrone de 40 s (bloque l'event loop > lockDuration de 30 s). Lance-le : le job est marqué stalled, réinjecté, et la carte est chargée deux fois. Fix en deux temps : rends le sleep async (libère l'event loop, le lock se renouvelle) et ajoute une clé d'idempotence Stripe + un store local. Conclusion à formuler : « at-least-once n'est pas un bug, c'est le contrat — l'idempotence est ta responsabilité ».
6. DLQ + replay + observabilité (staff-grade)
Objectif : router les jobs morts vers une DLQ, alerter, et permettre un replay manuel sélectif après correction d'un bug downstream. Indice/Solution : QueueEvents('main') qui écoute failed, ajoute le job mort dans une queue dlq avec la raison + le stage, et notifie l'ops. Endpoint admin POST /dlq/:jobId/replay qui ré-enqueue dans la queue d'origine. Expose queue.getJobCounts() en gauges Prometheus et alerte si dlq.depth croît. Bonus : bullmq-otel pour tracer enqueue→complete en spans, et vérifie que le traceId HTTP se propage jusque dans le job.
7. Loop agentique en queue : reprise sur crash sans double tool-call (staff-grade, break then fix)
Objectif : implémenter le loop agentique du §7 (chaque tour ré-enqueue le suivant), puis prouver qu'il survit à un crash worker mid-run sans ré-exécuter un tool à effet de bord deux fois. Indice/Solution : queue agent-loop, jobId: ${runId}:${turn} déterministe, contexte persisté par runId. Mets un garde-fou turn > 25 → UnrecoverableError. Casse-le : place un tool à effet de bord (ex. send_email, create_invoice) et fais crasher le worker après l'exécution du tool mais avant le appendUser(toolResults) + le ré-enqueue. Au restart, le job stalled est réinjecté → le tour recommence → l'email part deux fois. Fix : persiste le résultat du tool de façon idempotente (clé ${runId}:${block.id}) et vérifie cette clé avant de ré-exécuter, exactement comme pour Stripe (§5). Conclusion à formuler : « dans un loop, l'idempotence doit être au niveau du tool call individuel, pas seulement du job — un seul tour peut contenir N effets de bord ». Bonus : ajoute un cost guard cumulé (SUM(usage) WHERE runId) qui annule les jobs suivants quand le budget tenant explose, et un flag isCancelled(runId) lu au début de chaque tour pour le Stop client.
🎤 En entretien
Q : BullMQ garantit-il l'exactly-once ? Comment gères-tu la duplication ? R : Non — Redis garantit at-least-once. Un crash worker, un lock expiré (job stalled), ou un retry après succès partiel rejouent le job. La parade est l'idempotence : clé métier (pas le job.id) + store qui marque « déjà traité », et clés d'idempotence côté services externes (Stripe, providers). « Exactly-once » est un mythe distribué ; on vise « effet exactly-once » via idempotence.
Q : Différence entre delay, repeat/Job Scheduler, et @nestjs/schedule ? R : delay = un job unique différé. Job Scheduler (upsertJobScheduler, ex-repeat) = jobs récurrents durables survivant aux restarts, stockés dans Redis, idempotents par schedulerId. @nestjs/schedule = cron in-process, non durable, perdu au restart et dupliqué sur N replicas. Pattern courant : un cron @nestjs/schedule léger ne fait qu'add() dans une queue — mais pour de la récurrence robuste multi-instance, le Job Scheduler de BullMQ est le bon outil.
Q : Comment empêcher un retry infini de saturer un service externe down ? R : attempts borné + backoff: { type: 'exponential', delay } (pas fixed, qui martèle), et UnrecoverableError pour les échecs permanents (400, validation métier) qui partent direct en DLQ sans consommer de tentatives. Pour un appel LLM, on ajoute un cost guard à l'enqueue : retry ne doit jamais re-payer une erreur qu'on sait permanente.
Q : Tu sers des générations Claude via une queue. Quels sont les 3 pièges spécifiques à l'IA ? R : (1) Idempotence sur le contenu (generationId) sinon double facturation tokens ; (2) ne pas doubler les retries — le SDK Anthropic retry déjà 429/5xx/529, le combiner aveuglément à attempts BullMQ multiplie les appels et le coût ; (3) annulation de bout en bout (disconnect client → AbortController → flag d'annulation lu par le worker) pour ne pas brûler des tokens pour un client parti. Bonus : streamer via Redis Pub/Sub + buffer de rattrapage pour la reconnexion.
Q : Comment exécuter un loop agentique (tool use) Claude qui peut durer plusieurs minutes, de façon résiliente ? R : Pas dans une requête HTTP (timeout, perte au redéploiement, worker bloqué en I/O). On modélise le loop comme une queue où chaque tour ré-enqueue le suivant tant que stop_reason === 'tool_use', avec le contexte (messages) persisté par runId et un jobId déterministe (${runId}:${turn}). Bénéfices : retry par tour sans re-payer les précédents, reprise sur crash en reconstruisant le contexte, trace inspectable tour par tour, cost guard cumulé qui peut avorter le run, et annulation via un flag lu en début de tour. Piège clé : l'idempotence doit descendre au niveau du tool call individuel (${runId}:${toolUseId}), car un seul tour peut déclencher plusieurs effets de bord — sinon un rejeu du tour rejoue tous ses tools. On garde un FlowProducer pour les pipelines à arbre fixe ; le ré-enqueue pour les loops de longueur inconnue.
🔗 Liens
@nestjs/bullmqdocs : https://docs.nestjs.com/techniques/queues- BullMQ official : https://docs.bullmq.io/
- Bull Board : https://github.com/felixmosh/bull-board
- "BullMQ Patterns" — https://docs.bullmq.io/patterns
- OpenTelemetry pour BullMQ :
bullmq-otel - Article "Stop using Bull, use BullMQ" — community
- Migration guide bull → bullmq : https://docs.bullmq.io/bull/upgrade-guide
- Job Schedulers (remplacent repeatable jobs) : https://docs.bullmq.io/guide/job-schedulers
@bull-board/nestjs(module Nest first-class) : https://www.npmjs.com/package/@bull-board/nestjs- SDK Anthropic (streaming, retries, AbortController) : https://github.com/anthropics/anthropic-sdk-typescript
- Idempotency keys explained — Stripe engineering blog