@amqp-contract/worker-nestjs
@amqp-contract/worker-nestjs
Classes
AmqpWorkerModule
Defined in: worker-nestjs/src/worker.module.ts:88
NestJS module for AMQP worker integration. This module provides type-safe AMQP worker functionality using @amqp-contract/worker without relying on NestJS decorators (except for dependency injection).
Type Param
The contract definition type for type-safe handlers
Example
// Synchronous configuration
@Module({
imports: [
AmqpWorkerModule.forRoot({
contract: myContract,
handlers: {
processOrder: async (message) => {
// message is fully typed based on the contract
console.log('Order:', message.orderId);
}
},
urls: ['amqp://localhost']
})
]
})
export class AppModule {}
// Asynchronous configuration
@Module({
imports: [
AmqpWorkerModule.forRootAsync({
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
contract: myContract,
handlers: {
processOrder: async (message) => {
console.log('Order:', message.orderId);
}
},
urls: configService.get('AMQP_URLS')
}),
inject: [ConfigService]
})
]
})
export class AppModule {}
Constructors
Constructor
new AmqpWorkerModule(): AmqpWorkerModule;
Returns
AmqpWorkerModule
Methods
forRoot()
static forRoot<TContract>(options): DynamicModule;
Defined in: worker-nestjs/src/worker.module.ts:95
Register the AMQP worker module with synchronous configuration
Type Parameters
| Type Parameter |
|---|
TContract extends ContractDefinition |
Parameters
| Parameter | Type | Description |
|---|---|---|
options | AmqpWorkerModuleOptions<TContract> | The worker configuration options with contract and handlers |
Returns
DynamicModule
A dynamic module for NestJS
forRootAsync()
static forRootAsync<TContract>(options): DynamicModule;
Defined in: worker-nestjs/src/worker.module.ts:117
Register the AMQP worker module with asynchronous configuration
Type Parameters
| Type Parameter |
|---|
TContract extends ContractDefinition |
Parameters
| Parameter | Type | Description |
|---|---|---|
options | AmqpWorkerModuleAsyncOptions<TContract> | Async configuration options with factory function |
Returns
DynamicModule
A dynamic module for NestJS
AmqpWorkerService
Defined in: worker-nestjs/src/worker.service.ts:74
Type-safe AMQP worker service for NestJS applications.
This service wraps TypedAmqpWorker and integrates it with the NestJS lifecycle, automatically starting message consumption on module init and cleaning up resources on module destroy.
Example
// In your module
import { AmqpWorkerModule } from '@amqp-contract/worker-nestjs';
@Module({
imports: [
AmqpWorkerModule.forRoot({
contract: myContract,
handlers: {
processOrder: async (message) => {
console.log('Received order:', message.orderId);
// Process the order...
}
},
urls: ['amqp://localhost']
})
]
})
export class AppModule {}
// The worker automatically starts consuming messages when the module initializes
// and stops gracefully when the application shuts down
Type Parameters
| Type Parameter | Description |
|---|---|
TContract extends ContractDefinition | The contract definition type |
Implements
OnModuleInit
OnModuleDestroy
Constructors
Constructor
new AmqpWorkerService<TContract>(options): AmqpWorkerService<TContract>;
Defined in: worker-nestjs/src/worker.service.ts:79
Parameters
| Parameter | Type |
|---|---|
options | AmqpWorkerModuleOptions<TContract> |
Returns
AmqpWorkerService<TContract>
Methods
onModuleDestroy()
onModuleDestroy(): Promise<void>;
Defined in: worker-nestjs/src/worker.service.ts:105
Close the AMQP worker when the NestJS module is destroyed.
This lifecycle hook ensures proper cleanup of resources when the NestJS application shuts down, gracefully stopping message consumption and closing the connection.
Returns
Promise<void>
Implementation of
OnModuleDestroy.onModuleDestroy
onModuleInit()
onModuleInit(): Promise<void>;
Defined in: worker-nestjs/src/worker.service.ts:94
Initialize the AMQP worker when the NestJS module starts.
This lifecycle hook automatically creates and starts the worker, beginning message consumption from all configured consumers. The connection will be established in the background with automatic reconnection handling.
Returns
Promise<void>
Throws
Error if the worker fails to start
Implementation of
OnModuleInit.onModuleInit
Type Aliases
AmqpWorkerModuleAsyncOptions
type AmqpWorkerModuleAsyncOptions<TContract> = object;
Defined in: worker-nestjs/src/worker.module.ts:22
Options for async module configuration using factory pattern
Type Parameters
| Type Parameter |
|---|
TContract extends ContractDefinition |
Properties
| Property | Type | Description | Defined in |
|---|---|---|---|
imports? | ModuleMetadata["imports"] | Optional list of imported modules that export providers needed by the factory | worker-nestjs/src/worker.module.ts:37 |
inject? | (string \| symbol \| Type<unknown>)[] | Optional dependencies to inject into the factory function. Can be a token (string/symbol), a class, or a reference to a provider. | worker-nestjs/src/worker.module.ts:33 |
useFactory | (...args) => AmqpWorkerModuleOptionsFactory<TContract> | Factory function that returns the module options. Can use injected dependencies to create configuration. | worker-nestjs/src/worker.module.ts:28 |
AmqpWorkerModuleOptions
type AmqpWorkerModuleOptions<TContract> = object;
Defined in: worker-nestjs/src/worker.service.ts:28
Configuration options for the AMQP worker NestJS module.
Example
const options: AmqpWorkerModuleOptions<typeof myContract> = {
contract: myContract,
handlers: {
processOrder: async (message) => {
console.log('Processing order:', message.orderId);
}
},
urls: ['amqp://localhost'],
connectionOptions: {
heartbeatIntervalInSeconds: 30
}
};
Type Parameters
| Type Parameter | Description |
|---|---|
TContract extends ContractDefinition | The contract definition type |
Properties
| Property | Type | Description | Defined in |
|---|---|---|---|
connectionOptions? | AmqpConnectionManagerOptions | Optional connection configuration (heartbeat, reconnect settings, etc.) | worker-nestjs/src/worker.service.ts:36 |
contract | TContract | The AMQP contract definition specifying consumers and their message schemas | worker-nestjs/src/worker.service.ts:30 |
handlers | WorkerInferConsumerHandlers<TContract> | Message handlers for each consumer defined in the contract | worker-nestjs/src/worker.service.ts:32 |
urls | ConnectionUrl[] | AMQP broker URL(s). Multiple URLs provide failover support | worker-nestjs/src/worker.service.ts:34 |
WorkerInferConsumerHandlers
type WorkerInferConsumerHandlers<TContract> = { [K in InferConsumerNames<TContract>]: WorkerInferConsumerHandlerEntry<TContract, K> };
Defined in: worker/dist/index.d.mts:84
Infer all consumer handlers for a contract. Handlers can be either single-message handlers, batch handlers, or a tuple of [handler, options].
Type Parameters
| Type Parameter |
|---|
TContract extends ContractDefinition |
Variables
MODULE_OPTIONS_TOKEN
const MODULE_OPTIONS_TOKEN: typeof MODULE_OPTIONS_TOKEN;
Defined in: worker-nestjs/src/worker.module-definition.ts:5
Injection token for AMQP worker module options. Used by the NestJS DI system to inject configuration into AmqpWorkerService.
Functions
defineHandler()
Call Signature
function defineHandler<TContract, TName>(
contract,
consumerName,
handler): WorkerInferConsumerHandlerEntry<TContract, TName>;
Defined in: worker/dist/index.d.mts:323
Define a type-safe handler for a specific consumer in a contract.
This utility allows you to define handlers outside of the worker creation, providing better code organization and reusability.
Supports three patterns:
- Simple handler: just the function (single message handler)
- Handler with prefetch: [handler, { prefetch: 10 }] (single message handler with config)
- Batch handler: [batchHandler, { batchSize: 5, batchTimeout: 1000 }] (REQUIRES batchSize config)
Important: Batch handlers (handlers that accept an array of messages) MUST include batchSize configuration. You cannot create a batch handler without specifying batchSize.
Type Parameters
| Type Parameter | Description |
|---|---|
TContract extends ContractDefinition | The contract definition type |
TName extends string \| number \| symbol | The consumer name from the contract |
Parameters
| Parameter | Type | Description |
|---|---|---|
contract | TContract | The contract definition containing the consumer |
consumerName | TName | The name of the consumer from the contract |
handler | WorkerInferConsumerHandler<TContract, TName> | The async handler function that processes messages (single or batch) |
Returns
WorkerInferConsumerHandlerEntry<TContract, TName>
A type-safe handler that can be used with TypedAmqpWorker
Example
import { defineHandler } from '@amqp-contract/worker';
import { orderContract } from './contract';
// Simple single-message handler without options
const processOrderHandler = defineHandler(
orderContract,
'processOrder',
async (message) => {
console.log('Processing order:', message.orderId);
await processPayment(message);
}
);
// Single-message handler with prefetch
const processOrderWithPrefetch = defineHandler(
orderContract,
'processOrder',
async (message) => {
await processOrder(message);
},
{ prefetch: 10 }
);
// Batch handler - MUST include batchSize
const processBatchOrders = defineHandler(
orderContract,
'processOrders',
async (messages) => {
// messages is an array - batchSize configuration is REQUIRED
await db.insertMany(messages);
},
{ batchSize: 5, batchTimeout: 1000 }
);
Call Signature
function defineHandler<TContract, TName>(
contract,
consumerName,
handler,
options): WorkerInferConsumerHandlerEntry<TContract, TName>;
Defined in: worker/dist/index.d.mts:324
Define a type-safe handler for a specific consumer in a contract.
This utility allows you to define handlers outside of the worker creation, providing better code organization and reusability.
Supports three patterns:
- Simple handler: just the function (single message handler)
- Handler with prefetch: [handler, { prefetch: 10 }] (single message handler with config)
- Batch handler: [batchHandler, { batchSize: 5, batchTimeout: 1000 }] (REQUIRES batchSize config)
Important: Batch handlers (handlers that accept an array of messages) MUST include batchSize configuration. You cannot create a batch handler without specifying batchSize.
Type Parameters
| Type Parameter | Description |
|---|---|
TContract extends ContractDefinition | The contract definition type |
TName extends string \| number \| symbol | The consumer name from the contract |
Parameters
| Parameter | Type | Description |
|---|---|---|
contract | TContract | The contract definition containing the consumer |
consumerName | TName | The name of the consumer from the contract |
handler | WorkerInferConsumerHandler<TContract, TName> | The async handler function that processes messages (single or batch) |
options | { batchSize?: undefined; batchTimeout?: undefined; prefetch?: number; } | Optional consumer options (prefetch, batchSize, batchTimeout). For single-message handlers, { prefetch?: number } is optional; for batch handlers, { batchSize: number, batchTimeout?: number } is REQUIRED. |
options.batchSize? | undefined | - |
options.batchTimeout? | undefined | - |
options.prefetch? | number | - |
Returns
WorkerInferConsumerHandlerEntry<TContract, TName>
A type-safe handler that can be used with TypedAmqpWorker
Example
import { defineHandler } from '@amqp-contract/worker';
import { orderContract } from './contract';
// Simple single-message handler without options
const processOrderHandler = defineHandler(
orderContract,
'processOrder',
async (message) => {
console.log('Processing order:', message.orderId);
await processPayment(message);
}
);
// Single-message handler with prefetch
const processOrderWithPrefetch = defineHandler(
orderContract,
'processOrder',
async (message) => {
await processOrder(message);
},
{ prefetch: 10 }
);
// Batch handler - MUST include batchSize
const processBatchOrders = defineHandler(
orderContract,
'processOrders',
async (messages) => {
// messages is an array - batchSize configuration is REQUIRED
await db.insertMany(messages);
},
{ batchSize: 5, batchTimeout: 1000 }
);
Call Signature
function defineHandler<TContract, TName>(
contract,
consumerName,
handler,
options): WorkerInferConsumerHandlerEntry<TContract, TName>;
Defined in: worker/dist/index.d.mts:329
Define a type-safe handler for a specific consumer in a contract.
This utility allows you to define handlers outside of the worker creation, providing better code organization and reusability.
Supports three patterns:
- Simple handler: just the function (single message handler)
- Handler with prefetch: [handler, { prefetch: 10 }] (single message handler with config)
- Batch handler: [batchHandler, { batchSize: 5, batchTimeout: 1000 }] (REQUIRES batchSize config)
Important: Batch handlers (handlers that accept an array of messages) MUST include batchSize configuration. You cannot create a batch handler without specifying batchSize.
Type Parameters
| Type Parameter | Description |
|---|---|
TContract extends ContractDefinition | The contract definition type |
TName extends string \| number \| symbol | The consumer name from the contract |
Parameters
| Parameter | Type | Description |
|---|---|---|
contract | TContract | The contract definition containing the consumer |
consumerName | TName | The name of the consumer from the contract |
handler | WorkerInferConsumerBatchHandler<TContract, TName> | The async handler function that processes messages (single or batch) |
options | { batchSize: number; batchTimeout?: number; prefetch?: number; } | Optional consumer options (prefetch, batchSize, batchTimeout). For single-message handlers, { prefetch?: number } is optional; for batch handlers, { batchSize: number, batchTimeout?: number } is REQUIRED. |
options.batchSize | number | - |
options.batchTimeout? | number | - |
options.prefetch? | number | - |
Returns
WorkerInferConsumerHandlerEntry<TContract, TName>
A type-safe handler that can be used with TypedAmqpWorker
Example
import { defineHandler } from '@amqp-contract/worker';
import { orderContract } from './contract';
// Simple single-message handler without options
const processOrderHandler = defineHandler(
orderContract,
'processOrder',
async (message) => {
console.log('Processing order:', message.orderId);
await processPayment(message);
}
);
// Single-message handler with prefetch
const processOrderWithPrefetch = defineHandler(
orderContract,
'processOrder',
async (message) => {
await processOrder(message);
},
{ prefetch: 10 }
);
// Batch handler - MUST include batchSize
const processBatchOrders = defineHandler(
orderContract,
'processOrders',
async (messages) => {
// messages is an array - batchSize configuration is REQUIRED
await db.insertMany(messages);
},
{ batchSize: 5, batchTimeout: 1000 }
);
defineHandlers()
function defineHandlers<TContract>(contract, handlers): WorkerInferConsumerHandlers<TContract>;
Defined in: worker/dist/index.d.mts:390
Define multiple type-safe handlers for consumers in a contract.
This utility allows you to define all handlers at once outside of the worker creation, ensuring type safety and providing better code organization.
Type Parameters
| Type Parameter | Description |
|---|---|
TContract extends ContractDefinition | The contract definition type |
Parameters
| Parameter | Type | Description |
|---|---|---|
contract | TContract | The contract definition containing the consumers |
handlers | WorkerInferConsumerHandlers<TContract> | An object with async handler functions for each consumer |
Returns
WorkerInferConsumerHandlers<TContract>
A type-safe handlers object that can be used with TypedAmqpWorker
Examples
import { defineHandlers } from '@amqp-contract/worker';
import { orderContract } from './contract';
// Define all handlers at once
const handlers = defineHandlers(orderContract, {
processOrder: async (message) => {
// message is fully typed based on the contract
console.log('Processing order:', message.orderId);
await processPayment(message);
},
notifyOrder: async (message) => {
await sendNotification(message);
},
shipOrder: async (message) => {
await prepareShipment(message);
},
});
// Use the handlers in worker
const worker = await TypedAmqpWorker.create({
contract: orderContract,
handlers,
connection: 'amqp://localhost',
});
// Separate handler definitions for better organization
async function handleProcessOrder(message: WorkerInferConsumerInput<typeof orderContract, 'processOrder'>) {
await processOrder(message);
}
async function handleNotifyOrder(message: WorkerInferConsumerInput<typeof orderContract, 'notifyOrder'>) {
await sendNotification(message);
}
const handlers = defineHandlers(orderContract, {
processOrder: handleProcessOrder,
notifyOrder: handleNotifyOrder,
});