Skip to content

TypeORM

TL;DR — TypeORM est un Data Mapper / Active Record hybride. Dans NestJS, on l'intègre via @nestjs/typeorm. Les pièges à connaître au niveau senior : la cassure 0.2 → 0.3 (ConnectionDataSource, plus de connection dans repository.manager), les N+1 silencieux sur les relations lazy, la confusion eager vs JOIN, les transactions via DataSource.transaction() ou QueryRunner (et pas @Transaction() deprecated), et la perte du contexte transactionnel quand on injecte un Repository qui ne partage pas le bon EntityManager.

🧠 Mental model

   ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
   │   DataSource    │◄──►│  EntityManager  │◄──►│  Repository<T>  │
   │ (pool + meta)   │    │ (txn boundary)  │    │ (CRUD typed)    │
   └─────────────────┘    └─────────────────┘    └─────────────────┘
            ▲                       ▲                       ▲
            │                       │                       │
       DataSource opts        manager.transaction()    @InjectRepository

AnalogieDataSource = la connexion + le schéma. EntityManager = un client SQL stateful (peut être dans une transaction). Repository<T> = un EntityManager spécialisé sur une entité. Un Repository qu'on injecte par défaut n'est pas transactional — il colle au manager global. Pour transaction propre : passer le EntityManager du transaction() callback.

Le modèle qui compte vraiment, c'est le pool. TypeORM n'a pas "une connexion" : il a un pool (poolSize). Chaque transaction() / QueryRunner emprunte une connexion au pool pour toute sa durée et la rend à la fin. Trois conséquences que tout staff garde en tête :

  • Une transaction longue = une connexion confisquée. Si tu fais un appel HTTP (LLM, paiement) à l'intérieur d'une transaction, tu tiens une connexion DB pendant des secondes pour rien. À poolSize: 20, 20 requêtes lentes simultanées et le 21e client attend (connection pool timeout). Règle : jamais d'I/O réseau dans une transaction.
  • Un QueryRunner créé manuellement et non release() fuit une connexion — le pool se vide silencieusement jusqu'au deadlock applicatif. Toujours try/finally { await qr.release() }.
  • Le bon modèle de capacité : poolSize ≈ (cœurs DB × 2..4), pas "le plus possible". Trop de connexions ⇒ contention de locks et context-switch côté Postgres. PgBouncer en transaction mode devant l'app si tu scales horizontalement (mais alors : pas de prepared statements persistants, pas de SET de session entre requêtes).

🛠️ Code minimal

ts
// Setup
import { TypeOrmModule } from '@nestjs/typeorm';

@Module({
  imports: [
    TypeOrmModule.forRootAsync({
      useFactory: (cfg: ConfigService) => ({
        type: 'postgres',
        url: cfg.get('DATABASE_URL'),
        entities: [User, Post, Comment],
        migrations: ['dist/migrations/*.js'],
        migrationsRun: false,        // jamais autorun en prod
        synchronize: false,          // JAMAIS true en prod
        logging: cfg.get('NODE_ENV') === 'dev' ? ['query', 'error', 'warn'] : ['error'],
        maxQueryExecutionTime: 1000, // toute query > 1s est loggée (canal 'warn')
        poolSize: 20,
      }),
      inject: [ConfigService],
    }),
    TypeOrmModule.forFeature([User, Post]),
  ],
})
export class AppModule {}
ts
// Entité avec relations
import { Entity, PrimaryGeneratedColumn, Column, OneToMany, ManyToOne, JoinColumn, Index, CreateDateColumn, UpdateDateColumn } from 'typeorm';

@Entity('users')
@Index('uq_users_email', ['email'], { unique: true })
export class User {
  @PrimaryGeneratedColumn('uuid') id!: string;
  @Column({ type: 'citext' }) email!: string;
  @Column() name!: string;
  @CreateDateColumn() createdAt!: Date;
  @UpdateDateColumn() updatedAt!: Date;

  @OneToMany(() => Post, (p) => p.author, { cascade: ['insert'] })
  posts!: Post[];
}

@Entity('posts')
@Index('ix_posts_author_id_created', ['authorId', 'createdAt'])
export class Post {
  @PrimaryGeneratedColumn('uuid') id!: string;
  @Column() title!: string;
  @Column({ type: 'text' }) body!: string;
  @Column('uuid') authorId!: string;

  @ManyToOne(() => User, (u) => u.posts, { onDelete: 'CASCADE' })
  @JoinColumn({ name: 'authorId' })
  author!: User;

  @CreateDateColumn() createdAt!: Date;
}
ts
// Repository injection
@Injectable()
export class UsersService {
  constructor(
    @InjectRepository(User) private readonly users: Repository<User>,
    private readonly dataSource: DataSource,
  ) {}

  findOne(id: string) {
    return this.users.findOne({ where: { id }, relations: { posts: true } });
  }

  // QueryBuilder — JOIN + projection
  topAuthors(limit = 10) {
    return this.users.createQueryBuilder('u')
      .leftJoin('u.posts', 'p')
      .addSelect('COUNT(p.id)', 'postCount')
      .groupBy('u.id')
      .orderBy('"postCount"', 'DESC')
      .limit(limit)
      .getRawAndEntities();
  }

  // Transaction — pattern recommandé 0.3
  async transferPosts(fromId: string, toId: string) {
    return this.dataSource.transaction(async (em) => {
      const userRepo = em.getRepository(User);
      const postRepo = em.getRepository(Post);
      // toutes les ops via em → même transaction
      const from = await userRepo.findOneOrFail({ where: { id: fromId } });
      await postRepo.update({ authorId: from.id }, { authorId: toId });
      // throw ici ⇒ rollback auto
    });
  }
}

