@amqp-contract/worker
Classes
MessageValidationError
Defined in: packages/worker/src/errors.ts:35
Error thrown when message validation fails.
Extends
WorkerError
Constructors
Constructor
new MessageValidationError(consumerName, issues): MessageValidationError;
Defined in: packages/worker/src/errors.ts:36
Parameters
| Parameter | Type |
|---|---|
| consumerName | string |
| issues | unknown |
Returns
MessageValidationError
Overrides
WorkerError.constructor
Properties
| Property | Modifier | Type | Description | Inherited from | Defined in |
|---|---|---|---|---|---|
| cause? | public | unknown | - | WorkerError.cause | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es2022.error.d.ts:26 |
| consumerName | readonly | string | - | - | packages/worker/src/errors.ts:37 |
| issues | readonly | unknown | - | - | packages/worker/src/errors.ts:38 |
| message | public | string | - | WorkerError.message | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1077 |
| name | public | string | - | WorkerError.name | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1076 |
| stack? | public | string | - | WorkerError.stack | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1078 |
| stackTraceLimit | static | number | The Error.stackTraceLimit property specifies the number of stack frames collected by a stack trace (whether generated by new Error().stack or Error.captureStackTrace(obj)). The default value is 10 but may be set to any valid JavaScript number. Changes will affect any stack trace captured after the value has been changed. If set to a non-number value, or set to a negative number, stack traces will not capture any frames. | WorkerError.stackTraceLimit | node_modules/.pnpm/@types+node@25.0.3/node_modules/@types/node/globals.d.ts:67 |
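As a rough illustration of how the constructor parameters map onto the readonly properties above, the sketch below uses local stand-in classes. `WorkerErrorSketch`, `MessageValidationErrorSketch`, and `describeFailure` are hypothetical names, and the message format is an assumption rather than the package's actual wording.

```typescript
// Hypothetical stand-ins that mirror the documented shape; not the package source.
class WorkerErrorSketch extends Error {
  constructor(message: string) {
    super(message);
    this.name = new.target.name;
  }
}

class MessageValidationErrorSketch extends WorkerErrorSketch {
  readonly consumerName: string;
  readonly issues: unknown;

  constructor(consumerName: string, issues: unknown) {
    // Assumed message format, chosen only for this example.
    super(`Message validation failed for consumer "${consumerName}"`);
    this.consumerName = consumerName;
    this.issues = issues;
  }
}

// A caller can narrow on the class and then inspect the readonly fields.
function describeFailure(error: unknown): string {
  if (error instanceof MessageValidationErrorSketch) {
    return `${error.consumerName}: ${JSON.stringify(error.issues)}`;
  }
  return "unexpected error";
}

const failure = new MessageValidationErrorSketch("processOrder", [{ path: "orderId" }]);
const summary = describeFailure(failure);
```

Because `issues` is typed as `unknown`, a consumer of the real class would likely need to narrow it (for example against the schema library's issue type) before reading individual fields.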
Methods
captureStackTrace()
static captureStackTrace(targetObject, constructorOpt?): void;
Defined in: node_modules/.pnpm/@types+node@25.0.3/node_modules/@types/node/globals.d.ts:51
Creates a .stack property on targetObject, which when accessed returns a string representing the location in the code at which Error.captureStackTrace() was called.
const myObject = {};
Error.captureStackTrace(myObject);
myObject.stack; // Similar to `new Error().stack`
The first line of the trace will be prefixed with ${myObject.name}: ${myObject.message}.
The optional constructorOpt argument accepts a function. If given, all frames above constructorOpt, including constructorOpt, will be omitted from the generated stack trace.
The constructorOpt argument is useful for hiding implementation details of error generation from the user. For instance:
function a() {
  b();
}
function b() {
  c();
}
function c() {
  // Create an error without stack trace to avoid calculating the stack trace twice.
  const { stackTraceLimit } = Error;
  Error.stackTraceLimit = 0;
  const error = new Error();
  Error.stackTraceLimit = stackTraceLimit;
  // Capture the stack trace above function b
  Error.captureStackTrace(error, b); // Neither function c, nor b is included in the stack trace
  throw error;
}
a();
Parameters
| Parameter | Type |
|---|---|
| targetObject | object |
| constructorOpt? | Function |
Returns
void
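The constructorOpt behaviour described above can be sketched with a small error factory. `makeHiddenError` and `callSite` are hypothetical names. Because `Error.captureStackTrace` is V8-specific, the sketch looks it up through a local type instead of assuming it is declared.

```typescript
// Local description of the V8-only static method, so no extra typings are required.
type CaptureStackTrace = (target: object, constructorOpt?: Function) => void;
const v8Error = Error as unknown as { captureStackTrace?: CaptureStackTrace };

// Hypothetical factory whose own frame is hidden from the resulting stack trace.
function makeHiddenError(message: string): Error {
  const error = new Error(message);
  if (v8Error.captureStackTrace) {
    // All frames above and including makeHiddenError are omitted.
    v8Error.captureStackTrace(error, makeHiddenError);
  }
  return error;
}

function callSite(): Error {
  return makeHiddenError("validation failed");
}

const hidden = callSite();
const stackText = hidden.stack ?? "";
```

On engines without `captureStackTrace`, the guard leaves the original stack untouched, so the factory frame would still appear there.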
Inherited from
WorkerError.captureStackTrace
prepareStackTrace()
static prepareStackTrace(err, stackTraces): any;
Defined in: node_modules/.pnpm/@types+node@25.0.3/node_modules/@types/node/globals.d.ts:55
Parameters
| Parameter | Type |
|---|---|
| err | Error |
| stackTraces | CallSite[] |
Returns
any
See
https://v8.dev/docs/stack-trace-api#customizing-stack-traces
Inherited from
WorkerError.prepareStackTrace
TechnicalError
Defined in: packages/worker/src/errors.ts:22
Error for technical/runtime failures in worker operations. This includes validation failures, parsing failures, and processing failures.
Extends
WorkerError
Constructors
Constructor
new TechnicalError(message, cause?): TechnicalError;
Defined in: packages/worker/src/errors.ts:23
Parameters
| Parameter | Type |
|---|---|
| message | string |
| cause? | unknown |
Returns
TechnicalError
Overrides
WorkerError.constructor
Properties
| Property | Modifier | Type | Description | Inherited from | Defined in |
|---|---|---|---|---|---|
| cause? | readonly | unknown | - | WorkerError.cause | packages/worker/src/errors.ts:25 |
| message | public | string | - | WorkerError.message | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1077 |
| name | public | string | - | WorkerError.name | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1076 |
| stack? | public | string | - | WorkerError.stack | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1078 |
| stackTraceLimit | static | number | The Error.stackTraceLimit property specifies the number of stack frames collected by a stack trace (whether generated by new Error().stack or Error.captureStackTrace(obj)). The default value is 10 but may be set to any valid JavaScript number. Changes will affect any stack trace captured after the value has been changed. If set to a non-number value, or set to a negative number, stack traces will not capture any frames. | WorkerError.stackTraceLimit | node_modules/.pnpm/@types+node@25.0.3/node_modules/@types/node/globals.d.ts:67 |
Methods
captureStackTrace()
static captureStackTrace(targetObject, constructorOpt?): void;
Defined in: node_modules/.pnpm/@types+node@25.0.3/node_modules/@types/node/globals.d.ts:51
Parameters
| Parameter | Type |
|---|---|
| targetObject | object |
| constructorOpt? | Function |
Returns
void
Inherited from
WorkerError.captureStackTrace
prepareStackTrace()
static prepareStackTrace(err, stackTraces): any;
Defined in: node_modules/.pnpm/@types+node@25.0.3/node_modules/@types/node/globals.d.ts:55
Parameters
| Parameter | Type |
|---|---|
| err | Error |
| stackTraces | CallSite[] |
Returns
any
See
https://v8.dev/docs/stack-trace-api#customizing-stack-traces
Inherited from
WorkerError.prepareStackTrace
TypedAmqpWorker
Defined in: packages/worker/src/worker.ts:129
Type-safe AMQP worker for consuming messages from RabbitMQ.
This class provides automatic message validation, connection management, and error handling for consuming messages based on a contract definition.
Example
import { TypedAmqpWorker } from '@amqp-contract/worker';
import { z } from 'zod';
const contract = defineContract({
  queues: {
    orderProcessing: defineQueue('order-processing', { durable: true })
  },
  consumers: {
    processOrder: defineConsumer('order-processing', z.object({
      orderId: z.string(),
      amount: z.number()
    }))
  }
});
const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: async (message) => {
      console.log('Processing order', message.orderId);
      // Process the order...
    }
  },
  urls: ['amqp://localhost']
}).resultToPromise();
// Close when done
await worker.close().resultToPromise();
Type Parameters
| Type Parameter | Description |
|---|---|
| TContract extends ContractDefinition | The contract definition type |
Methods
close()
close(): Future<Result<void, TechnicalError>>;
Defined in: packages/worker/src/worker.ts:242
Close the AMQP channel and connection.
This gracefully closes the connection to the AMQP broker, stopping all message consumption and cleaning up resources.
Returns
Future<Result<void, TechnicalError>>
A Future that resolves to a Result indicating success or failure
Example
const closeResult = await worker.close().resultToPromise();
if (closeResult.isOk()) {
  console.log('Worker closed successfully');
}
create()
static create<TContract>(options): Future<Result<TypedAmqpWorker<TContract>, TechnicalError>>;
Defined in: packages/worker/src/worker.ts:203
Create a type-safe AMQP worker from a contract.
Connection management (including automatic reconnection) is handled internally by amqp-connection-manager via the AmqpClient. The worker will set up consumers for all contract-defined handlers asynchronously in the background once the underlying connection and channels are ready.
Connections are automatically shared across clients and workers with the same URLs and connection options, following RabbitMQ best practices.
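One way to picture the connection sharing described above is a cache keyed by the URLs and connection options. This is only an illustrative sketch, not the package's implementation: `SharedConnectionSketch`, `connectionKey`, and `acquireConnection` are hypothetical names, and the JSON-based key assumes the options are serializable and listed in a stable order.

```typescript
// Hypothetical connection record; a real one would wrap a broker connection.
interface SharedConnectionSketch {
  readonly key: string;
  users: number;
}

const connectionCache = new Map<string, SharedConnectionSketch>();

// Build a cache key from the URLs and options (assumed to be JSON-serializable).
function connectionKey(urls: readonly string[], options: Record<string, unknown> = {}): string {
  return JSON.stringify({ urls, options });
}

// Return the cached connection for this configuration, creating it on first use.
function acquireConnection(
  urls: readonly string[],
  options?: Record<string, unknown>,
): SharedConnectionSketch {
  const key = connectionKey(urls, options);
  let connection = connectionCache.get(key);
  if (!connection) {
    connection = { key, users: 0 };
    connectionCache.set(key, connection);
  }
  connection.users += 1;
  return connection;
}

const workerConnection = acquireConnection(["amqp://localhost"]);
const clientConnection = acquireConnection(["amqp://localhost"]);
const otherConnection = acquireConnection(["amqp://other-host"]);
```

Here the first two callers receive the same record, while a different URL list yields a separate one.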
Type Parameters
| Type Parameter |
|---|
| TContract extends ContractDefinition |
Parameters
| Parameter | Type | Description |
|---|---|---|
| options | CreateWorkerOptions<TContract> | Configuration options for the worker |
Returns
Future<Result<TypedAmqpWorker<TContract>, TechnicalError>>
A Future that resolves to a Result containing the worker or an error
Example
const workerResult = await TypedAmqpWorker.create({
  contract: myContract,
  handlers: {
    processOrder: async (msg) => console.log('Order:', msg.orderId)
  },
  urls: ['amqp://localhost']
}).resultToPromise();
if (workerResult.isError()) {
  console.error('Failed to create worker:', workerResult.error);
}
Type Aliases
CreateWorkerOptions
type CreateWorkerOptions<TContract> = object;
Defined in: packages/worker/src/worker.ts:76
Options for creating a type-safe AMQP worker.
Example
const options: CreateWorkerOptions<typeof myContract> = {
  contract: myContract,
  handlers: {
    // Simple handler
    processOrder: async (message) => {
      console.log('Processing order:', message.orderId);
    },
    // Handler with options (prefetch)
    processPayment: [
      async (message) => {
        console.log('Processing payment:', message.paymentId);
      },
      { prefetch: 10 }
    ],
    // Handler with batch processing
    processNotifications: [
      async (messages) => {
        console.log('Processing batch:', messages.length);
      },
      { batchSize: 5, batchTimeout: 1000 }
    ]
  },
  urls: ['amqp://localhost'],
  connectionOptions: {
    heartbeatIntervalInSeconds: 30
  },
  logger: myLogger
};
Type Parameters
| Type Parameter | Description |
|---|---|
| TContract extends ContractDefinition | The contract definition type |
Properties
| Property | Type | Description | Defined in |
|---|---|---|---|
| connectionOptions? | AmqpConnectionManagerOptions | Optional connection configuration (heartbeat, reconnect settings, etc.) | packages/worker/src/worker.ts:84 |
| contract | TContract | The AMQP contract definition specifying consumers and their message schemas | packages/worker/src/worker.ts:78 |
| handlers | WorkerInferConsumerHandlers<TContract> | Handlers for each consumer defined in the contract. Can be a function or a tuple of [handler, options] | packages/worker/src/worker.ts:80 |
| logger? | Logger | Optional logger for logging message consumption and errors | packages/worker/src/worker.ts:86 |
| urls | ConnectionUrl[] | AMQP broker URL(s). Multiple URLs provide failover support | packages/worker/src/worker.ts:82 |
WorkerInferConsumerBatchHandler()
type WorkerInferConsumerBatchHandler<TContract, TName> = (messages) => Promise<void>;
Defined in: packages/worker/src/types.ts:56
Infer consumer handler type for batch processing. Batch handlers receive an array of messages.
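To make the "array of messages" idea concrete, here is a minimal accumulator sketch. It assumes a batch is delivered once `batchSize` messages have arrived, with a separate flush standing in for a `batchTimeout` expiry. The worker's actual batching logic is not documented here, and `BatchBufferSketch` is a hypothetical name.

```typescript
// Hypothetical accumulator: collects messages and emits a batch once batchSize is reached.
class BatchBufferSketch<T> {
  private pending: T[] = [];
  private readonly batchSize: number;

  constructor(batchSize: number) {
    this.batchSize = batchSize;
  }

  // Add one message; returns a full batch when the size threshold is hit, else undefined.
  push(message: T): T[] | undefined {
    this.pending.push(message);
    return this.pending.length >= this.batchSize ? this.flush() : undefined;
  }

  // Emit whatever is pending, for example when a batch timeout elapses.
  flush(): T[] {
    const batch = this.pending;
    this.pending = [];
    return batch;
  }
}

const buffer = new BatchBufferSketch<number>(3);
const emitted: number[][] = [];
for (const id of [1, 2, 3, 4]) {
  const batch = buffer.push(id);
  if (batch) emitted.push(batch);
}
// What a timeout-driven flush would deliver for the leftover message.
const remainder = buffer.flush();
```

Each emitted array is what a batch handler would receive as its `messages` argument under these assumptions.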
Type Parameters
| Type Parameter |
|---|
| TContract extends ContractDefinition |
| TName extends InferConsumerNames<TContract> |
Parameters
| Parameter | Type |
|---|---|
| messages | WorkerInferConsumerInput<TContract, TName>[] |
Returns
Promise<void>
WorkerInferConsumerHandler()
type WorkerInferConsumerHandler<TContract, TName> = (message) => Promise<void>;
Defined in: packages/worker/src/types.ts:47
Infer consumer handler type for a specific consumer. Handlers receive a single message by default. For batch processing, use consumerOptions to configure batch behavior.
Type Parameters
| Type Parameter |
|---|
| TContract extends ContractDefinition |
| TName extends InferConsumerNames<TContract> |
Parameters
| Parameter | Type |
|---|---|
| message | WorkerInferConsumerInput<TContract, TName> |
Returns
Promise<void>
WorkerInferConsumerHandlerEntry
type WorkerInferConsumerHandlerEntry<TContract, TName> =
  | WorkerInferConsumerHandler<TContract, TName>
  | readonly [WorkerInferConsumerHandler<TContract, TName>, {
      batchSize?: never;
      batchTimeout?: never;
      prefetch?: number;
    }]
  | readonly [WorkerInferConsumerBatchHandler<TContract, TName>, {
      batchSize: number;
      batchTimeout?: number;
      prefetch?: number;
    }];
Defined in: packages/worker/src/types.ts:69
Infer handler entry for a consumer - either a function or a tuple of [handler, options].
Three patterns are supported:
- Simple handler: async (message) => { ... }
- Handler with prefetch: [async (message) => { ... }, { prefetch: 10 }]
- Batch handler: [async (messages) => { ... }, { batchSize: 5, batchTimeout: 1000 }]
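The three patterns above can be told apart at runtime by checking whether the entry is a function or a tuple, and whether its options carry `batchSize`. The following is a minimal sketch with simplified local types. `HandlerEntrySketch` and `normalizeEntry` are hypothetical, and the real aliases are inferred from a contract instead of a plain type parameter.

```typescript
// Simplified local types; the real aliases are inferred from a contract.
type HandlerOptionsSketch = { prefetch?: number; batchSize?: number; batchTimeout?: number };
type SingleHandlerSketch<T> = (message: T) => Promise<void>;
type BatchHandlerSketch<T> = (messages: T[]) => Promise<void>;
type HandlerEntrySketch<T> =
  | SingleHandlerSketch<T>
  | readonly [SingleHandlerSketch<T>, { prefetch?: number; batchSize?: never; batchTimeout?: never }]
  | readonly [BatchHandlerSketch<T>, { batchSize: number; batchTimeout?: number; prefetch?: number }];

// Reduce the three entry shapes to one record (hypothetical helper).
function normalizeEntry<T>(entry: HandlerEntrySketch<T>): {
  isBatch: boolean;
  options: HandlerOptionsSketch;
} {
  if (typeof entry === "function") {
    return { isBatch: false, options: {} };
  }
  const [, options] = entry;
  return { isBatch: options.batchSize !== undefined, options };
}

type OrderSketch = { orderId: string };
const simpleEntry = normalizeEntry<OrderSketch>(async () => {});
const prefetchEntry = normalizeEntry<OrderSketch>([async () => {}, { prefetch: 10 }]);
const batchEntry = normalizeEntry<OrderSketch>([async () => {}, { batchSize: 5, batchTimeout: 1000 }]);
```

The `batchSize?: never` member is what prevents a single-message handler from being paired with batch options at compile time.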
Type Parameters
| Type Parameter |
|---|
| TContract extends ContractDefinition |
| TName extends InferConsumerNames<TContract> |
WorkerInferConsumerHandlers
type WorkerInferConsumerHandlers<TContract> = { [K in InferConsumerNames<TContract>]: WorkerInferConsumerHandlerEntry<TContract, K> };
Defined in: packages/worker/src/types.ts:87
Infer all consumer handlers for a contract. Each entry can be a single-message handler, or a tuple of [handler, options] for a single-message or batch handler.
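The mapped type in the signature above requires one entry per consumer name. A simplified sketch of the same idea follows. `ConsumerPayloadsSketch` and `HandlersSketch` are hypothetical stand-ins that map names straight to payload types, whereas the real alias derives them from a contract definition.

```typescript
// Hypothetical, heavily simplified contract shape: consumer name -> payload type.
type ConsumerPayloadsSketch = {
  processOrder: { orderId: string };
  notifyOrder: { email: string };
};

// Mapped type: one handler per consumer name, each typed by that consumer's payload.
type HandlersSketch<TPayloads> = {
  [K in keyof TPayloads]: (message: TPayloads[K]) => Promise<void>;
};

const seen: string[] = [];

// Omitting a key or reading a field that is not on the payload would fail to compile.
const handlersSketch: HandlersSketch<ConsumerPayloadsSketch> = {
  processOrder: async (message) => {
    seen.push(`order:${message.orderId}`);
  },
  notifyOrder: async (message) => {
    seen.push(`notify:${message.email}`);
  },
};

const handlerNames = Object.keys(handlersSketch).sort();
```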
Type Parameters
| Type Parameter |
|---|
| TContract extends ContractDefinition |
WorkerInferConsumerInput
type WorkerInferConsumerInput<TContract, TName> = ConsumerInferInput<InferConsumer<TContract, TName>>;
Defined in: packages/worker/src/types.ts:37
Infer the input message type of a consumer from the worker's perspective, that is, for consuming messages.
Type Parameters
| Type Parameter |
|---|
| TContract extends ContractDefinition |
| TName extends InferConsumerNames<TContract> |
Functions
defineHandler()
Call Signature
function defineHandler<TContract, TName>(
  contract,
  consumerName,
  handler): WorkerInferConsumerHandlerEntry<TContract, TName>;
Defined in: packages/worker/src/handlers.ts:70
Define a type-safe handler for a specific consumer in a contract.
This utility allows you to define handlers outside of the worker creation, providing better code organization and reusability.
Supports three patterns:
- Simple handler: just the function (single message handler)
- Handler with prefetch: [handler, { prefetch: 10 }] (single message handler with config)
- Batch handler: [batchHandler, { batchSize: 5, batchTimeout: 1000 }] (REQUIRES batchSize config)
Important: Batch handlers (handlers that accept an array of messages) MUST include batchSize configuration. You cannot create a batch handler without specifying batchSize.
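The batchSize requirement can be expressed with function overloads, which is roughly what the three call signatures in this section describe. The sketch below is an assumption-laden simplification: `defineHandlerSketch` and its local types are hypothetical, and it drops the contract and consumerName parameters to stay self-contained.

```typescript
// Simplified local types standing in for the contract-inferred ones.
type SingleSketch<T> = (message: T) => Promise<void>;
type BatchSketch<T> = (messages: T[]) => Promise<void>;
type SingleOptionsSketch = { prefetch?: number; batchSize?: never; batchTimeout?: never };
type BatchOptionsSketch = { batchSize: number; batchTimeout?: number; prefetch?: number };
type EntrySketch<T> =
  | SingleSketch<T>
  | readonly [SingleSketch<T>, SingleOptionsSketch]
  | readonly [BatchSketch<T>, BatchOptionsSketch];

// Overloads: a batch handler is only accepted together with options that carry batchSize.
function defineHandlerSketch<T>(handler: SingleSketch<T>): EntrySketch<T>;
function defineHandlerSketch<T>(handler: SingleSketch<T>, options: SingleOptionsSketch): EntrySketch<T>;
function defineHandlerSketch<T>(handler: BatchSketch<T>, options: BatchOptionsSketch): EntrySketch<T>;
function defineHandlerSketch<T>(
  handler: SingleSketch<T> | BatchSketch<T>,
  options?: SingleOptionsSketch | BatchOptionsSketch,
): EntrySketch<T> {
  // Without options the bare function is returned; otherwise a [handler, options] tuple.
  return options === undefined
    ? (handler as SingleSketch<T>)
    : ([handler, options] as EntrySketch<T>);
}

type OrderPayloadSketch = { orderId: string };
const batchSizes: number[] = [];
const singleEntry = defineHandlerSketch<OrderPayloadSketch>(async () => {});
const batchedEntry = defineHandlerSketch<OrderPayloadSketch>(
  async (messages) => {
    batchSizes.push(messages.length);
  },
  { batchSize: 5, batchTimeout: 1000 },
);
```

With these overloads, passing an array-accepting handler without options matches no signature, so the mistake surfaces at compile time instead of at runtime.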
Type Parameters
| Type Parameter | Description |
|---|---|
| TContract extends ContractDefinition | The contract definition type |
| TName extends string \| number \| symbol | The consumer name from the contract |
Parameters
| Parameter | Type | Description |
|---|---|---|
| contract | TContract | The contract definition containing the consumer |
| consumerName | TName | The name of the consumer from the contract |
| handler | WorkerInferConsumerHandler<TContract, TName> | The async handler function that processes messages (single or batch) |
Returns
WorkerInferConsumerHandlerEntry<TContract, TName>
A type-safe handler that can be used with TypedAmqpWorker
Example
import { defineHandler } from '@amqp-contract/worker';
import { orderContract } from './contract';
// Simple single-message handler without options
const processOrderHandler = defineHandler(
  orderContract,
  'processOrder',
  async (message) => {
    console.log('Processing order:', message.orderId);
    await processPayment(message);
  }
);
// Single-message handler with prefetch
const processOrderWithPrefetch = defineHandler(
  orderContract,
  'processOrder',
  async (message) => {
    await processOrder(message);
  },
  { prefetch: 10 }
);
// Batch handler - MUST include batchSize
const processBatchOrders = defineHandler(
  orderContract,
  'processOrders',
  async (messages) => {
    // messages is an array - batchSize configuration is REQUIRED
    await db.insertMany(messages);
  },
  { batchSize: 5, batchTimeout: 1000 }
);
Call Signature
function defineHandler<TContract, TName>(
  contract,
  consumerName,
  handler,
  options): WorkerInferConsumerHandlerEntry<TContract, TName>;
Defined in: packages/worker/src/handlers.ts:78
Define a type-safe handler for a specific consumer in a contract.
Type Parameters
| Type Parameter | Description |
|---|---|
| TContract extends ContractDefinition | The contract definition type |
| TName extends string \| number \| symbol | The consumer name from the contract |
Parameters
| Parameter | Type | Description |
|---|---|---|
| contract | TContract | The contract definition containing the consumer |
| consumerName | TName | The name of the consumer from the contract |
| handler | WorkerInferConsumerHandler<TContract, TName> | The async handler function that processes messages (single or batch) |
| options | { batchSize?: undefined; batchTimeout?: undefined; prefetch?: number; } | Optional consumer options (prefetch, batchSize, batchTimeout). For single-message handlers, { prefetch?: number } is optional. For batch handlers, { batchSize: number, batchTimeout?: number } is REQUIRED. |
| options.batchSize? | undefined | - |
| options.batchTimeout? | undefined | - |
| options.prefetch? | number | - |
Returns
WorkerInferConsumerHandlerEntry<TContract, TName>
A type-safe handler that can be used with TypedAmqpWorker
Call Signature
function defineHandler<TContract, TName>(
  contract,
  consumerName,
  handler,
  options): WorkerInferConsumerHandlerEntry<TContract, TName>;
Defined in: packages/worker/src/handlers.ts:87
Define a type-safe handler for a specific consumer in a contract.
Type Parameters
| Type Parameter | Description |
|---|---|
| TContract extends ContractDefinition | The contract definition type |
| TName extends string \| number \| symbol | The consumer name from the contract |
Parameters
| Parameter | Type | Description |
|---|---|---|
| contract | TContract | The contract definition containing the consumer |
| consumerName | TName | The name of the consumer from the contract |
| handler | WorkerInferConsumerBatchHandler<TContract, TName> | The async handler function that processes messages (single or batch) |
| options | { batchSize: number; batchTimeout?: number; prefetch?: number; } | Optional consumer options (prefetch, batchSize, batchTimeout). For single-message handlers, { prefetch?: number } is optional. For batch handlers, { batchSize: number, batchTimeout?: number } is REQUIRED. |
| options.batchSize | number | - |
| options.batchTimeout? | number | - |
| options.prefetch? | number | - |
Returns
WorkerInferConsumerHandlerEntry<TContract, TName>
A type-safe handler that can be used with TypedAmqpWorker
defineHandlers()
function defineHandlers<TContract>(contract, handlers): WorkerInferConsumerHandlers<TContract>;
Defined in: packages/worker/src/handlers.ts:181
Define multiple type-safe handlers for consumers in a contract.
This utility allows you to define all handlers at once outside of the worker creation, ensuring type safety and providing better code organization.
Type Parameters
| Type Parameter | Description |
|---|---|
| TContract extends ContractDefinition | The contract definition type |
Parameters
| Parameter | Type | Description |
|---|---|---|
| contract | TContract | The contract definition containing the consumers |
| handlers | WorkerInferConsumerHandlers<TContract> | An object with async handler functions for each consumer |
Returns
WorkerInferConsumerHandlers<TContract>
A type-safe handlers object that can be used with TypedAmqpWorker
Examples
import { defineHandlers, TypedAmqpWorker } from '@amqp-contract/worker';
import { orderContract } from './contract';
// Define all handlers at once
const handlers = defineHandlers(orderContract, {
  processOrder: async (message) => {
    // message is fully typed based on the contract
    console.log('Processing order:', message.orderId);
    await processPayment(message);
  },
  notifyOrder: async (message) => {
    await sendNotification(message);
  },
  shipOrder: async (message) => {
    await prepareShipment(message);
  },
});
// Use the handlers in worker
const worker = await TypedAmqpWorker.create({
  contract: orderContract,
  handlers,
  urls: ['amqp://localhost'],
}).resultToPromise();

// Separate handler definitions for better organization
async function handleProcessOrder(message: WorkerInferConsumerInput<typeof orderContract, 'processOrder'>) {
  await processOrder(message);
}
async function handleNotifyOrder(message: WorkerInferConsumerInput<typeof orderContract, 'notifyOrder'>) {
  await sendNotification(message);
}
const handlers = defineHandlers(orderContract, {
  processOrder: handleProcessOrder,
  notifyOrder: handleNotifyOrder,
});