Skip to content

Mongoose

TL;DR — Mongoose est le wrapper ODM le plus utilisé pour MongoDB en Node. @nestjs/mongoose propose MongooseModule.forRoot + forFeature avec des schémas TypeScript via décorateurs. Spécificités senior : refs + populate() (analogue mais pas équivalent aux JOINs SQL), hooks (pre('save'), pre('findOneAndUpdate')) — pièges de version v6/v7/v8, indexes via décorateurs et leur synchronisation, transactions via sessions multi-document (replica set requis), et la confusion entre _id ObjectId vs string.

🧠 Mental model

Mongoose connection ──► Model<T> (cache + hooks)


     Schema<T>  ──── refs ────► Model<U>  (populate)

            └── hooks: pre / post  (save, find*, update*, deleteOne)
            └── indexes — synchronisés via createIndexes() au boot
            └── virtuals — getters non persistants

Analogie — Mongoose c'est un ORM-like pour Mongo : tu décris un Schema, tu obtiens un Model. Les ref ne sont pas des FKs (pas d'intégrité côté DB) mais une convention pour populate() qui fait un second query. Donc populate = N+1 contrôlé : 1 query parent + 1 query enfants groupés ($in).

🛠️ Code minimal

ts
// Setup
import { MongooseModule } from '@nestjs/mongoose';

@Module({
  imports: [
    MongooseModule.forRootAsync({
      useFactory: (cfg: ConfigService) => ({
        uri: cfg.get('MONGO_URI'),
        retryAttempts: 3,
        serverSelectionTimeoutMS: 5000,
      }),
      inject: [ConfigService],
    }),
    MongooseModule.forFeature([
      { name: User.name, schema: UserSchema },
      { name: Post.name, schema: PostSchema },
    ]),
  ],
})
export class AppModule {}
ts
// Schema via décorateurs
import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose';
import { HydratedDocument, Types } from 'mongoose';

export type UserDocument = HydratedDocument<User>;

@Schema({ timestamps: true, collection: 'users' })
export class User {
  _id!: Types.ObjectId;

  @Prop({ required: true, unique: true, index: true, lowercase: true, trim: true })
  email!: string;

  @Prop({ required: true })
  name!: string;

  @Prop({ type: [{ type: Types.ObjectId, ref: 'Post' }], default: [] })
  posts!: Types.ObjectId[];

  @Prop({ type: Object, default: {} })
  metadata!: Record<string, any>;
}

export const UserSchema = SchemaFactory.createForClass(User);
UserSchema.index({ email: 1 }, { unique: true });
UserSchema.index({ createdAt: -1 });

// Hook
UserSchema.pre('save', function (next) {
  if (this.isModified('email')) this.email = this.email.toLowerCase();
  next();
});
ts
@Schema({ timestamps: true })
export class Post {
  _id!: Types.ObjectId;
  @Prop({ required: true }) title!: string;
  @Prop({ required: true }) body!: string;
  @Prop({ type: Types.ObjectId, ref: 'User', required: true, index: true })
  author!: Types.ObjectId;
}
export const PostSchema = SchemaFactory.createForClass(Post);
PostSchema.index({ author: 1, createdAt: -1 });
ts
// Service
@Injectable()
export class UsersService {
  constructor(
    @InjectModel(User.name) private readonly userModel: Model<UserDocument>,
    @InjectConnection() private readonly connection: Connection,
  ) {}

  create(dto: { email: string; name: string }) {
    return this.userModel.create(dto);
  }

  // Population
  findOneWithPosts(id: string) {
    return this.userModel.findById(id)
      .populate({ path: 'posts', options: { sort: { createdAt: -1 }, limit: 5 } })
      .lean()                                    // retourne plain object — plus rapide
      .exec();
  }

  // Aggregation (équivalent JOIN+GROUP)
  topAuthors(limit = 10) {
    return this.userModel.aggregate([
      { $lookup: { from: 'posts', localField: '_id', foreignField: 'author', as: 'posts' } },
      { $addFields: { postCount: { $size: '$posts' } } },
      { $sort: { postCount: -1 } },
      { $limit: limit },
      { $project: { posts: 0 } },
    ]).exec();
  }

  // Transaction multi-document (replica set requis)
  async transferPosts(fromId: string, toId: string) {
    const session = await this.connection.startSession();
    try {
      await session.withTransaction(async () => {
        const from = await this.userModel.findById(fromId).session(session);
        if (!from) throw new NotFoundException('user_not_found');
        await this.userModel.updateOne(
          { _id: toId },
          { $push: { posts: { $each: from.posts } } },
          { session },
        );
        await this.userModel.updateOne(
          { _id: fromId },
          { $set: { posts: [] } },
          { session },
        );
      }, { readConcern: { level: 'snapshot' }, writeConcern: { w: 'majority' } });
    } finally {
      await session.endSession();
    }
  }
}

