Skip to content

Messenger — bus, transports, workers

TL;DR — Symfony Messenger découple dispatch (envoyer un message) et handling (le traiter), via un bus + un transport. Tu peux le faire sync (in-process), ou async via Doctrine/AMQP/Redis. Les workers (messenger:consume) consomment les messages, retry exponentiel, et envoient les échecs dans un failure transport. Maîtriser Messenger = maîtriser idempotence, transactional outbox, middleware order et scaling horizontal.

🧠 Mental model

        ┌──────────────┐   dispatch(msg)   ┌──────────────┐
        │  Producer    │ ─────────────────▶│   Message    │
        │ (controller, │                   │     Bus      │
        │  service)    │                   └──────┬───────┘
        └──────────────┘                          │

                                     ┌────────────────────────┐
                                     │  Middleware stack      │
                                     │ ── add_bus_name        │
                                     │ ── dispatch_after_curr │
                                     │ ── failed_message      │
                                     │ ── send_message ────┐  │
                                     │ ── handle_message   │  │
                                     └─────────────────────│──┘

                                          routed by class  │

                                                  ┌────────────────┐
                                                  │   Transport    │
                                                  │ (doctrine/AMQP)│
                                                  └────────┬───────┘
                                                           │ persisted

            ┌─────────────┐ consume()        ┌───────────────────────┐
            │   Worker 1  │ ◀───────────────│ messenger_messages /  │
            │   Worker 2  │ ◀───────────────│ rabbitmq queue        │
            │   Worker N  │ ◀───────────────└───────────────────────┘
            └──────┬──────┘                          │ on failure
                   │ handle                          │  + retries
                   ▼                                 ▼
        ┌──────────────────┐               ┌───────────────────┐
        │ MessageHandler   │               │ failure_transport │
        └──────────────────┘               │ (dead letter)     │
                                           └───────────────────┘

Analogie : le bus, c'est la poste. Un message est une lettre (DTO immuable). Le handler, c'est le destinataire qui ouvre la lettre. Le transport, c'est le camion (Doctrine = camion du facteur local, AMQP = train de marchandises). Tant que tu n'ouvres pas la lettre (handle), tu n'as encore rien fait.

Le concept central : l'Envelope et les Stamps

Beaucoup de devs croient dispatcher un message. Faux : on dispatche une Envelope qui contient le message + une collection de Stamps (métadonnées immuables). C'est le modèle mental le plus rentable de Messenger : presque tout comportement avancé est un stamp.

StampPosé parRôle
BusNameStampadd_bus_name middlewarequel bus a traité l'envelope
SentStampsend_message middleware« ce message est parti sur un transport » → coupe le handling local
ReceivedStample receiver« ce message vient d'un transport » → autorise le handling
HandledStamphandle_messagerésultat de chaque handler (clé pour le Query Bus)
TransportMessageIdStample transportid natif (auto-increment Doctrine, delivery tag AMQP)
DelayStamptoiretarde la livraison de N ms (new DelayStamp(5000))
RedeliveryStampretry layercompteur de tentatives + timestamp
DispatchAfterCurrentBusStamptoibufferise jusqu'au commit du bus parent
ErrorDetailsStampfailure layerexception sérialisée pour la DLQ
php
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;

// Dispatch retardé de 5 s, sur un bus nommé, sans handler local
$bus->dispatch(new SendWelcomeEmail(42), [
    new DelayStamp(5000),
]);

// Lire un stamp côté handler/middleware
$retryCount = $envelope->last(RedeliveryStamp::class)?->getRetryCount() ?? 0;

Règle mentale : un message qui revient d'un transport porte un ReceivedStamp ; un message qui part en porte un SentStamp. C'est ce duo qui empêche un message d'être à la fois envoyé ET traité in-process — sauf si tu le routes vers sync:// ou pas du tout.

Sérialisation — la frontière physique

Dès qu'un transport est async, le message traverse un sérialiseur : il devient des octets dans une queue, puis est reconstruit dans un autre process (souvent un autre déploiement, une autre version du code).

  • Serializer par défaut (PhpSerializer) : serialize()/unserialize() PHP natif. Simple, mais couple producteur et consommateur à la même classe PHP. Un rename de classe ou un changement de signature readonly casse les messages en vol.
  • Serializer Symfony (messenger.transport.symfony_serializer) : JSON + groupes de normalisation. Interopérable (un consommateur Node/Go peut lire la queue), versionnable, mais tu gères toi-même le mapping classe ↔ header type. Recommandé dès qu'un transport est partagé entre services ou survit à un déploiement.
