WebSockets & SSE dans NestJS
TL;DR — Nest offre des Gateways unifiés pour Socket.IO et WebSocket natif (
ws). On y ajoute namespaces, rooms, et un adapter Redis pour scaler horizontalement (sinon broadcast cross-instance impossible). Pour du push simple unidirectionnel,@Sse(Server-Sent Events) est souvent suffisant, plus léger qu'un WS, traverse mieux les proxies HTTP. Auth WS = jamais via cookie naïf, toujours token vérifié auhandshake.
🧠 Mental model — ASCII diagram + analogy
Analogie : un WS est un téléphone décroché (canal bidirectionnel persistant). SSE est une radio (un sens seulement, serveur → client, mais simple et fiable). HTTP classique = télégrammes.
┌────────────────────┐
│ Client (browser) │
└─────────┬──────────┘
│ ws://... or http (SSE)
▼
┌──────────────────────┐
│ Nest Gateway/SSE │
│ @SubscribeMessage │
│ @Sse('stream') │
└──────────┬───────────┘
│
▼
┌──────────────────────────┐
│ Redis adapter (pub/sub) │ ◀── scaling horizontal
└──────────┬───────────────┘
│
┌──────────┴───────────┬─────────────────┐
▼ ▼ ▼
instance-1 instance-2 instance-3Sans Redis adapter, un user connecté à instance-1 ne recevra jamais un emit fait depuis instance-2. Indispensable dès qu'il y a > 1 replica.
Comment un staff engineer raisonne sur le choix WS / SSE / polling
Le réflexe junior est « temps réel = WebSocket ». Le réflexe staff est l'inverse : commence par le transport le plus simple qui satisfait la contrainte, et ne monte en complexité que sous pression d'une exigence mesurable. Le coût caché d'un WS n'est pas dans le code applicatif — c'est dans l'opérationnel : sticky sessions ou adapter pub/sub, observabilité des connexions ouvertes, heartbeats, reconnexion, drain au déploiement (rolling update qui coupe 50 K sockets d'un coup), et un protocole binaire que tu ne peux pas curl.
| Question à se poser | Si oui → | Pourquoi |
|---|---|---|
| Le client doit-il envoyer des messages à faible latence ? | WebSocket | SSE est mono-directionnel ; un POST séparé pour l'upstream casse l'ordre et double la latence |
| Le flux est-il serveur → client uniquement (notifs, ticks, tokens LLM) ? | SSE | HTTP standard, reconnexion native, débuggable, multiplexé HTTP/2 |
| < 1 update/min, données non critiques ? | Polling | Zéro connexion persistante = zéro problème de drain/scale/heartbeat |
| Besoin de rooms ciblées + multi-instance ? | WS + adapter Redis | SSE ciblé impose un bus côté serveur de toute façon |
| Trafic mobile / réseaux hostiles (proxies d'entreprise) ? | SSE > WS | Le Upgrade WS est souvent bloqué ; SSE passe partout où HTTP passe |
Le piège du "on prendra WS au cas où". Un canal bidirectionnel non utilisé est de la dette : tu paies l'opérationnel d'un WS pour un usage SSE. Si 95 % du trafic est serveur→client et 5 % est un clic occasionnel, fais du SSE + un endpoint REST pour les 5 %. C'est plus simple à scaler, à monitorer et à drainer.
Tableau de comparaison des coûts opérationnels
| Préoccupation | Polling | SSE | WebSocket (Socket.IO + Redis) |
|---|---|---|---|
| Connexions ouvertes simultanées | 0 (rafales) | N | N |
| Mémoire/connexion serveur | ~0 | ~10–50 KB (buffer HTTP) | ~30–100 KB (état socket + buffers) |
| Scale horizontal | trivial | trivial (pas de sticky si stateless) | adapter pub/sub obligatoire |
| Drain au rolling deploy | gratuit | client EventSource reconnecte seul | il faut gérer le reconnect + re-auth + re-join rooms |
| Débuggabilité | curl | curl -N | wscat / outils dédiés, binaire |
| Backpressure | N/A | géré par TCP + le navigateur | à gérer manuellement (voir pitfall #4) |
🛠️ Code minimal — realistic working snippet
Gateway Socket.IO avec namespace + room
import {
WebSocketGateway, WebSocketServer, SubscribeMessage,
OnGatewayConnection, OnGatewayDisconnect, MessageBody, ConnectedSocket,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
@WebSocketGateway({
namespace: '/chat',
cors: { origin: process.env.WEB_ORIGIN, credentials: true },
})
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer() server: Server;
async handleConnection(client: Socket) {
const token = client.handshake.auth?.token;
const user = await this.auth.verify(token);
if (!user) return client.disconnect(true);
client.data.user = user;
}
handleDisconnect(client: Socket) {
// cleanup, logs
}
@SubscribeMessage('joinRoom')
joinRoom(@ConnectedSocket() client: Socket, @MessageBody() roomId: string) {
client.join(`room:${roomId}`);
return { ok: true }; // ack envoyé au client (callback)
}
@SubscribeMessage('message')
onMessage(@ConnectedSocket() client: Socket, @MessageBody() data: { roomId: string; text: string }) {
const user = client.data.user;
this.server.to(`room:${data.roomId}`).emit('message', {
from: user.id, text: data.text, at: Date.now(),
});
}
}Redis adapter pour scaling
// redis-io.adapter.ts
import { IoAdapter } from '@nestjs/platform-socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import { ServerOptions } from 'socket.io';
export class RedisIoAdapter extends IoAdapter {
private adapterConstructor: ReturnType<typeof createAdapter>;
async connectToRedis(): Promise<void> {
const pub = createClient({ url: process.env.REDIS_URL });
const sub = pub.duplicate();
await Promise.all([pub.connect(), sub.connect()]);
this.adapterConstructor = createAdapter(pub, sub);
}
createIOServer(port: number, options?: ServerOptions): any {
const server = super.createIOServer(port, options);
server.adapter(this.adapterConstructor);
return server;
}
}
// main.ts
const adapter = new RedisIoAdapter(app);
await adapter.connectToRedis();
app.useWebSocketAdapter(adapter);SSE pour push unidirectionnel
import { Controller, Sse, MessageEvent } from '@nestjs/common';
import { interval, map, Observable } from 'rxjs';
@Controller('events')
export class EventsController {
constructor(private bus: EventEmitter2) {}
@Sse('stream')
stream(): Observable<MessageEvent> {
return new Observable((subscriber) => {
const handler = (data: any) => subscriber.next({ data });
this.bus.on('notification', handler);
return () => this.bus.off('notification', handler);
});
}
// Heartbeat pour traverser proxies/load balancers
// Le champ `type: 'ping'` permet au client de filtrer ces events (addEventListener('ping'))
// sans polluer le handler `message` par défaut.
@Sse('heartbeat')
heartbeat(): Observable<MessageEvent> {
return interval(15_000).pipe(map(() => ({ data: 'pong', type: 'ping' })));
}
}Production : commentaire SSE pour le keep-alive. Un
MessageEventNest est sérialisé endata: .... Pour un vrai keep-alive invisible côté client, certains préfèrent une ligne de commentaire SSE (: keepalive\n\n) que le navigateur ignore. Nest ne l'expose pas via@Sse, mais tu peux écrire dans leResponsebrut (@Res()) si tu as ce besoin. Dans 95 % des cas, un eventpingtypé suffit et reste observable.
🎯 Patterns courants
- Auth au handshake — vérifie le JWT dans
handleConnectionAVANT toute logique. Disconnect immédiat si invalide. Ne stocke jamais le secret sur le client. - Rooms par ressource —
room:order:${orderId},room:user:${userId}. Permet broadcast ciblé sans connaître les socket IDs. - Guards et pipes sur événements —
@UseGuards(WsJwtGuard)+@UsePipes(new ValidationPipe())fonctionnent sur@SubscribeMessage. Même DTO validation qu'en HTTP. - Backpressure côté serveur — Socket.IO bufferise. Si un client est lent, la mémoire explose. Surveille
client.conn.transport.writeBuffer.length, déconnecte si > seuil. - Ack + retries client-side —
client.emit('msg', data, (ack) => { ... })côté client garantit la livraison. Nest renvoie l'ack enreturndu handler. - Reconnexion exponentielle — Socket.IO le fait par défaut. Pour
wsnatif, implémente côté client avec jitter (1s, 2s, 4s, 8s + random).
🔄 Versions — Nest 7 / 8 / 9 / 10 / 11
- Nest 7 : Socket.IO 2.x par défaut, protocol v3, peu compatible avec les clients modernes.
- Nest 8 : passage Socket.IO 4.x, protocol v4 (breaking pour clients v2). Installe
@nestjs/platform-socket.ioséparément. - Nest 9 :
@nestjs/websocketsstable, support natif Fastify WS via@nestjs/platform-ws. - Nest 10 : RxJS 7 obligatoire pour SSE.
@SseretourneObservable<MessageEvent>strict. - Nest 11 : Socket.IO 4.7+, support TypeScript 5, types
Servergénériques (Server<ListenEvents, EmitEvents, ServerSideEvents, SocketData>).
Library notes :
- Socket.IO 2 → 4 : protocole incompatible, clients web doivent upgrader (
socket.io-client@4). @socket.io/redis-adapterv7+ : utiliseredisv4 (clients await connect). Anciens projets surioredis-adapter→ migre.- Pour
wsnatif (sans Socket.IO) :@nestjs/platform-ws, plus léger mais pas de rooms/namespaces auto, à recoder.
⚠️ Pitfalls
- Pas d'adapter Redis avec multi-instances — un emit sur instance-A n'atteint pas les clients sur instance-B. Bug fantôme en prod.
- Auth via query string
?token=...— le token finit dans les logs nginx, dans le navigateur history, etc. Utiliseauth: { token }dans le handshake Socket.IO (header WS). - Cookies + cross-origin —
cors.credentials: true+withCredentials: truecôté client + headers exacts. 90 % des bugs WS prod = CORS. - Reverse proxy sans
Upgradeheader — nginx/HAProxy doit forwarderConnection: UpgradeetUpgrade: websocket, avecproxy_read_timeout> durée connexion (sinon coupure périodique). - SSE derrière un buffer (nginx, CloudFront) — la réponse est bufferisée, le client ne voit rien. Ajoute
X-Accel-Buffering: noou désactive le buffer. - Pas de heartbeat — load balancers ferment les connexions idle après 60 s. Envoie un ping/pong toutes les 25 s (Socket.IO le fait, à toi pour
wsnatif et SSE). - Stockage de session in-memory —
client.data.userest local à l'instance. Pour des opérations cross-instance (kick, broadcast user), stocke aussi en Redis (set des socketIds par userId). - Pas de rate-limit sur les events — un client peut spammer 10k messages/s. Throttle côté gateway (
@nestjs/throttlerne marche pas direct sur WS, implémente manuellement avec un compteur Redis par userId).
📊 Observabilité & production — ce qu'un staff engineer instrumente
Un WS/SSE qui « marche en local » et un qui survit à 50 K connexions en prod sont deux systèmes différents. La différence se joue sur ce que tu mesures et ce que tu fais au déploiement.
Métriques à exposer (Prometheus / OpenTelemetry)
| Métrique | Type | Pourquoi |
|---|---|---|
ws_connections_active | gauge | détecte les fuites (sockets jamais fermés) et dimensionne la mémoire |
ws_connections_total / ws_disconnections_total{reason} | counter | un pic de disconnects = problème réseau, deploy, ou kick massif |
ws_messages_in_total / ws_messages_out_total{event} | counter | volume par type d'event, base du capacity planning |
ws_handshake_auth_failures_total | counter | un pic = attaque ou token expiré côté client |
ws_send_buffer_bytes (histogram) | histogram | révèle les clients lents avant l'OOM (voir backpressure) |
sse_active_streams | gauge | équivalent SSE des connexions ouvertes |
llm_stream_aborted_total{reason} | counter | combien d'annulations client → tokens économisés |
@WebSocketGateway()
export class InstrumentedGateway implements OnGatewayConnection, OnGatewayDisconnect {
constructor(@InjectMetric('ws_connections_active') private gauge: Gauge) {}
handleConnection() { this.gauge.inc(); }
handleDisconnect() { this.gauge.dec(); }
}Le problème du déploiement : drainer proprement
Un rolling deploy tue le process → tous les sockets de l'instance tombent simultanément. Sans précaution : 50 K clients se reconnectent dans la même seconde (thundering herd), saturent le nouveau pod, qui tombe à son tour.
// Drain gracieux : on prévient les clients, on laisse le temps de reconnecter ailleurs
async function gracefulShutdown(server: Server) {
server.emit('server:draining', { reconnectInMs: jitter(0, 30_000) }); // jitter côté client
server.disconnectSockets(false); // ferme proprement, le client reconnecte avec backoff
await new Promise((r) => setTimeout(r, 5_000));
}Côté client : reconnexion avec jitter (base * 2^attempt + random(0, base)) — jamais de reconnexion synchrone, sinon tu reconstruis le thundering herd à chaque deploy.
Tracing distribué d'une connexion
Le socketId n'est pas un trace ID. Propage un correlationId du handshake jusqu'aux logs et aux events Redis : un bug « le user X ne reçoit pas ses ticks » se diagnostique en suivant correlationId à travers instance → Redis PUBLISH → instance → emit. Sans ça, tu débugges à l'aveugle sur 3 replicas.
Sécurité runtime souvent oubliée
- Limite de connexions par IP/user — un seul user qui ouvre 10 K sockets = DoS. Compteur Redis
connections:${userId}, refus au-delà du seuil auhandshake. - Taille max des messages entrants — Socket.IO
maxHttpBufferSize(défaut 1 MB) ; baisse-le si tes events sont petits, sinon un client envoie 1 MB × 10k/s. - Validation stricte au handshake — origin allow-list et vérification du token, jamais l'un sans l'autre.
🧪 Testing
Unit — instancie le gateway avec un mock de Server :
const server = { to: jest.fn().mockReturnThis(), emit: jest.fn() } as any;
const gw = new ChatGateway(authMock);
gw.server = server;
gw.onMessage({ data: { user: { id: 'u1' } } } as any, { roomId: 'r1', text: 'hi' });
expect(server.to).toHaveBeenCalledWith('room:r1');
expect(server.emit).toHaveBeenCalledWith('message', expect.any(Object));E2E — démarre l'app et connecte un vrai client :
import { io, Socket } from 'socket.io-client';
let client: Socket;
beforeAll(async () => {
await app.listen(0);
const url = await app.getUrl();
client = io(`${url}/chat`, { auth: { token: 'valid-jwt' } });
await new Promise((r) => client.on('connect', r));
});
it('joins a room and receives broadcast', (done) => {
client.on('message', (msg) => {
expect(msg.text).toBe('hello');
done();
});
client.emit('joinRoom', 'r1', () => {
client.emit('message', { roomId: 'r1', text: 'hello' });
});
});SSE — utilise EventSource côté Node (eventsource package) ou un raw fetch streaming + parse data: lines.
🎬 Cas d'usage concrets
FinTech — Trading live ticks
Qui — Plateforme française de trading retail multi-asset (actions, ETF, crypto). Problème — 50 K utilisateurs simultanés qui veulent voir le prix de leurs watchlists en temps réel, avec une latence inférieure à 200 ms. Le polling REST tuerait le backend. Comment — Gateway WebSocket avec Redis Pub/Sub pour broadcaster les ticks, abonnement par symbole pour limiter la bande passante.
@WebSocketGateway({ namespace: 'market', cors: true })
export class MarketGateway {
@WebSocketServer() server!: Server;
constructor(private redis: RedisClient) {
this.redis.subscribe('tick:*', (channel, message) => {
const symbol = channel.split(':')[1];
this.server.to(`sym:${symbol}`).emit('tick', JSON.parse(message));
}, { pattern: true });
}
@SubscribeMessage('watch')
watch(@MessageBody() body: { symbols: string[] }, @ConnectedSocket() client: Socket) {
for (const sym of body.symbols.slice(0, 50)) client.join(`sym:${sym}`);
return { watching: body.symbols };
}
@SubscribeMessage('unwatch')
unwatch(@MessageBody() body: { symbols: string[] }, @ConnectedSocket() client: Socket) {
for (const sym of body.symbols) client.leave(`sym:${sym}`);
}
}Gains — Latence p95 80 ms, scaling horizontal trivial via l'adapter Redis, push uniquement aux clients intéressés.
E-commerce — Live cart sync
Qui — Pure player français mode qui veut synchroniser le panier entre desktop et mobile d'un même utilisateur en temps réel. Problème — Sans sync live, l'utilisateur ajoute un article sur mobile et ne le voit pas sur desktop sans refresh, source d'abandon de panier. Comment — Room par userId (authentifiée via JWT à l'upgrade), broadcast des mutations de panier.
@WebSocketGateway({ namespace: 'cart' })
@UseGuards(WsJwtAuthGuard)
export class CartGateway implements OnGatewayConnection {
@WebSocketServer() server!: Server;
async handleConnection(client: Socket) {
const user = client.data.user as AuthUser;
if (!user) return client.disconnect();
client.join(`user:${user.id}`);
}
pushCartUpdate(userId: string, cart: Cart) {
this.server.to(`user:${userId}`).emit('cart.updated', cart);
}
}
@Injectable()
export class CartService {
constructor(private gateway: CartGateway) {}
async addItem(userId: string, sku: string, qty: number) {
const cart = await this.repo.addItem(userId, sku, qty);
this.gateway.pushCartUpdate(userId, cart);
return cart;
}
}Gains — Taux de conversion +4% sur les utilisateurs multi-devices, UX cohérente, code service inchangé (sauf 1 ligne).
Industrie — Helpdesk alertes
Qui — Usine française d'embouteillage qui supervise 80 machines avec capteurs IoT. Problème — Le superviseur en salle de contrôle doit recevoir les alertes de température, vibration, débit en moins d'une seconde, sans installer une vraie app desktop. Le navigateur intranet doit suffire. Comment — SSE depuis Nest vers le navigateur, le backend agrège les events MQTT et les pousse via Observable.
@Controller('alerts')
export class AlertController {
constructor(private bus: AlertBus) {}
@Sse('stream')
stream(@Query('lineIds') lineIds: string): Observable<MessageEvent> {
const wanted = lineIds.split(',');
return this.bus.alerts$.pipe(
filter((a) => wanted.includes(a.lineId)),
map((a) => ({ data: a, type: a.severity, id: a.id })),
);
}
}
@Injectable()
export class AlertBus {
private subject = new Subject<Alert>();
alerts$ = this.subject.asObservable();
constructor(mqtt: MqttClient) {
mqtt.subscribe('sensors/+/alert');
mqtt.on('message', (topic, payload) => {
this.subject.next(JSON.parse(payload.toString()));
});
}
}Gains — Pas d'installation client, reverse-proxy nginx standard, latence p95 350 ms incluant le MQTT broker.
🛠️ Exemple end-to-end
Contexte — La plateforme de trading ci-dessus déploie un endpoint complet : authentification JWT à l'upgrade, abonnement watchlists, rate-limiting par utilisateur, reconnexion gracieuse, et envoi initial du snapshot de prix au moment de l'abonnement pour éviter le "flash blanc".
// src/market/market.gateway.ts
@WebSocketGateway({
namespace: 'market',
cors: { origin: process.env.WEB_ORIGIN, credentials: true },
transports: ['websocket'],
})
@UseFilters(WsExceptionFilter)
export class MarketGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer() server!: Server;
private readonly logger = new Logger(MarketGateway.name);
private readonly limiter = new Map<string, { count: number; reset: number }>();
constructor(
private jwt: JwtService,
private prices: PriceService,
@Inject('REDIS_SUB') private sub: Redis,
) {}
afterInit() {
this.sub.psubscribe('tick:*');
this.sub.on('pmessage', (_pattern, channel, message) => {
const symbol = channel.slice(5);
try {
const payload = JSON.parse(message);
this.server.to(`sym:${symbol}`).emit('tick', payload);
} catch (e) {
this.logger.error(`bad tick payload on ${channel}`, e);
}
});
}
async handleConnection(client: Socket) {
try {
const token =
client.handshake.auth?.token ?? client.handshake.headers.authorization?.replace(/^Bearer\s+/, '');
if (!token) return client.disconnect(true);
const payload = await this.jwt.verifyAsync<{ sub: string; tier: 'free' | 'pro' }>(token);
client.data.user = { id: payload.sub, tier: payload.tier };
client.data.subs = new Set<string>();
client.join(`user:${payload.sub}`);
this.logger.log(`connected user=${payload.sub} sid=${client.id}`);
} catch (e) {
this.logger.warn(`auth failed sid=${client.id}: ${(e as Error).message}`);
client.disconnect(true);
}
}
handleDisconnect(client: Socket) {
this.limiter.delete(client.id);
this.logger.log(`disconnected sid=${client.id}`);
}
@SubscribeMessage('watch')
async watch(
@MessageBody() body: { symbols: string[] },
@ConnectedSocket() client: Socket,
) {
if (!this.rateLimit(client)) {
return { ok: false, error: 'RATE_LIMITED' };
}
const max = client.data.user.tier === 'pro' ? 200 : 30;
const subs: Set<string> = client.data.subs;
const requested = body.symbols.slice(0, max);
const toAdd = requested.filter((s) => !subs.has(s));
const toRemove = [...subs].filter((s) => !requested.includes(s));
for (const sym of toAdd) {
client.join(`sym:${sym}`);
subs.add(sym);
}
for (const sym of toRemove) {
client.leave(`sym:${sym}`);
subs.delete(sym);
}
// Send initial snapshot so UI never shows blank cells
if (toAdd.length) {
const snapshot = await this.prices.snapshot(toAdd);
client.emit('snapshot', snapshot);
}
return { ok: true, watching: [...subs] };
}
@SubscribeMessage('ping')
ping() { return { pong: Date.now() }; }
private rateLimit(client: Socket): boolean {
const now = Date.now();
const entry = this.limiter.get(client.id) ?? { count: 0, reset: now + 1000 };
if (now > entry.reset) { entry.count = 0; entry.reset = now + 1000; }
entry.count += 1;
this.limiter.set(client.id, entry);
return entry.count <= 20; // 20 messages/s per socket
}
}// src/market/price.service.ts
@Injectable()
export class PriceService {
constructor(@Inject('REDIS') private redis: Redis) {}
async snapshot(symbols: string[]): Promise<Array<{ symbol: string; bid: number; ask: number; ts: number }>> {
if (!symbols.length) return [];
const pipeline = this.redis.pipeline();
for (const s of symbols) pipeline.hgetall(`px:${s}`);
const results = await pipeline.exec();
return symbols.map((s, i) => {
const row = results![i][1] as Record<string, string>;
return { symbol: s, bid: Number(row.bid), ask: Number(row.ask), ts: Number(row.ts) };
});
}
}// src/market/market.module.ts
@Module({
imports: [
JwtModule.registerAsync({
useFactory: (cfg: ConfigService) => ({
secret: cfg.get('JWT_SECRET'),
signOptions: { expiresIn: '1h' },
}),
inject: [ConfigService],
}),
],
providers: [
MarketGateway, PriceService, WsExceptionFilter,
{ provide: 'REDIS', useFactory: () => new Redis(process.env.REDIS_URL!) },
{ provide: 'REDIS_SUB', useFactory: () => new Redis(process.env.REDIS_URL!) },
],
})
export class MarketModule {}
// main.ts
async function bootstrap() {
const app = await NestFactory.create(AppModule);
const adapter = new IoRedisAdapter(app);
await adapter.connectToRedis();
app.useWebSocketAdapter(adapter);
await app.listen(3000);
}L'authentification est faite à l'upgrade (handleConnection) — un client non authentifié est jeté avant tout SubscribeMessage. L'adapter Redis (IoRedisAdapter) propage les emit entre instances, indispensable pour scaler horizontalement. Le snapshot initial évite le flash blanc côté UI, et le rate-limit par socket protège des clients buggés qui réabonnent en boucle.
🤖 Servir un agent IA : streaming LLM, tool-use loop, annulation
C'est le cas d'usage moderne de @Sse et des Gateways dans une stack NestJS : exposer Claude (ou un agent outillé) à un front Angular en streamant les tokens au fil de l'eau. SSE est presque toujours le bon choix ici — le flux est unidirectionnel (serveur → client), traverse les proxies, et le SDK Anthropic produit un AsyncIterable qui se mappe trivialement sur un Observable.
Le mental model d'un staff engineer
Trois invariants gouvernent un endpoint LLM streamé en production :
- Le client peut partir à tout moment (onglet fermé, navigation, bouton Stop). Si tu ne propages pas l'annulation jusqu'à l'API LLM, tu continues à payer des tokens pour une réponse que personne ne lit. Un
AbortControllercâblé sur leclosede la requête est non négociable. - Le coût est asymétrique et non remboursable. Un
emitWS perdu coûte 0 ; un appel LLM coûte de l'argent réel à chaque token. Idempotence, garde de coût et rate-limit vivent au bord (au moment du@Sse/@SubscribeMessage), pas dans le service métier. - Le client LLM est une dépendance injectée, pas un
new Anthropic()dans un champ. Tu veux le mocker en test, le configurer viaforRootAsync, et bénéficier des retries SDK (429/5xx avec backoff exponentiel,maxRetries: 2par défaut).
Faits Anthropic (2026) — modèles phares :
claude-opus-4-8(le plus capable),claude-sonnet-4-6(équilibre vitesse/intelligence),claude-haiku-4-5(rapide/économique). Toujours utiliser l'API streaming pour les sorties longues (évite les timeouts HTTP) et laisser le SDK gérer les retries. Sur Opus 4.8 :thinking: { type: 'adaptive' }, jamaisbudget_tokens(400). Pas detemperature/top_p(400).
Le client LLM injecté via forRootAsync (pas de new Anthropic() baladeur)
// llm.module.ts
import { Module, DynamicModule } from '@nestjs/common';
import Anthropic from '@anthropic-ai/sdk';
export const ANTHROPIC = Symbol('ANTHROPIC');
@Module({})
export class LlmModule {
static forRootAsync(): DynamicModule {
return {
module: LlmModule,
global: true,
providers: [
{
provide: ANTHROPIC,
inject: [ConfigService],
useFactory: (cfg: ConfigService) =>
new Anthropic({
apiKey: cfg.getOrThrow('ANTHROPIC_API_KEY'),
maxRetries: 2, // retries SDK : 429/5xx avec backoff exponentiel
timeout: 60_000, // par requête (le streaming garde la connexion vivante)
}),
},
],
exports: [ANTHROPIC],
};
}
}Injecté partout via @Inject(ANTHROPIC) private anthropic: Anthropic — testable, configurable, un seul point de vérité.
SSE qui streame les tokens Claude, avec annulation au disconnect
Le piège classique : un @Sse Nest renvoie un Observable, mais l'AsyncIterable du SDK doit être annulable. On câble un AbortController et on l'abandonne dès que l'Observable est désabonné (ce qui arrive quand le client SSE ferme la connexion).
// chat.controller.ts
import { Controller, Sse, Query, MessageEvent, Inject } from '@nestjs/common';
import { Observable } from 'rxjs';
import Anthropic from '@anthropic-ai/sdk';
@Controller('chat')
export class ChatController {
constructor(@Inject(ANTHROPIC) private anthropic: Anthropic) {}
@Sse('stream')
stream(@Query('q') prompt: string): Observable<MessageEvent> {
return new Observable<MessageEvent>((subscriber) => {
const ac = new AbortController();
(async () => {
try {
const stream = this.anthropic.messages.stream(
{
model: 'claude-opus-4-8',
max_tokens: 4096,
thinking: { type: 'adaptive' }, // pas de budget_tokens sur Opus 4.8
messages: [{ role: 'user', content: prompt }],
},
{ signal: ac.signal }, // <-- propage l'annulation jusqu'à l'API
);
stream.on('text', (delta) => {
subscriber.next({ data: { type: 'token', text: delta } });
});
const final = await stream.finalMessage();
subscriber.next({ data: { type: 'done', usage: final.usage } });
subscriber.complete();
} catch (err) {
if (ac.signal.aborted) return; // annulation propre, pas une erreur
subscriber.next({ data: { type: 'error' }, type: 'error' });
subscriber.complete();
}
})();
// Teardown : appelé quand le client SSE se déconnecte (EventSource.close())
return () => ac.abort(); // <-- coupe le flux LLM, arrête la facturation
});
}
}Le return () => ac.abort() est le cœur du sujet : RxJS appelle cette fonction de cleanup dès que l'Observable n'a plus d'abonné, c'est-à-dire dès que le client ferme l'EventSource. Sans ça, Claude continue de générer (et tu paies) jusqu'à end_turn dans le vide.
Détail SDK qui mord en prod.
this.anthropic.messages.stream(...)retourne unMessageStream(async-iterable, avec.on('text')et.finalMessage()). Quand lesignalest aborté,finalMessage()rejette avec uneAPIUserAbortError. C'est pour ça qu'on garde l'(async () => { ... })()dans untry/catchet qu'on testeac.signal.aborteden premier dans lecatch: un abort n'est pas une erreur applicative, c'est le comportement nominal. Le SDK gère les retries (429/5xx,maxRetries: 2) automatiquement à l'intérieur du stream — tu n'as pas à les recoder.
Suivre le coût et l'usage (sans quoi tu pilotes à l'aveugle)
Le finalMessage() porte usage.input_tokens / usage.output_tokens / usage.cache_read_input_tokens. Logge-les systématiquement, keyés par user et par génération — c'est ta seule source de vérité pour la facturation interne, le capacity planning, et la détection d'abus.
const final = await stream.finalMessage();
this.metrics.recordLlmUsage({
userId,
generationId,
model: 'claude-opus-4-8',
inputTokens: final.usage.input_tokens,
outputTokens: final.usage.output_tokens,
cacheReadTokens: final.usage.cache_read_input_tokens ?? 0,
// coût estimé : Opus 4.8 = $5/MTok in, $25/MTok out (cache read ~0.1x in)
});
subscriber.next({ data: { type: 'done', usage: final.usage } });Sur un abort, tu ne récupères pas le usage complet — d'où l'intérêt de compter les tokens streamés côté serveur (ou d'accepter la perte de précision sur les annulations, qui sont par nature partielles).
Le piège de la sortie buffer. Nginx/CloudFront bufferisent les réponses HTTP par défaut → le client ne voit aucun token avant la fin. Ajoute l'en-tête
X-Accel-Buffering: no(déjà couvert en pitfall #5) — sans lui, le streaming SSE est invisible.
La boucle agentique (tool-use) côté serveur
Quand l'agent doit appeler des outils (recherche, accès DB, API métier), la boucle vit côté serveur : tu streames vers le client les tokens et la trace des appels d'outils, et tu renvoies les résultats à Claude jusqu'à end_turn. Le client n'exécute aucun outil — il observe.
@Sse('agent')
agent(@Query('q') prompt: string): Observable<MessageEvent> {
return new Observable<MessageEvent>((subscriber) => {
const ac = new AbortController();
const messages: Anthropic.MessageParam[] = [{ role: 'user', content: prompt }];
(async () => {
try {
while (true) {
const res = await this.anthropic.messages.create(
{ model: 'claude-opus-4-8', max_tokens: 4096, tools: this.tools, messages },
{ signal: ac.signal },
);
// Stream la trace des outils vers le front (timeline pending|running|done)
for (const block of res.content) {
if (block.type === 'tool_use') {
subscriber.next({ data: { type: 'tool_call', id: block.id, name: block.name } });
} else if (block.type === 'text') {
subscriber.next({ data: { type: 'text', text: block.text } });
}
}
if (res.stop_reason !== 'tool_use') {
subscriber.next({ data: { type: 'done', usage: res.usage } });
break;
}
// Exécute les outils côté serveur, renvoie les résultats à Claude
messages.push({ role: 'assistant', content: res.content });
const toolResults: Anthropic.ToolResultBlockParam[] = [];
for (const block of res.content) {
if (block.type === 'tool_use') {
const out = await this.runTool(block.name, block.input); // garde tes secrets côté serveur
toolResults.push({ type: 'tool_result', tool_use_id: block.id, content: out });
subscriber.next({ data: { type: 'tool_result', id: block.id } });
}
}
messages.push({ role: 'user', content: toolResults });
}
subscriber.complete();
} catch (err) {
if (!ac.signal.aborted) subscriber.next({ data: { type: 'error' }, type: 'error' });
subscriber.complete();
}
})();
return () => ac.abort();
});
}BullMQ pour les jobs IA longs (idempotence, retry conscient du coût, sortie partielle)
Pour un agent qui tourne plusieurs minutes (Opus 4.8 sur du long-horizon peut prendre 10-15 min sur une seule requête), ne tiens pas une connexion SSE ouverte tout du long : lance un job BullMQ et streame l'avancement via un canal séparé (WS room ou SSE re-connectable par generationId).
@Processor('ai-generation')
export class AiGenerationProcessor {
constructor(@Inject(ANTHROPIC) private anthropic: Anthropic, private redis: Redis) {}
async process(job: Job<{ generationId: string; prompt: string }>) {
const { generationId, prompt } = job.data;
// Idempotence : clé sur le generationId, pas le job.id (les retries BullMQ rejouent)
const done = await this.redis.get(`gen:${generationId}:done`);
if (done) return JSON.parse(done); // déjà généré → ne re-paie pas l'appel LLM
const stream = this.anthropic.messages.stream({
model: 'claude-opus-4-8',
max_tokens: 8192,
thinking: { type: 'adaptive' },
messages: [{ role: 'user', content: prompt }],
});
let partial = '';
stream.on('text', async (delta) => {
partial += delta;
// Sortie partielle persistée : si le worker crash, on reprend / on affiche ce qu'on a
await this.redis.set(`gen:${generationId}:partial`, partial, 'EX', 3600);
await job.updateProgress({ chars: partial.length });
});
const final = await stream.finalMessage();
const result = { text: partial, usage: final.usage };
await this.redis.set(`gen:${generationId}:done`, JSON.stringify(result), 'EX', 86400);
return result;
}
}| Concern | Pattern | Pourquoi |
|---|---|---|
| Idempotence | clé Redis sur generationId (fourni par le client, pas auto-généré) | un retry BullMQ ne doit jamais re-déclencher un appel LLM payant déjà réussi |
| Retry conscient du coût | attempts: 2 côté BullMQ, mais garde l'idempotence ci-dessus | retry sur erreur réseau, pas sur une génération déjà aboutie |
| Sortie partielle | persiste partial à chaque delta | crash worker → on affiche/reprend le partiel au lieu de tout reperdre |
| Garde de coût | compteur Redis incr cost:user:${id} avec TTL, refusé avant l'appel | un user ne doit pas brûler 100€ de tokens via une boucle buggée |
Annulation d'un job en cours. Dans un worker BullMQ, il n'y a pas d'
EventSourcequi se ferme — l'annulation vient d'ailleurs (le user clique Stop sur une autre connexion). Pattern : un flag Redisgen:${generationId}:cancelque l'UI pose, et unAbortControllerdans le worker qui poll ce flag (ou écoute un canal pub/sub) etac.abort()quand il passe àtrue. Passe{ signal: ac.signal }àmessages.stream(...)comme dans le SSE. Sans ce câblage, le bouton Stop coupe l'affichage mais le worker continue de payer des tokens jusqu'àend_turn.
Garde au bord : idempotence + rate-limit + coût (avant tout token)
@Injectable()
export class CostGuard implements CanActivate {
constructor(private redis: Redis) {}
async canActivate(ctx: ExecutionContext): Promise<boolean> {
const req = ctx.switchToHttp().getRequest();
const userId = req.user.id;
// Garde de coût : N générations / heure par user (les tokens coûtent de l'argent réel)
const count = await this.redis.incr(`llm:rate:${userId}`);
if (count === 1) await this.redis.expire(`llm:rate:${userId}`, 3600);
if (count > 50) throw new HttpException('LLM_QUOTA_EXCEEDED', 429);
return true;
}
}WS ou SSE pour un agent ? La règle de décision
| Cas | Transport | Raison |
|---|---|---|
| Chat LLM streamé, un prompt → une réponse | SSE | mono-directionnel, reconnectable, curl-able |
| Agent avec interruptions client (bouton Stop qui doit aussi steerer, pas juste couper) | WS | tu veux pousser user.interrupt / tool_confirmation upstream sans rouvrir un canal |
| Job long (> quelques minutes) | ni l'un ni l'autre directement | BullMQ + canal de progression reconnectable par generationId |
| Plusieurs agents/threads observés en parallèle | WS (rooms par thread) ou SSE multiplexé | une room par threadId map proprement sur les sessions multi-agent |
Le piège : tenir un SSE ouvert 15 minutes pour un agent long-horizon. Les load balancers le coupent (proxy_read_timeout), et une coupure réseau perd tout. Pour les runs longs, découple : SSE pour le live court, BullMQ + canal reconnectable pour le long (déjà couvert ci-dessus).
Exposer un endpoint MCP / agent (ton NestJS consommé par Claude)
Le cas inverse : ton serveur NestJS expose ses capacités métier à un agent. Tu déclares un endpoint Streamable HTTP qui parle le protocole MCP ; tes services deviennent des tools que Claude découvre et appelle.
// Squelette d'un endpoint MCP exposé en SSE (Streamable HTTP transport)
@Controller('mcp')
export class McpController {
constructor(private readonly catalog: ProductCatalogService) {}
@Sse('stream')
@UseGuards(McpAuthGuard) // auth à l'edge — un agent distant n'est JAMAIS de confiance implicite
handle(@Req() req: Request): Observable<MessageEvent> {
// Le serveur MCP route les JSON-RPC `tools/list` et `tools/call` vers tes services.
// search_products → this.catalog.search(...) ; chaque tool a un input_schema strict.
return this.mcpServer.connect(req); // bibliothèque MCP server-side (SDK @modelcontextprotocol)
}
}Trois invariants pour un endpoint MCP de production :
- Auth et autorisation à l'edge. Un agent qui appelle
delete_orderdoit prouver qu'il en a le droit — vérifie le token, scope les tools par rôle. LeMcpAuthGuardest non négociable. - Tools = surface d'attaque. Chaque tool exposé est une porte. N'expose que ce qui est nécessaire, valide l'
input(DTO +ValidationPipe) comme tu le ferais pour un endpoint HTTP public, et garde les opérations destructives derrière une confirmation ou un scope dédié. - Idempotence des effets de bord. Un agent peut réessayer un tool ;
create_invoicedoit être idempotent (clé fournie par l'appelant) pour ne pas créer 3 factures sur un retry réseau — exactement la même logique que les jobs BullMQ ci-dessus.
L'auth-au-handshake (WS), la garde-au-bord (SSE LLM) et l'auth-au-tool (MCP) sont la même idée appliquée à trois transports : la confiance se vérifie au point d'entrée, jamais en aval.
🔁 Quand utiliser / éviter
WebSockets :
- chat, collaboration temps réel (curseurs, présence), gaming
- besoin bidirectionnel (client peut envoyer aussi)
- latence < 100 ms requise
SSE :
- push serveur → client uniquement (notifications, live feed, progress bar)
- traverse les proxies HTTP standards
- reconnexion automatique native (
EventSource) - compatible HTTP/2 (multiplexé, moins coûteux)
Évite WS si SSE suffit : SSE est plus simple, plus debuggable (c'est du HTTP), pas de protocole binaire. Pour 80 % des cas "afficher des updates live", SSE suffit.
Évite les deux si polling suffit : pour des données < 1 update/min, un simple setInterval côté client est plus robuste opérationnellement.
Évite Socket.IO si tu n'as pas besoin de fallback : depuis 2020+, WS est partout supporté. Le fallback long-polling de Socket.IO ajoute du poids. ws natif + reconnect manuel est souvent plus propre.
🆚 Tableau de décision rapide
| Critère | HTTP polling | SSE | WebSocket |
|---|---|---|---|
| Direction | client → serveur | serveur → client | bidirectionnel |
| Sur HTTP/HTTPS standard ? | ✅ | ✅ | ✅ (mais avec upgrade) |
| Reconnexion auto navigateur | manuel | natif (EventSource) | manuel ou via Socket.IO |
| Binaire | non | non (texte) | oui |
| Traverse proxies / CDN | ✅ | ✅ (avec config) | ❌ souvent problématique |
| Scaling horizontal | trivial | trivial avec sticky off | nécessite adapter pub/sub |
| Charge serveur (connexions ouvertes) | basse | moyenne | moyenne |
🔐 Sécurité — checklist WS
[ ] Auth vérifiée au handshake (jamais on-the-fly sur premier message)
[ ] Token JWT court (15 min) + refresh hors WS
[ ] CORS strict (origin allow-list)
[ ] Validation DTO sur chaque @SubscribeMessage
[ ] Rate-limit par userId (Redis incr + TTL)
[ ] Logging des disconnects avec raison
[ ] Pas de données sensibles dans les broadcasts (chaque user reçoit ce qu'il doit recevoir)
[ ] HTTPS/WSS obligatoire en prod
[ ] Heartbeat pour détecter les zombies🏋️ Exercices
Progression : implémenter → rendre production-grade → casser puis réparer. Chaque exercice suppose le précédent terminé.
1. Presence tracker multi-instance
Objectif — Afficher en temps réel la liste des users en ligne, correcte même avec 3 replicas derrière l'adapter Redis.
Indice/Solution — client.data.user est local à l'instance, donc un simple Set en mémoire ne voit qu'un tiers des users. Stocke les présences dans un Redis SET (SADD online ${userId} au connect, SREM au disconnect) et broadcaste la liste via une room presence. Piège : un crash d'instance laisse des fantômes — ajoute un TTL par user rafraîchi au heartbeat (SET presence:${userId} 1 EX 30) et reconstruis la liste à partir des clés vivantes, pas du SET brut.
2. SSE de progression de job, re-connectable
Objectif — Streamer la progression d'un job BullMQ via @Sse, qui survit à une coupure réseau côté client sans reperdre l'état.
Indice/Solution — EventSource se reconnecte tout seul et renvoie le header Last-Event-ID. Émets chaque MessageEvent avec un champ id (offset du job), lis Last-Event-ID à la reconnexion (via @Headers('last-event-id')) et reprends le flux depuis cet offset en relisant l'état persisté (gen:${id}:partial en Redis). Sans ça, une coupure de 2s = progression perdue.
3. Streaming Claude avec bouton Stop qui annule vraiment
Objectif — Endpoint @Sse qui streame Opus 4.8, avec une annulation client qui coupe aussi la facturation côté API LLM.
Indice/Solution — Reprends le return () => ac.abort() de la section IA. Pour vérifier que l'annulation marche : log usage.output_tokens du finalMessage vs. le nombre de tokens réellement streamés avant le abort. Si tu vois la génération aller jusqu'au bout dans les logs serveur alors que le client est parti, ton signal n'est pas câblé sur l'appel SDK — c'est le bug #1.
4. Casser puis réparer : le broadcast fantôme
Objectif — Reproduire le bug "emit cross-instance perdu", le diagnostiquer, le corriger.
Indice/Solution — Lance 2 instances sans adapter Redis, connecte un client sur chacune, émets depuis l'une vers une room partagée → le client de l'autre instance ne reçoit rien. Diagnostic : redis-cli MONITOR ne montre aucun PUBLISH sur le canal Socket.IO. Réparation : branche RedisIoAdapter, vérifie que MONITOR montre maintenant les PUBLISH socket.io#/.... Bonus : mesure la latence ajoutée par le hop Redis (typiquement < 2 ms en LAN).
5. Backpressure : tuer un client lent
Objectif — Protéger le serveur d'un client qui ne consomme pas assez vite (mémoire qui explose).
Indice/Solution — Simule un client lent (un socket.io-client qui ne lit jamais). Émets 50k messages/s vers lui. Surveille client.conn.transport.writeBuffer.length (Socket.IO) ou le bufferedAmount (ws natif) — il grimpe sans limite. Corrige : un intervalle qui disconnect(true) tout client dont le buffer dépasse un seuil. Sans cette garde, un seul client lent OOM l'instance.
6. Rate-limit distribué sur les events WS
Objectif — Throttler les @SubscribeMessage par userId de façon cohérente entre instances (@nestjs/throttler ne marche pas sur WS).
Indice/Solution — Compteur Redis INCR ratelimit:${userId}:${windowSec} avec EXPIRE au premier hit (sliding window simple). Au-delà du seuil, renvoie un ack { ok: false, error: 'RATE_LIMITED' } sans exécuter le handler. Piège : un INCR + EXPIRE non atomiques laissent des clés sans TTL si l'instance crash entre les deux — utilise un script Lua ou SET ... NX EX pour l'atomicité.
7. Agent SSE reconnectable qui ne re-paie jamais un appel LLM
Objectif — Endpoint d'agent qui streame Opus 4.8, survit à une coupure réseau de 3 s côté client, et ne déclenche jamais deux fois l'appel LLM facturé pour la même génération.
Indice/Solution — Combine trois mécanismes vus dans le fichier : (a) generationId fourni par le client → idempotence Redis (gen:${id}:done) ; (b) job BullMQ qui persiste le partial à chaque delta ; (c) SSE qui, à la reconnexion, lit Last-Event-ID, relit gen:${id}:partial et reprend depuis l'offset. Test de validation : coupe le réseau au milieu, reconnecte, vérifie dans les logs serveur qu'il y a exactement un messages.stream(...) pour ce generationId (pas un par reconnexion). Si tu vois deux appels LLM, ton idempotence est keyée sur le mauvais identifiant (probablement le job.id que BullMQ change au retry, au lieu du generationId stable).
🎤 En entretien
Q : Pourquoi un WebSocket ne scale pas horizontalement sans adapter, alors que du HTTP stateless oui ? Parce qu'une connexion WS est stateful et ancrée à une instance : le socket vit sur instance-A, donc un emit déclenché depuis instance-B (autre requête, autre worker) n'a aucun moyen d'atteindre ce socket. L'adapter Redis (pub/sub) résout ça en publiant chaque emit sur un canal que toutes les instances écoutent. Sans lui, le broadcast cross-instance est silencieusement perdu — un bug fantôme qui n'apparaît qu'à partir de 2 replicas.
Q : SSE ou WebSocket pour streamer les tokens d'un LLM vers un front ? SSE, presque toujours. Le flux est unidirectionnel (serveur → client), donc le canal bidirectionnel du WS est du poids mort. SSE c'est du HTTP standard : reconnexion native via EventSource + Last-Event-ID, traverse les proxies, multiplexé sur HTTP/2, debuggable au curl. Le seul vrai piège est le buffering des reverse-proxies (X-Accel-Buffering: no). On réserve le WS au vrai bidirectionnel à faible latence (chat collaboratif, curseurs, gaming).
Q : Un client ferme l'onglet pendant que Claude génère. Que se passe-t-il, et comment l'éviter ? Sans propagation d'annulation, l'API LLM continue de générer jusqu'à end_turn — tu paies des tokens pour une réponse que personne ne lit. La correction : un AbortController dont le signal est passé à l'appel SDK, et un teardown RxJS (return () => ac.abort()) appelé quand l'Observable SSE perd son abonné, c'est-à-dire à la fermeture de l'EventSource. C'est testable : compare les tokens facturés au nombre réellement streamé avant le abort.
Q : Comment garantis-tu qu'un retry de job IA ne re-facture pas un appel LLM déjà réussi ? Idempotence keyée sur un generationId fourni par le client (pas le job.id, que BullMQ change au retry). Avant l'appel LLM, on lit un flag Redis gen:${generationId}:done ; s'il existe, on renvoie le résultat caché sans toucher l'API. Le retry BullMQ couvre les erreurs réseau ; l'idempotence couvre le "déjà payé, déjà abouti". Les deux sont orthogonaux et indispensables — un appel LLM est un effet de bord coûteux et non remboursable, donc traité comme une transaction financière.
Q : Un client lent ne consomme pas assez vite. Que se passe-t-il côté serveur, et comment tu te protèges ? Socket.IO (et ws natif) bufferise les messages non envoyés en mémoire serveur. Un client lent ou bloqué fait grimper writeBuffer.length (ou bufferedAmount) sans limite → la mémoire de l'instance explose, et un seul client peut OOM le pod et faire tomber tous les autres. La protection : surveiller la taille du buffer par socket et disconnect(true) au-delà d'un seuil. C'est de la backpressure manuelle — le protocole ne te protège pas. C'est aussi pour ça que SSE est plus sûr par défaut : le backpressure y est porté par TCP et le navigateur, pas par un buffer applicatif que tu dois surveiller.
Q : Tu fais un rolling deploy avec 50 K WebSockets ouverts. Que se passe-t-il si tu ne fais rien, et comment tu drains proprement ? Sans rien : le process meurt, les 50 K sockets tombent dans la même seconde, et tous les clients se reconnectent simultanément (thundering herd) sur le nouveau pod — qui sature et peut tomber à son tour. Le drain propre : émettre un event server:draining avec un délai de reconnexion jittéré par client, fermer les sockets gracieusement (disconnectSockets), laisser quelques secondes, puis quitter. Côté client, reconnexion avec backoff exponentiel + jitter, jamais synchrone. Le jitter est la clé — sans lui, tu reconstruis le thundering herd à chaque deploy.
🔗 Liens
- Nest WebSockets : https://docs.nestjs.com/websockets/gateways
- Nest SSE : https://docs.nestjs.com/techniques/server-sent-events
- Socket.IO Redis adapter : https://socket.io/docs/v4/redis-adapter/
- Socket.IO scaling guide : https://socket.io/docs/v4/using-multiple-nodes/
- MDN — SSE : https://developer.mozilla.org/docs/Web/API/Server-sent_events
- WebSocket RFC 6455
- Heroku WebSocket debugging best practices
- "WebSockets vs Server-Sent Events" — Smashing Magazine