NestJS Worker Usage
Learn how to integrate the type-safe AMQP worker with NestJS applications for consuming messages with automatic lifecycle management.
Why Use NestJS Integration?
The @amqp-contract/worker-nestjs package provides seamless integration with NestJS:
- ✅ Automatic lifecycle management - Worker starts and stops with your application
- ✅ Dependency injection - Use NestJS services in your message handlers
- ✅ NestJS conventions - Follows standard module configuration patterns
- ✅ Graceful shutdown - Properly handles application shutdown hooks
- ✅ Type safety - Full TypeScript support with contract-based types
Installation
Install the required packages:
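pnpm add @amqp-contract/worker-nestjs @amqp-contract/worker @amqp-contract/contract amqplib
npm install @amqp-contract/worker-nestjs @amqp-contract/worker @amqp-contract/contract amqplib
yarn add @amqp-contract/worker-nestjs @amqp-contract/worker @amqp-contract/contract amqplib
The examples below also import zod, @swan-io/boxed and (for environment-based configuration) @nestjs/config; add them to your project if they are not already installed.
Quick Start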
1. Define Your Contract
First, define your AMQP contract with consumers:
// contract.ts
import {
defineContract,
defineExchange,
defineQueue,
defineCommandConsumer,
defineMessage,
} from "@amqp-contract/contract";
import { z } from "zod";
// Define resources and messages
const ordersExchange = defineExchange("orders", "topic", { durable: true });
const orderProcessingQueue = defineQueue("order-processing", { durable: true });
const orderMessage = defineMessage(
z.object({
orderId: z.string(),
customerId: z.string(),
amount: z.number(),
items: z.array(
z.object({
productId: z.string(),
quantity: z.number(),
price: z.number(),
}),
),
}),
);
// Command pattern: consumer owns the queue
const processOrderCommand = defineCommandConsumer(
orderProcessingQueue,
ordersExchange,
orderMessage,
{ routingKey: "order.created" },
);
// Compose contract - configs go directly, bindings auto-generated
export const contract = defineContract({
exchanges: { orders: ordersExchange },
queues: { orderProcessing: orderProcessingQueue },
consumers: {
// CommandConsumerConfig → auto-extracted to consumer + binding
processOrder: processOrderCommand,
},
});
2. Configure the Module
Import and configure the worker module:
// app.module.ts
import { Module } from "@nestjs/common";
import { AmqpWorkerModule } from "@amqp-contract/worker-nestjs";
import { Future, Result } from "@swan-io/boxed";
import { contract } from "./contract";
@Module({
imports: [
AmqpWorkerModule.forRoot({
contract,
handlers: {
processOrder: ({ payload }) => {
console.log(`Processing order: ${payload.orderId}`);
console.log(`Customer: ${payload.customerId}`);
console.log(`Amount: $${payload.amount}`);
// Your business logic here
return Future.value(Result.Ok(undefined));
},
},
urls: ["amqp://localhost"],
}),
],
})
export class AppModule {}
3. Start Your Application
// main.ts
import { NestFactory } from "@nestjs/core";
import { AppModule } from "./app.module";
async function bootstrap() {
const app = await NestFactory.create(AppModule);
// Enable graceful shutdown
app.enableShutdownHooks();
await app.listen(3000);
console.log("Worker is consuming messages");
}
bootstrap();
That's it! The worker automatically starts consuming messages when the application starts.
Using Services in Handlers
To use NestJS services in your handlers, use forRootAsync with dependency injection:
// order.service.ts
import { Injectable } from "@nestjs/common";
@Injectable()
export class OrderService {
async saveOrder(payload: { orderId: string }) {
console.log("Saving order to database:", payload.orderId);
// Save to database
}
async sendConfirmation(customerId: string) {
console.log("Sending confirmation to:", customerId);
// Send email
}
}
// app.module.ts
import { Module } from "@nestjs/common";
import { AmqpWorkerModule } from "@amqp-contract/worker-nestjs";
import { RetryableError } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";
import { contract } from "./contract";
import { OrderService } from "./order.service";
@Module({
imports: [
AmqpWorkerModule.forRootAsync({
useFactory: (orderService: OrderService) => ({
contract,
handlers: {
processOrder: ({ payload }) =>
Future.fromPromise(
Promise.all([
orderService.saveOrder(payload),
orderService.sendConfirmation(payload.customerId),
]),
)
.mapOk(() => undefined) // Success - message will be acknowledged automatically
.mapError((error) => new RetryableError("Order processing failed", error)),
},
urls: ["amqp://localhost"],
}),
inject: [OrderService],
}),
],
providers: [OrderService],
})
export class AppModule {}
Configuration with Environment Variables
Use @nestjs/config for environment-based configuration:
import { Module } from "@nestjs/common";
import { ConfigModule, ConfigService } from "@nestjs/config";
import { AmqpWorkerModule } from "@amqp-contract/worker-nestjs";
import { RetryableError } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";
import { contract } from "./contract";
import { OrderService } from "./order.service";
@Module({
imports: [
ConfigModule.forRoot(),
AmqpWorkerModule.forRootAsync({
imports: [ConfigModule],
useFactory: (configService: ConfigService, orderService: OrderService) => ({
contract,
handlers: {
processOrder: ({ payload }) =>
Future.fromPromise(orderService.saveOrder(payload))
.mapOk(() => undefined) // Success - message will be acknowledged automatically
.mapError((error) => new RetryableError("Order processing failed", error)),
},
urls: [configService.get("RABBITMQ_URL") ?? "amqp://localhost"],
}),
inject: [ConfigService, OrderService],
}),
],
providers: [OrderService],
})
export class AppModule {}
Then set the environment variable:
RABBITMQ_URL=amqp://user:pass@rabbitmq-server:5672
Message Acknowledgment
Messages are automatically acknowledged based on handler completion:
- Success: Message is acknowledged when handler completes without error
- Retryable Error: Return a RetryableError (wrapped in Result.Error) to retry the message
- Non-Retryable Error: Return a NonRetryableError (wrapped in Result.Error) to send the message to the DLQ
import { RetryableError, NonRetryableError } from "@amqp-contract/worker";
import { Future, Result } from "@swan-io/boxed";
AmqpWorkerModule.forRoot({
contract,
handlers: {
processOrder: ({ payload }) => {
console.log("Processing:", payload.orderId);
// Success - message is automatically acknowledged
// For retryable errors, return Result.Error(new RetryableError(...))
// For permanent failures, return Result.Error(new NonRetryableError(...))
return Future.value(Result.Ok(undefined));
},
},
urls: ["amqp://localhost"],
});
Error-Based Retry Control
Use typed errors for explicit retry control:
import { RetryableError, NonRetryableError } from "@amqp-contract/worker";
import { Future, Result } from "@swan-io/boxed";
AmqpWorkerModule.forRoot({
contract,
handlers: {
processOrder: ({ payload }) =>
Future.fromPromise(processOrder(payload))
.mapOk(() => undefined) // Success - message acknowledged
.mapError((error) => {
if (isTemporaryFailure(error)) {
return new RetryableError("Temporary failure, will retry");
} else {
return new NonRetryableError("Permanent failure, send to DLQ");
}
}),
},
urls: ["amqp://localhost"],
});
Error Handling
Service-Level Error Handling
Handle errors in your service layer:
@Injectable()
export class OrderService {
async processOrder(order: any) {
try {
await this.validateOrder(order);
await this.saveOrder(order);
await this.notifyCustomer(order.customerId);
} catch (error) {
console.error("Order processing failed:", error);
throw error; // Let handler decide what to do
}
}
}
Handler-Level Error Handling
import { RetryableError, NonRetryableError } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";
AmqpWorkerModule.forRootAsync({
useFactory: (orderService: OrderService) => ({
contract,
handlers: {
processOrder: ({ payload }) =>
Future.fromPromise(orderService.processOrder(payload))
.mapOk(() => undefined) // Success - message acknowledged automatically
.mapError((error) => {
console.error("Handler error:", error);
if (error instanceof ValidationError) {
// Don't retry validation errors
return new NonRetryableError("Validation failed");
} else if (error instanceof TemporaryError) {
// Retry temporary errors
return new RetryableError("Temporary failure");
} else {
// Unknown error, don't retry by default
return new NonRetryableError("Unknown error");
}
}),
},
urls: ["amqp://localhost"],
}),
inject: [OrderService],
});
Structured Logging
Use NestJS's built-in logger:
import { Injectable, Logger } from "@nestjs/common";
@Injectable()
export class OrderService {
private readonly logger = new Logger(OrderService.name);
/**
 * Saves the order via `this.saveOrder` and logs the start, success, and
 * failure of the operation through the class logger.
 *
 * Any error raised while saving is logged (with its stack trace when one is
 * available) and then rethrown so the caller can decide how to handle it.
 */
async processOrder(order: any) {
  this.logger.log(`Processing order ${order.orderId}`);
  try {
    await this.saveOrder(order);
    this.logger.log(`Order ${order.orderId} processed successfully`);
  } catch (error) {
    // Caught values are typed `unknown` under `strict`; narrow before reading `.stack`
    // so the example compiles and does not assume every thrown value is an Error.
    const stack = error instanceof Error ? error.stack : undefined;
    this.logger.error(`Failed to process order ${order.orderId}`, stack);
    throw error;
  }
}
}
Multiple Workers
Create separate modules for different domains:
import { Future, Result } from "@swan-io/boxed";
// order-worker.module.ts
@Module({
imports: [
AmqpWorkerModule.forRoot({
contract: orderContract,
handlers: {
processOrder: ({ payload }) => {
// Handle order processing
return Future.value(Result.Ok(undefined));
},
},
urls: ["amqp://localhost"],
}),
],
})
export class OrderWorkerModule {}
// payment-worker.module.ts
@Module({
imports: [
AmqpWorkerModule.forRoot({
contract: paymentContract,
handlers: {
processPayment: ({ payload }) => {
// Handle payment processing
return Future.value(Result.Ok(undefined));
},
},
urls: ["amqp://localhost"],
}),
],
})
export class PaymentWorkerModule {}
// app.module.ts
@Module({
imports: [OrderWorkerModule, PaymentWorkerModule],
})
export class AppModule {}
Testing
Unit Testing Handlers
Test your handler logic by testing the services they use:
// order.service.spec.ts
import { Test, TestingModule } from "@nestjs/testing";
import { OrderService } from "./order.service";
describe("OrderService", () => {
let service: OrderService;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [OrderService],
}).compile();
service = module.get<OrderService>(OrderService);
});
it("should process order successfully", async () => {
const order = {
orderId: "ORD-123",
customerId: "CUST-456",
amount: 99.99,
items: [],
};
await expect(service.processOrder(order)).resolves.toBeUndefined();
});
});
Integration Testing
Test the full worker integration:
// app.e2e-spec.ts
import { Test, TestingModule } from "@nestjs/testing";
import { INestApplication } from "@nestjs/common";
import { connect, Connection } from "amqplib";
import { AppModule } from "../src/app.module";
describe("AMQP Worker (e2e)", () => {
let app: INestApplication;
let connection: Connection;
beforeAll(async () => {
connection = await connect("amqp://localhost");
const moduleFixture: TestingModule = await Test.createTestingModule({
imports: [AppModule],
}).compile();
app = moduleFixture.createNestApplication();
await app.init();
});
afterAll(async () => {
await app.close();
await connection.close();
});
it("should consume and process messages", async () => {
  // Dedicated channel for publishing the test message; closed in `finally`
  // so it is released even when the verification step throws.
  const channel = await connection.createChannel();
  try {
    const testOrder = {
      orderId: "ORD-TEST-123",
      customerId: "CUST-TEST-456",
      amount: 99.99,
      items: [],
    };
    // Send a test message. publish() is synchronous and returns a boolean
    // (write-buffer backpressure signal), so there is nothing to await.
    channel.publish("orders", "order.created", Buffer.from(JSON.stringify(testOrder)));
    // Wait for processing
    await new Promise((resolve) => setTimeout(resolve, 1000));
    // Verify processing (check database, logs, etc.)
    // ...
  } finally {
    await channel.close();
  }
});
});
Best Practices
- Use forRootAsync - Use forRootAsync whenever handlers need dependency injection or runtime configuration
- Inject Services - Inject services into handlers through the factory pattern
- Error Handling - Implement comprehensive error handling with appropriate acknowledgment
- Logging - Use structured logging with context (order IDs, customer IDs, etc.)
- Idempotency - Ensure handlers can safely retry (messages may be delivered more than once)
- Validation - Rely on contract schemas for validation, but add business logic validation
- Dead Letter Queues - Configure DLQs for messages that consistently fail
- Monitoring - Track message processing metrics and errors
- Graceful Shutdown - Always call app.enableShutdownHooks() in main.ts
- Testing - Test service logic separately from worker infrastructure
Complete Example
See a full working example:
import {
defineContract,
defineExchange,
defineQueue,
defineQueueBinding,
defineConsumer,
defineMessage,
} from "@amqp-contract/contract";
import { z } from "zod";
const ordersExchange = defineExchange("orders", "topic", { durable: true });
const orderProcessingQueue = defineQueue("order-processing", {
durable: true,
arguments: {
"x-dead-letter-exchange": "orders-dlx",
},
});
const orderMessage = defineMessage(
z.object({
orderId: z.string(),
customerId: z.string(),
amount: z.number().positive(),
items: z.array(
z.object({
productId: z.string(),
quantity: z.number().int().positive(),
price: z.number().positive(),
}),
),
}),
);
export const contract = defineContract({
exchanges: { orders: ordersExchange },
queues: { orderProcessing: orderProcessingQueue },
bindings: {
orderBinding: defineQueueBinding(orderProcessingQueue, ordersExchange, {
routingKey: "order.created",
}),
},
consumers: {
processOrder: defineConsumer(orderProcessingQueue, orderMessage),
},
});
import { Injectable, Logger } from "@nestjs/common";
// Define custom error classes for type-safe error handling
/**
 * Error raised when an order violates a business rule.
 *
 * Instances carry the caller-supplied message and report
 * "BusinessRuleError" as their `name`, and can be told apart from other
 * failures with `instanceof`.
 */
export class BusinessRuleError extends Error {
  // Assigned right after `super()` returns, shadowing the default "Error" name.
  override name = "BusinessRuleError";

  constructor(message: string) {
    super(message);
  }
}
/**
 * Error type for failures that callers may treat as temporary.
 *
 * Instances carry the caller-supplied message and report "TemporaryError"
 * as their `name`, and can be told apart from other failures with
 * `instanceof`.
 */
export class TemporaryError extends Error {
  // Assigned right after `super()` returns, shadowing the default "Error" name.
  override name = "TemporaryError";

  constructor(message: string) {
    super(message);
  }
}
interface OrderPayload {
orderId: string;
customerId: string;
amount: number;
items: Array<{ productId: string; quantity: number; price: number }>;
}
@Injectable()
export class OrderService {
private readonly logger = new Logger(OrderService.name);
async processOrder(payload: OrderPayload) {
this.logger.log(`Processing order ${payload.orderId}`);
try {
// Validate business rules
this.validateOrder(payload);
// Save to database
await this.saveToDatabase(payload);
// Send confirmation
await this.sendConfirmation(payload.customerId);
this.logger.log(`Order ${payload.orderId} processed successfully`);
} catch (error) {
const stack = error instanceof Error ? error.stack : undefined;
this.logger.error(`Failed to process order ${payload.orderId}`, stack);
throw error;
}
}
private validateOrder(payload: OrderPayload) {
if (payload.amount <= 0) {
throw new BusinessRuleError("Amount must be positive");
}
if (payload.items.length === 0) {
throw new BusinessRuleError("Order must have at least one item");
}
}
private async saveToDatabase(payload: OrderPayload) {
// Save to database
this.logger.debug(`Saving order ${payload.orderId} to database`);
}
private async sendConfirmation(customerId: string) {
// Send email
this.logger.debug(`Sending confirmation to ${customerId}`);
}
}
import { Module } from "@nestjs/common";
import { ConfigModule, ConfigService } from "@nestjs/config";
import { AmqpWorkerModule } from "@amqp-contract/worker-nestjs";
import { RetryableError, NonRetryableError } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";
import { contract } from "./contract";
import { OrderService, BusinessRuleError, TemporaryError } from "./order.service";
@Module({
imports: [
ConfigModule.forRoot(),
AmqpWorkerModule.forRootAsync({
imports: [ConfigModule],
useFactory: (configService: ConfigService, orderService: OrderService) => ({
contract,
handlers: {
processOrder: ({ payload }) =>
Future.fromPromise(orderService.processOrder(payload))
.mapOk(() => undefined) // Success - message acknowledged automatically
.mapError((error) => {
// Use custom error classes with instanceof for type-safe error handling
// BusinessRuleError and TemporaryError are custom error classes defined in OrderService
if (error instanceof BusinessRuleError) {
// Business rule violation, don't retry
return new NonRetryableError("Business rule violation");
} else if (error instanceof TemporaryError) {
// Temporary error, retry
return new RetryableError("Temporary error, will retry");
} else {
// Unknown error, don't retry by default
return new NonRetryableError("Unknown error");
}
}),
},
urls: [configService.get("RABBITMQ_URL") ?? "amqp://localhost"],
}),
inject: [ConfigService, OrderService],
}),
],
providers: [OrderService],
})
export class AppModule {}
import { NestFactory } from "@nestjs/core";
import { Logger } from "@nestjs/common";
import { AppModule } from "./app.module";
async function bootstrap() {
const logger = new Logger("Bootstrap");
const app = await NestFactory.create(AppModule);
// Enable graceful shutdown
app.enableShutdownHooks();
await app.listen(3000);
logger.log("Application is running on: http://localhost:3000");
logger.log("Worker is consuming messages from RabbitMQ");
}
bootstrap();
Next Steps
- Learn about NestJS Client Usage for publishing messages
- Explore Worker API for core functionality
- See Worker NestJS API for detailed API reference
- Read about Defining Contracts