yaml
framework:
    messenger:
        serializer:
            default_serializer: messenger.transport.symfony_serializer
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                serializer: messenger.transport.symfony_serializer

Le piège staff : un message déjà en queue a été sérialisé avec l'ancien schéma. Si tu déploies une version qui renomme un champ, les messages en vol explosent au unserialize. Stratégie : champs additifs uniquement, ou versionne le message (SendWelcomeEmailV2) et garde le vieux handler pendant une fenêtre de drain.

🛠️ Code minimal

php
// src/Message/SendWelcomeEmail.php
namespace App\Message;

final readonly class SendWelcomeEmail
{
    public function __construct(
        public int $userId,
        public string $locale = 'fr',
    ) {}
}
php
// src/MessageHandler/SendWelcomeEmailHandler.php
namespace App\MessageHandler;

use App\Message\SendWelcomeEmail;
use App\Repository\UserRepository;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Mime\Email;

#[AsMessageHandler]
final readonly class SendWelcomeEmailHandler
{
    public function __construct(
        private UserRepository $users,
        private MailerInterface $mailer,
    ) {}

    public function __invoke(SendWelcomeEmail $message): void
    {
        $user = $this->users->find($message->userId)
            ?? throw new \DomainException("User {$message->userId} not found");

        $this->mailer->send(
            (new Email())
                ->to($user->getEmail())
                ->subject('Welcome')
                ->text('Bienvenue !')
        );
    }
}
yaml
# config/packages/messenger.yaml
framework:
    messenger:
        failure_transport: failed

        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%' # doctrine://default?queue_name=async
                retry_strategy:
                    max_retries: 3
                    delay: 1000        # ms
                    multiplier: 2      # 1s, 2s, 4s
                    max_delay: 60000
                options:
                    use_notify: true
                    check_delayed_interval: 60000

            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: high
                retry_strategy: { max_retries: 5, delay: 500, multiplier: 3 }

            failed:
                dsn: 'doctrine://default?queue_name=failed'

        routing:
            'App\Message\SendWelcomeEmail': async
            'App\Message\ChargePayment': async_priority_high
bash
# Dispatch
bin/console messenger:consume async async_priority_high -vv \
    --limit=200 --time-limit=3600 --memory-limit=256M
php
// In a controller
public function register(MessageBusInterface $bus, ...): Response
{
    $user = $this->createUser(...);
    $bus->dispatch(new SendWelcomeEmail($user->getId()));
    return $this->redirectToRoute('home');
}

Middleware stack — l'ordre compte

php
// config/packages/messenger.yaml
framework:
    messenger:
        buses:
            command.bus:
                middleware:
                    - validation
                    - doctrine_transaction         # wraps handler in DB transaction
                    - App\Middleware\TracingMiddleware
            event.bus:
                default_middleware:
                    allow_no_handlers: true        # events may have 0 handlers
                middleware:
                    - dispatch_after_current_bus   # delay event dispatch until command bus commits
php
// src/Middleware/TracingMiddleware.php
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;

final readonly class TracingMiddleware implements MiddlewareInterface
{
    public function __construct(private TracerInterface $tracer) {}

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $span = $this->tracer->start($envelope->getMessage()::class);
        try {
            return $stack->next()->handle($envelope, $stack);
        } finally {
            $span->end();
        }
    }
}

Trois middlewares critiques :

  • DoctrinePingConnectionMiddleware : ping la connexion DB avant chaque handler. Indispensable pour workers long-running (sinon "MySQL server has gone away").
  • DoctrineCloseConnectionMiddleware : ferme la connexion si une erreur survient pour forcer un reconnect propre.
  • DispatchAfterCurrentBusMiddleware : permet de retarder le dispatch d'un message enveloppé avec DispatchAfterCurrentBusStamp jusqu'à ce que le bus parent ait terminé. Le mécanisme exact : pendant handle() du bus parent, les dispatches enfants sont buffered, et flushés après le retour réussi.

Ordre canonique de la pile handler (config par défaut, du producteur vers le handler) : add_bus_namereject_redelivered_messagedispatch_after_current_busfailed_messagesend_messagehandle_message. Tout middleware métier (tracing, rate-limit, transaction) doit se placer avant send_message s'il doit tourner côté producteur, et est ré-exécuté côté consommateur car le worker rejoue toute la pile sur le message reçu. doctrine_transaction doit envelopper handle_message, donc se place tard.

