@amqp-contract/worker-nestjs


Classes

AmqpWorkerModule

Defined in: worker-nestjs/src/worker.module.ts:88

NestJS module for AMQP worker integration. This module provides type-safe AMQP worker functionality using @amqp-contract/worker without relying on NestJS decorators (except for dependency injection).

Type Param

The contract definition type for type-safe handlers

Example

typescript
// Synchronous configuration
@Module({
  imports: [
    AmqpWorkerModule.forRoot({
      contract: myContract,
      handlers: {
        processOrder: async (message) => {
          // message is fully typed based on the contract
          console.log('Order:', message.orderId);
        }
      },
      urls: ['amqp://localhost']
    })
  ]
})
export class AppModule {}

// Asynchronous configuration
@Module({
  imports: [
    AmqpWorkerModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: (configService: ConfigService) => ({
        contract: myContract,
        handlers: {
          processOrder: async (message) => {
            console.log('Order:', message.orderId);
          }
        },
        urls: configService.get('AMQP_URLS')
      }),
      inject: [ConfigService]
    })
  ]
})
export class AppModule {}

Constructors

Constructor
ts
new AmqpWorkerModule(): AmqpWorkerModule;
Returns

AmqpWorkerModule

Methods

forRoot()
ts
static forRoot<TContract>(options): DynamicModule;

Defined in: worker-nestjs/src/worker.module.ts:95

Register the AMQP worker module with synchronous configuration

Type Parameters
Type Parameter
TContract extends ContractDefinition
Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| options | AmqpWorkerModuleOptions<TContract> | The worker configuration options with contract and handlers |
Returns

DynamicModule

A dynamic module for NestJS

forRootAsync()
ts
static forRootAsync<TContract>(options): DynamicModule;

Defined in: worker-nestjs/src/worker.module.ts:117

Register the AMQP worker module with asynchronous configuration

Type Parameters
Type Parameter
TContract extends ContractDefinition
Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| options | AmqpWorkerModuleAsyncOptions<TContract> | Async configuration options with factory function |
Returns

DynamicModule

A dynamic module for NestJS


AmqpWorkerService

Defined in: worker-nestjs/src/worker.service.ts:74

Type-safe AMQP worker service for NestJS applications.

This service wraps TypedAmqpWorker and integrates it with the NestJS lifecycle, automatically starting message consumption on module init and cleaning up resources on module destroy.
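
The lifecycle integration described above can be pictured with a small dependency-free sketch. The two hook interfaces are local stand-ins that mirror the shape of the NestJS hooks, and WorkerLike, LifecycleWorkerService, and the createWorker factory are hypothetical names introduced for illustration. This is not the actual AmqpWorkerService source, only one way such a wrapper could behave.

```typescript
// Local stand-ins that mirror the shape of the NestJS lifecycle hooks, so the
// sketch has no dependencies. They are not imported from @nestjs/common.
interface OnModuleInit {
  onModuleInit(): Promise<void> | void;
}
interface OnModuleDestroy {
  onModuleDestroy(): Promise<void> | void;
}

// Hypothetical worker shape; the real TypedAmqpWorker API may differ.
interface WorkerLike {
  close(): Promise<void>;
}

class LifecycleWorkerService implements OnModuleInit, OnModuleDestroy {
  private worker: WorkerLike | null = null;
  private readonly createWorker: () => Promise<WorkerLike>;

  constructor(createWorker: () => Promise<WorkerLike>) {
    this.createWorker = createWorker;
  }

  // Create the worker (and start consuming) when the host module initializes.
  async onModuleInit(): Promise<void> {
    this.worker = await this.createWorker();
  }

  // Close the worker on shutdown. Safe to call if init never ran or if the
  // worker was already closed.
  async onModuleDestroy(): Promise<void> {
    if (this.worker) {
      await this.worker.close();
      this.worker = null;
    }
  }
}
```

Keeping the worker reference nullable makes the destroy hook idempotent, which is convenient when a shutdown signal arrives before initialization has finished.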

Example

typescript
// In your module
import { AmqpWorkerModule } from '@amqp-contract/worker-nestjs';

@Module({
  imports: [
    AmqpWorkerModule.forRoot({
      contract: myContract,
      handlers: {
        processOrder: async (message) => {
          console.log('Received order:', message.orderId);
          // Process the order...
        }
      },
      urls: ['amqp://localhost']
    })
  ]
})
export class AppModule {}

// The worker automatically starts consuming messages when the module initializes
// and stops gracefully when the application shuts down

Type Parameters

| Type Parameter | Description |
| --- | --- |
| TContract extends ContractDefinition | The contract definition type |

Implements

  • OnModuleInit
  • OnModuleDestroy

Constructors

Constructor
ts
new AmqpWorkerService<TContract>(options): AmqpWorkerService<TContract>;

Defined in: worker-nestjs/src/worker.service.ts:79

Parameters
| Parameter | Type |
| --- | --- |
| options | AmqpWorkerModuleOptions<TContract> |
Returns

AmqpWorkerService<TContract>

Methods

onModuleDestroy()
ts
onModuleDestroy(): Promise<void>;

Defined in: worker-nestjs/src/worker.service.ts:105

Close the AMQP worker when the NestJS module is destroyed.

This lifecycle hook ensures proper cleanup of resources when the NestJS application shuts down, gracefully stopping message consumption and closing the connection.

Returns

Promise<void>

Implementation of
ts
OnModuleDestroy.onModuleDestroy

onModuleInit()
ts
onModuleInit(): Promise<void>;

Defined in: worker-nestjs/src/worker.service.ts:94

Initialize the AMQP worker when the NestJS module starts.

This lifecycle hook automatically creates and starts the worker, beginning message consumption from all configured consumers. The connection will be established in the background with automatic reconnection handling.

Returns

Promise<void>

Throws

Error if the worker fails to start

Implementation of
ts
OnModuleInit.onModuleInit

Type Aliases

AmqpWorkerModuleAsyncOptions

ts
type AmqpWorkerModuleAsyncOptions<TContract> = object;

Defined in: worker-nestjs/src/worker.module.ts:22

Options for async module configuration using factory pattern

Type Parameters

Type Parameter
TContract extends ContractDefinition

Properties

| Property | Type | Description | Defined in |
| --- | --- | --- | --- |
| imports? | ModuleMetadata["imports"] | Optional list of imported modules that export providers needed by the factory | worker-nestjs/src/worker.module.ts:37 |
| inject? | (string \| symbol \| Type<unknown>)[] | Optional dependencies to inject into the factory function. Each entry can be a token (string/symbol), a class, or a reference to a provider. | worker-nestjs/src/worker.module.ts:33 |
| useFactory | (...args) => AmqpWorkerModuleOptionsFactory<TContract> | Factory function that returns the module options. Can use injected dependencies to create configuration. | worker-nestjs/src/worker.module.ts:28 |
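
To make the relationship between inject and useFactory concrete, here is a simplified, dependency-free model of factory-based configuration. It is not the NestJS container: resolveOptions, AsyncOptionsSketch, and the providers map are hypothetical names, and the sketch assumes the factory may return either the options or a promise of them. It only shows that the tokens listed in inject are resolved in order and passed as the factory's arguments.

```typescript
// Simplified model only; NestJS resolves providers through its own container.
type Token = string | symbol;

interface AsyncOptionsSketch<TOptions> {
  inject?: Token[];
  useFactory: (...args: any[]) => TOptions | Promise<TOptions>;
}

async function resolveOptions<TOptions>(
  options: AsyncOptionsSketch<TOptions>,
  providers: Map<Token, unknown>,
): Promise<TOptions> {
  // Look up each injected token in order; the results become factory arguments.
  const args = (options.inject ?? []).map((token) => {
    if (!providers.has(token)) {
      throw new Error(`No provider registered for ${String(token)}`);
    }
    return providers.get(token);
  });
  return options.useFactory(...args);
}

// Usage: a hypothetical config provider supplies the broker URLs.
const CONFIG_TOKEN = Symbol("CONFIG");
const sketchProviders = new Map<Token, unknown>([
  [CONFIG_TOKEN, { amqpUrls: ["amqp://localhost"] }],
]);

const resolvedOptions = resolveOptions(
  {
    inject: [CONFIG_TOKEN],
    useFactory: (config: { amqpUrls: string[] }) => ({ urls: config.amqpUrls }),
  },
  sketchProviders,
);
```

The order of inject matters: the first token becomes the first factory argument, the second token the second, and so on.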

AmqpWorkerModuleOptions

ts
type AmqpWorkerModuleOptions<TContract> = object;

Defined in: worker-nestjs/src/worker.service.ts:28

Configuration options for the AMQP worker NestJS module.

Example

typescript
const options: AmqpWorkerModuleOptions<typeof myContract> = {
  contract: myContract,
  handlers: {
    processOrder: async (message) => {
      console.log('Processing order:', message.orderId);
    }
  },
  urls: ['amqp://localhost'],
  connectionOptions: {
    heartbeatIntervalInSeconds: 30
  }
};

Type Parameters

| Type Parameter | Description |
| --- | --- |
| TContract extends ContractDefinition | The contract definition type |

Properties

| Property | Type | Description | Defined in |
| --- | --- | --- | --- |
| connectionOptions? | AmqpConnectionManagerOptions | Optional connection configuration (heartbeat, reconnect settings, etc.) | worker-nestjs/src/worker.service.ts:36 |
| contract | TContract | The AMQP contract definition specifying consumers and their message schemas | worker-nestjs/src/worker.service.ts:30 |
| handlers | WorkerInferConsumerHandlers<TContract> | Message handlers for each consumer defined in the contract | worker-nestjs/src/worker.service.ts:32 |
| urls | ConnectionUrl[] | AMQP broker URL(s). Multiple URLs provide failover support | worker-nestjs/src/worker.service.ts:34 |

WorkerInferConsumerHandlers

ts
type WorkerInferConsumerHandlers<TContract> = { [K in InferConsumerNames<TContract>]: WorkerInferConsumerHandlerEntry<TContract, K> };

Defined in: worker/dist/index.d.mts:84

Infer all consumer handlers for a contract. Handlers can be either single-message handlers, batch handlers, or a tuple of [handler, options].
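
The mapped-type idea behind this alias can be illustrated with a reduced model. SketchContract, ConsumerNames, and HandlersSketch are hypothetical stand-ins: the real ContractDefinition is richer, and the real alias also accepts batch handlers and [handler, options] tuples, which this sketch leaves out.

```typescript
// Reduced, hypothetical contract shape; not the library's ContractDefinition.
type SketchContract = { consumers: Record<string, { payload: unknown }> };

type ConsumerNames<C extends SketchContract> = keyof C["consumers"];

type SingleHandlerSketch<T> = (message: T) => Promise<void>;

// One entry per consumer name, typed by that consumer's payload.
type HandlersSketch<C extends SketchContract> = {
  [K in ConsumerNames<C>]: SingleHandlerSketch<C["consumers"][K]["payload"]>;
};

type OrderContractSketch = {
  consumers: {
    processOrder: { payload: { orderId: string } };
    notifyOrder: { payload: { email: string } };
  };
};

const seen: string[] = [];

// Omitting either key, or adding an unknown one, is a compile-time error,
// and each `message` parameter is inferred from the matching payload.
const typedHandlers: HandlersSketch<OrderContractSketch> = {
  processOrder: async (message) => {
    seen.push(message.orderId);
  },
  notifyOrder: async (message) => {
    seen.push(message.email);
  },
};
```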

Type Parameters

Type Parameter
TContract extends ContractDefinition

Variables

MODULE_OPTIONS_TOKEN

ts
const MODULE_OPTIONS_TOKEN: typeof MODULE_OPTIONS_TOKEN;

Defined in: worker-nestjs/src/worker.module-definition.ts:5

Injection token for AMQP worker module options. Used by the NestJS DI system to inject configuration into AmqpWorkerService.

Functions

defineHandler()

Call Signature

ts
function defineHandler<TContract, TName>(
   contract,
   consumerName,
   handler): WorkerInferConsumerHandlerEntry<TContract, TName>;

Defined in: worker/dist/index.d.mts:323

Define a type-safe handler for a specific consumer in a contract.

This utility allows you to define handlers outside of the worker creation, providing better code organization and reusability.

Supports three patterns:

  1. Simple handler: just the function (single message handler)
  2. Handler with prefetch: [handler, { prefetch: 10 }] (single message handler with config)
  3. Batch handler: [batchHandler, { batchSize: 5, batchTimeout: 1000 }] (REQUIRES batchSize config)

Important: Batch handlers (handlers that accept an array of messages) MUST include batchSize configuration. You cannot create a batch handler without specifying batchSize.
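
The three entry shapes listed above can be modelled as a union and told apart at runtime. The types and the classifyEntry function below are hypothetical and written only for this sketch; they are not taken from the library's source, and the library may distinguish entries differently.

```typescript
// Hypothetical local types modelling the three documented entry shapes.
type SingleHandler<T> = (message: T) => Promise<void>;
type BatchHandler<T> = (messages: T[]) => Promise<void>;

type HandlerEntrySketch<T> =
  | SingleHandler<T>
  | [SingleHandler<T>, { prefetch?: number }]
  | [BatchHandler<T>, { batchSize: number; batchTimeout?: number; prefetch?: number }];

type EntryKind = "single" | "single-with-options" | "batch";

function classifyEntry<T>(entry: HandlerEntrySketch<T>): EntryKind {
  // Pattern 1: a bare function.
  if (typeof entry === "function") return "single";
  // Patterns 2 and 3 are tuples; only batch entries carry batchSize.
  const options = entry[1];
  return "batchSize" in options ? "batch" : "single-with-options";
}
```

Because batchSize is a required property of the third union member, a tuple that pairs an array-accepting handler with options lacking batchSize does not type-check in this model, which matches the requirement stated above.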

Type Parameters
| Type Parameter | Description |
| --- | --- |
| TContract extends ContractDefinition | The contract definition type |
| TName extends string \| number \| symbol | The consumer name from the contract |

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| contract | TContract | The contract definition containing the consumer |
| consumerName | TName | The name of the consumer from the contract |
| handler | WorkerInferConsumerHandler<TContract, TName> | The async handler function that processes messages (single or batch) |
Returns

WorkerInferConsumerHandlerEntry<TContract, TName>

A type-safe handler that can be used with TypedAmqpWorker

Example
typescript
import { defineHandler } from '@amqp-contract/worker';
import { orderContract } from './contract';

// Simple single-message handler without options
const processOrderHandler = defineHandler(
  orderContract,
  'processOrder',
  async (message) => {
    console.log('Processing order:', message.orderId);
    await processPayment(message);
  }
);

// Single-message handler with prefetch
const processOrderWithPrefetch = defineHandler(
  orderContract,
  'processOrder',
  async (message) => {
    await processOrder(message);
  },
  { prefetch: 10 }
);

// Batch handler - MUST include batchSize
const processBatchOrders = defineHandler(
  orderContract,
  'processOrders',
  async (messages) => {
    // messages is an array - batchSize configuration is REQUIRED
    await db.insertMany(messages);
  },
  { batchSize: 5, batchTimeout: 1000 }
);

Call Signature

ts
function defineHandler<TContract, TName>(
   contract,
   consumerName,
   handler,
   options): WorkerInferConsumerHandlerEntry<TContract, TName>;

Defined in: worker/dist/index.d.mts:324

Type Parameters
| Type Parameter | Description |
| --- | --- |
| TContract extends ContractDefinition | The contract definition type |
| TName extends string \| number \| symbol | The consumer name from the contract |

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| contract | TContract | The contract definition containing the consumer |
| consumerName | TName | The name of the consumer from the contract |
| handler | WorkerInferConsumerHandler<TContract, TName> | The async handler function that processes messages (single or batch) |
| options | { batchSize?: undefined; batchTimeout?: undefined; prefetch?: number; } | Optional consumer options (prefetch, batchSize, batchTimeout). For single-message handlers, { prefetch?: number } is optional. For batch handlers, { batchSize: number, batchTimeout?: number } is required. |
| options.batchSize? | undefined | - |
| options.batchTimeout? | undefined | - |
| options.prefetch? | number | - |
Returns

WorkerInferConsumerHandlerEntry<TContract, TName>

A type-safe handler that can be used with TypedAmqpWorker

Call Signature

ts
function defineHandler<TContract, TName>(
   contract,
   consumerName,
   handler,
   options): WorkerInferConsumerHandlerEntry<TContract, TName>;

Defined in: worker/dist/index.d.mts:329

Type Parameters
| Type Parameter | Description |
| --- | --- |
| TContract extends ContractDefinition | The contract definition type |
| TName extends string \| number \| symbol | The consumer name from the contract |

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| contract | TContract | The contract definition containing the consumer |
| consumerName | TName | The name of the consumer from the contract |
| handler | WorkerInferConsumerBatchHandler<TContract, TName> | The async handler function that processes messages (single or batch) |
| options | { batchSize: number; batchTimeout?: number; prefetch?: number; } | Consumer options (prefetch, batchSize, batchTimeout). For single-message handlers, { prefetch?: number } is optional. For batch handlers, { batchSize: number, batchTimeout?: number } is required. |
| options.batchSize | number | - |
| options.batchTimeout? | number | - |
| options.prefetch? | number | - |
Returns

WorkerInferConsumerHandlerEntry<TContract, TName>

A type-safe handler that can be used with TypedAmqpWorker
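
As a rough intuition for how batchSize and batchTimeout might interact, the following sketch buffers messages and releases them either when the buffer is full or when a timeout has passed since the first buffered message. This reference does not describe the library's actual batching logic, so the flush rules, the millisecond unit, and the BatchBuffer class itself are assumptions made for illustration. Time is passed in explicitly to keep the example deterministic.

```typescript
// Hypothetical batcher: the flush rules here are assumptions, not the
// library's documented behavior.
class BatchBuffer<T> {
  private items: T[] = [];
  private firstAt: number | null = null;
  private readonly batchSize: number;
  private readonly batchTimeout: number;

  constructor(batchSize: number, batchTimeout: number) {
    this.batchSize = batchSize;
    this.batchTimeout = batchTimeout;
  }

  // Buffer a message at time `now` (assumed ms). Returns a full batch once
  // batchSize messages have accumulated, otherwise null.
  push(item: T, now: number): T[] | null {
    if (this.firstAt === null) this.firstAt = now;
    this.items.push(item);
    return this.items.length >= this.batchSize ? this.drain() : null;
  }

  // Called periodically. Returns a partial batch once batchTimeout has
  // elapsed since the first buffered message, otherwise null.
  tick(now: number): T[] | null {
    if (this.firstAt === null) return null;
    return now - this.firstAt >= this.batchTimeout ? this.drain() : null;
  }

  // Hand over the buffered messages and reset the buffer.
  private drain(): T[] {
    const batch = this.items;
    this.items = [];
    this.firstAt = null;
    return batch;
  }
}
```

Under these assumptions, a batch handler is invoked with at most batchSize messages, and a slow trickle of messages is still delivered after roughly batchTimeout rather than waiting indefinitely for a full batch.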

defineHandlers()

ts
function defineHandlers<TContract>(contract, handlers): WorkerInferConsumerHandlers<TContract>;

Defined in: worker/dist/index.d.mts:390

Define multiple type-safe handlers for consumers in a contract.

This utility allows you to define all handlers at once outside of the worker creation, ensuring type safety and providing better code organization.

Type Parameters

| Type Parameter | Description |
| --- | --- |
| TContract extends ContractDefinition | The contract definition type |

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| contract | TContract | The contract definition containing the consumers |
| handlers | WorkerInferConsumerHandlers<TContract> | An object with async handler functions for each consumer |

Returns

WorkerInferConsumerHandlers<TContract>

A type-safe handlers object that can be used with TypedAmqpWorker
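
Helpers of this kind are often plain identity functions whose only job is to tie the handlers object to the contract's type so that each message parameter is inferred. The sketch below shows that pattern with hypothetical names (ContractSketch, HandlersFor, defineHandlersSketch) and a reduced contract shape. Whether the library's defineHandlers is implemented exactly this way is not stated in this reference.

```typescript
// Reduced, hypothetical contract shape for illustration only.
type ContractSketch = { consumers: Record<string, { payload: unknown }> };

type HandlersFor<C extends ContractSketch> = {
  [K in keyof C["consumers"]]: (message: C["consumers"][K]["payload"]) => Promise<void>;
};

// Identity helper: returns `handlers` unchanged. The contract argument is
// used only to infer C, so handler parameters get their types from it.
function defineHandlersSketch<C extends ContractSketch>(
  _contract: C,
  handlers: HandlersFor<C>,
): HandlersFor<C> {
  return handlers;
}

// Usage with a hypothetical contract value; `message` is inferred as
// { orderId: string } without an explicit annotation.
const orderContractSketch = {
  consumers: {
    processOrder: { payload: { orderId: "" } },
  },
};

const processed: string[] = [];

const sketchHandlers = defineHandlersSketch(orderContractSketch, {
  processOrder: async (message) => {
    processed.push(message.orderId);
  },
});
```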

Examples

typescript
import { defineHandlers, TypedAmqpWorker } from '@amqp-contract/worker';
import { orderContract } from './contract';

// Define all handlers at once
const handlers = defineHandlers(orderContract, {
  processOrder: async (message) => {
    // message is fully typed based on the contract
    console.log('Processing order:', message.orderId);
    await processPayment(message);
  },
  notifyOrder: async (message) => {
    await sendNotification(message);
  },
  shipOrder: async (message) => {
    await prepareShipment(message);
  },
});

// Use the handlers in worker
const worker = await TypedAmqpWorker.create({
  contract: orderContract,
  handlers,
  connection: 'amqp://localhost',
});

typescript
// Separate handler definitions for better organization
async function handleProcessOrder(message: WorkerInferConsumerInput<typeof orderContract, 'processOrder'>) {
  await processOrder(message);
}

async function handleNotifyOrder(message: WorkerInferConsumerInput<typeof orderContract, 'notifyOrder'>) {
  await sendNotification(message);
}

const handlers = defineHandlers(orderContract, {
  processOrder: handleProcessOrder,
  notifyOrder: handleNotifyOrder,
});

Released under the MIT License.