Skip to content

Real-time vector updates — CDC + embedding workers + vector DB

TL;DR Indexer un dataset une fois c'est facile. Le maintenir frais quand le métier écrit en continu est ce qui sépare un POC d'une prod sérieuse. Pattern 2026 : Postgres → Debezium → Kafka → embedding worker → vector DB, idempotent (clé déterministe), avec gestion explicite des suppressions (RGPD droit à l'oubli), versioning d'embeddings (model swap = backfill controlé), reindex incrémental vs batch. SLA typique : doc créé en base → searchable en < 60s p95. Pricing freelance : 8-15 jours pour un pipeline prod-grade complet à 1400-1500€/j.

🧠 Mental model

Architecture canonique

   ┌──────────────┐
   │  Postgres    │      ┌─────────────────┐
   │  (source of  │ WAL  │   Debezium      │
   │   truth      │─────▶│   (connector    │
   │   métier)    │      │    Postgres)    │
   └──────────────┘      └────────┬────────┘
                                  │ change events

                          ┌───────────────┐
                          │     Kafka     │  topic: documents.changes
                          │  (partition   │  key: doc_id
                          │   par doc_id) │  retention: 7j
                          └───────┬───────┘

                  ┌───────────────┼───────────────┐
                  │               │               │
                  ▼               ▼               ▼
         ┌───────────────┐ ┌─────────────┐ ┌─────────────┐
         │ embed worker  │ │embed worker │ │embed worker │
         │  (consumer    │ │             │ │             │
         │   group)      │ │             │ │             │
         └───────┬───────┘ └──────┬──────┘ └──────┬──────┘
                 │                │               │
                 └────────────────┼───────────────┘

                          ┌───────────────┐
                          │    Qdrant     │  upsert (idempotent)
                          │   (vector     │  delete on tombstone
                          │    index)     │
                          └───────────────┘

Analogie : c'est un système d'arrosage automatique. La source d'eau (Postgres) ne sait pas qu'elle arrose. Le réseau de tuyaux (Kafka) ne perd pas une goutte. Les arroseurs (workers) sont interchangeables, en parallèle, et tolérants au "et si je rate un tour ?". Le potager (Qdrant) est toujours frais.

Les 4 garanties à offrir

   ┌──────────────────────────────────────────────────┐
   │ 1. Freshness    : update DB → vector DB < 60s    │
   │ 2. Idempotence  : rejouer 100x = 1x              │
   │ 3. Suppressions : RGPD = vraiment supprimé       │
   │ 4. Versioning   : changer de model = pas big-bang │
   └──────────────────────────────────────────────────┘

Le mensonge de l'« exactly-once » (à connaître absolument)

Un junior promet « exactly-once ». Un senior sait que exactly-once n'existe pas au niveau transport : Kafka garantit at-least-once par défaut (le worker peut crasher après l'upsert Qdrant mais avant le commit d'offset → le message sera rejoué). La seule façon d'obtenir un effet exactly-once observable est de rendre le traitement idempotent : upsert(id=hash(doc_id)) rejoué 10× produit exactement le même état. C'est la différence entre exactly-once delivery (impossible en pratique distribuée) et effectively-once processing (atteignable via idempotence + clé déterministe).

   Crash window classique (at-least-once) :
   poll → embed → upsert Qdrant ──💥 crash ici── commit offset

                          message PAS commité → rejoué au restart
                          → l'upsert idempotent absorbe le doublon ✅

Ordre des opérations qui compte : commit l'offset après l'upsert, jamais avant. Si tu commit avant (ou si tu actives enable.auto.commit=true), un crash entre commit et upsert te fait perdre le message → perte de donnée silencieuse, pire qu'un doublon. Règle senior : toujours préférer un doublon idempotent à une perte. C'est pour ça que le worker met enable.auto.commit: False et commit manuellement en fin de batch.

Mental model du flux : push (CDC) vs pull (outbox/double-write)

CDC (Debezium)Outbox patternDouble-write applicatif
Source de véritéWAL PostgresTable outbox + WALCode applicatif
Couplage métierZéro (la base ne sait pas)Faible (1 INSERT outbox dans la même tx)Fort (l'API doit publier)
Garantie atomicitéNative (WAL = commit)Native (INSERT outbox dans la tx métier)Aucune — risque dual-write (DB ok, Kafka ko)
Latence WALLit le WAL → impact si table très chaudeIdem CDC mais filtré sur outboxIndépendant du WAL
Quand l'utiliserDéfaut. Tables métier classiquesPas d'accès WAL, ou multi-tenant SaaSTable trop chaude pour CDC (cf. cas AML)

Le piège du double-write : db.commit(); kafka.publish() n'est PAS atomique. Si le process meurt entre les deux, la base a la donnée mais Qdrant ne la verra jamais. L'outbox résout ça en faisant INSERT outbox dans la même transaction que l'écriture métier, puis Debezium relaie la table outbox. C'est le pattern « transactionnel » de référence quand on ne peut/veut pas brancher le CDC directement sur la table chaude.

🛠️ Code minimal

Setup Debezium → Kafka (Postgres CDC)

json
// debezium-connector.json
{
  "name": "annonces-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "pg.internal",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "...",
    "database.dbname": "marketplace",
    "topic.prefix": "marketplace",
    "slot.name": "debezium_annonces",
    "publication.name": "debezium_publication",
    "table.include.list": "public.annonces",
    "plugin.name": "pgoutput",
    "tombstones.on.delete": "true"
  }
}

Worker Python (consumer Kafka → embed → upsert Qdrant)

python
# worker.py
import json, hashlib, logging, signal, sys
from confluent_kafka import Consumer, KafkaError
from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct

log = logging.getLogger("embed-worker")
oai = OpenAI()
qd = QdrantClient(url="https://...qdrant.io", api_key="...")
COLL = "annonces"

consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "embed-worker-annonces-v1",   # change quand tu changes de model
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "isolation.level": "read_committed",
})
consumer.subscribe(["marketplace.public.annonces"])


