Skip to content

Générateurs & itérateurs

TL;DR — Un itérateur est tout objet qui sait répondre à la question « donne-moi l'élément suivant » (__next__) jusqu'à lever StopIteration ; un itérable sait fabriquer un itérateur (__iter__). Un générateur est le moyen idiomatique de produire un itérateur en Python : une fonction qui contient yield devient une machine à états suspendable qui ne calcule chaque valeur qu'à la demande (lazy), avec une empreinte mémoire O(1) quel que soit le volume traité. C'est exactement le mécanisme qui sous-tend le streaming token-par-token d'un agent LLM : for event in stream: consomme un itérateur que le SDK Anthropic alimente au fur et à mesure que les tokens arrivent du réseau. Maîtriser yield, yield from, les générateurs asynchrones (async def + async for), et le pattern pipeline vous donne l'outil de base de tout traitement de flux en Python — du CSV de 50 Go aux deltas SSE d'une réponse Claude.


🧠 Mental model

Vous venez de TypeScript/PHP. Le réflexe array.map(...).filter(...) construit tout le tableau intermédiaire en mémoire à chaque étape. Un générateur Python, c'est l'inverse : c'est un tapis roulant à la demande, pas un entrepôt.

Imaginez une distribution de tickets à la boulangerie. Une list (eager) imprime à l'avance les 10 000 tickets et les empile sur le comptoir : il faut le papier pour les 10 000, même si le premier client repart après le ticket n°3. Un générateur (lazy), c'est la machine à tickets : elle ne déroule le ticket suivant que quand on appuie sur le bouton. Entre deux appuis, la machine est gelée — elle se souvient exactement où elle en est (le numéro courant), mais ne consomme rien.

Ce « gel » est la propriété centrale. Une fonction normale s'exécute du début à la fin et rend la main une seule fois (return). Un générateur rend la main à chaque yield, en mémorisant son état local complet (variables, position dans la boucle), et reprend exactement là au prochain appel.

Fonction normale            Générateur
─────────────────           ──────────────────────────────
def f():                    def g():
  a = 1                       a = 1
  b = 2                        yield a      ← gèle ici, rend 1
  return a + b                 b = 2        ← reprend ici au next() suivant
                               yield b      ← gèle ici, rend 2
  appel → 3 (tout cuit)       appel → 1, puis 2, puis StopIteration
                              (état conservé entre les deux)


Pull-based : le CONSOMMATEUR tire, le générateur pousse une valeur puis dort

  for x in g():        next()      ┌─────────┐  yield x   ┌──────────┐
  ──────────────────►  ─────────►  │   gen   │  ────────► │ consumer │
                       (réveille)  │ (gelé)  │  (1 valeur)│          │
                                   └─────────┘            └──────────┘

Le protocole d'itération complet tient en deux méthodes :

  • iter(obj) appelle obj.__iter__() → renvoie un itérateur.
  • next(it) appelle it.__next__() → renvoie la valeur suivante, ou lève StopIteration quand c'est fini.

for x in obj: n'est que du sucre syntaxique pour : it = iter(obj); while True: try: x = next(it) except StopIteration: break.


Le protocole d'itération, à la main

Pour comprendre les générateurs, il faut d'abord voir ce qu'ils automatisent. Un itérateur écrit « manuellement » :

python
from collections.abc import Iterator


class Countdown:
    """Itérable ET itérateur naïf — on verra plus bas pourquoi c'est un piège."""

    def __init__(self, start: int) -> None:
        self._current = start

    def __iter__(self) -> Iterator[int]:
        return self

    def __next__(self) -> int:
        if self._current <= 0:
            raise StopIteration
        self._current -= 1
        return self._current + 1


for n in Countdown(3):
    print(n)  # 3, 2, 1

Ça marche, mais c'est verbeux et subtilement bogué : la classe est à la fois itérable et son propre itérateur, donc l'état (_current) est partagé. Itérer deux fois le même objet ne le réinitialise pas :

python
cd = Countdown(3)
print(list(cd))  # [3, 2, 1]
print(list(cd))  # [] — l'itérateur est épuisé, surprise !