Cycle de vie d'un message : ack, nack, retry

Côté worker, un message reçu n'a que trois issues. Les comprendre, c'est comprendre la garantie at-least-once.

        receiver->get()  ──▶  Envelope (+ReceivedStamp)

                          worker handle()
            ┌──────────────────────┼──────────────────────┐
            ▼                      ▼                       ▼
        succès               exception                exception
            │             (retryable)              (unrecoverable)
            ▼                      ▼                       ▼
        receiver->ack()    receiver->reject()      receiver->reject()
        (supprime de la    + re-dispatch avec      + envoi direct vers
         queue)             DelayStamp(backoff)     failure_transport
                            jusqu'à max_retries,     (pas de retry)
                            puis failure_transport
  • ack = succès → le message est retiré du transport. Sur AMQP c'est un vrai basic.ack ; sur Doctrine, un DELETE.
  • reject + retry : si RetryStrategy autorise encore une tentative, le message est re-dispatché sur le même transport avec un DelayStamp = delay * multiplier^(n-1) (capé par max_delay) et un RedeliveryStamp incrémenté. Le reject_redelivered_message middleware empêche un double-handling si le broker re-livre lui-même (ex : worker tué entre handle et ack).
  • reject sans retry : si on a épuisé les tentatives OU si l'exception est une UnrecoverableMessageHandlingException, le message va dans le failure_transport (DLQ).

Le point qui pique en entretien : entre handle() réussi et ack(), si le worker est SIGKILL-é, le message n'est pas acké → il sera re-livré → ton handler tourne deux fois. D'où l'idempotence non négociable. ack/nack ne sont jamais atomiques avec ton effet de bord métier.

Retry custom & exclusion d'exceptions

Le backoff exponentiel par défaut suffit rarement en prod. On veut souvent du jitter (éviter le thundering herd) et exclure certaines exceptions du retry.

php
// src/Messenger/JitteredRetryStrategy.php
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;

final class JitteredRetryStrategy implements RetryStrategyInterface
{
    public function __construct(
        private int $maxRetries = 5,
        private int $baseDelayMs = 1000,
        private float $multiplier = 2.0,
    ) {}

    public function isRetryable(Envelope $message, ?\Throwable $throwable = null): bool
    {
        $retries = $this->retryCount($message);
        return $retries < $this->maxRetries;
    }

    public function getWaitingTime(Envelope $message, ?\Throwable $throwable = null): int
    {
        $retries = $this->retryCount($message);
        $delay = (int) ($this->baseDelayMs * $this->multiplier ** $retries);
        // jitter ±20 % pour désynchroniser les workers
        return $delay + random_int(-(int) ($delay * 0.2), (int) ($delay * 0.2));
    }

    private function retryCount(Envelope $envelope): int
    {
        return $envelope->last(RedeliveryStamp::class)?->getRetryCount() ?? 0;
    }
}
yaml
framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                retry_strategy:
                    service: App\Messenger\JitteredRetryStrategy

Pour exclure une exception du retry sans la transformer en UnrecoverableMessageHandlingException à la main, l'option retry_strategy n'aide pas — c'est le rôle de l'exception elle-même : lance UnrecoverableMessageHandlingException (ou implémente \Symfony\Component\Messenger\Exception\HandlerFailedException filtering via un WorkerMessageFailedEvent listener). À l'inverse, RecoverableMessageHandlingException force un retry même si l'exception serait normalement fatale.

RateLimiter middleware — throttler un consommateur

Pour respecter le quota d'une API tierce (Stripe, un partenaire logistique, une API LLM), on ne veut pas bloquer le worker mais retarder le message. Le middleware natif RateLimiterMiddleware réserve un token et, s'il faut attendre, dort dans le worker — simple mais ça gèle un process. Variante non-bloquante : ré-injecter le message avec un DelayStamp égal au temps d'attente.

php
// config/packages/messenger.yaml
framework:
    messenger:
        transports:
            llm_calls:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                rate_limiter: anthropic_api   # défini sous framework.rate_limiter
yaml
framework:
    rate_limiter:
        anthropic_api:
            policy: 'token_bucket'
            limit: 50            # 50 req
            rate: { interval: '1 minute', amount: 50 }

