SSE & Streaming chat IA dans NestJS
TL;DR —
@Sse()expose un fluxObservable<MessageEvent>que Nest sérialise au format Server-Sent Events. C'est la techno idéale pour streamer des réponses LLM (Anthropic Claude, OpenAI, Bedrock) vers le navigateur : unidirectionnel, basé sur HTTP, traversant les proxies, avec reconnexion automatique viaLast-Event-ID. Mais SSE souffre de pièges concrets : buffering nginx, limite de 6 connexions par origine sur HTTP/1.1, Cloudflare qui coupe au bout de 100 s, encodage strict (\n\nobligatoire), backpressure invisible. Cet article disserte chaque détail.Pourquoi ce sujet est un test d'architecte. Streamer un token LLM n'est pas « écrire dans une socket ». C'est un système distribué miniature à trois maillons (client ↔ Nest ↔ provider) où chaque maillon peut tomber indépendamment, où l'argent brûle à chaque token (
AbortControllernon câblé = facture LLM pour des données jetées), et où la moindre couche intermédiaire — proxy, CDN, compression, agent HTTP — bufferise silencieusement et casse l'illusion temps réel. Un dev junior fait marcher la démo localhost ; un staff engineer raisonne sur la propagation de l'annulation, la backpressure, l'idempotence de la reprise, et le coût.
🧠 Mental model — ASCII + analogie
SSE, c'est un robinet HTTP qu'on n'arrête jamais de remplir. Le client ouvre une seule requête GET, le serveur garde la connexion ouverte et y déverse des paquets data: ...\n\n à son rythme. Pas de négociation, pas d'upgrade, pas de framing binaire : du texte UTF-8 sur du HTTP/1.1 ou HTTP/2.
Client (EventSource) Nest (@Sse) LLM provider
│ │ │
│── GET /chat/stream ─────────────►│ │
│ Accept: text/event-stream │ │
│ │── POST /v1/messages ────►│
│ │ stream: true │
│ │◄── chunk "Hel" ──────────│
│◄── data: {"delta":"Hel"}\n\n ────│ │
│ │◄── chunk "lo" ───────────│
│◄── data: {"delta":"lo"}\n\n ─────│ │
│ │◄── chunk "[DONE]" ───────│
│◄── event: done\ndata: {}\n\n ────│ │
│ (connection stays open) │ │L'analogie : un télégraphe. Le serveur tape des messages courts terminés par un double saut de ligne (le « stop » du télégraphiste), le client lit ligne par ligne sans jamais raccrocher. Si la ligne tombe, le client rappelle et dit « j'en étais à l'événement 42 » via l'en-tête Last-Event-ID, et le serveur reprend où il s'était arrêté.
Comparé à WebSocket, SSE est un one-way street : pas de frame ping, pas de protocole binaire, pas de subprotocols. En contrepartie, il passe partout (proxies HTTP, CDN, corporate firewalls), il reconnecte tout seul, et il n'exige aucune lib côté client : new EventSource(url) et c'est plié. Pour un chat LLM où le serveur produit des tokens et le client n'a qu'à les afficher, SSE est exactement la bonne abstraction.
🛠️ Code minimal (ts)
Voici un endpoint Nest qui streame une réponse Anthropic Claude vers le client. On commence par le contrôleur, le service LLM, puis le wiring DTO/garde. Le pattern complet exposé ici couvre : auth via guard, abort upstream propre, reprise via Last-Event-ID, heartbeat optionnel, encodage strict SSE.
// src/chat/chat.controller.ts
import {
Controller,
Get,
Headers,
MessageEvent,
Query,
Req,
Sse,
UseGuards,
} from '@nestjs/common';
import { Request } from 'express';
import { Observable, from, map, takeUntil, fromEvent, finalize } from 'rxjs';
import { ChatStreamService } from './chat-stream.service';
import { JwtAuthGuard } from '../auth/jwt.guard';
@Controller('chat')
@UseGuards(JwtAuthGuard)
export class ChatController {
constructor(private readonly chat: ChatStreamService) {}
@Sse('stream')
stream(
@Query('q') prompt: string,
@Headers('last-event-id') lastEventId: string | undefined,
@Req() req: Request,
): Observable<MessageEvent> {
const conversationId = req.query.cid as string;
const resumeFromTokenIndex = lastEventId ? Number(lastEventId) : 0;
const close$ = fromEvent(req, 'close');
return from(
this.chat.streamCompletion({
prompt,
conversationId,
resumeFromTokenIndex,
}),
).pipe(
map((chunk) => ({
id: String(chunk.index),
type: chunk.kind,
data: { delta: chunk.text, role: chunk.role },
})),
takeUntil(close$),
finalize(() => this.chat.releaseConnection(conversationId)),
);
}
}Le service LLM, lui, encapsule l'appel SDK Anthropic et expose un AsyncIterable que RxJS sait consommer via from().
⚠️ Anti-pattern à bannir :
new Anthropic()dans un champ de classe. On voit partoutprivate readonly client = new Anthropic({ apiKey: process.env... }). C'est un code smell senior : le client n'est plus mockable proprement en test, la clé est lue au moment de l'import du module (pas au runtime viaConfigService), on ne peut pas configurer lesmaxRetries/timeout/baseURLpar environnement, et on instancie potentiellement N clients. Le client LLM est une dépendance — on l'injecte via un providerforRootAsync. Le modèle ci-dessous utilise le flagship Anthropicclaude-opus-4-8; pour un assistant rapide et moins cher on bascule surclaude-haiku-4-5, le SDK gère les retries (429/5xx) tout seul.
// src/llm/llm.module.ts — le client est un provider DI, pas un `new` dans un champ
import { Module, Global } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import Anthropic from '@anthropic-ai/sdk';
export const ANTHROPIC = Symbol('ANTHROPIC_CLIENT');
@Global()
@Module({
imports: [ConfigModule],
providers: [
{
provide: ANTHROPIC,
inject: [ConfigService],
useFactory: (config: ConfigService): Anthropic =>
new Anthropic({
apiKey: config.getOrThrow<string>('ANTHROPIC_API_KEY'),
maxRetries: 3, // le SDK retente 429/5xx avec backoff exponentiel
timeout: 60_000,
}),
},
],
exports: [ANTHROPIC],
})
export class LlmModule {}// src/chat/chat-stream.service.ts
import { Inject, Injectable, Logger } from '@nestjs/common';
import Anthropic from '@anthropic-ai/sdk';
import { ANTHROPIC } from '../llm/llm.module';
export interface StreamChunk {
index: number;
kind: 'token' | 'done' | 'error';
text: string;
role: 'assistant' | 'system';
}
@Injectable()
export class ChatStreamService {
private readonly log = new Logger(ChatStreamService.name);
private readonly active = new Map<string, AbortController>();
// Le client est INJECTÉ, jamais `new`-é dans un champ : mockable, configurable, testable.
constructor(@Inject(ANTHROPIC) private readonly client: Anthropic) {}
async *streamCompletion(opts: {
prompt: string;
conversationId: string;
resumeFromTokenIndex: number;
}): AsyncGenerator<StreamChunk> {
const ctrl = new AbortController();
this.active.set(opts.conversationId, ctrl);
const upstream = this.client.messages.stream(
{
model: 'claude-opus-4-8', // flagship ; 'claude-haiku-4-5' pour rapide/moins cher
max_tokens: 2048,
messages: [{ role: 'user', content: opts.prompt }],
},
{ signal: ctrl.signal }, // <- propagation de l'annulation jusqu'au provider
);
let index = opts.resumeFromTokenIndex;
try {
for await (const event of upstream) {
if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
index += 1;
if (index <= opts.resumeFromTokenIndex) continue;
yield { index, kind: 'token', text: event.delta.text, role: 'assistant' };
}
}
yield { index: index + 1, kind: 'done', text: '', role: 'assistant' };
} catch (err) {
// AbortError est NORMAL (client déconnecté) — on ne le log pas en error.
if ((err as Error).name === 'AbortError') return;
this.log.error('upstream error', err);
yield { index: index + 1, kind: 'error', text: (err as Error).message, role: 'assistant' };
} finally {
this.active.delete(opts.conversationId);
}
}
releaseConnection(conversationId: string): void {
const ctrl = this.active.get(conversationId);
if (ctrl) {
ctrl.abort();
this.active.delete(conversationId);
}
}
}Pour les cas où on veut le contrôle ultime (heartbeat custom, écriture brute), on shunte @Sse et on écrit dans la Response directement. C'est ce que font les apps qui veulent envoyer des lignes commentaires : ping (Nest n'expose pas l'comment field de MessageEvent).
// src/chat/raw-stream.controller.ts
import { Controller, Get, Req, Res, UseGuards, Query } from '@nestjs/common';
import { Request, Response } from 'express';
import { JwtAuthGuard } from '../auth/jwt.guard';
import { ChatStreamService } from './chat-stream.service';
@Controller('chat')
@UseGuards(JwtAuthGuard)
export class RawStreamController {
constructor(private readonly chat: ChatStreamService) {}
@Get('raw-stream')
async raw(
@Req() req: Request,
@Res() res: Response,
@Query('q') prompt: string,
@Query('cid') cid: string,
): Promise<void> {
res.setHeader('Content-Type', 'text/event-stream; charset=utf-8');
res.setHeader('Cache-Control', 'no-cache, no-transform');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no');
res.flushHeaders();
const heartbeat = setInterval(() => res.write(': ping\n\n'), 15_000);
const lastEventId = Number(req.header('last-event-id') ?? 0);
let aborted = false;
req.on('close', () => { aborted = true; clearInterval(heartbeat); });
try {
for await (const chunk of this.chat.streamCompletion({ prompt, conversationId: cid, resumeFromTokenIndex: lastEventId })) {
if (aborted) break;
const payload = JSON.stringify({ delta: chunk.text, kind: chunk.kind });
res.write(`id: ${chunk.index}\n`);
res.write(`event: ${chunk.kind}\n`);
res.write(`data: ${payload}\n\n`);
if (!res.write('')) await new Promise((r) => res.once('drain', r));
}
} finally {
clearInterval(heartbeat);
res.end();
}
}
}Côté client navigateur, deux options : EventSource (simple, pas d'auth header) ou fetch + ReadableStream (auth header possible, parsing manuel).
// web/src/chat/useChatStream.ts
export function subscribeToChat(prompt: string, token: string, onToken: (t: string) => void) {
const controller = new AbortController();
fetch(`/api/chat/stream?q=${encodeURIComponent(prompt)}&cid=conv-42`, {
headers: {
Accept: 'text/event-stream',
Authorization: `Bearer ${token}`,
},
signal: controller.signal,
}).then(async (res) => {
if (!res.body) throw new Error('no body');
const reader = res.body.pipeThrough(new TextDecoderStream()).getReader();
let buffer = '';
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += value;
const frames = buffer.split('\n\n');
buffer = frames.pop() ?? '';
for (const frame of frames) {
const dataLine = frame.split('\n').find((l) => l.startsWith('data: '));
if (!dataLine) continue;
const payload = JSON.parse(dataLine.slice(6));
if (payload.delta) onToken(payload.delta);
}
}
});
return () => controller.abort();
}Le même côté React avec hook custom et gestion de l'état.
// web/src/chat/useChatStream.ts
import { useEffect, useRef, useState } from 'react';
interface UseChatStreamResult {
text: string;
status: 'idle' | 'streaming' | 'done' | 'error';
start: (prompt: string) => void;
abort: () => void;
}
export function useChatStream(token: string): UseChatStreamResult {
const [text, setText] = useState('');
const [status, setStatus] = useState<UseChatStreamResult['status']>('idle');
const abortRef = useRef<AbortController | null>(null);
useEffect(() => () => abortRef.current?.abort(), []);
const start = (prompt: string) => {
abortRef.current?.abort();
const ctrl = new AbortController();
abortRef.current = ctrl;
setText('');
setStatus('streaming');
(async () => {
try {
const res = await fetch(`/api/chat/stream?q=${encodeURIComponent(prompt)}&cid=ui-${Date.now()}`, {
headers: { Accept: 'text/event-stream', Authorization: `Bearer ${token}` },
signal: ctrl.signal,
});
if (!res.ok || !res.body) throw new Error(`status ${res.status}`);
const reader = res.body.pipeThrough(new TextDecoderStream()).getReader();
let buf = '';
while (true) {
const { value, done } = await reader.read();
if (done) break;
buf += value;
const frames = buf.split('\n\n');
buf = frames.pop() ?? '';
for (const frame of frames) {
const dataLine = frame.split('\n').find((l) => l.startsWith('data: '));
if (!dataLine) continue;
const payload = JSON.parse(dataLine.slice(6));
if (payload.kind === 'token') setText((prev) => prev + payload.delta);
else if (payload.kind === 'done') setStatus('done');
else if (payload.kind === 'error') setStatus('error');
}
}
} catch (err) {
if ((err as Error).name !== 'AbortError') setStatus('error');
}
})();
};
return { text, status, start, abort: () => abortRef.current?.abort() };
}🎯 Patterns courants
Heartbeat / keep-alive. Sans trafic pendant 30 à 60 secondes, la plupart des proxies (nginx, Cloudflare, ALB) ferment la connexion. La solution canonique consiste à émettre un commentaire SSE (: ping\n\n) toutes les 15 à 25 secondes. En Nest, on merge() un interval(15_000) dans l'Observable qui produit des MessageEvent de type commentaire — sauf que Nest ne supporte pas nativement les lignes commentaires. La parade : émettre un événement de type ping avec un data: {} minuscule, ou descendre d'un cran et écrire sur res directement avec @Res({ passthrough: false }). Le compromis : on perd l'élégance de l'Observable, on gagne la maîtrise du wire format.
Reconnect via Last-Event-ID. Chaque MessageEvent peut porter un id. Le navigateur le stocke et, lors d'une reconnexion, l'envoie via l'en-tête HTTP Last-Event-ID. Côté serveur, on lit cet en-tête et on reprend là où on s'était arrêté. Pour un stream LLM, ça veut dire : indexer chaque token sortant, persister l'historique pendant N minutes (Redis, mémoire), et savoir reprendre à partir du token k+1. Si le LLM upstream ne supporte pas la reprise, on rejoue depuis le début mais on skip les tokens déjà envoyés — c'est ce que fait l'exemple ci-dessus avec resumeFromTokenIndex.
Backpressure côté serveur. SSE n'a pas de mécanisme de flow control au niveau protocole. Si le client lit lentement (mobile en 3G), les chunks s'accumulent dans le buffer TCP du serveur, puis dans le buffer applicatif Node. Nest n'expose pas la pression de retour ; il faut soit utiliser res.write() directement et écouter 'drain', soit limiter le débit côté LLM (les SDK OpenAI/Anthropic respectent généralement le rythme de consommation de l'AsyncIterable, ce qui propage la backpressure naturellement). Pour des streams très rapides (Bedrock Titan, par exemple), c'est crucial.
Auth sur SSE. EventSource ne supporte pas les en-têtes custom (pas de Authorization). Trois options : cookie HttpOnly (CSRF à gérer), token en query string (apparaît dans les logs nginx, à éviter), ou abandonner EventSource au profit de fetch avec ReadableStream. Pour une API publique avec JWT, la dernière option l'emporte. Pour une app interne avec session cookie, EventSource reste plus simple.
Multi-tenancy et isolation. Quand plusieurs utilisateurs streament en même temps, on garde une Map<conversationId, AbortController> pour pouvoir annuler une génération à la demande (bouton « Stop » côté UI envoie un POST /chat/stop?cid=...). Sans ça, on paye des tokens LLM inutiles et on garde des connexions ouvertes pour rien.
Streaming OpenAI vs Anthropic vs Bedrock. OpenAI et Anthropic exposent des SDK avec for await (const chunk of stream) qui marchent presque pareil. Bedrock, lui, est plus bas niveau : on consomme un flux EventStream AWS encodé en application/vnd.amazon.eventstream qu'il faut décoder via @aws-sdk/client-bedrock-runtime et son InvokeModelWithResponseStreamCommand. La forme des chunks varie selon le modèle (Claude, Titan, Llama). On encapsule donc derrière une interface Provider commune qui yield des StreamChunk uniformes.
Voici l'adapter Bedrock concret pour donner la mesure de l'effort.
// src/chat/providers/bedrock.provider.ts
import { Injectable } from '@nestjs/common';
import {
BedrockRuntimeClient,
InvokeModelWithResponseStreamCommand,
} from '@aws-sdk/client-bedrock-runtime';
import { StreamChunk } from '../chat-stream.service';
@Injectable()
export class BedrockProvider {
private readonly client = new BedrockRuntimeClient({ region: process.env.AWS_REGION ?? 'eu-west-3' });
async *stream(prompt: string, modelId = 'anthropic.claude-3-5-sonnet-20241022-v2:0'): AsyncGenerator<StreamChunk> {
const cmd = new InvokeModelWithResponseStreamCommand({
modelId,
contentType: 'application/json',
accept: 'application/json',
body: JSON.stringify({
anthropic_version: 'bedrock-2023-05-31',
max_tokens: 2048,
messages: [{ role: 'user', content: prompt }],
}),
});
const res = await this.client.send(cmd);
if (!res.body) return;
let i = 0;
for await (const part of res.body) {
if (!part.chunk?.bytes) continue;
const payload = JSON.parse(new TextDecoder().decode(part.chunk.bytes));
if (payload.type === 'content_block_delta' && payload.delta?.text) {
i += 1;
yield { index: i, kind: 'token', text: payload.delta.text, role: 'assistant' };
}
}
yield { index: i + 1, kind: 'done', text: '', role: 'assistant' };
}
}Auth jetable pour EventSource. Quand on doit absolument utiliser EventSource mais qu'on a un JWT, on expose un endpoint POST /chat/ticket authentifié qui crée un token court (30 s) lié à la session et stocké en Redis. Le client appelle new EventSource(/chat/stream?ticket=${t}). Le middleware SSE échange le ticket contre la session puis le supprime (one-shot). Pas d'exposition de JWT dans les logs nginx, pas de risque de replay.
// src/auth/ticket.controller.ts
import { Controller, Post, UseGuards, Req } from '@nestjs/common';
import { randomBytes } from 'node:crypto';
import Redis from 'ioredis';
import { JwtAuthGuard } from './jwt.guard';
@Controller('chat')
@UseGuards(JwtAuthGuard)
export class TicketController {
private readonly redis = new Redis(process.env.REDIS_URL!);
@Post('ticket')
async issue(@Req() req: any): Promise<{ ticket: string }> {
const t = randomBytes(24).toString('base64url');
await this.redis.set(`sse-ticket:${t}`, JSON.stringify({ sub: req.user.sub }), 'EX', 30);
return { ticket: t };
}
}Chunk smoothing. Anthropic et OpenAI envoient parfois des bursts (50 tokens en 100 ms), puis des creux (silence 800 ms). Côté UX, c'est saccadé. Un pattern de smoothing applicatif : buffer les tokens reçus, les republier toutes les 30 ms à un rythme constant. Coûte un peu de latence (~30 ms) mais l'effet « machine à écrire » est plus naturel pour l'œil humain. Implémentation : RxJS bufferTime(30).pipe(concatMap(arr => from(arr).pipe(concatMap(t => of(t).pipe(delay(30/arr.length)))))).
Token counting en streaming. Pour facturer ou enforcer une quota, on compte les tokens AU FUR ET À MESURE. Anthropic SDK expose event.usage sur le chunk message_delta. OpenAI ne le donne qu'à la fin (paramètre stream_options: { include_usage: true } à activer en v4.50+). Stocker tokens_used dans une variable locale, l'écrire en DB quand le stream se termine (ou échoue), pour facturer même un stream partiel.
SSE multiplexing. Le navigateur HTTP/1.1 limite à 6 connexions par origine. Si l'app a 3 onglets ouverts avec 2 SSE chacun, on sature. HTTP/2 résout ça avec le multiplexing sur une seule connexion TCP. En prod, activer HTTP/2 sur le reverse proxy est non négociable.
Config nginx canonique pour SSE. Le fichier de conf minimum qui marche sans piège.
location /chat/stream {
proxy_pass http://nest-upstream;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Accel-Buffering no;
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 1h;
proxy_send_timeout 1h;
chunked_transfer_encoding on;
gzip off;
}Le proxy_read_timeout 1h est cruxial : nginx tue les connexions inactives au bout de 60 s par défaut, ce qui casse les streams LLM longs sans heartbeat. Avec heartbeat 15 s, on peut laisser à 60 s, mais 1 h donne du slack.
🔄 Versions — Nest 7 → 11 + libs
| Nest | SSE | Notes |
|---|---|---|
| 7.x | @Sse() existait déjà | Reposait lourdement sur rxjs 6, MessageEvent typé approximativement. |
| 8.x | Stabilisation | Support officiel des Observable<MessageEvent> typés ; @Sse accepte un chemin paramétré. |
| 9.x | Préparation Fastify | Adaptateur Fastify supporte SSE via fastify-sse-v2, mais l'API Nest reste identique. |
| 10.x | RxJS 7 | from(asyncIterable) devient idiomatique pour brancher les SDK LLM. Pas de breaking change SSE. |
| 11.x | Express 5 / Node 20+ | req.on('close') reste valable, mais on préfère fromEvent(req, 'close') avec AbortSignal natif. |
Côté écosystème, les versions à viser en 2026 : @anthropic-ai/sdk (le SDK gère maxRetries + backoff exponentiel sur 429/5xx — ne pas réimplémenter), openai (streaming via client.chat.completions.create({ stream: true })), @aws-sdk/client-bedrock-runtime. Pour Fastify, fastify-sse-v2. Pour les tests, eventsource (Node) ou undici directement.
Modèles Anthropic à connaître (2026). Flagship :
claude-opus-4-8(le plus capable, raisonnement long et agentique). Équilibré :claude-sonnet-4-6. Rapide/économique :claude-haiku-4-5. Pour un chat assistant en streaming, le choix par défaut estclaude-opus-4-8; on descend surclaude-haiku-4-5pour des suggestions à faible enjeu où la latence prime. Toujours streamer dès quemax_tokensest élevé (évite les timeouts HTTP du SDK) et utiliser le mode adaptatif de thinking (thinking: { type: 'adaptive' }) pour les tâches complexes.
⚠️ Pitfalls — 8 à connaître par cœur
nginx buffering. Par défaut, nginx tamponne la réponse. Avec SSE, les chunks restent coincés dans le buffer jusqu'à atteindre
proxy_buffer_size. Le client ne reçoit rien pendant 5 à 30 secondes, puis tout d'un coup. Correctif obligatoire :proxy_buffering off;,proxy_cache off;,proxy_set_header X-Accel-Buffering no;, et émettre l'en-têteX-Accel-Buffering: nodepuis Nest pour forcer même si la conf nginx oublie de désactiver.Cloudflare timeout à 100 s. Cloudflare ferme toute connexion HTTP qui dépasse 100 s sur le plan gratuit (et 6000 s sur Enterprise). Un stream LLM long (raisonnement, génération de 4 000 tokens) peut dépasser. Options : Cloudflare Argo Tunnels (pas de limite), bypass Cloudflare pour la route
/chat/streamvia un sous-domaine non proxifié, ou découper la génération en plusieurs requêtes SSE successives.Encodage strict. Une ligne SSE valide se termine par
\n\n. Si on fait dudata: foo\ndata: bar\n\n, c'est un seul événement multi-lignes (concaténé avec\n). Mélanger\r\net\ncasse silencieusement Chrome. Quand on écritdata:directement (sans Nest), toujours utiliser\n(LF), jamais\r\n(CRLF).Buffer overflow sur slow client. Si le client lit à 1 KB/s et le LLM produit à 50 KB/s, le buffer Node grossit jusqu'à
OOM. La parade :res.write()retournefalsequand le buffer est plein, écouter'drain'avant de continuer. Avec@Sse, on n'a pas la main directe — il faut tomber sur@Res({ passthrough: false }).JSON dans
data. Si le LLM produit un token contenant\n(rare mais possible), un naïfdata: ${JSON.stringify(chunk)}\n\npeut être mal interprété par certains parsers tolérants. Toujours sérialiser en JSON sur une seule ligne et bannir les retours chariot dans le payload.EventSource ne supporte pas
Authorization. Beaucoup d'équipes découvrent ça après avoir tout codé. Solution : passer àfetch + ReadableStream, ou utiliser un cookie HttpOnly avec CSRF token, ou un token jetable en query string révoqué après usage.req.on('close')vsres.on('close'). Sur Express 5,req.on('close')se déclenche quand le client coupe ET quandres.end()est appelé. Sur Express 4,res.on('close')était plus fiable. En Nest 11+, préférerfromEvent(req, 'close')ou écouterAbortSignalvia@Req() reqet son éventuelreq.signal.CDN qui « cache » du SSE. Certains CDN agressifs (anciens Fastly, AWS CloudFront mal configuré) tentent de mettre en cache la réponse
text/event-streamparce qu'elle a unCache-Controlpar défaut. ForcerCache-Control: no-cache, no-transform,Connection: keep-alive,Content-Type: text/event-stream; charset=utf-8explicitement.HTTP/1.1 vs HTTP/2 limites de connexion. Six connexions par origine en HTTP/1.1. Un dashboard avec 3 SSE pour 3 widgets sature avant même que l'utilisateur clique. HTTP/2 multiplex jusqu'à 100 streams. Activer HTTP/2 ou regrouper plusieurs flux dans un seul endpoint multiplexé applicativement.
Reconnexion infinie après 5xx.
EventSourceretente automatiquement avec un backoff de 3 s. Si le serveur renvoie systématiquement 503 (LLM provider down), le navigateur martèle. Renvoyer un code HTTP 204 ou un événementevent: terminate\ndata: {}\n\npuis fermer la connexion ne suffit pas —EventSourceretente quand même. Solution : renvoyer 4xx (le browser arrête de retenter sur 4xx).AbortControllernon propagé. Si le client coupe (req.on('close')) mais qu'on n'abort pas l'appel SDK LLM upstream, on continue à payer des tokens pour des données jetées. Toujours câbler unAbortControllerdu downstream vers l'upstream. Pour des LLM via HTTP custom (Mistral, Cohere), passer{ signal: ctrl.signal }àfetch. Pour des SDK qui ne supportent pasAbortSignal(rare en 2026), wrap dansPromise.race([streamPromise, abortPromise]).Compression gzip qui buffer. nginx ou un middleware Express
compressionpeuvent activer gzip surtext/event-stream. Gzip a besoin d'un buffer minimum pour compresser efficacement → les chunks sont retenus. Exclure explicitement le content-type SSE de la compression :compression({ filter: (req, res) => res.getHeader('content-type') !== 'text/event-stream' }).Connection pool HTTP épuisé. Si Nest streame vers OpenAI, OpenAI streame vers Nest, Nest streame vers le client. Chaque connexion client occupe une connexion upstream. Default
http.AgentNode limite àmaxSockets = Infinitymais le système d'exploitation limite à ~1024 file descriptors. ConfigurerglobalAgent.maxSockets = 200explicitement et surveillerESOCKETTIMEDOUT.
🧪 Testing
Tester du SSE sans navigateur exige un client qui parse correctement le format. Trois approches.
// test/sse.e2e-spec.ts
import { Test } from '@nestjs/testing';
import { INestApplication } from '@nestjs/common';
import { AppModule } from '../src/app.module';
import { setTimeout as wait } from 'node:timers/promises';
describe('SSE /chat/stream', () => {
let app: INestApplication;
let baseUrl: string;
beforeAll(async () => {
const mod = await Test.createTestingModule({ imports: [AppModule] }).compile();
app = mod.createNestApplication();
await app.listen(0);
const port = (app.getHttpServer().address() as { port: number }).port;
baseUrl = `http://127.0.0.1:${port}`;
});
afterAll(() => app.close());
it('streams tokens and terminates with done', async () => {
const res = await fetch(`${baseUrl}/chat/stream?q=hello&cid=test-1`, {
headers: { Accept: 'text/event-stream', Authorization: 'Bearer test-jwt' },
});
expect(res.headers.get('content-type')).toMatch(/text\/event-stream/);
const reader = res.body!.pipeThrough(new TextDecoderStream()).getReader();
const events: string[] = [];
let buf = '';
const deadline = Date.now() + 5_000;
while (Date.now() < deadline) {
const { value, done } = await reader.read();
if (done) break;
buf += value;
const frames = buf.split('\n\n');
buf = frames.pop() ?? '';
events.push(...frames);
if (frames.some((f) => f.includes('"kind":"done"'))) break;
}
expect(events.length).toBeGreaterThan(2);
expect(events.at(-1)).toMatch(/"kind":"done"/);
});
it('resumes from Last-Event-ID', async () => {
const first = await fetch(`${baseUrl}/chat/stream?q=count&cid=test-2`);
const reader = first.body!.pipeThrough(new TextDecoderStream()).getReader();
await reader.read();
await reader.cancel();
const second = await fetch(`${baseUrl}/chat/stream?q=count&cid=test-2`, {
headers: { 'Last-Event-ID': '3' },
});
const r2 = second.body!.pipeThrough(new TextDecoderStream()).getReader();
const { value } = await r2.read();
expect(value).toMatch(/"index":4/);
});
});Pour les tests unitaires du service, on mocke le SDK Anthropic en exposant un AsyncGenerator synthétique, puis on consomme avec for await.
// chat-stream.service.spec.ts
it('respects resumeFromTokenIndex', async () => {
const fakeUpstream = (async function* () {
for (let i = 0; i < 5; i++) {
yield { type: 'content_block_delta', delta: { type: 'text_delta', text: `tok${i}` } };
}
})();
jest.spyOn(service['client'].messages, 'stream').mockReturnValue(fakeUpstream as any);
const out: StreamChunk[] = [];
for await (const c of service.streamCompletion({ prompt: 'x', conversationId: 'a', resumeFromTokenIndex: 2 })) {
out.push(c);
}
expect(out.filter((c) => c.kind === 'token').map((c) => c.text)).toEqual(['tok2', 'tok3', 'tok4']);
});Pour les tests de charge, autocannon ne sait pas vraiment streamer SSE. Préférer k6 avec son http.get() en mode streaming, ou un script Promise.all qui ouvre N fetch en parallèle et mesure le débit agrégé. Vérifier surtout que le serveur tient 1 000 connexions ouvertes simultanées sans fuite mémoire (cf. Node --max-old-space-size).
Un script k6 minimum pour benchmarker la stack SSE :
// k6/sse.js
import http from 'k6/http';
import { check } from 'k6';
export const options = {
scenarios: {
streamers: {
executor: 'ramping-vus',
startVUs: 0,
stages: [
{ duration: '30s', target: 100 },
{ duration: '2m', target: 500 },
{ duration: '30s', target: 0 },
],
},
},
thresholds: {
http_req_duration: ['p(99)<3000'],
http_req_failed: ['rate<0.01'],
},
};
export default function () {
const res = http.get('https://api.example.com/chat/stream?q=hello&cid=load-test', {
headers: { Accept: 'text/event-stream', Authorization: 'Bearer test-token' },
responseType: 'text',
timeout: '60s',
});
check(res, {
'status 200': (r) => r.status === 200,
'streams done event': (r) => (r.body as string).includes('"kind":"done"'),
});
}Tester aussi le scénario de coupure côté client : ouvrir le stream, attendre 200 ms, abort. Vérifier côté serveur que releaseConnection est bien appelé et que l'AbortController upstream est triggered. Sans ça, fuite de tokens LLM garantie en prod.
🎬 Cas d'usage concrets
RAG juridique — streaming des réponses d'un assistant LLM
Qui : éditeur SaaS legaltech, assistant intelligent qui répond aux questions juridiques en sourçant des arrêts de cassation. Latence perçue critique car les avocats utilisent l'outil entre deux rendez-vous.
Problème : sans streaming, l'utilisateur attendait 12 à 30 secondes devant un loader pour voir la réponse arriver d'un bloc. Avec streaming token-par-token, le first-byte tombe à 800 ms et l'utilisateur lit pendant que le modèle continue à générer.
@Controller('legal-assistant')
export class LegalAssistantController {
constructor(private readonly rag: RagService, private readonly llm: LlmService) {}
@Sse('ask/:sessionId')
async ask(@Param('sessionId') sessionId: string, @Query('q') question: string): Promise<Observable<MessageEvent>> {
return new Observable<MessageEvent>((subscriber) => {
const abort = new AbortController();
(async () => {
try {
const context = await this.rag.retrieve(question);
subscriber.next({ type: 'sources', data: { sources: context.sources } });
for await (const chunk of this.llm.stream({
messages: [{ role: 'user', content: question }],
context, signal: abort.signal,
})) {
subscriber.next({ type: 'token', data: { content: chunk.content } });
}
subscriber.next({ type: 'done', data: { sessionId } });
subscriber.complete();
} catch (e) {
subscriber.error(e);
}
})();
return () => abort.abort();
});
}
}Gains : satisfaction utilisateur passée de 6,2 à 8,7 sur 10 après bascule streaming. Coût LLM identique mais perception de rapidité multipliée par 15. L'AbortController ferme la connexion OpenAI quand l'avocat abandonne, économisant 30% de tokens facturés.
E-commerce — chat support assisté IA en streaming
Qui : retailer multicanal, équipe support de 60 agents. L'IA propose des réponses pré-rédigées que l'agent peut adopter, modifier ou rejeter. Les agents traitent en moyenne 6 conversations en parallèle.
Problème : sans streaming, l'agent attendait l'IA, perdant le contexte avec le client. Avec streaming, la suggestion apparaît mot-à-mot, l'agent peut commencer à lire et décider plus vite.
@Controller('support')
@UseGuards(JwtAuthGuard)
export class SupportSseController {
@Sse('suggest/:conversationId')
async suggest(
@Param('conversationId') convId: string,
@CurrentUser() agent: User,
): Promise<Observable<MessageEvent>> {
return new Observable<MessageEvent>((subscriber) => {
const abort = new AbortController();
(async () => {
const history = await this.conversations.getHistory(convId);
for await (const chunk of this.suggester.stream({ history, agentRole: agent.role, signal: abort.signal })) {
subscriber.next({ data: { delta: chunk.content }, type: 'delta' });
}
subscriber.complete();
})();
return () => abort.abort();
});
}
}Gains : temps moyen de réponse agent diminué de 28%, qualité maintenue (mesurée par CSAT). Le streaming évite le perçu "le système est lent" pendant les pics où l'IA met 8 s à générer une suggestion complète.
Helpdesk IT — résumés d'incidents en temps réel
Qui : ESN gérant le SI de 40 clients, helpdesk niveau 1 et 2. Chaque ticket complexe génère un fil de 30 à 100 messages. Le passage de quart entre niveaux doit transmettre un résumé synthétique.
Problème : générer un résumé prenait 45 secondes (modèle local Mistral 7B), l'agent quittait souvent l'écran. Avec SSE, le résumé apparaît phrase par phrase et l'agent peut commencer à lire dès la première seconde.
@Controller('tickets')
export class TicketSummaryController {
@Sse(':id/summary/stream')
stream(@Param('id') id: string): Observable<MessageEvent> {
return new Observable<MessageEvent>((subscriber) => {
const abort = new AbortController();
(async () => {
const ticket = await this.tickets.findOneWithMessages(id);
for await (const event of this.summarizer.stream(ticket.messages, { signal: abort.signal })) {
if (event.type === 'token') subscriber.next({ type: 'token', data: { text: event.content } });
if (event.type === 'section') subscriber.next({ type: 'section', data: { name: event.name } });
}
subscriber.complete();
})();
return () => abort.abort();
});
}
}Gains : passages de quart raccourcis de 4 minutes à 1 minute. Le streaming par sections (problem, actions_taken, next_steps) permet à l'agent de jumper directement à la partie qui l'intéresse pendant que le reste se génère.
🛠️ Exemple end-to-end
Contexte : assistant IA d'analyse de contrats. L'utilisateur uploade un PDF, le système extrait les clauses, lance une analyse LLM en streaming avec sourcing (références à des clauses précédentes), et expose un endpoint SSE complet avec gestion de session, heartbeats, reconnexion, et persistance pour reprise.
// src/contract-assistant/assistant-sse.controller.ts
import {
Controller, Sse, Param, Query, Req, MessageEvent, Logger, UseGuards,
} from '@nestjs/common';
import { Observable } from 'rxjs';
import type { Request } from 'express';
@Controller('contracts/assistant')
@UseGuards(JwtAuthGuard)
export class ContractAssistantSseController {
private readonly log = new Logger(ContractAssistantSseController.name);
constructor(
private readonly sessions: AssistantSessionService,
private readonly clauses: ClauseExtractorService,
private readonly llm: ContractLlmService,
private readonly metrics: SseMetricsService,
) {}
@Sse(':contractId/analyze/:sessionId')
async analyze(
@Param('contractId') contractId: string,
@Param('sessionId') sessionId: string,
@Query('lastEventId') lastEventId: string | undefined,
@Req() req: Request,
@CurrentUser() user: AuthenticatedUser,
): Promise<Observable<MessageEvent>> {
await this.sessions.assertOwnership(sessionId, user.id, contractId);
return new Observable<MessageEvent>((subscriber) => {
const abort = new AbortController();
const startedAt = Date.now();
let eventIndex = lastEventId ? Number(lastEventId) : 0;
let heartbeat: NodeJS.Timeout | undefined;
this.metrics.openConnection('contract.analyze');
const send = (type: string, data: unknown) => {
eventIndex += 1;
subscriber.next({ id: String(eventIndex), type, data } as MessageEvent);
};
const onClientClose = () => {
this.log.log(`Client closed session=${sessionId} after ${Date.now() - startedAt}ms`);
abort.abort();
};
req.on('close', onClientClose);
heartbeat = setInterval(() => {
send('heartbeat', { ts: Date.now() });
}, 15_000);
(async () => {
try {
// 1. Replay if reconnect
if (lastEventId) {
const buffered = await this.sessions.getEventsAfter(sessionId, eventIndex);
for (const e of buffered) subscriber.next(e as MessageEvent);
}
// 2. Extract clauses (cached if already done)
const clauses = await this.clauses.extractCached(contractId);
send('clauses.ready', { count: clauses.length });
await this.sessions.recordEvent(sessionId, eventIndex, { type: 'clauses.ready', data: { count: clauses.length } });
// 3. Stream analysis clause by clause
for (const clause of clauses) {
send('clause.start', { id: clause.id, title: clause.title });
await this.sessions.recordEvent(sessionId, eventIndex, { type: 'clause.start', data: { id: clause.id, title: clause.title } });
let accumulator = '';
for await (const chunk of this.llm.analyzeClause(clause, { signal: abort.signal })) {
accumulator += chunk.content;
send('clause.token', { id: clause.id, delta: chunk.content });
}
send('clause.done', { id: clause.id, fullAnalysis: accumulator });
await this.sessions.recordEvent(sessionId, eventIndex, { type: 'clause.done', data: { id: clause.id, fullAnalysis: accumulator } });
}
// 4. Global summary
send('summary.start', {});
let summary = '';
for await (const chunk of this.llm.summarize(clauses, { signal: abort.signal })) {
summary += chunk.content;
send('summary.token', { delta: chunk.content });
}
send('summary.done', { content: summary });
await this.sessions.recordEvent(sessionId, eventIndex, { type: 'summary.done', data: { content: summary } });
send('analysis.complete', { sessionId, durationMs: Date.now() - startedAt });
await this.sessions.complete(sessionId);
subscriber.complete();
} catch (e) {
this.log.error(`Stream failed session=${sessionId}`, e as Error);
send('error', { message: (e as Error).message });
subscriber.error(e);
} finally {
if (heartbeat) clearInterval(heartbeat);
this.metrics.closeConnection('contract.analyze', Date.now() - startedAt);
req.removeListener('close', onClientClose);
}
})();
return () => {
if (heartbeat) clearInterval(heartbeat);
abort.abort();
};
});
}
}
// src/contract-assistant/assistant-session.service.ts
@Injectable()
export class AssistantSessionService {
constructor(@Inject(CACHE_MANAGER) private readonly cache: Cache, private readonly repo: SessionRepository) {}
async recordEvent(sessionId: string, index: number, event: { type: string; data: unknown }) {
await this.repo.appendEvent(sessionId, { index, ...event, timestamp: new Date() });
}
async getEventsAfter(sessionId: string, lastIndex: number) {
return this.repo.findEventsAfter(sessionId, lastIndex);
}
async complete(sessionId: string) {
await this.repo.updateStatus(sessionId, 'COMPLETED');
}
async assertOwnership(sessionId: string, userId: string, contractId: string) {
const session = await this.repo.findOne(sessionId);
if (!session || session.userId !== userId || session.contractId !== contractId) {
throw new ForbiddenException('Session does not belong to user');
}
}
}Authentification + ownership session, persistance event-par-event pour reprise après coupure (header Last-Event-ID), heartbeats toutes les 15 s pour traverser les proxies, AbortController pour libérer la connexion LLM si le client se déconnecte, métriques d'ouverture/fermeture pour observabilité. Le système supporte 800 analyses concurrentes avec une mémoire stable et une expérience utilisateur résiliente aux coupures réseau mobile.
🤖 Servir & orchestrer un agent IA depuis NestJS (niveau staff)
Streamer des tokens, c'est le maillon visible. Un endpoint LLM de production, c'est une edge (garde-fous au bord) + une boucle agentique (tool-use serveur) + un worker (jobs longs hors requête HTTP). Voici comment un staff engineer câble les trois.
1. La boucle agentique tool-use, côté serveur, en streaming
Un chat LLM moderne n'est pas un simple prompt → réponse. C'est une boucle : le modèle peut demander d'appeler un outil (tool_use), on exécute l'outil côté Nest, on renvoie le résultat (tool_result), et le modèle continue. Tout ça en streamant vers le client : tokens de texte, mais aussi une timeline d'événements outils (pending → running → done). C'est la pièce que les juniors ratent — ils font un seul appel et perdent l'agentique.
// src/agent/agent.service.ts
import { Inject, Injectable, Logger } from '@nestjs/common';
import Anthropic from '@anthropic-ai/sdk';
import { ANTHROPIC } from '../llm/llm.module';
// Discriminated union : le client UI sait dessiner chaque étape
export type AgentEvent =
| { kind: 'text'; delta: string }
| { kind: 'tool_call'; id: string; name: string; input: unknown; phase: 'running' }
| { kind: 'tool_result'; id: string; phase: 'done'; isError: boolean }
| { kind: 'done'; stopReason: string | null }
| { kind: 'error'; message: string };
const TOOLS: Anthropic.Tool[] = [
{
name: 'get_weather',
description: "Météo actuelle d'une ville. À appeler quand l'utilisateur demande la météo.",
input_schema: {
type: 'object',
properties: { city: { type: 'string', description: 'Nom de la ville' } },
required: ['city'],
},
},
];
@Injectable()
export class AgentService {
private readonly log = new Logger(AgentService.name);
constructor(@Inject(ANTHROPIC) private readonly client: Anthropic) {}
async *run(
prompt: string,
signal: AbortSignal, // câblé sur req.close — annule l'upstream ET la boucle
): AsyncGenerator<AgentEvent> {
const messages: Anthropic.MessageParam[] = [{ role: 'user', content: prompt }];
// Boucle agentique : on tourne tant que le modèle demande des outils.
for (let turn = 0; turn < 8; turn++) {
if (signal.aborted) return; // le client est parti : on arrête de payer
const stream = this.client.messages.stream(
{ model: 'claude-opus-4-8', max_tokens: 2048, tools: TOOLS, messages },
{ signal },
);
for await (const ev of stream) {
if (ev.type === 'content_block_delta' && ev.delta.type === 'text_delta') {
yield { kind: 'text', delta: ev.delta.text };
}
}
const final = await stream.finalMessage();
messages.push({ role: 'assistant', content: final.content });
if (final.stop_reason !== 'tool_use') {
yield { kind: 'done', stopReason: final.stop_reason };
return;
}
// Le modèle veut des outils : on les exécute et on renvoie les résultats.
const toolUses = final.content.filter((b): b is Anthropic.ToolUseBlock => b.type === 'tool_use');
const results: Anthropic.ToolResultBlockParam[] = [];
for (const tu of toolUses) {
yield { kind: 'tool_call', id: tu.id, name: tu.name, input: tu.input, phase: 'running' };
try {
const out = await this.executeTool(tu.name, tu.input, signal);
results.push({ type: 'tool_result', tool_use_id: tu.id, content: out });
yield { kind: 'tool_result', id: tu.id, phase: 'done', isError: false };
} catch (err) {
results.push({ type: 'tool_result', tool_use_id: tu.id, content: (err as Error).message, is_error: true });
yield { kind: 'tool_result', id: tu.id, phase: 'done', isError: true };
}
}
messages.push({ role: 'user', content: results });
}
yield { kind: 'error', message: 'Max agent turns exceeded' };
}
private async executeTool(name: string, input: unknown, signal: AbortSignal): Promise<string> {
if (name === 'get_weather') {
const { city } = input as { city: string };
// fetch RÉEL : on propage le signal pour pouvoir l'annuler aussi
return `Il fait 18°C et nuageux à ${city}.`;
}
throw new Error(`Outil inconnu : ${name}`);
}
}Le contrôleur qui expose ça en SSE, avec annulation client → serveur → provider :
// src/agent/agent.controller.ts
import { Controller, MessageEvent, Query, Req, Sse, UseGuards } from '@nestjs/common';
import { Observable } from 'rxjs';
import type { Request } from 'express';
import { AgentService } from './agent.service';
import { JwtAuthGuard } from '../auth/jwt.guard';
@Controller('agent')
@UseGuards(JwtAuthGuard)
export class AgentController {
constructor(private readonly agent: AgentService) {}
@Sse('stream')
stream(@Query('q') prompt: string, @Req() req: Request): Observable<MessageEvent> {
const ctrl = new AbortController();
req.on('close', () => ctrl.abort()); // déconnexion client -> on coupe tout en cascade
return new Observable<MessageEvent>((subscriber) => {
(async () => {
try {
for await (const ev of this.agent.run(prompt, ctrl.signal)) {
subscriber.next({ type: ev.kind, data: ev });
}
subscriber.complete();
} catch (err) {
subscriber.error(err);
}
})();
return () => ctrl.abort(); // unsubscribe -> annulation
});
}
}La chaîne d'annulation est le point de revue d'un staff engineer. req.on('close') → AbortController.abort() → { signal } passé à messages.stream() → le SDK ferme la connexion HTTP au provider. Si un seul maillon manque, vous payez des tokens Anthropic pour un client qui a fermé son onglet il y a 30 secondes.
2. Edge : idempotence, rate-limit, cost-guard
Avant même d'atteindre le LLM, un endpoint de prod a trois gardes au bord :
- Idempotence — le client envoie un
Idempotency-Key(ou on dérive ungenerationId). Si la même génération est rejouée (retry réseau, double-clic), on ne relance pas un appel LLM payant : on rattache au stream existant ou on renvoie le résultat en cache. - Rate-limit & cost-guard — par utilisateur, on plafonne le nombre de générations concurrentes ET un budget tokens/jour. Un
GuardNest qui lit un compteur Redis etthrow new HttpException(429)avant l'appel coûteux. - Comptage de tokens à la volée — on accumule
usage.output_tokens(exposé sur le chunkmessage_delta) et on persiste même un stream partiel, pour facturer une génération interrompue.
// src/agent/cost-guard.ts
import { CanActivate, ExecutionContext, Injectable, HttpException, HttpStatus } from '@nestjs/common';
import Redis from 'ioredis';
@Injectable()
export class CostGuard implements CanActivate {
private readonly redis = new Redis(process.env.REDIS_URL!);
private readonly DAILY_TOKEN_BUDGET = 1_000_000;
private readonly MAX_CONCURRENT = 3;
async canActivate(ctx: ExecutionContext): Promise<boolean> {
const req = ctx.switchToHttp().getRequest();
const userId = req.user.sub;
const day = new Date().toISOString().slice(0, 10);
const spent = Number(await this.redis.get(`tokens:${userId}:${day}`)) || 0;
if (spent >= this.DAILY_TOKEN_BUDGET) {
throw new HttpException('Daily token budget exceeded', HttpStatus.TOO_MANY_REQUESTS);
}
const active = await this.redis.incr(`active:${userId}`);
await this.redis.expire(`active:${userId}`, 300);
if (active > this.MAX_CONCURRENT) {
await this.redis.decr(`active:${userId}`);
throw new HttpException('Too many concurrent generations', HttpStatus.TOO_MANY_REQUESTS);
}
return true; // le décrément se fait dans le finalize() du stream
}
}3. Jobs IA longs avec BullMQ (hors requête HTTP)
Une analyse de contrat de 10 minutes ne tient pas dans une requête SSE (Cloudflare coupe à 100 s, le client mobile change de réseau). On bascule sur un worker BullMQ : la requête HTTP crée un job et rend immédiatement un generationId ; le client se reconnecte ensuite sur un stream SSE qui tail la progression depuis Redis (pub/sub). Trois invariants senior :
- Idempotence clé sur le
generationId—jobId: generationId. BullMQ dédoublonne : deux soumissions de la même génération = un seul job. Indispensable car les retries réseau et les redémarrages de pod rejouent les soumissions. - Retry cost-aware — on ne retente PAS aveuglément un appel LLM à 0,50 $. On distingue les erreurs retryables (429, 5xx, timeout réseau) des définitives (400 prompt invalide, refus). On persiste l'output partiel pour reprendre au lieu de tout régénérer.
- Partial-output handling — à chaque token, on publie sur
chan:gen:<id>et on persiste l'accumulateur. Si le worker meurt à 80 %, le retry reprend avec le contexte déjà généré au lieu de repartir de zéro et repayer.
// src/agent/analysis.processor.ts
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Inject, Logger } from '@nestjs/common';
import { Job, UnrecoverableError } from 'bullmq';
import Anthropic from '@anthropic-ai/sdk';
import Redis from 'ioredis';
import { ANTHROPIC } from '../llm/llm.module';
interface AnalysisJob { generationId: string; contractId: string; prompt: string; }
@Processor('ai-analysis', { concurrency: 8 })
export class AnalysisProcessor extends WorkerHost {
private readonly log = new Logger(AnalysisProcessor.name);
private readonly pub = new Redis(process.env.REDIS_URL!);
constructor(@Inject(ANTHROPIC) private readonly client: Anthropic) { super(); }
async process(job: Job<AnalysisJob>): Promise<{ tokens: number }> {
const { generationId, prompt } = job.data;
// Reprise : on récupère ce qui a déjà été généré (partial-output)
const prior = (await this.pub.get(`gen:partial:${generationId}`)) ?? '';
let acc = prior;
let tokens = 0;
try {
const stream = this.client.messages.stream({
model: 'claude-opus-4-8',
max_tokens: 8192,
messages: [{ role: 'user', content: prompt + (prior ? `\n\n[Reprends après]: ${prior.slice(-400)}` : '') }],
});
for await (const ev of stream) {
if (ev.type === 'content_block_delta' && ev.delta.type === 'text_delta') {
acc += ev.delta.text;
await this.pub.set(`gen:partial:${generationId}`, acc, 'EX', 3600);
await this.pub.publish(`chan:gen:${generationId}`, JSON.stringify({ delta: ev.delta.text }));
}
if (ev.type === 'message_delta') tokens += ev.usage?.output_tokens ?? 0;
}
await this.pub.publish(`chan:gen:${generationId}`, JSON.stringify({ done: true }));
return { tokens };
} catch (err) {
const e = err as { status?: number };
// Erreur définitive : pas de retry (gaspillage de tokens). Sinon BullMQ retente.
if (e.status === 400) throw new UnrecoverableError(`Prompt invalide: ${(err as Error).message}`);
throw err; // 429/5xx/timeout -> retry avec backoff (configuré sur la queue)
}
}
}Le contrôleur SSE qui tail le job via Redis pub/sub (le client peut se reconnecter sans relancer l'appel LLM) :
// src/agent/analysis.controller.ts
import { Controller, MessageEvent, Param, Post, Sse, Body } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { Observable } from 'rxjs';
import Redis from 'ioredis';
import { randomUUID } from 'node:crypto';
@Controller('analysis')
export class AnalysisController {
constructor(@InjectQueue('ai-analysis') private readonly queue: Queue) {}
@Post()
async submit(@Body() body: { contractId: string; prompt: string }): Promise<{ generationId: string }> {
const generationId = randomUUID();
// jobId = generationId : idempotence native, deux POST = un seul job
await this.queue.add('analyze', { generationId, ...body }, {
jobId: generationId,
attempts: 4,
backoff: { type: 'exponential', delay: 2000 },
});
return { generationId };
}
@Sse(':id/stream')
tail(@Param('id') id: string): Observable<MessageEvent> {
return new Observable<MessageEvent>((subscriber) => {
const sub = new Redis(process.env.REDIS_URL!);
sub.subscribe(`chan:gen:${id}`);
sub.on('message', (_chan, payload) => {
const data = JSON.parse(payload);
subscriber.next({ data });
if (data.done) subscriber.complete();
});
return () => { void sub.quit(); }; // déconnexion client -> on libère la sub Redis
});
}
}4. Exposer un endpoint MCP / agent
Pour qu'un autre agent (Claude, un orchestrateur interne) consomme vos capacités, vous exposez un serveur MCP (Model Context Protocol) — typiquement en transport Streamable HTTP, qui est précisément… du SSE pour le canal serveur→client. La logique métier reste vos services Nest ; le serveur MCP n'est qu'un adaptateur qui mappe vos outils sur le protocole. Le contrôleur expose un GET SSE pour le flux d'événements et un POST pour les messages entrants, exactement comme un chat — mais le « client » est un LLM, pas un navigateur. Les mêmes pièges s'appliquent : heartbeat, Last-Event-ID, AbortController, et surtout l'idempotence/cost-guard au bord, parce qu'un agent qui boucle peut marteler votre endpoint bien plus fort qu'un humain.
🔁 Quand utiliser / éviter
À utiliser quand le flux est unidirectionnel serveur → client : streaming LLM, notifications push, logs en temps réel, dashboards de monitoring, tickers boursiers, progression d'upload en background. SSE est aussi un excellent fallback pour les environnements où WebSocket est bloqué (certaines corp avec proxy explicite).
À éviter quand on a besoin d'un canal bidirectionnel temps réel (chat texte instantané user↔user, jeu multijoueur, édition collaborative type Figma) : prendre WebSocket avec @nestjs/platform-socket.io ou ws. À éviter aussi pour des transferts binaires (vidéo, audio, fichiers) — préférer HTTP streaming brut ou WebRTC. Et bien sûr, pas de SSE en serverless Lambda court (15 min max, et chaque seconde coûte) sauf si on utilise AWS Lambda Function URLs avec streaming response (introduit en 2023, mais limité à 20 MB de payload total et coûts élevés).
HTTP/2 server push est mort en 2022 (Chrome l'a retiré). Ne pas le considérer comme alternative à SSE.
Comparaison fine SSE vs WebSocket vs polling. Le polling HTTP (setInterval(fetch, 2000)) coûte une requête complète à chaque tick (handshake TLS amorti par keep-alive, mais en-têtes complets, plus parsing). Pour un LLM chat token par token, polling est inacceptable (latence 2 s minimum entre les tokens). Long-polling (fetch qui attend jusqu'à un événement) est plus performant mais retombe sur les mêmes pièges proxy/CDN que SSE, sans les avantages (reconnect auto, framing standard).
WebSocket gagne quand : bidirectionnalité (chat user↔user, jeu, collab édition), nombreux events client→serveur, protocole binaire (PDF live, vidéo), subprotocols utiles. SSE gagne quand : unidirectionnel serveur→client, simplicité, fallback HTTP universel, reconnect auto natif.
Pour un chat LLM bidirectionnel (l'utilisateur peut « interrompre » le stream en cours), beaucoup d'équipes utilisent une combinaison : SSE pour le stream output, REST POST /chat/stop pour le signal d'interruption. Ça évite la complexité WebSocket alors qu'on a 95 % du flux qui est server→client.
🏋️ Exercices
Progression : faire marcher → rendre production-grade → casser puis réparer. Chaque exercice suppose un endpoint @Sse() qui streame Claude.
1. Implémenter le bouton « Stop » de bout en bout
Objectif : un POST /chat/stop?cid=... qui annule réellement la génération LLM en cours, et vérifier que la connexion Anthropic se ferme (pas seulement le stream Nest).
Indice/Solution : garder une Map<conversationId, AbortController> dans le service. /chat/stop appelle ctrl.abort(). Le { signal } passé à messages.stream() propage l'annulation jusqu'au provider. Test de validation : espionner client.messages.stream avec un faux AsyncGenerator infini, abort après 3 tokens, et asserter que le signal.aborted est devenu true côté upstream et qu'aucun token supplémentaire n'est yieldé.
2. Reprise idempotente via Last-Event-ID
Objectif : après une coupure réseau à mi-stream, le client se reconnecte et reprend au token k+1 sans re-streamer les tokens déjà reçus ni relancer un appel LLM payant complet.
Indice/Solution : indexer chaque MessageEvent (id: String(index)). Persister les tokens dans Redis (gen:partial:<cid>) avec TTL. À la reconnexion, lire le header Last-Event-ID, rejouer les tokens persistés > k depuis Redis, puis ne reprendre l'appel upstream que si la génération n'était pas terminée. Piège : un EventSource n'envoie Last-Event-ID que s'il a reçu au moins un id: — vérifiez que vous émettez bien l'id sur chaque frame.
3. Rendre le worker BullMQ cost-aware
Objectif : transformer le processor d'analyse pour qu'il ne retente JAMAIS une erreur définitive (400 prompt invalide, refus), retente avec backoff les erreurs transitoires (429/5xx), et reprenne un job interrompu à 80 % au lieu de tout régénérer.
Indice/Solution : throw new UnrecoverableError(...) sur status === 400 (BullMQ ne retente pas). Persister l'accumulateur à chaque token (gen:partial:<generationId>) ; au démarrage du job, relire ce préfixe et l'injecter dans le prompt de reprise. Asserter dans un test que deux queue.add avec le même jobId ne créent qu'un seul job (idempotence).
4. Casser le stream avec la compression, puis réparer
Objectif : reproduire le bug « les tokens arrivent par paquets de 5 secondes au lieu de token par token », puis le corriger.
Indice/Solution : activer le middleware compression() Express globalement → gzip bufferise text/event-stream. Le correctif : exclure le content-type SSE de la compression — compression({ filter: (req, res) => res.getHeader('content-type') !== 'text/event-stream' }). Variante : reproduire le même symptôme avec proxy_buffering on dans nginx et corriger via proxy_buffering off + X-Accel-Buffering: no.
5. Le slow-client OOM
Objectif : provoquer une montée mémoire (puis OOM) en streamant 50 KB/s vers un client qui lit à 1 KB/s, puis appliquer la backpressure.
Indice/Solution : avec @Sse() on n'a pas la main sur drain — tomber sur @Res({ passthrough: false }), écrire avec res.write() et, quand il renvoie false, await new Promise(r => res.once('drain', r)) avant de continuer. Mesurer process.memoryUsage().heapUsed avant/après. Bonus : montrer que côté Anthropic, consommer l'AsyncIterable lentement propage naturellement la backpressure jusqu'au provider (le SDK ne sur-bufferise pas).
6. Boucle agentique : tool-use serveur + timeline UI
Objectif : étendre AgentService.run() avec un second outil, et streamer une timeline d'événements outils (running → done) que l'UI peut afficher, tout en gardant l'annulation fonctionnelle pendant l'exécution d'un outil.
Indice/Solution : ajouter l'outil au tableau TOOLS et au switch de executeTool. Yielder un AgentEvent de type tool_call (phase running) avant l'exécution et tool_result (phase done) après. Passer le signal aux fetch internes de l'outil pour qu'un abort pendant un appel d'outil long le coupe aussi. Test : injecter un faux modèle qui renvoie un tool_use, vérifier que le résultat est bien re-soumis (role: 'user', tool_result) et que la boucle s'arrête sur stop_reason !== 'tool_use'.
🎤 En entretien
Q : Le client ferme son onglet à mi-génération. Décris précisément ce qui doit se passer côté serveur, et le coût si ça ne se passe pas. R : req.on('close') (ou l'unsubscribe de l'Observable) doit déclencher AbortController.abort(), qui via le { signal } passé au SDK ferme la connexion HTTP au provider LLM. Sans cette chaîne, le serveur continue à consommer et facturer les tokens jusqu'au bout de la génération pour des données jetées — typiquement 30 % de gaspillage sur un chat réel, et des connexions upstream qui fuient jusqu'à épuiser le pool de sockets.
Q : Pourquoi SSE plutôt que WebSocket pour un chat LLM, et quelle est la limite ? R : Le flux est à ~95 % unidirectionnel (serveur→client, tokens), et SSE offre gratuitement le reconnect auto + Last-Event-ID, passe les proxies/CDN d'entreprise, et ne demande aucune lib client. La limite est la bidirectionnalité : pour l'interruption on ajoute un simple POST /chat/stop, ce qui évite toute la complexité WebSocket. On bascule sur WebSocket seulement si on a beaucoup d'events client→serveur ou du binaire.
Q : Comment garantis-tu l'idempotence d'une génération IA déclenchée par une requête qui peut être rejouée (retry réseau, double-clic, redémarrage de pod) ? R : On dérive un generationId et on s'en sert comme jobId BullMQ : deux soumissions identiques = un seul job. On persiste l'output partiel à chaque token ; un retry reprend avec le contexte déjà généré au lieu de relancer un appel complet. Et on distingue les erreurs retryables (429/5xx) des définitives (400) pour ne pas re-brûler des tokens sur un prompt invalide.
Q : Tes tokens arrivent en rafales de plusieurs secondes au lieu de token par token. Comment tu débugges ? R : C'est presque toujours du buffering sur un maillon intermédiaire. Je vérifie dans l'ordre : (1) compression gzip activée sur text/event-stream (Express compression ou nginx gzip) — à exclure par content-type ; (2) proxy_buffering nginx — à passer à off + émettre X-Accel-Buffering: no depuis Nest ; (3) un CDN qui met en cache la réponse — forcer Cache-Control: no-cache, no-transform. Je confirme en curl -N directement sur le pod Nest pour isoler la couche fautive.
🔗 Liens
- Spec WHATWG :
https://html.spec.whatwg.org/multipage/server-sent-events.html - Nest docs :
https://docs.nestjs.com/techniques/server-sent-events - Anthropic streaming :
https://docs.anthropic.com/en/api/messages-streaming - Anthropic tool use (boucle agentique) :
https://docs.anthropic.com/en/docs/build-with-claude/tool-use - Model Context Protocol (transport Streamable HTTP / SSE) :
https://modelcontextprotocol.io/ - BullMQ (jobs IA) :
https://docs.bullmq.io/ - OpenAI streaming :
https://platform.openai.com/docs/api-reference/streaming - Bedrock streaming :
https://docs.aws.amazon.com/bedrock/latest/userguide/streaming.html - nginx SSE config :
https://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering - Cloudflare timeout limits :
https://developers.cloudflare.com/workers/platform/limits/ @anthropic-ai/sdknpm :https://www.npmjs.com/package/@anthropic-ai/sdk- RxJS
from(asyncIterable):https://rxjs.dev/api/index/function/from