🎯 Patterns courants

  1. lean() partout sauf besoin de hooks — retourne plain JS, 5-10x plus rapide, mais perd les hooks post('find') et les virtuals. Utiliser pour les reads de masse.
  2. Indexes en décorateur ET via Schema.index() — décorateur sur prop pour les single-field unique, Schema.index() pour les composites. Penser à autoIndex: false en prod (sinon Mongoose tente de créer à chaque boot — coûteux sur grosses collections).
  3. Soft delete via discriminators ou flag — soit @Prop({ default: null }) deletedAt: Date | null, soit middleware pre('find', function() { this.where({ deletedAt: null }) }). Attention : hooks ne s'appliquent pas à updateMany / aggregate.
  4. DTO ≠ Schema — toujours valider l'input avec class-validator, ne pas balancer req.body directement dans userModel.create(req.body).
  5. Population sélective.populate({ path: 'posts', select: 'title' }) pour ne pas hydrater tout le doc.
  6. Aggregation pipelines — pour les JOINs/GROUPs, c'est $lookup + $group. Plus puissant que populate mais syntaxe Mongo-only.
  7. Discriminators — héritage de schémas. Utile pour Notification avec sous-types EmailNotification, PushNotification partageant les champs communs.
  8. Virtuals + populate virtual — un virtual posts sur User avec ref: 'Post', localField: '_id', foreignField: 'author' permet .populate('posts') sans stocker un array d'ObjectId.
  9. TTL index@Prop({ index: { expireAfterSeconds: 3600 } }) pour les sessions/tokens. Mongo purge automatiquement.
  10. Bulk opsModel.bulkWrite([{ insertOne }, { updateOne }, ...]) pour des batches mixtes. Beaucoup plus rapide que des for loops.

🔄 Versions — Mongoose

VersionNotes
Mongoose 5Lourd legacy. Callbacks supportés. useNewUrlParser etc.
Mongoose 6Drop callbacks d'options de connexion. strictQuery: false par défaut — depuis 6.0 ⇒ true en 7.
Mongoose 7Breaking : strictQuery: true par défaut (un filter avec champ non au schéma est ignoré). Drop des callbacks d'API (Model.findOne(filter, cb)). Node 14+.
Mongoose 8TypeScript 5+. HydratedDocument<T> recommandé. Drop Mongoose < 6 patterns.
@nestjs/mongoose 9Compat Mongoose 6.
@nestjs/mongoose 10Compat Mongoose 7+.
@nestjs/mongoose 11Compat Mongoose 8. Nest 11.

Mongoose 7 piègestrictQuery: true peut faire qu'une requête findOne({ tenant: tid }) (sur un champ pas dans le schéma) retourne tout. Toujours déclarer les champs filtrés au schéma ou désactiver strictQuery explicitement avec un risque assumé.

⚠️ Pitfalls

  1. N+1 caché via populate — chaque .populate() est un round-trip. Pour 3 niveaux de population, 3 queries supplémentaires. Préférer $lookup en aggregation pour les reads denses.
  2. Sans .lean(), ralentissement énorme sur les listes — hydrater des documents coûte cher (Proxies, hooks). .lean() quand pas besoin des features Mongoose.
  3. Hooks bypassés par updateMany, findOneAndUpdatepre('save') ne s'exécute pas. Doubler avec pre('findOneAndUpdate') si besoin, ou utiliser pre('updateMany').
  4. ObjectId vs stringuser._id est un ObjectId. Le comparer avec une string échoue (user._id === '...' ⇒ false). Toujours user._id.toString() ou equals().
  5. Transactions non supportées sur standalone — il faut un replica set (même mono-noeud avec rs.initiate()). En local : mongod --replSet rs0 --port 27017.
  6. autoIndex: true en prod — recrée tous les indexes au boot, peut prendre des minutes/heures sur grosses collections. Mettre autoIndex: false et faire Model.syncIndexes() à la maintenance.
  7. required: true sans default — un create({}) lance une ValidationError (mongoose-level, pas DB). Vérifier le mapping vers BadRequestException.
  8. Schema strict mais champ ajouté — sans migration, vieux docs ont des champs absents. find({ field: { $exists: true } }) peut être nécessaire après une évolution.
  9. save() partieldoc.email = 'x'; await doc.save() envoie un $set du champ modifié. OK. Mais Model.replaceOne() remplace TOUT le doc — surprise garantie.
  10. strictQuery: true v7+ surprise — comme dit plus haut, sur un filtre avec champ non au schéma, Mongoose silentement ignore le filtre (potentiellement retourne tout !). Activer strictQuery: 'throw' en dev.
  11. updateMany ignore les hooks par défaut — pour valider les updates partiels, activer runValidators: true explicitement : Model.updateMany({...}, {...}, { runValidators: true }).
  12. Index unique créé pendant la prod{ unique: true } crée un index unique. Si la collection a déjà des doublons, la création échoue. Nettoyer avant.
  13. Casting silencieuxModel.find({ age: '42' }) cast '42' en number selon le schéma. Mais sur un champ Mixed, ça passe brut. Surprise sur les filtres.
  14. Document discriminator confondu avec collection — discriminators partagent la même collection ; un find() sur le modèle parent retourne tous les sous-types. Toujours filtrer sur __t (le discriminator key par défaut).