🎯 Patterns courants

  1. Transactional Outbox (Doctrine transport) — le transport Doctrine écrit dans la même DB que ton aggregate. Wrappe persist(User) + dispatch(SendWelcomeEmail) dans une seule transaction. Si le commit échoue, le message disparaît avec. Combine avec DispatchAfterCurrentBusMiddleware pour ne dispatcher qu'après commit du bus parent.
  2. Command Bus / Event Bus / Query Bus — déclare plusieurs bus (command.bus, event.bus, query.bus) avec leurs propres middlewares. Pattern CQRS-ready.
  3. Idempotent handlers — chaque message doit avoir une messageId (UUID). Stocke processed_messages(id, processed_at) et INSERT ... ON CONFLICT DO NOTHING avant le travail réel. Indispensable car les transports garantissent at-least-once, jamais exactly-once.
  4. Priority queues — sépare async_high, async_default, async_low ; lance des workers dédiés par criticité.
  5. Batch handlerBatchHandlerInterface (6.x) pour grouper des messages (ex : indexation Elasticsearch toutes les 100 unités).
  6. Sagas — un handler dispatche d'autres messages pour orchestrer un workflow long (ex : OrderCreatedReserveStockChargeCardShipOrder). Combine avec Workflow pour la machine d'état.

🔄 Versions

  • 5.4 (LTS) : MessageHandlerInterface + __invoke. Attribut PHP 8 #[AsMessageHandler] disponible. failure_transport global ou par transport. WorkerRunningEvent, WorkerMessageFailedEvent.
  • 6.4 (LTS) : BatchHandlerInterface mature. RateLimiterMiddleware natif. messenger:failed:show supporte --stats. Doctrine transport peut utiliser LISTEN/NOTIFY PostgreSQL (use_notify: true) → polling quasi temps réel. KeepaliveReceiverInterface pour les long handlers.
  • 7.x : nettoyage des deprecations (HandlerDescriptor, options de transport), SendFailedMessageForRetryListener retravaillé. Meilleure intégration Scheduler (les RecurringMessage passent par Messenger). --keepalive flag sur messenger:consume pour les transports compatibles.

⚠️ Pitfalls

  1. Pas idempotent — un handler qui débite une carte deux fois car le worker a crashé après charge() mais avant ack. Toujours coder pour "ce handler sera rejoué un jour".
  2. Doctrine connection killed — connexion DB perdue après idle (MySQL wait_timeout). Active DoctrinePingConnectionMiddleware + DoctrineCloseConnectionMiddleware dans la pile handler.
  3. Dispatch avant commit$em->persist($user); $bus->dispatch(new Welcome($user->getId())); puis exception → message dispatché, user pas en DB → handler crash. Solution : DispatchAfterCurrentBusMiddleware (qui exige wrap_in_envelope via DispatchAfterCurrentBusStamp) OU transactional outbox Doctrine.
  4. Worker fuites mémoire — un worker long-running garde tout en mémoire (Doctrine UoW, logs Monolog). Toujours --memory-limit + --time-limit + Supervisor qui relance.
  5. Pas de DLQ surveilléefailed se remplit, personne ne regarde. Monitore messenger:failed:show ou expose une metric Prometheus.
  6. Retry sur erreur métier — un UserNotFoundException est retryé 3× pour rien. Implémente UnrecoverableMessageHandlingException pour court-circuiter le retry et envoyer direct en DLQ.
  7. Sérialisation cassée — un message avec DateTimeImmutable non sérialisable, ou un objet Doctrine entier dans le message. Règle d'or : message = scalaires + IDs, jamais d'entités.
  8. Ordre des middlewares — si tu mets ton custom middleware après send_message, il ne s'exécute que côté handler. À comprendre pour le logging / tracing.

🧪 Testing

php
// tests/Functional/RegisterTest.php
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Symfony\Component\Messenger\Transport\InMemoryTransport;

final class RegisterTest extends KernelTestCase
{
    public function testItDispatchesWelcomeEmail(): void
    {
        self::bootKernel();
        $bus = self::getContainer()->get('messenger.bus.default');

        $bus->dispatch(new SendWelcomeEmail(42));

        /** @var InMemoryTransport $transport */
        $transport = self::getContainer()->get('messenger.transport.async');
        $this->assertCount(1, $transport->getSent());
        $env = $transport->getSent()[0];
        $this->assertSame(42, $env->getMessage()->userId);
    }
}
yaml
# config/packages/test/messenger.yaml
framework:
    messenger:
        transports:
            async: 'in-memory://'
            async_priority_high: 'in-memory://'
            failed: 'in-memory://'

