Queues & Async Processing

Decouple webhook ingestion from delivery by enqueueing jobs into a Cloudflare Queue. A consumer Worker picks up batches, forwards to target URLs, and routes failures to a dead letter queue. This quickstart adds async processing to the Webhook Hub.

Prerequisites: Workers, D1 CRUD

How It Works

flowchart LR
    A["Webhook Received"] --> B["Producer: Enqueue Job"]
    B --> C[("Queue")]
    C --> D["Consumer: Deliver Batch"]
    D -->|Success| E["Mark Delivered"]
    D -->|Failure| F{Retries Left?}
    F -->|Yes| C
    F -->|No| G[("Dead Letter Queue")]

The producer is fast: it writes to D1, enqueues a job, and responds immediately. The consumer handles the slow part (HTTP delivery to external targets) asynchronously.

Create the Queue

npx wrangler queues create webhook-deliveries
npx wrangler queues create webhook-deliveries-dlq

Add the bindings to wrangler.jsonc:

{
  "name": "webhook-hub",
  "main": "src/index.ts",
  "compatibility_date": "2025-01-01",
  "compatibility_flags": ["nodejs_compat"],

  "d1_databases": [
    {
      "binding": "DB",
      "database_name": "webhook-hub-db",
      "database_id": "<your-database-id>"
    }
  ],

  "queues": {
    "producers": [
      {
        "binding": "DELIVERY_QUEUE",
        "queue": "webhook-deliveries"
      }
    ],
    "consumers": [
      {
        "queue": "webhook-deliveries",
        "max_batch_size": 10,
        "max_batch_timeout": 30,
        "max_retries": 3,
        "dead_letter_queue": "webhook-deliveries-dlq",
        "retry_delay": 60
      }
    ]
  }
}

Re-generate types:

npx wrangler types

Now Env includes DELIVERY_QUEUE: Queue.

Producer: Enqueue Delivery Jobs

When a webhook arrives, store it in D1 and enqueue a delivery job. The response returns immediately without waiting for delivery.

import { Hono } from "hono";

const app = new Hono<{ Bindings: Env }>();

interface DeliveryJob {
  webhookId: number;
  targetUrl: string;
  payload: string;
}

app.post("/webhook/:source", async (c) => {
  const source = c.req.param("source");
  const payload = await c.req.text();
  const eventType = c.req.header("X-Event-Type") ?? "unknown";

  // Store in D1
  const result = await c.env.DB.prepare(
    "INSERT INTO webhooks (source, event_type, payload) VALUES (?, ?, ?)"
  )
    .bind(source, eventType, payload)
    .run();

  const webhookId = result.meta.last_row_id;

  // Look up forwarding targets for this source
  const targets = await c.env.DB.prepare(
    "SELECT target_url FROM forwarding_rules WHERE source = ? AND active = 1"
  )
    .bind(source)
    .all<{ target_url: string }>();

  // Enqueue a delivery job for each target
  const jobs: MessageSendRequest<DeliveryJob>[] = targets.results.map(
    (target) => ({
      body: {
        webhookId: webhookId as number,
        targetUrl: target.target_url,
        payload,
      },
    })
  );

  if (jobs.length > 0) {
    await c.env.DELIVERY_QUEUE.sendBatch(jobs);
  }

  return c.json({ id: webhookId, queued: jobs.length }, 201);
});

A single sendBatch call accepts at most 100 messages. If a source has more targets than that, split the jobs across several calls.
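One way to stay under the per-call limit is to split the job list into chunks before sending. The sketch below assumes the 100-message limit stated above. The chunk helper is hypothetical, not part of the Queues API, and the example jobs are placeholders.

```typescript
// Hypothetical helper, not part of the Queues API: split a list into
// consecutive chunks of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError("size must be a positive integer");
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Example: 250 placeholder jobs become chunks of 100, 100, and 50.
const exampleJobs = Array.from({ length: 250 }, (_, i) => ({
  body: { webhookId: i },
}));
const chunkSizes = chunk(exampleJobs, 100).map((part) => part.length);
console.log(chunkSizes); // [ 100, 100, 50 ]
```

In the producer, the single sendBatch call could then become a loop that awaits c.env.DELIVERY_QUEUE.sendBatch(part) for each part of chunk(jobs, 100).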

Consumer: Process Delivery Batches

The consumer is a queue() handler on the same Worker's default export, alongside fetch. It receives batches of messages, attempts delivery, and acknowledges or retries each message individually.

export default {
  fetch: app.fetch,

  async queue(
    batch: MessageBatch<DeliveryJob>,
    env: Env
  ): Promise<void> {
    for (const msg of batch.messages) {
      const { webhookId, targetUrl, payload } = msg.body;

      try {
        const response = await fetch(targetUrl, {
          method: "POST",
          headers: {
            "Content-Type": "application/json",
            "X-Webhook-Id": String(webhookId),
          },
          body: payload,
        });

        if (!response.ok) {
          throw new Error(`Target returned ${response.status}`);
        }

        // Record successful delivery
        await env.DB.prepare(
          "INSERT INTO delivery_log (webhook_id, target_url, status, delivered_at) VALUES (?, ?, 'delivered', datetime('now'))"
        )
          .bind(webhookId, targetUrl)
          .run();

        msg.ack();
      } catch (err) {
        console.error(
          `Delivery failed for webhook ${webhookId} to ${targetUrl}:`,
          err
        );

        // Record failure
        await env.DB.prepare(
          "INSERT INTO delivery_log (webhook_id, target_url, status, error, delivered_at) VALUES (?, ?, 'failed', ?, datetime('now'))"
        )
          .bind(webhookId, targetUrl, String(err))
          .run();

        msg.retry();
      }
    }
  },
};

Key points:

  • msg.ack() removes the message from the queue
  • msg.retry() re-enqueues the message (up to max_retries times)
  • After all retries are exhausted, the message moves to the dead letter queue automatically
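The retry_delay setting in wrangler.jsonc applies the same delay to every retry. If a growing delay is preferred, msg.retry() also accepts a per-message delaySeconds option, and msg.attempts reports how many times the message has been tried (starting at 1). The following is a minimal sketch of exponential backoff under those assumptions. BASE_DELAY_SECONDS, MAX_DELAY_SECONDS, backoffSeconds, and retryWithBackoff are hypothetical names, and RetryableMessage is a local stand-in for the runtime's message type so the example stays self-contained.

```typescript
// Local stand-in for the fields this sketch uses from a queue message.
interface RetryableMessage {
  readonly attempts: number; // 1 on the first delivery attempt
  retry(options?: { delaySeconds?: number }): void;
}

// Hypothetical tuning values, chosen only for illustration.
const BASE_DELAY_SECONDS = 30;
const MAX_DELAY_SECONDS = 3600;

// Exponential backoff: 30s, 60s, 120s, ... capped at MAX_DELAY_SECONDS.
function backoffSeconds(attempts: number): number {
  const exponent = Math.max(0, attempts - 1);
  return Math.min(MAX_DELAY_SECONDS, BASE_DELAY_SECONDS * 2 ** exponent);
}

// Retry a message with a delay that grows with each failed attempt.
// Returns the delay that was requested, which is handy for logging.
function retryWithBackoff(msg: RetryableMessage): number {
  const delaySeconds = backoffSeconds(msg.attempts);
  msg.retry({ delaySeconds });
  return delaySeconds;
}
```

In the consumer's catch block, retryWithBackoff(msg) could stand in for the plain msg.retry() call. The max_retries setting still decides when the message moves to the dead letter queue.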

Supporting Schema

Add these tables alongside the existing webhooks schema:

CREATE TABLE IF NOT EXISTS forwarding_rules (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  source TEXT NOT NULL,
  target_url TEXT NOT NULL,
  active INTEGER NOT NULL DEFAULT 1
);

CREATE TABLE IF NOT EXISTS delivery_log (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  webhook_id INTEGER NOT NULL,
  target_url TEXT NOT NULL,
  status TEXT NOT NULL,
  error TEXT,
  delivered_at TEXT NOT NULL DEFAULT (datetime('now')),
  FOREIGN KEY (webhook_id) REFERENCES webhooks(id)
);

CREATE INDEX IF NOT EXISTS idx_delivery_log_webhook
  ON delivery_log(webhook_id);

Run the migration:

npx wrangler d1 execute webhook-hub-db --local --file=schema-queues.sql

Queue Configuration Options

Option             Default              Description
max_batch_size     10                   Messages per batch (max 100)
max_batch_timeout  5                    Seconds to wait for a full batch
max_retries        3                    Retry attempts before DLQ
dead_letter_queue  none                 Queue name for failed messages
max_concurrency    unset (autoscales)   Maximum parallel consumer invocations
retry_delay        0                    Seconds to wait before each retry

Gotcha: The consumer receives messages in batches, not one at a time. With the default max_batch_size of 10, your queue() handler gets up to 10 messages per invocation. Design your consumer to handle batches: if the handler throws an unhandled error, every message in the batch that has not been explicitly acknowledged is retried. Use msg.ack() and msg.retry() per message for fine-grained control.
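Per-message settlement also works when the batch is processed concurrently instead of in a loop. The sketch below shows one possible shape, using Promise.allSettled so that one failing delivery cannot reject the whole batch. settleBatch and SettleableMessage are hypothetical names, and the deliver callback stands in for the fetch-and-log step from the consumer above.

```typescript
// Local stand-in for the parts of a queue message this sketch touches.
interface SettleableMessage<T> {
  readonly body: T;
  ack(): void;
  retry(): void;
}

// Run `deliver` for every message concurrently, then ack or retry each
// message based on its own outcome. Returns how many were acked and retried.
async function settleBatch<T>(
  messages: readonly SettleableMessage<T>[],
  deliver: (body: T) => Promise<void>
): Promise<{ acked: number; retried: number }> {
  // The async wrapper turns synchronous throws into rejections as well.
  const outcomes = await Promise.allSettled(
    messages.map(async (msg) => deliver(msg.body))
  );

  let acked = 0;
  let retried = 0;
  outcomes.forEach((outcome, i) => {
    if (outcome.status === "fulfilled") {
      messages[i].ack();
      acked++;
    } else {
      messages[i].retry();
      retried++;
    }
  });
  return { acked, retried };
}
```

Inside queue(), this could be called as settleBatch(batch.messages, deliverOne), where deliverOne is whatever function performs a single delivery. Concurrent delivery shortens the time a batch takes, but every target in the batch is contacted at once.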

Dead Letter Queue

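Messages that fail all retries land in the DLQ. The DLQ is an ordinary queue, so nothing reads from it until you register a consumer for webhook-deliveries-dlq in wrangler.jsonc. Set up a consumer to inspect the failed messages:

// In a separate Worker, or in the same Worker's queue() handler by branching on batch.queue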
export default {
  async queue(
    batch: MessageBatch<DeliveryJob>,
    env: Env
  ): Promise<void> {
    for (const msg of batch.messages) {
      const { webhookId, targetUrl } = msg.body;
      console.error(
        `DLQ: webhook ${webhookId} permanently failed delivery to ${targetUrl}`
      );

      await env.DB.prepare(
        "UPDATE delivery_log SET status = 'dead_letter' WHERE webhook_id = ? AND target_url = ? AND status = 'failed'"
      )
        .bind(webhookId, targetUrl)
        .run();

      msg.ack();
    }
  },
};
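A Worker exports a single queue() handler, so consuming both queues from the same Worker means telling the batches apart. Each batch carries the name of its source queue in batch.queue. The following is a minimal sketch of that branching. classifyBatch, BatchKind, and NamedBatch are hypothetical names, and the queue names are the ones created earlier in this quickstart.

```typescript
// Local stand-in for the one field this sketch reads from a MessageBatch.
interface NamedBatch {
  readonly queue: string; // name of the queue the batch came from
}

type BatchKind = "delivery" | "dead-letter";

// Queue names from the wrangler.jsonc in this quickstart.
const DELIVERY_QUEUE_NAME = "webhook-deliveries";
const DLQ_NAME = "webhook-deliveries-dlq";

// Decide which processing path a batch should take.
function classifyBatch(batch: NamedBatch): BatchKind {
  switch (batch.queue) {
    case DELIVERY_QUEUE_NAME:
      return "delivery";
    case DLQ_NAME:
      return "dead-letter";
    default:
      // Fail loudly so a misconfigured consumer is noticed early.
      throw new Error(`No handler for queue ${batch.queue}`);
  }
}
```

Inside queue(), the handler could run the DLQ loop when classifyBatch(batch) returns "dead-letter" and the delivery loop otherwise.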

Testing Locally

Queues work in local development with wrangler dev. Messages are processed in-memory, so you get the full producer/consumer flow without deploying.

npx wrangler dev

Test the flow:

# Add a forwarding rule
curl -X POST http://localhost:8787/rules \
  -H "Content-Type: application/json" \
  -d '{"source": "github", "target_url": "http://localhost:9999/hook"}'

# Send a webhook (enqueues delivery)
curl -X POST http://localhost:8787/webhook/github \
  -H "X-Event-Type: push" \
  -d '{"ref": "refs/heads/main"}'

Check the delivery log to verify the consumer ran:

curl "http://localhost:8787/deliveries?webhook_id=1"
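The forwarding rule in the test flow points at http://localhost:9999/hook, so a delivery can only succeed if something is listening there. The sketch below shows one possible stand-in target, assuming Node.js is available. createHookReceiver and describeDelivery are hypothetical names, and the receiver accepts any request path, not only /hook.

```typescript
import { createServer, type Server } from "node:http";

// Formats one received delivery for the console (pure, so easy to test).
function describeDelivery(webhookId: string | undefined, body: string): string {
  return `webhook ${webhookId ?? "unknown"}: ${body}`;
}

// Builds (but does not start) a server that logs each request body and
// replies 200, so the consumer records the delivery as successful.
function createHookReceiver(): Server {
  return createServer((req, res) => {
    const chunks: Buffer[] = [];
    req.on("data", (part: Buffer) => chunks.push(part));
    req.on("end", () => {
      // The consumer in this quickstart sets the X-Webhook-Id header.
      const header = req.headers["x-webhook-id"];
      const webhookId = Array.isArray(header) ? header[0] : header;
      const body = Buffer.concat(chunks).toString("utf8");
      console.log(describeDelivery(webhookId, body));
      res.writeHead(200).end("ok");
    });
  });
}
```

Calling createHookReceiver().listen(9999) from a small script in another terminal, before sending the test webhook, should let the delivery succeed. Without a listener the fetch fails, so the delivery log shows failed attempts followed by retries.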

What You Built

The Webhook Hub now receives webhooks synchronously (fast) and delivers them asynchronously (reliable). Failed deliveries are retried after the configured retry_delay, and permanently failed ones land in a dead letter queue for inspection.

Next: Tunnels to forward webhooks to local services, Cron Triggers for scheduled health checks, or Workflows for multi-step durable execution when simple queues aren’t enough.