🧪 Testing

ts
// Unitaire — mock Model
const userModelMock = {
  create: jest.fn(),
  findById: jest.fn(() => ({ populate: jest.fn().mockReturnThis(), lean: jest.fn().mockReturnThis(), exec: jest.fn() })),
};

const mod = await Test.createTestingModule({
  providers: [UsersService, { provide: getModelToken(User.name), useValue: userModelMock }],
}).compile();
ts
// Intégration — mongodb-memory-server
import { MongoMemoryReplSet } from 'mongodb-memory-server';

let replSet: MongoMemoryReplSet;
beforeAll(async () => {
  replSet = await MongoMemoryReplSet.create({ replSet: { count: 1 } });
});

beforeEach(async () => {
  process.env.MONGO_URI = replSet.getUri();
  const mod = await Test.createTestingModule({
    imports: [MongooseModule.forRoot(process.env.MONGO_URI),
              MongooseModule.forFeature([{ name: User.name, schema: UserSchema }])],
    providers: [UsersService],
  }).compile();
});

Astuce — pour tester les transactions Mongoose, il FAUT un replica set (MongoMemoryReplSet, pas MongoMemoryServer).

🎬 Cas d'usage concrets

Banque — Audit log haut débit

Qui — Établissement de paiement français qui doit conserver chaque action utilisateur (login, virement, modification IBAN) pendant 10 ans pour la conformité ACPR. Problème — 30 M d'événements par jour, schéma qui varie selon le type d'action (un virement n'a pas les mêmes champs qu'un login), et besoin d'écritures non bloquantes pour ne pas ralentir l'API métier. Comment — Time-series collection (Mongo 5+) ou collection classique + index TTL + écriture asynchrone via insertMany bufferisé.

⚠️ Piège classique senior — une collection capped ne peut PAS porter d'index TTL. capped + expires sont mutuellement exclusifs : un capped évince les vieux docs par taille FIFO, pas par date. Pour une rétention de 10 ans réglementaire, on veut une expiration par date ⇒ index TTL sur collection normale (ou time-series), jamais capped. Le code ci-dessous corrige cette erreur fréquente.

ts
// Rétention 10 ans → index TTL (pas capped). 10y ≈ 315_360_000 s.
@Schema({ collection: 'audit_log' })
export class AuditLog {
  @Prop({ required: true, index: true }) userId: string;
  @Prop({ required: true, index: true }) action: string;
  @Prop({ type: SchemaTypes.Mixed }) payload: Record<string, unknown>;
  // expires sur un Date crée un index TTL ; Mongo purge en tâche de fond (~60 s de granularité)
  @Prop({ default: Date.now, expires: 60 * 60 * 24 * 365 * 10 }) createdAt: Date;
}

@Injectable()
export class AuditService implements OnModuleDestroy {
  private buffer: Partial<AuditLog>[] = [];
  private readonly timer = setInterval(() => void this.flush(), 500);

  constructor(@InjectModel(AuditLog.name) private model: Model<AuditLog>) {}

  log(entry: Partial<AuditLog>) {
    // Backpressure : si le consommateur DB décroche, on ne gonfle pas la heap à l'infini.
    if (this.buffer.length >= 50_000) this.buffer.shift(); // drop-oldest + métrique
    this.buffer.push(entry);
  }

  private async flush() {
    if (!this.buffer.length) return;
    const batch = this.buffer.splice(0, this.buffer.length);
    try {
      // ordered:false → un doc invalide ne bloque pas le batch entier
      await this.model.insertMany(batch, { ordered: false, lean: true });
    } catch (err) {
      // re-buffer le reliquat ? Non : risque de duplication. On logge + métrique d'audit raté.
      Logger.error(`audit flush dropped ${batch.length} entries`, err as Error);
    }
  }

  async onModuleDestroy() {
    clearInterval(this.timer);
    await this.flush(); // flush final au shutdown gracieux — sinon perte des 500 dernières ms
  }
}

Gains — ~200 K écritures/s sur un cluster 3 nœuds, schéma flexible sans migration, expiration automatique passé 10 ans par date (et non par taille). Le onModuleDestroy évite la perte du dernier buffer au déploiement.

Raisonnement staff — un buffer en mémoire process est une at-most-once delivery : un crash perd le buffer non flushé. Pour un audit réglementaire ACPR (perte interdite), ce pattern ne suffit pas seul — on l'adosse à un WAL/Kafka (durabilité) puis Mongo en projection, ou on accepte un fsync par batch. Toujours expliciter la garantie de delivery en entretien : la question piège est « et si le pod meurt entre le log() et le flush() ? ».

E-commerce — Avis produits hétérogènes

Qui — Plateforme française de vente de cosmétiques (200 K références) qui collecte des avis multi-formats : texte, étoiles, photos, vidéos, questions/réponses, badges modérateur. Problème — Chaque famille de produit a des critères différents (parfum a "tenue", crème a "hydratation"), impossible à modéliser proprement en SQL sans EAV. Comment — Schéma Mongoose avec Mixed pour les critères et discriminator par type d'avis.

