Transactions
TL;DR — Une transaction est un contrat ACID local au store. En Nest, le défi est la propagation : comment passer "le client transactionnel courant" à des services qui appellent d'autres services, sans tout polluer ? Trois patterns : (1) callback explicite (
prisma.$transaction(tx => svc.foo(tx))), (2) interceptor / décorateur custom qui ouvre la transaction et la binde enreq, (3)AsyncLocalStoragequi implicite la propagation. Pièges senior : DI request-scoped + transactions, sagas vs 2PC pour cross-store, isolation par défaut (souvent READ COMMITTED, parfois SNAPSHOT/Serializable).
🧠 Mental model
Controller ─► Service A ─► Service B ─► Repository ──► DB
│ │ │
└── tx ────────┴────────────┘
propagation : explicit param / interceptor + req / ALS
cross-store (Postgres + Mongo + S3) :
pas de transaction native ⇒ Saga (compensations) ou Outbox patternAnalogie — Une transaction c'est un rideau de scène : ouvre, joue ta pièce (writes), si quelqu'un crie "feu" tu fermes le rideau (rollback) et personne n'a rien vu. Mais ce rideau ne couvre qu'une scène (un store). Si tu joues sur deux scènes (Postgres + Mongo), pas de rideau unique — il te faut un chef d'orchestre (saga) qui sait défaire chaque scène individuellement.
Comment un staff raisonne sur une transaction
Avant d'écrire $transaction, un senior pose quatre questions, dans cet ordre :
- Quels sont les invariants ? ("le solde ne peut pas devenir négatif", "le stock réservé ≤ stock physique"). L'invariant détermine l'isolation, pas l'inverse. Si l'invariant est "deux lignes doivent exister ensemble ou pas du tout",
READ COMMITTEDsuffit. Si l'invariant porte sur une somme/agrégat lue puis vérifiée (write skew), il fautSERIALIZABLEou un verrou explicite. - Combien de stores ? 1 store → transaction native. 2+ stores → saga/outbox, jamais de 2PC en Node.
- Quel est le chemin chaud ? Une transaction tient une connexion du pool. À 5 000 req/s avec un pool de 20, une txn de 100 ms plafonne à 200 req/s. La durée de la txn × le débit = connexions nécessaires (loi de Little). Toute I/O non-DB (HTTP, S3, LLM) sort de la transaction.
- Que se passe-t-il en cas de retry ? Le client va re-tenter sur timeout réseau. Sans clé d'idempotence, le retry double-écrit. L'idempotence n'est pas une option en prod.
Isolation levels — le tableau que tout senior doit avoir en tête
Une transaction protège l'écriture ; le niveau d'isolation définit ce qu'elle voit des transactions concurrentes. Les ORMs n'ont pas de défaut neutre : ils héritent du défaut du moteur.
| Niveau | Dirty read | Non-repeatable read | Phantom read | Write skew | Coût |
|---|---|---|---|---|---|
READ UNCOMMITTED | possible* | possible | possible | possible | nul |
READ COMMITTED (défaut Postgres) | non | possible | possible | possible | faible |
REPEATABLE READ (défaut MySQL/InnoDB) | non | non | non (PG/MySQL via MVCC) | possible | moyen |
SNAPSHOT (SQL Server) | non | non | non | possible | moyen |
SERIALIZABLE | non | non | non | non | élevé (peut abort 40001) |
*Postgres traite READ UNCOMMITTED comme READ COMMITTED — il n'autorise jamais les dirty reads.
Les anomalies en une phrase :
- Non-repeatable read — tu lis une ligne, une autre txn la met à jour et commit, tu relis : valeur différente.
- Phantom read — tu fais
COUNT(*) WHERE status='pending', une autre txn insère une lignepending, tu recomptes : nombre différent. - Write skew — le piège senior. Deux txn lisent le même état (ex. "2 médecins de garde"), chacune décide légitimement de se retirer, chacune écrit, les deux commit → 0 médecin de garde. Aucune des deux n'a écrit la ligne que l'autre a lue, donc seul
SERIALIZABLE(ou un verrou explicite sur l'invariant) l'empêche.
Décision pratique :
- Invariant local à des lignes que tu écris →
READ COMMITTED+ verrou pessimiste (SELECT … FOR UPDATE) sur ces lignes suffit. - Invariant sur un agrégat lu (somme, comptage, "au moins un") →
SERIALIZABLE+ retry, OU matérialiser l'invariant en contrainte DB (CHECK, exclusion constraint,UNIQUE) qui le rend impossible structurellement. La meilleure transaction est celle qu'une contrainte rend inutile.
// Postgres SSI : SERIALIZABLE est optimiste, il abort au COMMIT, pas au write.
// => TOUT bloc SERIALIZABLE doit être idempotent et wrappé dans un retry (voir withRetry).
await prisma.$transaction(fn, { isolationLevel: Prisma.TransactionIsolationLevel.Serializable });🛠️ Code minimal
Pattern 1 — Callback explicite
// Prisma — le plus simple
async createUserWithProfile(input) {
return this.prisma.$transaction(async (tx) => {
const user = await tx.user.create({ data: input.user });
const profile = await tx.profile.create({ data: { ...input.profile, userId: user.id } });
return { user, profile };
});
}
// TypeORM 0.3
async createUserWithProfile(input) {
return this.dataSource.transaction(async (em) => {
const user = await em.getRepository(User).save(input.user);
const profile = await em.getRepository(Profile).save({ ...input.profile, userId: user.id });
return { user, profile };
});
}Pattern 2 — Decorator + Interceptor
// Decorator marker — la clé DOIT être `isolationLevel` (clé attendue par Prisma $transaction)
type TxOpts = { isolationLevel?: Prisma.TransactionIsolationLevel; timeout?: number; maxWait?: number };
export const TRANSACTIONAL = 'transactional';
export const Transactional = (opts: TxOpts = {}) => SetMetadata(TRANSACTIONAL, opts);
@Injectable()
export class TransactionInterceptor implements NestInterceptor {
constructor(private readonly reflector: Reflector, private readonly prisma: PrismaService) {}
intercept(ctx: ExecutionContext, next: CallHandler): Observable<unknown> {
const opts = this.reflector.get<TxOpts>(TRANSACTIONAL, ctx.getHandler());
if (!opts) return next.handle();
const req = ctx.switchToHttp().getRequest();
// defer => la txn ne s'ouvre QUE quand quelqu'un s'abonne (pas à la construction)
return defer(() =>
this.prisma.$transaction(async (tx) => {
req.tx = tx;
return lastValueFrom(next.handle()); // le handler s'exécute DANS le callback tx
}, opts),
);
}
}
// Usage
@Post()
@Transactional({ isolationLevel: 'Serializable', timeout: 5_000 })
create(@Body() dto: CreateUserDto, @Req() req: { tx: Prisma.TransactionClient }) {
return this.users.create(dto, req.tx); // service reçoit le tx
}Limite de Pattern 2 — on binde
txsurreq, donc le service doit encore recevoirreq.txexplicitement et le repasser à ses sous-appels. Ça fuit le détail transactionnel dans toutes les signatures (le piège "tx-passing"). Pattern 3 (ALS) supprime ce paramètre. Autre piège : si une seconde requête sur le même contexte (BullMQ, gRPC) ne passe pas par cet interceptor HTTP,req.txest absent — l'ALS, lui, est agnostique du transport.
Pattern 3 — AsyncLocalStorage (le plus propre)
import { AsyncLocalStorage } from 'node:async_hooks';
@Injectable()
export class TxContext {
private readonly als = new AsyncLocalStorage<PrismaTx>();
run<T>(tx: PrismaTx, fn: () => Promise<T>) { return this.als.run(tx, fn); }
get(): PrismaTx | undefined { return this.als.getStore(); }
}
@Injectable()
export class PrismaService extends PrismaClient {
constructor(private readonly txCtx: TxContext) { super(); }
// Proxy : utilise tx si défini, sinon prisma global
get client(): Prisma.TransactionClient | PrismaClient {
return this.txCtx.get() ?? this;
}
}
@Injectable()
export class TransactionInterceptor implements NestInterceptor {
constructor(private readonly prisma: PrismaClient, private readonly txCtx: TxContext, private readonly reflector: Reflector) {}
intercept(ctx: ExecutionContext, next: CallHandler) {
if (!this.reflector.get(TRANSACTIONAL, ctx.getHandler())) return next.handle();
return from(this.prisma.$transaction((tx) => this.txCtx.run(tx, () => lastValueFrom(next.handle()))));
}
}
// Service — JAMAIS de tx en paramètre
@Injectable()
export class UsersService {
constructor(private readonly db: PrismaService) {}
async create(dto) { return this.db.client.user.create({ data: dto }); } // auto-tx
}🎯 Patterns courants
- Callback explicite — le moins magique, le plus lisible. À privilégier pour les transactions ponctuelles avec 2-3 ops.
- ALS-based propagation — quand on a beaucoup d'opérations imbriquées sur plusieurs services. Léger en perf (Node 16+). Permet les services "transparents" (ne savent pas qu'ils sont dans une txn).
- Outbox pattern — pour la cohérence cross-store : écrire dans la même transaction DB une "table d'événements", un worker poll cette table et publie au broker. Garantit at-least-once delivery.
- Saga / compensation — orchestrateur explicit (
createOrder → reserveStock → chargePayment → if fail: cancelOrder, releaseStock). Frameworks : MassTransit-like avec NestJS, ou simplement une state machine custom (XState). - Retry sur conflit —
SERIALIZABLEisolation peut throw sur conflit (P2034en Prisma,40001en SQL). Wrapper avec retry (backoff exponentiel, max 3). - Read-only transactions —
BEGIN READ ONLYpour les requêtes longues / reports (libère verrous, autorise les replicas). - Savepoints (Postgres) — pour des sous-transactions nestées au sein d'une txn parente.
BEGIN; SAVEPOINT sp1; ... ROLLBACK TO sp1;. Prisma ne supporte pas, TypeORM partiellement. - Optimistic locking — colonne
versionincrémentée à chaque update,WHERE id = ? AND version = ?. Plus scalable que le verrou pessimiste pour les conflits rares. - Pessimistic locking —
SELECT ... FOR UPDATEcôté SQL.setLock('pessimistic_write')en TypeORM. Pour les compteurs critiques (stock). - Idempotency token — au début de la txn, INSERT dans une table
idempotency_keys (key UNIQUE, response JSON). Si conflit, retourner la réponse cachée. Garantit qu'un retry ne double-écrit pas.
🔄 Versions — Nest 7 / 8 / 9 / 10 / 11
| Version | Notes |
|---|---|
| Nest 7 | Pas de AsyncLocalStorage stable dans Node < 16 — ALS pattern fragile. |
| Nest 8 | Node 16 default. ALS utilisable. |
| Nest 9 | Pas de support natif transactions Nest, communauté pousse @nestjs-cls. |
| Nest 10 | @nestjs-cls v3+ + @nestjs-cls/transactional mature, gère Prisma/TypeORM/Knex via adapters. |
| Nest 11 | Node 20+. ALS performant. Pattern ALS quasi standard pour propagation. |
@nestjs-cls — package communautaire qui couvre ALS + transaction plugins. Très utile, mais ajouter une dépendance. À comparer au pattern manuel selon la taille du projet.
⚠️ Pitfalls
- Repo injecté ≠ repo transactionnel — c'est LE piège n°1.
this.userRepoinjecté ne participe pas à undataSource.transaction(em => ...)à moins d'utiliserem.getRepository(User)dans le callback. Avec Prisma :this.prisma.userau lieu detx.user. Scope.REQUEST+ transaction — un provider request-scoped est instancié à chaque requête. Mais sa "vue" du repo est figée. Si tu ouvres une txn en cours de requête, le repo capturé à l'instanciation n'est pas dans la txn. Préférer transient + factory ou ALS.- Promise non await — un
prisma.$transaction(async tx => { fireAndForget(tx); ... })qui neawaitpas la sous-promise ⇒ commit avant que l'op finisse, rollback impossible si elle plante. - Hooks externes (audit log, events) dans la transaction — si le hook publie sur un broker (Kafka), il publie même si la txn rollback ⇒ inconsistance. Soit Outbox, soit déclencher après commit (
tap(() => publish())hors$transaction). - Niveaux d'isolation par défaut surprises — Postgres default =
READ COMMITTED(non-repeatable read possible). Pour des invariants forts (stock négatif impossible) ⇒SERIALIZABLE+ retry. SinonSELECT ... FOR UPDATE. - Long-running transactions — verrous tenus longtemps ⇒ blocage pool, vacuum bloqué. Toujours timeout (
prisma.$transaction(..., { timeout: 5000 })). - Cross-store — pas de 2PC pratique en JS moderne (XA Java only ou presque). Saga + compensations + idempotence. Ne jamais prétendre faire de la txn distribuée à la légère.
- Nested transactions — Postgres supporte les SAVEPOINTs, mais l'API des ORMs varie. Prisma ne supporte pas (en v5/v6) les transactions imbriquées correctement — détecter et réutiliser la transaction parente via ALS.
- Side-effects non transactionnels — envoyer un email, appeler une API tierce ⇒ ne peut pas être rollback. Toujours après commit, ou via Outbox + worker idempotent.
- Erreur silencieusement avalée — un
try/catchà l'intérieur du callback de txn qui ne re-throw pas ⇒ commit même si quelque chose s'est mal passé. Soit re-throw, soittx.rollback()explicite (pas dispo en Prisma callback). - Connection pool starvation — chaque txn occupe une connexion. Si tu ouvres une txn et fais des appels HTTP synchrones dedans, tu monopolises la connexion pendant que tu attends le réseau. Garder les transactions purement DB.
- Conflit
SERIALIZABLEnon géré — la majorité du code suppose qu'une txn réussit. EnSERIALIZABLE, ~5% peuvent rejeter (40001/P2034). Sans retry, c'est une erreur 500 visible. Toujours wrapper.
📊 Production — observabilité & capacity
Une transaction qui marche en dev peut écrouler un cluster en prod. Ce qu'un senior instrumente :
Métriques à exporter (Prometheus) :
db_tx_duration_seconds(histogram) — p99 doit rester sub-100ms. Une dérive du p99 = verrous ou I/O dans la txn.db_tx_retries_total{code}— un pic de40001/P2034signale une contentionSERIALIZABLE(hotspot à dé-corréler, ex. sharding du compteur).db_pool_waiting/db_pool_borrowed— siwaiting > 0régulièrement, le pool est sous-dimensionné OU une txn fuit (ne libère pas).db_deadlocks_total(Postgres :pg_stat_database.deadlocks).
Capacity (loi de Little) — connexions nécessaires ≈ débit (req/s) × durée txn (s). Pour 1 000 commits/s à 20 ms : 20 connexions actives en pointe. Postgres plafonne en pratique vers 100-300 connexions (au-delà → PgBouncer en mode transaction). Attention : PgBouncer transaction mode casse les transactions interactives multi-statements qui dépendent d'une session collante — vérifier le pooling avant d'activer.
Deadlocks — Postgres les détecte (≈ 1 s) et abort une victime (40P01). Préventif : toujours verrouiller dans un ordre déterministe (tri par id/iban, comme dans l'exemple banque). Curatif : retry idempotent.
Slow-tx kill — SET LOCAL statement_timeout = '5s' ou idle_in_transaction_session_timeout côté Postgres pour qu'une txn oubliée ne tienne pas un verrou indéfiniment (et ne bloque pas VACUUM/autovacuum).
// Tracer chaque transaction (OpenTelemetry) — la durée et l'issue (commit/rollback)
async traced<T>(name: string, fn: (em: EntityManager) => Promise<T>): Promise<T> {
return tracer.startActiveSpan(`tx.${name}`, async (span) => {
const start = performance.now();
try {
const res = await this.ds.transaction('SERIALIZABLE', fn);
span.setAttribute('tx.outcome', 'commit');
return res;
} catch (e) {
span.setAttribute('tx.outcome', 'rollback');
span.recordException(e as Error);
throw e;
} finally {
txDuration.observe(performance.now() - start);
span.end();
}
});
}🤖 Transactions & jobs IA (BullMQ + Outbox)
Servir un agent LLM depuis NestJS croise directement les transactions : une génération coûte de l'argent, peut être longue (streaming), et ne doit jamais être rejouée par accident. Les patterns de cette page (idempotence, outbox, "I/O hors txn") sont exactement la bonne boîte à outils.
Règle d'or : l'appel au LLM (Anthropic) est une I/O réseau coûteuse — il ne vit jamais dans une transaction DB. On encadre par deux courtes transactions : une qui réserve la génération (idempotence + quota), une qui persiste le résultat.
// 1) Edge transaction : claim idempotent + cost-guard, AVANT tout appel LLM
async claimGeneration(cmd: { generationId: string; userId: string; estTokens: number }) {
return this.prisma.$transaction(async (tx) => {
// idempotence : l'index unique sur generationId fait le travail
const existing = await tx.generation.findUnique({ where: { id: cmd.generationId } });
if (existing) return existing; // retry réseau -> même résultat
// cost-guard : on débite le quota DANS la même txn (pas de survente de budget)
const quota = await tx.quota.update({
where: { userId: cmd.userId, remainingTokens: { gte: cmd.estTokens } },
data: { remainingTokens: { decrement: cmd.estTokens } },
}).catch(() => null);
if (!quota) throw new ForbiddenException('QUOTA_EXCEEDED');
return tx.generation.create({
// on persiste l'estimation débitée pour pouvoir réconcilier au commit final
data: { id: cmd.generationId, userId: cmd.userId, status: 'queued', estTokens: cmd.estTokens },
});
}, { isolationLevel: 'Serializable', timeout: 3_000 });
}// 2) Le worker BullMQ : l'appel LLM est HORS transaction (streaming, retries SDK)
@Processor('llm')
export class LlmProcessor extends WorkerHost {
constructor(
@Inject(ANTHROPIC) private readonly anthropic: Anthropic, // DI via forRootAsync, jamais `new Anthropic()`
private readonly prisma: PrismaService,
) { super(); }
async process(job: Job<{ generationId: string; prompt: string }>) {
// Idempotence du worker : si déjà terminé, on ne rappelle PAS le LLM (coût !)
const gen = await this.prisma.generation.findUnique({ where: { id: job.data.generationId } });
if (gen?.status === 'done') return gen;
const ac = new AbortController();
// si le client se déconnecte, on annule l'appel LLM ET le job (cost-aware)
job.token && this.onCancel(job, () => ac.abort());
let text = '';
const stream = await this.anthropic.messages.stream(
{ model: 'claude-sonnet-4-6', max_tokens: 1024, messages: [{ role: 'user', content: job.data.prompt }] },
{ signal: ac.signal },
);
for await (const ev of stream) {
if (ev.type === 'content_block_delta' && ev.delta.type === 'text_delta') text += ev.delta.text;
}
const usage = (await stream.finalMessage()).usage;
const realTokens = usage.input_tokens + usage.output_tokens;
// 3) Persistance + outbox dans UNE courte transaction (l'event SSE/Kafka est transactionnel)
return this.prisma.$transaction(async (tx) => {
const done = await tx.generation.update({
where: { id: job.data.generationId },
data: { status: 'done', output: text, inputTokens: usage.input_tokens, outputTokens: usage.output_tokens },
});
// Réconcilier le quota : on avait débité `estTokens` au claim ; on rembourse
// (ou re-débite) la différence avec le coût réel. delta > 0 => on avait sur-estimé.
const delta = done.estTokens - realTokens; // estTokens persisté au claim
await tx.quota.update({
where: { userId: done.userId },
data: { remainingTokens: { increment: BigInt(delta) } },
});
await tx.outbox.create({ data: { topic: 'generation.done', payload: { generationId: done.id } } });
return done;
});
}
}Pourquoi ça tient : (1) l'index unique sur generationId rend le claim idempotent — un retry BullMQ ou un double-clic UI ne lance qu'une génération ; (2) l'appel Anthropic est hors txn donc ne tient aucune connexion DB pendant les secondes de streaming ; (3) le cost-guard débite le quota atomiquement (pas de write skew sur le budget) ; (4) l'outbox garantit que l'event "génération terminée" (qui pousse le token final en SSE/WebSocket) n'existe que si la persistance a commit. Retry cost-aware : on relit status === 'done' avant de rappeler le LLM, sinon un retry de job = une facture en double. Utiliser les retries du SDK Anthropic pour les 429/5xx, et le retry BullMQ uniquement pour les échecs après persistance.
🧪 Testing
// Unitaire : tester que le service appelle bien le tx mock
const txMock = { user: { create: jest.fn() }, profile: { create: jest.fn() } };
const prismaMock = { $transaction: jest.fn().mockImplementation((cb) => cb(txMock)) };
const svc = new UsersService(prismaMock as any);
await svc.createUserWithProfile({ user: {...}, profile: {...} });
expect(txMock.user.create).toHaveBeenCalled();
expect(txMock.profile.create).toHaveBeenCalled();// Intégration — rollback réel
beforeEach(async () => {
await prisma.$executeRaw`TRUNCATE TABLE users, profiles RESTART IDENTITY CASCADE`;
});
it('rollback si profile fail', async () => {
prisma.profile.create = jest.fn().mockRejectedValue(new Error('boom'));
await expect(svc.createUserWithProfile(...)).rejects.toThrow();
const users = await prisma.user.findMany();
expect(users).toHaveLength(0); // rollback OK
});// Saga / compensation
it('cancel order if payment fails', async () => {
paymentMock.charge.mockRejectedValue(new Error('declined'));
await expect(orderSaga.run(input)).rejects.toThrow();
expect(stockMock.release).toHaveBeenCalled(); // compensation OK
expect(orderMock.markCancelled).toHaveBeenCalled();
});🎬 Cas d'usage concrets
Banque — Virement SEPA atomique
Qui — Établissement de crédit français traitant 2 M virements internes par jour. Problème — Un virement débite le compte payeur, crédite le bénéficiaire, écrit deux lignes de ledger et un événement de notification. Toute incohérence = audit ACPR + provision pour risque opérationnel. Comment — Transaction SERIALIZABLE avec SELECT FOR UPDATE sur les deux comptes triés par ID pour éviter les deadlocks, et publication d'événement post-commit.
async transfer(fromId: string, toId: string, amount: bigint) {
const [a, b] = [fromId, toId].sort(); // deterministic lock order
return this.ds.transaction('SERIALIZABLE', async (em) => {
const accounts = await em.createQueryBuilder(Account, 'a')
.setLock('pessimistic_write')
.where('a.id IN (:...ids)', { ids: [a, b] })
.getMany();
const from = accounts.find((x) => x.id === fromId)!;
const to = accounts.find((x) => x.id === toId)!;
if (from.balance < amount) throw new InsufficientFundsError();
from.balance -= amount; to.balance += amount;
await em.save([from, to]);
await em.insert(Ledger, [
{ accountId: fromId, delta: -amount, ref: 'transfer' },
{ accountId: toId, delta: amount, ref: 'transfer' },
]);
});
}Gains — 0 incohérence sur 18 mois, deadlocks supprimés par l'ordre lexical, retry géré par advice spring-like en cas de serialization_failure.
E-commerce — Checkout panier + stock + paiement
Qui — Pure player français mode (1 M commandes/mois). Problème — Décrémenter le stock, créer la commande, capturer le paiement Stripe : si le paiement échoue après décrémentation, le stock est perdu. Si on commit avant paiement, on survend en cas de timeout. Comment — Pattern "réservation 2 phases" : transaction DB qui décrémente avec statut reserved, appel Stripe hors transaction, transaction de finalisation qui passe à confirmed ou rollback applicatif.
async checkout(cartId: string, paymentMethod: string) {
const order = await this.ds.transaction(async (em) => {
const cart = await em.findOneOrFail(Cart, { where: { id: cartId, status: 'open' }, relations: { items: true } });
for (const item of cart.items) {
const res = await em.update(Stock,
{ sku: item.sku, available: MoreThanOrEqual(item.quantity) },
{ available: () => `available - ${item.quantity}` });
if (res.affected === 0) throw new OutOfStockError(item.sku);
}
return em.save(em.create(Order, { cartId, status: 'reserved', total: cart.total }));
});
try {
const charge = await this.stripe.paymentIntents.create({
amount: Number(order.total) * 100, payment_method: paymentMethod, confirm: true,
});
await this.orderRepo.update(order.id, { status: 'confirmed', chargeId: charge.id });
return order;
} catch (e) {
await this.compensate(order.id); // restock + cancel order
throw new PaymentDeclinedError();
}
}Gains — Survente impossible, paiement échoué = stock restauré automatiquement, pas de transaction longue qui bloque le pool.
LegalTech — Signature contrat multi-parties
Qui — Plateforme de signature électronique pour cabinets d'avocats français. Problème — Un contrat à 4 signataires : on enregistre la signature, on met à jour le statut global, on archive le PDF horodaté, on notifie le suivant. Toute interruption laisse un contrat dans un état incohérent. Comment — Transaction Prisma interactive, lecture du contrat avec verrou applicatif via update WHERE version = X, write du nouveau PDF.
async signContract(contractId: string, signerId: string, signature: Buffer) {
return this.prisma.$transaction(async (tx) => {
const contract = await tx.contract.findUniqueOrThrow({
where: { id: contractId },
include: { signatures: true, signers: { orderBy: { order: 'asc' } } },
});
const next = contract.signers.find((s) => s.status === 'pending');
if (!next || next.id !== signerId) throw new ForbiddenException('NOT_YOUR_TURN');
await tx.signature.create({ data: { contractId, signerId, hash: sha256(signature) } });
await tx.signer.update({ where: { id: signerId }, data: { status: 'signed', signedAt: new Date() } });
const remaining = contract.signers.filter((s) => s.status === 'pending' && s.id !== signerId).length;
const newStatus = remaining === 0 ? 'completed' : 'in_progress';
return tx.contract.update({
where: { id: contractId, version: contract.version },
data: { status: newStatus, version: { increment: 1 } },
include: { signers: true },
});
}, { isolationLevel: 'Serializable', timeout: 10000 });
}Gains — Pas de double signature, ordre des signataires respecté, optimistic locking via version empêche les races.
🛠️ Exemple end-to-end
Contexte — La banque ci-dessus déploie un endpoint de virement instantané SCT Inst (Single Euro Payments Area Instant Credit Transfer) : moins de 10 s end-to-end, idempotence par endToEndId, anti double-débit, audit immuable, et notification temps réel. On combine TypeORM transaction + outbox pattern pour publier l'événement Kafka après commit.
// src/transfer/transfer.entities.ts
@Entity()
@Index(['endToEndId'], { unique: true })
export class Transfer {
@PrimaryGeneratedColumn('uuid') id: string;
@Column({ unique: true }) endToEndId: string; // idempotency key
@Column() debitorIban: string;
@Column() creditorIban: string;
@Column('bigint', { transformer: bigintTransformer }) amountCents: bigint;
@Column({ default: 'pending' }) status: 'pending' | 'settled' | 'rejected';
@Column({ nullable: true }) rejectionCode: string;
@CreateDateColumn() createdAt: Date;
@UpdateDateColumn() updatedAt: Date;
}
@Entity()
export class Account {
@PrimaryGeneratedColumn('uuid') id: string;
@Column({ unique: true }) iban: string;
@Column('bigint', { transformer: bigintTransformer }) balanceCents: bigint;
@Column('bigint', { default: '0', transformer: bigintTransformer })
reservedCents: bigint;
@Column({ default: 'open' }) status: 'open' | 'frozen' | 'closed';
@VersionColumn() version: number;
}
@Entity()
export class LedgerEntry {
@PrimaryGeneratedColumn('uuid') id: string;
@Column() accountId: string;
@Column() transferId: string;
@Column('bigint', { transformer: bigintTransformer }) deltaCents: bigint;
@Column() side: 'debit' | 'credit';
@CreateDateColumn() postedAt: Date;
}
@Entity()
export class OutboxEvent {
@PrimaryGeneratedColumn('uuid') id: string;
@Column() topic: string;
@Column('jsonb') payload: Record<string, unknown>;
@Column({ default: false }) published: boolean;
@CreateDateColumn() createdAt: Date;
}// src/transfer/transfer.service.ts
@Injectable()
export class InstantTransferService {
constructor(@InjectDataSource() private ds: DataSource) {}
async execute(cmd: InstantTransferCommand): Promise<Transfer> {
// Idempotence check OUTSIDE transaction (read-only)
const existing = await this.ds.getRepository(Transfer)
.findOne({ where: { endToEndId: cmd.endToEndId } });
if (existing) return existing;
return this.ds.transaction('SERIALIZABLE', async (em) => {
// 1. Re-check idempotency inside transaction
const dup = await em.findOne(Transfer, { where: { endToEndId: cmd.endToEndId } });
if (dup) return dup;
// 2. Lock both accounts in deterministic order to avoid deadlock
const ibans = [cmd.debitorIban, cmd.creditorIban].sort();
const accounts = await em.createQueryBuilder(Account, 'a')
.setLock('pessimistic_write')
.where('a.iban IN (:...ibans)', { ibans })
.getMany();
const debitor = accounts.find((a) => a.iban === cmd.debitorIban);
const creditor = accounts.find((a) => a.iban === cmd.creditorIban);
if (!debitor || !creditor) throw new NotFoundException('ACCOUNT_NOT_FOUND');
if (debitor.status !== 'open') throw new ForbiddenException('DEBITOR_FROZEN');
if (creditor.status === 'closed') throw new ForbiddenException('CREDITOR_CLOSED');
// 3. Solvency check (balance - reserved >= amount)
const available = debitor.balanceCents - debitor.reservedCents;
if (available < cmd.amountCents) {
const rejected = em.create(Transfer, {
endToEndId: cmd.endToEndId,
debitorIban: cmd.debitorIban,
creditorIban: cmd.creditorIban,
amountCents: cmd.amountCents,
status: 'rejected',
rejectionCode: 'INSUFFICIENT_FUNDS',
});
return em.save(rejected);
}
// 4. Move money
debitor.balanceCents -= cmd.amountCents;
creditor.balanceCents += cmd.amountCents;
await em.save([debitor, creditor]);
// 5. Create transfer + ledger entries
const transfer = await em.save(em.create(Transfer, {
endToEndId: cmd.endToEndId,
debitorIban: cmd.debitorIban,
creditorIban: cmd.creditorIban,
amountCents: cmd.amountCents,
status: 'settled',
}));
await em.insert(LedgerEntry, [
{ accountId: debitor.id, transferId: transfer.id,
deltaCents: -cmd.amountCents, side: 'debit' },
{ accountId: creditor.id, transferId: transfer.id,
deltaCents: cmd.amountCents, side: 'credit' },
]);
// 6. Outbox event (published after commit by a separate poller)
await em.insert(OutboxEvent, {
topic: 'transfer.settled',
payload: {
transferId: transfer.id,
endToEndId: transfer.endToEndId,
debitorIban: transfer.debitorIban,
creditorIban: transfer.creditorIban,
amountCents: transfer.amountCents.toString(),
},
});
return transfer;
});
}
}// src/transfer/outbox.poller.ts
@Injectable()
export class OutboxPoller {
constructor(
@InjectRepository(OutboxEvent) private outbox: Repository<OutboxEvent>,
private kafka: KafkaProducer,
) {}
@Cron('*/1 * * * * *') // every second
async drain() {
const batch = await this.outbox.find({
where: { published: false },
take: 100, order: { createdAt: 'ASC' },
});
for (const evt of batch) {
try {
await this.kafka.send(evt.topic, evt.payload);
await this.outbox.update(evt.id, { published: true });
} catch (e) {
Logger.error(`outbox publish failed for ${evt.id}`, e);
return; // stop batch, retry next tick
}
}
}
}// src/transfer/transfer.controller.ts
@Controller('transfers/instant')
export class InstantTransferController {
constructor(private svc: InstantTransferService) {}
@Post()
@HttpCode(202)
async create(@Body() dto: InstantTransferDto) {
const transfer = await this.svc.execute({
endToEndId: dto.endToEndId,
debitorIban: dto.debitorIban,
creditorIban: dto.creditorIban,
amountCents: BigInt(Math.round(dto.amount * 100)),
});
return { status: transfer.status, transferId: transfer.id };
}
}L'index unique sur endToEndId est la garantie ultime d'idempotence (un retry réseau renvoie le même résultat), l'outbox rend la publication Kafka transactionnelle (l'event n'existe que si le virement a commit), et le SERIALIZABLE interdit le scénario "deux virements concurrents qui voient chacun un solde suffisant alors qu'à eux deux ils dépassent".
🔁 Quand utiliser / éviter
Utiliser une transaction locale (1 store) :
- 2+ writes qui doivent être atomic.
- Read+write pattern (lock + update).
Préférer ALS / interceptor propagation :
- Code applicatif riche, beaucoup de services qui doivent participer.
- Tester est ok car services dépendent d'une abstraction (
db.client).
Préférer callback explicite :
- Code applicatif simple, transaction localisée à 1 service.
Préférer Saga :
- Cross-store (Postgres + Mongo + S3 + Stripe).
- Étapes longues (jobs async).
- Besoin d'observabilité fine (chaque étape audit).
Préférer Outbox :
- Synchroniser DB + broker / search index avec garantie at-least-once.
Éviter "transaction-everywhere" :
- Ouvrir une txn par requête HTTP (anti-pattern) tue la concurrence.
- Garder la txn courte (sub-seconde idéalement).
🧰 Exemples avancés
Saga manuelle avec compensations
type Step<T> = { run: () => Promise<T>; compensate?: (result: T) => Promise<void> };
@Injectable()
export class OrderSaga {
async createOrder(input: CreateOrderInput): Promise<Order> {
const completed: Array<{ result: unknown; compensate?: (r: any) => Promise<void> }> = [];
const exec = async <T>(step: Step<T>): Promise<T> => {
const result = await step.run();
completed.push({ result, compensate: step.compensate });
return result;
};
try {
const order = await exec({
run: () => this.orders.create(input),
compensate: (o: Order) => this.orders.markCancelled(o.id),
});
await exec({
run: () => this.stock.reserve(input.items),
compensate: () => this.stock.release(input.items),
});
await exec({
run: () => this.payments.charge(input.payment),
compensate: (c: Charge) => this.payments.refund(c.id),
});
return order;
} catch (err) {
// Compensations en ordre inverse
for (const step of completed.reverse()) {
try { await step.compensate?.(step.result); }
catch (e) { this.log.error('compensation_failed', e); }
}
throw err;
}
}
}Outbox + worker
// Dans la transaction métier
await this.prisma.$transaction(async (tx) => {
const user = await tx.user.create({ data: input });
await tx.outbox.create({
data: {
aggregateId: user.id,
eventType: 'user.created',
payload: { id: user.id, email: user.email },
occurredAt: new Date(),
},
});
});
// Worker (cron / interval)
@Injectable()
export class OutboxWorker {
constructor(private readonly prisma: PrismaService, private readonly bus: EventBus) {}
@Cron('*/5 * * * * *') // toutes les 5s
async drain() {
const events = await this.prisma.outbox.findMany({
where: { dispatchedAt: null },
take: 50,
orderBy: { occurredAt: 'asc' },
});
for (const ev of events) {
try {
await this.bus.publish(ev.eventType, ev.payload);
await this.prisma.outbox.update({ where: { id: ev.id }, data: { dispatchedAt: new Date() } });
} catch (e) {
// backoff handled via retry counter (column)
}
}
}
}Retry sur conflit SERIALIZABLE
async withRetry<T>(fn: () => Promise<T>, attempts = 3): Promise<T> {
let lastErr: any;
for (let i = 0; i < attempts; i++) {
try { return await fn(); }
catch (e: any) {
if (e.code === 'P2034' || e.code === '40001') {
lastErr = e;
await new Promise((r) => setTimeout(r, 50 * 2 ** i)); // backoff exponentiel
continue;
}
throw e;
}
}
throw lastErr;
}
// Usage
await this.withRetry(() =>
this.prisma.$transaction(async (tx) => { /* ... */ }, { isolationLevel: 'Serializable' }),
);🏋️ Exercices
Progression : implémenter → rendre production-grade → casser puis réparer. Fais-les avec un vrai Postgres (Testcontainers), pas un mock — l'isolation ne se mocke pas.
1. Transfert atomique (échauffement)
Objectif — Implémenter transfer(from, to, amount) qui débite, crédite et écrit 2 lignes de ledger, atomiquement, avec un test de rollback réel (le crédit throw → le débit doit être annulé). Indice — prisma.$transaction(async tx => …), puis un test qui mocke la 2ᵉ écriture en rejet et vérifie findMany().length === 0. Vérifie que tu utilises bien tx.* et pas prisma.* dans le callback.
2. Reproduire un write skew, puis le tuer
Objectif — Modéliser "il faut toujours ≥ 1 admin actif". Lancer 2 requêtes concurrentes qui désactivent chacune un admin différent ; observer qu'en READ COMMITTED tu te retrouves à 0 admin. Puis corriger. Indice/Solution — Deux fixes valides à comparer : (a) SERIALIZABLE + withRetry sur 40001 ; (b) une contrainte DB qui matérialise l'invariant (trigger ou EXCLUDE/partial unique index "au moins un actif"). Argumente lequel tu choisis en prod (la contrainte gagne : pas de retry, pas de hotspot).
3. Propagation ALS sans fuite de tx
Objectif — Faire participer UsersService.create() ET AuditService.log() à la même transaction sans qu'aucune signature ne reçoive tx. Le AuditService ne doit pas savoir qu'il est dans une txn. Indice — AsyncLocalStorage, db.client qui retourne als.getStore() ?? this, interceptor qui als.run(tx, () => lastValueFrom(next.handle())). Test : un audit log écrit pendant un rollback ne doit PAS persister.
4. Outbox transactionnel + worker idempotent (production-grade)
Objectif — Garantir qu'un event Kafka/SSE n'est publié que si la transaction métier a commit, exactly-effectively-once côté consommateur. Indice/Solution — outbox.create() dans la txn ; poller FOR UPDATE SKIP LOCKED (sinon 2 pollers publient en double) ; marquer published=true dans la même txn que le send. Côté consommateur : dédup par eventId (l'outbox est at-least-once, pas exactly-once réseau).
5. Casser le pool, le diagnostiquer, le réparer (chaos)
Objectif — Mettre un await fetch(slowApi) dans un $transaction, monter la charge avec autocannon, observer db_pool_waiting exploser et les timeouts. Puis sortir l'I/O de la txn et re-mesurer. Indice — La txn tient une connexion pendant tout le fetch. À pool=10 et fetch=2 s, tu plafonnes à 5 req/s. Fix : lire les données dans une txn courte, faire le fetch hors txn, persister dans une 2ᵉ txn. Prouve le gain avec le p99 de db_tx_duration_seconds.
6. Job LLM idempotent & cost-aware (intégration stack)
Objectif — Implémenter claimGeneration + worker de la section IA : un double-POST avec le même generationId ne doit lancer qu'UN appel Anthropic et ne débiter le quota qu'une fois. Un retry BullMQ après persistance ne doit PAS rappeler le LLM. Indice/Solution — Index unique sur generationId = idempotence ; relire status==='done' avant l'appel ; appel LLM hors txn avec AbortController câblé sur la déconnexion client ; outbox pour le push SSE final. Test : 50 POST concurrents → exactement 1 ligne generation, 1 débit quota, 1 appel SDK (espionne le client Anthropic mocké).
🎤 En entretien
Q — Postgres est en READ COMMITTED par défaut. Donne-moi un bug que ce niveau laisse passer et qui surprend les juniors. Le write skew : deux transactions lisent le même état (ex. "2 admins actifs"), chacune décide de retirer un admin différent, les deux commit → 0 admin, alors qu'aucune n'a écrit la ligne lue par l'autre. READ COMMITTED et même REPEATABLE READ ne le voient pas ; il faut SERIALIZABLE (+ retry sur 40001) ou matérialiser l'invariant en contrainte DB.
Q — Pourquoi ne jamais mettre un appel HTTP/LLM dans une transaction ? La txn tient une connexion du pool pendant toute l'I/O réseau. Par la loi de Little, le débit max = taille_pool / durée_txn ; un fetch de 2 s à pool=20 plafonne à 10 req/s et fait fuir le pool sous charge. On lit en txn courte, on fait l'I/O hors txn, on persiste en 2ᵉ txn — et pour la cohérence DB↔broker, on utilise l'outbox.
Q — Comment publier un event Kafka "exactly-once" avec une écriture DB ? On ne peut pas atomiquement écrire en DB et publier sur Kafka (pas de 2PC pratique). On écrit l'event dans une table outbox dans la même transaction que la donnée métier (donc atomique avec elle), puis un poller (FOR UPDATE SKIP LOCKED) le publie : c'est de l'at-least-once, le consommateur déduplique par eventId. "Exactly-once" réseau n'existe pas ; "effectively-once" via idempotence côté consommateur, oui.
Q — Un provider Scope.REQUEST qui ouvre une transaction : quel piège ? Le provider et ses dépendances sont instanciés à la requête, mais le repo capturé à l'instanciation n'est pas lié au EntityManager/tx ouvert après. Les writes passent hors transaction sans erreur visible. Solution : propager le client transactionnel via AsyncLocalStorage (un db.client qui lit als.getStore()), pas via l'instance injectée.
🔗 Liens
- Prisma transactions
- TypeORM transactions
- @nestjs-cls
- Outbox pattern
- Saga pattern
- Postgres isolation levels
- Voir
01-typeorm.md,02-prisma.md,03-mongoose.mdpour les APIs spécifiques.