Test du handler isolé :

php
public function testHandlerSendsEmail(): void
{
    $mailer = $this->createMock(MailerInterface::class);
    $mailer->expects($this->once())->method('send');

    $handler = new SendWelcomeEmailHandler($users, $mailer);
    $handler(new SendWelcomeEmail(1));
}

Test d'un worker complet : Symfony\Component\Messenger\Worker avec un InMemoryTransport préchargé via ->send(), puis $worker->run(['sleep' => 0]) + StopWorkerOnMessageLimitListener.

🎬 Cas d'usage concrets

Scénario 1 — Traitement asynchrone de factures côté SaaS comptable Pennylane-like

Un SaaS comptable français traite quotidiennement 200 000 factures fournisseurs déposées via email, drag-and-drop ou connecteurs (Stripe, Shopify, Qonto). Chaque facture déclenche un pipeline en quatre étapes : OCR via service externe Mindee, extraction structurée, matching avec écritures existantes, et enregistrement comptable. Symfony Messenger orchestre ce pipeline sur un transport Redis Streams. Le message OcrFacture est consommé par un worker dédié avec timeout 90 secondes, retry exponentiel (3 tentatives, base 60s avec jitter ±20 %) et déduplication via unique_message_id basé sur le hash SHA-256 du fichier source. Les factures qui échouent après retries partent en queue failed_ocr avec inspection humaine en backoffice. Le pipeline est instrumenté avec OpenTelemetry : chaque message trace son temps d'attente, son temps d'exécution, et le temps de chaque étape. L'équipe a calibré huit workers OCR (CPU intensifs) et seize workers de matching (DB intensifs) sur des nœuds Kubernetes distincts pour absorber les pics de fin de mois.

Scénario 2 — Ingestion de contrats massifs dans le DMS du cabinet juridique

Le cabinet d'avocats reçoit régulièrement des lots de contrats (typiquement 500 à 5 000 PDF) lors de due diligences M&A. L'ingestion synchrone bloquerait l'interface pendant des heures. L'équipe a découpé en chaîne de messages : IngestionLot (créé après upload ZIP) explose en IngestionContrat (un par fichier), chacun extrait métadonnées, indexe le contenu dans Elasticsearch, génère un thumbnail, et déclenche AnalyseContratIA (LLM Claude via Anthropic API) avec rate-limit ajusté (50 req/min) via un middleware RateLimitedHandler. Les messages utilisent le transport Doctrine en outbox pour garantir l'atomicité avec la création du dossier en base. La progression du lot est visible en temps réel grâce à un compteur Redis incrémenté par chaque handler et streamé en SSE au navigateur de l'avocat. Les échecs partent en DLQ avec une fenêtre de 30 jours pour rejeu manuel.

Scénario 3 — Webhooks sortants e-commerce vers partenaires logistiques

La marketplace de mode envoie chaque commande validée à un ou plusieurs partenaires logistiques (Colissimo, Chronopost, Mondial Relay) sous forme de webhook HTTP signé HMAC. La fiabilité est critique : un webhook perdu signifie une commande non préparée. Messenger orchestre l'envoi via un message EnvoiWebhook consommé par un handler qui appelle l'API du partenaire avec timeout 10 s. La stratégie retry est agressive (5 tentatives, backoff exponentiel 1m, 5m, 30m, 2h, 12h) et permet d'absorber une panne partenaire de 24 h. Les webhooks 4xx (clients) ne sont pas retentés ; les 5xx et timeouts le sont. Un middleware IdempotencyMiddleware ajoute un en-tête Idempotency-Key (UUID v7) que les partenaires conformes utilisent pour dédupliquer. Le rate-limit par partenaire (par exemple 200 req/min sur Chronopost) est appliqué via Symfony RateLimiter et un middleware Messenger qui retarde les messages sans bloquer le worker. La DLQ est monitorée par Sentry et un dashboard Grafana avec alerte PagerDuty au-delà de 50 messages.

🛠️ Exemple end-to-end

Use case : envoi de webhook sortant pour commande validée, avec signature HMAC, retry exponentiel, et idempotency-key.

php
<?php
// src/Application/Webhook/Message/EnvoiWebhook.php
declare(strict_types=1);