def doc_to_text(ann: dict) -> str:
    return f"{ann['title']}. {ann.get('description','')} {ann.get('city','')} {ann.get('price_eur','')}€"


def stable_point_id(doc_id: int) -> int:
    return int(hashlib.md5(str(doc_id).encode()).hexdigest()[:15], 16)


def embed(text: str) -> list[float]:
    return oai.embeddings.create(model="text-embedding-3-small", input=text).data[0].embedding


def handle(msg_value: dict):
    # Debezium envelope : {op: c/u/d/r, before: {...}, after: {...}, source: {...}}
    op = msg_value.get("op")
    if op in ("c", "u", "r"):
        ann = msg_value["after"]
        emb = embed(doc_to_text(ann))
        qd.upsert(
            collection_name=COLL,
            points=[PointStruct(
                id=stable_point_id(ann["id"]),
                vector=emb,
                payload={
                    "doc_id": ann["id"],
                    "title": ann["title"],
                    "city": ann.get("city"),
                    "price_eur": ann.get("price_eur"),
                    "updated_at": ann.get("updated_at"),
                    "embedding_model": "text-embedding-3-small",
                    "embedding_version": 1,
                },
            )],
        )
    elif op == "d":
        old = msg_value["before"]
        qd.delete(
            collection_name=COLL,
            points_selector=[stable_point_id(old["id"])],
        )


def run():
    running = [True]
    signal.signal(signal.SIGTERM, lambda *_: running.__setitem__(0, False))
    while running[0]:
        msg = consumer.poll(1.0)
        if msg is None: continue
        if msg.error():
            log.error("kafka error", extra={"err": msg.error()})
            continue
        if msg.value() is None:    # tombstone post-delete
            continue
        try:
            handle(json.loads(msg.value()))
            consumer.commit(msg)
        except Exception as e:
            log.exception("handler failed; will retry")
            # ne pas commit → relire au prochain run

    consumer.close()


if __name__ == "__main__":
    run()

🎬 Cas d'usage concrets

Cas 1 — Marketplace immobilier français (SeLoger-like)

Contexte : marketplace immo régionale, 100K annonces actives, ~2 000 mises à jour/heure (création, modif prix, retrait). Demande métier : annonce postée → recherchable en moins d'1 minute, sinon l'agent immobilier appelle le support.