ts
@Schema({ discriminatorKey: 'kind', timestamps: true })
export class Review {
  @Prop({ required: true, index: true }) productId: string;
  @Prop({ required: true }) authorId: string;
  @Prop({ required: true, min: 1, max: 5 }) rating: number;
  @Prop() text?: string;
  @Prop({ type: SchemaTypes.Mixed }) attributes: Record<string, number | string>;
  @Prop([{ url: String, type: { type: String, enum: ['photo', 'video'] } }])
  media: Array<{ url: string; type: 'photo' | 'video' }>;
}
ReviewSchema.index({ productId: 1, createdAt: -1 });
ReviewSchema.index({ 'attributes.hydratation': 1 }, { sparse: true });

Gains — Ajouter un critère = 0 migration, indexes sparse couvrent les requêtes par famille, agrégation native pour le score moyen.

RH — Attribute storage flexible

Qui — Éditeur SIRH qui doit stocker des compétences (skills) personnalisables par client : un cabinet d'audit ne veut pas les mêmes attributs qu'une usine. Problème — Un schéma SQL aurait 200 colonnes nullables ou une table EAV imbuvable. Les requêtes doivent rester rapides sur les filtres composés. Comment — Document Employee avec sous-document skills typé Map + index multikey.

ts
@Schema()
export class Employee {
  @Prop({ required: true }) tenantId: string;
  @Prop({ required: true }) fullName: string;
  @Prop({ type: Map, of: Number, default: {} })
  skills: Map<string, number>; // 'sql' → 4, 'leadership' → 3
  @Prop([String]) certifications: string[];
}
EmployeeSchema.index({ tenantId: 1, 'skills.sql': 1 });
EmployeeSchema.index({ tenantId: 1, certifications: 1 });

Gains — Chaque tenant définit ses skills sans toucher au schéma, requêtes sur skills.sql >= 3 indexées, code applicatif partagé.

🛠️ Exemple end-to-end

Contexte — Le SIRH ci-dessus veut un endpoint de recherche de talents internes : filtrer par compétences requises avec niveau minimum, certifications, et calculer un score de matching. Les résultats sont paginés par curseur, et chaque consultation est tracée dans l'audit log pour conformité RGPD.

ts
// src/employee/employee.schema.ts
@Schema({ timestamps: true })
export class Employee {
  @Prop({ required: true, index: true }) tenantId: string;
  @Prop({ required: true }) fullName: string;
  @Prop({ required: true, index: true }) email: string;
  @Prop() jobTitle: string;
  @Prop({ default: 'active', enum: ['active', 'on_leave', 'left'] })
  status: string;
  @Prop({ type: Map, of: Number, default: {} })
  skills: Map<string, number>;
  @Prop([String]) certifications: string[];
  @Prop({ type: SchemaTypes.Mixed }) preferences: Record<string, unknown>;
}
export const EmployeeSchema = SchemaFactory.createForClass(Employee);
EmployeeSchema.index({ tenantId: 1, status: 1 });
EmployeeSchema.index({ tenantId: 1, certifications: 1 });

@Schema({ timestamps: { createdAt: true, updatedAt: false } })
export class SearchAudit {
  @Prop({ required: true }) tenantId: string;
  @Prop({ required: true }) actorId: string;
  @Prop({ type: SchemaTypes.Mixed }) criteria: unknown;
  @Prop() resultCount: number;
}
ts
// src/employee/employee.search.ts
export class SearchTalentsDto {
  skills: Record<string, number>;          // { sql: 3, python: 4 }
  certifications?: string[];
  cursor?: string;
  limit?: number;
}

@Injectable()
export class TalentSearchService {
  constructor(
    @InjectModel(Employee.name) private employees: Model<Employee>,
    @InjectModel(SearchAudit.name) private audit: Model<SearchAudit>,
    @InjectConnection() private conn: Connection,
  ) {}

  async search(tenantId: string, actorId: string, dto: SearchTalentsDto) {
    const limit = Math.min(dto.limit ?? 20, 100);
    const skillFilters: Record<string, { $gte: number }> = {};
    for (const [skill, level] of Object.entries(dto.skills ?? {})) {
      skillFilters[`skills.${skill}`] = { $gte: level };
    }

    const match: FilterQuery<Employee> = {
      tenantId,
      status: 'active',
      ...skillFilters,
      ...(dto.certifications?.length
        ? { certifications: { $all: dto.certifications } }
        : {}),
      ...(dto.cursor ? { _id: { $gt: new Types.ObjectId(dto.cursor) } } : {}),
    };

    // Aggregation: compute match score = sum of skill levels above threshold
    const skillKeys = Object.keys(dto.skills ?? {});
    const employees = await this.employees.aggregate([
      { $match: match },
      {
        $addFields: {
          matchScore: {
            $sum: skillKeys.map((s) => ({
              $ifNull: [`$skills.${s}`, 0],
            })),
          },
        },
      },
      { $sort: { matchScore: -1, _id: 1 } },
      { $limit: limit + 1 },
      { $project: {
          fullName: 1, jobTitle: 1, skills: 1,
          certifications: 1, matchScore: 1,
      } },
    ]).exec();

    const hasMore = employees.length > limit;
    const page = hasMore ? employees.slice(0, limit) : employees;
    const nextCursor = hasMore ? page[page.length - 1]._id.toString() : null;

    // Audit log (best-effort, fire-and-forget)
    this.audit.create({
      tenantId,
      actorId,
      criteria: dto,
      resultCount: page.length,
    }).catch((err) => Logger.error('audit log failed', err));

    return { items: page, nextCursor };
  }