namespace App\Application\Webhook\Message;

use Symfony\Component\Uid\Uuid;

final readonly class EnvoiWebhook
{
    public function __construct(
        public string $idempotencyKey,
        public string $partenaire,
        public string $endpoint,
        public array $payload,
    ) {}

    public static function pourCommande(string $partenaire, string $endpoint, array $payload): self
    {
        return new self(Uuid::v7()->toRfc4122(), $partenaire, $endpoint, $payload);
    }
}

// src/Application/Webhook/Handler/EnvoiWebhookHandler.php
namespace App\Application\Webhook\Handler;

use App\Application\Webhook\Message\EnvoiWebhook;
use App\Infrastructure\Webhook\SecretResolver;
use Psr\Log\LoggerInterface;
use Symfony\Component\HttpClient\Exception\TransportException;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;

#[AsMessageHandler]
final readonly class EnvoiWebhookHandler
{
    public function __construct(
        private HttpClientInterface $http,
        private SecretResolver $secrets,
        private LoggerInterface $logger,
    ) {}

    public function __invoke(EnvoiWebhook $msg): void
    {
        $body = json_encode($msg->payload, JSON_THROW_ON_ERROR);
        $signature = hash_hmac('sha256', $body, $this->secrets->pour($msg->partenaire));
        try {
            $response = $this->http->request('POST', $msg->endpoint, [
                'headers' => [
                    'Content-Type' => 'application/json',
                    'Idempotency-Key' => $msg->idempotencyKey,
                    'X-Signature' => $signature,
                ],
                'body' => $body,
                'timeout' => 10.0,
            ]);
            $status = $response->getStatusCode();
            if ($status >= 500) {
                throw new RecoverableMessageHandlingException("Partenaire 5xx ({$status})");
            }
            if ($status >= 400) {
                $this->logger->error('Webhook 4xx ignoré', ['key' => $msg->idempotencyKey, 'status' => $status]);
                throw new UnrecoverableMessageHandlingException("Partenaire 4xx ({$status})");
            }
        } catch (TransportException $e) {
            throw new RecoverableMessageHandlingException('Timeout réseau', previous: $e);
        }
    }
}
yaml
# config/packages/messenger.yaml (extrait)
framework:
    messenger:
        transports:
            webhooks:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                retry_strategy:
                    max_retries: 5
                    delay: 60000           # 1 min
                    multiplier: 5          # 1m, 5m, 25m, ~2h, ~10h (capé)
                    max_delay: 43200000    # 12 h
                options:
                    queue_name: webhooks
            webhooks_failed:
                dsn: 'doctrine://default?queue_name=webhooks_failed'
        routing:
            App\Application\Webhook\Message\EnvoiWebhook: webhooks
        failure_transport: webhooks_failed

