Mongoose
TL;DR — Mongoose est le wrapper ODM le plus utilisé pour MongoDB en Node.
@nestjs/mongooseproposeMongooseModule.forRoot+forFeatureavec des schémas TypeScript via décorateurs. Spécificités senior :refs+populate()(analogue mais pas équivalent aux JOINs SQL), hooks (pre('save'),pre('findOneAndUpdate')) — pièges de version v6/v7/v8, indexes via décorateurs et leur synchronisation, transactions via sessions multi-document (replica set requis), et la confusion entre_idObjectId vs string.
🧠 Mental model
Mongoose connection ──► Model<T> (cache + hooks)
│
▼
Schema<T> ──── refs ────► Model<U> (populate)
│
└── hooks: pre / post (save, find*, update*, deleteOne)
└── indexes — synchronisés via createIndexes() au boot
└── virtuals — getters non persistantsAnalogie — Mongoose c'est un ORM-like pour Mongo : tu décris un Schema, tu obtiens un Model. Les ref ne sont pas des FKs (pas d'intégrité côté DB) mais une convention pour populate() qui fait un second query. Donc populate = N+1 contrôlé : 1 query parent + 1 query enfants groupés ($in).
🛠️ Code minimal
// Setup
import { MongooseModule } from '@nestjs/mongoose';
@Module({
imports: [
MongooseModule.forRootAsync({
useFactory: (cfg: ConfigService) => ({
uri: cfg.get('MONGO_URI'),
retryAttempts: 3,
serverSelectionTimeoutMS: 5000,
}),
inject: [ConfigService],
}),
MongooseModule.forFeature([
{ name: User.name, schema: UserSchema },
{ name: Post.name, schema: PostSchema },
]),
],
})
export class AppModule {}// Schema via décorateurs
import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose';
import { HydratedDocument, Types } from 'mongoose';
export type UserDocument = HydratedDocument<User>;
@Schema({ timestamps: true, collection: 'users' })
export class User {
_id!: Types.ObjectId;
@Prop({ required: true, unique: true, index: true, lowercase: true, trim: true })
email!: string;
@Prop({ required: true })
name!: string;
@Prop({ type: [{ type: Types.ObjectId, ref: 'Post' }], default: [] })
posts!: Types.ObjectId[];
@Prop({ type: Object, default: {} })
metadata!: Record<string, any>;
}
export const UserSchema = SchemaFactory.createForClass(User);
UserSchema.index({ email: 1 }, { unique: true });
UserSchema.index({ createdAt: -1 });
// Hook
UserSchema.pre('save', function (next) {
if (this.isModified('email')) this.email = this.email.toLowerCase();
next();
});@Schema({ timestamps: true })
export class Post {
_id!: Types.ObjectId;
@Prop({ required: true }) title!: string;
@Prop({ required: true }) body!: string;
@Prop({ type: Types.ObjectId, ref: 'User', required: true, index: true })
author!: Types.ObjectId;
}
export const PostSchema = SchemaFactory.createForClass(Post);
PostSchema.index({ author: 1, createdAt: -1 });// Service
@Injectable()
export class UsersService {
constructor(
@InjectModel(User.name) private readonly userModel: Model<UserDocument>,
@InjectConnection() private readonly connection: Connection,
) {}
create(dto: { email: string; name: string }) {
return this.userModel.create(dto);
}
// Population
findOneWithPosts(id: string) {
return this.userModel.findById(id)
.populate({ path: 'posts', options: { sort: { createdAt: -1 }, limit: 5 } })
.lean() // retourne plain object — plus rapide
.exec();
}
// Aggregation (équivalent JOIN+GROUP)
topAuthors(limit = 10) {
return this.userModel.aggregate([
{ $lookup: { from: 'posts', localField: '_id', foreignField: 'author', as: 'posts' } },
{ $addFields: { postCount: { $size: '$posts' } } },
{ $sort: { postCount: -1 } },
{ $limit: limit },
{ $project: { posts: 0 } },
]).exec();
}
// Transaction multi-document (replica set requis)
async transferPosts(fromId: string, toId: string) {
const session = await this.connection.startSession();
try {
await session.withTransaction(async () => {
const from = await this.userModel.findById(fromId).session(session);
if (!from) throw new NotFoundException('user_not_found');
await this.userModel.updateOne(
{ _id: toId },
{ $push: { posts: { $each: from.posts } } },
{ session },
);
await this.userModel.updateOne(
{ _id: fromId },
{ $set: { posts: [] } },
{ session },
);
}, { readConcern: { level: 'snapshot' }, writeConcern: { w: 'majority' } });
} finally {
await session.endSession();
}
}
}🎯 Patterns courants
lean()partout sauf besoin de hooks — retourne plain JS, 5-10x plus rapide, mais perd les hookspost('find')et les virtuals. Utiliser pour les reads de masse.- Indexes en décorateur ET via
Schema.index()— décorateur sur prop pour les single-field unique,Schema.index()pour les composites. Penser àautoIndex: falseen prod (sinon Mongoose tente de créer à chaque boot — coûteux sur grosses collections). - Soft delete via discriminators ou flag — soit
@Prop({ default: null }) deletedAt: Date | null, soit middlewarepre('find', function() { this.where({ deletedAt: null }) }). Attention : hooks ne s'appliquent pas àupdateMany/aggregate. - DTO ≠ Schema — toujours valider l'input avec class-validator, ne pas balancer
req.bodydirectement dansuserModel.create(req.body). - Population sélective —
.populate({ path: 'posts', select: 'title' })pour ne pas hydrater tout le doc. - Aggregation pipelines — pour les JOINs/GROUPs, c'est
$lookup+$group. Plus puissant quepopulatemais syntaxe Mongo-only. - Discriminators — héritage de schémas. Utile pour
Notificationavec sous-typesEmailNotification,PushNotificationpartageant les champs communs. - Virtuals + populate virtual — un virtual
postssurUseravecref: 'Post', localField: '_id', foreignField: 'author'permet.populate('posts')sans stocker un array d'ObjectId. - TTL index —
@Prop({ index: { expireAfterSeconds: 3600 } })pour les sessions/tokens. Mongo purge automatiquement. - Bulk ops —
Model.bulkWrite([{ insertOne }, { updateOne }, ...])pour des batches mixtes. Beaucoup plus rapide que desforloops.
🔄 Versions — Mongoose
| Version | Notes |
|---|---|
| Mongoose 5 | Lourd legacy. Callbacks supportés. useNewUrlParser etc. |
| Mongoose 6 | Drop callbacks d'options de connexion. strictQuery: false par défaut — depuis 6.0 ⇒ true en 7. |
| Mongoose 7 | Breaking : strictQuery: true par défaut (un filter avec champ non au schéma est ignoré). Drop des callbacks d'API (Model.findOne(filter, cb)). Node 14+. |
| Mongoose 8 | TypeScript 5+. HydratedDocument<T> recommandé. Drop Mongoose < 6 patterns. |
@nestjs/mongoose 9 | Compat Mongoose 6. |
@nestjs/mongoose 10 | Compat Mongoose 7+. |
@nestjs/mongoose 11 | Compat Mongoose 8. Nest 11. |
Mongoose 7 piège — strictQuery: true peut faire qu'une requête findOne({ tenant: tid }) (sur un champ pas dans le schéma) retourne tout. Toujours déclarer les champs filtrés au schéma ou désactiver strictQuery explicitement avec un risque assumé.
⚠️ Pitfalls
- N+1 caché via
populate— chaque.populate()est un round-trip. Pour 3 niveaux de population, 3 queries supplémentaires. Préférer$lookupen aggregation pour les reads denses. - Sans
.lean(), ralentissement énorme sur les listes — hydrater des documents coûte cher (Proxies, hooks)..lean()quand pas besoin des features Mongoose. - Hooks bypassés par
updateMany,findOneAndUpdate—pre('save')ne s'exécute pas. Doubler avecpre('findOneAndUpdate')si besoin, ou utiliserpre('updateMany'). ObjectIdvs string —user._idest unObjectId. Le comparer avec une string échoue (user._id === '...'⇒ false). Toujoursuser._id.toString()ouequals().- Transactions non supportées sur standalone — il faut un replica set (même mono-noeud avec
rs.initiate()). En local :mongod --replSet rs0 --port 27017. autoIndex: trueen prod — recrée tous les indexes au boot, peut prendre des minutes/heures sur grosses collections. MettreautoIndex: falseet faireModel.syncIndexes()à la maintenance.required: truesans default — uncreate({})lance uneValidationError(mongoose-level, pas DB). Vérifier le mapping versBadRequestException.- Schema strict mais champ ajouté — sans migration, vieux docs ont des champs absents.
find({ field: { $exists: true } })peut être nécessaire après une évolution. save()partiel —doc.email = 'x'; await doc.save()envoie un$setdu champ modifié. OK. MaisModel.replaceOne()remplace TOUT le doc — surprise garantie.strictQuery: truev7+ surprise — comme dit plus haut, sur un filtre avec champ non au schéma, Mongoose silentement ignore le filtre (potentiellement retourne tout !). ActiverstrictQuery: 'throw'en dev.updateManyignore les hooks par défaut — pour valider les updates partiels, activerrunValidators: trueexplicitement :Model.updateMany({...}, {...}, { runValidators: true }).- Index
uniquecréé pendant la prod —{ unique: true }crée un index unique. Si la collection a déjà des doublons, la création échoue. Nettoyer avant. - Casting silencieux —
Model.find({ age: '42' })cast'42'en number selon le schéma. Mais sur un champ Mixed, ça passe brut. Surprise sur les filtres. - Document discriminator confondu avec collection — discriminators partagent la même collection ; un
find()sur le modèle parent retourne tous les sous-types. Toujours filtrer sur__t(le discriminator key par défaut).
🧪 Testing
// Unitaire — mock Model
const userModelMock = {
create: jest.fn(),
findById: jest.fn(() => ({ populate: jest.fn().mockReturnThis(), lean: jest.fn().mockReturnThis(), exec: jest.fn() })),
};
const mod = await Test.createTestingModule({
providers: [UsersService, { provide: getModelToken(User.name), useValue: userModelMock }],
}).compile();// Intégration — mongodb-memory-server
import { MongoMemoryReplSet } from 'mongodb-memory-server';
let replSet: MongoMemoryReplSet;
beforeAll(async () => {
replSet = await MongoMemoryReplSet.create({ replSet: { count: 1 } });
});
beforeEach(async () => {
process.env.MONGO_URI = replSet.getUri();
const mod = await Test.createTestingModule({
imports: [MongooseModule.forRoot(process.env.MONGO_URI),
MongooseModule.forFeature([{ name: User.name, schema: UserSchema }])],
providers: [UsersService],
}).compile();
});Astuce — pour tester les transactions Mongoose, il FAUT un replica set (MongoMemoryReplSet, pas MongoMemoryServer).
🎬 Cas d'usage concrets
Banque — Audit log haut débit
Qui — Établissement de paiement français qui doit conserver chaque action utilisateur (login, virement, modification IBAN) pendant 10 ans pour la conformité ACPR. Problème — 30 M d'événements par jour, schéma qui varie selon le type d'action (un virement n'a pas les mêmes champs qu'un login), et besoin d'écritures non bloquantes pour ne pas ralentir l'API métier. Comment — Time-series collection (Mongo 5+) ou collection classique + index TTL + écriture asynchrone via insertMany bufferisé.
⚠️ Piège classique senior — une collection capped ne peut PAS porter d'index TTL.
capped+expiressont mutuellement exclusifs : un capped évince les vieux docs par taille FIFO, pas par date. Pour une rétention de 10 ans réglementaire, on veut une expiration par date ⇒ index TTL sur collection normale (ou time-series), jamais capped. Le code ci-dessous corrige cette erreur fréquente.
// Rétention 10 ans → index TTL (pas capped). 10y ≈ 315_360_000 s.
@Schema({ collection: 'audit_log' })
export class AuditLog {
@Prop({ required: true, index: true }) userId: string;
@Prop({ required: true, index: true }) action: string;
@Prop({ type: SchemaTypes.Mixed }) payload: Record<string, unknown>;
// expires sur un Date crée un index TTL ; Mongo purge en tâche de fond (~60 s de granularité)
@Prop({ default: Date.now, expires: 60 * 60 * 24 * 365 * 10 }) createdAt: Date;
}
@Injectable()
export class AuditService implements OnModuleDestroy {
private buffer: Partial<AuditLog>[] = [];
private readonly timer = setInterval(() => void this.flush(), 500);
constructor(@InjectModel(AuditLog.name) private model: Model<AuditLog>) {}
log(entry: Partial<AuditLog>) {
// Backpressure : si le consommateur DB décroche, on ne gonfle pas la heap à l'infini.
if (this.buffer.length >= 50_000) this.buffer.shift(); // drop-oldest + métrique
this.buffer.push(entry);
}
private async flush() {
if (!this.buffer.length) return;
const batch = this.buffer.splice(0, this.buffer.length);
try {
// ordered:false → un doc invalide ne bloque pas le batch entier
await this.model.insertMany(batch, { ordered: false, lean: true });
} catch (err) {
// re-buffer le reliquat ? Non : risque de duplication. On logge + métrique d'audit raté.
Logger.error(`audit flush dropped ${batch.length} entries`, err as Error);
}
}
async onModuleDestroy() {
clearInterval(this.timer);
await this.flush(); // flush final au shutdown gracieux — sinon perte des 500 dernières ms
}
}Gains — ~200 K écritures/s sur un cluster 3 nœuds, schéma flexible sans migration, expiration automatique passé 10 ans par date (et non par taille). Le onModuleDestroy évite la perte du dernier buffer au déploiement.
Raisonnement staff — un buffer en mémoire process est une at-most-once delivery : un crash perd le buffer non flushé. Pour un audit réglementaire ACPR (perte interdite), ce pattern ne suffit pas seul — on l'adosse à un WAL/Kafka (durabilité) puis Mongo en projection, ou on accepte un fsync par batch. Toujours expliciter la garantie de delivery en entretien : la question piège est « et si le pod meurt entre le
log()et leflush()? ».
E-commerce — Avis produits hétérogènes
Qui — Plateforme française de vente de cosmétiques (200 K références) qui collecte des avis multi-formats : texte, étoiles, photos, vidéos, questions/réponses, badges modérateur. Problème — Chaque famille de produit a des critères différents (parfum a "tenue", crème a "hydratation"), impossible à modéliser proprement en SQL sans EAV. Comment — Schéma Mongoose avec Mixed pour les critères et discriminator par type d'avis.
@Schema({ discriminatorKey: 'kind', timestamps: true })
export class Review {
@Prop({ required: true, index: true }) productId: string;
@Prop({ required: true }) authorId: string;
@Prop({ required: true, min: 1, max: 5 }) rating: number;
@Prop() text?: string;
@Prop({ type: SchemaTypes.Mixed }) attributes: Record<string, number | string>;
@Prop([{ url: String, type: { type: String, enum: ['photo', 'video'] } }])
media: Array<{ url: string; type: 'photo' | 'video' }>;
}
ReviewSchema.index({ productId: 1, createdAt: -1 });
ReviewSchema.index({ 'attributes.hydratation': 1 }, { sparse: true });Gains — Ajouter un critère = 0 migration, indexes sparse couvrent les requêtes par famille, agrégation native pour le score moyen.
RH — Attribute storage flexible
Qui — Éditeur SIRH qui doit stocker des compétences (skills) personnalisables par client : un cabinet d'audit ne veut pas les mêmes attributs qu'une usine. Problème — Un schéma SQL aurait 200 colonnes nullables ou une table EAV imbuvable. Les requêtes doivent rester rapides sur les filtres composés. Comment — Document Employee avec sous-document skills typé Map + index multikey.
@Schema()
export class Employee {
@Prop({ required: true }) tenantId: string;
@Prop({ required: true }) fullName: string;
@Prop({ type: Map, of: Number, default: {} })
skills: Map<string, number>; // 'sql' → 4, 'leadership' → 3
@Prop([String]) certifications: string[];
}
EmployeeSchema.index({ tenantId: 1, 'skills.sql': 1 });
EmployeeSchema.index({ tenantId: 1, certifications: 1 });Gains — Chaque tenant définit ses skills sans toucher au schéma, requêtes sur skills.sql >= 3 indexées, code applicatif partagé.
🛠️ Exemple end-to-end
Contexte — Le SIRH ci-dessus veut un endpoint de recherche de talents internes : filtrer par compétences requises avec niveau minimum, certifications, et calculer un score de matching. Les résultats sont paginés par curseur, et chaque consultation est tracée dans l'audit log pour conformité RGPD.
// src/employee/employee.schema.ts
@Schema({ timestamps: true })
export class Employee {
@Prop({ required: true, index: true }) tenantId: string;
@Prop({ required: true }) fullName: string;
@Prop({ required: true, index: true }) email: string;
@Prop() jobTitle: string;
@Prop({ default: 'active', enum: ['active', 'on_leave', 'left'] })
status: string;
@Prop({ type: Map, of: Number, default: {} })
skills: Map<string, number>;
@Prop([String]) certifications: string[];
@Prop({ type: SchemaTypes.Mixed }) preferences: Record<string, unknown>;
}
export const EmployeeSchema = SchemaFactory.createForClass(Employee);
EmployeeSchema.index({ tenantId: 1, status: 1 });
EmployeeSchema.index({ tenantId: 1, certifications: 1 });
@Schema({ timestamps: { createdAt: true, updatedAt: false } })
export class SearchAudit {
@Prop({ required: true }) tenantId: string;
@Prop({ required: true }) actorId: string;
@Prop({ type: SchemaTypes.Mixed }) criteria: unknown;
@Prop() resultCount: number;
}// src/employee/employee.search.ts
export class SearchTalentsDto {
skills: Record<string, number>; // { sql: 3, python: 4 }
certifications?: string[];
cursor?: string;
limit?: number;
}
@Injectable()
export class TalentSearchService {
constructor(
@InjectModel(Employee.name) private employees: Model<Employee>,
@InjectModel(SearchAudit.name) private audit: Model<SearchAudit>,
@InjectConnection() private conn: Connection,
) {}
async search(tenantId: string, actorId: string, dto: SearchTalentsDto) {
const limit = Math.min(dto.limit ?? 20, 100);
const skillFilters: Record<string, { $gte: number }> = {};
for (const [skill, level] of Object.entries(dto.skills ?? {})) {
skillFilters[`skills.${skill}`] = { $gte: level };
}
const match: FilterQuery<Employee> = {
tenantId,
status: 'active',
...skillFilters,
...(dto.certifications?.length
? { certifications: { $all: dto.certifications } }
: {}),
...(dto.cursor ? { _id: { $gt: new Types.ObjectId(dto.cursor) } } : {}),
};
// Aggregation: compute match score = sum of skill levels above threshold
const skillKeys = Object.keys(dto.skills ?? {});
const employees = await this.employees.aggregate([
{ $match: match },
{
$addFields: {
matchScore: {
$sum: skillKeys.map((s) => ({
$ifNull: [`$skills.${s}`, 0],
})),
},
},
},
{ $sort: { matchScore: -1, _id: 1 } },
{ $limit: limit + 1 },
{ $project: {
fullName: 1, jobTitle: 1, skills: 1,
certifications: 1, matchScore: 1,
} },
]).exec();
const hasMore = employees.length > limit;
const page = hasMore ? employees.slice(0, limit) : employees;
const nextCursor = hasMore ? page[page.length - 1]._id.toString() : null;
// Audit log (best-effort, fire-and-forget)
this.audit.create({
tenantId,
actorId,
criteria: dto,
resultCount: page.length,
}).catch((err) => Logger.error('audit log failed', err));
return { items: page, nextCursor };
}
// Transactional skill update with replica set
async bulkUpdateSkills(tenantId: string, updates: Array<{ id: string; skills: Record<string, number> }>) {
const session = await this.conn.startSession();
try {
await session.withTransaction(async () => {
for (const u of updates) {
const setOps: Record<string, number> = {};
for (const [k, v] of Object.entries(u.skills)) {
setOps[`skills.${k}`] = v;
}
await this.employees.updateOne(
{ _id: u.id, tenantId },
{ $set: setOps },
{ session },
);
}
});
} finally {
session.endSession();
}
}
}// src/employee/employee.controller.ts
@Controller('talents')
@UseGuards(JwtAuthGuard)
export class TalentsController {
constructor(private svc: TalentSearchService) {}
@Post('search')
search(@CurrentUser() user: AuthUser, @Body() dto: SearchTalentsDto) {
return this.svc.search(user.tenantId, user.id, dto);
}
}L'agrégation calcule le score à la volée sans index supplémentaire, le curseur sur _id évite la pagination décalée coûteuse, et withTransaction garantit l'atomicité des updates de skills (requiert un replica set, sans quoi MongoDB renvoie une erreur explicite).
🔁 Quand utiliser / éviter
Utiliser Mongoose :
- Projet Mongo existant.
- Schéma flexible / semi-structuré.
- Besoin de validation côté ODM (sans DB).
- Documents imbriqués (sous-documents) où Mongo brille.
Éviter Mongoose, préférer le driver natif :
- Pour les ops massives / streaming d'agrégats — Mongoose ajoute de l'overhead.
- Quand on n'a pas besoin de schémas/hooks/virtuals.
Éviter Mongo (et donc Mongoose) :
- Forte relationnalité (JOINs partout) — Postgres + Prisma.
- ACID multi-collection sans tooling (replica set requis pour transactions).
🧰 Exemples avancés
Discriminators (héritage de schémas)
@Schema({ discriminatorKey: 'kind', timestamps: true })
export class Notification {
@Prop({ required: true }) userId!: string;
@Prop() readAt?: Date;
}
export const NotificationSchema = SchemaFactory.createForClass(Notification);
@Schema()
export class EmailNotification extends Notification {
@Prop({ required: true }) subject!: string;
@Prop({ required: true }) htmlBody!: string;
}
export const EmailNotificationSchema = SchemaFactory.createForClass(EmailNotification);
@Schema()
export class PushNotification extends Notification {
@Prop({ required: true }) deviceToken!: string;
@Prop() payload?: Record<string, any>;
}
export const PushNotificationSchema = SchemaFactory.createForClass(PushNotification);
// Module
MongooseModule.forFeatureAsync([
{
name: Notification.name,
useFactory: () => NotificationSchema,
discriminators: [
{ name: EmailNotification.name, schema: EmailNotificationSchema, value: 'email' },
{ name: PushNotification.name, schema: PushNotificationSchema, value: 'push' },
],
},
]);Aggregation pipeline avec $lookup
async usersWithPostStats(tenantId: string) {
return this.userModel.aggregate([
{ $match: { tenantId, deletedAt: null } },
{
$lookup: {
from: 'posts',
let: { uid: '$_id' },
pipeline: [
{ $match: { $expr: { $eq: ['$author', '$$uid'] } } },
{ $group: { _id: null, count: { $sum: 1 }, lastPostAt: { $max: '$createdAt' } } },
],
as: 'stats',
},
},
{ $addFields: { stats: { $arrayElemAt: ['$stats', 0] } } },
{ $project: { name: 1, email: 1, postCount: '$stats.count', lastPostAt: '$stats.lastPostAt' } },
]).exec();
}Hook complet avec error handling
UserSchema.pre('save', async function (next) {
try {
if (this.isModified('password')) {
this.passwordHash = await bcrypt.hash(this.password, 10);
this.password = undefined;
}
next();
} catch (err) {
next(err as any);
}
});
UserSchema.post('save', function (doc, next) {
// event dispatch après commit (à publier sur Kafka via outbox idéalement)
next();
});🤖 Persister & servir des agents IA depuis NestJS (avec Mongo)
Mongo est un excellent store pour les charges IA : les payloads d'agents sont semi-structurés (messages hétérogènes, tool-calls de schémas variables, métadonnées de tokens) et append-only — exactement là où le document model brille et où un schéma SQL souffrirait. Voici comment un staff structure ça.
Modéliser une conversation/run d'agent
import { SchemaTypes, Types } from 'mongoose';
// Un message = discriminated union sur `role`. On stocke les tool-calls bruts.
@Schema({ _id: false })
export class AgentMessage {
@Prop({ required: true, enum: ['user', 'assistant', 'tool'] }) role: string;
@Prop({ type: SchemaTypes.Mixed }) content: unknown; // string | content blocks Anthropic
@Prop({ type: SchemaTypes.Mixed }) toolUse?: unknown; // { id, name, input }
@Prop() tokensIn?: number;
@Prop() tokensOut?: number;
}
const AgentMessageSchema = SchemaFactory.createForClass(AgentMessage);
@Schema({ timestamps: true, collection: 'agent_runs' })
export class AgentRun {
@Prop({ required: true, index: true }) tenantId: string;
// generationId = clé d'idempotence côté client (UUID v4 généré AVANT l'appel).
@Prop({ required: true, unique: true }) generationId: string;
@Prop({ required: true, default: 'claude-opus-4-8' }) model: string;
@Prop({ enum: ['pending', 'streaming', 'done', 'error', 'cancelled'], default: 'pending', index: true })
status: string;
@Prop({ type: [AgentMessageSchema], default: [] }) messages: AgentMessage[];
@Prop({ type: SchemaTypes.Mixed }) usage?: { inputTokens: number; outputTokens: number; costUsd: number };
@Prop() error?: string;
}
export const AgentRunSchema = SchemaFactory.createForClass(AgentRun);
AgentRunSchema.index({ tenantId: 1, createdAt: -1 });Pourquoi generationId unique — c'est la clé d'idempotence. Le client génère l'UUID avant d'appeler ; un retry réseau (ou un double-clic) retombe sur le même run au lieu de lancer une 2ᵉ génération facturée. L'index unique transforme la course en E11000 duplicate key qu'on rattrape proprement.
Client LLM injecté via forRootAsync (jamais new Anthropic() en champ)
// llm.module.ts — le client est un provider DI, configurable et mockable en test.
import Anthropic from '@anthropic-ai/sdk';
@Module({})
export class LlmModule {
static forRootAsync(): DynamicModule {
return {
module: LlmModule,
providers: [{
provide: Anthropic,
useFactory: (cfg: ConfigService) =>
new Anthropic({
apiKey: cfg.getOrThrow('ANTHROPIC_API_KEY'),
maxRetries: 3, // le SDK retry les 429/5xx avec backoff
timeout: 60_000,
}),
inject: [ConfigService],
}],
exports: [Anthropic],
global: true,
};
}
}Mettre
new Anthropic()dans un champ de service casse les tests (pas mockable), recharge la conf à chaud impossible, et duplique le pool HTTP. Le DI résout les trois.
La boucle agentique server-side + streaming SSE + persistance Mongo
@Injectable()
export class AgentService {
constructor(
private readonly llm: Anthropic,
@InjectModel(AgentRun.name) private readonly runs: Model<AgentRun>,
) {}
// Renvoie un AsyncGenerator de chunks SSE ; le controller pipe vers Sse().
async *run(tenantId: string, generationId: string, prompt: string, signal: AbortSignal) {
// Idempotence : INSERT atomique. Si le run existe déjà → on ne relance pas.
let run: AgentRun;
try {
run = await this.runs.create({
tenantId, generationId, status: 'streaming',
messages: [{ role: 'user', content: prompt }],
});
} catch (e: any) {
if (e?.code === 11000) {
const existing = await this.runs.findOne({ generationId }).lean();
yield { event: 'replay', data: JSON.stringify(existing) };
return;
}
throw e;
}
const acc: string[] = [];
try {
const stream = this.llm.messages.stream(
{ model: run.model, max_tokens: 4096, messages: [{ role: 'user', content: prompt }] },
{ signal }, // ⬅️ AbortSignal propagé au SDK → coupe la requête HTTP sortante
);
for await (const ev of stream) {
if (ev.type === 'content_block_delta' && ev.delta.type === 'text_delta') {
acc.push(ev.delta.text);
yield { event: 'token', data: ev.delta.text };
}
}
const final = await stream.finalMessage();
await this.runs.updateOne(
{ generationId },
{
$set: { status: 'done', usage: {
inputTokens: final.usage.input_tokens,
outputTokens: final.usage.output_tokens,
costUsd: this.cost(run.model, final.usage),
} },
$push: { messages: { role: 'assistant', content: acc.join('') } },
},
);
yield { event: 'done', data: JSON.stringify({ generationId }) };
} catch (err) {
const cancelled = signal.aborted;
// Partial-output handling : on PERSISTE ce qu'on a déjà streamé, même annulé.
// ⚠️ Mongo rejette un `$push: {}` vide → on construit l'update conditionnellement.
const update: UpdateQuery<AgentRun> = {
$set: { status: cancelled ? 'cancelled' : 'error', error: cancelled ? undefined : String(err) },
};
if (acc.length) {
update.$push = { messages: { role: 'assistant', content: acc.join('') } };
}
await this.runs.updateOne({ generationId }, update);
if (!cancelled) throw err;
}
}
private cost(model: string, u: { input_tokens: number; output_tokens: number }): number {
// tarif $/Mtok indicatif ; à externaliser en conf, jamais hardcodé en dur.
const rates: Record<string, [number, number]> = {
'claude-opus-4-8': [15, 75], 'claude-sonnet-4-6': [3, 15], 'claude-haiku-4-5': [0.8, 4],
};
const [pin, pout] = rates[model] ?? [0, 0];
return (u.input_tokens / 1e6) * pin + (u.output_tokens / 1e6) * pout;
}
}// controller — SSE natif Nest. La déconnexion client doit annuler le run.
@Controller('agent')
export class AgentController {
constructor(private readonly agent: AgentService) {}
@Sse('stream')
stream(@Query() q: { gen: string; prompt: string }, @Req() req: Request): Observable<MessageEvent> {
const ctrl = new AbortController();
req.on('close', () => ctrl.abort()); // ⬅️ client ferme l'onglet → on coupe l'appel LLM (et la facturation)
return from(this.agent.run('tenant-1', q.gen, q.prompt, ctrl.signal)).pipe(
map((chunk) => ({ type: chunk.event, data: chunk.data }) as MessageEvent),
);
}
}Points senior à retenir :
- AbortController au bord —
req.on('close')→ctrl.abort()propagé jusqu'au SDK. Sans ça, un utilisateur qui ferme l'onglet laisse tourner (et facture) une génération Opus à 75 $/Mtok output. C'est un cost leak classique. - Partial-output handling — on persiste les tokens déjà reçus même sur annulation/erreur. L'utilisateur peut reprendre, et l'audit a la trace.
- Idempotence par
generationId— l'indexunique+ lecatch 11000rendent le endpoint sûr aux retries.
Jobs IA longs via BullMQ (génération > 60 s, batch)
Pour les tâches qui dépassent le timeout HTTP (résumé de 500 docs, embeddings de masse), on sort de la requête et on met en file :
@Processor('ai-jobs')
export class AiJobProcessor extends WorkerHost {
constructor(
private readonly llm: Anthropic,
@InjectModel(AgentRun.name) private readonly runs: Model<AgentRun>,
) { super(); }
async process(job: Job<{ generationId: string; prompt: string }>) {
// Idempotence : si le run est déjà 'done', on no-op (le job a pu être rejoué par BullMQ).
const existing = await this.runs.findOne({ generationId: job.data.generationId }).lean();
if (existing?.status === 'done') return existing.usage;
// ... appel LLM, persistance partielle par chunk pour survivre à un retry de worker ...
}
}- Idempotency keyée sur
generationId— BullMQ peut rejouer un job (worker tué). Le checkstatus === 'done'évite la double facturation. - Cost-aware retry — un 429 (rate limit) se retry avec backoff ; un 400 (prompt invalide) ne se retry jamais (gaspillage). Configurer
attempts+backoffBullMQ et classer l'erreur avant de relancer. - Partial-output — persister par chunk dans Mongo (
$pushincrémental) pour qu'un retry reprenne, pas pour qu'il recommence.
🏭 Production : perf, observabilité, sécurité, scale
Performance
| Levier | Effet | Quand |
|---|---|---|
.lean() | 5–10× plus rapide, pas d'hydratation | tous les reads sans hooks/virtuals |
.select('a b') projection | moins de bytes wire + RAM | listes, gros docs |
| Index couvrant (covered query) | sert depuis l'index sans lire le doc | requêtes hot chemin |
cursor() / streaming | RAM constante | exports, agrégats massifs |
bulkWrite | 1 round-trip pour N ops | imports, sync batch |
readPreference: secondary | délester le primary | reads analytiques tolérants au lag |
Détecter le N+1 — activer mongoose.set('debug', true) en dev affiche chaque query. En prod, instrumenter : un endpoint qui émet 50 queries pour une liste de 50 items = populate en boucle, à remplacer par $lookup ou populate batché.
Observabilité
// Tracer chaque commande Mongo (OpenTelemetry-style) via les events du driver.
mongoose.connection.on('commandStarted', (e) => span.start(e.commandName, e.command));
mongoose.set('debug', (coll, method, query) => metrics.timing('mongo.query', { coll, method }));- Slow query log — activer le profiler Mongo (
db.setProfilingLevel(1, { slowms: 100 })) et alerter sur lesCOLLSCAN(scans sans index) danssystem.profile. - Connection pool — surveiller
poolSize/checkedOut. Un pool saturé = latence p99 qui explose. RéglermaxPoolSizeselon la concurrence (défaut 100, souvent trop pour un microservice).
Sécurité
- NoSQL injection —
find({ email: req.query.email })oùemailvaut{ "$ne": null }retourne TOUT. Toujours valider/caster avec class-validator (@IsString()) avant de passer à un filter.strictQueryne protège pas de ça. - Mass assignment —
userModel.create(req.body)peut écrirerole: 'admin'. Passer par un DTO whitelisté, jamais le body brut. $where/mapReduce— exécutent du JS server-side : à bannir (RCE potentielle).- Champs sensibles —
select: falsesurpasswordHashau schéma pour qu'ils ne sortent jamais par défaut.
Scale
- Sharding — choisir une shard key à haute cardinalité et bien distribuée (
{ tenantId: 1, _id: 1 }souvent). Un_idObjectId seul crée un hotspot d'écriture (monotone croissant) → préférerhashed. - Pagination par curseur (déjà montrée :
_id: { $gt: cursor }) — O(1) vsskip(n)qui est O(n) et dérive sous écritures concurrentes. - Schema versioning — ajouter
@Prop({ default: 1 }) schemaVersionpermet une migration lazy (migrer au read) sans downtime ni backfill bloquant.
🏋️ Exercices
Populate →
$lookup(perf)Objectif — remplacer une liste paginée qui faitUser.find().populate('posts')(N+1) par une seule agrégation$lookup, et mesurer le nombre de queries avant/après. Indice —mongoose.set('debug', true)compte les queries ; le$lookupavec sous-pipeline$groupramène lepostCountsans hydrater les posts.Idempotence d'écriture sous raceObjectif — un endpoint
POST /ordersdoit être idempotent sur un headerIdempotency-Key. Deux requêtes concurrentes avec la même clé ne doivent créer qu'une commande. Indice — indexuniquesuridempotencyKey,create()dans untry/catchsurerr.code === 11000qui renvoie la commande existante. Tester avecPromise.all([req, req]).Transaction multi-document + rollbackObjectif —
transferCredits(from, to, amount): débiter A, créditer B, atomique. Si B n'existe pas, A ne doit pas être débité. Indice —MongoMemoryReplSeten test,session.withTransaction, vérifier qu'une exception au milieu laisse les soldes inchangés. Penser auwriteConcern: { w: 'majority' }.Le break-then-fix
strictQueryObjectif — écrire un test qui prouve qu'un filtrefindOne({ tenant: x })sur un champ absent du schéma retourne le mauvais résultat en Mongoose 7+ (filtre silencieusement ignoré), puis le corriger. Indice — déclarer le champ au schéma OUmongoose.set('strictQuery', 'throw')pour transformer le bug silencieux en exception. Discuter pourquoi'throw'en dev / déclaration au schéma en prod.Streaming LLM annulable persisté (production-grade)Objectif — implémenter le endpoint SSE de la section IA : stream des tokens Anthropic, persiste le run dans
agent_runs, et sur déconnexion client annule l'appel LLM et marque le runcancelledavec l'output partiel. Indice —req.on('close')→AbortController.abort()→signalpassé àmessages.stream(...). Tester l'annulation en fermant le client à mi-stream et en vérifiantstatus: 'cancelled'+ messages partiels en base.Break it : capped vs TTLObjectif — prouver qu'une collection
capped: { size }ne purge pas par date (un vieux doc reste tant que la taille n'est pas atteinte), puis migrer vers un index TTL qui purge bien à l'échéance. Indice — insérer un doc daté de -2 ans dans une capped peu remplie : il survit. Recréer la collection sanscapped+expiressur le champ date ; observer la purge (granularité ~60 s du daemon TTL).
🎤 En entretien
- « Quelle différence entre
populate()et$lookup? » —populate= N+1 contrôlé côté ODM (1 query parent + 1 query groupée$inpar path), pratique mais round-trips multiples et pas de filtrage relationnel riche ;$lookupest un JOIN exécuté server-side dans l'aggregation, plus rapide pour les reads denses et seul moyen de filtrer/agréger sur les enfants. Règle :populatepour le CRUD,$lookuppour les vues analytiques. - « Pourquoi les transactions exigent un replica set ? » — les transactions multi-document de Mongo s'appuient sur l'oplog et le mécanisme de réplication pour la durabilité et l'isolation snapshot ; un mongod standalone n'a pas d'oplog. En local,
rs.initiate()sur un nœud unique suffit pour les activer. - «
_id === stringrenvoiefalse, pourquoi ? » —_idest unObjectId(objet, pas primitive) ;===compare la référence. Toujoursid.equals(other)ouid.toString(). Piège fréquent dans les guards d'autorisation (doc.owner === userId) → faille de permission silencieuse. - « Comment garantir qu'un endpoint LLM ne facture pas deux fois sur un retry ? » — clé d'idempotence (
generationId) générée client, indexuniqueen base,create()qui rattrapeE11000et renvoie le run existant ; côté worker BullMQ, checkstatus === 'done'avant de relancer. EtAbortControllersur déconnexion pour ne pas payer un stream que personne ne lit.
🔗 Liens
- Mongoose docs
- @nestjs/mongoose
- Mongoose 6 → 7 migration
- mongodb-memory-server
- Mongo transactions docs
- Voir
04-transactions.mdpour transactions multi-store.