C'est le piège classique : un itérateur est à usage unique ; un itérable bien conçu produit un nouvel itérateur à chaque iter(). C'est précisément ce qu'un générateur vous donne gratuitement.

La version générateur

python
from collections.abc import Iterator


def countdown(start: int) -> Iterator[int]:
    n = start
    while n > 0:
        yield n
        n -= 1


# Chaque appel countdown(3) crée un itérateur FRAIS et indépendant.
cd = countdown(3)
print(list(cd))       # [3, 2, 1]
print(list(countdown(3)))  # [3, 2, 1] — un nouvel itérateur, réinitialisé

Trois lignes au lieu de dix, l'état local est encapsulé dans la frame de la fonction, et le bug d'épuisement disparaît parce qu'on rappelle la fonction pour obtenir un nouvel itérateur. Règle senior : n'écrivez une classe d'itérateur que si vous avez besoin d'un état explicitement inspectable ou de méthodes additionnelles. Sinon, yield.


Lazy vs eager : le cœur du sujet

La différence n'est pas cosmétique — c'est une différence de complexité mémoire. Comparons le calcul de la somme des carrés de 1 à 100 millions.

❌ La mauvaise façon (eager) — vue depuis TS/PHP

python
def squares_eager(n: int) -> list[int]:
    return [i * i for i in range(n)]


total = sum(squares_eager(100_000_000))
# Construit une liste de 100 M d'entiers AVANT de sommer.
# ~3-4 Go de RAM. Sur un conteneur à 512 Mo : OOMKilled.

✅ La façon idiomatique (lazy)

python
from collections.abc import Iterator


def squares_lazy(n: int) -> Iterator[int]:
    for i in range(n):
        yield i * i


total = sum(squares_lazy(100_000_000))
# Empreinte mémoire O(1) : un seul carré vit à la fois.
# sum() tire les valeurs une par une.

Même résultat numérique, mais la version lazy tient dans quelques kilo-octets. La leçon : dès qu'une source de données peut être grande ou infinie, ou qu'elle arrive par morceaux dans le temps (réseau, fichier, file), un générateur est le bon outil.

Expressions génératrices

Pour les cas simples, pas besoin de def. La generator expression est à la list comprehension ce que le générateur est à la list :

python
# list comprehension — eager, alloue tout
sum([i * i for i in range(1_000_000)])

# generator expression — lazy, parenthèses au lieu de crochets
sum(i * i for i in range(1_000_000))  # pas de liste intermédiaire

Quand une generator expression est l'unique argument d'une fonction, on omet les parenthèses redondantes : sum(i * i for i in range(n)).


yield from : déléguer à un sous-générateur

Quand un générateur veut produire toutes les valeurs d'un autre itérable, le réflexe naïf est une boucle. yield from fait la même chose, en mieux (il transmet aussi les valeurs envoyées et les exceptions — utile pour les coroutines).

python
from collections.abc import Iterator


def flatten(nested: list[list[int]]) -> Iterator[int]:
    for sublist in nested:
        yield from sublist  # équivaut à: for x in sublist: yield x


print(list(flatten([[1, 2], [3], [4, 5]])))  # [1, 2, 3, 4, 5]

Récursivité naturelle pour un arbre :

python
from collections.abc import Iterator
from typing import Any


def walk(node: dict[str, Any]) -> Iterator[Any]:
    yield node["value"]
    for child in node.get("children", []):
        yield from walk(child)  # délégation récursive propre

Le pattern pipeline : composer des générateurs

C'est le pattern qui transforme les générateurs d'une curiosité en outil d'architecture. Chaque étape est un générateur qui consomme l'itérable précédent et en produit un nouveau. Rien n'est matérialisé entre les étapes : les données coulent élément par élément à travers tout le pipeline.

python
from collections.abc import Iterable, Iterator


def read_lines(path: str) -> Iterator[str]:
    with open(path, encoding="utf-8") as f:
        yield from f  # un fichier EST déjà un itérateur de lignes, lazy


def parse_amounts(lines: Iterable[str]) -> Iterator[float]:
    for line in lines:
        line = line.strip()
        if line and not line.startswith("#"):
            yield float(line)


