@amqp-contract/worker

Classes

MessageValidationError

Defined in: packages/worker/src/errors.ts:35

Error thrown when message validation fails

Extends

  • WorkerError

Constructors

Constructor
ts
new MessageValidationError(consumerName, issues): MessageValidationError;

Defined in: packages/worker/src/errors.ts:36

Parameters

| Parameter | Type |
| --- | --- |
| consumerName | string |
| issues | unknown |

Returns

MessageValidationError

Overrides
ts
WorkerError.constructor

Properties

| Property | Modifier | Type | Description | Inherited from | Defined in |
| --- | --- | --- | --- | --- | --- |
| cause? | public | unknown | - | WorkerError.cause | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es2022.error.d.ts:26 |
| consumerName | readonly | string | - | - | packages/worker/src/errors.ts:37 |
| issues | readonly | unknown | - | - | packages/worker/src/errors.ts:38 |
| message | public | string | - | WorkerError.message | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1077 |
| name | public | string | - | WorkerError.name | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1076 |
| stack? | public | string | - | WorkerError.stack | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1078 |
| stackTraceLimit | static | number | The Error.stackTraceLimit property specifies the number of stack frames collected by a stack trace (whether generated by new Error().stack or Error.captureStackTrace(obj)). The default value is 10 but may be set to any valid JavaScript number. Changes will affect any stack trace captured after the value has been changed. If set to a non-number value, or set to a negative number, stack traces will not capture any frames. | WorkerError.stackTraceLimit | node_modules/.pnpm/@types+node@25.0.3/node_modules/@types/node/globals.d.ts:67 |
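
To illustrate how calling code might inspect the consumerName and issues properties, here is a minimal sketch. The WorkerError and MessageValidationError classes below are local stand-ins that only mirror the documented constructor signature; the error message wording and the describeFailure helper are assumptions made for this example, not part of the package.

```typescript
// Local stand-ins mirroring the documented constructor shape.
// They are NOT imported from @amqp-contract/worker.
class WorkerError extends Error {}

class MessageValidationError extends WorkerError {
  readonly consumerName: string;
  readonly issues: unknown;

  constructor(consumerName: string, issues: unknown) {
    // The message wording is an assumption made for this sketch.
    super(`Message validation failed for consumer "${consumerName}"`);
    this.name = "MessageValidationError";
    this.consumerName = consumerName;
    this.issues = issues;
  }
}

// Hypothetical helper: turn any thrown value into a single log line.
function describeFailure(error: unknown): string {
  if (error instanceof MessageValidationError) {
    // `issues` is typed as unknown, so serialize it instead of assuming a shape.
    return `${error.consumerName}: ${JSON.stringify(error.issues)}`;
  }
  return error instanceof Error ? error.message : String(error);
}

const failure = new MessageValidationError("processOrder", [
  { path: ["amount"], message: "Expected number" },
]);
console.log(describeFailure(failure));
```

Because issues is typed as unknown, the sketch serializes it rather than reading fields from it; real code would narrow the value to whatever shape its schema library reports.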

Methods

captureStackTrace()
ts
static captureStackTrace(targetObject, constructorOpt?): void;

Defined in: node_modules/.pnpm/@types+node@25.0.3/node_modules/@types/node/globals.d.ts:51

Creates a .stack property on targetObject, which when accessed returns a string representing the location in the code at which Error.captureStackTrace() was called.

js
const myObject = {};
Error.captureStackTrace(myObject);
myObject.stack;  // Similar to `new Error().stack`

The first line of the trace will be prefixed with ${myObject.name}: ${myObject.message}.

The optional constructorOpt argument accepts a function. If given, all frames above constructorOpt, including constructorOpt, will be omitted from the generated stack trace.

The constructorOpt argument is useful for hiding implementation details of error generation from the user. For instance:

js
function a() {
  b();
}

function b() {
  c();
}

function c() {
  // Create an error without stack trace to avoid calculating the stack trace twice.
  const { stackTraceLimit } = Error;
  Error.stackTraceLimit = 0;
  const error = new Error();
  Error.stackTraceLimit = stackTraceLimit;

  // Capture the stack trace above function b
  Error.captureStackTrace(error, b); // Neither function c, nor b is included in the stack trace
  throw error;
}

a();

Parameters

| Parameter | Type |
| --- | --- |
| targetObject | object |
| constructorOpt? | Function |

Returns

void

Inherited from
ts
WorkerError.captureStackTrace

prepareStackTrace()
ts
static prepareStackTrace(err, stackTraces): any;

Defined in: node_modules/.pnpm/@types+node@25.0.3/node_modules/@types/node/globals.d.ts:55

Parameters

| Parameter | Type |
| --- | --- |
| err | Error |
| stackTraces | CallSite[] |

Returns

any

See

https://v8.dev/docs/stack-trace-api#customizing-stack-traces

Inherited from
ts
WorkerError.prepareStackTrace

TechnicalError

Defined in: packages/worker/src/errors.ts:22

Error for technical/runtime failures in worker operations. This includes validation failures, parsing failures, and processing failures.

Extends

  • WorkerError

Constructors

Constructor
ts
new TechnicalError(message, cause?): TechnicalError;

Defined in: packages/worker/src/errors.ts:23

Parameters

| Parameter | Type |
| --- | --- |
| message | string |
| cause? | unknown |

Returns

TechnicalError

Overrides
ts
WorkerError.constructor

Properties

| Property | Modifier | Type | Description | Inherited from | Defined in |
| --- | --- | --- | --- | --- | --- |
| cause? | readonly | unknown | - | WorkerError.cause | packages/worker/src/errors.ts:25 |
| message | public | string | - | WorkerError.message | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1077 |
| name | public | string | - | WorkerError.name | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1076 |
| stack? | public | string | - | WorkerError.stack | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1078 |
| stackTraceLimit | static | number | The Error.stackTraceLimit property specifies the number of stack frames collected by a stack trace (whether generated by new Error().stack or Error.captureStackTrace(obj)). The default value is 10 but may be set to any valid JavaScript number. Changes will affect any stack trace captured after the value has been changed. If set to a non-number value, or set to a negative number, stack traces will not capture any frames. | WorkerError.stackTraceLimit | node_modules/.pnpm/@types+node@25.0.3/node_modules/@types/node/globals.d.ts:67 |
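
The (message, cause?) constructor lends itself to wrapping a lower-level failure while keeping the original error reachable. The sketch below shows that pattern with local stand-in classes; parsePayload is a hypothetical helper invented for this example and is not part of the package.

```typescript
// Local stand-ins mirroring the documented (message, cause?) constructor.
// They are NOT imported from @amqp-contract/worker.
class WorkerError extends Error {}

class TechnicalError extends WorkerError {
  readonly cause?: unknown;

  constructor(message: string, cause?: unknown) {
    super(message);
    this.name = "TechnicalError";
    this.cause = cause;
  }
}

// Hypothetical helper: wrap a JSON parsing failure and keep the original as `cause`.
function parsePayload(raw: string): unknown {
  try {
    return JSON.parse(raw);
  } catch (error) {
    throw new TechnicalError("Failed to parse message payload", error);
  }
}

try {
  parsePayload("{not json");
} catch (error) {
  if (error instanceof TechnicalError) {
    // The wrapped SyntaxError stays available for logging or diagnostics.
    console.error(error.message, "caused by", error.cause);
  }
}
```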

Methods

captureStackTrace()
ts
static captureStackTrace(targetObject, constructorOpt?): void;

Defined in: node_modules/.pnpm/@types+node@25.0.3/node_modules/@types/node/globals.d.ts:51

Creates a .stack property on targetObject, which when accessed returns a string representing the location in the code at which Error.captureStackTrace() was called.

js
const myObject = {};
Error.captureStackTrace(myObject);
myObject.stack;  // Similar to `new Error().stack`

The first line of the trace will be prefixed with ${myObject.name}: ${myObject.message}.

The optional constructorOpt argument accepts a function. If given, all frames above constructorOpt, including constructorOpt, will be omitted from the generated stack trace.

The constructorOpt argument is useful for hiding implementation details of error generation from the user. For instance:

js
function a() {
  b();
}

function b() {
  c();
}

function c() {
  // Create an error without stack trace to avoid calculating the stack trace twice.
  const { stackTraceLimit } = Error;
  Error.stackTraceLimit = 0;
  const error = new Error();
  Error.stackTraceLimit = stackTraceLimit;

  // Capture the stack trace above function b
  Error.captureStackTrace(error, b); // Neither function c, nor b is included in the stack trace
  throw error;
}

a();

Parameters

| Parameter | Type |
| --- | --- |
| targetObject | object |
| constructorOpt? | Function |

Returns

void

Inherited from
ts
WorkerError.captureStackTrace

prepareStackTrace()
ts
static prepareStackTrace(err, stackTraces): any;

Defined in: node_modules/.pnpm/@types+node@25.0.3/node_modules/@types/node/globals.d.ts:55

Parameters

| Parameter | Type |
| --- | --- |
| err | Error |
| stackTraces | CallSite[] |

Returns

any

See

https://v8.dev/docs/stack-trace-api#customizing-stack-traces

Inherited from
ts
WorkerError.prepareStackTrace

TypedAmqpWorker

Defined in: packages/worker/src/worker.ts:129

Type-safe AMQP worker for consuming messages from RabbitMQ.

This class provides automatic message validation, connection management, and error handling for consuming messages based on a contract definition.
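
As a rough mental model of the "validate, then hand to the handler" flow described above, consider the following sketch. It does not use the package at all: validateOrder is a hand-written stand-in for a contract schema, and the Validation type and consume function are hypothetical names chosen for this example.

```typescript
type Order = { orderId: string; amount: number };

// Outcome of validating one payload; a simplified stand-in for a schema library result.
type Validation<T> = { ok: true; value: T } | { ok: false; issues: string[] };

// Hypothetical hand-written validator playing the role of the contract's message schema.
function validateOrder(input: unknown): Validation<Order> {
  const record: Record<string, unknown> =
    typeof input === "object" && input !== null ? (input as Record<string, unknown>) : {};
  const orderId = record["orderId"];
  const amount = record["amount"];
  if (typeof orderId === "string" && typeof amount === "number") {
    return { ok: true, value: { orderId, amount } };
  }
  const issues: string[] = [];
  if (typeof orderId !== "string") issues.push("orderId must be a string");
  if (typeof amount !== "number") issues.push("amount must be a number");
  return { ok: false, issues };
}

// Validate first: the handler only ever sees payloads that passed validation.
// Returns the list of issues (empty when the handler ran).
function consume(raw: string, handler: (order: Order) => void): string[] {
  const result = validateOrder(JSON.parse(raw));
  if (!result.ok) return result.issues;
  handler(result.value);
  return [];
}

const handled: Order[] = [];
console.log(consume('{"orderId":"o-1","amount":42}', (order) => handled.push(order)));
console.log(consume('{"orderId":7}', (order) => handled.push(order)));
```

The first call reaches the handler and reports no issues; the second is rejected before the handler runs, which is the property that lets handlers trust the shape of the message they receive.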

Example

typescript
import { defineConsumer, defineContract, defineQueue } from '@amqp-contract/contract';
import { TypedAmqpWorker } from '@amqp-contract/worker';
import { z } from 'zod';

const contract = defineContract({
  queues: {
    orderProcessing: defineQueue('order-processing', { durable: true })
  },
  consumers: {
    processOrder: defineConsumer('order-processing', z.object({
      orderId: z.string(),
      amount: z.number()
    }))
  }
});

const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: async (message) => {
      console.log('Processing order', message.orderId);
      // Process the order...
    }
  },
  urls: ['amqp://localhost']
}).resultToPromise();

// Close when done
await worker.close().resultToPromise();

Type Parameters

| Type Parameter | Description |
| --- | --- |
| TContract extends ContractDefinition | The contract definition type |

Methods

close()
ts
close(): Future<Result<void, TechnicalError>>;

Defined in: packages/worker/src/worker.ts:242

Close the AMQP channel and connection.

This gracefully closes the connection to the AMQP broker, stopping all message consumption and cleaning up resources.

Returns

Future<Result<void, TechnicalError>>

A Future that resolves to a Result indicating success or failure

Example
typescript
// toPromise() resolves with the Result itself, so it can be inspected
const closeResult = await worker.close().toPromise();
if (closeResult.isOk()) {
  console.log('Worker closed successfully');
}

create()
ts
static create<TContract>(options): Future<Result<TypedAmqpWorker<TContract>, TechnicalError>>;

Defined in: packages/worker/src/worker.ts:203

Create a type-safe AMQP worker from a contract.

Connection management (including automatic reconnection) is handled internally by amqp-connection-manager via the AmqpClient. The worker will set up consumers for all contract-defined handlers asynchronously in the background once the underlying connection and channels are ready.

Connections are automatically shared across clients and workers with the same URLs and connection options, following RabbitMQ best practices.
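
One way to picture connection sharing is a cache keyed by the URLs and connection options. The sketch below is only an illustration of that idea under stated assumptions: FakeConnection, connectionKey, and getSharedConnection are hypothetical names, no network I/O takes place, and the package's actual sharing logic may differ.

```typescript
// Hypothetical stand-in for a broker connection; no network I/O happens here.
type FakeConnection = { id: number; urls: readonly string[] };

const pool = new Map<string, FakeConnection>();
let nextId = 1;

// Build a cache key from the URLs and options.
// Assumes the options are JSON-serializable and listed in a stable key order.
function connectionKey(urls: readonly string[], options: Record<string, unknown> = {}): string {
  return JSON.stringify({ urls, options });
}

// Return the cached connection for this key, creating it on first use.
function getSharedConnection(
  urls: readonly string[],
  options: Record<string, unknown> = {},
): FakeConnection {
  const key = connectionKey(urls, options);
  let connection = pool.get(key);
  if (connection === undefined) {
    connection = { id: nextId++, urls };
    pool.set(key, connection);
  }
  return connection;
}

const first = getSharedConnection(["amqp://localhost"]);
const second = getSharedConnection(["amqp://localhost"]);
const third = getSharedConnection(["amqp://localhost"], { heartbeatIntervalInSeconds: 30 });

console.log(first === second); // true: same URLs and options share one connection
console.log(first === third); // false: different options get their own connection
```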

Type Parameters

| Type Parameter |
| --- |
| TContract extends ContractDefinition |

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| options | CreateWorkerOptions<TContract> | Configuration options for the worker |

Returns

Future<Result<TypedAmqpWorker<TContract>, TechnicalError>>

A Future that resolves to a Result containing the worker or an error

Example
typescript
const workerResult = await TypedAmqpWorker.create({
  contract: myContract,
  handlers: {
    processOrder: async (msg) => console.log('Order:', msg.orderId)
  },
  urls: ['amqp://localhost']
}).toPromise();

if (workerResult.isError()) {
  console.error('Failed to create worker:', workerResult.error);
}

Type Aliases

CreateWorkerOptions

ts
type CreateWorkerOptions<TContract> = object;

Defined in: packages/worker/src/worker.ts:76

Options for creating a type-safe AMQP worker.

Example

typescript
const options: CreateWorkerOptions<typeof myContract> = {
  contract: myContract,
  handlers: {
    // Simple handler
    processOrder: async (message) => {
      console.log('Processing order:', message.orderId);
    },
    // Handler with options (prefetch)
    processPayment: [
      async (message) => {
        console.log('Processing payment:', message.paymentId);
      },
      { prefetch: 10 }
    ],
    // Handler with batch processing
    processNotifications: [
      async (messages) => {
        console.log('Processing batch:', messages.length);
      },
      { batchSize: 5, batchTimeout: 1000 }
    ]
  },
  urls: ['amqp://localhost'],
  connectionOptions: {
    heartbeatIntervalInSeconds: 30
  },
  logger: myLogger
};

Type Parameters

| Type Parameter | Description |
| --- | --- |
| TContract extends ContractDefinition | The contract definition type |

Properties

| Property | Type | Description | Defined in |
| --- | --- | --- | --- |
| connectionOptions? | AmqpConnectionManagerOptions | Optional connection configuration (heartbeat, reconnect settings, etc.) | packages/worker/src/worker.ts:84 |
| contract | TContract | The AMQP contract definition specifying consumers and their message schemas | packages/worker/src/worker.ts:78 |
| handlers | WorkerInferConsumerHandlers<TContract> | Handlers for each consumer defined in the contract. Can be a function or a tuple of [handler, options] | packages/worker/src/worker.ts:80 |
| logger? | Logger | Optional logger for logging message consumption and errors | packages/worker/src/worker.ts:86 |
| urls | ConnectionUrl[] | AMQP broker URL(s). Multiple URLs provide failover support | packages/worker/src/worker.ts:82 |

WorkerInferConsumerBatchHandler()

ts
type WorkerInferConsumerBatchHandler<TContract, TName> = (messages) => Promise<void>;

Defined in: packages/worker/src/types.ts:56

Infer consumer handler type for batch processing. Batch handlers receive an array of messages.

Type Parameters

| Type Parameter |
| --- |
| TContract extends ContractDefinition |
| TName extends InferConsumerNames<TContract> |

Parameters

| Parameter | Type |
| --- | --- |
| messages | WorkerInferConsumerInput<TContract, TName>[] |

Returns

Promise<void>
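
To make the idea of "a handler that receives an array" concrete, here is a sketch of how messages could be grouped by a size limit and a timeout before being handed over. MessageBatcher is a hypothetical class written for this example; it is not the package's batching implementation, and the exact semantics of batchSize and batchTimeout in the package may differ.

```typescript
// Hypothetical batcher: collects messages and delivers them in groups.
class MessageBatcher<T> {
  private buffer: T[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined;
  private readonly batchSize: number;
  private readonly batchTimeout: number;
  private readonly onBatch: (messages: T[]) => void;

  constructor(batchSize: number, batchTimeout: number, onBatch: (messages: T[]) => void) {
    this.batchSize = batchSize;
    this.batchTimeout = batchTimeout;
    this.onBatch = onBatch;
  }

  add(message: T): void {
    this.buffer.push(message);
    if (this.buffer.length >= this.batchSize) {
      // Full batch: deliver immediately.
      this.flush();
    } else if (this.timer === undefined) {
      // First message of a partial batch: start the timeout.
      this.timer = setTimeout(() => this.flush(), this.batchTimeout);
    }
  }

  flush(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = [];
    this.onBatch(batch);
  }
}

const batches: number[][] = [];
const batcher = new MessageBatcher<number>(2, 1000, (messages) => batches.push(messages));
[1, 2, 3].forEach((n) => batcher.add(n));
batcher.flush(); // deliver the partial batch without waiting for the timeout
console.log(batches); // two batches: [1, 2] then [3]
```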


WorkerInferConsumerHandler()

ts
type WorkerInferConsumerHandler<TContract, TName> = (message) => Promise<void>;

Defined in: packages/worker/src/types.ts:47

Infer consumer handler type for a specific consumer. Handlers always receive a single message by default. For batch processing, use consumerOptions to configure batch behavior.

Type Parameters

| Type Parameter |
| --- |
| TContract extends ContractDefinition |
| TName extends InferConsumerNames<TContract> |

Parameters

| Parameter | Type |
| --- | --- |
| message | WorkerInferConsumerInput<TContract, TName> |

Returns

Promise<void>


WorkerInferConsumerHandlerEntry

ts
type WorkerInferConsumerHandlerEntry<TContract, TName> =
  | WorkerInferConsumerHandler<TContract, TName>
  | readonly [
      WorkerInferConsumerHandler<TContract, TName>,
      {
        batchSize?: never;
        batchTimeout?: never;
        prefetch?: number;
      },
    ]
  | readonly [
      WorkerInferConsumerBatchHandler<TContract, TName>,
      {
        batchSize: number;
        batchTimeout?: number;
        prefetch?: number;
      },
    ];

Defined in: packages/worker/src/types.ts:69

Infer handler entry for a consumer - either a function or a tuple of [handler, options].

Three patterns are supported:

  1. Simple handler: async (message) => { ... }
  2. Handler with prefetch: [async (message) => { ... }, { prefetch: 10 }]
  3. Batch handler: [async (messages) => { ... }, { batchSize: 5, batchTimeout: 1000 }]
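
The three patterns above can be told apart at runtime by checking whether the entry is a function and, if it is a tuple, whether its options carry a batchSize. The following sketch uses simplified local types (SingleHandler, BatchHandler, HandlerEntry) and a hypothetical describeEntry function; it illustrates the union's shape and is not code from the package.

```typescript
type SingleHandler<T> = (message: T) => Promise<void>;
type BatchHandler<T> = (messages: T[]) => Promise<void>;

type SingleEntry<T> = readonly [
  SingleHandler<T>,
  { prefetch?: number; batchSize?: never; batchTimeout?: never },
];
type BatchEntry<T> = readonly [
  BatchHandler<T>,
  { prefetch?: number; batchSize: number; batchTimeout?: number },
];

// Simplified local version of the three-way union described above.
type HandlerEntry<T> = SingleHandler<T> | SingleEntry<T> | BatchEntry<T>;

// A tuple entry is a batch entry exactly when its options carry a batchSize.
function isBatchEntry<T>(entry: SingleEntry<T> | BatchEntry<T>): entry is BatchEntry<T> {
  return typeof entry[1].batchSize === "number";
}

// Hypothetical helper that names the pattern an entry follows.
function describeEntry<T>(entry: HandlerEntry<T>): string {
  if (typeof entry === "function") return "single (no options)";
  if (isBatchEntry(entry)) return `batch of ${entry[1].batchSize}`;
  return `single with prefetch ${entry[1].prefetch ?? "default"}`;
}

const entries: HandlerEntry<string>[] = [
  async (message: string) => { console.log(message); },
  [async (message: string) => { console.log(message); }, { prefetch: 10 }],
  [async (messages: string[]) => { console.log(messages.length); }, { batchSize: 5, batchTimeout: 1000 }],
];

console.log(entries.map((entry) => describeEntry(entry)));
```

Declaring batchSize as never on the single-message tuple is what keeps the two tuple shapes from overlapping: an options object with a numeric batchSize can only belong to the batch variant.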

Type Parameters

| Type Parameter |
| --- |
| TContract extends ContractDefinition |
| TName extends InferConsumerNames<TContract> |

WorkerInferConsumerHandlers

ts
type WorkerInferConsumerHandlers<TContract> = { [K in InferConsumerNames<TContract>]: WorkerInferConsumerHandlerEntry<TContract, K> };

Defined in: packages/worker/src/types.ts:87

Infer all consumer handlers for a contract. Handlers can be either single-message handlers, batch handlers, or a tuple of [handler, options].

Type Parameters

| Type Parameter |
| --- |
| TContract extends ContractDefinition |
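
The mapped type above produces one property per consumer name, each typed by that consumer's message. A reduced sketch of the same technique follows; ConsumerPayloads and HandlersFor are hypothetical local types that stand in for what a real contract would provide, and only single-message handlers are modelled.

```typescript
// Hypothetical payload map standing in for what a contract would provide.
type ConsumerPayloads = {
  processOrder: { orderId: string };
  notifyOrder: { email: string };
};

// One handler per consumer name, each typed by that consumer's payload.
type HandlersFor<TPayloads> = {
  [K in keyof TPayloads]: (message: TPayloads[K]) => Promise<void>;
};

const log: string[] = [];

// Omitting `notifyOrder` or misspelling a key would be a compile-time error.
const handlers: HandlersFor<ConsumerPayloads> = {
  processOrder: async (message) => {
    log.push(`order ${message.orderId}`);
  },
  notifyOrder: async (message) => {
    log.push(`notify ${message.email}`);
  },
};

void handlers.processOrder({ orderId: "o-1" });
void handlers.notifyOrder({ email: "a@example.com" });
console.log(log);
```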

WorkerInferConsumerInput

ts
type WorkerInferConsumerInput<TContract, TName> = ConsumerInferInput<InferConsumer<TContract, TName>>;

Defined in: packages/worker/src/types.ts:37

Worker-perspective helper type: infers the input (message payload) type that the handler for a given consumer receives.

Type Parameters

| Type Parameter |
| --- |
| TContract extends ContractDefinition |
| TName extends InferConsumerNames<TContract> |
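
Inferring a handler's input type from a contract generally comes down to looking up the consumer by name and extracting the type its schema produces. The sketch below shows that technique with a conditional type and `infer`; SchemaSketch, ContractSketch, and ConsumerInput are hypothetical local definitions, not the package's ConsumerInferInput or InferConsumer types.

```typescript
// Minimal schema interface; a stand-in for a real schema library.
interface SchemaSketch<T> {
  parse(input: unknown): T;
}

// Hypothetical contract shape: each consumer carries the schema of its message.
type ContractSketch = { consumers: Record<string, { message: SchemaSketch<unknown> }> };

// Look up a consumer by name, then infer the payload type from its schema.
type ConsumerInput<C extends ContractSketch, N extends keyof C["consumers"]> =
  C["consumers"][N] extends { message: SchemaSketch<infer T> } ? T : never;

const orderSchema: SchemaSketch<{ orderId: string }> = {
  parse(input) {
    const orderId = (input as { orderId?: unknown } | null)?.orderId;
    if (typeof orderId !== "string") throw new Error("orderId must be a string");
    return { orderId };
  },
};

const contractSketch = { consumers: { processOrder: { message: orderSchema } } };

// Resolves to { orderId: string }.
type OrderInput = ConsumerInput<typeof contractSketch, "processOrder">;

const order: OrderInput = contractSketch.consumers.processOrder.message.parse({ orderId: "o-1" });
console.log(order.orderId); // o-1
```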

Functions

defineHandler()

Call Signature

ts
function defineHandler<TContract, TName>(
  contract,
  consumerName,
  handler,
): WorkerInferConsumerHandlerEntry<TContract, TName>;

Defined in: packages/worker/src/handlers.ts:70

Define a type-safe handler for a specific consumer in a contract.

This utility allows you to define handlers outside of the worker creation, providing better code organization and reusability.

Supports three patterns:

  1. Simple handler: just the function (single message handler)
  2. Handler with prefetch: [handler, { prefetch: 10 }] (single message handler with config)
  3. Batch handler: [batchHandler, { batchSize: 5, batchTimeout: 1000 }] (REQUIRES batchSize config)

Important: Batch handlers (handlers that accept an array of messages) MUST include batchSize configuration. You cannot create a batch handler without specifying batchSize.
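
The requirement that batch handlers come with batchSize can be expressed through function overloads. The following is a sketch of that technique only: defineHandlerSketch and its local types are hypothetical, the payload type is passed explicitly instead of being derived from a contract, and the real defineHandler takes a contract and consumer name as shown in the signature above.

```typescript
type SingleHandler<T> = (message: T) => Promise<void>;
type BatchHandler<T> = (messages: T[]) => Promise<void>;
type SingleOptions = { prefetch?: number; batchSize?: never; batchTimeout?: never };
type BatchOptions = { prefetch?: number; batchSize: number; batchTimeout?: number };

type HandlerEntry<T> =
  | SingleHandler<T>
  | readonly [SingleHandler<T>, SingleOptions]
  | readonly [BatchHandler<T>, BatchOptions];

// Overloads: a batch handler is only accepted together with BatchOptions.
function defineHandlerSketch<T>(handler: SingleHandler<T>): HandlerEntry<T>;
function defineHandlerSketch<T>(handler: SingleHandler<T>, options: SingleOptions): HandlerEntry<T>;
function defineHandlerSketch<T>(handler: BatchHandler<T>, options: BatchOptions): HandlerEntry<T>;
function defineHandlerSketch<T>(
  handler: SingleHandler<T> | BatchHandler<T>,
  options?: SingleOptions | BatchOptions,
): HandlerEntry<T> {
  // No options: return the bare function. Otherwise pair it with its options.
  if (options === undefined) return handler as SingleHandler<T>;
  return [handler, options] as HandlerEntry<T>;
}

type Order = { orderId: string };

const single = defineHandlerSketch<Order>(async (message: Order) => {
  console.log(message.orderId);
});
const withPrefetch = defineHandlerSketch<Order>(
  async (message: Order) => { console.log(message.orderId); },
  { prefetch: 10 },
);
const batch = defineHandlerSketch<Order>(
  async (messages: Order[]) => { console.log(messages.length); },
  { batchSize: 5, batchTimeout: 1000 },
);

// With the payload type fixed to Order, this call matches no overload and fails to compile:
// defineHandlerSketch<Order>(async (messages: Order[]) => {});

console.log(typeof single, Array.isArray(withPrefetch), Array.isArray(batch));
```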

Type Parameters

| Type Parameter | Description |
| --- | --- |
| TContract extends ContractDefinition | The contract definition type |
| TName extends string \| number \| symbol | The consumer name from the contract |

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| contract | TContract | The contract definition containing the consumer |
| consumerName | TName | The name of the consumer from the contract |
| handler | WorkerInferConsumerHandler<TContract, TName> | The async handler function that processes messages (single or batch) |

Returns

WorkerInferConsumerHandlerEntry<TContract, TName>

A type-safe handler that can be used with TypedAmqpWorker

Example
typescript
import { defineHandler } from '@amqp-contract/worker';
import { orderContract } from './contract';

// Simple single-message handler without options
const processOrderHandler = defineHandler(
  orderContract,
  'processOrder',
  async (message) => {
    console.log('Processing order:', message.orderId);
    await processPayment(message);
  }
);

// Single-message handler with prefetch
const processOrderWithPrefetch = defineHandler(
  orderContract,
  'processOrder',
  async (message) => {
    await processOrder(message);
  },
  { prefetch: 10 }
);

// Batch handler - MUST include batchSize
const processBatchOrders = defineHandler(
  orderContract,
  'processOrders',
  async (messages) => {
    // messages is an array - batchSize configuration is REQUIRED
    await db.insertMany(messages);
  },
  { batchSize: 5, batchTimeout: 1000 }
);

Call Signature

ts
function defineHandler<TContract, TName>(
  contract,
  consumerName,
  handler,
  options,
): WorkerInferConsumerHandlerEntry<TContract, TName>;

Defined in: packages/worker/src/handlers.ts:78

Type Parameters

| Type Parameter | Description |
| --- | --- |
| TContract extends ContractDefinition | The contract definition type |
| TName extends string \| number \| symbol | The consumer name from the contract |

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| contract | TContract | The contract definition containing the consumer |
| consumerName | TName | The name of the consumer from the contract |
| handler | WorkerInferConsumerHandler<TContract, TName> | The async handler function that processes messages (single or batch) |
| options | { batchSize?: undefined; batchTimeout?: undefined; prefetch?: number; } | Consumer options (prefetch, batchSize, batchTimeout). For single-message handlers, { prefetch?: number } is optional. For batch handlers, { batchSize: number, batchTimeout?: number } is required. |
| options.batchSize? | undefined | - |
| options.batchTimeout? | undefined | - |
| options.prefetch? | number | - |

Returns

WorkerInferConsumerHandlerEntry<TContract, TName>

A type-safe handler that can be used with TypedAmqpWorker

Call Signature

ts
function defineHandler<TContract, TName>(
  contract,
  consumerName,
  handler,
  options,
): WorkerInferConsumerHandlerEntry<TContract, TName>;

Defined in: packages/worker/src/handlers.ts:87

Type Parameters

| Type Parameter | Description |
| --- | --- |
| TContract extends ContractDefinition | The contract definition type |
| TName extends string \| number \| symbol | The consumer name from the contract |

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| contract | TContract | The contract definition containing the consumer |
| consumerName | TName | The name of the consumer from the contract |
| handler | WorkerInferConsumerBatchHandler<TContract, TName> | The async handler function that processes messages (single or batch) |
| options | { batchSize: number; batchTimeout?: number; prefetch?: number; } | Consumer options (prefetch, batchSize, batchTimeout). For single-message handlers, { prefetch?: number } is optional. For batch handlers, { batchSize: number, batchTimeout?: number } is required. |
| options.batchSize | number | - |
| options.batchTimeout? | number | - |
| options.prefetch? | number | - |

Returns

WorkerInferConsumerHandlerEntry<TContract, TName>

A type-safe handler that can be used with TypedAmqpWorker

defineHandlers()

ts
function defineHandlers<TContract>(contract, handlers): WorkerInferConsumerHandlers<TContract>;

Defined in: packages/worker/src/handlers.ts:181

Define multiple type-safe handlers for consumers in a contract.

This utility allows you to define all handlers at once outside of the worker creation, ensuring type safety and providing better code organization.
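
A helper of this kind can be little more than an identity function whose parameter type does the checking. The sketch below shows that idea with hypothetical local types (ContractSketch, HandlersOf) and a hypothetical defineHandlersSketch function. The runtime check that every handler key names a consumer in the contract is an assumption added for illustration; this page does not state whether the real defineHandlers performs such a check.

```typescript
// Hypothetical, simplified contract shape: only the consumer names matter here.
type ContractSketch = { consumers: Record<string, unknown> };

// One handler per consumer name. Payload typing is omitted to keep the sketch short.
type HandlersOf<C extends ContractSketch> = {
  [K in keyof C["consumers"]]: (message: unknown) => Promise<void>;
};

function defineHandlersSketch<C extends ContractSketch>(
  contract: C,
  handlers: HandlersOf<C>,
): HandlersOf<C> {
  // Assumed runtime guard: reject handlers for consumers the contract does not define.
  for (const name of Object.keys(handlers)) {
    if (!(name in contract.consumers)) {
      throw new Error(`Consumer "${name}" is not defined in the contract`);
    }
  }
  // The handlers object is returned unchanged; the benefit is the type check.
  return handlers;
}

const contractSketch = { consumers: { processOrder: {}, notifyOrder: {} } };

const definedHandlers = defineHandlersSketch(contractSketch, {
  processOrder: async () => {},
  notifyOrder: async () => {},
});

console.log(Object.keys(definedHandlers));
```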

Type Parameters

| Type Parameter | Description |
| --- | --- |
| TContract extends ContractDefinition | The contract definition type |

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| contract | TContract | The contract definition containing the consumers |
| handlers | WorkerInferConsumerHandlers<TContract> | An object with async handler functions for each consumer |

Returns

WorkerInferConsumerHandlers<TContract>

A type-safe handlers object that can be used with TypedAmqpWorker

Examples

typescript
import { TypedAmqpWorker, defineHandlers } from '@amqp-contract/worker';
import { orderContract } from './contract';

// Define all handlers at once
const handlers = defineHandlers(orderContract, {
  processOrder: async (message) => {
    // message is fully typed based on the contract
    console.log('Processing order:', message.orderId);
    await processPayment(message);
  },
  notifyOrder: async (message) => {
    await sendNotification(message);
  },
  shipOrder: async (message) => {
    await prepareShipment(message);
  },
});

// Use the handlers in worker
const worker = await TypedAmqpWorker.create({
  contract: orderContract,
  handlers,
  urls: ['amqp://localhost'],
}).resultToPromise();

typescript
import type { WorkerInferConsumerInput } from '@amqp-contract/worker';

// Separate handler definitions for better organization
async function handleProcessOrder(message: WorkerInferConsumerInput<typeof orderContract, 'processOrder'>) {
  await processOrder(message);
}

async function handleNotifyOrder(message: WorkerInferConsumerInput<typeof orderContract, 'notifyOrder'>) {
  await sendNotification(message);
}

const handlers = defineHandlers(orderContract, {
  processOrder: handleProcessOrder,
  notifyOrder: handleNotifyOrder,
});

Released under the MIT License.