WebSockets temps réel
TL;DR — Un WebSocket est une connexion TCP bidirectionnelle et persistante ouverte via un handshake HTTP, puis « upgradée » : une fois établie, serveur et client se poussent des messages sans nouvelle requête. Dans FastAPI, un endpoint
@app.websocket("/ws")reçoit un objetWebSocket, appelleawait ws.accept(), puis boucle surawait ws.receive_*()/await ws.send_*()jusqu'à ce qu'uneWebSocketDisconnectsoit levée. C'est l'outil idéal pour streamer les tokens d'un agent LLM au navigateur (entrée utilisateur + sortie token-par-token sur le même socket), gérer un chat multi-tours, ou pousser des mises à jour serveur → client. Les pièges senior sont tous liés à l'état : un WebSocket vit dans un seul worker process (le scaling horizontal exige un broker type Redis pub/sub), il n'a pas de retries HTTP ni de timeout de requête, et l'authentification se fait au handshake (pas de headerAuthorizationfiable côté navigateur → token en query string ou en premier message). Pour du pur push serveur → client unidirectionnel, préférez SSE (plus simple, traverse mieux les proxies) ; le WebSocket gagne dès qu'il faut du full-duplex (interruption d'un agent en cours de génération, par exemple).
🧠 Mental model
Venant de NestJS/Angular, vous connaissez déjà @WebSocketGateway et socket.io. Oubliez la couche socket.io une seconde et regardez le protocole brut, parce que FastAPI vous expose le WebSocket natif (RFC 6455), sans la couche de fallback/reconnexion automatique que socket.io ajoute par-dessus.
L'analogie. Une requête HTTP classique, c'est un échange de lettres : vous postez une enveloppe (request), vous attendez la réponse (response), la conversation est close. Chaque nouvelle question = une nouvelle lettre, un nouveau timbre, une nouvelle adresse. Un WebSocket, c'est un appel téléphonique : on compose le numéro (handshake HTTP GET + header Upgrade: websocket), l'autre décroche (101 Switching Protocols), et ensuite les deux parties parlent quand elles veulent, dans les deux sens, sur la même ligne ouverte, jusqu'à ce que l'un raccroche. Pas de re-numérotation entre chaque phrase.
HTTP classique (lettres) WebSocket (appel téléphonique)
───────────────────────── ──────────────────────────────
client ──request──▶ server client ──GET /ws Upgrade──▶ server
client ◀─response── server client ◀──101 Switching──── server
(connexion fermée) ════════ ligne ouverte ════════
client ──request──▶ server client ──"salut"──────────▶ server
client ◀─response── server client ◀──────"token1"──── server
client ◀──────"token2"──── server
client ──"stop"───────────▶ server
(full-duplex, asynchrone)Trois conséquences mentales à internaliser :
Le handshake est du HTTP, le reste ne l'est plus. L'auth, les cookies, l'origine : tout ce qui se décide se décide pendant le
GETinitial. Après le101, vous êtes sur un flux de frames TCP, plus sur des requêtes. C'est pourquoi un middleware FastAPI qui litrequest.headersne s'applique pas pareil.La connexion vit dans la mémoire d'un process. Votre
WebSocketPython est un objet en RAM dans un worker Uvicorn. Si vous lancezuvicorn --workers 4, deux utilisateurs connectés peuvent atterrir sur deux process qui ne partagent rien. « Broadcaster à tout le monde » devient un problème distribué — exactement comme unSet<Socket>en mémoire dans un seul pod NestJS ne voit pas les sockets des autres pods.Full-duplex = vous gérez deux flux concurrents. Lire l'entrée utilisateur et écrire la sortie en même temps, c'est deux coroutines qui tournent en parallèle sur le même socket. C'est là que
asyncio(que vous avez vu dans les leçons précédentes de ce chapitre) devient indispensable, et c'est là que se cachent les bugs de concurrence.
Le cœur : un endpoint WebSocket FastAPI, fait correctement
L'écho minimal (et pourquoi il est déjà subtil)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket("/ws/echo")
async def echo(ws: WebSocket) -> None:
await ws.accept() # complète le handshake : sans ça, rien ne passe
try:
while True:
text = await ws.receive_text()
await ws.send_text(f"echo: {text}")
except WebSocketDisconnect:
# le client a raccroché — sortie normale, PAS une erreur à logger en ERROR
passPoints non négociables, même sur 8 lignes :
await ws.accept()est obligatoire. Tant que vous ne l'appelez pas, le handshake n'est pas terminé et le client reste en attente. C'est l'équivalent de décrocher le téléphone.WebSocketDisconnectest le chemin normal de sortie, pas une exception exceptionnelle. Un utilisateur ferme son onglet → l'exception est levée sur le prochainreceive/send. La traiter comme une erreur 500 pollue vos logs et fausse vos métriques.- La boucle
while Trueest la connexion. Quand la fonctionreturn, le socket se ferme. Tant qu'on boucle, on reste en ligne.
La mauvaise façon (à reconnaître en review)
# ❌ ANTI-PATTERN — ne faites jamais ça
@app.websocket("/ws/bad")
async def bad(ws: WebSocket) -> None:
await ws.accept()
while True:
text = await ws.receive_text() # si le client part, ceci LÈVE
await ws.send_text(text) # jamais atteint, mais on ne l'a pas géré
# pas de try/except : la WebSocketDisconnect remonte, Starlette logge une stack trace
# à chaque déconnexion (= à chaque fermeture d'onglet). Vos logs deviennent illisibles.Le problème n'est pas que ça « crashe » — Starlette attrape l'exception en amont — mais que chaque déconnexion produit une stack trace de niveau ERROR. En prod, avec des centaines de connexions, vous noyez les vraies erreurs. Le try/except WebSocketDisconnect n'est pas du confort, c'est de l'hygiène opérationnelle.
Recevoir du JSON typé avec Pydantic v2
Vous ne voulez pas manipuler des str brutes. Validez chaque message entrant comme vous validez un body HTTP.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from pydantic import BaseModel, ValidationError, Field
from typing import Literal
app = FastAPI()
class ChatMessage(BaseModel):
type: Literal["user_message"] = "user_message"
content: str = Field(min_length=1, max_length=8000)
conversation_id: str | None = None
class ServerEvent(BaseModel):
type: Literal["token", "done", "error"]
data: str
@app.websocket("/ws/chat")
async def chat(ws: WebSocket) -> None:
await ws.accept()
try:
while True:
raw = await ws.receive_json() # dict | list
try:
msg = ChatMessage.model_validate(raw)
except ValidationError as exc:
# on NE ferme PAS le socket sur une erreur de validation :
# on renvoie l'erreur et on continue à écouter.
await ws.send_json(
ServerEvent(type="error", data=exc.json()).model_dump()
)
continue
# ... traiter msg.content ...
await ws.send_json(ServerEvent(type="done", data=msg.content).model_dump())
except WebSocketDisconnect:
passLa décision senior ici : une erreur de validation ne tue pas la connexion. Sur HTTP, un body invalide renvoie 422 et la requête est finie — pas grave, le client réessaiera. Sur un WebSocket, fermer le socket force le client à tout reconnecter (re-auth, re-handshake, perte de l'état conversationnel). On renvoie un ServerEvent(type="error") et on continue. On ne ferme que sur une erreur fatale (auth échouée, protocole violé).
Injection de dépendances (DI)
FastAPI supporte Depends sur les endpoints WebSocket, mais avec une nuance : on ne peut pas renvoyer une HTTPException (il n'y a pas de réponse HTTP après le 101). On lève WebSocketException ou on ferme avec un close code.
from fastapi import Depends, WebSocket, WebSocketException, status, Query
from typing import Annotated
async def get_current_user(
ws: WebSocket,
token: Annotated[str | None, Query()] = None,
) -> str:
if token is None:
# 1008 = Policy Violation. Le client reçoit ce code dans l'event "close".
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
user_id = await verify_token(token) # votre logique JWT
if user_id is None:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
return user_id
@app.websocket("/ws/chat")
async def chat(
ws: WebSocket,
user_id: Annotated[str, Depends(get_current_user)],
) -> None:
await ws.accept()
# user_id est disponible, validé avant le accept()
...
async def verify_token(token: str) -> str | None:
... # décode le JWT, retourne le sub ou NonePourquoi le token en query string ? Le navigateur, via l'API
WebSocketnative, ne permet pas d'ajouter des headers custom à la requête de handshake (pas deAuthorization: Bearer). Vos trois options : (1) token en query (wss://api/ws?token=...) — simple mais le token fuit dans les logs d'access et l'historique ; (2) cookie de session (envoyé automatiquement, mais attention au CSRF et àSameSite) ; (3) accepter le socket puis exiger un premier message d'auth avant tout traitement. En prod, (3) est le plus propre : le token ne traîne nulle part et vous pouvez fermer avec un code clair s'il est absent.
⚙️ Servir un agent LLM : streamer les tokens sur le WebSocket
C'est le cas d'usage qui justifie un WebSocket plutôt que SSE pour vous : un chat agentique où l'utilisateur peut envoyer un message, recevoir la réponse token par token, et interrompre la génération en cours — le tout sur une seule connexion full-duplex.
On utilise AsyncAnthropic (jamais le client synchrone dans un endpoint async — il bloquerait l'event loop), le streaming via messages.stream(), et le modèle claude-opus-4-8.
Streaming basique : un message → des tokens
import os
from anthropic import AsyncAnthropic
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from pydantic import BaseModel, Field
app = FastAPI()
client = AsyncAnthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
class UserTurn(BaseModel):
content: str = Field(min_length=1, max_length=8000)
@app.websocket("/ws/agent")
async def agent(ws: WebSocket) -> None:
await ws.accept()
history: list[dict[str, str]] = []
try:
while True:
turn = UserTurn.model_validate(await ws.receive_json())
history.append({"role": "user", "content": turn.content})
assistant_text = ""
async with client.messages.stream(
model="claude-opus-4-8",
max_tokens=4096,
thinking={"type": "adaptive"}, # thinking adaptatif (jamais budget_tokens)
messages=history,
) as stream:
async for token in stream.text_stream:
assistant_text += token
await ws.send_json({"type": "token", "data": token})
history.append({"role": "assistant", "content": assistant_text})
await ws.send_json({"type": "done", "data": ""})
except WebSocketDisconnect:
passCe qui est idiomatique ici :
async with client.messages.stream(...): le streaming est obligatoire dès que la sortie peut être longue (sinon on risque un timeout HTTP côté SDK).stream.text_streamest un async iterator qui yield les deltas de texte au fur et à mesure.thinking={"type": "adaptive"}: surclaude-opus-4-8, c'est le mode de réflexion recommandé. N'utilisez jamaisbudget_tokens(supprimé sur Opus 4.7/4.8 — retourne un 400), nitemperature/top_p/top_k(également supprimés). Par défaut, l'affichage du thinking est"omitted"(texte vide) — si vous voulez streamer un résumé du raisonnement à l'UI, passezthinking={"type": "adaptive", "display": "summarized"}.- On reconstruit
assistant_textcôté serveur pour le rajouter àhistory. L'API Anthropic est stateless : à chaque tour on renvoie tout l'historique. (En production, cet historique vit en mémoire du worker — voir la section scaling plus bas.)
La version production-grade : interruption, annulation, et le piège du full-duplex
Le scénario réel : l'utilisateur tape une nouvelle question alors que l'agent est en train de générer. Il faut interrompre la génération en cours. Naïvement, on ne peut pas : la boucle est bloquée dans async for token in stream.text_stream, elle ne lit pas l'entrée. C'est le piège classique du full-duplex — une seule coroutine ne peut pas lire et écrire en même temps.
La solution : deux tâches asyncio concurrentes — un reader (lit l'entrée utilisateur) et un generator (stream l'agent) — et un mécanisme d'annulation.
import asyncio
import os
from anthropic import AsyncAnthropic, APIStatusError
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from pydantic import BaseModel, Field, ValidationError
app = FastAPI()
client = AsyncAnthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
class ClientCommand(BaseModel):
type: str # "user_message" | "interrupt"
content: str | None = Field(default=None, max_length=8000)
async def stream_response(
ws: WebSocket,
history: list[dict[str, str]],
) -> str:
"""Stream une réponse de l'agent. Renvoie le texte complet généré.
Annulable via asyncio.CancelledError (déclenché par une interruption)."""
assistant_text = ""
try:
async with client.messages.stream(
model="claude-opus-4-8",
max_tokens=4096,
thinking={"type": "adaptive"},
messages=history,
) as stream:
async for token in stream.text_stream:
assistant_text += token
await ws.send_json({"type": "token", "data": token})
await ws.send_json({"type": "done", "data": ""})
except asyncio.CancelledError:
# interruption demandée par l'utilisateur : on signale, on garde
# le partiel pour l'historique, et on re-raise pour que asyncio
# complète proprement l'annulation.
await ws.send_json({"type": "interrupted", "data": ""})
raise
except APIStatusError as exc:
# 429, 500, 529 : le SDK a déjà retried (max_retries=2 par défaut).
# Si on arrive ici, c'est épuisé — on informe le client sans tuer le socket.
await ws.send_json({"type": "error", "data": f"upstream {exc.status_code}"})
return assistant_text
@app.websocket("/ws/agent")
async def agent(ws: WebSocket) -> None:
await ws.accept()
history: list[dict[str, str]] = []
current: asyncio.Task[str] | None = None
try:
while True:
raw = await ws.receive_json()
try:
cmd = ClientCommand.model_validate(raw)
except ValidationError as exc:
await ws.send_json({"type": "error", "data": exc.json()})
continue
if cmd.type == "interrupt":
if current and not current.done():
current.cancel()
# on attend que l'annulation soit traitée pour récupérer
# le texte partiel et l'ajouter à l'historique
try:
partial = await current
except asyncio.CancelledError:
partial = ""
if partial:
history.append({"role": "assistant", "content": partial})
continue
if cmd.type == "user_message" and cmd.content:
# si une génération tourne déjà, on la laisse finir ou on l'annule
if current and not current.done():
current.cancel()
try:
await current
except asyncio.CancelledError:
pass
history.append({"role": "user", "content": cmd.content})
current = asyncio.create_task(stream_response(ws, history))
# on attend la fin du stream AVANT de relire l'entrée ?
# NON — c'est tout l'intérêt : on relit l'entrée immédiatement
# pour pouvoir capter un "interrupt" pendant la génération.
# Mais il faut alors gérer la complétion ailleurs (voir note).
except WebSocketDisconnect:
if current and not current.done():
current.cancel() # toujours nettoyer la tâche orphelineLa subtilité fatale. Dans le code ci-dessus, après
asyncio.create_task(...), la boucle retourne immédiatement àawait ws.receive_json()— ce qui permet de capter l'interrupt. Mais alors, qui rajoute la réponse complète àhistoryquand le stream finit normalement ? Personne, dans cette version. C'est le vrai bug de design des WebSockets full-duplex : la complétion et la lecture sont désormais découplées. La correction propre est d'utiliser un done callback (current.add_done_callback(...)) ou uneasyncio.Queuequi sérialise les événements de complétion vers la boucle principale. Ce découplage est exactement le sujet de l'exercice 3.
Points production tirés de cet exemple :
asyncio.CancelledErrordoit être re-raise après nettoyage. L'avaler silencieusement laisse asyncio dans un état incohérent (la tâche n'est jamais marquée comme annulée).- Le SDK Anthropic retry tout seul les 429/500/529 (exponential backoff,
max_retries=2). On n'ajoute pas de retry par-dessus ; on attrapeAPIStatusErrorseulement pour le cas où les retries sont épuisés. - Toujours
cancel()la tâche orpheline dans leexcept WebSocketDisconnect. Sinon, un client qui ferme son onglet pendant une génération laisse tourner un appel LLM payant dans le vide — c'est de l'argent jeté par la fenêtre, littéralement facturé à5 USD / 25 USDpar million de tokens sur Opus 4.8.
Tool-use loop sur WebSocket
Si votre agent appelle des outils (function calling), la boucle de tool-use s'imbrique naturellement dans le streaming. On stream le texte, on détecte stop_reason == "tool_use", on exécute l'outil, on renvoie le résultat, on reboucle — et chaque étape peut pousser un event de progression sur le socket (idéal pour une UI qui montre « 🔧 appel de search_db… »).
async def run_agent_turn(ws: WebSocket, history: list[dict]) -> None:
tools = [
{
"name": "get_weather",
"description": "Récupère la météo actuelle. À appeler quand "
"l'utilisateur demande la météo d'une ville.",
"input_schema": {
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"],
},
}
]
while True:
async with client.messages.stream(
model="claude-opus-4-8",
max_tokens=4096,
thinking={"type": "adaptive"},
tools=tools,
messages=history,
) as stream:
async for token in stream.text_stream:
await ws.send_json({"type": "token", "data": token})
final = await stream.get_final_message()
history.append({"role": "assistant", "content": final.content})
if final.stop_reason != "tool_use":
await ws.send_json({"type": "done", "data": ""})
return
# exécuter chaque tool_use et renvoyer les résultats
results = []
for block in final.content:
if block.type == "tool_use":
await ws.send_json({"type": "tool_call", "data": block.name})
output = await execute_tool(block.name, block.input) # votre code
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
})
history.append({"role": "user", "content": results})
# on reboucle : Claude voit les résultats et continueNotez la description d'outil prescriptive (« À appeler quand… ») : sur Opus 4.8, qui appelle les outils plus parcimonieusement que les générations précédentes, indiquer quand utiliser l'outil dans sa description donne un gain mesurable de taux de déclenchement.
⚙️ En production
Mode de défaillance n°1 : l'état en mémoire et le scaling horizontal
C'est le sujet senior. Votre dict d'historique, votre set de connexions actives, vivent dans la RAM d'un worker. Conséquences :
uvicorn --workers 4ou 4 pods Kubernetes = 4 silos mémoire. Un broadcast « à tous les utilisateurs » depuis le worker A ne touche pas les connexions du worker B.- Un redéploiement coupe toutes les connexions de ce worker. Pas de reprise gracieuse côté serveur — c'est au client de reconnecter (et de re-streamer ce qu'il a perdu).
┌─────────── Load Balancer (sticky ou non) ───────────┐
│ │
▼ ▼ ▼
Worker A Worker B Worker C
conns: {u1, u2} conns: {u3} conns: {u4, u5}
│ │ │
└──────────────┬───────────┴──────────────┬────────────┘
▼ ▼
Redis Pub/Sub ◀── le seul état partagé ──▶
(canal "broadcast", canaux par-room, etc.)La solution : un broker de messages (Redis pub/sub, NATS, ou un service managé). Chaque worker s'abonne aux canaux pertinents ; pour broadcaster, on publie sur Redis et tous les workers relaient à leurs connexions locales. La bibliothèque broadcaster (du créateur de Starlette) encapsule ce pattern. Pour l'historique conversationnel, ne le gardez pas en RAM : persistez-le (Redis, Postgres) et rechargez-le par conversation_id — comme ça la reconnexion sur n'importe quel worker reprend le fil.
Côté infra : les WebSockets exigent que votre load balancer et votre reverse proxy supportent l'upgrade (
proxy_set_header Upgrade $http_upgrade;chez nginx). Les ALB AWS le font ; certaines configs CDN/WAF coupent les connexions longues. Activez les timeouts d'idle assez longs et un ping/pong applicatif pour garder la ligne ouverte.
Mode de défaillance n°2 : pas de timeout, pas de backpressure
Un endpoint HTTP a un timeout naturel. Un WebSocket non : un client malveillant (ou un onglet zombie) peut tenir une connexion ouverte indéfiniment sans rien envoyer. Et si vous envoyez des tokens plus vite que le client ne les consomme, ils s'accumulent dans le buffer TCP → mémoire qui gonfle.
- Idle timeout applicatif : enveloppez
receivedansasyncio.wait_for(ws.receive_json(), timeout=300)et fermez surTimeoutError. - Limite de connexions par utilisateur : un compteur (en Redis) pour empêcher qu'un seul user ouvre 10 000 sockets.
- Backpressure :
await ws.send_json(...)applique déjà une backpressure naturelle (la coroutine suspend si le buffer est plein), mais surveillez la mémoire si vous bufferisez côté serveur.
Sécurité
- Auth au handshake (token en query / cookie / premier message — voir plus haut). Vérifiez avant
accept()quand c'est possible. - Vérifiez l'
Origin. Les WebSockets ne sont pas soumis à la same-origin policy commefetch: n'importe quel site peut ouvrir unwss://vers votre API (« Cross-Site WebSocket Hijacking »). Si vous utilisez des cookies de session pour l'auth, vous devez valider le headerOriginau handshake, sinon un site tiers peut piloter le socket avec les cookies de la victime. wss://toujours (TLS). Jamaisws://en prod.- Ne mettez jamais la clé API ou des secrets dans les messages. Sur un agent LLM, la clé Anthropic reste côté serveur ; le client n'envoie que du contenu utilisateur.
- Rate-limit les messages entrants, pas seulement les connexions : un socket ouvert qui spamme 1000 messages/s peut épuiser votre quota LLM.
Performance et observabilité
- Client async obligatoire :
AsyncAnthropic, jamaisAnthropicsynchrone dans un endpoint async — un appel bloquant gèle tout l'event loop et donc toutes les connexions du worker. - Métriques à exposer : nombre de connexions actives (gauge), durée des connexions (histogram), messages/s, taux de
WebSocketDisconnect, latence time-to-first-token de l'agent. Un pic de déconnexions = souvent un proxy qui coupe ou un déploiement. - Tracez par
conversation_id, pas par requête : une « requête » WebSocket dure toute la session. Vos traces distribuées doivent suivre la connexion, avec des spans par tour de conversation. - Coût LLM : chaque token streamé est facturé. Sur
claude-opus-4-8(5 USD entrée / 25 USD sortie par M tokens), une génération annulée non nettoyée = du gâchis pur. Pour réduire les coûts sur les longues conversations, activez le prompt caching (cache_control) sur le préfixe stable de l'historique.
Le tradeoff senior : WebSocket vs SSE
| Critère | WebSocket | SSE (Server-Sent Events) |
|---|---|---|
| Direction | Full-duplex (↔) | Serveur → client uniquement (→) |
| Protocole | TCP upgradé, frames binaires/texte | HTTP standard, text/event-stream |
| Reconnexion | À implémenter (ou socket.io) | Automatique (EventSource + Last-Event-ID) |
| Proxies / CDN | Capricieux (upgrade requis) | Traverse tout (c'est du HTTP) |
| Auth navigateur | Pas de header custom | Headers OK (fetch + ReadableStream) |
| Cas idéal | Chat agentique interruptible, jeu, collab temps réel | Stream de tokens LLM unidirectionnel, notifications |
La règle : si le client n'a qu'à recevoir (afficher la réponse de l'agent token par token, sans interruption ni édition pendant la génération), SSE est plus simple et plus robuste — c'est d'ailleurs ce que fait StreamingResponse dans la leçon SSE de ce chapitre. Le WebSocket ne se justifie que quand vous avez besoin du full-duplex : interrompre l'agent, éditer le contexte en cours de route, ou un vrai canal bidirectionnel (collab, presence). Beaucoup de chats LLM en prod tournent très bien sur SSE seul. Choisir le WebSocket « parce que c'est temps réel » sans ce besoin, c'est s'imposer la complexité de reconnexion et de scaling distribué pour rien.
🏋️ Exercices
Exercice 1 — Chat broadcast multi-clients (implémenter)
Objectif. Construire un endpoint /ws/room/{room_id} où tout message envoyé par un client est diffusé à tous les clients de la même room. Gérer proprement les connexions/déconnexions via un ConnectionManager (un dict[str, set[WebSocket]]).
Indice / Solution (esquisse)
class ConnectionManager:
def __init__(self) -> None:
self.rooms: dict[str, set[WebSocket]] = {}
async def connect(self, room: str, ws: WebSocket) -> None:
await ws.accept()
self.rooms.setdefault(room, set()).add(ws)
def disconnect(self, room: str, ws: WebSocket) -> None:
self.rooms.get(room, set()).discard(ws)
async def broadcast(self, room: str, message: dict) -> None:
dead = []
for ws in self.rooms.get(room, set()):
try:
await ws.send_json(message)
except RuntimeError: # socket déjà fermé entre-temps
dead.append(ws)
for ws in dead:
self.disconnect(room, ws)Clés : itérer sur une copie du set (ou collecter les morts), parce que broadcast peut tomber sur un socket fermé pendant l'itération. Toujours disconnect dans un finally.
Exercice 2 — Streamer un agent Anthropic avec un protocole d'événements typé (implémenter)
Objectif. Reprendre /ws/agent, mais définir un protocole strict avec des modèles Pydantic v2 discriminés : TokenEvent, ToolCallEvent, DoneEvent, ErrorEvent côté serveur ; UserMessage, Interrupt côté client. Valider chaque message entrant et sérialiser chaque sortant via Pydantic.
Indice / Solution (esquisse)
Utilisez un Field(discriminator="type") et un Annotated[Union[...], Field(discriminator="type")] pour le parsing entrant. Côté sortie, une fonction send_event(ws, event: ServerEvent) qui fait await ws.send_json(event.model_dump()). Le gain : impossible d'envoyer un event mal formé, et le client TypeScript peut générer ses types depuis le schéma OpenAPI / un export JSON Schema (TypeAdapter(ServerEvent).json_schema()).
Exercice 3 — Interruption full-duplex correcte (rendre production-grade)
Objectif. Corriger le bug de design signalé dans la section streaming : découpler la lecture de l'entrée et la complétion de la génération. Quand un stream finit normalement, sa réponse complète doit être ajoutée à history ; quand il est interrompu, le partiel aussi. Utiliser une asyncio.Queue ou un add_done_callback.
Indice / Solution (esquisse)
Pattern « event loop interne » : la boucle principale ne fait que lire l'entrée. La tâche de génération, en finissant, poste un message {"kind": "completed", "text": ...} dans une asyncio.Queue. Une troisième coroutine consomme la queue et met à jour history. Ou plus simple : current.add_done_callback(lambda t: history.append(...) if not t.cancelled() else ...). Attention : le callback s'exécute dans le contexte de l'event loop, ne faites pas d'await dedans — postez dans une queue si besoin d'async.
Exercice 4 — Auth, Origin et idle timeout (rendre production-grade)
Objectif. Durcir /ws/agent : (a) auth par premier message ({"type":"auth","token":"..."}) avec fermeture en WS_1008_POLICY_VIOLATION si absent/invalide dans les 5 s ; (b) validation du header Origin contre une allowlist ; (c) idle timeout de 5 min via asyncio.wait_for.
Indice / Solution (esquisse)
await ws.accept()
try:
auth_raw = await asyncio.wait_for(ws.receive_json(), timeout=5)
except (asyncio.TimeoutError, WebSocketDisconnect):
await ws.close(code=status.WS_1008_POLICY_VIOLATION)
return
# valider Origin : ws.headers.get("origin") in ALLOWED_ORIGINS
# valider le token du premier message ; sinon close(1008)
# boucle : await asyncio.wait_for(ws.receive_json(), timeout=300)Notez ws.close(code=...) (pas WebSocketException après accept() — on est déjà connecté).
Exercice 5 — Scaling horizontal avec Redis pub/sub (casser puis réparer)
Objectif. Lancer l'exercice 1 avec uvicorn --workers 3 et constater que le broadcast ne traverse pas les workers (deux onglets sur des workers différents ne se voient pas). Puis réparer : faire transiter les broadcasts par un canal Redis pub/sub, chaque worker relayant aux sockets locaux.
Indice / Solution (esquisse)
Au startup, chaque worker lance une tâche qui SUBSCRIBE le canal room:{id} et, à chaque message reçu de Redis, le relaie à ses connexions locales. Le broadcast ne fait plus send_json direct mais redis.publish("room:{id}", payload). La bibliothèque broadcaster ou redis.asyncio font le job. Piège : ne pas créer un abonnement Redis par connexion (explosion de connexions Redis) — un seul abonnement par worker, multiplexé.
Exercice 6 — Reconnexion sans perte et reprise de l'historique (casser puis réparer)
Objectif. Simuler une coupure (redéploiement) pendant une génération. Côté serveur : persister l'historique par conversation_id (Redis/Postgres) et offrir un message client {"type":"resume","conversation_id":"..."} qui recharge le contexte. Côté client (vous pouvez écrire un petit client Python websockets de test) : reconnecter avec backoff et reprendre.
Indice / Solution (esquisse)
Sur chaque tour, persistez history sous la clé conv:{id} après chaque message complet (pas à chaque token — trop coûteux). Au message resume, chargez history depuis le store et reprenez la boucle. Pour la reprise en cours de génération, marquez chaque token d'un index incrémental et renvoyez un Last-Event-Index au resume pour ne pas re-streamer ce que le client a déjà — c'est exactement le pattern Last-Event-ID de SSE, réimplémenté à la main (et la raison pour laquelle SSE est parfois préférable).
🎤 En entretien
Q : Quand choisiriez-vous un WebSocket plutôt que SSE pour streamer les réponses d'un agent LLM ? Seulement quand j'ai besoin de full-duplex — typiquement l'interruption de la génération en cours ou l'édition du contexte pendant que l'agent répond. Pour du pur push token-par-token unidirectionnel, SSE est plus simple, reconnecte tout seul via Last-Event-ID, et traverse mieux les proxies.
Q : Un utilisateur ferme son onglet pendant que l'agent génère. Que se passe-t-il, et qu'est-ce qui peut mal tourner ? Une WebSocketDisconnect est levée au prochain send/receive. Le piège : si la génération LLM tourne dans une asyncio.Task séparée, elle continue de tourner — et de facturer des tokens — tant qu'on ne l'a pas cancel(). Il faut impérativement annuler la tâche dans le except WebSocketDisconnect.
Q : Vous passez de 1 à 4 workers et le broadcast « à tous les utilisateurs » ne marche plus. Pourquoi, et comment réparez-vous ? Chaque worker a son propre set de connexions en mémoire ; ils ne partagent rien. Il faut un broker externe (Redis pub/sub) : on publie le broadcast sur un canal, chaque worker y est abonné et relaie à ses connexions locales. L'état conversationnel doit aussi sortir de la RAM (Redis/Postgres) pour survivre aux redéploiements et à la reconnexion sur un autre worker.
Q : Comment authentifiez-vous une connexion WebSocket depuis un navigateur, et quel risque spécifique surveillez-vous ? L'API WebSocket du navigateur ne permet pas de header Authorization, donc : token en query string, cookie de session, ou auth par premier message (le plus propre). Le risque spécifique est le Cross-Site WebSocket Hijacking : les WebSockets ne respectent pas la same-origin policy, donc si j'utilise des cookies pour l'auth, je dois valider le header Origin au handshake, sinon un site tiers peut piloter le socket avec les cookies de la victime.