Décision : Postgres + Debezium → Kafka (1 topic, 12 partitions) → 6 workers Python (consumer group) → Qdrant Cloud EU. Embedding text-embedding-3-small (rapide, suffisant pour de l'annonce).

Résultat : SLA p95 = 22s entre INSERT Postgres et search Qdrant. Coût pipeline : ~150€/mois (Kafka Confluent Cloud basic + workers Scaleway Serverless Containers).

Cas 2 — E-commerce flash sales, catalogue ultra-dynamique

Contexte : site flash sales, 50K produits, ventes qui ouvrent/ferment toutes les 6h. Quand une vente ouvre, 5K produits passent de draft à live en 2 minutes. Le search doit refléter ça immédiatement, sinon les utilisateurs voient des "ajouter au panier" cassés.

Décision : pipeline Debezium + Kafka + workers, mais avec batching adaptatif côté worker (regroupe les 200 events de la même seconde pour embed en batch OpenAI = -80% appels API, -90% coût). Idempotence via la primary key produit.

Résultat : 5K mises à jour en 2 min absorbées sans backlog. Coût embeddings divisé par 4.

Cas 3 — Banque AML, analyse temps réel de transactions

Contexte : banque mutualiste FR, monitoring AML (anti-blanchiment). 800K transactions/jour, besoin d'analyser chaque transaction vs un corpus de transactions historiques (~50M) pour détecter des schémas similaires (typologies de fraude).

Décision : pas de CDC sur la table transactions (trop chaud, latence WAL critique). À la place, double-write applicatif : l'API métier écrit en base et publie un event Kafka. Workers Python avec embedding maison (BGE-M3, déployé sur Triton Scaleway GPU). Qdrant self-hosted Scaleway (souveraineté FR obligatoire). Idempotence via transaction_id.

Résultat : 800K transactions/jour embedded et indexées en < 2 min p99. Détection de schémas suspect remontée à l'équipe AML en quasi-temps réel.

🛠️ Exemple end-to-end

Contexte : marketplace immo 100K annonces, SLA p95 < 60s. Pipeline Debezium → Kafka → workers Python (batch) → Qdrant. Code complet d'un worker prod-grade.

Schéma Qdrant

python
# infra/qdrant_setup.py
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, Distance, ScalarQuantization, ScalarQuantizationConfig, ScalarType

qd = QdrantClient(url=..., api_key=...)
qd.create_collection(
    "annonces",
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE, on_disk=True),
    quantization_config=ScalarQuantization(
        scalar=ScalarQuantizationConfig(type=ScalarType.INT8, always_ram=True)
    ),
)
for field, schema in [("city", "keyword"), ("price_eur", "integer"),
                      ("rooms", "integer"), ("embedding_version", "integer")]:
    qd.create_payload_index("annonces", field_name=field, field_schema=schema)

Worker batch-aware avec idempotence et retry

python
# worker_batched.py
import json, time, hashlib, logging, asyncio
from collections import defaultdict
from contextlib import suppress
from confluent_kafka import Consumer, TopicPartition
from openai import AsyncOpenAI
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import PointStruct
from tenacity import retry, stop_after_attempt, wait_exponential

log = logging.getLogger("embed-worker")
log.setLevel(logging.INFO)

EMBED_MODEL = "text-embedding-3-small"
EMBED_VERSION = 1
COLLECTION = "annonces"
BATCH_SIZE = 96
BATCH_TIMEOUT_MS = 500

oai = AsyncOpenAI()
qd = AsyncQdrantClient(url="https://...qdrant.io", api_key="...")


def stable_id(doc_id: int) -> int:
    return int(hashlib.md5(str(doc_id).encode()).hexdigest()[:15], 16)


def doc_to_text(ann: dict) -> str:
    return (
        f"{ann.get('title','')}. {ann.get('description','')} "
        f"{ann.get('city','')} {ann.get('rooms','')} pièces "
        f"{ann.get('surface_m2','')}{ann.get('price_eur','')}€"
    )


@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=1, max=30))
async def embed_batch(texts: list[str]) -> list[list[float]]:
    r = await oai.embeddings.create(model=EMBED_MODEL, input=texts)
    return [d.embedding for d in r.data]


@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=1, max=30))
async def upsert_batch(points: list[PointStruct]):
    await qd.upsert(collection_name=COLLECTION, points=points)


@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=1, max=30))
async def delete_ids(ids: list[int]):
    if ids:
        await qd.delete(collection_name=COLLECTION, points_selector=ids)


def parse_envelope(value: bytes | None):
    if value is None:
        return None  # tombstone Kafka
    env = json.loads(value)
    op = env.get("op")
    if op in ("c", "u", "r"):
        return ("upsert", env["after"])
    if op == "d":
        return ("delete", env["before"])
    return None


async def process_batch(buffer: list[tuple[str, dict, "TopicPartition", int]], consumer):
    """buffer = list of (op, payload, tp, offset). Dédoublonner par doc_id (last write wins)."""
    last_by_id: dict[int, tuple[str, dict]] = {}
    last_offsets: dict[tuple[str, int], int] = {}
    for op, payload, tp, offset in buffer:
        last_by_id[payload["id"]] = (op, payload)
        key = (tp.topic, tp.partition)
        last_offsets[key] = max(last_offsets.get(key, -1), offset)

    upserts = [(doc_id, p) for doc_id, (op, p) in last_by_id.items() if op == "upsert"]
    deletes = [doc_id for doc_id, (op, _) in last_by_id.items() if op == "delete"]

    if upserts:
        texts = [doc_to_text(p) for _, p in upserts]
        embeddings = await embed_batch(texts)
        points = [
            PointStruct(
                id=stable_id(doc_id),
                vector=emb,
                payload={
                    "doc_id": doc_id,
                    "title": p.get("title"),
                    "city": p.get("city"),
                    "price_eur": p.get("price_eur"),
                    "rooms": p.get("rooms"),
                    "surface_m2": p.get("surface_m2"),
                    "updated_at": p.get("updated_at"),
                    "embedding_model": EMBED_MODEL,
                    "embedding_version": EMBED_VERSION,
                },
            )
            for (doc_id, p), emb in zip(upserts, embeddings)
        ]
        await upsert_batch(points)

    if deletes:
        await delete_ids([stable_id(d) for d in deletes])

    # commit offsets per partition
    tps = [TopicPartition(t, p, off + 1) for (t, p), off in last_offsets.items()]
    consumer.commit(offsets=tps, asynchronous=False)
    log.info("processed batch", extra={
        "upserts": len(upserts), "deletes": len(deletes), "dedup_from": len(buffer),
    })