def above_threshold(amounts: Iterable[float], threshold: float) -> Iterator[float]:
    for amount in amounts:
        if amount > threshold:
            yield amount


# Composition : aucune des étapes n'a lu tout le fichier.
# La RAM consommée est celle d'UNE ligne, même pour un fichier de 50 Go.
lines = read_lines("transactions.txt")
amounts = parse_amounts(lines)
large = above_threshold(amounts, 1000.0)

total = sum(large)  # c'est sum() qui amorce la pompe et tire tout le pipeline

Le sum() final est le seul consommateur ; tant que personne ne tire, aucun octet n'est lu. C'est le même principe que les streams Node ou les IteratorAggregate PHP, mais sans cérémonie.


Générateurs asynchrones — et le lien direct avec les agents LLM

C'est ici que la leçon rencontre votre métier. Quand vous streamez la réponse d'un agent Claude, le SDK Anthropic vous rend… un itérateur asynchrone. async for event in stream: est strictement le protocole d'itération vu plus haut, transposé en monde asyncio : __aiter__ / __anext__ / StopAsyncIteration.

Un générateur asynchrone est une fonction async def qui contient yield. Il peut await entre deux yield — exactement ce qu'il faut pour produire des valeurs qui arrivent du réseau au compte-gouttes.

python
from collections.abc import AsyncIterator

from anthropic import AsyncAnthropic

client = AsyncAnthropic()  # lit ANTHROPIC_API_KEY depuis l'environnement


async def stream_answer(question: str) -> AsyncIterator[str]:
    """Générateur async qui re-yield les deltas de texte d'une réponse Claude.

    Le SDK gère le réseau ; nous, on transforme son flux en un itérateur
    de chaînes propres que le reste de l'app peut consommer sans connaître
    Anthropic.
    """
    async with client.messages.stream(
        model="claude-opus-4-8",
        max_tokens=1024,
        messages=[{"role": "user", "content": question}],
    ) as stream:
        async for text in stream.text_stream:
            yield text  # un fragment de réponse, dès qu'il arrive


async def main() -> None:
    async for chunk in stream_answer("Explique les générateurs en une phrase."):
        print(chunk, end="", flush=True)
    print()

Points à retenir, droit issus des faits Anthropic :

  • On utilise AsyncAnthropic (jamais le client synchrone dans une route async).
  • Le streaming est le défaut recommandé dès qu'une requête peut être longue ou avec un gros max_tokens : il évite les timeouts de requête HTTP. Le générateur async est l'abstraction naturelle pour le restituer.
  • Si vous n'avez pas besoin de traiter chaque événement, le SDK fournit await stream.get_final_message() pour récupérer le message complet d'un coup. Mais pour une UI token-par-token, c'est async for qui brille.
  • Le modèle par défaut est claude-opus-4-8 (5 $/25 $ par Mtok, fenêtre 1M). Pour du thinking, on utilise thinking={"type": "adaptive"}jamais budget_tokens (supprimé sur Opus 4.8, renvoie une 400).

Servir ce flux en FastAPI (SSE)

Le générateur async se branche directement sur une réponse streaming FastAPI. C'est le squelette d'un endpoint « parler à un agent » :

python
from collections.abc import AsyncIterator

from anthropic import AsyncAnthropic
from fastapi import FastAPI, Depends
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse

app = FastAPI()


def get_client() -> AsyncAnthropic:
    # Dépendance injectable : un seul client réutilisé (pool de connexions).
    return AsyncAnthropic()


class ChatRequest(BaseModel):
    question: str


async def token_events(
    question: str, client: AsyncAnthropic
) -> AsyncIterator[dict[str, str]]:
    async with client.messages.stream(
        model="claude-opus-4-8",
        max_tokens=1024,
        messages=[{"role": "user", "content": question}],
    ) as stream:
        async for text in stream.text_stream:
            yield {"event": "token", "data": text}
        final = await stream.get_final_message()
        yield {"event": "done", "data": final.stop_reason or "end_turn"}


@app.post("/chat")
async def chat(
    req: ChatRequest, client: AsyncAnthropic = Depends(get_client)
) -> EventSourceResponse:
    # On RETOURNE le générateur, on ne l'exécute pas. FastAPI/Starlette
    # le consomme au fur et à mesure et pousse chaque token au client.
    return EventSourceResponse(token_events(req.question, client))

