> ## Documentation Index
> Fetch the complete documentation index at: https://doc.fluximmo.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Recevoir des webhooks à fort volume — bonnes pratiques

> Architecture queue + worker async, ack rapide, idempotence, retry/DLX, observability.

## Goal

Architecturer un endpoint webhook capable d'absorber des **pics de plusieurs centaines/milliers de webhooks par minute** sans en perdre un seul, avec ack \< 1 s et traitement métier asynchrone résilient.

## Scénario

Vous êtes un client à fort volume — chasseur national, marketplace, agrégateur — et vous recevez en continu des webhooks `advert` ou `property` de Fluximmo, parfois en pic (rebond après une coupure, seed initial, période de marché chargée). Votre handler actuel fait du sync (DB write, fetch externe, scoring ML) : il timeout, Fluximmo retry, vous dropez du signal. Ce playbook donne l'architecture standard pour résoudre ça **une bonne fois pour toutes**.

## Étapes

<Steps>
  <Step title="Comprendre le problème">
    Quand votre handler webhook fait du **traitement synchrone lourd** (insertion DB + fetch d'une API tierce + inférence ML), il dépasse facilement 5–10 s sur les pics. Côté Fluximmo :

    * **Politique de retry** : 2 retries immédiats puis 10 retries espacés de 5 minutes (\~50 min de fenêtre totale).
    * **Codes acceptés** : `200`, `201`, `202`, `203`, `204`, `205`. Tout le reste = échec → retry.
    * Si après les 12 tentatives votre endpoint n'a toujours pas répondu 2xx, le webhook est **droppé**. Pas de re-livraison ultérieure.

    **Conséquence concrète** : 1 webhook perdu = 1 advert/property non synchronisé chez vous. À l'échelle, c'est un trou dans la base.

    C'est un **problème architectural**, pas un problème de code applicatif. La bonne réponse est de **découpler la réception du traitement**.
  </Step>

  <Step title="Architecture recommandée">
    Le pattern standard est : **endpoint léger qui ack en \< 1 s + queue + worker async**.

    ```mermaid theme={null}
    flowchart TD
      F["Fluximmo"] -->|POST webhook| EP["Endpoint webhook"]
      EP -->|"ack 200 < 1s"| F
      EP -->|push raw payload| Q[("Queue<br/>SQS / RabbitMQ /<br/>Redis Streams / Kafka")]
      Q -->|consume| W["Worker(s) async"]
      W -->|"DB upsert,<br/>fetch externe, ML"| Done["ack message"]
      W -- "échecs × N" --> DLQ[("DLQ")]
    ```

    L'endpoint ne fait **que** trois choses : valider le header de sécurité, pousser le raw payload dans la queue, répondre 200. Tout le métier vit dans le worker.
  </Step>

  <Step title="Choisir la queue">
    | Queue              | Quand l'utiliser                         | Effort d'intégration |
    | ------------------ | ---------------------------------------- | -------------------- |
    | **AWS SQS**        | Déjà sur AWS, simple, scalable, managed  | Faible               |
    | **RabbitMQ**       | On-prem ou multi-cloud, routing avancé   | Moyen                |
    | **Redis Streams**  | Déjà du Redis, volume modéré             | Faible               |
    | **Google Pub/Sub** | Déjà sur GCP                             | Faible               |
    | **Kafka**          | Très haut volume + besoin de replay long | Élevé                |

    Pour 80 % des clients : **SQS standard** ou **RabbitMQ** suffisent largement. Ne sur-architecturez pas avec Kafka si vous n'avez pas de besoin de replay long terme.
  </Step>

  <Step title="Endpoint webhook minimal">
    Le seul rôle : valider la signature, pousser dans la queue, répondre 200.

    <CodeGroup>
      ```python Python (FastAPI) theme={null}
      from fastapi import FastAPI, Request, Response, HTTPException
      import boto3, os

      app = FastAPI()
      sqs = boto3.client("sqs")
      QUEUE_URL = os.environ["SQS_QUEUE_URL"]
      WEBHOOK_KEY = os.environ["FLUXIMMO_WEBHOOK_KEY"]

      @app.post("/webhooks/fluximmo")
      async def receive(request: Request):
          if request.headers.get("x-webhook-key") != WEBHOOK_KEY:
              raise HTTPException(401)
          body = await request.body()
          sqs.send_message(QueueUrl=QUEUE_URL, MessageBody=body.decode("utf-8"))
          return Response(status_code=200)
      ```

      ```javascript Node (Express) theme={null}
      import express from "express";
      import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";

      const app = express();
      app.use(express.raw({ type: "*/*" }));
      const sqs = new SQSClient({});
      const QUEUE_URL = process.env.SQS_QUEUE_URL;
      const WEBHOOK_KEY = process.env.FLUXIMMO_WEBHOOK_KEY;

      app.post("/webhooks/fluximmo", async (req, res) => {
        if (req.header("x-webhook-key") !== WEBHOOK_KEY) return res.sendStatus(401);
        await sqs.send(new SendMessageCommand({
          QueueUrl: QUEUE_URL,
          MessageBody: req.body.toString("utf-8"),
        }));
        res.sendStatus(200);
      });

      app.listen(8080);
      ```

      ```bash curl (test endpoint) theme={null}
      curl -X POST https://votre-app.com/webhooks/fluximmo \
        -H "x-webhook-key: <votre-secret>" \
        -H "Content-Type: application/json" \
        -d '{"data":{"created":[{"alert_id":"alr_demo","adverts":[{"flxId":"adv_demo"}]}],"updated":[]}}'
      ```
    </CodeGroup>

    <Tip>
      Le code ci-dessus tient en \~15 lignes. C'est **voulu**. Plus l'endpoint est mince, plus il est rapide et fiable.
    </Tip>
  </Step>

  <Step title="Worker async">
    Le worker consomme la queue et exécute la logique métier. Il peut être déployé en N instances pour scaler horizontalement.

    <CodeGroup>
      ```python Python theme={null}
      import boto3, json, logging

      sqs = boto3.client("sqs")
      QUEUE_URL = "..."

      def handle(body: dict):
          # Webhook canonique : { data: { created, updated } }
          for entry in body.get("data", {}).get("created", []):
              for advert in entry.get("adverts", []):
                  db.upsert_advert_full(advert["flxId"], advert, alert_id=entry["alert_id"])
          for entry in body.get("data", {}).get("updated", []):
              for advert in entry.get("adverts", []):
                  # DTO réduit : flxId, currentPrice, isOnline → diff vs état local
                  db.update_advert_partial(advert["flxId"], advert, alert_id=entry["alert_id"])

      def consume_loop():
          while True:
              resp = sqs.receive_message(
                  QueueUrl=QUEUE_URL,
                  MaxNumberOfMessages=10,
                  WaitTimeSeconds=20,  # long polling
              )
              for msg in resp.get("Messages", []):
                  try:
                      payload = json.loads(msg["Body"])
                      handle(payload)
                      sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])
                  except Exception as e:
                      logging.exception("worker failed: %s", e)
                      # ne pas delete → SQS retry → après N échecs → DLQ
      ```

      ```javascript Node theme={null}
      import { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } from "@aws-sdk/client-sqs";

      const sqs = new SQSClient({});
      const QUEUE_URL = process.env.SQS_QUEUE_URL;

      async function handle(body) {
        for (const entry of body?.data?.created ?? []) {
          for (const advert of entry.adverts ?? []) {
            await db.upsertAdvertFull(advert.flxId, advert, entry.alert_id);
          }
        }
        for (const entry of body?.data?.updated ?? []) {
          for (const advert of entry.adverts ?? []) {
            // DTO réduit : flxId, currentPrice, isOnline → diff vs état local
            await db.updateAdvertPartial(advert.flxId, advert, entry.alert_id);
          }
        }
      }

      async function loop() {
        while (true) {
          const { Messages = [] } = await sqs.send(new ReceiveMessageCommand({
            QueueUrl: QUEUE_URL,
            MaxNumberOfMessages: 10,
            WaitTimeSeconds: 20,
          }));
          for (const msg of Messages) {
            try {
              await handle(JSON.parse(msg.Body));
              await sqs.send(new DeleteMessageCommand({
                QueueUrl: QUEUE_URL,
                ReceiptHandle: msg.ReceiptHandle,
              }));
            } catch (e) {
              console.error("worker failed", e);
            }
          }
        }
      }
      loop();
      ```
    </CodeGroup>
  </Step>

  <Step title="Idempotence">
    Les retries Fluximmo (et les retries internes de votre queue) peuvent livrer **le même webhook 2 fois**. Sans idempotence, vous dupliquez.

    **Patterns** :

    * **UPSERT** par `flxId` côté DB (`INSERT ... ON CONFLICT (flx_id) DO UPDATE`).
    * **Dedup key** : sur chaque advert iteré, hash de `(flxId, branch, alert_id)` dans une table de dédoublonnage avec TTL court (24h).

    ```python theme={null}
    def dedup_and_handle(body):
        for branch_name in ("created", "updated"):
            for entry in body.get("data", {}).get(branch_name, []):
                alert_id = entry.get("alert_id", "")
                for advert in entry.get("adverts", []):
                    key = f"{advert['flxId']}:{branch_name}:{alert_id}"
                    if redis.set(f"dedup:{key}", "1", nx=True, ex=86400):
                        handle_advert(advert, branch_name, alert_id)
                    # sinon : déjà traité, skip silencieux
    ```
  </Step>

  <Step title="DLQ + monitoring">
    **Dead Letter Queue** (DLQ) : configurez SQS / RabbitMQ pour qu'après N échecs (typiquement 5), le message bascule dans une DLQ. C'est votre filet de sécurité — vous pouvez l'inspecter et rejouer manuellement.

    **Métriques à monitorer** :

    * `webhooks_received` (compteur) — débit entrant
    * `webhooks_acked_under_1s` (ratio) — santé de l'endpoint
    * `queue_depth` (gauge) — bouchon en formation ?
    * `worker_lag_seconds` (gauge) — délai bout-en-bout
    * `dlq_count` (gauge) — messages échoués définitivement

    **Alerting** (Datadog, PagerDuty, Grafana) :

    * queue\_depth > seuil pendant > 5 min → scale up workers
    * dlq\_count > 0 → investigation manuelle
    * webhooks\_acked\_under\_1s \< 99 % → investiguer l'endpoint
  </Step>

  <Step title="Sécurité — header x-webhook-key">
    Fluximmo envoie un header `x-webhook-key` avec une valeur secrète que vous avez configurée. **Toujours le valider** avant de pousser dans la queue, sinon n'importe qui peut spammer votre endpoint.

    ```python theme={null}
    if request.headers.get("x-webhook-key") != WEBHOOK_KEY:
        raise HTTPException(401)
    ```

    Voir [Concept — Webhooks](/concepts/webhooks) pour la configuration côté dashboard et les bonnes pratiques de rotation.
  </Step>
</Steps>

## Architecture / flow

```mermaid theme={null}
flowchart LR
  Flux["Fluximmo"] -->|POST + x-webhook-key| EP["Endpoint<br/>webhook"]
  EP -->|valide header| Auth{"Header<br/>OK ?"}
  Auth -- "non" --> R401["401"]
  Auth -- "oui" --> Push["push raw payload"]
  Push --> Q[("Queue<br/>SQS / RabbitMQ /<br/>Redis Streams")]
  EP -->|"ack 200 < 1s"| Flux
  Q --> W1["Worker 1"]
  Q --> W2["Worker 2"]
  Q --> Wn["Worker N"]
  W1 --> Dedup{"Dedup<br/>flxId ?"}
  W2 --> Dedup
  Wn --> Dedup
  Dedup -- "déjà vu" --> Skip["skip"]
  Dedup -- "neuf" --> Biz["UPSERT DB,<br/>fetch externe, ML"]
  Biz --> OK["delete msg"]
  Biz -- "fail × N" --> DLQ[("DLQ")]
  Mon["Monitoring<br/>queue_depth,<br/>worker_lag, dlq"] -.-> Q
  Mon -.-> DLQ
```

## Pièges fréquents

<Warning>
  **Traitement sync dans le handler.** L'erreur n°1. Sur un pic, le handler timeout, Fluximmo retry, votre app sature, effet boule de neige. Toujours découpler via une queue.
</Warning>

<Warning>
  **Pas de queue.** "Je traite directement, j'ai un load balancer, ça suffit." Non. Sur un pic réel, les workers saturent en quelques secondes et vous perdez du signal. La queue absorbe.
</Warning>

<Warning>
  **Pas d'idempotence.** Les retries (Fluximmo + retries de la queue) livrent le même message plusieurs fois. Sans UPSERT ni dedup-key, vous duplicate-write en DB.
</Warning>

<Warning>
  **Ignorer le header `x-webhook-key`.** Endpoint public + pas de validation = n'importe qui peut spammer votre queue. Validation systématique avant tout traitement.
</Warning>

<Warning>
  **Sous-dimensionner workers / queue.** Une queue qui se remplit plus vite qu'elle ne se vide = backlog persistant. Surveillez `queue_depth` et autoscalez les workers.
</Warning>

<Warning>
  **Ne pas monitorer la queue depth.** Sans observabilité sur la profondeur de la queue, un bouchon est invisible jusqu'à ce qu'un client se plaigne. Métriques + alerting dès le jour 1.
</Warning>

<Warning>
  **Renvoyer 5xx sur erreur métier.** Si vous renvoyez 500 quand votre DB échoue, Fluximmo retry. Préférez ack 200 + push en queue + gestion d'erreur côté worker (qui finira en DLQ si vraiment cassé).
</Warning>

## Pour aller plus loin

* [Concept — Webhooks (politique de retry, codes acceptés, header `x-webhook-key`)](/concepts/webhooks)
* [Concept — Match types et cycle d'alerte](/concepts/match-types-cycle-alerte)
* [Playbook — Répliquer une BDD adverts via webhooks](/playbooks/replicate-bdd-adverts)
* [Playbook — Tracker les changements de prix](/playbooks/track-price-changes)

<Card title="Clé test gratuite — 1 semaine" icon="key" href="https://my.fluximmo.io">
  Créez un compte sur **my.fluximmo.io** pour récupérer une clé API test gratuite (1 semaine, accès limité). Aucun paiement requis.
</Card>