🎯 Patterns courants

  1. Migrations onlysynchronize: false, écrire chaque changement de schéma comme migration (typeorm migration:generate). En CI, faillir si une migration non commitée existe.
  2. { relations: { posts: true } } style — préférer à l'array ['posts'] (typé). Ou utiliser loadRelationCountAndMap pour des counts sans JOIN.
  3. QueryBuilder pour les requêtes non triviales — agrégats, sous-requêtes, projections custom. Le repo simple suffit pour CRUD plat.
  4. Soft delete@DeleteDateColumn() + repo.softDelete(id) / softRemove(entity) au lieu de delete/remove. Les requêtes filtrent les lignes soft-deleted par défaut ; withDeleted: true (ou qb.withDeleted()) pour les inclure.
  5. upsert() — Postgres ON CONFLICT. repo.upsert(rows, ['email']). Plus rapide que findOne + save.
  6. Listeners / Subscribers@EventSubscriber() pour audit log, MAIS pas pour de la logique métier (hook hors du flux applicatif = surprises).
  7. Custom Repository — extension via dataSource.getRepository(User).extend({ findActiveByTenant(tid) { return this.find({ where: { tenantId: tid, deletedAt: IsNull() } }); } }) (jamais deletedAt: null — TypeORM exige IsNull()). Préférer à EntityRepository deprecated.
  8. Index hints + EXPLAIN — en QueryBuilder, qb.setQueryRunner(...) pour exécuter manuellement EXPLAIN ANALYZE et observer le plan. Sans ça, on optimise à l'aveugle.
  9. Pagination cursor (keyset) — pour les grandes tables, éviter .skip(offset) (scan O(offset)). Préférer WHERE id > :lastId ORDER BY id LIMIT :n.
  10. createQueryBuilder('u').setLock('pessimistic_write') — verrou ligne pour les sections critiques (incrément de stock).

🔄 Versions — Nest + TypeORM