  // Transactional skill update with replica set
  async bulkUpdateSkills(tenantId: string, updates: Array<{ id: string; skills: Record<string, number> }>) {
    const session = await this.conn.startSession();
    try {
      await session.withTransaction(async () => {
        for (const u of updates) {
          const setOps: Record<string, number> = {};
          for (const [k, v] of Object.entries(u.skills)) {
            setOps[`skills.${k}`] = v;
          }
          await this.employees.updateOne(
            { _id: u.id, tenantId },
            { $set: setOps },
            { session },
          );
        }
      });
    } finally {
      session.endSession();
    }
  }
}
ts
// src/employee/employee.controller.ts
@Controller('talents')
@UseGuards(JwtAuthGuard)
export class TalentsController {
  constructor(private svc: TalentSearchService) {}

  @Post('search')
  search(@CurrentUser() user: AuthUser, @Body() dto: SearchTalentsDto) {
    return this.svc.search(user.tenantId, user.id, dto);
  }
}

L'agrégation calcule le score à la volée sans index supplémentaire, le curseur sur _id évite la pagination décalée coûteuse, et withTransaction garantit l'atomicité des updates de skills (requiert un replica set, sans quoi MongoDB renvoie une erreur explicite).

🔁 Quand utiliser / éviter

Utiliser Mongoose :

  • Projet Mongo existant.
  • Schéma flexible / semi-structuré.
  • Besoin de validation côté ODM (sans DB).
  • Documents imbriqués (sous-documents) où Mongo brille.

Éviter Mongoose, préférer le driver natif :

  • Pour les ops massives / streaming d'agrégats — Mongoose ajoute de l'overhead.
  • Quand on n'a pas besoin de schémas/hooks/virtuals.

Éviter Mongo (et donc Mongoose) :

  • Forte relationnalité (JOINs partout) — Postgres + Prisma.
  • ACID multi-collection sans tooling (replica set requis pour transactions).

🧰 Exemples avancés

Discriminators (héritage de schémas)

ts
@Schema({ discriminatorKey: 'kind', timestamps: true })
export class Notification {
  @Prop({ required: true }) userId!: string;
  @Prop() readAt?: Date;
}
export const NotificationSchema = SchemaFactory.createForClass(Notification);

@Schema()
export class EmailNotification extends Notification {
  @Prop({ required: true }) subject!: string;
  @Prop({ required: true }) htmlBody!: string;
}
export const EmailNotificationSchema = SchemaFactory.createForClass(EmailNotification);

@Schema()
export class PushNotification extends Notification {
  @Prop({ required: true }) deviceToken!: string;
  @Prop() payload?: Record<string, any>;
}
export const PushNotificationSchema = SchemaFactory.createForClass(PushNotification);

// Module
MongooseModule.forFeatureAsync([
  {
    name: Notification.name,
    useFactory: () => NotificationSchema,
    discriminators: [
      { name: EmailNotification.name, schema: EmailNotificationSchema, value: 'email' },
      { name: PushNotification.name,  schema: PushNotificationSchema,  value: 'push' },
    ],
  },
]);

Aggregation pipeline avec $lookup

ts
async usersWithPostStats(tenantId: string) {
  return this.userModel.aggregate([
    { $match: { tenantId, deletedAt: null } },
    {
      $lookup: {
        from: 'posts',
        let: { uid: '$_id' },
        pipeline: [
          { $match: { $expr: { $eq: ['$author', '$$uid'] } } },
          { $group: { _id: null, count: { $sum: 1 }, lastPostAt: { $max: '$createdAt' } } },
        ],
        as: 'stats',
      },
    },
    { $addFields: { stats: { $arrayElemAt: ['$stats', 0] } } },
    { $project: { name: 1, email: 1, postCount: '$stats.count', lastPostAt: '$stats.lastPostAt' } },
  ]).exec();
}

Hook complet avec error handling

ts
UserSchema.pre('save', async function (next) {
  try {
    if (this.isModified('password')) {
      this.passwordHash = await bcrypt.hash(this.password, 10);
      this.password = undefined;
    }
    next();
  } catch (err) {
    next(err as any);
  }
});

UserSchema.post('save', function (doc, next) {
  // event dispatch après commit (à publier sur Kafka via outbox idéalement)
  next();
});

🤖 Persister & servir des agents IA depuis NestJS (avec Mongo)

Mongo est un excellent store pour les charges IA : les payloads d'agents sont semi-structurés (messages hétérogènes, tool-calls de schémas variables, métadonnées de tokens) et append-only — exactement là où le document model brille et où un schéma SQL souffrirait. Voici comment un staff structure ça.

