Worker Usage
Learn how to use the type-safe AMQP worker to consume messages.
NestJS Users
For NestJS applications, see the NestJS Worker Usage guide.
Creating a Worker
Create a worker with type-safe message handlers:
import { TypedAmqpWorker } from "@amqp-contract/worker";
import { Future, Result } from "@swan-io/boxed";
import { contract } from "./contract";
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: ({ payload }) => {
console.log("Processing:", payload.orderId);
// Your business logic here
return Future.value(Result.Ok(undefined));
},
notifyOrder: ({ payload }) => {
console.log("Notifying:", payload.orderId);
return Future.value(Result.Ok(undefined));
},
},
urls: ["amqp://localhost"],
}).resultToPromise();
console.log("✅ Worker ready!");
The worker automatically connects and starts consuming messages from all queues.
Message Handlers
Handlers receive validated, fully-typed messages with { payload, headers }:
import { Future, Result } from "@swan-io/boxed";
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: ({ payload }) => {
// Payload is fully typed!
console.log(payload.orderId); // ✅ string
console.log(payload.amount); // ✅ number
console.log(payload.items); // ✅ array
for (const item of payload.items) {
console.log(`${item.productId}: ${item.quantity}`);
}
return Future.value(Result.Ok(undefined));
},
},
connection,
}).resultToPromise();
Type Safety
The worker enforces:
- ✅ Required handlers - All consumers must have handlers
- ✅ Message validation - Validated before reaching handlers
- ✅ Type inference - Fully typed parameters
// ❌ TypeScript error: missing handler
const workerResult = await TypedAmqpWorker.create({
contract,
handlers: {
notifyOrder: ({ payload }) => { ... },
// Missing processOrder handler!
},
urls: ['amqp://localhost'],
});
// ✅ All handlers present
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: ({ payload }) => { ... },
notifyOrder: ({ payload }) => { ... },
},
urls: ['amqp://localhost'],
}).resultToPromise();
console.log('✅ All handlers present');
Defining Handlers Externally
For better organization, define handlers separately. The library provides two types of handlers:
Safe Handlers (Recommended)
Safe handlers return Future<Result<void, HandlerError>> for explicit error handling:
import { defineHandler, RetryableError, NonRetryableError } from "@amqp-contract/worker";
import { Future, Result } from "@swan-io/boxed";
import { contract } from "./contract";
const processOrderHandler = defineHandler(contract, "processOrder", ({ payload }) =>
Future.fromPromise(saveToDatabase(payload))
.mapOk(() => undefined)
.mapError((error) => new RetryableError("Database error", error)),
);
// Non-retryable errors go directly to DLQ
const validateOrderHandler = defineHandler(contract, "validateOrder", ({ payload }) => {
if (payload.amount <= 0) {
return Future.value(Result.Error(new NonRetryableError("Invalid order amount")));
}
return Future.value(Result.Ok(undefined));
});
Unsafe Handlers (Legacy/Deprecated)
For simpler use cases or migration from existing code, use unsafe handlers that return Promise<void>:
import { defineUnsafeHandler } from "@amqp-contract/worker";
import { contract } from "./contract";
const processOrderHandler = defineUnsafeHandler(contract, "processOrder", async ({ payload }) => {
console.log("Processing:", payload.orderId);
await saveToDatabase(payload);
// Throws on error - will be retried (when retry is configured)
});
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: processOrderHandler,
},
urls: ["amqp://localhost"],
}).resultToPromise();
Multiple Handlers
import { defineHandlers, RetryableError } from "@amqp-contract/worker";
import { Future, Result } from "@swan-io/boxed";
import { contract } from "./contract";
// Safe handlers (recommended) - for async operations use Future.fromPromise
const handlers = defineHandlers(contract, {
processOrder: ({ payload }) =>
Future.fromPromise(processPayment(payload))
.mapOk(() => undefined)
.mapError((error) => new RetryableError("Payment failed", error)),
notifyOrder: ({ payload }) =>
Future.fromPromise(sendEmail(payload))
.mapOk(() => undefined)
.mapError((error) => new RetryableError("Email failed", error)),
});
// Or use unsafe handlers for simpler code (deprecated)
import { defineUnsafeHandlers } from "@amqp-contract/worker";
const unsafeHandlers = defineUnsafeHandlers(contract, {
processOrder: async ({ payload }) => {
await processPayment(payload);
},
notifyOrder: async ({ payload }) => {
await sendEmail(payload);
},
});
const worker = await TypedAmqpWorker.create({
contract,
handlers, // or unsafeHandlers
urls: ["amqp://localhost"],
}).resultToPromise();
Benefits
External handler definitions provide several advantages:
- Better Organization: Separate handler logic from worker setup code
- Reusability: Share handlers across multiple workers or test them independently
- Type Safety: Full TypeScript type checking at definition time
- Testability: Test handlers in isolation before integrating with workers
- Maintainability: Easier to modify and refactor handler logic
- Explicit Error Control: Safe handlers force explicit error handling
Example: Organized Handler Module (Safe Handlers)
Create a dedicated module for handlers with explicit error handling:
// handlers/order-handlers.ts
import { defineHandler, defineHandlers, RetryableError } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";
import { orderContract } from "../contract";
import { processPayment } from "../services/payment";
import { sendEmail } from "../services/email";
export const processOrderHandler = defineHandler(orderContract, "processOrder", ({ payload }) =>
Future.fromPromise(processPayment(payload))
.mapOk(() => undefined)
.mapError((error) => new RetryableError("Payment failed", error)),
);
export const notifyOrderHandler = defineHandler(orderContract, "notifyOrder", ({ payload }) =>
Future.fromPromise(sendEmail(payload))
.mapOk(() => undefined)
.mapError((error) => new RetryableError("Email failed", error)),
);
// Export all handlers together
export const orderHandlers = defineHandlers(orderContract, {
processOrder: processOrderHandler,
notifyOrder: notifyOrderHandler,
});
Example: Organized Handler Module (Unsafe Handlers)
For simpler use cases, use unsafe handlers:
// handlers/order-handlers.ts
import { defineUnsafeHandler, defineUnsafeHandlers } from "@amqp-contract/worker";
import { orderContract } from "../contract";
import { processPayment } from "../services/payment";
import { sendEmail } from "../services/email";
export const processOrderHandler = defineUnsafeHandler(
orderContract,
"processOrder",
async ({ payload }) => {
await processPayment(payload);
},
);
export const notifyOrderHandler = defineUnsafeHandler(
orderContract,
"notifyOrder",
async ({ payload }) => {
await sendEmail(payload);
},
);
// Export all handlers together
export const orderHandlers = defineUnsafeHandlers(orderContract, {
processOrder: processOrderHandler,
notifyOrder: notifyOrderHandler,
});
// worker.ts
import { TypedAmqpWorker } from "@amqp-contract/worker";
import { orderContract } from "./contract";
import { orderHandlers } from "./handlers/order-handlers";
const worker = await TypedAmqpWorker.create({
contract: orderContract,
handlers: orderHandlers,
urls: ["amqp://localhost"],
}).resultToPromise();
Starting Consumers
Automatic Consumption
By default, TypedAmqpWorker.create automatically starts all consumers defined in the contract:
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: ({ payload }) => { ... },
notifyOrder: ({ payload }) => { ... },
},
connection,
}).resultToPromise();
// Worker is already consuming messages from all queues
console.log('Worker ready, waiting for messages...');
Manual Consumption
If you need more control, you can create a worker using the TypedAmqpWorker class directly and call consume() for specific consumers:
import { TypedAmqpWorker } from '@amqp-contract/worker';
const worker = new TypedAmqpWorker(contract, {
processOrder: ({ payload }) => { ... },
notifyOrder: ({ payload }) => { ... },
});
await worker.connect(connection);
// Start only the processOrder consumer
await worker.consume('processOrder');
// Start multiple consumers later
await worker.consume('notifyOrder');
Message Acknowledgment
Automatic Acknowledgment
By default, messages are automatically acknowledged after successful processing:
import { Future, Result } from "@swan-io/boxed";
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: ({ payload }) => {
console.log("Processing:", payload.orderId);
// Message is automatically acked after this handler completes
return Future.value(Result.Ok(undefined));
},
},
connection,
}).resultToPromise();
Manual Acknowledgment
For more control over acknowledgment, use the raw message parameter and error types:
import { TypedAmqpWorker, defineHandler, RetryableError, NonRetryableError } from "@amqp-contract/worker";
import { Future, Result } from "@swan-io/boxed";
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: defineHandler(contract, "processOrder", ({ payload }, rawMessage) => {
// Access raw AMQP message properties if needed
console.log("Delivery tag:", rawMessage.fields.deliveryTag);
return Future.fromPromise(processOrder(payload))
.mapOk(() => undefined) // Success - message will be acked
.mapError((error) => new RetryableError("Processing failed", error)); // Failure - will retry
}),
},
urls: ["amqp://localhost"],
}).resultToPromise();
Acknowledgment behavior:
- Handler returns Result.Ok(undefined) → Message is acknowledged
- Handler returns Result.Error(RetryableError) → Message is nacked and retried
- Handler returns Result.Error(NonRetryableError) → Message is sent to DLQ
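The mapping above can be sketched as a small pure function. This is an illustrative model only, not the library's implementation: `HandlerOutcome`, `BrokerAction`, and `decideAction` are hypothetical names, and the tagged error objects stand in for `RetryableError` and `NonRetryableError` so the snippet runs without dependencies.

```typescript
// Stand-ins for the library's error classes (assumption: a tag is enough for this sketch).
type HandlerError =
  | { kind: "retryable"; message: string }
  | { kind: "non-retryable"; message: string };

// Simplified handler outcome, mirroring Result.Ok / Result.Error.
type HandlerOutcome = { ok: true } | { ok: false; error: HandlerError };

type BrokerAction = "ack" | "nack-and-retry" | "dead-letter";

// Hypothetical helper: picks the broker action for a handler outcome.
function decideAction(outcome: HandlerOutcome): BrokerAction {
  if (outcome.ok) {
    return "ack";
  }
  return outcome.error.kind === "non-retryable" ? "dead-letter" : "nack-and-retry";
}

const declinedOutcome: HandlerOutcome = {
  ok: false,
  error: { kind: "non-retryable", message: "Invalid order amount" },
};
console.log(decideAction(declinedOutcome)); // prints: dead-letter
```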
Graceful Shutdown
Properly close the worker on shutdown:
async function shutdown() {
console.log("Shutting down...");
await worker.close();
process.exit(0);
}
process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);
Complete Example
import { TypedAmqpWorker, defineHandlers, RetryableError } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";
import { contract } from "./contract";
async function main() {
const worker = await TypedAmqpWorker.create({
contract,
handlers: defineHandlers(contract, {
processOrder: ({ payload }) => {
console.log(`Processing order ${payload.orderId}`);
return Future.fromPromise(
Promise.all([saveToDatabase(payload), sendConfirmation(payload.customerId)]),
)
.mapOk(() => undefined)
.mapError((error) => {
console.error("Processing failed:", error);
return new RetryableError("Order processing failed", error);
});
},
notifyOrder: ({ payload }) => {
console.log(`Sending notification for ${payload.orderId}`);
return Future.fromPromise(sendEmail(payload))
.mapOk(() => undefined)
.mapError((error) => new RetryableError("Email failed", error));
},
}),
urls: ["amqp://localhost"],
}).resultToPromise();
console.log("✅ Worker ready!");
// Graceful shutdown
const shutdown = async () => {
console.log("Shutting down...");
await worker.close();
process.exit(0);
};
process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);
}
main().catch(console.error);
Advanced Features
Prefetch Configuration
Control the number of unacknowledged messages a consumer can have at once. This helps manage memory usage and processing rate.
Use the tuple syntax [handler, options] to configure prefetch per-handler:
import { Future } from "@swan-io/boxed";
import { TypedAmqpWorker, RetryableError } from "@amqp-contract/worker";
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: [
({ payload }) =>
Future.fromPromise(saveToDatabase(payload))
.mapOk(() => {
console.log("Order:", payload.orderId);
return undefined;
})
.mapError((error) => new RetryableError("Failed to save order", error)),
{ prefetch: 10 }, // Process up to 10 messages concurrently
],
},
urls: ["amqp://localhost"],
}).resultToPromise();
Channel-Wide Prefetch
In AMQP 0.9.1, prefetch is set per-channel. Since all consumers in a worker share the same channel, the worker will use the maximum prefetch value among all consumers.
For example, if you have two consumers with prefetch values of 5 and 10, the effective prefetch for the channel will be 10.
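The rule can be modelled in a few lines. The sketch below is an assumption-laden illustration rather than the worker's actual code: `ConsumerOptions` and `channelPrefetch` are hypothetical names, and only the `prefetch` option is considered.

```typescript
// Hypothetical per-consumer options, as passed in the [handler, options] tuple.
type ConsumerOptions = { prefetch?: number };

// Returns the largest prefetch among consumers, or undefined if none set one.
function channelPrefetch(consumers: Record<string, ConsumerOptions>): number | undefined {
  let max: number | undefined;
  for (const name of Object.keys(consumers)) {
    const prefetch = consumers[name].prefetch;
    if (prefetch !== undefined && (max === undefined || prefetch > max)) {
      max = prefetch;
    }
  }
  return max;
}

const effectivePrefetch = channelPrefetch({
  processOrder: { prefetch: 5 },
  notifyOrder: { prefetch: 10 },
});
console.log(effectivePrefetch); // prints: 10
```

One practical consequence: a consumer configured with a low prefetch can still receive more unacknowledged messages than it asked for when another consumer on the same channel requests a higher value.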
Batch Processing
Process multiple messages at once for better throughput. This is especially useful for bulk database operations or API calls.
import { Future } from "@swan-io/boxed";
import { TypedAmqpWorker, RetryableError } from "@amqp-contract/worker";
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrders: [
(messages) => {
// Handler receives array of messages for batch processing
console.log(`Processing ${messages.length} orders`);
// Batch insert to database
return Future.fromPromise(
db.orders.insertMany(
messages.map(({ payload }) => ({
id: payload.orderId,
amount: payload.amount,
})),
),
)
.mapOk(() => undefined) // All messages are acked together on success
.mapError((error) => new RetryableError("Batch insert failed", error)); // Or nacked together on error
},
{
batchSize: 5, // Process messages in batches of 5
batchTimeout: 1000, // Wait max 1 second to fill batch
prefetch: 10, // Optional: fetch more messages than batch size
},
],
},
urls: ["amqp://localhost"],
}).resultToPromise();
Batch Processing Behavior:
- Messages are accumulated until batchSize is reached
- If batchTimeout is reached before batch is full, the partial batch is processed
- All messages in a batch are acknowledged or rejected together
- If a consumer does not set prefetch but sets batchSize, that batchSize is used as its effective prefetch contribution
- The actual channel prefetch is the maximum effective prefetch across all consumers
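The size-or-timeout rule can be sketched with a small in-memory accumulator. This is not the worker's implementation: `Batcher` is a hypothetical class, the clock is passed in explicitly to keep the example deterministic, and it assumes the timeout is measured from the first message of the current batch.

```typescript
// Hypothetical accumulator illustrating "flush on batchSize or batchTimeout".
class Batcher<T> {
  private items: T[] = [];
  private firstAt: number | null = null;

  constructor(
    private readonly batchSize: number,
    private readonly batchTimeoutMs: number,
  ) {}

  // Adds a message; returns a full batch once batchSize is reached, else null.
  add(item: T, nowMs: number): T[] | null {
    if (this.items.length === 0) {
      this.firstAt = nowMs;
    }
    this.items.push(item);
    return this.items.length >= this.batchSize ? this.drain() : null;
  }

  // Meant to be called periodically; returns a partial batch after the timeout.
  flushIfExpired(nowMs: number): T[] | null {
    if (this.firstAt === null) {
      return null;
    }
    return nowMs - this.firstAt >= this.batchTimeoutMs ? this.drain() : null;
  }

  private drain(): T[] {
    const batch = this.items;
    this.items = [];
    this.firstAt = null;
    return batch;
  }
}

const demoBatcher = new Batcher<string>(2, 1000);
demoBatcher.add("order-1", 0); // null: batch not full yet
const fullBatch = demoBatcher.add("order-2", 50); // ["order-1", "order-2"]
console.log(fullBatch);
```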
Type Safety:
TypeScript automatically enforces the correct handler signature based on configuration:
// Single message handler (no batchSize)
[({ payload }) => { ... }, { prefetch: 10 }]
// Batch handler (with batchSize)
[(messages) => { ... }, { batchSize: 5 }]
Handler Configuration Patterns
Three configuration patterns are supported:
- Simple handler - No options
handlers: {
processOrder: ({ payload }) => {
// Single message processing
return Future.value(Result.Ok(undefined));
},
}
- Handler with prefetch - Control concurrency
handlers: {
processOrder: [
({ payload }) => {
// Single message processing with prefetch
return Future.value(Result.Ok(undefined));
},
{ prefetch: 10 },
],
}
- Batch handler - Process multiple messages
handlers: {
processOrders: [
(messages) => {
// Batch processing - each message has { payload, headers }
for (const { payload } of messages) {
console.log(payload.orderId);
}
return Future.value(Result.Ok(undefined));
},
{ batchSize: 5, batchTimeout: 1000 },
],
}
Best Practices
- Handle Errors - Return a RetryableError or NonRetryableError from safe handlers; in unsafe handlers, wrap business logic in try-catch
- Use Prefetch - Limit concurrent messages with the prefetch option to control memory usage
- Batch for Throughput - Use batch processing for bulk operations (database inserts, API calls)
- Graceful Shutdown - Properly close connections to finish processing in-flight messages
- Idempotency - Handlers should be safe to retry since messages may be redelivered
- Dead Letters - Configure DLQ for failed messages to avoid infinite retry loops
Error Handling and Retry
The worker supports automatic retry with two different strategies, configured per-consumer via handler options:
- TTL-Backoff Mode (default) - Uses TTL + wait queue pattern for exponential backoff
- Quorum-Native Mode - Uses quorum queue's native x-delivery-limit feature for simpler retries
Retry is enabled by default for all consumers with sensible defaults. You can customize the retry behavior per-consumer using the handler tuple syntax.
Retry Strategies
TTL-Backoff Mode (Default)
This mode provides exponential backoff using RabbitMQ's TTL and Dead Letter Exchange (DLX) pattern:
import { TypedAmqpWorker, RetryableError } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
// Handler with custom retry configuration
processOrder: [
({ payload }) =>
Future.fromPromise(processPayment(payload))
.mapOk(() => undefined)
.mapError((error) => new RetryableError("Payment failed", error)),
{
retry: {
mode: "ttl-backoff", // This is the default
maxRetries: 3, // Maximum retry attempts (default: 3)
initialDelayMs: 1000, // Initial delay before first retry (default: 1000ms)
maxDelayMs: 30000, // Maximum delay between retries (default: 30000ms)
backoffMultiplier: 2, // Exponential backoff multiplier (default: 2)
jitter: true, // Add random jitter to prevent thundering herd (default: true)
},
},
],
// Simple handler uses default retry configuration
notifyOrder: ({ payload }) =>
Future.fromPromise(sendNotification(payload))
.mapOk(() => undefined)
.mapError((error) => new RetryableError("Notification failed", error)),
},
urls: ["amqp://localhost"],
}).resultToPromise();
How TTL-Backoff works:
- Message is acknowledged - The worker acks the original message
- Published to wait queue - Message is republished to a wait queue with a TTL
- Wait in queue - Message sits in the wait queue for the calculated delay
- Dead-lettered back - After TTL expires, message is automatically routed back to the main queue
- Retry processing - Worker processes the message again
- Repeat or DLQ - Process repeats until success or max retries reached, then sent to Dead Letter Queue (DLQ)
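The steps above can be traced with a small simulation. It is a sketch under stated assumptions, not the worker's code: `simulateTtlBackoff` is a hypothetical function, no broker is involved, and `maxRetries` is assumed to count retries after the first delivery.

```typescript
// Simulates the TTL-backoff flow for a handler that fails a given number of
// times before succeeding. Returns a human-readable trace of the steps.
function simulateTtlBackoff(failuresBeforeSuccess: number, maxRetries: number): string[] {
  const log: string[] = [];
  let retryCount = 0;
  let remainingFailures = failuresBeforeSuccess;

  while (true) {
    log.push(`process (retryCount=${retryCount})`);
    if (remainingFailures === 0) {
      log.push("ack: success");
      return log;
    }
    remainingFailures--;
    if (retryCount >= maxRetries) {
      // Retries exhausted: the message goes to the Dead Letter Queue.
      log.push("dead-letter queue");
      return log;
    }
    retryCount++;
    log.push(`ack original, publish to wait queue (retryCount=${retryCount})`);
    log.push("TTL expires, dead-lettered back to main queue");
  }
}

// A handler that fails once and then succeeds:
console.log(simulateTtlBackoff(1, 3).join("\n"));
```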
Best for: When you need configurable delays between retries to give downstream services time to recover.
Limitation: Potential head-of-queue blocking when messages have mixed TTLs (messages with shorter TTLs behind messages with longer TTLs won't expire until the longer ones do).
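To make the head-of-queue effect concrete, the sketch below models a single wait queue in which a message can only expire once it reaches the head. `actualExpiryTimes` is a hypothetical helper, and it assumes all messages are published at time 0 in array order.

```typescript
// ttlsMs[i] is the per-message TTL of the i-th message in the queue.
// Returns the time (ms) at which each message actually leaves the queue.
function actualExpiryTimes(ttlsMs: number[]): number[] {
  let headReadyAt = 0;
  return ttlsMs.map((ttl) => {
    // A message cannot expire before every message ahead of it has left.
    headReadyAt = Math.max(headReadyAt, ttl);
    return headReadyAt;
  });
}

// The 1000ms and 2000ms messages are stuck behind the 4000ms one.
const blockedTimes = actualExpiryTimes([4000, 1000, 2000]);
console.log(blockedTimes.join(", ")); // prints: 4000, 4000, 4000
```

When messages are ordered by increasing TTL, as in `[1000, 2000, 4000]`, each one leaves at its own TTL and no blocking occurs.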
Quorum-Native Mode
A simpler mode that leverages RabbitMQ quorum queue's native x-delivery-limit feature:
import { defineQueue, defineExchange } from "@amqp-contract/contract";
import { TypedAmqpWorker, RetryableError } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";
// 1. Define queue with deliveryLimit
const dlx = defineExchange("orders-dlx", "topic", { durable: true });
const ordersQueue = defineQueue("orders", {
type: "quorum", // Default queue type
deliveryLimit: 3, // After 3 delivery attempts, dead-letter
deadLetter: {
exchange: dlx,
routingKey: "orders.failed",
},
});
// 2. Configure worker with quorum-native mode per consumer
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: [
({ payload }) =>
Future.fromPromise(processPayment(payload))
.mapOk(() => undefined)
.mapError((error) => new RetryableError("Payment failed", error)),
{
retry: {
mode: "quorum-native", // Use quorum queue's native delivery limit
},
},
],
},
urls: ["amqp://localhost"],
}).resultToPromise();
How Quorum-Native works:
- When a handler fails, the message is nacked with requeue=true
- RabbitMQ automatically tracks delivery count via x-delivery-count header
- When count exceeds deliveryLimit, message is automatically dead-lettered
- No wait queues or TTL management needed
Best for:
- Simpler architecture requirements
- When immediate retries are acceptable
- Avoiding head-of-queue blocking issues
Limitation: No exponential backoff — retries are immediate.
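The broker-side decision described above reduces to a single comparison. The function below is only a model of that rule for reasoning and tests: `quorumNativeOutcome` is a hypothetical name, and the real check happens inside RabbitMQ, not in the worker.

```typescript
// deliveryCount mirrors the x-delivery-count header tracked by the broker.
function quorumNativeOutcome(
  deliveryCount: number,
  deliveryLimit: number,
): "requeue" | "dead-letter" {
  // Dead-letter once the count exceeds the queue's deliveryLimit.
  return deliveryCount > deliveryLimit ? "dead-letter" : "requeue";
}

console.log(quorumNativeOutcome(2, 3)); // prints: requeue
console.log(quorumNativeOutcome(4, 3)); // prints: dead-letter
```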
Comparing Retry Modes
| Feature | TTL-Backoff | Quorum-Native |
|---|---|---|
| Retry delays | Configurable exponential backoff | Immediate |
| Architecture | Wait queues + DLX | Native RabbitMQ |
| Head-of-queue blocking | Possible with mixed TTLs | None |
| Delivery tracking | Custom x-retry-count header | Native x-delivery-count |
| Queue type | Any | Quorum only |
| Max retries configured in | Handler options | Queue's deliveryLimit |
Customizing Retry Per Consumer
Retry is enabled by default with sensible defaults. To customize the retry configuration for a specific consumer, use the handler tuple syntax:
import { TypedAmqpWorker, RetryableError } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
// Custom retry configuration for this consumer
processOrder: [
({ payload }) =>
Future.fromPromise(processPayment(payload))
.mapOk(() => undefined)
.mapError((error) => new RetryableError("Payment processing failed", error)),
{
retry: {
maxRetries: 5, // Custom: 5 retries instead of default 3
initialDelayMs: 2000, // Custom: 2s initial delay
maxDelayMs: 60000, // Custom: 60s max delay
backoffMultiplier: 2,
jitter: true,
},
},
],
// Uses default retry configuration
notifyOrder: ({ payload }) =>
Future.fromPromise(sendNotification(payload))
.mapOk(() => undefined)
.mapError((error) => new RetryableError("Notification failed", error)),
},
urls: ["amqp://localhost"],
}).resultToPromise();
How TTL-Backoff Retry Works
TTL-backoff relies on native RabbitMQ features (message TTL and dead-lettering), so the consumer is not blocked during retry delays.
Exponential Backoff
Retry delays increase exponentially to give downstream services time to recover:
// With default settings (initialDelayMs: 1000, backoffMultiplier: 2):
// Attempt 1: 1000ms delay
// Attempt 2: 2000ms delay
// Attempt 3: 4000ms delay
// After 3 attempts: Message sent to DLQ
With jitter enabled (default), a random factor (50-100% of the calculated delay) is applied so that retried messages do not all hit the system simultaneously.
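The delay schedule can be reproduced with a short calculation. This is a sketch under assumptions, not the library's code: `retryDelayMs` and `BackoffConfig` are hypothetical names, the jitter is modelled as scaling the capped delay to 50-100% of its value, and the random source is injectable so the result can be checked deterministically.

```typescript
type BackoffConfig = {
  initialDelayMs: number;
  maxDelayMs: number;
  backoffMultiplier: number;
  jitter: boolean;
};

// Defaults taken from the option comments in the TTL-backoff example.
const defaultBackoff: BackoffConfig = {
  initialDelayMs: 1000,
  maxDelayMs: 30000,
  backoffMultiplier: 2,
  jitter: true,
};

// retryAttempt is 1-based: 1 is the first retry.
function retryDelayMs(
  retryAttempt: number,
  config: BackoffConfig,
  random: () => number = Math.random,
): number {
  const exponential =
    config.initialDelayMs * Math.pow(config.backoffMultiplier, retryAttempt - 1);
  const capped = Math.min(exponential, config.maxDelayMs);
  // Assumed jitter model: keep between 50% and 100% of the capped delay.
  return config.jitter ? Math.floor(capped * (0.5 + random() * 0.5)) : capped;
}

const noJitter: BackoffConfig = { ...defaultBackoff, jitter: false };
console.log([1, 2, 3].map((n) => retryDelayMs(n, noJitter)).join(", ")); // prints: 1000, 2000, 4000
```

With the defaults, the cap of 30000ms is first reached on the sixth retry (1000 × 2^5 = 32000), so raising `maxRetries` beyond 5 only adds retries at the maximum delay.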
Queue Configuration for Retry
For retry to work, your queues must be configured with a Dead Letter Exchange (DLX):
import { defineContract, defineExchange, defineQueue, defineQueueBinding } from "@amqp-contract/contract";
// Define the Dead Letter Exchange
const dlxExchange = defineExchange("orders-dlx", "topic", { durable: true });
// Define the Dead Letter Queue
const dlq = defineQueue("orders-dlq", { durable: true });
// Define your main queue with deadLetter configuration
const ordersQueue = defineQueue("orders", {
durable: true,
deadLetter: {
exchange: dlxExchange,
routingKey: "orders.failed",
},
});
// Bind the DLQ to the DLX
const contract = defineContract({
exchanges: {
main: mainExchange,
dlx: dlxExchange,
},
queues: {
orders: ordersQueue,
ordersDlq: dlq,
},
bindings: {
// ... main queue bindings
dlqBinding: defineQueueBinding(dlq, dlxExchange, {
routingKey: "orders.failed",
}),
},
// ... rest of contract
});
Queue DLX Required
If retry is enabled but a queue doesn't have deadLetter configured, the worker will log a warning and fall back to immediate requeue (legacy behavior). For proper retry functionality, always configure DLX on your queues.
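A quick pre-flight check can catch this before the worker starts. The sketch below works on a simplified, hypothetical view of the contract's queues (`QueueLike`); the library's real queue definitions are richer, so treat the shape as an assumption and adapt it to your contract.

```typescript
// Simplified stand-in for a queue definition; only the fields needed here.
type QueueLike = {
  name: string;
  deadLetter?: { exchange: string; routingKey?: string };
};

// Returns the names of queues that have no deadLetter configuration.
function queuesMissingDeadLetter(queues: Record<string, QueueLike>): string[] {
  const missing: string[] = [];
  for (const key of Object.keys(queues)) {
    const queue = queues[key];
    if (queue.deadLetter === undefined) {
      missing.push(queue.name);
    }
  }
  return missing;
}

const unprotectedQueues = queuesMissingDeadLetter({
  orders: { name: "orders", deadLetter: { exchange: "orders-dlx", routingKey: "orders.failed" } },
  notifications: { name: "notifications" },
});
console.log(unprotectedQueues.join(", ")); // prints: notifications
```

A dead letter queue itself (such as orders-dlq) would also be reported by this check, so in practice it should be excluded from the input.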
Retry Error Classes
The library provides two error classes for explicit error signaling:
RetryableError
Use RetryableError for transient failures that may succeed on retry:
import { TypedAmqpWorker, RetryableError, defineHandler } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: [
defineHandler(contract, "processOrder", ({ payload }) =>
Future.fromPromise(externalApiCall(payload))
.mapOk(() => undefined)
.mapError(
(error) =>
// Explicitly signal this should be retried
new RetryableError("External API temporarily unavailable", error),
),
),
{
retry: {
maxRetries: 5,
initialDelayMs: 2000,
},
},
],
},
urls: ["amqp://localhost"],
}).resultToPromise();
NonRetryableError
Use NonRetryableError for permanent failures that should NOT be retried:
import { TypedAmqpWorker, NonRetryableError, RetryableError, defineHandler } from "@amqp-contract/worker";
import { Future, Result } from "@swan-io/boxed";
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: [
defineHandler(contract, "processOrder", ({ payload }) => {
// Validation errors should not be retried
if (payload.amount <= 0) {
return Future.value(Result.Error(new NonRetryableError("Invalid order amount")));
}
return Future.fromPromise(processPayment(payload))
.mapOk(() => undefined)
.mapError((error) => new RetryableError("Payment failed", error));
}),
{
retry: {
maxRetries: 5,
initialDelayMs: 2000,
},
},
],
},
urls: ["amqp://localhost"],
}).resultToPromise();
NonRetryableError behavior:
- Message is immediately sent to DLQ (if configured)
- No retry attempts are made
- Use for validation errors, business rule violations, or permanent failures
Using Safe Handlers for Better Error Control
For the most explicit error handling, use safe handlers that return Future<Result>:
import { TypedAmqpWorker, defineHandler, RetryableError, NonRetryableError } from "@amqp-contract/worker";
import { Future, Result } from "@swan-io/boxed";
import { match } from "ts-pattern";
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: defineHandler(
contract,
"processOrder",
({ payload }) => {
// Validation - non-retryable
if (payload.amount <= 0) {
return Future.value(Result.Error(new NonRetryableError("Invalid amount")));
}
return Future.fromPromise(processPayment(payload))
.mapOk(() => undefined)
.mapError((error) =>
match(error)
.when(
(e) => e instanceof PaymentDeclinedError,
() => new NonRetryableError("Payment declined", error),
)
.otherwise(() => new RetryableError("Payment failed", error)),
);
},
{
retry: {
maxRetries: 5,
initialDelayMs: 2000,
},
},
),
},
urls: ["amqp://localhost"],
}).resultToPromise();
When to use which error type:
| Error Type | Use Case | Behavior |
|---|---|---|
| RetryableError | Transient failures (network, rate limits) | Retry with backoff |
| NonRetryableError | Permanent failures (validation, business rules) | Immediate DLQ |
| Any other error (unsafe handlers) | Unexpected failures | Retry with backoff |
Note: Retry is enabled by default for all consumers. All errors except NonRetryableError are retried.
Retry with Batch Processing
Retry works with batch processing. If a batch handler fails (returns a RetryableError or throws), all messages in the batch are retried:
import { Future } from "@swan-io/boxed";
import { TypedAmqpWorker, RetryableError } from "@amqp-contract/worker";
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrders: [
(messages) =>
Future.fromPromise(db.orders.insertMany(messages.map(({ payload }) => payload)))
.mapOk(() => undefined)
.mapError((error) => new RetryableError("Batch insert failed", error)), // All messages in batch will be retried
{
batchSize: 10,
batchTimeout: 1000,
retry: {
maxRetries: 3,
initialDelayMs: 1000,
},
},
],
},
urls: ["amqp://localhost"],
}).resultToPromise();
Batch Retry Behavior
All messages in a failed batch are treated the same way - they all get the same retry count and delay. For partial batch success handling, consider processing messages individually instead.
Monitoring Retry Headers
The worker adds headers to track retry information:
- x-retry-count - Number of times this message has been retried
- x-last-error - Error message from the last failed attempt
- x-first-failure-timestamp - Timestamp of the first failure
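If the headers are available to your code, they can be read defensively with a small helper. This is a hypothetical sketch: `readRetryInfo` is not part of the library, the headers are assumed to arrive as a plain record, and the timestamp is assumed to be numeric.

```typescript
type RetryInfo = {
  retryCount: number;
  lastError?: string;
  firstFailureTimestamp?: number;
};

// Extracts the retry headers, falling back to safe defaults when absent.
function readRetryInfo(headers: Record<string, unknown> | undefined): RetryInfo {
  const source = headers === undefined ? {} : headers;
  const count = source["x-retry-count"];
  const lastError = source["x-last-error"];
  const firstFailure = source["x-first-failure-timestamp"];

  const info: RetryInfo = { retryCount: typeof count === "number" ? count : 0 };
  if (typeof lastError === "string") {
    info.lastError = lastError;
  }
  if (typeof firstFailure === "number") {
    info.firstFailureTimestamp = firstFailure;
  }
  return info;
}

const sampleInfo = readRetryInfo({ "x-retry-count": 2, "x-last-error": "Payment failed" });
console.log(`retry #${sampleInfo.retryCount}: ${sampleInfo.lastError}`); // prints: retry #2: Payment failed
```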
These headers can be useful for monitoring and debugging:
// Example: Log retry information (requires custom message access)
// Note: Standard handlers don't expose raw message properties
// This is for illustration of what the worker tracks internally
Best Practices for Retry
- Configure appropriate delays - Start with 1-2 seconds, max out at 30-60 seconds
- Use jitter - Keep jitter enabled (default) to prevent thundering herd
- Set reasonable max retries - 3-5 retries is usually sufficient
- Configure DLX on all queues - Ensures proper retry behavior and DLQ routing
- Make handlers idempotent - Messages may be processed multiple times
- Monitor DLQ - Set up alerts for messages reaching the DLQ
- Handle transient vs permanent failures - Use retry for transient failures (network issues, rate limits); return NonRetryableError for permanent failures (validation errors) so they skip retries
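As an illustration of the idempotency point, the wrapper below skips messages whose ID has already been handled. It is a minimal sketch, not a library feature: `makeIdempotent` is a hypothetical helper, and the in-memory record is an assumption that only holds for a single process; a real deployment would need a durable store shared between worker instances.

```typescript
// Wraps a handler so each message ID causes side effects at most once.
function makeIdempotent<T>(
  getId: (message: T) => string,
  handle: (message: T) => void,
): (message: T) => boolean {
  const seen: Record<string, boolean> = {};
  return (message) => {
    const id = getId(message);
    if (seen[id] === true) {
      return false; // duplicate delivery: skip side effects
    }
    handle(message);
    seen[id] = true; // mark only after the handler completed
    return true;
  };
}

const processedIds: string[] = [];
const handleOnce = makeIdempotent(
  (order: { orderId: string }) => order.orderId,
  (order) => {
    processedIds.push(order.orderId);
  },
);

handleOnce({ orderId: "A1" }); // true: first delivery is processed
handleOnce({ orderId: "A1" }); // false: redelivery is ignored
console.log(processedIds.length); // prints: 1
```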
Example: Complete Retry Setup
import { TypedAmqpWorker, RetryableError, NonRetryableError } from "@amqp-contract/worker";
import {
defineConsumer,
defineContract,
defineQueue,
defineExchange,
defineQueueBinding,
} from "@amqp-contract/contract";
import { Future, Result } from "@swan-io/boxed";
// Define contract with DLX (mainExchange and orderMessage are assumed to be defined elsewhere)
const dlxExchange = defineExchange("orders-dlx", "topic", { durable: true });
const ordersQueue = defineQueue("orders", {
durable: true,
deadLetter: {
exchange: dlxExchange,
routingKey: "orders.failed",
},
});
const dlq = defineQueue("orders-dlq", { durable: true });
const contract = defineContract({
exchanges: {
main: mainExchange,
dlx: dlxExchange,
},
queues: {
orders: ordersQueue,
ordersDlq: dlq,
},
bindings: {
mainBinding: defineQueueBinding(ordersQueue, mainExchange, {
routingKey: "order.#",
}),
dlqBinding: defineQueueBinding(dlq, dlxExchange, {
routingKey: "orders.failed",
}),
},
consumers: {
processOrder: defineConsumer(ordersQueue, orderMessage),
},
// ... rest of contract
});
// Create worker with per-consumer retry configuration
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: [
({ payload }) => {
// Validate before processing (don't retry validation errors)
if (!payload.amount || payload.amount <= 0) {
return Future.value(Result.Error(new NonRetryableError("Invalid order amount")));
}
// Process with external service (retry on failure)
return Future.fromPromise(
Promise.all([
paymentService.charge(payload),
inventoryService.reserve(payload),
notificationService.send(payload),
]),
)
.mapOk(() => undefined)
.mapError((error) => new RetryableError("Order processing failed", error));
},
{
retry: {
maxRetries: 3,
initialDelayMs: 1000,
maxDelayMs: 30000,
backoffMultiplier: 2,
jitter: true,
},
},
],
},
urls: ["amqp://localhost"],
}).resultToPromise();
console.log("✅ Worker ready with retry enabled!");
Next Steps
- Learn about Client Usage
- Explore Defining Contracts
- Check out Examples