async def main():
    consumer = Consumer({
        "bootstrap.servers": "kafka:9092",
        "group.id": f"embed-worker-annonces-v{EMBED_VERSION}",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
        "max.poll.interval.ms": 600000,
    })
    consumer.subscribe(["marketplace.public.annonces"])

    buffer: list = []
    last_flush = time.time()

    while True:
        timeout = max(0.05, (BATCH_TIMEOUT_MS / 1000) - (time.time() - last_flush))
        msg = consumer.poll(timeout)
        if msg is not None and msg.error() is None:
            parsed = parse_envelope(msg.value())
            if parsed:
                op, payload = parsed
                tp = TopicPartition(msg.topic(), msg.partition())
                buffer.append((op, payload, tp, msg.offset()))

        flush_time = (time.time() - last_flush) * 1000 >= BATCH_TIMEOUT_MS
        if len(buffer) >= BATCH_SIZE or (buffer and flush_time):
            await process_batch(buffer, consumer)
            buffer.clear()
            last_flush = time.time()


if __name__ == "__main__":
    asyncio.run(main())

Job de re-embed (versioning : passer de v1 → v2)

python
# jobs/reembed_v2.py
"""Quand on change de modèle d'embedding (small → large, ou OpenAI → BGE-M3),
   on backfill par lots sans interrompre la prod."""
import asyncio
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue, PointStruct
from openai import AsyncOpenAI

NEW_MODEL = "text-embedding-3-large"
NEW_VERSION = 2
NEW_DIM = 3072

qd = AsyncQdrantClient(url="...", api_key="...")
oai = AsyncOpenAI()


async def reembed_batch(limit: int = 200):
    # On scroll les points encore en v1
    points, next_offset = await qd.scroll(
        collection_name="annonces",
        scroll_filter=Filter(must=[FieldCondition(key="embedding_version", match=MatchValue(value=1))]),
        limit=limit, with_payload=True, with_vectors=False,
    )
    while points:
        texts = [doc_to_text(p.payload) for p in points]
        r = await oai.embeddings.create(model=NEW_MODEL, input=texts)
        new_points = [
            PointStruct(
                id=p.id,
                vector=emb.embedding,
                payload={**p.payload, "embedding_model": NEW_MODEL, "embedding_version": NEW_VERSION},
            )
            for p, emb in zip(points, r.data)
        ]
        await qd.upsert(collection_name="annonces_v2", points=new_points)  # collection séparée
        if not next_offset: break
        points, next_offset = await qd.scroll(
            collection_name="annonces",
            scroll_filter=Filter(must=[FieldCondition(key="embedding_version", match=MatchValue(value=1))]),
            limit=limit, offset=next_offset, with_payload=True, with_vectors=False,
        )

Le piège du changement de dimension (small 1536 → large 3072)

Changer de modèle d'embedding n'est pas qu'un changement de poids : la dimension du vecteur change. Une collection Qdrant créée avec size=1536 refuse un vecteur de 3072 dims (Wrong input: Vector dimension error). Conséquence : tu ne peux pas faire un re-embed in-place vers large. Le seul chemin propre est blue/green :

   collection "annonces"       (v1, dim=1536)  ← prod lit ici
   collection "annonces_v2"    (v2, dim=3072)  ← backfill écrit ici

                  ▼  quand v2 == 100% backfillée + vérifiée
   alias "annonces_live" ─────swap atomique────▶ pointe sur annonces_v2

Qdrant supporte les aliases (update_collection_aliases avec DeleteAlias + CreateAlias dans un seul appel atomique). La prod requête toujours annonces_live (l'alias), jamais le nom physique. Le swap est instantané, sans trou de recherche, et rollbackable (re-pointer l'alias sur v1). Ne jamais drop + create sur la collection live : ça crée une fenêtre où la recherche renvoie zéro résultat.

LLM-enrichment avant embedding (optionnel, mais ROI réel)