L'élégance : token_events est un générateur async, donc paresseux — il ne produit un événement SSE que lorsque Claude a produit un token, sans jamais bufferiser la réponse entière côté serveur. Mémoire serveur O(1) par requête, premier octet renvoyé au client en quelques centaines de millisecondes.

La boucle tool-use comme générateur d'événements

Le même pattern modélise proprement une boucle agentique (tool-use). On expose la boucle comme un générateur qui yield des événements typés ; l'appelant décide quoi en faire (logger, streamer à l'UI, gater une confirmation).

python
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Any

from anthropic import AsyncAnthropic


@dataclass(frozen=True, slots=True)
class TextEvent:
    text: str


@dataclass(frozen=True, slots=True)
class ToolCallEvent:
    name: str
    input: dict[str, Any]


@dataclass(frozen=True, slots=True)
class DoneEvent:
    stop_reason: str


async def agent_loop(
    client: AsyncAnthropic,
    messages: list[dict[str, Any]],
    tools: list[dict[str, Any]],
) -> AsyncIterator[TextEvent | ToolCallEvent | DoneEvent]:
    """Un tour de boucle agentique, restitué comme un flux d'événements.

    Le générateur encapsule l'orchestration ; l'appelant n'a qu'à
    `async for` dessus et brancher l'exécution réelle des outils.
    """
    while True:
        async with client.messages.stream(
            model="claude-opus-4-8",
            max_tokens=2048,
            messages=messages,
            tools=tools,
        ) as stream:
            async for text in stream.text_stream:
                yield TextEvent(text)
            response = await stream.get_final_message()

        messages.append({"role": "assistant", "content": response.content})

        tool_calls = [b for b in response.content if b.type == "tool_use"]
        if not tool_calls:
            yield DoneEvent(response.stop_reason or "end_turn")
            return

        results: list[dict[str, Any]] = []
        for call in tool_calls:
            yield ToolCallEvent(call.name, call.input)
            # ... exécuter l'outil ici (omis) ...
            results.append(
                {
                    "type": "tool_result",
                    "tool_use_id": call.id,
                    "content": "résultat",
                }
            )
        messages.append({"role": "user", "content": results})
        # On reboucle : le générateur reprendra son streaming au prochain tour.

Le générateur suspend et reprend la boucle multi-tours sans que l'appelant ait à gérer un automate à états. C'est la généralisation directe de countdown : un état local gelé entre les yield, sauf que l'état est ici toute une conversation.


⚙️ En production

Modes de défaillance.

  • Itérateur épuisé silencieusement. Un générateur est à usage unique. Si vous le passez à deux consommateurs, le second voit un flux vide — sans erreur. Symptôme typique : un calcul qui renvoie 0 ou [] la deuxième fois. Si vous devez réutiliser, matérialisez (data = list(gen)) ou recréez le générateur. Pour partager un flux entre N consommateurs, utilisez itertools.teemais attention, tee bufferise tout ce que le consommateur le plus lent n'a pas encore lu : sur un flux infini avec consommateurs désynchronisés, c'est une fuite mémoire.
  • return dans un générateur. return valeur ne renvoie pas la valeur au for : il termine le générateur et la valeur devient StopIteration.value (récupérable uniquement via yield from). Erreur fréquente venant de TS où return dans une fonction génératrice a un sens différent.
  • Exceptions et nettoyage de ressources. Le piège majeur du pipeline-sur-fichier : si le consommateur s'arrête tôt (un break, une exception en aval), le générateur est suspendu avec son with open(...) encore ouvert. Python appelle .close() sur le générateur quand le GC le ramasse, ce qui déclenche un GeneratorExit dans le yield et fait remonter le finally/__exit__ — mais le timing dépend du GC. En prod, ne comptez pas sur le GC : fermez explicitement (gen.close()) ou structurez avec contextlib.closing. Mettez toujours la libération de ressource dans un try/finally (ou un with) à l'intérieur du générateur.
python
from collections.abc import Iterator


