Fonctions, *args/**kwargs, closures
TL;DR — En Python, une fonction est un objet de première classe : on peut la passer en argument, la retourner, la stocker.
*args/**kwargscapturent un nombre arbitraire d'arguments positionnels et nommés, et servent de colle pour écrire des décorateurs et des wrappers transparents. Une closure est une fonction qui capture des variables de son scope englobant — c'est le mécanisme qui sous-tend les décorateurs, les usines à fonctions (factories), les handlers de callback, et — pour vous qui construisez des agents — l'injection propre d'un clientAsyncAnthropic, de la configuration de retry, ou d'un budget de tokens dans une boucle d'outils sans la faire transiter par dix paramètres. Maîtriser ces trois notions, c'est arrêter d'écrire du Python « à la PHP » et commencer à écrire du Python idiomatique.
🧠 Mental model
Vous venez de TypeScript et de PHP. La bonne nouvelle : les fonctions Python ressemblent beaucoup aux fonctions fléchées de JS — ce sont des valeurs. La nuance qui fait trébucher tout le monde tient au scope et au moment de la capture.
L'analogie du sac à dos. Une closure, c'est une fonction qui part en randonnée avec un sac à dos. Au moment où elle est créée, elle ne copie pas le contenu de son environnement dans le sac : elle y range une référence vers le placard de la maison (le scope englobant). Quand on l'appelle, plus tard, ailleurs, elle ouvre le sac, suit la référence, et lit la valeur actuelle du placard — pas celle qu'il y avait au départ. C'est exactement le piège des closures dans une boucle (on y revient).
def make_counter(): ┌─────────────────────────────┐
count = 0 │ scope englobant (la maison) │
def inc(): │ count ──────────────┐ │
nonlocal count │ │ │
count += 1 │ ┌──────────────────┐ │ │
return count │ │ inc (la rando) │ │ │
return inc ─────────────┼──▶│ sac à dos: ─────┼───┘ │
│ │ __closure__ │ │
c = make_counter() │ └──────────────────┘ │
c() # 1 └─────────────────────────────┘
c() # 2 ← le placard "count" survit à make_counter()Le point crucial : make_counter() a terminé son exécution, sa frame devrait être détruite — mais count survit parce que inc détient encore une référence dessus. C'est la cellule de closure (__closure__). En PHP vous aviez function() use ($count) {} qui copiait ; en Python la capture est par référence et implicite.
Les fonctions sont des objets de première classe
Avant les closures, ancrons l'idée de base : une fonction est une valeur comme une autre.
from collections.abc import Callable
def double(x: int) -> int:
return x * 2
# On la stocke dans une variable — pas d'appel, pas de parenthèses
f: Callable[[int], int] = double
print(f(21)) # 42
# On la met dans une liste, un dict, on la passe en argument
ops: dict[str, Callable[[int], int]] = {"double": double, "neg": lambda x: -x}
print(ops["double"](10)) # 20
def apply(fn: Callable[[int], int], value: int) -> int:
return fn(value)
print(apply(double, 5)) # 10Annotez vos callables avec collections.abc.Callable (Python 3.9+, et c'est le canonique en 3.12 — pas typing.Callable, qui est déprécié). Callable[[int, str], bool] se lit « prend un int et un str, renvoie un bool ».
Paramètres : le vocabulaire complet
Python a une grammaire de paramètres riche. La connaître évite 90 % des TypeError: got an unexpected keyword argument.
def configure(
model: str, # positionnel ou nommé
/, # tout ce qui précède est positionnel-only
max_tokens: int = 4096, # avec défaut
*, # tout ce qui suit est keyword-only
stream: bool = False,
effort: str = "high",
) -> dict[str, object]:
return {"model": model, "max_tokens": max_tokens, "stream": stream, "effort": effort}
configure("claude-opus-4-8", 8192, stream=True) # ✅
configure("claude-opus-4-8", max_tokens=8192) # ✅
configure(model="claude-opus-4-8") # ❌ TypeError: model est positionnel-only (à cause du /)
configure("claude-opus-4-8", 8192, True) # ❌ TypeError: stream est keyword-only (à cause du *)/: tout ce qui est avant doit être passé par position. Utile pour figer le nom d'un paramètre (vous pourrez le renommer sans casser les appelants).*(seul) : tout ce qui est après doit être passé par nom. C'est l'outil n°1 pour la lisibilité —stream=Trueest auto-documenté là oùTruene dit rien.
Règle senior : pour tout booléen ou paramètre « drapeau », rendez-le keyword-only. Un appel comme create(msgs, True, False, True) est un bug en puissance.
*args et **kwargs : capturer l'arbitraire
*args collecte les positionnels excédentaires dans un tuple ; **kwargs collecte les nommés excédentaires dans un dict.
def trace(*args: object, **kwargs: object) -> None:
print(f"positionnels = {args!r}") # tuple
print(f"nommés = {kwargs!r}") # dict
trace(1, "deux", 3.0, mode="debug", retries=2)
# positionnels = (1, 'deux', 3.0)
# nommés = {'mode': 'debug', 'retries': 2}Les noms args/kwargs sont une convention, pas une obligation : c'est * et ** qui font le travail. L'usage symétrique, c'est le unpacking à l'appel :
params = {"model": "claude-opus-4-8", "max_tokens": 1024}
extra = ["bloc1", "bloc2"]
def build(*blocks: str, model: str, max_tokens: int) -> str:
return f"{model} ({max_tokens}): {', '.join(blocks)}"
build(*extra, **params) # déballe la liste en positionnels et le dict en nommésLa manière idiomatique : un wrapper transparent
Le cas d'usage roi de *args/**kwargs est le passe-plat : un wrapper qui ne connaît pas la signature de la fonction qu'il enveloppe et la transmet telle quelle. C'est exactement ce dont on a besoin pour, par exemple, ajouter un retry autour d'un appel LLM.
import functools
import time
from collections.abc import Callable
from typing import TypeVar
T = TypeVar("T")
def retry(times: int = 3, base_delay: float = 0.5) -> Callable[[Callable[..., T]], Callable[..., T]]:
def decorator(fn: Callable[..., T]) -> Callable[..., T]:
@functools.wraps(fn)
def wrapper(*args: object, **kwargs: object) -> T:
if times < 1:
raise ValueError("times doit être >= 1")
last_exc: Exception | None = None
for attempt in range(times):
try:
return fn(*args, **kwargs)
except Exception as exc: # en prod : capturez des exceptions typées (voir plus bas)
last_exc = exc
if attempt < times - 1: # pas de sleep après la dernière tentative
time.sleep(base_delay * 2**attempt) # backoff exponentiel
assert last_exc is not None # garanti par times >= 1 + boucle entrée
raise last_exc
return wrapper
return decoratorNotez @functools.wraps(fn) : sans lui, wrapper.__name__ deviendrait "wrapper" et le docstring serait perdu — un cauchemar au débogage. Mettez-le systématiquement.
La manière fautive (à reconnaître en review)
# ❌ MAUVAIS — argument mutable par défaut
def append_to(item: int, target: list[int] = []) -> list[int]:
target.append(item)
return target
append_to(1) # [1]
append_to(2) # [1, 2] ← surprise : la même liste est réutilisée !Le défaut [] est évalué une seule fois, à la définition de la fonction, pas à chaque appel. C'est le piège Python le plus classique en entretien. Le correctif idiomatique :
# ✅ BON — sentinelle None
def append_to(item: int, target: list[int] | None = None) -> list[int]:
if target is None:
target = []
target.append(item)
return targetClosures : capture par référence, late binding
Une closure capture des variables, pas leurs valeurs. Et la lecture se fait au moment de l'appel (late binding). Ce double fait produit le bug le plus célèbre du langage :
# ❌ MAUVAIS — toutes les closures partagent la même variable i
funcs = [lambda: i for i in range(3)]
print([f() for f in funcs]) # [2, 2, 2] ← pas [0, 1, 2] !Au moment où on appelle f(), la boucle est terminée et i vaut 2 partout. Les trois lambdas pointent vers la même variable i. Deux correctifs idiomatiques :
# ✅ Option A — figer la valeur via un argument par défaut (évalué à la création)
funcs = [lambda i=i: i for i in range(3)]
print([f() for f in funcs]) # [0, 1, 2]
# ✅ Option B — une factory qui crée un nouveau scope par itération
def make(i: int):
return lambda: i
funcs = [make(i) for i in range(3)]
print([f() for f in funcs]) # [0, 1, 2]Modifier une variable capturée : nonlocal
Par défaut, affecter à une variable dans une fonction interne crée une nouvelle variable locale. Pour modifier celle du scope englobant, déclarez nonlocal :
def make_accumulator() -> tuple[Callable[[int], int], Callable[[], int]]:
total = 0
def add(n: int) -> int:
nonlocal total # sans ça : UnboundLocalError
total += n
return total
def current() -> int:
return total # lecture seule : pas besoin de nonlocal
return add, current
add, current = make_accumulator()
add(10); add(5)
print(current()) # 15Inspecter une closure (utile en debug) :
print(add.__closure__) # tuple de cellules
print([c.cell_contents for c in add.__closure__]) # [15]
print(add.__code__.co_freevars) # ('total',) ← variables libres capturées⚙️ En production — appliqué au service et à l'appel d'agents LLM
Voici où tout converge pour votre stack. Vous construisez des agents : vous appelez l'API Anthropic, vous streamez des tokens, vous bouclez sur du tool-use, vous gérez retries et observabilité. Closures et **kwargs sont précisément les outils pour le faire proprement.
1. Closure pour injecter le client et la config — sans le faire transiter partout
Le piège débutant est de passer le client AsyncAnthropic, le modèle, l'effort, etc. à chaque fonction. Une closure (ou une factory) capture cette config une fois et renvoie une fonction d'appel propre.
import os
from collections.abc import Awaitable, Callable
from anthropic import AsyncAnthropic
from anthropic.types import Message
def make_completion(
client: AsyncAnthropic,
*,
model: str = "claude-opus-4-8", # 5 / 25 USD par Mtok, fenêtre 1M
effort: str = "high",
) -> Callable[..., Awaitable[Message]]:
"""Capture client + config ; renvoie une fonction d'appel à un seul argument utile."""
async def complete(prompt: str, /, **overrides: object) -> Message:
return await client.messages.create(
model=model,
max_tokens=overrides.pop("max_tokens", 4096), # type: ignore[arg-type]
thinking={"type": "adaptive"}, # JAMAIS budget_tokens sur Opus 4.x
output_config={"effort": effort}, # low | medium | high | xhigh | max
messages=[{"role": "user", "content": prompt}],
**overrides, # passe-plat de tout le reste (tools, system, metadata...)
)
return complete
# Setup (une fois)
complete = make_completion(AsyncAnthropic(api_key=os.environ["ANTHROPIC_API_KEY"]))
# Usage (partout, sans retrimballer la config)
# msg = await complete("Résume ce ticket", max_tokens=1024)Trois points seniors ici :
thinking={"type": "adaptive"}: sur la famille Opus 4.x,budget_tokensrenvoie un 400. La profondeur de réflexion se règle viaoutput_config.effort(highest le défaut,xhighpour l'agentique/code lourd).**overridesrend la fonction extensible sans toucher sa signature — vous ajouteztools=ousystem=à l'appel sans rien réécrire.- Le
/aprèspromptle verrouille comme positionnel : vous pourrez renommer le paramètre interne sans casser les appelants.
2. Streaming de tokens — la closure capture le buffer et le callback
Pour servir des tokens en SSE (ou les pousser dans une UI Angular), on streame. La méthode messages.stream du SDK gère le protocole ; une closure capture proprement le sink de sortie.
from collections.abc import Awaitable, Callable
def make_streamer(
client: AsyncAnthropic,
on_token: Callable[[str], Awaitable[None]], # callback capturé
) -> Callable[[str], Awaitable[str]]:
async def stream_reply(prompt: str) -> str:
chunks: list[str] = []
async with client.messages.stream(
model="claude-opus-4-8",
max_tokens=8192,
thinking={"type": "adaptive"},
messages=[{"role": "user", "content": prompt}],
) as stream:
async for text in stream.text_stream:
chunks.append(text)
await on_token(text) # closure : on_token vient du scope englobant
final = await stream.get_final_message() # message complet une fois le flux terminé
_ = final.usage # observabilité : input/output/cache tokens
return "".join(chunks)
return stream_replyLe async with est non négociable : il garantit que la connexion HTTP est fermée même si le client se déconnecte en plein flux (mode d'échec classique en SSE). get_final_message() vous redonne le Message complet — utilisez final.usage pour logguer les tokens consommés (et donc le coût).
3. La boucle de tool-use — **kwargs pour dispatcher vers vos handlers
Un agent appelle des outils. Vous tenez un registre nom -> fonction, et chaque outil reçoit ses arguments via **. C'est l'archétype du déballage de dict.
import json
from collections.abc import Callable
# Registre d'outils — fonctions de première classe stockées dans un dict
def get_weather(*, city: str, unit: str = "celsius") -> str:
return f"22°{unit[0].upper()} à {city}"
TOOLS: dict[str, Callable[..., str]] = {"get_weather": get_weather}
async def run_agent(client: AsyncAnthropic, prompt: str) -> str:
messages: list[dict[str, object]] = [{"role": "user", "content": prompt}]
tool_defs = [{
"name": "get_weather",
"description": "Météo actuelle d'une ville. Appelle-le quand l'utilisateur demande le temps.",
"input_schema": {
"type": "object",
"properties": {"city": {"type": "string"}, "unit": {"type": "string"}},
"required": ["city"],
},
}]
while True:
msg = await client.messages.create(
model="claude-opus-4-8",
max_tokens=4096,
thinking={"type": "adaptive"},
tools=tool_defs,
messages=messages,
)
messages.append({"role": "assistant", "content": msg.content})
if msg.stop_reason != "tool_use":
return "".join(b.text for b in msg.content if b.type == "text")
results: list[dict[str, object]] = []
for block in msg.content:
if block.type == "tool_use":
handler = TOOLS[block.name]
# block.input est un dict déjà parsé par le SDK — JAMAIS json.loads() sur une string brute
output = handler(**block.input) # ← déballage **kwargs vers le handler
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
})
messages.append({"role": "user", "content": results})Le handler(**block.input) est le cœur : block.input est un dict (ex. {"city": "Tunis"}) et ** le déballe en arguments nommés vers get_weather(*, city=...). Les paramètres keyword-only de vos handlers (*,) protègent contre un schéma JSON malformé qui enverrait des positionnels.
Modes d'échec, performance, sécurité, observabilité
| Préoccupation | Le piège | La pratique senior |
|---|---|---|
| Exceptions | except Exception avale tout, y compris un KeyboardInterrupt mal placé et masque les bugs de programmation. | Capturez les exceptions typées du SDK : anthropic.RateLimitError (429, retriable), anthropic.APIStatusError, anthropic.BadRequestError (400, non retriable). Ne faites pas de string-matching sur le message. |
| Retries | Réimplémenter un backoff maison autour de chaque appel. | Le SDK retry déjà 429/5xx avec backoff exponentiel (max_retries=2 par défaut). Réglez-le sur le client (AsyncAnthropic(max_retries=4)) plutôt que d'empiler vos propres décorateurs. |
**kwargs opaque | def f(**kwargs) accepte silencieusement max_token (typo) et l'ignore. | Validez les clés attendues, ou n'utilisez **kwargs que pour le passe-plat vers une fonction qui, elle, validera. Pour une API publique, des paramètres explicites valent mieux. |
| Closures et fuites mémoire | Une closure capture un gros objet (client, buffer, modèle Pydantic volumineux) et le maintient en vie tant que la closure existe. | Conscience du graphe de références : une closure stockée dans un cache global retient tout son sac à dos. Capturez le minimum. |
| Late binding en async | Capturer i dans une boucle qui lance des asyncio.create_task → tous voient la dernière valeur. | Même correctif : lambda i=i: ou une factory. Encore plus piégeux en async car l'exécution est différée par construction. |
| Coût / observabilité | Ne pas mesurer les tokens. | Loggez message.usage (input/output/cache_read_input_tokens). À 5/25 USD par Mtok sur claude-opus-4-8, un agent bavard coûte vite. |
| Prompt caching | Recalculer le préfixe système à chaque tour. | Ajoutez cache_control: {"type": "ephemeral"} sur le dernier bloc système stable. Lectures de cache ≈ 0.1× le prix d'entrée. Vérifiez via usage.cache_read_input_tokens > 0. |
| Sécurité tool-use | Exécuter handler(**block.input) aveuglément (un outil delete_file déclenché par injection de prompt). | Gate les outils à effet de bord (validation, confirmation humaine) ; les handlers keyword-only + un schéma JSON strict limitent la surface d'attaque. |
🏋️ Exercices
Exercice 1 — compose typé (implémenter)
Objectif. Écrire compose(*funcs) qui renvoie une fonction appliquant les fonctions de droite à gauche : compose(f, g)(x) == f(g(x)). Doit accepter un nombre arbitraire de fonctions.
Indice/Solution. Une closure sur le tuple
funcs, unfunctools.reducesur les arguments inversés. Squelette :pythonimport functools from collections.abc import Callable def compose(*funcs: Callable[[object], object]) -> Callable[[object], object]: def inner(x: object) -> object: return functools.reduce(lambda acc, f: f(acc), reversed(funcs), x) return innerCas limite à gérer :
compose()sans argument doit renvoyer l'identité.
Exercice 2 — Décorateur @timed avec functools.wraps (implémenter)
Objectif. Un décorateur qui logge le temps d'exécution de n'importe quelle fonction (signature inconnue), préserve __name__/__doc__, et fonctionne aussi bien sur des fonctions sync qu'async.
Indice/Solution. Deux wrappers (un
def, unasync def) sélectionnés viainspect.iscoroutinefunction(fn).*args/**kwargspour le passe-plat,@functools.wraps(fn)sur chaque wrapper,time.perf_counter()pour la mesure. Le piège : oublier leawaitdans le wrapper async transforme la mesure en quasi-zéro.
Exercice 3 — Casser puis réparer le late binding (break-then-fix)
Objectif. On vous donne ce code qui doit créer trois middlewares loguant chacun leur index, mais qui logge 2, 2, 2. Diagnostiquez et corrigez sans changer le fait que c'est une list comprehension.
pythonmiddlewares = [lambda req: f"[{i}] {req}" for i in range(3)] print([m("GET") for m in middlewares]) # ['[2] GET', '[2] GET', '[2] GET']Indice/Solution. Late binding : les trois lambdas capturent la même
i. Fix :lambda req, i=i: f"[{i}] {req}". Sachez expliquer pourquoi l'argument par défaut résout le problème (évalué à la création de chaque lambda, donc fige la valeur).
Exercice 4 — Factory de client LLM avec retry sur exceptions typées (production-grade)
Objectif. Écrire make_resilient_completion(client, *, model, max_attempts) : une factory qui capture le client et renvoie une coroutine complete(prompt). Elle doit retry uniquement sur anthropic.RateLimitError et anthropic.InternalServerError, avec backoff exponentiel, et propager immédiatement anthropic.BadRequestError.
Indice/Solution. Closure sur
client/model/max_attempts. Bouclefor attempt in range(max_attempts);try/except (RateLimitError, InternalServerError)→await asyncio.sleep(base * 2**attempt); laissezBadRequestErrorremonter (ne la capturez pas). Utilisezthinking={"type": "adaptive"}etoutput_config={"effort": "high"}. Question bonus : pourquoi ne PAS implémenter ça quand le SDK retry déjà ? (Réponse : pour une logique métier — ex. dégrader versclaude-haiku-4-5après N échecs — sinon laissez le SDK faire.)
Exercice 5 — Mini-runner de tool-use avec registre de closures (production-grade)
Objectif. Construire un dispatcher d'outils où chaque outil est enregistré via un décorateur @tool(name=...) qui le range dans un registre global et capture sa description. Le runner boucle tant que stop_reason == "tool_use", appelle handler(**block.input), renvoie les tool_result.
Indice/Solution.
TOOLS: dict[str, Callable] = {};def tool(*, name): def deco(fn): TOOLS[name] = fn; return fn; return deco. Le décorateur est une factory de décorateur (trois niveaux de fonction — c'est normal). Pour la boucle, voir la section production ci-dessus. Cas limite à tester : un outil inconnu (KeyError) → renvoyez untool_resultavecis_error: trueplutôt que de crasher la boucle.
Exercice 6 — Streamer SSE qui survit à la déconnexion (break-then-fix)
Objectif. On vous donne un streamer qui n'utilise pas async with et laisse fuir des connexions HTTP quand le client ferme l'onglet. Reproduisez la fuite (avec un faux client), puis corrigez.
Indice/Solution. Le problème : sans
async with client.messages.stream(...) as stream:, uneasyncio.CancelledError(déclenchée par la déconnexion côté serveur web) interrompt la boucleasync forsans fermer le flux. Fix : envelopper dansasync with, qui appelle__aexit__(doncaclose()) même sur exception. Pour observer : comptez les connexions ouvertes avant/après une annulation simulée viatask.cancel().
🎤 En entretien
Q : Pourquoi def f(x, items=[]) est-il un anti-pattern, et que se passe-t-il exactement ? R : Le défaut [] est évalué une seule fois à la définition de la fonction, pas à chaque appel — la même liste est donc partagée et mutée entre appels. On utilise une sentinelle None puis items = items or [] (ou if items is None).
Q : Différence entre capturer une variable par valeur et par référence dans une closure ? R : Python capture par référence et lit la valeur au moment de l'appel (late binding) — d'où le bug classique des lambdas dans une boucle qui voient toutes la dernière valeur. PHP, lui, copiait par valeur avec use ($x). On fige une valeur en Python via un argument par défaut (lambda x=x:), évalué à la création.
Q : Quand utiliser *args/**kwargs plutôt que des paramètres explicites ? R : Pour les wrappers/décorateurs transparents qui ne connaissent pas la signature enveloppée, et pour le passe-plat vers une fonction qui validera. Pour une API publique, on préfère des paramètres explicites (meilleure découvrabilité, meilleur typage, erreurs claires) — **kwargs opaque masque les fautes de frappe.
Q : Dans une boucle de tool-use d'agent, pourquoi handler(**block.input) plutôt que de parser block.input à la main ? R : Le SDK Anthropic renvoie déjà block.input comme dict parsé — refaire un json.loads() sur une chaîne brute est à la fois redondant et fragile (les modèles Opus 4.x peuvent varier l'échappement Unicode/slash). Le ** déballe ce dict directement vers les paramètres nommés du handler ; coupler ça à des paramètres keyword-only (def handler(*, city)) protège contre un schéma malformé.