Modéliser une conversation/run d'agent

ts
import { SchemaTypes, Types } from 'mongoose';

// Un message = discriminated union sur `role`. On stocke les tool-calls bruts.
@Schema({ _id: false })
export class AgentMessage {
  @Prop({ required: true, enum: ['user', 'assistant', 'tool'] }) role: string;
  @Prop({ type: SchemaTypes.Mixed }) content: unknown;  // string | content blocks Anthropic
  @Prop({ type: SchemaTypes.Mixed }) toolUse?: unknown;  // { id, name, input }
  @Prop() tokensIn?: number;
  @Prop() tokensOut?: number;
}
const AgentMessageSchema = SchemaFactory.createForClass(AgentMessage);

@Schema({ timestamps: true, collection: 'agent_runs' })
export class AgentRun {
  @Prop({ required: true, index: true }) tenantId: string;
  // generationId = clé d'idempotence côté client (UUID v4 généré AVANT l'appel).
  @Prop({ required: true, unique: true }) generationId: string;
  @Prop({ required: true, default: 'claude-opus-4-8' }) model: string;
  @Prop({ enum: ['pending', 'streaming', 'done', 'error', 'cancelled'], default: 'pending', index: true })
  status: string;
  @Prop({ type: [AgentMessageSchema], default: [] }) messages: AgentMessage[];
  @Prop({ type: SchemaTypes.Mixed }) usage?: { inputTokens: number; outputTokens: number; costUsd: number };
  @Prop() error?: string;
}
export const AgentRunSchema = SchemaFactory.createForClass(AgentRun);
AgentRunSchema.index({ tenantId: 1, createdAt: -1 });

Pourquoi generationId unique — c'est la clé d'idempotence. Le client génère l'UUID avant d'appeler ; un retry réseau (ou un double-clic) retombe sur le même run au lieu de lancer une 2ᵉ génération facturée. L'index unique transforme la course en E11000 duplicate key qu'on rattrape proprement.

Client LLM injecté via forRootAsync (jamais new Anthropic() en champ)

ts
// llm.module.ts — le client est un provider DI, configurable et mockable en test.
import Anthropic from '@anthropic-ai/sdk';

@Module({})
export class LlmModule {
  static forRootAsync(): DynamicModule {
    return {
      module: LlmModule,
      providers: [{
        provide: Anthropic,
        useFactory: (cfg: ConfigService) =>
          new Anthropic({
            apiKey: cfg.getOrThrow('ANTHROPIC_API_KEY'),
            maxRetries: 3,           // le SDK retry les 429/5xx avec backoff
            timeout: 60_000,
          }),
        inject: [ConfigService],
      }],
      exports: [Anthropic],
      global: true,
    };
  }
}

Mettre new Anthropic() dans un champ de service casse les tests (pas mockable), recharge la conf à chaud impossible, et duplique le pool HTTP. Le DI résout les trois.

La boucle agentique server-side + streaming SSE + persistance Mongo

ts
@Injectable()
export class AgentService {
  constructor(
    private readonly llm: Anthropic,
    @InjectModel(AgentRun.name) private readonly runs: Model<AgentRun>,
  ) {}

  // Renvoie un AsyncGenerator de chunks SSE ; le controller pipe vers Sse().
  async *run(tenantId: string, generationId: string, prompt: string, signal: AbortSignal) {
    // Idempotence : INSERT atomique. Si le run existe déjà → on ne relance pas.
    let run: AgentRun;
    try {
      run = await this.runs.create({
        tenantId, generationId, status: 'streaming',
        messages: [{ role: 'user', content: prompt }],
      });
    } catch (e: any) {
      if (e?.code === 11000) {
        const existing = await this.runs.findOne({ generationId }).lean();
        yield { event: 'replay', data: JSON.stringify(existing) };
        return;
      }
      throw e;
    }

    const acc: string[] = [];
    try {
      const stream = this.llm.messages.stream(
        { model: run.model, max_tokens: 4096, messages: [{ role: 'user', content: prompt }] },
        { signal },  // ⬅️ AbortSignal propagé au SDK → coupe la requête HTTP sortante
      );

      for await (const ev of stream) {
        if (ev.type === 'content_block_delta' && ev.delta.type === 'text_delta') {
          acc.push(ev.delta.text);
          yield { event: 'token', data: ev.delta.text };
        }
      }

      const final = await stream.finalMessage();
      await this.runs.updateOne(
        { generationId },
        {
          $set: { status: 'done', usage: {
            inputTokens: final.usage.input_tokens,
            outputTokens: final.usage.output_tokens,
            costUsd: this.cost(run.model, final.usage),
          } },
          $push: { messages: { role: 'assistant', content: acc.join('') } },
        },
      );
      yield { event: 'done', data: JSON.stringify({ generationId }) };
    } catch (err) {
      const cancelled = signal.aborted;
      // Partial-output handling : on PERSISTE ce qu'on a déjà streamé, même annulé.
      // ⚠️ Mongo rejette un `$push: {}` vide → on construit l'update conditionnellement.
      const update: UpdateQuery<AgentRun> = {
        $set: { status: cancelled ? 'cancelled' : 'error', error: cancelled ? undefined : String(err) },
      };
      if (acc.length) {
        update.$push = { messages: { role: 'assistant', content: acc.join('') } };
      }
      await this.runs.updateOne({ generationId }, update);
      if (!cancelled) throw err;
    }
  }