Pourquoi ce design tient en prod : les 4xx lèvent UnrecoverableMessageHandlingException (pas de retry inutile sur une erreur cliente définitive), les 5xx et timeouts lèvent RecoverableMessageHandlingException (retry avec backoff long, capable d'absorber une panne partenaire de 12 h). L'Idempotency-Key (UUID v7, donc trié temporellement) est généré une seule fois à la construction du message — il survit donc aux retries, ce qui permet au partenaire de dédupliquer même si on renvoie le même webhook 5 fois.


🔁 Quand utiliser / éviter

Utiliser :

  • Traitement asynchrone (email, PDF, notifications push, webhooks sortants).
  • Découplage temporel (le user n'attend pas).
  • Pic de charge à lisser (file qui se vide à débit constant).
  • Communication inter-services (microservices via AMQP).
  • Tâches retryables avec backoff.

Éviter :

  • Besoin de cohérence forte immédiate (utilisateur attend le résultat).
  • Volumétrie minuscule et synchrone simple → un service direct suffit.
  • Workflow complexe multi-étapes avec rollback → préfère Workflow component + Messenger.
  • Streaming temps réel (utilise Mercure / WebSocket).

🚀 Scaling workers en production

bash
# /etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /app/bin/console messenger:consume async --time-limit=3600 --memory-limit=256M
user=www-data
numprocs=8                     # 8 worker processes per machine
autostart=true
autorestart=true
startsecs=0
stopwaitsecs=30                # SIGTERM grace period
killasgroup=true
stopasgroup=true
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/var/log/messenger.%(process_num)02d.log

Règles de scaling :

  • CPU-bound handlers (calcul, compression) → workers = nb cores.
  • I/O-bound handlers (HTTP externe, mail) → workers = 4× à 10× cores (la plupart attendent).
  • Mixed → mesurer avec messenger:stats + Prometheus.
  • Toujours un --time-limit (recycle pour libérer mémoire) et un --memory-limit (filet de sécurité).
  • Kubernetes : Deployment + HPA basé sur la queue depth (custom metric exporté). Pour Doctrine transport, requête SELECT COUNT(*) FROM messenger_messages WHERE queue_name = 'async' AND delivered_at IS NULL.

Observabilité — les 4 signaux qui comptent

Un staff engineer n'observe pas « le worker tourne », il observe des SLI dérivés des événements Messenger. Le worker émet WorkerMessageReceivedEvent, WorkerMessageHandledEvent, WorkerMessageFailedEvent, WorkerMessageRetriedEvent, WorkerRunningEvent, WorkerStoppedEvent — branche-les sur tes métriques.

SignalMétriquePourquoi
Backlogmessenger_queue_depth{queue} (gauge)détecte un débit consommateur < producteur avant l'incident
Lag / âgeâge du plus vieux message non livré (gauge)un backlog faible mais vieux = workers bloqués sur un message poison
Throughput & latencemessages_handled_total, histogramme du temps de handlingdimensionner les workers, détecter une régression handler
Échecsmessages_failed_total{class}, taille de la DLQalerte PagerDuty si DLQ > seuil
php
// src/EventListener/MessengerMetricsSubscriber.php
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;

final readonly class MessengerMetricsSubscriber implements EventSubscriberInterface
{
    public function __construct(private MetricsRegistry $metrics) {}

    public static function getSubscribedEvents(): array
    {
        return [
            WorkerMessageHandledEvent::class => 'onHandled',
            WorkerMessageFailedEvent::class  => 'onFailed',
        ];
    }

    public function onHandled(WorkerMessageHandledEvent $e): void
    {
        $this->metrics->counter('messages_handled_total', [
            'class' => $e->getEnvelope()->getMessage()::class,
        ])->inc();
    }

    public function onFailed(WorkerMessageFailedEvent $e): void
    {
        // willRetry() distingue un échec transitoire d'un envoi en DLQ
        if (!$e->willRetry()) {
            $this->metrics->counter('messages_dead_lettered_total', [
                'class' => $e->getEnvelope()->getMessage()::class,
            ])->inc();
        }
    }
}

Graceful shutdown : Messenger écoute SIGTERM/SIGINT et termine le message en cours avant de s'arrêter. Sous Kubernetes, garde terminationGracePeriodSeconds ≥ ton handler le plus long, sinon SIGKILL coupe au milieu d'un handle → re-livraison. Sous Supervisor, stopwaitsecs=30 joue le même rôle.

🏋️ Exercices

Code dans un projet Symfony 7.x jetable (symfony new msg-lab --webapp). Chaque exercice escalade : implémenter → durcir pour la prod → casser puis réparer.

1. Pipeline async basique — Objectif : router un message vers un transport Doctrine et le consommer.

Crée GenerateInvoicePdf(int $invoiceId), un handler qui logge "PDF for {id}", route-le vers un transport doctrine://default, dispatch depuis une commande console, puis consomme avec messenger:consume async -vv. Vérifie l'insertion dans messenger_messages avant le consume. Indice : #[AsMessageHandler] + routing: dans messenger.yaml. Avant le consume, SELECT * FROM messenger_messages doit montrer 1 ligne avec delivered_at IS NULL.

2. Idempotence at-least-once — Objectif : prouver puis empêcher le double-handling.

Ajoute un sleep(2) dans le handler de l'ex.1, lance le consume, et kill -9 le worker pendant le sleep. Constate au re-consume que le message est rejoué. Puis rends le handler idempotent via une table processed_messages(message_id PRIMARY KEY, processed_at) et un INSERT ... ON CONFLICT DO NOTHING en début de handler, en propageant un Uuid::v7() dans le message. Indice : si l'INSERT affecte 0 ligne, return immédiatement. Le message_id doit voyager dans le DTO, pas être regénéré dans le handler.

3. Retry intelligent + DLQ — Objectif : distinguer erreur transitoire et erreur fatale.

Handler appelant une fausse API qui renvoie aléatoirement 200 / 503 / 422. Mappe 503 → RecoverableMessageHandlingException, 422 → UnrecoverableMessageHandlingException, configure max_retries: 3 + failure_transport. Vérifie qu'un 422 part immédiatement en DLQ (0 retry) et qu'un 503 est retenté 3× avant la DLQ. Inspecte avec messenger:failed:show puis rejoue un message avec messenger:failed:retry. Indice : compte les retries via RedeliveryStamp loggé dans le handler ; le 422 ne doit jamais incrémenter le compteur.

4. Transactional outbox — Objectif : garantir l'atomicité entre écriture DB et dispatch.

Dans un controller : persist(Order) + dispatch(OrderPlaced). Introduis une exception entre le persist et le flush. Sans protection, observe le message dispatché alors que l'Order n'existe pas. Corrige en activant dispatch_after_current_bus sur le bus + DispatchAfterCurrentBusStamp, et place le transport Doctrine dans la même connexion. Prouve que sur exception, ni l'Order ni le message ne persistent. Indice : le middleware bufferise le dispatch jusqu'au retour réussi du handler/bus parent ; combine avec doctrine_transaction sur le command bus.

5. Worker production-grade — Objectif : un worker qui ne tombe jamais et qui s'observe.

Configure Supervisor (numprocs=4, --time-limit=3600 --memory-limit=256M), branche le MessengerMetricsSubscriber ci-dessus, expose messenger_queue_depth et la taille de DLQ, et ajoute DoctrinePingConnectionMiddleware. Casse-le : baisse wait_timeout MySQL à 5 s, laisse le worker idle 10 s, dispatch un message → reproduis « MySQL server has gone away », puis répare via le ping middleware. Indice : sans ping, la connexion Doctrine du worker meurt pendant l'idle ; le ping/reconnect avant chaque handler restaure la connexion de façon transparente.

6. Sérialisation cross-version (casse-puis-répare) — Objectif : survivre à un déploiement avec des messages en vol.

Dispatch 100 SendNotification(string $email) sur un transport Doctrine sans les consommer. Déploie une « v2 » qui renomme $email en $recipient (readonly). Lance le consume → observe l'explosion au unserialize. Répare de deux façons et compare : (a) bascule sur le symfony_serializer JSON + champ additif rétro-compatible ; (b) introduis SendNotificationV2 avec son propre handler et garde l'ancien pendant le drain. Indice : le PhpSerializer couple producteur et consommateur à la signature exacte de la classe ; le JSON serializer tolère un champ manquant si tu fournis une valeur par défaut.

🎤 En entretien

Q: Messenger garantit-il exactly-once ? Sinon, comment vit-on avec ? Non — les transports sont at-least-once. Entre un handle() réussi et le ack(), un crash worker re-livre le message. On code donc des handlers idempotents (dédup par messageId persisté avec contrainte d'unicité) ; l'exactly-once « logique » se construit par-dessus l'at-least-once du transport, jamais l'inverse.

Q: J'ai persist($user) puis dispatch(WelcomeEmail($user->getId())). Quel bug, quelle fix ? Race classique : si le dispatch part avant le commit DB (transport sync, ou flush ultérieur qui échoue), le handler peut tourner sur un user inexistant — ou le message est envoyé alors que la transaction rollback. Fix : DispatchAfterCurrentBusMiddleware (bufferise jusqu'au commit du bus) ou transactional outbox Doctrine (message écrit dans la même transaction que l'aggregate).

Q: Pourquoi ne jamais mettre une entité Doctrine dans un message ? Le message traverse un sérialiseur et un délai temporel : l'entité serait sérialisée détachée (relations lazy cassées, état périmé au moment du handling, version de schéma différente). Règle d'or : message = scalaires + IDs, le handler recharge l'entité fraîche depuis le repository. Bonus : ça réduit la taille de la payload en queue.

Q: Comment scaler des workers I/O-bound vs CPU-bound, et comment piloter l'autoscaling ? CPU-bound (compression, calcul) : workers ≈ nb de cores, au-delà tu contextes-switches pour rien. I/O-bound (HTTP, mail) : 4×–10× les cores car les process attendent le réseau. Pilotage : HPA sur la queue depth (et l'âge du plus vieux message), pas sur le CPU — un backlog qui grossit est le vrai signal de sous-capacité, le CPU peut rester bas pendant que la latence explose.

🔗 Liens

Bibliothèque tech perso — Achref