def read_lines(path: str) -> Iterator[str]:
    f = open(path, encoding="utf-8")
    try:
        yield from f
    finally:
        f.close()  # exécuté même si le consommateur break ou close() le gen

Performance. Les générateurs ont un coût par-élément (overhead d'appel __next__) : pour des transformations triviales sur des données déjà en mémoire, une list comprehension peut être plus rapide en CPU. Le gain des générateurs est la mémoire et la latence du premier élément (time-to-first-token), pas le débit brut. Choisissez selon la contrainte dominante. Pour du calcul numérique vectorisable, ni l'un ni l'autre : NumPy.

Sécurité. Un générateur paresseux peut masquer un flux infini ou contrôlé par l'utilisateur. list(user_supplied_generator()) sur une source non bornée est un déni de service. Bornez explicitement avec itertools.islice(gen, max_items). Dans le contexte LLM : un async for sur un stream sans timeout peut pendre indéfiniment si la connexion traîne — enveloppez avec asyncio.timeout() et fixez un max_tokens raisonnable.

Observabilité. Le caractère paresseux rend le débogage moins évident : un print dans le générateur ne s'exécute qu'au moment où la valeur est tirée, pas à la définition. Pour tracer la latence inter-tokens d'un agent, instrumentez dans la boucle de consommation (async for), pas dans le générateur lui-même — c'est là que le temps réel s'écoule.

Le tradeoff senior. Générateur quand : source grande/infinie/streamée, mémoire contrainte, latence du premier élément critique (UI, SSE), ou pipeline composable. List/eager quand : petites données réutilisées plusieurs fois, accès aléatoire/indexation nécessaire, ou besoin de len(). Ne « génératorisez » pas par réflexe — la lisibilité d'une comprehension sur 100 éléments l'emporte sur une micro-économie mémoire fantôme.


🏋️ Exercices

Exercice 1 — chunked (implémenter)

Objectif. Écrire un générateur chunked(iterable, size) qui yield des listes de size éléments au maximum (le dernier morceau peut être plus court), en restant lazy : il ne doit jamais matérialiser tout l'itérable. Cas d'usage réel : batcher des items avant un appel API.

python
chunked([1, 2, 3, 4, 5], 2)  # → [1, 2], [3, 4], [5]

Indice/Solution. Accumulez dans une liste tampon ; dès qu'elle atteint size, yield puis videz-la. À la fin de la boucle, n'oubliez pas de yield le tampon résiduel s'il est non vide. Squelette :

python
from collections.abc import Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")


def chunked(iterable: Iterable[T], size: int) -> Iterator[list[T]]:
    if size < 1:
        raise ValueError("size doit être >= 1")
    buf: list[T] = []
    for item in iterable:
        buf.append(item)
        if len(buf) == size:
            yield buf
            buf = []  # NB: nouvelle liste, pas buf.clear() (le yield a livré la référence)
    if buf:
        yield buf

Piège testé : buf.clear() au lieu de buf = [] corromprait les chunks déjà yieldés (même référence).

Exercice 2 — pipeline de logs (production-grade)

Objectif. Construire un pipeline de trois générateurs qui lit un gros fichier de logs, parse chaque ligne en dict, filtre les lignes de niveau ERROR, et compte les erreurs par heure — le tout en O(1) mémoire et en fermant proprement le fichier même si on s'arrête tôt.

Indice/Solution. Réutilisez le pattern read_lines avec try/finally. Étape parse : yield un dict ou skip les lignes malformées (ne crashez pas le pipeline sur une ligne corrompue — try/except autour du parse, continue). Étape compte : agrégez dans un collections.Counter. Validez la robustesse en passant un fichier avec des lignes vides et tronquées. Bonus : bornez avec itertools.islice pour ne traiter que les N premières lignes en dev.

Exercice 3 — merge_sorted avec heapq (algorithmique)

Objectif. Écrire merge_sorted(*iterables) qui fusionne plusieurs itérables déjà triés en un seul flux trié, paresseusement, sans tout charger. (C'est le k-way merge, brique du tri externe.)

Indice/Solution. heapq.merge(*iterables) existe et fait exactement ça — mais l'exercice est de comprendre pourquoi. Implémentez-le : un tas (heapq) contenant (valeur, index_source, itérateur), on pop le minimum, on yield, on avance l'itérateur correspondant et on re-push s'il reste des éléments. Gérez StopIteration par source. Comparez ensuite votre version à heapq.merge sur le comportement et les perfs.

Exercice 4 — streamer un agent et mesurer la latence (AI, dur)

Objectif. Écrire un générateur async timed_stream(client, question) qui re-yield les tokens d'une réponse claude-opus-4-8 et émet, à la fin, la latence du premier token et le débit moyen (tokens/s). Puis le servir via un endpoint FastAPI SSE.

Indice/Solution. Capturez time.perf_counter() avant le async with ... .stream(...). Au premier yield du text_stream, calculez le time-to-first-token. Comptez les fragments, mesurez le total à la fin. Yieldez les tokens au fur et à mesure (type TextEvent) puis un dernier MetricsEvent. Rappels des faits : AsyncAnthropic, streaming par défaut, thinking={"type": "adaptive"} si vous voulez du raisonnement (jamais budget_tokens), get_final_message() pour récupérer usage.output_tokens exact plutôt que de compter les fragments. Enveloppez le async for dans asyncio.timeout(30) pour ne pas pendre.

Exercice 5 — casser puis réparer : le double consommateur (break-then-fix)

Objectif. On vous donne ce code qui « marche une fois sur deux ». Diagnostiquez, expliquez la cause racine, réparez.

python
def parse(lines):
    for ln in lines:
        yield int(ln)


nums = parse(["1", "2", "3"])
print(max(nums))   # 3
print(sum(nums))   # ⚠️ 0 — pourquoi ?

Indice/Solution. max(nums) épuise l'itérateur (il le parcourt entièrement) ; nums est mort, sum voit un flux vide. Cause racine : un générateur est un itérateur à usage unique. Trois réparations possibles, à discuter en termes de tradeoff : (a) matérialiser nums = list(parse(...)) si les données sont petites et réutilisées — coût mémoire ; (b) recréer le générateur pour chaque consommation — coût de recalcul ; (c) itertools.tee(parse(...), 2) pour deux consommateurs synchrones — coût de buffer. Pour max et sum en une seule passe, le bon réflexe senior est de ne parcourir qu'une fois : total = 0; m = float("-inf"); for x in nums: total += x; m = max(m, x).


🎤 En entretien

Q : Quelle est la différence entre un itérable et un itérateur ? Un itérable implémente __iter__ et sait produire un itérateur frais (une list, un dict, un fichier) ; un itérateur implémente __next__ (et __iter__ qui se retourne lui-même) et porte l'état de progression — il est consommable une seule fois. iter(iterable) vous donne un itérateur ; next(iterator) avance.

Q : Pourquoi un générateur consomme-t-il O(1) mémoire là où une list comprehension consomme O(n) ? Le générateur ne calcule chaque valeur qu'à la demande et ne garde en vie que l'élément courant plus l'état local de sa frame ; rien n'est stocké entre deux yield. La comprehension matérialise les n éléments d'un coup. Le générateur troque de la mémoire contre un léger surcoût CPU par élément et une perte de l'accès aléatoire.

Q : À quoi sert yield from et qu'apporte-t-il par rapport à une boucle for ... yield ? Il délègue à un sous-itérable : yield from sub re-yield toutes ses valeurs. Au-delà du raccourci, il établit un canal transparent qui propage aussi les valeurs send(), les exceptions throw() et capture la valeur de retour du sous-générateur — indispensable pour composer des coroutines, ce qu'une simple boucle ne fait pas.

Q : Comment le streaming d'un agent LLM est-il relié aux générateurs, et quel piège de ressource faut-il anticiper ? Le SDK Anthropic expose la réponse en streaming comme un itérateur (async) : async for text in stream.text_stream est le protocole d'itération transposé en asyncio. Le piège : si le consommateur s'arrête tôt (client déconnecté, exception), le générateur reste suspendu avec le async with du stream ouvert — il faut garantir la fermeture (le async with interne le gère, mais sur un générateur fait main, prévoir try/finally et ne pas dépendre du GC), et borner avec un timeout pour éviter une connexion pendue.

Bibliothèque tech perso — Achref