Real-time vector updates — CDC + embedding workers + vector DB
TL;DR Indexer un dataset une fois c'est facile. Le maintenir frais quand le métier écrit en continu est ce qui sépare un POC d'une prod sérieuse. Pattern 2026 : Postgres → Debezium → Kafka → embedding worker → vector DB, idempotent (clé déterministe), avec gestion explicite des suppressions (RGPD droit à l'oubli), versioning d'embeddings (model swap = backfill controlé), reindex incrémental vs batch. SLA typique : doc créé en base → searchable en < 60s p95. Pricing freelance : 8-15 jours pour un pipeline prod-grade complet à 1400-1500€/j.
🧠 Mental model
Architecture canonique
┌──────────────┐
│ Postgres │ ┌─────────────────┐
│ (source of │ WAL │ Debezium │
│ truth │─────▶│ (connector │
│ métier) │ │ Postgres) │
└──────────────┘ └────────┬────────┘
│ change events
▼
┌───────────────┐
│ Kafka │ topic: documents.changes
│ (partition │ key: doc_id
│ par doc_id) │ retention: 7j
└───────┬───────┘
│
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌─────────────┐ ┌─────────────┐
│ embed worker │ │embed worker │ │embed worker │
│ (consumer │ │ │ │ │
│ group) │ │ │ │ │
└───────┬───────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
└────────────────┼───────────────┘
▼
┌───────────────┐
│ Qdrant │ upsert (idempotent)
│ (vector │ delete on tombstone
│ index) │
└───────────────┘Analogie : c'est un système d'arrosage automatique. La source d'eau (Postgres) ne sait pas qu'elle arrose. Le réseau de tuyaux (Kafka) ne perd pas une goutte. Les arroseurs (workers) sont interchangeables, en parallèle, et tolérants au "et si je rate un tour ?". Le potager (Qdrant) est toujours frais.
Les 4 garanties à offrir
┌──────────────────────────────────────────────────┐
│ 1. Freshness : update DB → vector DB < 60s │
│ 2. Idempotence : rejouer 100x = 1x │
│ 3. Suppressions : RGPD = vraiment supprimé │
│ 4. Versioning : changer de model = pas big-bang │
└──────────────────────────────────────────────────┘Le mensonge de l'« exactly-once » (à connaître absolument)
Un junior promet « exactly-once ». Un senior sait que exactly-once n'existe pas au niveau transport : Kafka garantit at-least-once par défaut (le worker peut crasher après l'upsert Qdrant mais avant le commit d'offset → le message sera rejoué). La seule façon d'obtenir un effet exactly-once observable est de rendre le traitement idempotent : upsert(id=hash(doc_id)) rejoué 10× produit exactement le même état. C'est la différence entre exactly-once delivery (impossible en pratique distribuée) et effectively-once processing (atteignable via idempotence + clé déterministe).
Crash window classique (at-least-once) :
poll → embed → upsert Qdrant ──💥 crash ici── commit offset
▲
message PAS commité → rejoué au restart
→ l'upsert idempotent absorbe le doublon ✅Ordre des opérations qui compte : commit l'offset après l'upsert, jamais avant. Si tu commit avant (ou si tu actives enable.auto.commit=true), un crash entre commit et upsert te fait perdre le message → perte de donnée silencieuse, pire qu'un doublon. Règle senior : toujours préférer un doublon idempotent à une perte. C'est pour ça que le worker met enable.auto.commit: False et commit manuellement en fin de batch.
Mental model du flux : push (CDC) vs pull (outbox/double-write)
| CDC (Debezium) | Outbox pattern | Double-write applicatif | |
|---|---|---|---|
| Source de vérité | WAL Postgres | Table outbox + WAL | Code applicatif |
| Couplage métier | Zéro (la base ne sait pas) | Faible (1 INSERT outbox dans la même tx) | Fort (l'API doit publier) |
| Garantie atomicité | Native (WAL = commit) | Native (INSERT outbox dans la tx métier) | Aucune — risque dual-write (DB ok, Kafka ko) |
| Latence WAL | Lit le WAL → impact si table très chaude | Idem CDC mais filtré sur outbox | Indépendant du WAL |
| Quand l'utiliser | Défaut. Tables métier classiques | Pas d'accès WAL, ou multi-tenant SaaS | Table trop chaude pour CDC (cf. cas AML) |
Le piège du double-write : db.commit(); kafka.publish() n'est PAS atomique. Si le process meurt entre les deux, la base a la donnée mais Qdrant ne la verra jamais. L'outbox résout ça en faisant INSERT outbox dans la même transaction que l'écriture métier, puis Debezium relaie la table outbox. C'est le pattern « transactionnel » de référence quand on ne peut/veut pas brancher le CDC directement sur la table chaude.
🛠️ Code minimal
Setup Debezium → Kafka (Postgres CDC)
// debezium-connector.json
{
"name": "annonces-cdc",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "pg.internal",
"database.port": "5432",
"database.user": "debezium",
"database.password": "...",
"database.dbname": "marketplace",
"topic.prefix": "marketplace",
"slot.name": "debezium_annonces",
"publication.name": "debezium_publication",
"table.include.list": "public.annonces",
"plugin.name": "pgoutput",
"tombstones.on.delete": "true"
}
}Worker Python (consumer Kafka → embed → upsert Qdrant)
# worker.py
import json, hashlib, logging, signal, sys
from confluent_kafka import Consumer, KafkaError
from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
log = logging.getLogger("embed-worker")
oai = OpenAI()
qd = QdrantClient(url="https://...qdrant.io", api_key="...")
COLL = "annonces"
consumer = Consumer({
"bootstrap.servers": "kafka:9092",
"group.id": "embed-worker-annonces-v1", # change quand tu changes de model
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
"isolation.level": "read_committed",
})
consumer.subscribe(["marketplace.public.annonces"])
def doc_to_text(ann: dict) -> str:
return f"{ann['title']}. {ann.get('description','')} {ann.get('city','')} {ann.get('price_eur','')}€"
def stable_point_id(doc_id: int) -> int:
return int(hashlib.md5(str(doc_id).encode()).hexdigest()[:15], 16)
def embed(text: str) -> list[float]:
return oai.embeddings.create(model="text-embedding-3-small", input=text).data[0].embedding
def handle(msg_value: dict):
# Debezium envelope : {op: c/u/d/r, before: {...}, after: {...}, source: {...}}
op = msg_value.get("op")
if op in ("c", "u", "r"):
ann = msg_value["after"]
emb = embed(doc_to_text(ann))
qd.upsert(
collection_name=COLL,
points=[PointStruct(
id=stable_point_id(ann["id"]),
vector=emb,
payload={
"doc_id": ann["id"],
"title": ann["title"],
"city": ann.get("city"),
"price_eur": ann.get("price_eur"),
"updated_at": ann.get("updated_at"),
"embedding_model": "text-embedding-3-small",
"embedding_version": 1,
},
)],
)
elif op == "d":
old = msg_value["before"]
qd.delete(
collection_name=COLL,
points_selector=[stable_point_id(old["id"])],
)
def run():
running = [True]
signal.signal(signal.SIGTERM, lambda *_: running.__setitem__(0, False))
while running[0]:
msg = consumer.poll(1.0)
if msg is None: continue
if msg.error():
log.error("kafka error", extra={"err": msg.error()})
continue
if msg.value() is None: # tombstone post-delete
continue
try:
handle(json.loads(msg.value()))
consumer.commit(msg)
except Exception as e:
log.exception("handler failed; will retry")
# ne pas commit → relire au prochain run
consumer.close()
if __name__ == "__main__":
run()🎬 Cas d'usage concrets
Cas 1 — Marketplace immobilier français (SeLoger-like)
Contexte : marketplace immo régionale, 100K annonces actives, ~2 000 mises à jour/heure (création, modif prix, retrait). Demande métier : annonce postée → recherchable en moins d'1 minute, sinon l'agent immobilier appelle le support.
Décision : Postgres + Debezium → Kafka (1 topic, 12 partitions) → 6 workers Python (consumer group) → Qdrant Cloud EU. Embedding text-embedding-3-small (rapide, suffisant pour de l'annonce).
Résultat : SLA p95 = 22s entre INSERT Postgres et search Qdrant. Coût pipeline : ~150€/mois (Kafka Confluent Cloud basic + workers Scaleway Serverless Containers).
Cas 2 — E-commerce flash sales, catalogue ultra-dynamique
Contexte : site flash sales, 50K produits, ventes qui ouvrent/ferment toutes les 6h. Quand une vente ouvre, 5K produits passent de draft à live en 2 minutes. Le search doit refléter ça immédiatement, sinon les utilisateurs voient des "ajouter au panier" cassés.
Décision : pipeline Debezium + Kafka + workers, mais avec batching adaptatif côté worker (regroupe les 200 events de la même seconde pour embed en batch OpenAI = -80% appels API, -90% coût). Idempotence via la primary key produit.
Résultat : 5K mises à jour en 2 min absorbées sans backlog. Coût embeddings divisé par 4.
Cas 3 — Banque AML, analyse temps réel de transactions
Contexte : banque mutualiste FR, monitoring AML (anti-blanchiment). 800K transactions/jour, besoin d'analyser chaque transaction vs un corpus de transactions historiques (~50M) pour détecter des schémas similaires (typologies de fraude).
Décision : pas de CDC sur la table transactions (trop chaud, latence WAL critique). À la place, double-write applicatif : l'API métier écrit en base et publie un event Kafka. Workers Python avec embedding maison (BGE-M3, déployé sur Triton Scaleway GPU). Qdrant self-hosted Scaleway (souveraineté FR obligatoire). Idempotence via transaction_id.
Résultat : 800K transactions/jour embedded et indexées en < 2 min p99. Détection de schémas suspect remontée à l'équipe AML en quasi-temps réel.
🛠️ Exemple end-to-end
Contexte : marketplace immo 100K annonces, SLA p95 < 60s. Pipeline Debezium → Kafka → workers Python (batch) → Qdrant. Code complet d'un worker prod-grade.
Schéma Qdrant
# infra/qdrant_setup.py
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, Distance, ScalarQuantization, ScalarQuantizationConfig, ScalarType
qd = QdrantClient(url=..., api_key=...)
qd.create_collection(
"annonces",
vectors_config=VectorParams(size=1536, distance=Distance.COSINE, on_disk=True),
quantization_config=ScalarQuantization(
scalar=ScalarQuantizationConfig(type=ScalarType.INT8, always_ram=True)
),
)
for field, schema in [("city", "keyword"), ("price_eur", "integer"),
("rooms", "integer"), ("embedding_version", "integer")]:
qd.create_payload_index("annonces", field_name=field, field_schema=schema)Worker batch-aware avec idempotence et retry
# worker_batched.py
import json, time, hashlib, logging, asyncio
from collections import defaultdict
from contextlib import suppress
from confluent_kafka import Consumer, TopicPartition
from openai import AsyncOpenAI
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import PointStruct
from tenacity import retry, stop_after_attempt, wait_exponential
log = logging.getLogger("embed-worker")
log.setLevel(logging.INFO)
EMBED_MODEL = "text-embedding-3-small"
EMBED_VERSION = 1
COLLECTION = "annonces"
BATCH_SIZE = 96
BATCH_TIMEOUT_MS = 500
oai = AsyncOpenAI()
qd = AsyncQdrantClient(url="https://...qdrant.io", api_key="...")
def stable_id(doc_id: int) -> int:
return int(hashlib.md5(str(doc_id).encode()).hexdigest()[:15], 16)
def doc_to_text(ann: dict) -> str:
return (
f"{ann.get('title','')}. {ann.get('description','')} "
f"{ann.get('city','')} {ann.get('rooms','')} pièces "
f"{ann.get('surface_m2','')} m² {ann.get('price_eur','')}€"
)
@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=1, max=30))
async def embed_batch(texts: list[str]) -> list[list[float]]:
r = await oai.embeddings.create(model=EMBED_MODEL, input=texts)
return [d.embedding for d in r.data]
@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=1, max=30))
async def upsert_batch(points: list[PointStruct]):
await qd.upsert(collection_name=COLLECTION, points=points)
@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=1, max=30))
async def delete_ids(ids: list[int]):
if ids:
await qd.delete(collection_name=COLLECTION, points_selector=ids)
def parse_envelope(value: bytes | None):
if value is None:
return None # tombstone Kafka
env = json.loads(value)
op = env.get("op")
if op in ("c", "u", "r"):
return ("upsert", env["after"])
if op == "d":
return ("delete", env["before"])
return None
async def process_batch(buffer: list[tuple[str, dict, "TopicPartition", int]], consumer):
"""buffer = list of (op, payload, tp, offset). Dédoublonner par doc_id (last write wins)."""
last_by_id: dict[int, tuple[str, dict]] = {}
last_offsets: dict[tuple[str, int], int] = {}
for op, payload, tp, offset in buffer:
last_by_id[payload["id"]] = (op, payload)
key = (tp.topic, tp.partition)
last_offsets[key] = max(last_offsets.get(key, -1), offset)
upserts = [(doc_id, p) for doc_id, (op, p) in last_by_id.items() if op == "upsert"]
deletes = [doc_id for doc_id, (op, _) in last_by_id.items() if op == "delete"]
if upserts:
texts = [doc_to_text(p) for _, p in upserts]
embeddings = await embed_batch(texts)
points = [
PointStruct(
id=stable_id(doc_id),
vector=emb,
payload={
"doc_id": doc_id,
"title": p.get("title"),
"city": p.get("city"),
"price_eur": p.get("price_eur"),
"rooms": p.get("rooms"),
"surface_m2": p.get("surface_m2"),
"updated_at": p.get("updated_at"),
"embedding_model": EMBED_MODEL,
"embedding_version": EMBED_VERSION,
},
)
for (doc_id, p), emb in zip(upserts, embeddings)
]
await upsert_batch(points)
if deletes:
await delete_ids([stable_id(d) for d in deletes])
# commit offsets per partition
tps = [TopicPartition(t, p, off + 1) for (t, p), off in last_offsets.items()]
consumer.commit(offsets=tps, asynchronous=False)
log.info("processed batch", extra={
"upserts": len(upserts), "deletes": len(deletes), "dedup_from": len(buffer),
})
async def main():
consumer = Consumer({
"bootstrap.servers": "kafka:9092",
"group.id": f"embed-worker-annonces-v{EMBED_VERSION}",
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
"max.poll.interval.ms": 600000,
})
consumer.subscribe(["marketplace.public.annonces"])
buffer: list = []
last_flush = time.time()
while True:
timeout = max(0.05, (BATCH_TIMEOUT_MS / 1000) - (time.time() - last_flush))
msg = consumer.poll(timeout)
if msg is not None and msg.error() is None:
parsed = parse_envelope(msg.value())
if parsed:
op, payload = parsed
tp = TopicPartition(msg.topic(), msg.partition())
buffer.append((op, payload, tp, msg.offset()))
flush_time = (time.time() - last_flush) * 1000 >= BATCH_TIMEOUT_MS
if len(buffer) >= BATCH_SIZE or (buffer and flush_time):
await process_batch(buffer, consumer)
buffer.clear()
last_flush = time.time()
if __name__ == "__main__":
asyncio.run(main())Job de re-embed (versioning : passer de v1 → v2)
# jobs/reembed_v2.py
"""Quand on change de modèle d'embedding (small → large, ou OpenAI → BGE-M3),
on backfill par lots sans interrompre la prod."""
import asyncio
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue, PointStruct
from openai import AsyncOpenAI
NEW_MODEL = "text-embedding-3-large"
NEW_VERSION = 2
NEW_DIM = 3072
qd = AsyncQdrantClient(url="...", api_key="...")
oai = AsyncOpenAI()
async def reembed_batch(limit: int = 200):
# On scroll les points encore en v1
points, next_offset = await qd.scroll(
collection_name="annonces",
scroll_filter=Filter(must=[FieldCondition(key="embedding_version", match=MatchValue(value=1))]),
limit=limit, with_payload=True, with_vectors=False,
)
while points:
texts = [doc_to_text(p.payload) for p in points]
r = await oai.embeddings.create(model=NEW_MODEL, input=texts)
new_points = [
PointStruct(
id=p.id,
vector=emb.embedding,
payload={**p.payload, "embedding_model": NEW_MODEL, "embedding_version": NEW_VERSION},
)
for p, emb in zip(points, r.data)
]
await qd.upsert(collection_name="annonces_v2", points=new_points) # collection séparée
if not next_offset: break
points, next_offset = await qd.scroll(
collection_name="annonces",
scroll_filter=Filter(must=[FieldCondition(key="embedding_version", match=MatchValue(value=1))]),
limit=limit, offset=next_offset, with_payload=True, with_vectors=False,
)Le piège du changement de dimension (small 1536 → large 3072)
Changer de modèle d'embedding n'est pas qu'un changement de poids : la dimension du vecteur change. Une collection Qdrant créée avec size=1536 refuse un vecteur de 3072 dims (Wrong input: Vector dimension error). Conséquence : tu ne peux pas faire un re-embed in-place vers large. Le seul chemin propre est blue/green :
collection "annonces" (v1, dim=1536) ← prod lit ici
collection "annonces_v2" (v2, dim=3072) ← backfill écrit ici
│
▼ quand v2 == 100% backfillée + vérifiée
alias "annonces_live" ─────swap atomique────▶ pointe sur annonces_v2Qdrant supporte les aliases (update_collection_aliases avec DeleteAlias + CreateAlias dans un seul appel atomique). La prod requête toujours annonces_live (l'alias), jamais le nom physique. Le swap est instantané, sans trou de recherche, et rollbackable (re-pointer l'alias sur v1). Ne jamais drop + create sur la collection live : ça crée une fenêtre où la recherche renvoie zéro résultat.
LLM-enrichment avant embedding (optionnel, mais ROI réel)
Sur des annonces sales (titre en CAPS, description vide, fautes), embedder le texte brut donne des vecteurs bruités. Un pattern senior 2026 : un LLM normalise/enrichit le texte avant embedding (résumé canonique, extraction d'attributs structurés, normalisation de la ville). Ici Claude est le bon outil — pas pour l'embedding (Anthropic n'expose pas d'API d'embeddings, on garde OpenAI/Voyage/BGE), mais pour l'étape de préparation.
# enrich.py — normalisation LLM avant embedding (batch, async, cache-friendly)
import logging
from enum import Enum
from anthropic import AsyncAnthropic
from anthropic import RateLimitError, APIStatusError, APITimeoutError
from pydantic import BaseModel
log = logging.getLogger("enrich")
anthropic = AsyncAnthropic(max_retries=4, timeout=20.0) # SDK gère le backoff 429/5xx + timeout/appel
class PropertyType(str, Enum):
appartement = "appartement"
maison = "maison"
terrain = "terrain"
autre = "autre"
class Enrichment(BaseModel): # schéma natif → validation côté API + typage Python
canonical_text: str
city_normalized: str
property_type: PropertyType
SYSTEM = (
"Tu normalises des annonces immobilières françaises. "
"Produis un canonical_text propre, sans CAPS abusives, prêt pour l'embedding."
)
async def enrich(raw_text: str) -> Enrichment | None:
# messages.parse() = sortie structurée native, validée et désérialisée vers Enrichment
resp = await anthropic.messages.parse(
model="claude-haiku-4-5", # cheap (1$/5$ par Mtok) : volume + latence
max_tokens=400,
system=[{"type": "text", "text": SYSTEM,
"cache_control": {"type": "ephemeral"}}], # prefix stable → cache prompt
output_config={"format": Enrichment},
messages=[{"role": "user", "content": raw_text}],
)
log.info("enrich usage", extra={"in": resp.usage.input_tokens, "out": resp.usage.output_tokens})
if resp.stop_reason == "refusal": # classifieur a décliné → fallback texte brut en amont
return None
return resp.parsed_output # instance Enrichment (jamais .input : ça, c'est un tool_use)Décisions senior dans ce snippet :
claude-haiku-4-5(1 USD / 5 USD par Mtok) et pas Opus : c'est de la normalisation à fort volume, pas du raisonnement. Réserverclaude-opus-4-8(5/25) à l'enrichissement complexe (déduplication sémantique, détection de fraude dans le cas AML).messages.parse()+ schéma Pydantic (output_config={"format": Enrichment}) plutôt que parser du XML/JSON à la main : sortie structurée native, validée côté API et désérialisée vers un type Python. On litresp.parsed_output, jamaisresp.content[0].input(.input= bloctool_use, pas une sortie structurée).cache_controlsur le system prompt : le préfixe stable (system + schéma) est mis en cache → lecture à ~0.1× sur tous les appels suivants. À 50K annonces, le cache divise nettement la facture d'input.AsyncAnthropic+max_retries+timeout: serveur → async obligatoire ; le SDK retry 429/RateLimitErroret 5xx (APIStatusError,OverloadedError) avec backoff exponentiel typé, et letimeoutpar appel borne la latence p99 (sinon un appel pendu bloque un worker). Sur gros volume, préférer streaming +asyncio.gatherpour paralléliser les appels d'enrichissement.stop_reason == "refusal": un classifieur de sécurité peut décliner (HTTP 200,contentvide) ; on testestop_reasonavant de lire la sortie, et on retombe sur le texte brut (enriched=false) plutôt que de planter le pipeline.- Pièges : cette étape ajoute de la latence et un point de panne dans le chemin de freshness. Si l'enrichissement échoue après 4 retries → DLQ, ne bloque jamais l'indexation (fallback : embedder le texte brut, marquer
enriched=false, ré-enrichir en async). Et versionne le prompt d'enrichissement (enrich_prompt_versiondans le payload) au même titre que le modèle d'embedding : changer le prompt = changer la qualité = potentiel backfill.
Suppression RGPD-grade
# api/rgpd.py
"""Quand un utilisateur demande la suppression de SES annonces."""
async def rgpd_delete_user(user_id: str):
# 1. Mark for deletion in Postgres
await pg.execute("UPDATE annonces SET deleted_at = now() WHERE owner_id = $1", user_id)
# 2. La CDC va propager l'op `u` avec deleted_at != NULL
# 3. Le worker doit interpréter ça comme un delete logique :
# soit hard delete dans Qdrant, soit ajouter payload `is_deleted=true` + filtre search
# 4. Hard delete dans Postgres après 30j (rétention légale) → cascade vers Qdrant via CDC `d`Métriques observabilité
# observability/metrics.py
from prometheus_client import Histogram, Counter, Gauge
EMBED_LATENCY = Histogram("embed_latency_seconds", "embedding API latency")
UPSERT_LATENCY = Histogram("qdrant_upsert_latency_seconds", "qdrant upsert latency")
BATCH_SIZE = Histogram("batch_size", "messages per batch")
PROCESSED = Counter("messages_processed_total", "messages processed", ["op"])
KAFKA_LAG = Gauge("kafka_consumer_lag", "lag par partition", ["partition"])
FRESHNESS = Histogram("freshness_seconds", "delta pg.commit → qdrant.upsert")Numbers réels (100K annonces, ~2K updates/h, 6 workers, OpenAI small) :
- Freshness p50 : 8s, p95 : 22s, p99 : 41s
- Coût embeddings : ~85€/mois
- Coût Kafka Confluent basic : ~80€/mois
- Workers Scaleway Serverless Containers : ~40€/mois
- Total pipeline : ~205€/mois
🎯 Patterns courants
group.idversionné (embed-worker-annonces-v1) → quand tu changes le model, nouveau consumer group, replay du log Kafka pour réindexer.- Batching adaptatif (batch size OR timeout) → équilibre coût/latence. 50-100 docs par batch OpenAI = -80% coût vs 1 par 1.
- Dedup par primary key dans le batch (last-write-wins) → si un user édite 5 fois en 2s, tu n'embeddes qu'une fois.
- ID Qdrant déterministe (
hash(doc_id)) → upsert idempotent, replay du log sans drift. tombstones.on.delete=truecôté Debezium → events de suppression propres à consommer.- Versioning d'embeddings dans le payload (
embedding_model,embedding_version) → permet de filtrer "n'utiliser que la nouvelle version" pendant un backfill. - Reindex via collection séparée puis swap atomique de l'alias — pas de "drop + create" qui crée un trou.
- DLQ (Dead Letter Queue) sur erreurs persistantes après 5 retries → topic Kafka séparé
documents.dlqavec alerting. - Backpressure : si Qdrant ralentit,
consumer.pause()automatique → Kafka stocke le retard, pas de OOM. - Mesurer la freshness depuis
source.ts_ms(Debezium) jusqu'àqdrant.upsert.timestamp→ c'est la métrique métier.
🔄 Versions & écosystème 2026
| Composant | Version 2026 | Notes |
|---|---|---|
| Debezium | 2.7+ | Connector Postgres mature, support PG 17 |
| Kafka / Confluent Cloud | 3.7+ / dedicated EU clusters | basic tier suffit pour < 5K msg/s |
| RedPanda | 24.x | Alternative Kafka, plus simple à opérer |
| Postgres logical replication | PG 17 | pgoutput plugin, slots stables |
| Qdrant async client | 1.13+ | AsyncQdrantClient, batching natif |
| Embeddings API | OpenAI embeddings-v3, Voyage 3, BGE-M3 local | Hosted ou self-hosted |
| Scaleway Serverless Containers | GA | Hosting workers Python idéal |
| Kafka Connect on Strimzi | sur Kubernetes | Pour les setups self-hosted |
⚠️ Pitfalls
- Pas de WAL slot monitoring → un Debezium qui plante laisse un slot ouvert, le WAL Postgres grossit jusqu'à saturer le disque. Toujours alerter sur
pg_replication_slots.confirmed_flush_lsn. group.idpartagé entre dev/staging/prod → consommation croisée, désastre. Group ID par env.- Pas de dédoublonnage dans le batch → si user édite 10x, tu fais 10 embeddings au lieu de 1. Cher.
- Suppression molle (
deleted_at) sans filter search → docs supprimés continuent à apparaître dans Qdrant. Soit hard delete dans Qdrant, soitis_deleted=true+must_notdans toute query. - Re-embed sur la même collection sans collection v2 → tu écris pendant que tu lis, scores incohérents. Toujours blue/green via alias.
- Pas de DLQ → un seul message poison fait crasher le worker en boucle. Toujours DLQ + alert.
auto.offset.reset=earliestaprès replay non-prévu → un consumer group qui se réinitialise repart de l'origine, dépile 10M messages. Surveille les offsets.- Embedding model gratuit-mais-instable sans pin de version → un changement silencieux fait drifter la qualité. Pin model name + version.
- Suppression RGPD via "supprimer dans Postgres seulement" → vector toujours dans Qdrant, infraction. Toujours vérifier la chaîne complète.
- Pas de test de bout-en-bout → tu ne sais pas si tout le pipeline marche. Test "j'insère un doc → je le trouve dans Qdrant en < 60s" en CI sur env staging.
💰 Pricing / ROI client
Infra (ordres de grandeur 2026)
| Composant | Setup PME (100K docs) | Setup ETI (10M docs) |
|---|---|---|
| Debezium (sur K8s) | inclus dans K8s | dédié 2 noeuds |
| Kafka Confluent Basic EU | ~80€/mois | ~600€/mois (Standard) |
| Workers (Scaleway Serverless Containers) | ~40€/mois | ~200€/mois (3-5 instances) |
| Qdrant Cloud EU | ~290€/mois | ~2 100€/mois |
| Embeddings (OpenAI small) | ~85€/mois | ~800€/mois |
| Observabilité (Grafana/Prometheus) | inclus | ~150€/mois |
| Total | ~500€/mois | ~3 850€/mois |
Pricing freelance type
| Mission | Jours | TJM | Prix HT |
|---|---|---|---|
| Audit pipeline ingestion existant | 3 | 1300€ | 3 900€ |
| Setup Debezium + Kafka + 1 worker prod | 7 | 1400€ | 9 800€ |
| Pipeline complet end-to-end + monitoring + DLQ | 12 | 1500€ | 18 000€ |
| Migration model embedding (v1 → v2) sans downtime | 6 | 1500€ | 9 000€ |
| Mise en conformité RGPD du pipeline | 5 | 1500€ | 7 500€ |
ROI client marketplace : SLA freshness 60s vs "à la prochaine batch nuit" = adoption agent immobilier +40%, churn -15%.
🧪 Testing / Eval
# tests/test_freshness_e2e.py
"""Test end-to-end : insère dans Postgres, mesure le temps avant qu'il apparaisse dans Qdrant."""
import asyncio, time, uuid, psycopg
from qdrant_client import AsyncQdrantClient
@pytest.mark.asyncio
async def test_doc_searchable_within_60s():
doc_id = int(time.time())
title = f"Test annonce {uuid.uuid4()}"
async with await psycopg.AsyncConnection.connect(DSN) as conn:
await conn.execute(
"INSERT INTO annonces (id, title, description, city, price_eur) "
"VALUES ($1,$2,$3,$4,$5)",
(doc_id, title, "test", "Paris", 500000),
)
await conn.commit()
qd = AsyncQdrantClient(url=..., api_key=...)
deadline = time.time() + 60
while time.time() < deadline:
hits = (await qd.query_points(
"annonces", query=await embed_query(title), limit=5
)).points
if any(h.payload["doc_id"] == doc_id for h in hits):
return
await asyncio.sleep(1)
pytest.fail("doc pas trouvable après 60s — freshness SLA cassé")
def test_delete_propagation():
"""Suppression RGPD : doit disparaître de Qdrant en < 60s."""
...
def test_replay_idempotence():
"""Rejouer 1000x le même event ne crée pas de doublon."""
...À monitorer en continu :
- Freshness p50/p95/p99 (histogramme Prometheus)
- Kafka consumer lag par partition
- Erreurs embedding API (rate limit, 5xx)
- Erreurs Qdrant upsert
- Taille de la DLQ
- Coût $/jour embeddings + Kafka + DB
- WAL slot lag Postgres
🏋️ Exercices
Progressifs, du « fais marcher » au « casse-le puis défends ton chiffre ». Monte un Postgres + Redpanda + Qdrant en docker-compose local pour tout faire tourner.
Exercice 1 — Pipeline de bout en bout (le socle)
Objectif : INSERT dans Postgres → vecteur searchable dans Qdrant en < 60s, avec suppression propagée. Indice/Solution : Debezium (pgoutput, tombstones.on.delete=true) → 1 topic → 1 worker confluent-kafka qui parse l'enveloppe Debezium (op ∈ c/u/r/d), embed, upsert avec id=hash(doc_id). Test d'acceptation : le test_doc_searchable_within_60s du fichier doit passer. Valide aussi la suppression : DELETE → tombstone → qd.delete.
Exercice 2 — Prouver l'idempotence sous rejeu
Objectif : rejouer 10 000× le même event Kafka ne doit créer aucun doublon ni faire dériver le score. Indice/Solution : reset le consumer group à earliest, redépile tout le log. Assert : qd.count(collection) identique avant/après, et query_points renvoie un seul point par doc_id. Le levier est l'ID déterministe — supprime le hash(doc_id) et remplace par un UUID aléatoire pour voir les doublons apparaître, puis remets le hash. Tu dois pouvoir expliquer pourquoi at-least-once + idempotence = effectively-once.
Exercice 3 — Batching adaptatif et défense du coût
Objectif : absorber un burst de 5 000 updates/2 min (cas e-commerce flash) sans backlog, et diviser la facture d'embedding par 4. Indice/Solution : implémente le worker_batched.py (batch size OR timeout) avec dédup last-write-wins par doc_id dans le buffer. Mesure : nombre d'appels API embedding avant/après, et le coût $ correspondant. Défends le chiffre — pourquoi 96 et pas 512 ? (latence p95 de freshness vs coût ; un batch trop gros gonfle la freshness, trop petit gonfle le coût). Trace une courbe coût/latence en faisant varier BATCH_SIZE.
Exercice 4 — Casse-le : le slot WAL qui sature le disque
Objectif : reproduire la panne #1 des pitfalls (Debezium down → slot ouvert → WAL grossit → disque plein), puis instrumenter l'alerte qui l'aurait prévenue. Indice/Solution : tue le connecteur Debezium, génère du trafic d'écriture, observe pg_replication_slots.confirmed_flush_lsn qui stagne pendant que pg_wal grossit (SELECT pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn))). Ajoute une alerte Prometheus sur ce lag. Bonus : configure max_slot_wal_keep_size pour que Postgres protège son disque (au prix de tuer le slot — discute le tradeoff disponibilité vs intégrité).
Exercice 5 — Migration model sans downtime (blue/green + dimension)
Objectif : migrer de text-embedding-3-small (1536) à text-embedding-3-large (3072) sans aucune fenêtre où la recherche est vide, et rollbackable. Indice/Solution : crée annonces_v2 (dim=3072), backfill via le job reembed_v2 en lisant embedding_version=1, garde le worker live qui écrit dans les deux collections pendant la transition (dual-write contrôlé), puis update_collection_aliases (DeleteAlias+CreateAlias atomique) pour basculer annonces_live sur v2. Test de rollback : re-pointe l'alias sur v1, la recherche doit rester cohérente. Piège à éviter : ne JAMAIS re-embed in-place — la collection 1536 refuse les vecteurs 3072.
Exercice 6 — RGPD adversarial : prouve la suppression sur toute la chaîne
Objectif : un utilisateur exerce son droit à l'oubli ; prouve qu'aucune trace de ses vecteurs ne subsiste — ni dans Qdrant, ni dans le snapshot, ni dans le log Kafka au-delà de la rétention. Indice/Solution : UPDATE deleted_at=now() → CDC u interprété comme delete logique → is_deleted=true + must_not dans toute query, PUIS hard delete après rétention légale → CDC d → qd.delete. Vérifie : (1) query_points ne renvoie plus le point, (2) qd.retrieve(point_id) renvoie vide, (3) la rétention Kafka (retention.ms) purge l'event en clair, (4) si tu fais des snapshots Qdrant, le snapshot post-suppression ne contient plus le vecteur. Écris le test test_delete_propagation qui mesure le délai de purge complète. C'est exactement ce qu'un DPO/auditeur CNIL te demandera de démontrer.
🎤 En entretien
Q : « Comment garantis-tu l'exactly-once entre Postgres et ta vector DB ? » R : Je ne le garantis pas au niveau transport — Kafka c'est at-least-once. J'obtiens un effectively-once via idempotence : ID Qdrant déterministe (hash(doc_id)) + commit d'offset après l'upsert. Un rejeu produit le même état, jamais un doublon.
Q : « Ta table de transactions est trop chaude pour brancher du CDC dessus. Que fais-tu ? » R : J'évite le CDC direct (latence WAL critique) et je passe par le pattern outbox — INSERT outbox dans la même transaction que l'écriture métier, Debezium relaie la table outbox. Atomique, sans dual-write. Si même ça coûte trop, double-write applicatif assumé avec réconciliation périodique.
Q : « Tu changes de modèle d'embedding en prod. Comment, sans downtime ? » R : Blue/green via alias Qdrant. La dimension change (1536→3072) donc re-embed in-place est impossible — nouvelle collection, backfill en lisant embedding_version, dual-write pendant la transition, puis swap atomique de l'alias. Rollback = re-pointer l'alias.
Q : « Comment tu mesures la fraîcheur, et quel SLA tu défends ? » R : Je mesure de source.ts_ms (timestamp du commit Postgres, fourni par Debezium) jusqu'au timestamp d'upsert Qdrant — c'est la freshness métier, pas juste la latence worker. SLA typique p95 < 60s, p99 < 90s, exposé en histogramme Prometheus. Si le p99 dérape, je regarde le consumer lag par partition d'abord.
Q : « Un message poison fait crasher ton worker en boucle. » R : DLQ après N retries (topic documents.dlq) + alerting, et le worker continue de dépiler le reste. Sans DLQ, un seul event malformé bloque toute la partition — c'est le pitfall classique. Je ne commit jamais un offset sans avoir traité ou redirigé le message.
🔁 Quand utiliser / éviter
Mettre un vrai pipeline real-time quand :
- SLA freshness < 5 min requis (marketplace, ecom flash, AML)
- Volume > 1K updates/jour
- Multiples consumers en aval (vector DB + cache + analytics)
- Souveraineté + observabilité prouvable
Rester sur batch nocturne quand :
- Catalogue quasi-statique (référentiel produit qui change peu)
- < 100 updates/jour
- POC ou démarrage projet
- Coût Kafka/Debezium > valeur métier de la freshness
Approche hybride : CDC pour les tables chaudes (annonces, produits), batch pour les tables froides (corpus juridique).
🔗 Liens
- Debezium docs : https://debezium.io/documentation/
- Confluent Cloud EU : https://www.confluent.io/regions/
- Redpanda : https://redpanda.com
- Kafka Streams + Python (Faust est mort en 2024, utiliser
confluent-kafkadirectement) - Qdrant async client : https://github.com/qdrant/qdrant-client
- Pattern Outbox (alternative à CDC quand pas de WAL access) : https://microservices.io/patterns/data/transactional-outbox.html
- RGPD CNIL guide vector DB : 2025
- Scaleway Messaging & Streaming (Kafka managé EU) : https://www.scaleway.com/en/messaging-and-streaming/
→ Voir aussi : 02-qdrant.md (cible vector DB), 01-pgvector.md (alternative simple), 04-pinecone-self-hosted.md, 05-hybrid-search.md.