NestJS Client Usage
Learn how to integrate the type-safe AMQP client with NestJS applications for publishing messages with automatic lifecycle management.
Why Use NestJS Integration?
The @amqp-contract/client-nestjs package provides seamless integration with NestJS:
- ✅ Automatic lifecycle management - Client connects and disconnects with your application
- ✅ Dependency injection - Use the client service anywhere in your application
- ✅ NestJS conventions - Follows standard module configuration patterns
- ✅ Graceful shutdown - Properly handles application shutdown hooks
- ✅ Type safety - Full TypeScript support with contract-based types
Installation
Install the required packages:
pnpm add @amqp-contract/client-nestjs @amqp-contract/client @amqp-contract/contract amqplibnpm install @amqp-contract/client-nestjs @amqp-contract/client @amqp-contract/contract amqplibyarn add @amqp-contract/client-nestjs @amqp-contract/client @amqp-contract/contract amqplibQuick Start
1. Define Your Contract
First, define your AMQP contract with publishers:
// contract.ts
import {
defineContract,
defineExchange,
definePublisher,
defineMessage,
} from "@amqp-contract/contract";
import { z } from "zod";
// Define resources and messages
const ordersExchange = defineExchange("orders", "topic", { durable: true });
const orderMessage = defineMessage(
z.object({
orderId: z.string(),
customerId: z.string(),
amount: z.number().positive(),
items: z.array(
z.object({
productId: z.string(),
quantity: z.number().int().positive(),
price: z.number().positive(),
}),
),
}),
);
export const contract = defineContract({
exchanges: { orders: ordersExchange },
publishers: {
orderCreated: definePublisher(ordersExchange, orderMessage, { routingKey: "order.created" }),
},
});2. Configure the Module
Import and configure the client module:
// app.module.ts
import { Module } from "@nestjs/common";
import { AmqpClientModule } from "@amqp-contract/client-nestjs";
import { contract } from "./contract";
@Module({
imports: [
AmqpClientModule.forRoot({
contract,
urls: ["amqp://localhost"],
}),
],
})
export class AppModule {}3. Use the Client Service
Inject and use the client service in your services or controllers:
// order.service.ts
import { Injectable } from "@nestjs/common";
import { AmqpClientService } from "@amqp-contract/client-nestjs";
import type { contract } from "./contract";
@Injectable()
export class OrderService {
constructor(private readonly client: AmqpClientService<typeof contract>) {}
async createOrder(customerId: string, amount: number, items: any[]) {
const orderId = this.generateOrderId();
const result = await this.client.publish("orderCreated", {
orderId,
customerId,
amount,
items,
});
result.match({
Ok: () => console.log(`Order ${orderId} published`),
Error: (error) => {
console.error("Failed to publish order:", error);
throw new Error(`Failed to publish order: ${error.message}`);
},
});
return { orderId };
}
private generateOrderId(): string {
return `ORD-${Date.now()}`;
}
}4. Use in Controllers
// order.controller.ts
import { Controller, Post, Body } from "@nestjs/common";
import { OrderService } from "./order.service";
interface CreateOrderDto {
customerId: string;
amount: number;
items: Array<{
productId: string;
quantity: number;
price: number;
}>;
}
@Controller("orders")
export class OrderController {
constructor(private readonly orderService: OrderService) {}
@Post()
async createOrder(@Body() dto: CreateOrderDto) {
return this.orderService.createOrder(dto.customerId, dto.amount, dto.items);
}
}That's it! The client automatically connects when the application starts and disconnects on shutdown.
Configuration with Environment Variables
Use @nestjs/config with registerAs and Zod for type-safe configuration:
import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { AmqpClientModule } from "@amqp-contract/client-nestjs";
import { contract } from "./contract";
import { amqpConfig } from "./config/amqp.config";
@Module({
imports: [
ConfigModule.forRoot({
load: [amqpConfig],
}),
AmqpClientModule.forRootAsync({
imports: [ConfigModule],
useFactory: () => ({
contract,
urls: [amqpConfig().url],
}),
}),
],
})
export class AppModule {}Create a config file with Zod validation:
// config/amqp.config.ts
import { registerAs } from "@nestjs/config";
import { z } from "zod";
const amqpConfigSchema = z.object({
url: z.string().url().default("amqp://localhost"),
});
export const amqpConfig = registerAs("amqp", () => {
const config = amqpConfigSchema.parse({
url: process.env.RABBITMQ_URL,
});
return config;
});
export type AmqpConfig = z.infer<typeof amqpConfigSchema>;Then set the environment variable:
RABBITMQ_URL=amqp://user:pass@rabbitmq-server:5672Publishing Messages
Basic Publishing
@Injectable()
export class OrderService {
constructor(private readonly client: AmqpClientService<typeof contract>) {}
async createOrder(orderId: string, amount: number) {
const result = this.client.publish("orderCreated", {
orderId,
customerId: "CUST-123",
amount,
items: [],
});
result.match({
Ok: () => console.log("Order published successfully"),
Error: (error) => {
console.error("Failed to publish:", error);
throw new Error(`Publish failed: ${error.message}`);
},
});
}
}Publishing with Options
@Injectable()
export class OrderService {
constructor(private readonly client: AmqpClientService<typeof contract>) {}
async createUrgentOrder(orderId: string, amount: number) {
const result = this.client.publish(
"orderCreated",
{
orderId,
customerId: "CUST-123",
amount,
items: [],
},
{
routingKey: "order.created.urgent",
options: {
persistent: true,
priority: 10,
headers: {
"x-priority": "high",
"x-source": "api",
},
},
},
);
result.match({
Ok: () => {},
Error: (error) => {
throw new Error(`Failed to publish: ${error.message}`);
},
});
}
async createOrderWithTTL(orderId: string, amount: number) {
const result = this.client.publish(
"orderCreated",
{
orderId,
customerId: "CUST-123",
amount,
items: [],
},
{
options: {
persistent: true,
expiration: "60000", // 60 seconds
},
},
);
result.match({
Ok: () => {},
Error: (error) => {
throw new Error(`Failed to publish: ${error.message}`);
},
});
}
}Error Handling
The NestJS client service uses Result types for explicit error handling:
Basic Error Handling
import { Injectable } from "@nestjs/common";
import { AmqpClientService } from "@amqp-contract/client-nestjs";
import type { contract } from "./contract";
@Injectable()
export class OrderService {
constructor(private readonly client: AmqpClientService<typeof contract>) {}
async createOrder(orderId: string, amount: number) {
const result = this.client.publish("orderCreated", {
orderId,
customerId: "CUST-123",
amount,
items: [],
});
result.match({
Ok: () => console.log(`Order ${orderId} published successfully`),
Error: (error) => {
console.error("Failed to publish order:", error);
throw new Error(`Publish failed: ${error.message}`);
},
});
return { orderId };
}
}Structured Error Handling
import { Injectable, BadRequestException, InternalServerErrorException } from "@nestjs/common";
import { AmqpClientService } from "@amqp-contract/client-nestjs";
import { MessageValidationError, TechnicalError } from "@amqp-contract/client";
import { match, P } from "ts-pattern";
import type { contract } from "./contract";
@Injectable()
export class OrderService {
constructor(private readonly client: AmqpClientService<typeof contract>) {}
async createOrder(orderId: string, amount: number, items: any[]) {
const result = this.client.publish("orderCreated", {
orderId,
customerId: "CUST-123",
amount,
items,
});
result.match({
Ok: () => {},
Error: (error) =>
match(error)
.with(P.instanceOf(MessageValidationError), (err) => {
// Schema validation failed
throw new BadRequestException({
message: "Invalid order data",
issues: err.issues,
});
})
.with(P.instanceOf(TechnicalError), (err) => {
// Runtime/network error
throw new InternalServerErrorException({
message: "Failed to publish order",
cause: err.cause,
});
})
.exhaustive(),
});
return { orderId };
}
}Logging
Use NestJS's built-in logger for structured logging:
import { Injectable, Logger } from "@nestjs/common";
import { AmqpClientService } from "@amqp-contract/client-nestjs";
import type { contract } from "./contract";
@Injectable()
export class OrderService {
private readonly logger = new Logger(OrderService.name);
constructor(private readonly client: AmqpClientService<typeof contract>) {}
async createOrder(orderId: string, amount: number) {
this.logger.log(`Publishing order ${orderId}`);
const result = this.client.publish(
"orderCreated",
{
orderId,
customerId: "CUST-123",
amount,
items: [],
},
{
options: {
persistent: true,
headers: {
"x-timestamp": new Date().toISOString(),
},
},
},
);
result.match({
Ok: () => this.logger.log(`Order ${orderId} published successfully`),
Error: (error) => {
this.logger.error(`Failed to publish order ${orderId}`, error.message);
throw error;
},
});
return { orderId };
}
}Integration Patterns
oRPC Integration
Use oRPC for type-safe RPC that aligns with amqp-contract's contract-first philosophy:
// order.router.ts
import { initServer } from "@orpc/server";
import { z } from "zod";
import { AmqpClientService } from "@amqp-contract/client-nestjs";
import type { contract } from "./contract";
// Helper function to generate unique order IDs
function generateOrderId(): string {
return `ORD-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
}
// Define oRPC router with type-safe schema
export const orderRouter = initServer.router({
createOrder: initServer
.input(
z.object({
customerId: z.string(),
amount: z.number().positive(),
items: z.array(
z.object({
productId: z.string(),
quantity: z.number().int().positive(),
price: z.number().positive(),
}),
),
}),
)
.output(
z.object({
orderId: z.string(),
message: z.string(),
}),
)
.handler(async ({ input, context }) => {
const client = context.client as AmqpClientService<typeof contract>;
const orderId = generateOrderId();
await client
.publish("orderCreated", {
orderId,
...input,
})
.mapError((error) => new Error(`Failed to publish order: ${error.message}`))
.resultToPromise();
return {
orderId,
message: "Order submitted for processing",
};
}),
});
// order.module.ts
import { Module } from "@nestjs/common";
import { AmqpClientModule } from "@amqp-contract/client-nestjs";
import { contract } from "./contract";
import { OrderController } from "./order.controller";
import { OrderService } from "./order.service";
@Module({
imports: [
AmqpClientModule.forRoot({
contract,
urls: ["amqp://localhost"],
}),
],
controllers: [OrderController],
providers: [OrderService],
})
export class OrderModule {}
// order.controller.ts - Expose oRPC router as NestJS controller
import { Controller, All, Req, Res } from "@nestjs/common";
import { Request, Response } from "express";
import { createServerAdapter } from "@orpc/server/node";
import { AmqpClientService } from "@amqp-contract/client-nestjs";
import { orderRouter } from "./order.router";
import type { contract } from "./contract";
@Controller("orders")
export class OrderController {
private handler: ReturnType<typeof createServerAdapter>;
constructor(private readonly client: AmqpClientService<typeof contract>) {
this.handler = createServerAdapter({
router: orderRouter,
context: { client: this.client },
});
}
@All("*")
async handleRpc(@Req() req: Request, @Res() res: Response) {
return this.handler(req, res);
}
}This pattern provides:
- End-to-end type safety from client to server
- Contract-first design similar to amqp-contract
- Automatic validation with shared schemas
- Better DX with full TypeScript inference
Event Publishing Service
Create a dedicated event publishing service:
// order-event.service.ts
import { Injectable, Logger } from "@nestjs/common";
import { AmqpClientService } from "@amqp-contract/client-nestjs";
import type { contract } from "./contract";
@Injectable()
export class OrderEventService {
private readonly logger = new Logger(OrderEventService.name);
constructor(private readonly client: AmqpClientService<typeof contract>) {}
async publishOrderCreated(order: any) {
this.logger.log(`Publishing OrderCreated event for ${order.orderId}`);
await this.client
.publish("orderCreated", order, {
persistent: true,
headers: {
"event-type": "OrderCreated",
"event-version": "1.0",
"aggregate-id": order.orderId,
timestamp: new Date().toISOString(),
},
})
.resultToPromise();
}
async publishOrderUpdated(order: any) {
this.logger.log(`Publishing OrderUpdated event for ${order.orderId}`);
await this.client
.publish("orderUpdated", order, {
persistent: true,
headers: {
"event-type": "OrderUpdated",
"event-version": "1.0",
"aggregate-id": order.orderId,
timestamp: new Date().toISOString(),
},
})
.resultToPromise();
}
async publishOrderCancelled(orderId: string) {
this.logger.log(`Publishing OrderCancelled event for ${orderId}`);
await this.client
.publish(
"orderCancelled",
{ orderId },
{
persistent: true,
headers: {
"event-type": "OrderCancelled",
"event-version": "1.0",
"aggregate-id": orderId,
timestamp: new Date().toISOString(),
},
},
)
.resultToPromise();
}
}Multiple Clients
Use multiple clients for different domains:
// order.module.ts
@Module({
imports: [
AmqpClientModule.forRoot({
contract: orderContract,
urls: ["amqp://localhost"],
}),
],
providers: [OrderService, OrderController],
exports: [OrderService],
})
export class OrderModule {}
// payment.module.ts
@Module({
imports: [
AmqpClientModule.forRoot({
contract: paymentContract,
urls: ["amqp://localhost"],
}),
],
providers: [PaymentService, PaymentController],
exports: [PaymentService],
})
export class PaymentModule {}
// app.module.ts
@Module({
imports: [OrderModule, PaymentModule],
})
export class AppModule {}Testing
Unit Testing with Mocks
// order.service.spec.ts
import { Test, TestingModule } from "@nestjs/testing";
import { AmqpClientService } from "@amqp-contract/client-nestjs";
import { OrderService } from "./order.service";
describe("OrderService", () => {
let service: OrderService;
let client: AmqpClientService<any>;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
OrderService,
{
provide: AmqpClientService,
useValue: {
publish: jest.fn(),
},
},
],
}).compile();
service = module.get<OrderService>(OrderService);
client = module.get<AmqpClientService<any>>(AmqpClientService);
});
it("should publish order created event", async () => {
const publishSpy = jest.spyOn(client, "publish").mockResolvedValue(true);
const result = await service.createOrder("CUST-123", 99.99, []);
expect(result).toHaveProperty("orderId");
expect(publishSpy).toHaveBeenCalledWith(
"orderCreated",
expect.objectContaining({
orderId: expect.any(String),
customerId: "CUST-123",
amount: 99.99,
items: [],
}),
);
});
it("should handle publishing errors", async () => {
jest.spyOn(client, "publish").mockRejectedValue(new Error("Connection failed"));
await expect(service.createOrder("CUST-123", 99.99, [])).rejects.toThrow("Connection failed");
});
});Integration Testing
// app.e2e-spec.ts
import { Test, TestingModule } from "@nestjs/testing";
import { INestApplication } from "@nestjs/common";
import * as request from "supertest";
import { connect, Connection, Channel } from "amqplib";
import { AppModule } from "../src/app.module";
describe("Order API (e2e)", () => {
let app: INestApplication;
let connection: Connection;
let channel: Channel;
beforeAll(async () => {
connection = await connect("amqp://localhost");
channel = await connection.createChannel();
const moduleFixture: TestingModule = await Test.createTestingModule({
imports: [AppModule],
}).compile();
app = moduleFixture.createNestApplication();
await app.init();
});
afterAll(async () => {
await channel.close();
await connection.close();
await app.close();
});
it("/orders (POST)", async () => {
// Setup consumer to verify message
const messages: any[] = [];
await channel.consume("order-processing", (msg) => {
if (msg) {
messages.push(JSON.parse(msg.content.toString()));
channel.ack(msg);
}
});
// Send HTTP request
const response = await request(app.getHttpServer())
.post("/orders")
.send({
customerId: "CUST-TEST-123",
amount: 99.99,
items: [],
})
.expect(202);
expect(response.body).toHaveProperty("orderId");
// Wait for message
await new Promise((resolve) => setTimeout(resolve, 1000));
// Verify message was published
expect(messages).toHaveLength(1);
expect(messages[0]).toMatchObject({
customerId: "CUST-TEST-123",
amount: 99.99,
});
});
});Best Practices
- Use forRootAsync - Always use
forRootAsyncfor configuration - Error Handling - Handle publishing errors appropriately
- Persistent Messages - Use
persistent: truefor important messages - Correlation IDs - Use
correlationIdto track related messages - Headers - Add metadata in headers for filtering and debugging
- Logging - Use structured logging with context
- Type Safety - Leverage TypeScript inference for compile-time safety
- Testing - Mock the client service in unit tests
- Graceful Shutdown - Call
app.enableShutdownHooks()in main.ts - Validation - Trust contract schemas, but add business validation
Complete Example
See a full working example:
import {
defineContract,
defineExchange,
definePublisher,
defineMessage,
} from "@amqp-contract/contract";
import { z } from "zod";
const ordersExchange = defineExchange("orders", "topic", { durable: true });
const orderMessage = defineMessage(
z.object({
orderId: z.string(),
customerId: z.string(),
amount: z.number().positive(),
items: z.array(
z.object({
productId: z.string(),
quantity: z.number().int().positive(),
price: z.number().positive(),
}),
),
}),
);
export const contract = defineContract({
exchanges: { orders: ordersExchange },
publishers: {
orderCreated: definePublisher(ordersExchange, orderMessage, {
routingKey: "order.created",
}),
},
});import { Injectable, Logger, BadRequestException } from "@nestjs/common";
import { AmqpClientService } from "@amqp-contract/client-nestjs";
import type { contract } from "./contract";
@Injectable()
export class OrderService {
private readonly logger = new Logger(OrderService.name);
constructor(private readonly client: AmqpClientService<typeof contract>) {}
async createOrder(customerId: string, amount: number, items: any[]) {
const orderId = this.generateOrderId();
this.logger.log(`Creating order ${orderId} for customer ${customerId}`);
await this.client
.publish(
"orderCreated",
{
orderId,
customerId,
amount,
items,
},
{
persistent: true,
headers: {
"x-source": "order-service",
"x-timestamp": new Date().toISOString(),
"x-customer-id": customerId,
},
},
)
.mapError((error) => {
this.logger.error(`Failed to publish order ${orderId}`, error);
return new Error(`Failed to create order: ${error.message}`);
})
.resultToPromise();
this.logger.log(`Order ${orderId} published successfully`);
return { orderId };
}
private isValidationError(error: unknown): error is { issues: unknown } {
return typeof error === "object" && error !== null && "issues" in error;
}
private generateOrderId(): string {
return `ORD-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
}
}import { Controller, Post, Body, HttpCode, HttpStatus } from "@nestjs/common";
import { OrderService } from "./order.service";
interface CreateOrderDto {
customerId: string;
amount: number;
items: Array<{
productId: string;
quantity: number;
price: number;
}>;
}
@Controller("orders")
export class OrderController {
constructor(private readonly orderService: OrderService) {}
@Post()
@HttpCode(HttpStatus.ACCEPTED)
async createOrder(@Body() dto: CreateOrderDto) {
const result = await this.orderService.createOrder(dto.customerId, dto.amount, dto.items);
return {
message: "Order submitted for processing",
...result,
};
}
}import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { AmqpClientModule } from "@amqp-contract/client-nestjs";
import { contract } from "./contract";
import { OrderService } from "./order.service";
import { OrderController } from "./order.controller";
import { amqpConfig } from "./config/amqp.config";
@Module({
imports: [
ConfigModule.forRoot({
load: [amqpConfig],
}),
AmqpClientModule.forRootAsync({
imports: [ConfigModule],
useFactory: () => ({
contract,
urls: [amqpConfig().url],
}),
}),
],
controllers: [OrderController],
providers: [OrderService],
})
export class AppModule {}import { NestFactory } from "@nestjs/core";
import { Logger, ValidationPipe } from "@nestjs/common";
import { AppModule } from "./app.module";
async function bootstrap() {
const logger = new Logger("Bootstrap");
const app = await NestFactory.create(AppModule);
// Enable validation
app.useGlobalPipes(new ValidationPipe());
// Enable graceful shutdown
app.enableShutdownHooks();
await app.listen(3000);
logger.log("Application is running on: http://localhost:3000");
logger.log("AMQP client connected and ready to publish");
}
bootstrap();Next Steps
- Learn about NestJS Worker Usage for consuming messages
- Explore Client API for core functionality
- See Client NestJS API for detailed API reference
- Read about Defining Contracts