Worker Usage

Learn how to use the type-safe AMQP worker to consume messages.

NestJS Users

For NestJS applications, see the NestJS Worker Usage guide.

Creating a Worker

Create a worker with type-safe message handlers:

typescript
import { TypedAmqpWorker } from "@amqp-contract/worker";
import { Future, Result } from "@swan-io/boxed";
import { contract } from "./contract";

const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: ({ payload }) => {
      console.log("Processing:", payload.orderId);
      // Your business logic here
      return Future.value(Result.Ok(undefined));
    },
    notifyOrder: ({ payload }) => {
      console.log("Notifying:", payload.orderId);
      return Future.value(Result.Ok(undefined));
    },
  },
  urls: ["amqp://localhost"],
}).resultToPromise();

console.log("✅ Worker ready!");

The worker automatically connects and starts consuming messages from all queues.

Message Handlers

Handlers receive validated, fully-typed messages with { payload, headers }:

typescript
import { Future, Result } from "@swan-io/boxed";

const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: ({ payload }) => {
      // Payload is fully typed!
      console.log(payload.orderId); // ✅ string
      console.log(payload.amount); // ✅ number
      console.log(payload.items); // ✅ array

      for (const item of payload.items) {
        console.log(`${item.productId}: ${item.quantity}`);
      }
      return Future.value(Result.Ok(undefined));
    },
  },
  connection,
}).resultToPromise();

Type Safety

The worker enforces:

  • Required handlers - All consumers must have handlers
  • Message validation - Validated before reaching handlers
  • Type inference - Fully typed parameters

typescript
// ❌ TypeScript error: missing handler
const workerResult = await TypedAmqpWorker.create({
  contract,
  handlers: {
    notifyOrder: ({ payload }) => { ... },
    // Missing processOrder handler!
  },
  urls: ['amqp://localhost'],
});

// ✅ All handlers present
const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: ({ payload }) => { ... },
    notifyOrder: ({ payload }) => { ... },
  },
  urls: ['amqp://localhost'],
}).resultToPromise();

console.log('✅ All handlers present');

Defining Handlers Externally

For better organization, define handlers separately. The library provides two types of handlers:

Safe handlers return Future<Result<void, HandlerError>> for explicit error handling:

typescript
import { defineHandler, RetryableError, NonRetryableError } from "@amqp-contract/worker";
import { Future, Result } from "@swan-io/boxed";
import { contract } from "./contract";

const processOrderHandler = defineHandler(contract, "processOrder", ({ payload }) =>
  Future.fromPromise(saveToDatabase(payload))
    .mapOk(() => undefined)
    .mapError((error) => new RetryableError("Database error", error)),
);

// Non-retryable errors go directly to DLQ
const validateOrderHandler = defineHandler(contract, "validateOrder", ({ payload }) => {
  if (payload.amount <= 0) {
    return Future.value(Result.Error(new NonRetryableError("Invalid order amount")));
  }
  return Future.value(Result.Ok(undefined));
});

Multiple Handlers

typescript
import { defineHandlers, RetryableError } from "@amqp-contract/worker";
import { Future, Result } from "@swan-io/boxed";
import { contract } from "./contract";

// Safe handlers (recommended) - for async operations use Future.fromPromise
const handlers = defineHandlers(contract, {
  processOrder: ({ payload }) =>
    Future.fromPromise(processPayment(payload))
      .mapOk(() => undefined)
      .mapError((error) => new RetryableError("Payment failed", error)),
  notifyOrder: ({ payload }) =>
    Future.fromPromise(sendEmail(payload))
      .mapOk(() => undefined)
      .mapError((error) => new RetryableError("Email failed", error)),
});

const worker = await TypedAmqpWorker.create({
  contract,
  handlers,
  urls: ["amqp://localhost"],
}).resultToPromise();

Benefits

External handler definitions provide several advantages:

  • Better Organization: Separate handler logic from worker setup code
  • Reusability: Share handlers across multiple workers or test them independently
  • Type Safety: Full TypeScript type checking at definition time
  • Testability: Test handlers in isolation before integrating with workers
  • Maintainability: Easier to modify and refactor handler logic
  • Explicit Error Control: Safe handlers force explicit error handling

Example: Organized Handler Module

Create a dedicated module for handlers with explicit error handling:

typescript
// handlers/order-handlers.ts
import { defineHandler, defineHandlers, RetryableError } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";
import { orderContract } from "../contract";
import { processPayment } from "../services/payment";
import { sendEmail } from "../services/email";

export const processOrderHandler = defineHandler(orderContract, "processOrder", ({ payload }) =>
  Future.fromPromise(processPayment(payload))
    .mapOk(() => undefined)
    .mapError((error) => new RetryableError("Payment failed", error)),
);

export const notifyOrderHandler = defineHandler(orderContract, "notifyOrder", ({ payload }) =>
  Future.fromPromise(sendEmail(payload))
    .mapOk(() => undefined)
    .mapError((error) => new RetryableError("Email failed", error)),
);

// Export all handlers together
export const orderHandlers = defineHandlers(orderContract, {
  processOrder: processOrderHandler,
  notifyOrder: notifyOrderHandler,
});

typescript
// worker.ts
import { TypedAmqpWorker } from "@amqp-contract/worker";
import { orderContract } from "./contract";
import { orderHandlers } from "./handlers/order-handlers";

const worker = await TypedAmqpWorker.create({
  contract: orderContract,
  handlers: orderHandlers,
  urls: ["amqp://localhost"],
}).resultToPromise();

Starting Consumers

Automatic Consumption

By default, TypedAmqpWorker.create automatically starts all consumers defined in the contract:

typescript
const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: ({ payload }) => { ... },
    notifyOrder: ({ payload }) => { ... },
  },
  connection,
}).resultToPromise();
// Worker is already consuming messages from all queues
console.log('Worker ready, waiting for messages...');

Manual Consumption

If you need more control, you can create a worker using the TypedAmqpWorker class directly and call consume() for specific consumers:

typescript
import { TypedAmqpWorker } from '@amqp-contract/worker';

const worker = new TypedAmqpWorker(contract, {
  processOrder: ({ payload }) => { ... },
  notifyOrder: ({ payload }) => { ... },
});

await worker.connect(connection);

// Start only the processOrder consumer
await worker.consume('processOrder');

// Start multiple consumers later
await worker.consume('notifyOrder');

Message Acknowledgment

Automatic Acknowledgment

By default, messages are automatically acknowledged after successful processing:

typescript
import { Future, Result } from "@swan-io/boxed";

const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: ({ payload }) => {
      console.log("Processing:", payload.orderId);
      // Message is automatically acked after this handler completes
      return Future.value(Result.Ok(undefined));
    },
  },
  connection,
}).resultToPromise();

Manual Acknowledgment

For more control over acknowledgment, use the raw message parameter and error types:

typescript
import { TypedAmqpWorker, defineHandler, RetryableError } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";

const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: defineHandler(contract, "processOrder", ({ payload }, rawMessage) => {
      // Access raw AMQP message properties if needed
      console.log("Delivery tag:", rawMessage.fields.deliveryTag);

      return Future.fromPromise(processOrder(payload))
        .mapOk(() => undefined) // Success - message will be acked
        .mapError((error) => new RetryableError("Processing failed", error)); // Failure - will retry
    }),
  },
  urls: ["amqp://localhost"],
}).resultToPromise();

Acknowledgment behavior:

  • Handler returns Result.Ok(undefined) → Message is acknowledged
  • Handler returns Result.Error(RetryableError) → Message is nacked and retried
  • Handler returns Result.Error(NonRetryableError) → Message is sent to DLQ
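
The three rules above amount to a small decision function. The sketch below is illustrative only: `HandlerOutcome`, `HandlerError`, and `decideAckAction` are hypothetical names with plain stand-in types, not the library's real classes or internals.

```typescript
// Stand-in types for illustration; the real error classes come from "@amqp-contract/worker".
type HandlerError = { kind: "retryable" | "non-retryable"; message: string };
type HandlerOutcome = { ok: true } | { ok: false; error: HandlerError };
type AckAction = "ack" | "retry" | "dead-letter";

// Maps a handler outcome to the acknowledgment behavior described in the list above.
function decideAckAction(outcome: HandlerOutcome): AckAction {
  if (outcome.ok) {
    return "ack"; // Result.Ok: the message is acknowledged
  }
  // NonRetryableError goes to the DLQ; RetryableError is nacked and retried
  return outcome.error.kind === "non-retryable" ? "dead-letter" : "retry";
}
```

Keeping the decision in one place like this makes it easy to unit test the mapping without a broker.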

Graceful Shutdown

Properly close the worker on shutdown:

typescript
async function shutdown() {
  console.log("Shutting down...");
  await worker.close();
  process.exit(0);
}

process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);
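
Because SIGTERM and SIGINT can both arrive, or arrive more than once, it can help to make the shutdown routine run only once. The following is a minimal sketch, assuming `close()` returns a promise as the `await worker.close()` call above suggests; `createShutdown` and `Closable` are hypothetical names, not part of the library.

```typescript
// Hypothetical helper: anything with a promise-returning close() method.
type Closable = { close: () => Promise<unknown> };

// Returns a shutdown function that closes the worker at most once,
// even if several signals arrive while closing is still in progress.
function createShutdown(
  worker: Closable,
  onDone: (exitCode: number) => void,
): () => Promise<void> {
  let closing: Promise<void> | undefined;
  return () => {
    if (closing === undefined) {
      closing = worker
        .close()
        .then(() => onDone(0))
        .catch((error: unknown) => {
          console.error("Shutdown failed:", error);
          onDone(1);
        });
    }
    return closing;
  };
}

// Possible wiring in a Node.js process (shown as a comment to keep the sketch runtime-neutral):
//   const shutdown = createShutdown(worker, (code) => process.exit(code));
//   process.on("SIGTERM", shutdown);
//   process.on("SIGINT", shutdown);
```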

Complete Example

typescript
import { TypedAmqpWorker, defineHandlers, RetryableError } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";
import { contract } from "./contract";

async function main() {
  const worker = await TypedAmqpWorker.create({
    contract,
    handlers: defineHandlers(contract, {
      processOrder: ({ payload }) => {
        console.log(`Processing order ${payload.orderId}`);

        return Future.fromPromise(
          Promise.all([saveToDatabase(payload), sendConfirmation(payload.customerId)]),
        )
          .mapOk(() => undefined)
          .mapError((error) => {
            console.error("Processing failed:", error);
            return new RetryableError("Order processing failed", error);
          });
      },

      notifyOrder: ({ payload }) => {
        console.log(`Sending notification for ${payload.orderId}`);
        return Future.fromPromise(sendEmail(payload))
          .mapOk(() => undefined)
          .mapError((error) => new RetryableError("Email failed", error));
      },
    }),
    urls: ["amqp://localhost"],
  }).resultToPromise();

  console.log("✅ Worker ready!");

  // Graceful shutdown
  const shutdown = async () => {
    console.log("Shutting down...");
    await worker.close();
    process.exit(0);
  };

  process.on("SIGTERM", shutdown);
  process.on("SIGINT", shutdown);
}

main().catch(console.error);

Advanced Features

Prefetch Configuration

Control the number of unacknowledged messages a consumer can have at once. This helps manage memory usage and processing rate.

Use the tuple syntax [handler, options] to configure prefetch per-handler:

typescript
import { Future } from "@swan-io/boxed";
import { TypedAmqpWorker, RetryableError } from "@amqp-contract/worker";

const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: [
      ({ payload }) =>
        Future.fromPromise(saveToDatabase(payload))
          .mapOk(() => {
            console.log("Order:", payload.orderId);
            return undefined;
          })
          .mapError((error) => new RetryableError("Failed to save order", error)),
      { prefetch: 10 }, // Process up to 10 messages concurrently
    ],
  },
  urls: ["amqp://localhost"],
}).resultToPromise();

Channel-Wide Prefetch

In AMQP 0.9.1, prefetch is set per-channel. Since all consumers in a worker share the same channel, the worker will use the maximum prefetch value among all consumers.

For example, if you have two consumers with prefetch values of 5 and 10, the effective prefetch for the channel will be 10.
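
The maximum rule can be expressed in a few lines. This is a sketch of the rule as described above, not the worker's actual implementation; `ConsumerOptions` and `effectiveChannelPrefetch` are hypothetical names.

```typescript
// Hypothetical shape: only the prefetch option matters for this sketch.
type ConsumerOptions = { prefetch?: number };

// Returns the largest prefetch among all consumers, or undefined if none set one.
function effectiveChannelPrefetch(consumers: ConsumerOptions[]): number | undefined {
  const values = consumers
    .map((consumer) => consumer.prefetch)
    .filter((prefetch): prefetch is number => prefetch !== undefined);
  return values.length > 0 ? Math.max(...values) : undefined;
}

// Two consumers with prefetch 5 and 10 share a channel prefetch of 10.
const channelPrefetch = effectiveChannelPrefetch([{ prefetch: 5 }, { prefetch: 10 }]);
```

A practical consequence: a consumer configured with a small prefetch may still receive more unacknowledged messages than expected if another consumer on the same worker asks for a larger value.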

Batch Processing

Process multiple messages at once for better throughput. This is especially useful for bulk database operations or API calls.

typescript
import { Future } from "@swan-io/boxed";
import { TypedAmqpWorker, RetryableError } from "@amqp-contract/worker";

const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrders: [
      (messages) => {
        // Handler receives array of messages for batch processing
        console.log(`Processing ${messages.length} orders`);

        // Batch insert to database
        return Future.fromPromise(
          db.orders.insertMany(
            messages.map(({ payload }) => ({
              id: payload.orderId,
              amount: payload.amount,
            })),
          ),
        )
          .mapOk(() => undefined) // All messages are acked together on success
          .mapError((error) => new RetryableError("Batch insert failed", error)); // Or nacked together on error
      },
      {
        batchSize: 5, // Process messages in batches of 5
        batchTimeout: 1000, // Wait max 1 second to fill batch
        prefetch: 10, // Optional: fetch more messages than batch size
      },
    ],
  },
  urls: ["amqp://localhost"],
}).resultToPromise();

Batch Processing Behavior:

  • Messages are accumulated until batchSize is reached
  • If batchTimeout is reached before batch is full, the partial batch is processed
  • All messages in a batch are acknowledged or rejected together
  • If a consumer does not set prefetch but sets batchSize, that batchSize is used as its effective prefetch contribution
  • The actual channel prefetch is the maximum effective prefetch across all consumers
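
The accumulation rules above (flush when full, or flush a partial batch on timeout) can be sketched with a small buffer. This is an illustration under assumptions, not the worker's internal code; `createBatchBuffer` is a hypothetical helper, and the timer that triggers the timeout flush is left to the caller.

```typescript
// Hypothetical helper: collects items and hands them back in batches.
function createBatchBuffer<T>(batchSize: number) {
  let items: T[] = [];

  // Returns whatever has accumulated (possibly a partial batch) and resets the buffer.
  const flush = (): T[] => {
    const batch = items;
    items = [];
    return batch;
  };

  // Adds an item; returns a full batch once batchSize is reached, otherwise undefined.
  const push = (item: T): T[] | undefined => {
    items.push(item);
    return items.length >= batchSize ? flush() : undefined;
  };

  return { push, flush };
}
```

In this sketch, a timer started when the first message of a batch arrives would call `flush()` after `batchTimeout` milliseconds, which yields the partial batch described in the second rule.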

Type Safety:

TypeScript automatically enforces the correct handler signature based on configuration:

typescript
// Single message handler (no batchSize)
[({ payload }) => { ... }, { prefetch: 10 }]

// Batch handler (with batchSize)
[(messages) => { ... }, { batchSize: 5 }]

Handler Configuration Patterns

Three configuration patterns are supported:

  1. Simple handler - No options

typescript
handlers: {
  processOrder: ({ payload }) => {
    // Single message processing
    return Future.value(Result.Ok(undefined));
  },
}

  2. Handler with prefetch - Control concurrency

typescript
handlers: {
  processOrder: [
    ({ payload }) => {
      // Single message processing with prefetch
      return Future.value(Result.Ok(undefined));
    },
    { prefetch: 10 },
  ],
}

  3. Batch handler - Process multiple messages

typescript
handlers: {
  processOrders: [
    (messages) => {
      // Batch processing - each message has { payload, headers }
      for (const { payload } of messages) {
        console.log(payload.orderId);
      }
      return Future.value(Result.Ok(undefined));
    },
    { batchSize: 5, batchTimeout: 1000 },
  ],
}

Best Practices

  1. Handle Errors - Convert every failure into a RetryableError or NonRetryableError; wrap any code that may throw in try-catch or Future.fromPromise
  2. Use Prefetch - Limit concurrent messages with prefetch option to control memory usage
  3. Batch for Throughput - Use batch processing for bulk operations (database inserts, API calls)
  4. Graceful Shutdown - Properly close connections to finish processing in-flight messages
  5. Idempotency - Handlers should be safe to retry since messages may be redelivered
  6. Dead Letters - Configure DLQ for failed messages to avoid infinite retry loops
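
To make the idempotency advice concrete, here is a minimal sketch of a wrapper that skips messages it has already processed. It assumes the payload carries a unique `orderId`; `makeIdempotent` and the in-memory `Set` are hypothetical and only for illustration. A real deployment would typically need a durable store shared across worker instances.

```typescript
// Hypothetical message shape: only the orderId is needed for de-duplication.
type OrderMessage = { payload: { orderId: string } };

// Wraps a handler so that a redelivered message with a known orderId is skipped.
// Returns true if the handler ran, false if the message was a duplicate.
function makeIdempotent<M extends OrderMessage>(
  handle: (message: M) => void,
  processed: Set<string> = new Set<string>(),
): (message: M) => boolean {
  return (message) => {
    const id = message.payload.orderId;
    if (processed.has(id)) {
      return false; // duplicate delivery: skip side effects
    }
    handle(message);
    processed.add(id); // record only after success so a failed attempt can be retried
    return true;
  };
}
```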

Error Handling and Retry

The worker supports automatic retry with two different strategies, configured at the queue level in the contract:

  1. Quorum-Native Mode - Uses quorum queue's native x-delivery-limit feature for simpler retries
  2. TTL-Backoff Mode - Uses TTL + wait queue pattern for exponential backoff

Retry Strategies

Quorum-Native Mode

This mode relies on the native x-delivery-limit feature of RabbitMQ quorum queues, so it needs no wait queues or TTL management:

typescript
import { defineQueue, defineExchange, defineContract } from "@amqp-contract/contract";
import { TypedAmqpWorker, RetryableError } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";

// 1. Define queue with deliveryLimit and quorum-native retry
const dlx = defineExchange("orders-dlx", "topic", { durable: true });
const ordersQueue = defineQueue("orders", {
  type: "quorum", // Default queue type
  deliveryLimit: 3, // After 3 delivery attempts, dead-letter
  deadLetter: {
    exchange: dlx,
    routingKey: "orders.failed",
  },
  retry: { mode: "quorum-native" }, // Retry configured at queue level
});

// 2. Worker automatically uses queue's retry configuration
const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: ({ payload }) =>
      Future.fromPromise(processPayment(payload))
        .mapOk(() => undefined)
        .mapError((error) => new RetryableError("Payment failed", error)),
  },
  urls: ["amqp://localhost"],
}).resultToPromise();

How Quorum-Native works:

  1. When a handler fails, the message is nacked with requeue=true
  2. RabbitMQ automatically tracks delivery count via x-delivery-count header
  3. When count exceeds deliveryLimit, message is automatically dead-lettered
  4. No wait queues or TTL management needed

Best for:

  • Simpler architecture requirements
  • When immediate retries are acceptable
  • Avoiding head-of-queue blocking issues

Limitation: No exponential backoff — retries are immediate.

TTL-Backoff Mode

This mode provides exponential backoff using RabbitMQ's TTL and Dead Letter Exchange (DLX) pattern. Wait queues and bindings are automatically generated when you use defineContract:

typescript
import { defineQueue, defineExchange, defineContract } from "@amqp-contract/contract";
import { TypedAmqpWorker, RetryableError } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";

// 1. Define queue with TTL-backoff retry - infrastructure auto-generated
const dlx = defineExchange("orders-dlx", "topic", { durable: true });
const ordersQueue = defineQueue("orders", {
  deadLetter: { exchange: dlx },
  retry: {
    mode: "ttl-backoff",
    maxRetries: 3, // Maximum retry attempts (default: 3)
    initialDelayMs: 1000, // Initial delay before first retry (default: 1000ms)
    maxDelayMs: 30000, // Maximum delay between retries (default: 30000ms)
    backoffMultiplier: 2, // Exponential backoff multiplier (default: 2)
    jitter: true, // Add random jitter to prevent thundering herd (default: true)
  },
});

// 2. defineContract automatically creates wait queue and bindings
const contract = defineContract({
  exchanges: { dlx },
  queues: { orders: ordersQueue }, // Wait queue auto-added as "ordersWait"
  // Bindings for retry routing are auto-added
});

// 3. Worker automatically uses queue's retry configuration
const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: ({ payload }) =>
      Future.fromPromise(processPayment(payload))
        .mapOk(() => undefined)
        .mapError((error) => new RetryableError("Payment failed", error)),
  },
  urls: ["amqp://localhost"],
}).resultToPromise();

How TTL-Backoff works:

  1. Message is acknowledged - The worker acks the original message
  2. Published to wait queue - Message is republished to a wait queue with a TTL
  3. Wait in queue - Message sits in the wait queue for the calculated delay
  4. Dead-lettered back - After TTL expires, message is automatically routed back to the main queue
  5. Retry processing - Worker processes the message again
  6. Repeat or DLQ - Process repeats until success or max retries reached, then sent to Dead Letter Queue (DLQ)
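
The steps above boil down to one decision per failed message: republish to the wait queue with a TTL, or give up and dead-letter. The sketch below illustrates that decision only. `planRetry` and its types are hypothetical, and the exact way the worker counts attempts is an assumption; only the `x-retry-count` header name comes from this guide.

```typescript
type RetryHeaders = { "x-retry-count"?: number };

type RetryPlan =
  | { action: "wait"; ttlMs: number; headers: { "x-retry-count": number } }
  | { action: "dead-letter" };

// Decides what happens to a message after a retryable failure.
function planRetry(
  headers: RetryHeaders,
  maxRetries: number,
  delayForRetry: (retryCount: number) => number,
): RetryPlan {
  const retryCount = headers["x-retry-count"] ?? 0;
  if (retryCount >= maxRetries) {
    return { action: "dead-letter" }; // retries exhausted: route to the DLQ
  }
  return {
    action: "wait",
    ttlMs: delayForRetry(retryCount), // the TTL on the wait queue acts as the delay
    headers: { "x-retry-count": retryCount + 1 },
  };
}

// Example delay function: 1000ms doubled on each retry (no cap or jitter here).
const doubling = (retryCount: number): number => 1000 * Math.pow(2, retryCount);
```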

Best for: When you need configurable delays between retries to give downstream services time to recover.

Limitation: Potential head-of-queue blocking when messages have mixed TTLs.
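
Head-of-queue blocking arises because RabbitMQ only expires messages once they reach the head of a queue, so a short-TTL message queued behind a long-TTL one waits for the longer TTL. The simplified model below illustrates the effect, assuming all messages enter the wait queue at time 0; `effectiveExpiryTimes` is a hypothetical helper, not a library function.

```typescript
// Given per-message TTLs (ms) in queue order, returns when each message can
// actually leave the queue if expiry is only checked at the head.
function effectiveExpiryTimes(ttlsInQueueOrder: number[]): number[] {
  let headFreeAt = 0; // time at which the previous message left the queue
  return ttlsInQueueOrder.map((ttl) => {
    const leavesAt = Math.max(ttl, headFreeAt);
    headFreeAt = leavesAt;
    return leavesAt;
  });
}

// A 1000ms message stuck behind a 30000ms message is delayed to 30000ms.
const blocked = effectiveExpiryTimes([30000, 1000, 2000]);
// Increasing TTLs in queue order are unaffected.
const unblocked = effectiveExpiryTimes([1000, 2000, 4000]);
```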

Accessing Queue Properties

When TTL-backoff retry is configured, defineQueue returns a wrapper object containing the infrastructure. Use extractQueue() to access the underlying queue definition:

typescript
import { extractQueue } from "@amqp-contract/contract";

const ordersQueue = defineQueue("orders", {
  deadLetter: { exchange: dlx },
  retry: { mode: "ttl-backoff", maxRetries: 3 },
});

// Access queue name
const queueName = extractQueue(ordersQueue).name; // "orders"

Comparing Retry Modes

| Feature | TTL-Backoff | Quorum-Native |
| --- | --- | --- |
| Retry delays | Configurable exponential backoff | Immediate |
| Architecture | Wait queues + DLX (auto-created) | Native RabbitMQ |
| Head-of-queue blocking | Possible with mixed TTLs | None |
| Delivery tracking | Custom x-retry-count header | Native x-delivery-count |
| Queue type | Quorum (default) | Quorum only |
| Configuration location | Queue definition | Queue definition |

Exponential Backoff

With TTL-backoff mode, retry delays increase exponentially to give downstream services time to recover:

typescript
// With default settings (initialDelayMs: 1000, backoffMultiplier: 2):
// Attempt 1: 1000ms delay
// Attempt 2: 2000ms delay
// Attempt 3: 4000ms delay
// After 3 attempts: Message sent to DLQ

With jitter enabled (the default), each delay is scaled by a random factor so that the actual wait is 50-100% of the calculated delay. This prevents all retried messages from hitting the system simultaneously.
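
The delay calculation can be sketched as follows. The option names and defaults match the retry configuration shown earlier, but the formula itself (cap first, then apply jitter as a 50-100% scaling) is an assumption about how the values combine, and `retryDelayMs` is a hypothetical function name.

```typescript
type BackoffConfig = {
  initialDelayMs: number;
  maxDelayMs: number;
  backoffMultiplier: number;
  jitter: boolean;
};

const defaultBackoff: BackoffConfig = {
  initialDelayMs: 1000,
  maxDelayMs: 30000,
  backoffMultiplier: 2,
  jitter: true,
};

// retryCount is 0 for the first retry, 1 for the second, and so on.
// The random source is injectable so the function can be tested deterministically.
function retryDelayMs(
  retryCount: number,
  config: BackoffConfig = defaultBackoff,
  random: () => number = Math.random,
): number {
  const exponential = config.initialDelayMs * Math.pow(config.backoffMultiplier, retryCount);
  const capped = Math.min(exponential, config.maxDelayMs);
  if (!config.jitter) {
    return capped;
  }
  // Scale to somewhere between 50% and 100% of the capped delay
  return Math.round(capped * (0.5 + random() * 0.5));
}
```

With jitter disabled, the first three retries wait 1000ms, 2000ms, and 4000ms, and later retries never exceed `maxDelayMs`.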

Queue Configuration for Retry

For retry to work, your queues must be configured with a Dead Letter Exchange (DLX):

typescript
import {
  defineQueue,
  defineExchange,
  defineContract,
  defineQueueBinding,
} from "@amqp-contract/contract";

// Define the main exchange and the Dead Letter Exchange
const mainExchange = defineExchange("orders", "topic", { durable: true });
const dlxExchange = defineExchange("orders-dlx", "topic", { durable: true });

// Define the Dead Letter Queue
const dlq = defineQueue("orders-dlq");

// Define your main queue with deadLetter and retry configuration
const ordersQueue = defineQueue("orders", {
  deadLetter: {
    exchange: dlxExchange,
    routingKey: "orders.failed",
  },
  retry: { mode: "quorum-native" }, // Or ttl-backoff
});

// Compose the contract
const contract = defineContract({
  exchanges: {
    main: mainExchange,
    dlx: dlxExchange,
  },
  queues: {
    orders: ordersQueue,
    ordersDlq: dlq,
  },
  bindings: {
    // ... main queue bindings
    dlqBinding: defineQueueBinding(dlq, dlxExchange, {
      routingKey: "orders.failed",
    }),
  },
  // ... rest of contract
});

Queue DLX Required

If a queue doesn't have deadLetter configured, retry will not work. Always configure DLX on your queues for proper retry functionality.

Retry Error Classes

The library provides two error classes for explicit error signaling:

RetryableError

Use RetryableError for transient failures that may succeed on retry:

typescript
import { TypedAmqpWorker, RetryableError, defineHandler } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";

const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    // Retry limits and delays come from the queue's retry configuration
    processOrder: defineHandler(contract, "processOrder", ({ payload }) =>
      Future.fromPromise(externalApiCall(payload))
        .mapOk(() => undefined)
        .mapError(
          (error) =>
            // Explicitly signal this should be retried
            new RetryableError("External API temporarily unavailable", error),
        ),
    ),
  },
  urls: ["amqp://localhost"],
}).resultToPromise();

NonRetryableError

Use NonRetryableError for permanent failures that should NOT be retried:

typescript
import {
  TypedAmqpWorker,
  NonRetryableError,
  RetryableError,
  defineHandler,
} from "@amqp-contract/worker";
import { Future, Result } from "@swan-io/boxed";

const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: defineHandler(contract, "processOrder", ({ payload }) => {
      // Validation errors should not be retried
      if (payload.amount <= 0) {
        return Future.value(Result.Error(new NonRetryableError("Invalid order amount")));
      }
      return Future.fromPromise(processPayment(payload))
        .mapOk(() => undefined)
        .mapError((error) => new RetryableError("Payment failed", error));
    }),
  },
  urls: ["amqp://localhost"],
}).resultToPromise();

NonRetryableError behavior:

  • Message is immediately sent to DLQ (if configured)
  • No retry attempts are made
  • Use for validation errors, business rule violations, or permanent failures

Using Safe Handlers for Better Error Control

For the most explicit error handling, use safe handlers that return Future<Result>:

typescript
import {
  TypedAmqpWorker,
  defineHandler,
  RetryableError,
  NonRetryableError,
} from "@amqp-contract/worker";
import { Future, Result } from "@swan-io/boxed";
import { match } from "ts-pattern";

const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: defineHandler(contract, "processOrder", ({ payload }) => {
      // Validation - non-retryable
      if (payload.amount <= 0) {
        return Future.value(Result.Error(new NonRetryableError("Invalid amount")));
      }

      return Future.fromPromise(processPayment(payload))
        .mapOk(() => undefined)
        .mapError((error) =>
          match(error)
            .when(
              (e) => e instanceof PaymentDeclinedError,
              () => new NonRetryableError("Payment declined", error),
            )
            .otherwise(() => new RetryableError("Payment failed", error)),
        );
    }),
  },
  urls: ["amqp://localhost"],
}).resultToPromise();

When to use which error type:

| Error Type | Use Case | Behavior |
| --- | --- | --- |
| RetryableError | Transient failures (network, rate limits) | Retry with backoff |
| NonRetryableError | Permanent failures (validation, business rules) | Immediate DLQ |
| Any other error (unsafe handlers) | Unexpected failures | Retry with backoff |

Note: Retry is configured at the queue level. All errors except NonRetryableError are retried according to the queue's retry configuration.

Retry with Batch Processing

Retry works with batch processing. If a batch handler fails with a retryable error, all messages in the batch are retried:

typescript
import { Future } from "@swan-io/boxed";
import { TypedAmqpWorker, RetryableError } from "@amqp-contract/worker";

const worker = await TypedAmqpWorker.create({
  contract, // Queue's retry configuration is used automatically
  handlers: {
    processOrders: [
      (messages) =>
        // Insert only the payloads; each message also carries headers
        Future.fromPromise(db.orders.insertMany(messages.map(({ payload }) => payload)))
          .mapOk(() => undefined)
          .mapError((error) => new RetryableError("Batch insert failed", error)), // All messages in batch will be retried
      {
        batchSize: 10,
        batchTimeout: 1000,
      },
    ],
  },
  urls: ["amqp://localhost"],
}).resultToPromise();

Batch Retry Behavior

All messages in a failed batch are treated the same way - they all get the same retry count and delay. For partial batch success handling, consider processing messages individually instead.

Monitoring Retry Headers

The worker adds headers to track retry information:

  • x-retry-count - Number of times this message has been retried
  • x-last-error - Error message from the last failed attempt
  • x-first-failure-timestamp - Timestamp of the first failure

These headers can be useful for monitoring and debugging:

typescript
// Example: Log retry information (requires custom message access)
// Note: Standard handlers don't expose raw message properties
// This is for illustration of what the worker tracks internally
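
If you do have access to a message's headers, a small helper can turn the retry headers into a typed object for logging. This is a sketch under assumptions: `readRetryInfo` is a hypothetical helper, the headers are treated as a plain record, and the timestamp is assumed to be a number of milliseconds since the Unix epoch, which this guide does not specify.

```typescript
type RetryInfo = {
  retryCount: number;
  lastError: string | undefined;
  firstFailureAt: Date | undefined;
};

// Reads the retry headers listed above, tolerating missing or unexpected values.
function readRetryInfo(headers: Record<string, unknown>): RetryInfo {
  const count = headers["x-retry-count"];
  const lastError = headers["x-last-error"];
  const firstFailure = headers["x-first-failure-timestamp"];
  return {
    retryCount: typeof count === "number" ? count : 0,
    lastError: typeof lastError === "string" ? lastError : undefined,
    // Assumption: the timestamp is stored as epoch milliseconds
    firstFailureAt: typeof firstFailure === "number" ? new Date(firstFailure) : undefined,
  };
}
```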

Best Practices for Retry

  1. Configure appropriate delays - Start with 1-2 seconds, max out at 30-60 seconds
  2. Use jitter - Keep jitter enabled (default) to prevent thundering herd
  3. Set reasonable max retries - 3-5 retries is usually sufficient
  4. Configure DLX on all queues - Ensures proper retry behavior and DLQ routing
  5. Make handlers idempotent - Messages may be processed multiple times
  6. Monitor DLQ - Set up alerts for messages reaching the DLQ
  7. Handle transient vs permanent failures - Return RetryableError for transient failures (network issues, rate limits) and NonRetryableError for permanent ones (validation errors) so that they are never retried

Example: Complete Retry Setup

typescript
import { TypedAmqpWorker, RetryableError, NonRetryableError } from "@amqp-contract/worker";
import {
  defineContract,
  defineQueue,
  defineExchange,
  defineQueueBinding,
  defineConsumer,
  defineMessage,
  definePublisher,
} from "@amqp-contract/contract";
import { Future, Result } from "@swan-io/boxed";
import { z } from "zod";

// Define exchanges
const mainExchange = defineExchange("orders", "topic", { durable: true });
const dlxExchange = defineExchange("orders-dlx", "topic", { durable: true });

// Define message schema
const orderMessage = defineMessage(
  z.object({
    orderId: z.string(),
    amount: z.number(),
  }),
);

// Define queue with retry configuration at the queue level
const ordersQueue = defineQueue("orders", {
  deadLetter: {
    exchange: dlxExchange,
    routingKey: "orders.failed",
  },
  retry: {
    mode: "ttl-backoff",
    maxRetries: 3,
    initialDelayMs: 1000,
    maxDelayMs: 30000,
    backoffMultiplier: 2,
    jitter: true,
  },
});
const dlq = defineQueue("orders-dlq");

// defineContract automatically creates wait queue and retry bindings for TTL-backoff
const contract = defineContract({
  exchanges: {
    main: mainExchange,
    dlx: dlxExchange,
  },
  queues: {
    orders: ordersQueue, // Wait queue auto-added as "ordersWait"
    ordersDlq: dlq,
  },
  bindings: {
    mainBinding: defineQueueBinding(ordersQueue, mainExchange, {
      routingKey: "order.#",
    }),
    dlqBinding: defineQueueBinding(dlq, dlxExchange, {
      routingKey: "orders.failed",
    }),
    // Retry bindings are auto-added
  },
  consumers: {
    processOrder: defineConsumer(ordersQueue, orderMessage),
  },
  publishers: {
    orderCreated: definePublisher(mainExchange, orderMessage, {
      routingKey: "order.created",
    }),
  },
});

// Worker automatically uses queue's retry configuration
const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: ({ payload }) => {
      // Validate before processing (don't retry validation errors)
      if (!payload.amount || payload.amount <= 0) {
        return Future.value(Result.Error(new NonRetryableError("Invalid order amount")));
      }

      // Process with external service (retry on failure based on queue config)
      return Future.fromPromise(
        Promise.all([
          paymentService.charge(payload),
          inventoryService.reserve(payload),
          notificationService.send(payload),
        ]),
      )
        .mapOk(() => undefined)
        .mapError((error) => new RetryableError("Order processing failed", error));
    },
  },
  urls: ["amqp://localhost"],
}).resultToPromise();

console.log("✅ Worker ready with retry enabled!");

Released under the MIT License.