VersionNotes
TypeORM 0.2.xConnection global, createConnection(), @Transaction() decorator (deprecated). getConnection(), getRepository() API statique.
TypeORM 0.3.xBreaking : DataSource remplace Connection. createConnectionnew DataSource(...).initialize(). APIs statiques (getRepository) supprimées. findOne(id)findOne({ where: { id } }) obligatoire (l'ancienne forme ne compile plus). relations accepte objet. Soft delete via softDelete/softRemove + withDeleted.
@nestjs/typeorm 8Compat 0.2.
@nestjs/typeorm 9+Compat 0.3. DataSource injectable directement.
@nestjs/typeorm 10Nest 10. Toujours TypeORM 0.3. Recommandé.
Nest 11Compat TypeORM 0.3.20+. Préférer Prisma ou Drizzle pour les nouveaux projets selon la communauté — TypeORM 0.3 a un rythme de release lent.

Migration 0.2 → 0.3findOne(id) ne compile plus : la signature 0.3 attend un FindOneOptions, donc un id brut lève une erreur de type (et à l'exécution un TypeORMError/"No overload matches"). Toujours migrer vers findOne({ where: { id } }) ou findOneBy({ id }).

⚠️ Pitfalls

  1. N+1 avec relations lazy@OneToMany({ lazy: true }) charge à l'accès (await user.posts), créant un SELECT par user. Préférer relations explicites + relations: { posts: true } ou QueryBuilder avec leftJoinAndSelect.
  2. eager: true partout — gain initial, mais charge la moitié du graphe à chaque findOne. Réserver aux cas vraiment 1-1 essentiels.
  3. Repository hors transaction — injecter Repository<User> puis l'utiliser dans un dataSource.transaction(em => ...) ne participe pas à la transaction. Toujours em.getRepository(User) dans le callback.
  4. synchronize: true en prod — drop de colonnes silencieux. Bannir.
  5. save() qui ne fait qu'INSERTrepo.save(entity) fait un INSERT ou un UPDATE selon si l'id est défini. Sur un payload partiel non hydraté, ça peut faire un INSERT qui viole une contrainte. Utiliser update() explicite si tu sais que l'entité existe.
  6. Cascade insert sans cascade: ['insert'] — sauvegarder un user.posts = [p1, p2] via save(user) ignore les posts sauf si cascade activé. Sémantique opaque, préférer save explicite des deux.
  7. maxQueryExecutionTime ignoré — log seulement, ne tue pas la query. Pour ça, statement_timeout côté Postgres ou query_timeout côté pool.
  8. Migrations TypeORM non-idempotentesmigration:generate peut produire des ALTER ambigus (rename = drop + add). Toujours review à la main, jamais blindly accepted.
  9. findOne(id) cassé en 0.3 — comme dit plus haut, la signature exige un FindOneOptions : un id brut ne compile pas (et lève à l'exécution). Tous les findOne(id) héritage sont à migrer en findOneBy({ id }).
  10. @JoinColumn() mal placé — sur une relation @ManyToOne, le @JoinColumn va sur le côté "many" (qui porte la FK). Inverser = colonne manquante en base.
  11. relations qui charge des subgraphes énormesrelations: { posts: { comments: { author: true } } } peut tirer 50 000 rows si tu n'as pas de pagination. Toujours take / where sur les nested.
  12. Repository.delete() sans conditionrepo.delete({}) vide la table. TypeORM ne protège pas (Prisma le fait via where obligatoire). Toujours typer le where.
  13. DataSource pas initialisé en script standalone — pour les seeders/migrations runtime, await dataSource.initialize() avant utilisation.

🧪 Testing

ts
// Unitaire — mock du Repository
const repoMock = {
  findOne: jest.fn(),
  save: jest.fn(),
  createQueryBuilder: jest.fn(() => ({ where: jest.fn().mockReturnThis(), getMany: jest.fn() })),
};

const mod = await Test.createTestingModule({
  providers: [UsersService, { provide: getRepositoryToken(User), useValue: repoMock }],
}).compile();

const svc = mod.get(UsersService);
repoMock.findOne.mockResolvedValue({ id: '1', email: '[email protected]' });
expect(await svc.findOne('1')).toEqual({ id: '1', email: '[email protected]' });
ts
// Intégration — testcontainers Postgres
import { PostgreSqlContainer } from '@testcontainers/postgresql';

let container: StartedPostgreSqlContainer;
beforeAll(async () => {
  container = await new PostgreSqlContainer('postgres:16-alpine').start();
});
afterAll(() => container.stop());

beforeEach(async () => {
  const app = await Test.createTestingModule({
    imports: [TypeOrmModule.forRoot({
      type: 'postgres', url: container.getConnectionUri(),
      entities: [User], synchronize: true,
    }), TypeOrmModule.forFeature([User])],
    providers: [UsersService],
  }).compile();
  // run tests against real Postgres
});

Règle : la couche service avec QueryBuilder se teste mal en mock — utiliser testcontainers ou pg-mem (limité). Les rules métier pures se testent en mock.

ts
// Reset entre tests
beforeEach(async () => {
  const dataSource = mod.get(DataSource);
  const tables = ['posts', 'users'];     // ordre FK respecté
  for (const t of tables) {
    await dataSource.query(`TRUNCATE TABLE "${t}" RESTART IDENTITY CASCADE`);
  }
});

Pour les tests de migrations, lancer dataSource.runMigrations() puis dataSource.undoLastMigration() et vérifier le schéma — essentiel pour s'assurer que down n'est pas cassé.

🎬 Cas d'usage concrets

LegalTech — RAG juridique avec pgvector

Qui — Cabinet d'avocats parisien (50 collaborateurs) qui développe un assistant interne capable de retrouver des passages pertinents dans 200 000 décisions de jurisprudence indexées. Problème — Les requêtes ANN (Approximate Nearest Neighbor) sur l'extension pgvector doivent cohabiter avec le modèle relationnel des dossiers clients. Prisma ne supporte pas nativement le type vector, et l'équipe veut garder un seul ORM. Comment — On modélise l'entité CaseLawChunk avec une colonne vector(1536) exposée via un transformer custom, et on requête en raw SQL typé.

ts
@Entity('case_law_chunk')
export class CaseLawChunk {
  @PrimaryGeneratedColumn('uuid') id: string;
  @Column('text') content: string;
  // pgvector n'a pas de type natif TypeORM. On déclare la colonne en raw
  // (`columnType: 'vector(1536)'`) et on convertit number[] ↔ string '[...]'
  // via un transformer. La COLONNE et l'INDEX HNSW sont créés en migration manuelle.
  @Column({
    type: 'varchar' as any,            // placeholder côté metadata
    transformer: {
      to: (v: number[]) => `[${v.join(',')}]`,
      from: (v: string) => v ? JSON.parse(v) : [],
    },
  })
  embedding: number[];
  @ManyToOne(() => CaseLawDoc, (d) => d.chunks) doc: CaseLawDoc;
}

@Injectable()
export class JurisprudenceSearch {
  constructor(@InjectDataSource() private ds: DataSource) {}

  async findSimilar(query: number[], topK = 5) {
    return this.ds.query(
      `SELECT id, content, 1 - (embedding <=> $1::vector) AS score
       FROM case_law_chunk
       ORDER BY embedding <=> $1::vector
       LIMIT $2`,
      [`[${query.join(',')}]`, topK],
    );
  }
}

Gains — Une seule base Postgres pour métier + IA, opérateur <=> indexé par HNSW, p95 < 80 ms sur 200 K chunks.

E-commerce — Catalogue marketplace multi-vendeurs

Qui — Marketplace française B2B vendant du matériel professionnel pour 4 000 marchands. Problème — Le catalogue mélange produits propres, produits affiliés et variantes (taille, coloris) avec règles de visibilité par marchand. Les jointures arrivent à 6-7 niveaux et les N+1 explosent en lazy. Comment — On utilise QueryBuilder avec leftJoinAndSelect explicite et setLock sur les opérations de réassortiment.

ts
async listForVendor(vendorId: string, page: number) {
  return this.ds.getRepository(Product)
    .createQueryBuilder('p')
    .leftJoinAndSelect('p.variants', 'v')
    .leftJoinAndSelect('v.stockLevels', 's', 's.vendorId = :vendorId', { vendorId })
    .leftJoinAndSelect('p.category', 'c')
    .where('p.active = true')
    .andWhere('s.quantity > 0')
    .orderBy('p.rank', 'DESC')
    .take(50).skip(page * 50)
    .getMany();
}

Gains — Suppression des N+1, p95 de 1.2 s à 180 ms, code reste typé bout en bout.

Banque — Cœur transactionnel multidevise

Qui — Néobanque européenne (350 K comptes) avec un livre de comptes en double-entrée. Problème — Chaque mouvement doit créer deux lignes opposées dans la même transaction, avec verrouillage SELECT FOR UPDATE pour éviter les soldes négatifs en concurrence. Comment — Transaction explicite via DataSource.transaction() + QueryBuilder avec setLock('pessimistic_write').

ts
async transfer(fromId: string, toId: string, amount: bigint, currency: string) {
  return this.ds.transaction('SERIALIZABLE', async (em) => {
    const from = await em.createQueryBuilder(Account, 'a')
      .setLock('pessimistic_write')
      .where('a.id = :id', { id: fromId }).getOneOrFail();
    if (from.balance < amount) throw new InsufficientFundsError();
    await em.decrement(Account, { id: fromId }, 'balance', amount.toString());
    await em.increment(Account, { id: toId }, 'balance', amount.toString());
    await em.save([
      em.create(LedgerEntry, { accountId: fromId, amount: -amount, currency }),
      em.create(LedgerEntry, { accountId: toId, amount, currency }),
    ]);
  });
}

Gains — Garantie ACID, 0 anomalie sur 18 mois en production, audit trail complet.

🛠️ Exemple end-to-end

Contexte — La marketplace B2B veut lancer un mode "panier vendeur" : un acheteur compose un panier multi-marchands, et la validation crée N commandes filles (une par vendeur) tout en réservant le stock dans la même transaction. Si un seul vendeur est en rupture, on annule tout et on remonte les SKUs problématiques.

ts
// src/order/order.entities.ts
@Entity()
export class Cart {
  @PrimaryGeneratedColumn('uuid') id: string;
  @Column() buyerId: string;
  @OneToMany(() => CartItem, (i) => i.cart, { cascade: true }) items: CartItem[];
  @Column({ default: 'open' }) status: 'open' | 'checked_out' | 'cancelled';
}

@Entity()
export class CartItem {
  @PrimaryGeneratedColumn('uuid') id: string;
  @ManyToOne(() => Cart, (c) => c.items) cart: Cart;
  @Column() sku: string;
  @Column() vendorId: string;
  @Column('int') quantity: number;
  @Column('numeric', { precision: 12, scale: 2 }) unitPrice: string;
}

@Entity()
export class StockLevel {
  @PrimaryColumn() sku: string;
  @PrimaryColumn() vendorId: string;
  @Column('int') available: number;
  @VersionColumn() version: number;
}

@Entity()
export class Order {
  @PrimaryGeneratedColumn('uuid') id: string;
  @Column() buyerId: string;
  @Column() vendorId: string;
  @Column('numeric', { precision: 12, scale: 2 }) total: string;
  @OneToMany(() => OrderLine, (l) => l.order, { cascade: true }) lines: OrderLine[];
  @CreateDateColumn() createdAt: Date;
}

@Entity()
export class OrderLine {
  @PrimaryGeneratedColumn('uuid') id: string;
  @ManyToOne(() => Order, (o) => o.lines) order: Order;
  @Column() sku: string;
  @Column('int') quantity: number;
  @Column('numeric', { precision: 12, scale: 2 }) unitPrice: string;
}
ts
// src/order/checkout.service.ts
@Injectable()
export class CheckoutService {
  constructor(@InjectDataSource() private ds: DataSource) {}

  async checkout(cartId: string): Promise<Order[]> {
    return this.ds.transaction('READ COMMITTED', async (em) => {
      const cart = await em.findOneOrFail(Cart, {
        where: { id: cartId, status: 'open' },
        relations: { items: true },
      });

      // 1. Group by vendor
      const byVendor = new Map<string, CartItem[]>();
      for (const item of cart.items) {
        const arr = byVendor.get(item.vendorId) ?? [];
        arr.push(item);
        byVendor.set(item.vendorId, arr);
      }

      // 2. Lock and verify stock for every SKU
      const shortages: { sku: string; needed: number; available: number }[] = [];
      for (const item of cart.items) {
        const stock = await em.createQueryBuilder(StockLevel, 's')
          .setLock('pessimistic_write')
          .where('s.sku = :sku AND s.vendorId = :vendor',
                 { sku: item.sku, vendor: item.vendorId })
          .getOneOrFail();
        if (stock.available < item.quantity) {
          shortages.push({ sku: item.sku, needed: item.quantity, available: stock.available });
        }
      }
      if (shortages.length) throw new StockShortageError(shortages);

      // 3. Decrement stock and create child orders
      const orders: Order[] = [];
      for (const [vendorId, items] of byVendor) {
        for (const item of items) {
          await em.decrement(StockLevel,
            { sku: item.sku, vendorId },
            'available', item.quantity);
        }
        const order = em.create(Order, {
          buyerId: cart.buyerId,
          vendorId,
          total: items.reduce(
            (s, i) => s + Number(i.unitPrice) * i.quantity, 0,
          ).toFixed(2),
          lines: items.map((i) => em.create(OrderLine, {
            sku: i.sku, quantity: i.quantity, unitPrice: i.unitPrice,
          })),
        });
        orders.push(await em.save(order));
      }

      cart.status = 'checked_out';
      await em.save(cart);
      return orders;
    });
  }
}
ts
// src/order/order.controller.ts
@Controller('checkout')
export class CheckoutController {
  constructor(private svc: CheckoutService) {}

  @Post(':cartId')
  async checkout(@Param('cartId') cartId: string) {
    try {
      const orders = await this.svc.checkout(cartId);
      return { ok: true, orderIds: orders.map((o) => o.id) };
    } catch (e) {
      if (e instanceof StockShortageError) {
        throw new ConflictException({ code: 'STOCK', shortages: e.details });
      }
      throw e;
    }
  }
}

Le pessimistic_write sur StockLevel sérialise les checkouts concurrents par SKU, la transaction garantit qu'aucune commande n'est créée si un seul item manque, et cascade: true propage la sauvegarde des OrderLine sans aller-retour manuel.

🧬 Transaction propagation — le piège senior

Le défaut #1 en code TypeORM "presque correct" : un Repository injecté n'a aucune idée qu'une transaction est ouverte. La transaction vit dans un EntityManager (celui passé au callback de dataSource.transaction()), et un @InjectRepository(User) est lié au manager global du DataSource. Conséquence : du code qui semble transactionnel ne l'est pas, et un throw ne rollback qu'une partie des écritures.

ts
// ❌ FAUX — usersService.users est hors transaction
async badTransfer(fromId: string, toId: string) {
  return this.ds.transaction(async (em) => {
    await em.update(Account, { id: fromId }, { ... });   // dans la txn
    await this.usersService.touchAudit(fromId);           // HORS txn (manager global)
  });                                                      // throw ici ⇒ l'audit reste committé
}

Trois stratégies, du plus simple au plus puissant :

StratégieMécanismeQuandCoût
Passer l'EntityManagerchaque méthode de service accepte un em?: EntityManager optionnel, défaut = repo globalpetites bases, contrôle explicitepollue les signatures, virale
@Transactional() (typeorm-transactional)AsyncLocalStorage : un cls-hooked store propage le manager courant à tous les reposgros services, propagation implicite type Springdépendance + magie, debug plus dur
Unit of Work expliciteun objet UnitOfWork créé par requête, injecté en REQUEST scope, expose les repos transactionnelsDDD, agrégatsscope REQUEST = coût DI par requête

Pattern AsyncLocalStorage (recommandé pour propagation implicite, fonctionne nativement avec @Injectable sans REQUEST scope) :

ts
// als-transaction.ts
import { AsyncLocalStorage } from 'node:async_hooks';
import { DataSource, EntityManager } from 'typeorm';

export const txStore = new AsyncLocalStorage<EntityManager>();

// Helper que TOUT service appelle au lieu de @InjectRepository
export function repo<T extends object>(ds: DataSource, entity: new () => T) {
  const em = txStore.getStore() ?? ds.manager;   // manager courant OU global
  return em.getRepository(entity);
}

export async function runInTransaction<T>(
  ds: DataSource,
  fn: () => Promise<T>,
  isolation: 'READ COMMITTED' | 'SERIALIZABLE' = 'READ COMMITTED',
): Promise<T> {
  return ds.transaction(isolation, (em) => txStore.run(em, fn));
}
ts
// Service — aucune signature polluée, propagation automatique
@Injectable()
export class AuditService {
  constructor(private readonly ds: DataSource) {}
  touch(userId: string) {
    return repo(this.ds, AuditEntry).insert({ userId, at: new Date() });
  }
}

@Injectable()
export class TransferService {
  constructor(private readonly ds: DataSource, private readonly audit: AuditService) {}
  transfer(fromId: string, toId: string) {
    return runInTransaction(this.ds, async () => {
      await repo(this.ds, Account).decrement({ id: fromId }, 'balance', 100);
      await this.audit.touch(fromId);   // ⇒ MÊME transaction, rollback inclus
    });
  }
}

Comment un staff raisonne : la propagation transactionnelle est un problème de contexte ambiant, pas de paramètre. Soit tu rends le contexte explicite (signatures em), soit tu l'attaches au flow async (ALS). Le piège mortel d'ALS : un setInterval, un EventEmitter non-awaité, ou un worker pool peut perdre le store → l'écriture fuit hors transaction sans erreur. Règle : ne jamais détacher un await du flow à l'intérieur d'un runInTransaction.

🎚️ Niveaux d'isolation — choisir, pas copier-coller

Le code ci-dessus passe 'SERIALIZABLE' et 'READ COMMITTED' comme si c'était cosmétique. Ça ne l'est pas : le niveau d'isolation décide quelles anomalies tu acceptes et combien de retries tu vas devoir gérer. Postgres a un défaut READ COMMITTED, et c'est rarement suffisant pour de la logique financière.

NiveauDirty readNon-repeatable readPhantomWrite skewCoût
READ COMMITTED (défaut PG)possiblepossiblepossiblele moins cher, aucun retry
REPEATABLE READ (= snapshot PG)❌ (en PG)possiblesnapshot figé au 1er statement ; peut lever 40001
SERIALIZABLE (SSI)détecte les cycles de sérialisation ; lève 40001 plus souvent

Le piège du write-skew — deux transactions lisent un invariant (« il reste ≥ 1 médecin de garde »), chacune décide qu'elle peut se retirer, les deux commitent : invariant violé alors qu'aucune n'a touché la même ligne. READ COMMITTED et REPEATABLE READ ne l'empêchent pas. Seuls SERIALIZABLE ou un verrou explicite (SELECT ... FOR UPDATE sur la ligne d'invariant) le couvrent. Le cas bancaire de la section précédente utilise SERIALIZABLE et pessimistic_write — ceinture + bretelles, justifié sur un cœur de paiement.

Conséquence opérationnelle : tu DOIS gérer le retry sur 40001. Sous SERIALIZABLE/REPEATABLE READ, Postgres abort une transaction avec serialization_failure (40001) ou deadlock_detected (40P01). Ce n'est pas un bug, c'est le contrat : retente.

ts
import { DataSource, EntityManager, IsolationLevel } from 'typeorm';

async function withRetry<T>(
  ds: DataSource,
  fn: (em: EntityManager) => Promise<T>,
  { isolation = 'SERIALIZABLE', max = 3 } = {},
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await ds.transaction(isolation as IsolationLevel, fn);
    } catch (e: any) {
      // 40001 = serialization_failure, 40P01 = deadlock_detected
      const retryable = e?.code === '40001' || e?.code === '40P01';
      if (!retryable || attempt >= max) throw e;
      await new Promise((r) => setTimeout(r, 20 * 2 ** attempt + Math.random() * 20)); // backoff + jitter
    }
  }
}

Comment un staff raisonne : commence en READ COMMITTED (le moins cher). Monte d'un cran uniquement quand tu identifies une anomalie concrète : non-repeatable read sur un calcul multi-statement ⇒ REPEATABLE READ ; write-skew sur un invariant lu-puis-décidé ⇒ SERIALIZABLE (avec retry) ou verrou ciblé. Ne mets jamais SERIALIZABLE "par sécurité" sur tout le trafic : tu transformes des lectures bénignes en retry-storms.

📈 Production : observabilité & garde-fous

Un ORM masque le SQL — donc en prod tu pilotes à l'aveugle si tu n'instrumentes pas. Les leviers, du plus rentable au plus fin :

  • maxQueryExecutionTime logge (canal warn) toute query au-delà du seuil. C'est un log, pas un kill (cf. Pitfall 7). Le vrai garde-fou est côté DB : statement_timeout (par rôle/session) tue la query, et idle_in_transaction_session_timeout tue une transaction laissée ouverte par un client crashé — sans lui, un pod zombie tient des locks indéfiniment.
  • Tracing — wrapper le DataSource dans un span OpenTelemetry par requête (instrumentation pg auto via @opentelemetry/instrumentation-pg). Tu veux corréler requête HTTP → spans SQL pour voir le N+1 dans Jaeger/Tempo, pas dans les logs.
  • Métriques pool — exposer pool.totalCount / idleCount / waitingCount (driver pg) en gauges Prometheus. waitingCount > 0 durablement = pool saturé : soit fuite de connexion, soit transactions trop longues. C'est le signal avancé d'une panne, bien avant les timeouts.
  • EXPLAIN (ANALYZE, BUFFERS) sur les requêtes chaudes en pré-prod. Un Seq Scan sur une grosse table ou un Sort qui devrait être un Index Scan = index manquant. Vérifie aussi les rows estimated vs actual : un écart x100 = stats périmées (ANALYZE).
  • Slow-query budget en CI — un test d'intégration qui compte les requêtes émises (logging + spy) et échoue si une route dépasse N requêtes attrape les régressions N+1 avant la prod.

🤖 Servir de l'IA depuis TypeORM/NestJS

Le cas LegalTech ci-dessus pose pgvector. Voici la version production : indexation asynchrone via BullMQ, recherche typée, et exposition d'un endpoint RAG. Anthropic — modèles phares claude-opus-4-8 (raisonnement), claude-sonnet-4-6 (équilibre), claude-haiku-4-5 (rapide/embedding-adjacent). Le client LLM est injecté via DI (forRootAsync), jamais new Anthropic() dans un champ.

Client LLM injectable (jamais new en dur)

ts
// llm.module.ts
import Anthropic from '@anthropic-ai/sdk';

export const ANTHROPIC = Symbol('ANTHROPIC');

@Module({})
export class LlmModule {
  static forRootAsync(): DynamicModule {
    return {
      module: LlmModule,
      global: true,
      providers: [{
        provide: ANTHROPIC,
        inject: [ConfigService],
        useFactory: (cfg: ConfigService) => new Anthropic({
          apiKey: cfg.getOrThrow('ANTHROPIC_API_KEY'),
          maxRetries: 4,             // backoff exponentiel intégré au SDK
          timeout: 60_000,
        }),
      }],
      exports: [ANTHROPIC],
    };
  }
}

Indexation pgvector via BullMQ (idempotent, cost-aware)

L'embedding est coûteux : on ne ré-embed jamais un chunk inchangé. L'idempotence est clé sur le hash du contenu, pas sur l'id du job.

ts
@Entity('case_law_chunk')
@Index('ix_chunk_content_hash', ['contentHash'], { unique: true })
export class CaseLawChunk {
  @PrimaryGeneratedColumn('uuid') id: string;
  @Column('text') content: string;
  @Column({ length: 64 }) contentHash: string;          // sha256(content)
  // pgvector : pas de type natif TypeORM → déclaré en raw, migration manuelle
  @Column({ type: 'vector' as any, length: 1536, nullable: true })
  embedding: number[] | null;
  @Column({ default: 'pending' }) embedStatus: 'pending' | 'done' | 'failed';
  @VersionColumn() version: number;
}
ts
@Processor('embeddings')
export class EmbeddingProcessor extends WorkerHost {
  constructor(
    @InjectDataSource() private readonly ds: DataSource,
    @Inject(ANTHROPIC) private readonly llm: Anthropic,   // ou client embeddings dédié
  ) { super(); }

  async process(job: Job<{ chunkId: string }>) {
    const chunkRepo = this.ds.getRepository(CaseLawChunk);
    const chunk = await chunkRepo.findOneBy({ id: job.data.chunkId });
    if (!chunk || chunk.embedStatus === 'done') return;   // idempotence : skip si déjà fait

    const vec = await embed(chunk.content);               // appel modèle embeddings

    // update conditionnel sur version ⇒ évite la double-écriture concurrente
    const res = await chunkRepo.update(
      { id: chunk.id, version: chunk.version },
      { embedding: vec, embedStatus: 'done' },
    );
    if (res.affected === 0) throw new Error('stale chunk, retry');  // optimistic lock perdu
  }
}

Côté BullModule.registerQueue, configurer attempts: 5, backoff: { type: 'exponential', delay: 2000 }, et un removeOnComplete borné. Le coût se garde au edge : rate-limit par tenant + budget mensuel décrémenté dans une table ai_usage (mise à jour dans la même transaction que l'écriture du résultat — sinon double comptage en cas de retry).

Recherche RAG + endpoint, streaming SSE de la réponse

ts
@Injectable()
export class RagService {
  constructor(
    @InjectDataSource() private readonly ds: DataSource,
    @Inject(ANTHROPIC) private readonly llm: Anthropic,
  ) {}

  // Retrieval typé — paramétré, jamais d'interpolation de string
  private retrieve(queryVec: number[], topK = 5) {
    return this.ds.query<{ id: string; content: string; score: number }[]>(
      `SELECT id, content, 1 - (embedding <=> $1::vector) AS score
         FROM case_law_chunk
        WHERE embed_status = 'done'
        ORDER BY embedding <=> $1::vector
        LIMIT $2`,
      [`[${queryVec.join(',')}]`, topK],
    );
  }

  // Stream des tokens du modèle, annulable côté serveur
  async *answer(question: string, signal: AbortSignal) {
    const qVec = await embed(question);
    const ctx = await this.retrieve(qVec);
    const stream = this.llm.messages.stream({
      model: 'claude-sonnet-4-6',
      max_tokens: 1024,
      system: 'Réponds uniquement à partir du contexte fourni. Cite les ids.',
      messages: [{
        role: 'user',
        content: `Contexte:\n${ctx.map((c) => `[${c.id}] ${c.content}`).join('\n')}\n\nQuestion: ${question}`,
      }],
    }, { signal });                                        // ⇐ AbortSignal propagé au SDK
    for await (const ev of stream) {
      if (ev.type === 'content_block_delta' && ev.delta.type === 'text_delta') {
        yield ev.delta.text;
      }
    }
  }
}
ts
@Controller('rag')
export class RagController {
  constructor(private readonly rag: RagService) {}

  @Sse('ask')
  ask(@Query('q') q: string, @Req() req: Request): Observable<MessageEvent> {
    const ac = new AbortController();
    req.on('close', () => ac.abort());                     // déconnexion client ⇒ stop modèle + libère la conn DB
    return new Observable((sub) => {
      (async () => {
        try {
          for await (const tok of this.rag.answer(q, ac.signal)) {
            sub.next({ data: tok } as MessageEvent);
          }
          sub.complete();
        } catch (e) { sub.error(e); }
      })();
    });
  }
}

Pourquoi ça compte au niveau data-layer : (1) le retrieval est du SQL paramétré — $1::vector, jamais de concat (sinon injection + plan non caché) ; (2) l'AbortController relie déconnexion client → arrêt LLM → libération de la connexion du pool (un stream pendu = une conn fuitée du poolSize: 20) ; (3) l'index HNSW sur embedding doit être créé en migration manuelle (CREATE INDEX ... USING hnsw (embedding vector_cosine_ops)), TypeORM ne le génère pas ; (4) la comptabilité de coût partage la transaction d'écriture pour être idempotente face aux retries BullMQ.

🔁 Quand utiliser / éviter

Utiliser TypeORM :

  • Projet existant déjà en TypeORM.
  • Besoin d'Active Record + Data Mapper hybride.
  • Multi-DB (Postgres, MySQL, SQLite, MSSQL, Oracle) avec une API unique.

Éviter TypeORM :

  • Nouveau projet : Prisma offre une meilleure UX, meilleur typage, meilleures migrations. Drizzle pour du SQL-first typé.
  • Besoin de requêtes très spécifiques Postgres (CTE complexes, window functions) — TypeORM QueryBuilder rame, préférer raw SQL ou Drizzle/Kysely.
  • Performance critique sur les agrégats — raw query ou Knex/Kysely.

🧰 Exemples avancés

Migrations CLI

ts
// data-source.ts — utilisé par la CLI typeorm
import 'reflect-metadata';
import { DataSource } from 'typeorm';
import { User, Post } from './entities';

export const AppDataSource = new DataSource({
  type: 'postgres',
  url: process.env.DATABASE_URL,
  entities: [User, Post],
  migrations: ['src/migrations/*.ts'],
});
bash
# Génère depuis le diff entities ↔ DB
pnpm typeorm migration:generate src/migrations/AddUserActiveColumn -d src/data-source.ts

# Crée un fichier vide à compléter à la main
pnpm typeorm migration:create src/migrations/SeedRoles

# Run / revert
pnpm typeorm migration:run -d src/data-source.ts
pnpm typeorm migration:revert -d src/data-source.ts

QueryBuilder — recherche full-text + pagination keyset

ts
async search(q: string, lastId?: string, limit = 20) {
  const qb = this.users
    .createQueryBuilder('u')
    .where('to_tsvector(u.name || \' \' || u.email) @@ plainto_tsquery(:q)', { q })
    .orderBy('u.id', 'ASC')
    .limit(limit + 1);

  if (lastId) qb.andWhere('u.id > :lastId', { lastId });

  const rows = await qb.getMany();
  const hasNext = rows.length > limit;
  return { data: rows.slice(0, limit), nextCursor: hasNext ? rows[limit - 1].id : null };
}

Custom Repository extension (0.3 style)

ts
import { DataSource, Repository } from 'typeorm';

@Injectable()
export class UsersRepositoryFactory {
  static create(dataSource: DataSource) {
    return dataSource.getRepository(User).extend({
      findActiveByTenant(this: Repository<User>, tenantId: string) {
        return this.find({ where: { tenantId, deletedAt: IsNull() } });
      },
      async incrementLoginCount(this: Repository<User>, id: string) {
        await this.increment({ id }, 'loginCount', 1);
      },
    });
  }
}

🏋️ Exercices

Postgres réel obligatoire (testcontainers) — ces exercices ne se simulent pas en mock.

1. Le N+1 invisible — détecter puis tuer (implement)

Objectif — Prouver, mesurer et éliminer un N+1 sur une relation lazy. Crée User 1—N Post, seed 100 users × 10 posts. Écris une route qui sérialise users[].posts[].title. Active logging: ['query'], compte les requêtes émises. Réécris en 1 requête (leftJoinAndSelect) puis en 2 requêtes (relations ⇒ TypeORM split en IN (...)), et explique pourquoi leftJoinAndSelect peut être pire (explosion cartésienne users × posts rows hydratées). IndiceleftJoinAndSelect = 1 requête mais take casse (limit sur le produit joint, pas sur les users) ; TypeORM bascule alors en sous-requête distinct. Compte les rows transférées, pas que les requêtes.

2. Keyset pagination résistante (production-grade)

Objectif — Pagination stable sous insertion concurrente, tri sur clé non unique. Pagine Post par (createdAt DESC, id DESC). Implémente le cursor composite encodé en base64 ({createdAt, id}), le WHERE (createdAt, id) < (:c, :i) en row-value comparison, et un index (createdAt DESC, id DESC). Vérifie via EXPLAIN ANALYZE que c'est un Index Scan, pas un Sort. Compare le coût vs OFFSET 100000. Indice — Postgres supporte la comparaison de tuples : WHERE (p.created_at, p.id) < (:ca, :id). En QueryBuilder c'est du raw .andWhere('(p.created_at, p.id) < (:ca, :id)', ...). Sans index composite descendant, le planner refait un Sort.

3. Optimistic vs pessimistic locking — décrément de stock (break it then fix it)

Objectif — Reproduire une race de survente, puis la corriger deux fois. StockLevel { available, @VersionColumn version }. Lance 50 checkouts concurrents (Promise.all) sur le même SKU à available: 10. Version naïve findOne + save ⇒ observe available négatif (lost update). Corrige (a) en optimistic (update WHERE version = :v, retry sur affected === 0) puis (b) en pessimistic (setLock('pessimistic_write')). Mesure le débit des deux sous contention. Indice — Optimistic gagne à faible contention (pas de verrou), s'effondre à forte (retry storm). Pessimistic sérialise dès le départ. Mesure p95 à 5 puis 500 threads concurrents pour voir le croisement.

4. Transaction qui fuit via AsyncLocalStorage (break it then fix it)

Objectif — Reproduire une écriture qui échappe au rollback, puis la rattacher. Avec le helper runInTransaction/repo de la section propagation, écris un transfer qui appelle audit.touch() depuis un setImmediate(() => audit.touch()). Force un throw après. Constate : le transfert rollback, l'audit reste. Explique pourquoi ALS perd le store, puis corrige en awaitant dans le flow. IndicesetImmediate/process.nextTick détaché du await sort du contexte ALS ⇒ txStore.getStore() renvoie undefined ⇒ repo global ⇒ commit indépendant. Tout ce qui doit être transactionnel doit rester dans la chaîne await synchrone du callback.

5. Migration réversible auditée (production-grade)

Objectif — Garantir qu'un down restaure exactement l'état initial. Génère une migration qui renomme une colonne (namefull_name) + ajoute un index. migration:generate produira probablement DROP COLUMN name; ADD COLUMN full_name (= perte de données). Réécris le up en ALTER ... RENAME, écris un down cohérent, et un test : snapshot du schéma (information_schema) avant run, après run, après revert ⇒ assert before === afterRevert. Indicemigration:generate ne voit qu'un diff de schéma : rename = drop+add pour lui. Toujours review à la main. Le test de réversibilité est runMigrations()undoLastMigration() → compare pg_dump --schema-only ou une requête information_schema.columns.

6. RAG idempotent sous retry BullMQ (make it production-grade)

Objectif — Garantir qu'un job d'embedding rejoué ne double ni le coût ni l'écriture. Reprends EmbeddingProcessor. Force un crash après le update mais avant le job complete (BullMQ rejouera). Vérifie : pas de second appel modèle (skip sur embedStatus === 'done'), ai_usage incrémenté une seule fois. Puis casse-le : déplace l'incrément ai_usage hors de la transaction du résultat ⇒ observe le double comptage au retry, et recolle-le. Indice — L'idempotence vit dans le state lu avant l'effet de bord (embedStatus) + un check-and-set conditionné par version. Le coût doit être écrit dans la même transaction que embedStatus = 'done', sinon retry = crédit débité deux fois.

7. Épuisement du pool — provoquer le connection pool timeout (break it then fix it)

Objectif — Comprendre dans la peau que poolSize est une ressource confisquée par les transactions longues. Configure poolSize: 5. Écris une route qui ouvre ds.transaction(async (em) => { await em.find(...); await fakeHttp(2000); })fakeHttp est un setTimeout de 2 s (simulant un appel LLM). Tire 20 requêtes concurrentes (Promise.all) : observe que la 6e et les suivantes bloquent puis lèvent connection pool timeout (connectionTimeoutMillis). Instrumente pool.waitingCount. Corrige : sors le fakeHttp hors de la transaction (lire/fermer la txn, appeler le réseau, rouvrir une txn courte pour écrire), et observe waitingCount retomber à 0. Indice — La transaction tient la connexion pendant toute la durée du await fakeHttp. Le pattern correct est read → commit → call → read-modify-write court, en gérant l'éventuelle staleness entre les deux (optimistic lock sur @VersionColumn). C'est exactement le piège « I/O réseau dans une transaction » : à l'échelle, il transforme un pool sain en file d'attente.

8. QueryRunner qui fuit (break it then fix it)

Objectif — Reproduire une fuite de connexion silencieuse. Écris un service qui crée const qr = ds.createQueryRunner(); await qr.connect(); await qr.startTransaction(); puis lance une query qui throw (ex: violation de contrainte) sans finally. Boucle l'appel 5 fois sur poolSize: 5 : la 6e requête ne trouve plus de connexion. Constate via pool.totalCount/idleCount que les connexions sont parties et jamais rendues. Corrige avec le pattern canonique try { ... await qr.commitTransaction(); } catch { await qr.rollbackTransaction(); throw; } finally { await qr.release(); }. IndicecreateQueryRunner() emprunte une connexion dédiée hors du flux managé ; sans release() dans un finally, chaque erreur en confisque une définitivement. C'est pour ça qu'on préfère ds.transaction(cb) (release automatique) sauf besoin explicite de contrôle fin (savepoints, requêtes hors entité).

🎤 En entretien

Q: Pourquoi un Repository injecté via @InjectRepository ne participe-t-il pas à un dataSource.transaction() ? Parce qu'il est lié au EntityManager global du DataSource, alors que la transaction crée un nouveau EntityManager isolé passé au callback. Il faut soit utiliser em.getRepository() dans le callback, soit propager le manager via AsyncLocalStorage (typeorm-transactional).

Q: save() vs update() vs upsert() — quand et pourquoi ?save() fait un SELECT puis INSERT ou UPDATE selon la présence de l'id (2 requêtes, déclenche les subscribers, hydrate l'entité) — pratique mais coûteux et ambigu sur payload partiel. update() est un UPDATE ... WHERE direct (1 requête, pas de subscribers, pas de hydration). upsert() mappe vers INSERT ... ON CONFLICT — atomique, idéal pour de l'ingestion idempotente.

Q: Optimistic vs pessimistic locking — comment choisis-tu ? Optimistic (@VersionColumn) ne pose aucun verrou et détecte le conflit à l'écriture (affected === 0 ⇒ retry) : excellent à faible contention, retry-storm à forte. Pessimistic (SELECT FOR UPDATE) sérialise dès la lecture : sûr sous contention, mais réduit le débit et risque le deadlock si l'ordre de verrouillage n'est pas constant. Règle : optimistic par défaut, pessimistic pour les hot rows (stock, solde).

Q: Pourquoi synchronize: true est-il banni en prod, et que mets-tu à la place ? Il diffe les entités contre le schéma et applique les changements sans garde-fou — un rename d'entité = DROP COLUMN silencieux = perte de données. En prod : synchronize: false + migrations versionnées et reviewées à la main, jouées explicitement (migrationsRun contrôlé par la CD, jamais au boot d'un pod qui scale).

Q: Comment streames-tu une réponse LLM depuis NestJS sans fuiter de connexion DB ? Endpoint @Sse, génération via llm.messages.stream({...}, { signal }). On lie req.on('close') à un AbortController.abort() : la déconnexion client annule le stream modèle ET libère toute connexion du pool tenue par le retrieval. Sans ça, un client qui ferme l'onglet laisse un stream pendu et une conn hors du poolSize.

Q: Différence entre REPEATABLE READ et SERIALIZABLE en Postgres, et quand monter de l'un à l'autre ?REPEATABLE READ donne un snapshot figé (pas de dirty/non-repeatable read, pas de phantom en PG) mais laisse passer le write-skew : deux transactions lisent un invariant, décident chacune de le violer sur des lignes différentes, et commitent. SERIALIZABLE ajoute le Serializable Snapshot Isolation — détection des cycles de dépendance — et abort une transaction en 40001 quand l'ordre n'est pas sérialisable. On monte à SERIALIZABLE quand on a un invariant lu-puis-décidé qu'un verrou ciblé ne couvre pas proprement, et on accepte alors d'implémenter le retry avec backoff.

Q: À poolSize: 20, ton API tombe en connection pool timeout sous charge sans pic de trafic DB. Première hypothèse ? Une connexion confisquée trop longtemps : très probablement un appel réseau (LLM, HTTP, paiement) ou un sleep dans une transaction(), ou un QueryRunner non release() en finally. On le confirme avec waitingCount > 0 côté pool et des transactions longues en pg_stat_activity (state = 'idle in transaction'). Fix : sortir l'I/O de la transaction, garantir le release, poser idle_in_transaction_session_timeout, et ne dimensionner poolSize que sur la capacité réelle de la DB.

🔗 Liens

Bibliothèque tech perso — Achref