  private cost(model: string, u: { input_tokens: number; output_tokens: number }): number {
    // tarif $/Mtok indicatif ; à externaliser en conf, jamais hardcodé en dur.
    const rates: Record<string, [number, number]> = {
      'claude-opus-4-8': [15, 75], 'claude-sonnet-4-6': [3, 15], 'claude-haiku-4-5': [0.8, 4],
    };
    const [pin, pout] = rates[model] ?? [0, 0];
    return (u.input_tokens / 1e6) * pin + (u.output_tokens / 1e6) * pout;
  }
}
ts
// controller — SSE natif Nest. La déconnexion client doit annuler le run.
@Controller('agent')
export class AgentController {
  constructor(private readonly agent: AgentService) {}

  @Sse('stream')
  stream(@Query() q: { gen: string; prompt: string }, @Req() req: Request): Observable<MessageEvent> {
    const ctrl = new AbortController();
    req.on('close', () => ctrl.abort()); // ⬅️ client ferme l'onglet → on coupe l'appel LLM (et la facturation)
    return from(this.agent.run('tenant-1', q.gen, q.prompt, ctrl.signal)).pipe(
      map((chunk) => ({ type: chunk.event, data: chunk.data }) as MessageEvent),
    );
  }
}

Points senior à retenir :

  • AbortController au bordreq.on('close')ctrl.abort() propagé jusqu'au SDK. Sans ça, un utilisateur qui ferme l'onglet laisse tourner (et facture) une génération Opus à 75 $/Mtok output. C'est un cost leak classique.
  • Partial-output handling — on persiste les tokens déjà reçus même sur annulation/erreur. L'utilisateur peut reprendre, et l'audit a la trace.
  • Idempotence par generationId — l'index unique + le catch 11000 rendent le endpoint sûr aux retries.

Jobs IA longs via BullMQ (génération > 60 s, batch)

Pour les tâches qui dépassent le timeout HTTP (résumé de 500 docs, embeddings de masse), on sort de la requête et on met en file :

ts
@Processor('ai-jobs')
export class AiJobProcessor extends WorkerHost {
  constructor(
    private readonly llm: Anthropic,
    @InjectModel(AgentRun.name) private readonly runs: Model<AgentRun>,
  ) { super(); }

  async process(job: Job<{ generationId: string; prompt: string }>) {
    // Idempotence : si le run est déjà 'done', on no-op (le job a pu être rejoué par BullMQ).
    const existing = await this.runs.findOne({ generationId: job.data.generationId }).lean();
    if (existing?.status === 'done') return existing.usage;

    // ... appel LLM, persistance partielle par chunk pour survivre à un retry de worker ...
  }
}
  • Idempotency keyée sur generationId — BullMQ peut rejouer un job (worker tué). Le check status === 'done' évite la double facturation.
  • Cost-aware retry — un 429 (rate limit) se retry avec backoff ; un 400 (prompt invalide) ne se retry jamais (gaspillage). Configurer attempts + backoff BullMQ et classer l'erreur avant de relancer.
  • Partial-output — persister par chunk dans Mongo ($push incrémental) pour qu'un retry reprenne, pas pour qu'il recommence.

🏭 Production : perf, observabilité, sécurité, scale

Performance

LevierEffetQuand
.lean()5–10× plus rapide, pas d'hydratationtous les reads sans hooks/virtuals
.select('a b') projectionmoins de bytes wire + RAMlistes, gros docs
Index couvrant (covered query)sert depuis l'index sans lire le docrequêtes hot chemin
cursor() / streamingRAM constanteexports, agrégats massifs
bulkWrite1 round-trip pour N opsimports, sync batch
readPreference: secondarydélester le primaryreads analytiques tolérants au lag

Détecter le N+1 — activer mongoose.set('debug', true) en dev affiche chaque query. En prod, instrumenter : un endpoint qui émet 50 queries pour une liste de 50 items = populate en boucle, à remplacer par $lookup ou populate batché.

Observabilité

ts
// Tracer chaque commande Mongo (OpenTelemetry-style) via les events du driver.
mongoose.connection.on('commandStarted', (e) => span.start(e.commandName, e.command));
mongoose.set('debug', (coll, method, query) => metrics.timing('mongo.query', { coll, method }));
  • Slow query log — activer le profiler Mongo (db.setProfilingLevel(1, { slowms: 100 })) et alerter sur les COLLSCAN (scans sans index) dans system.profile.
  • Connection pool — surveiller poolSize / checkedOut. Un pool saturé = latence p99 qui explose. Régler maxPoolSize selon la concurrence (défaut 100, souvent trop pour un microservice).