Sur des annonces sales (titre en CAPS, description vide, fautes), embedder le texte brut donne des vecteurs bruités. Un pattern senior 2026 : un LLM normalise/enrichit le texte avant embedding (résumé canonique, extraction d'attributs structurés, normalisation de la ville). Ici Claude est le bon outil — pas pour l'embedding (Anthropic n'expose pas d'API d'embeddings, on garde OpenAI/Voyage/BGE), mais pour l'étape de préparation.

python
# enrich.py — normalisation LLM avant embedding (batch, async, cache-friendly)
import logging
from enum import Enum
from anthropic import AsyncAnthropic
from anthropic import RateLimitError, APIStatusError, APITimeoutError
from pydantic import BaseModel

log = logging.getLogger("enrich")
anthropic = AsyncAnthropic(max_retries=4, timeout=20.0)  # SDK gère le backoff 429/5xx + timeout/appel


class PropertyType(str, Enum):
    appartement = "appartement"
    maison = "maison"
    terrain = "terrain"
    autre = "autre"


class Enrichment(BaseModel):           # schéma natif → validation côté API + typage Python
    canonical_text: str
    city_normalized: str
    property_type: PropertyType


SYSTEM = (
    "Tu normalises des annonces immobilières françaises. "
    "Produis un canonical_text propre, sans CAPS abusives, prêt pour l'embedding."
)

async def enrich(raw_text: str) -> Enrichment | None:
    # messages.parse() = sortie structurée native, validée et désérialisée vers Enrichment
    resp = await anthropic.messages.parse(
        model="claude-haiku-4-5",          # cheap (1$/5$ par Mtok) : volume + latence
        max_tokens=400,
        system=[{"type": "text", "text": SYSTEM,
                 "cache_control": {"type": "ephemeral"}}],  # prefix stable → cache prompt
        output_config={"format": Enrichment},
        messages=[{"role": "user", "content": raw_text}],
    )
    log.info("enrich usage", extra={"in": resp.usage.input_tokens, "out": resp.usage.output_tokens})
    if resp.stop_reason == "refusal":      # classifieur a décliné → fallback texte brut en amont
        return None
    return resp.parsed_output              # instance Enrichment (jamais .input : ça, c'est un tool_use)

Décisions senior dans ce snippet :

  • claude-haiku-4-5 (1 USD / 5 USD par Mtok) et pas Opus : c'est de la normalisation à fort volume, pas du raisonnement. Réserver claude-opus-4-8 (5/25) à l'enrichissement complexe (déduplication sémantique, détection de fraude dans le cas AML).
  • messages.parse() + schéma Pydantic (output_config={"format": Enrichment}) plutôt que parser du XML/JSON à la main : sortie structurée native, validée côté API et désérialisée vers un type Python. On lit resp.parsed_output, jamais resp.content[0].input (.input = bloc tool_use, pas une sortie structurée).
  • cache_control sur le system prompt : le préfixe stable (system + schéma) est mis en cache → lecture à ~0.1× sur tous les appels suivants. À 50K annonces, le cache divise nettement la facture d'input.
  • AsyncAnthropic + max_retries + timeout : serveur → async obligatoire ; le SDK retry 429/RateLimitError et 5xx (APIStatusError, OverloadedError) avec backoff exponentiel typé, et le timeout par appel borne la latence p99 (sinon un appel pendu bloque un worker). Sur gros volume, préférer streaming + asyncio.gather pour paralléliser les appels d'enrichissement.
  • stop_reason == "refusal" : un classifieur de sécurité peut décliner (HTTP 200, content vide) ; on teste stop_reason avant de lire la sortie, et on retombe sur le texte brut (enriched=false) plutôt que de planter le pipeline.
  • Pièges : cette étape ajoute de la latence et un point de panne dans le chemin de freshness. Si l'enrichissement échoue après 4 retries → DLQ, ne bloque jamais l'indexation (fallback : embedder le texte brut, marquer enriched=false, ré-enrichir en async). Et versionne le prompt d'enrichissement (enrich_prompt_version dans le payload) au même titre que le modèle d'embedding : changer le prompt = changer la qualité = potentiel backfill.

Suppression RGPD-grade

python
# api/rgpd.py
"""Quand un utilisateur demande la suppression de SES annonces."""
async def rgpd_delete_user(user_id: str):
    # 1. Mark for deletion in Postgres
    await pg.execute("UPDATE annonces SET deleted_at = now() WHERE owner_id = $1", user_id)
    # 2. La CDC va propager l'op `u` avec deleted_at != NULL
    # 3. Le worker doit interpréter ça comme un delete logique :
    #    soit hard delete dans Qdrant, soit ajouter payload `is_deleted=true` + filtre search
    # 4. Hard delete dans Postgres après 30j (rétention légale) → cascade vers Qdrant via CDC `d`

Métriques observabilité

python
# observability/metrics.py
from prometheus_client import Histogram, Counter, Gauge

EMBED_LATENCY = Histogram("embed_latency_seconds", "embedding API latency")
UPSERT_LATENCY = Histogram("qdrant_upsert_latency_seconds", "qdrant upsert latency")
BATCH_SIZE = Histogram("batch_size", "messages per batch")
PROCESSED = Counter("messages_processed_total", "messages processed", ["op"])
KAFKA_LAG = Gauge("kafka_consumer_lag", "lag par partition", ["partition"])
FRESHNESS = Histogram("freshness_seconds", "delta pg.commit → qdrant.upsert")

Numbers réels (100K annonces, ~2K updates/h, 6 workers, OpenAI small) :

  • Freshness p50 : 8s, p95 : 22s, p99 : 41s
  • Coût embeddings : ~85€/mois
  • Coût Kafka Confluent basic : ~80€/mois
  • Workers Scaleway Serverless Containers : ~40€/mois
  • Total pipeline : ~205€/mois

🎯 Patterns courants

  1. group.id versionné (embed-worker-annonces-v1) → quand tu changes le model, nouveau consumer group, replay du log Kafka pour réindexer.
  2. Batching adaptatif (batch size OR timeout) → équilibre coût/latence. 50-100 docs par batch OpenAI = -80% coût vs 1 par 1.
  3. Dedup par primary key dans le batch (last-write-wins) → si un user édite 5 fois en 2s, tu n'embeddes qu'une fois.
  4. ID Qdrant déterministe (hash(doc_id)) → upsert idempotent, replay du log sans drift.
  5. tombstones.on.delete=true côté Debezium → events de suppression propres à consommer.
  6. Versioning d'embeddings dans le payload (embedding_model, embedding_version) → permet de filtrer "n'utiliser que la nouvelle version" pendant un backfill.
  7. Reindex via collection séparée puis swap atomique de l'alias — pas de "drop + create" qui crée un trou.
  8. DLQ (Dead Letter Queue) sur erreurs persistantes après 5 retries → topic Kafka séparé documents.dlq avec alerting.
  9. Backpressure : si Qdrant ralentit, consumer.pause() automatique → Kafka stocke le retard, pas de OOM.
  10. Mesurer la freshness depuis source.ts_ms (Debezium) jusqu'à qdrant.upsert.timestamp → c'est la métrique métier.

🔄 Versions & écosystème 2026

ComposantVersion 2026Notes
Debezium2.7+Connector Postgres mature, support PG 17
Kafka / Confluent Cloud3.7+ / dedicated EU clustersbasic tier suffit pour < 5K msg/s
RedPanda24.xAlternative Kafka, plus simple à opérer
Postgres logical replicationPG 17pgoutput plugin, slots stables
Qdrant async client1.13+AsyncQdrantClient, batching natif
Embeddings APIOpenAI embeddings-v3, Voyage 3, BGE-M3 localHosted ou self-hosted
Scaleway Serverless ContainersGAHosting workers Python idéal
Kafka Connect on Strimzisur KubernetesPour les setups self-hosted

⚠️ Pitfalls

  1. Pas de WAL slot monitoring → un Debezium qui plante laisse un slot ouvert, le WAL Postgres grossit jusqu'à saturer le disque. Toujours alerter sur pg_replication_slots.confirmed_flush_lsn.
  2. group.id partagé entre dev/staging/prod → consommation croisée, désastre. Group ID par env.
  3. Pas de dédoublonnage dans le batch → si user édite 10x, tu fais 10 embeddings au lieu de 1. Cher.
  4. Suppression molle (deleted_at) sans filter search → docs supprimés continuent à apparaître dans Qdrant. Soit hard delete dans Qdrant, soit is_deleted=true + must_not dans toute query.
  5. Re-embed sur la même collection sans collection v2 → tu écris pendant que tu lis, scores incohérents. Toujours blue/green via alias.
  6. Pas de DLQ → un seul message poison fait crasher le worker en boucle. Toujours DLQ + alert.
  7. auto.offset.reset=earliest après replay non-prévu → un consumer group qui se réinitialise repart de l'origine, dépile 10M messages. Surveille les offsets.
  8. Embedding model gratuit-mais-instable sans pin de version → un changement silencieux fait drifter la qualité. Pin model name + version.
  9. Suppression RGPD via "supprimer dans Postgres seulement" → vector toujours dans Qdrant, infraction. Toujours vérifier la chaîne complète.
  10. Pas de test de bout-en-bout → tu ne sais pas si tout le pipeline marche. Test "j'insère un doc → je le trouve dans Qdrant en < 60s" en CI sur env staging.

💰 Pricing / ROI client

Infra (ordres de grandeur 2026)

ComposantSetup PME (100K docs)Setup ETI (10M docs)
Debezium (sur K8s)inclus dans K8sdédié 2 noeuds
Kafka Confluent Basic EU~80€/mois~600€/mois (Standard)
Workers (Scaleway Serverless Containers)~40€/mois~200€/mois (3-5 instances)
Qdrant Cloud EU~290€/mois~2 100€/mois
Embeddings (OpenAI small)~85€/mois~800€/mois
Observabilité (Grafana/Prometheus)inclus~150€/mois
Total~500€/mois~3 850€/mois

Pricing freelance type

MissionJoursTJMPrix HT
Audit pipeline ingestion existant31300€3 900€
Setup Debezium + Kafka + 1 worker prod71400€9 800€
Pipeline complet end-to-end + monitoring + DLQ121500€18 000€
Migration model embedding (v1 → v2) sans downtime61500€9 000€
Mise en conformité RGPD du pipeline51500€7 500€

ROI client marketplace : SLA freshness 60s vs "à la prochaine batch nuit" = adoption agent immobilier +40%, churn -15%.

🧪 Testing / Eval

python
# tests/test_freshness_e2e.py
"""Test end-to-end : insère dans Postgres, mesure le temps avant qu'il apparaisse dans Qdrant."""
import asyncio, time, uuid, psycopg
from qdrant_client import AsyncQdrantClient

@pytest.mark.asyncio
async def test_doc_searchable_within_60s():
    doc_id = int(time.time())
    title = f"Test annonce {uuid.uuid4()}"
    async with await psycopg.AsyncConnection.connect(DSN) as conn:
        await conn.execute(
            "INSERT INTO annonces (id, title, description, city, price_eur) "
            "VALUES ($1,$2,$3,$4,$5)",
            (doc_id, title, "test", "Paris", 500000),
        )
        await conn.commit()

    qd = AsyncQdrantClient(url=..., api_key=...)
    deadline = time.time() + 60
    while time.time() < deadline:
        hits = (await qd.query_points(
            "annonces", query=await embed_query(title), limit=5
        )).points
        if any(h.payload["doc_id"] == doc_id for h in hits):
            return
        await asyncio.sleep(1)
    pytest.fail("doc pas trouvable après 60s — freshness SLA cassé")


def test_delete_propagation():
    """Suppression RGPD : doit disparaître de Qdrant en < 60s."""
    ...


def test_replay_idempotence():
    """Rejouer 1000x le même event ne crée pas de doublon."""
    ...

À monitorer en continu :

  • Freshness p50/p95/p99 (histogramme Prometheus)
  • Kafka consumer lag par partition
  • Erreurs embedding API (rate limit, 5xx)
  • Erreurs Qdrant upsert
  • Taille de la DLQ
  • Coût $/jour embeddings + Kafka + DB
  • WAL slot lag Postgres

🏋️ Exercices

Progressifs, du « fais marcher » au « casse-le puis défends ton chiffre ». Monte un Postgres + Redpanda + Qdrant en docker-compose local pour tout faire tourner.

Exercice 1 — Pipeline de bout en bout (le socle)

Objectif : INSERT dans Postgres → vecteur searchable dans Qdrant en < 60s, avec suppression propagée. Indice/Solution : Debezium (pgoutput, tombstones.on.delete=true) → 1 topic → 1 worker confluent-kafka qui parse l'enveloppe Debezium (op ∈ c/u/r/d), embed, upsert avec id=hash(doc_id). Test d'acceptation : le test_doc_searchable_within_60s du fichier doit passer. Valide aussi la suppression : DELETE → tombstone → qd.delete.

Exercice 2 — Prouver l'idempotence sous rejeu

Objectif : rejouer 10 000× le même event Kafka ne doit créer aucun doublon ni faire dériver le score. Indice/Solution : reset le consumer group à earliest, redépile tout le log. Assert : qd.count(collection) identique avant/après, et query_points renvoie un seul point par doc_id. Le levier est l'ID déterministe — supprime le hash(doc_id) et remplace par un UUID aléatoire pour voir les doublons apparaître, puis remets le hash. Tu dois pouvoir expliquer pourquoi at-least-once + idempotence = effectively-once.

Exercice 3 — Batching adaptatif et défense du coût

Objectif : absorber un burst de 5 000 updates/2 min (cas e-commerce flash) sans backlog, et diviser la facture d'embedding par 4. Indice/Solution : implémente le worker_batched.py (batch size OR timeout) avec dédup last-write-wins par doc_id dans le buffer. Mesure : nombre d'appels API embedding avant/après, et le coût $ correspondant. Défends le chiffre — pourquoi 96 et pas 512 ? (latence p95 de freshness vs coût ; un batch trop gros gonfle la freshness, trop petit gonfle le coût). Trace une courbe coût/latence en faisant varier BATCH_SIZE.

Exercice 4 — Casse-le : le slot WAL qui sature le disque

Objectif : reproduire la panne #1 des pitfalls (Debezium down → slot ouvert → WAL grossit → disque plein), puis instrumenter l'alerte qui l'aurait prévenue. Indice/Solution : tue le connecteur Debezium, génère du trafic d'écriture, observe pg_replication_slots.confirmed_flush_lsn qui stagne pendant que pg_wal grossit (SELECT pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn))). Ajoute une alerte Prometheus sur ce lag. Bonus : configure max_slot_wal_keep_size pour que Postgres protège son disque (au prix de tuer le slot — discute le tradeoff disponibilité vs intégrité).

Exercice 5 — Migration model sans downtime (blue/green + dimension)

Objectif : migrer de text-embedding-3-small (1536) à text-embedding-3-large (3072) sans aucune fenêtre où la recherche est vide, et rollbackable. Indice/Solution : crée annonces_v2 (dim=3072), backfill via le job reembed_v2 en lisant embedding_version=1, garde le worker live qui écrit dans les deux collections pendant la transition (dual-write contrôlé), puis update_collection_aliases (DeleteAlias+CreateAlias atomique) pour basculer annonces_live sur v2. Test de rollback : re-pointe l'alias sur v1, la recherche doit rester cohérente. Piège à éviter : ne JAMAIS re-embed in-place — la collection 1536 refuse les vecteurs 3072.

Exercice 6 — RGPD adversarial : prouve la suppression sur toute la chaîne

Objectif : un utilisateur exerce son droit à l'oubli ; prouve qu'aucune trace de ses vecteurs ne subsiste — ni dans Qdrant, ni dans le snapshot, ni dans le log Kafka au-delà de la rétention. Indice/Solution : UPDATE deleted_at=now() → CDC u interprété comme delete logique → is_deleted=true + must_not dans toute query, PUIS hard delete après rétention légale → CDC dqd.delete. Vérifie : (1) query_points ne renvoie plus le point, (2) qd.retrieve(point_id) renvoie vide, (3) la rétention Kafka (retention.ms) purge l'event en clair, (4) si tu fais des snapshots Qdrant, le snapshot post-suppression ne contient plus le vecteur. Écris le test test_delete_propagation qui mesure le délai de purge complète. C'est exactement ce qu'un DPO/auditeur CNIL te demandera de démontrer.

🎤 En entretien

Q : « Comment garantis-tu l'exactly-once entre Postgres et ta vector DB ? » R : Je ne le garantis pas au niveau transport — Kafka c'est at-least-once. J'obtiens un effectively-once via idempotence : ID Qdrant déterministe (hash(doc_id)) + commit d'offset après l'upsert. Un rejeu produit le même état, jamais un doublon.

Q : « Ta table de transactions est trop chaude pour brancher du CDC dessus. Que fais-tu ? » R : J'évite le CDC direct (latence WAL critique) et je passe par le pattern outbox — INSERT outbox dans la même transaction que l'écriture métier, Debezium relaie la table outbox. Atomique, sans dual-write. Si même ça coûte trop, double-write applicatif assumé avec réconciliation périodique.

Q : « Tu changes de modèle d'embedding en prod. Comment, sans downtime ? » R : Blue/green via alias Qdrant. La dimension change (1536→3072) donc re-embed in-place est impossible — nouvelle collection, backfill en lisant embedding_version, dual-write pendant la transition, puis swap atomique de l'alias. Rollback = re-pointer l'alias.

Q : « Comment tu mesures la fraîcheur, et quel SLA tu défends ? » R : Je mesure de source.ts_ms (timestamp du commit Postgres, fourni par Debezium) jusqu'au timestamp d'upsert Qdrant — c'est la freshness métier, pas juste la latence worker. SLA typique p95 < 60s, p99 < 90s, exposé en histogramme Prometheus. Si le p99 dérape, je regarde le consumer lag par partition d'abord.

Q : « Un message poison fait crasher ton worker en boucle. » R : DLQ après N retries (topic documents.dlq) + alerting, et le worker continue de dépiler le reste. Sans DLQ, un seul event malformé bloque toute la partition — c'est le pitfall classique. Je ne commit jamais un offset sans avoir traité ou redirigé le message.

🔁 Quand utiliser / éviter

Mettre un vrai pipeline real-time quand :

  • SLA freshness < 5 min requis (marketplace, ecom flash, AML)
  • Volume > 1K updates/jour
  • Multiples consumers en aval (vector DB + cache + analytics)
  • Souveraineté + observabilité prouvable

Rester sur batch nocturne quand :

  • Catalogue quasi-statique (référentiel produit qui change peu)
  • < 100 updates/jour
  • POC ou démarrage projet
  • Coût Kafka/Debezium > valeur métier de la freshness

Approche hybride : CDC pour les tables chaudes (annonces, produits), batch pour les tables froides (corpus juridique).

🔗 Liens

→ Voir aussi : 02-qdrant.md (cible vector DB), 01-pgvector.md (alternative simple), 04-pinecone-self-hosted.md, 05-hybrid-search.md.

Bibliothèque tech perso — Achref