Sécurité

  • NoSQL injectionfind({ email: req.query.email })email vaut { "$ne": null } retourne TOUT. Toujours valider/caster avec class-validator (@IsString()) avant de passer à un filter. strictQuery ne protège pas de ça.
  • Mass assignmentuserModel.create(req.body) peut écrire role: 'admin'. Passer par un DTO whitelisté, jamais le body brut.
  • $where / mapReduce — exécutent du JS server-side : à bannir (RCE potentielle).
  • Champs sensiblesselect: false sur passwordHash au schéma pour qu'ils ne sortent jamais par défaut.

Scale

  • Sharding — choisir une shard key à haute cardinalité et bien distribuée ({ tenantId: 1, _id: 1 } souvent). Un _id ObjectId seul crée un hotspot d'écriture (monotone croissant) → préférer hashed.
  • Pagination par curseur (déjà montrée : _id: { $gt: cursor }) — O(1) vs skip(n) qui est O(n) et dérive sous écritures concurrentes.
  • Schema versioning — ajouter @Prop({ default: 1 }) schemaVersion permet une migration lazy (migrer au read) sans downtime ni backfill bloquant.

🏋️ Exercices

  1. Populate → $lookup (perf)Objectif — remplacer une liste paginée qui fait User.find().populate('posts') (N+1) par une seule agrégation $lookup, et mesurer le nombre de queries avant/après. Indicemongoose.set('debug', true) compte les queries ; le $lookup avec sous-pipeline $group ramène le postCount sans hydrater les posts.

  2. Idempotence d'écriture sous raceObjectif — un endpoint POST /orders doit être idempotent sur un header Idempotency-Key. Deux requêtes concurrentes avec la même clé ne doivent créer qu'une commande. Indice — index unique sur idempotencyKey, create() dans un try/catch sur err.code === 11000 qui renvoie la commande existante. Tester avec Promise.all([req, req]).

  3. Transaction multi-document + rollbackObjectiftransferCredits(from, to, amount) : débiter A, créditer B, atomique. Si B n'existe pas, A ne doit pas être débité. IndiceMongoMemoryReplSet en test, session.withTransaction, vérifier qu'une exception au milieu laisse les soldes inchangés. Penser au writeConcern: { w: 'majority' }.

  4. Le break-then-fix strictQueryObjectif — écrire un test qui prouve qu'un filtre findOne({ tenant: x }) sur un champ absent du schéma retourne le mauvais résultat en Mongoose 7+ (filtre silencieusement ignoré), puis le corriger. Indice — déclarer le champ au schéma OU mongoose.set('strictQuery', 'throw') pour transformer le bug silencieux en exception. Discuter pourquoi 'throw' en dev / déclaration au schéma en prod.

  5. Streaming LLM annulable persisté (production-grade)Objectif — implémenter le endpoint SSE de la section IA : stream des tokens Anthropic, persiste le run dans agent_runs, et sur déconnexion client annule l'appel LLM et marque le run cancelled avec l'output partiel. Indicereq.on('close')AbortController.abort()signal passé à messages.stream(...). Tester l'annulation en fermant le client à mi-stream et en vérifiant status: 'cancelled' + messages partiels en base.

  6. Break it : capped vs TTLObjectif — prouver qu'une collection capped: { size } ne purge pas par date (un vieux doc reste tant que la taille n'est pas atteinte), puis migrer vers un index TTL qui purge bien à l'échéance. Indice — insérer un doc daté de -2 ans dans une capped peu remplie : il survit. Recréer la collection sans capped + expires sur le champ date ; observer la purge (granularité ~60 s du daemon TTL).

🎤 En entretien

  • « Quelle différence entre populate() et $lookup ? »populate = N+1 contrôlé côté ODM (1 query parent + 1 query groupée $in par path), pratique mais round-trips multiples et pas de filtrage relationnel riche ; $lookup est un JOIN exécuté server-side dans l'aggregation, plus rapide pour les reads denses et seul moyen de filtrer/agréger sur les enfants. Règle : populate pour le CRUD, $lookup pour les vues analytiques.
  • « Pourquoi les transactions exigent un replica set ? » — les transactions multi-document de Mongo s'appuient sur l'oplog et le mécanisme de réplication pour la durabilité et l'isolation snapshot ; un mongod standalone n'a pas d'oplog. En local, rs.initiate() sur un nœud unique suffit pour les activer.
  • « _id === string renvoie false, pourquoi ? »_id est un ObjectId (objet, pas primitive) ; === compare la référence. Toujours id.equals(other) ou id.toString(). Piège fréquent dans les guards d'autorisation (doc.owner === userId) → faille de permission silencieuse.
  • « Comment garantir qu'un endpoint LLM ne facture pas deux fois sur un retry ? » — clé d'idempotence (generationId) générée client, index unique en base, create() qui rattrape E11000 et renvoie le run existant ; côté worker BullMQ, check status === 'done' avant de relancer. Et AbortController sur déconnexion pour ne pas payer un stream que personne ne lit.

🔗 Liens

Bibliothèque tech perso — Achref