@amqp-contract/worker
@amqp-contract/worker
Classes
abstract HandlerError
Defined in: packages/worker/src/errors.ts:10
Abstract base class for all handler-signalled errors.
Concrete subclasses (RetryableError, NonRetryableError) discriminate on the name property so exhaustive narrowing in user code keeps working. error instanceof HandlerError is true for any handler error.
Extends
Error
Extended by
Constructors
Constructor
new HandlerError(message, cause?): HandlerError;Defined in: packages/worker/src/errors.ts:13
Parameters
| Parameter | Type |
|---|---|
message | string |
cause? | unknown |
Returns
Overrides
Error.constructorProperties
| Property | Modifier | Type | Description | Overrides | Inherited from | Defined in |
|---|---|---|---|---|---|---|
cause? | readonly | unknown | - | - | Error.cause | packages/worker/src/errors.ts:15 |
message | public | string | - | - | Error.message | node_modules/.pnpm/typescript@6.0.3/node_modules/typescript/lib/lib.es5.d.ts:1075 |
name | abstract | "RetryableError" | "NonRetryableError" | - | Error.name | - | packages/worker/src/errors.ts:11 |
stack? | public | string | - | - | Error.stack | node_modules/.pnpm/typescript@6.0.3/node_modules/typescript/lib/lib.es5.d.ts:1076 |
stackTraceLimit | static | number | The Error.stackTraceLimit property specifies the number of stack frames collected by a stack trace (whether generated by new Error().stack or Error.captureStackTrace(obj)). The default value is 10 but may be set to any valid JavaScript number. Changes will affect any stack trace captured after the value has been changed. If set to a non-number value, or set to a negative number, stack traces will not capture any frames. | - | Error.stackTraceLimit | node_modules/.pnpm/@types+node@24.12.2/node_modules/@types/node/globals.d.ts:68 |
Methods
captureStackTrace()
static captureStackTrace(targetObject, constructorOpt?): void;Defined in: node_modules/.pnpm/@types+node@24.12.2/node_modules/@types/node/globals.d.ts:52
Creates a .stack property on targetObject, which when accessed returns a string representing the location in the code at which Error.captureStackTrace() was called.
const myObject = {};
Error.captureStackTrace(myObject);
myObject.stack; // Similar to `new Error().stack`The first line of the trace will be prefixed with ${myObject.name}: ${myObject.message}.
The optional constructorOpt argument accepts a function. If given, all frames above constructorOpt, including constructorOpt, will be omitted from the generated stack trace.
The constructorOpt argument is useful for hiding implementation details of error generation from the user. For instance:
function a() {
b();
}
function b() {
c();
}
function c() {
// Create an error without stack trace to avoid calculating the stack trace twice.
const { stackTraceLimit } = Error;
Error.stackTraceLimit = 0;
const error = new Error();
Error.stackTraceLimit = stackTraceLimit;
// Capture the stack trace above function b
Error.captureStackTrace(error, b); // Neither function c, nor b is included in the stack trace
throw error;
}
a();Parameters
| Parameter | Type |
|---|---|
targetObject | object |
constructorOpt? | Function |
Returns
void
Inherited from
Error.captureStackTraceprepareStackTrace()
static prepareStackTrace(err, stackTraces): any;Defined in: node_modules/.pnpm/@types+node@24.12.2/node_modules/@types/node/globals.d.ts:56
Parameters
| Parameter | Type |
|---|---|
err | Error |
stackTraces | CallSite[] |
Returns
any
See
https://v8.dev/docs/stack-trace-api#customizing-stack-traces
Inherited from
Error.prepareStackTraceMessageValidationError
Defined in: packages/core/dist/index.d.mts:27
Error thrown when message validation fails (payload or headers).
Used by both the client (publish-time payload validation) and the worker (consume-time payload and headers validation).
Param
The name of the publisher or consumer that triggered the validation
Param
The validation issues from the Standard Schema validation
Extends
Error
Constructors
Constructor
new MessageValidationError(source, issues): MessageValidationError;Defined in: packages/core/dist/index.d.mts:30
Parameters
| Parameter | Type |
|---|---|
source | string |
issues | unknown |
Returns
Overrides
Error.constructorProperties
Methods
captureStackTrace()
static captureStackTrace(targetObject, constructorOpt?): void;Defined in: node_modules/.pnpm/@types+node@24.12.2/node_modules/@types/node/globals.d.ts:52
Creates a .stack property on targetObject, which when accessed returns a string representing the location in the code at which Error.captureStackTrace() was called.
const myObject = {};
Error.captureStackTrace(myObject);
myObject.stack; // Similar to `new Error().stack`The first line of the trace will be prefixed with ${myObject.name}: ${myObject.message}.
The optional constructorOpt argument accepts a function. If given, all frames above constructorOpt, including constructorOpt, will be omitted from the generated stack trace.
The constructorOpt argument is useful for hiding implementation details of error generation from the user. For instance:
function a() {
b();
}
function b() {
c();
}
function c() {
// Create an error without stack trace to avoid calculating the stack trace twice.
const { stackTraceLimit } = Error;
Error.stackTraceLimit = 0;
const error = new Error();
Error.stackTraceLimit = stackTraceLimit;
// Capture the stack trace above function b
Error.captureStackTrace(error, b); // Neither function c, nor b is included in the stack trace
throw error;
}
a();Parameters
| Parameter | Type |
|---|---|
targetObject | object |
constructorOpt? | Function |
Returns
void
Inherited from
Error.captureStackTraceprepareStackTrace()
static prepareStackTrace(err, stackTraces): any;Defined in: node_modules/.pnpm/@types+node@24.12.2/node_modules/@types/node/globals.d.ts:56
Parameters
| Parameter | Type |
|---|---|
err | Error |
stackTraces | CallSite[] |
Returns
any
See
https://v8.dev/docs/stack-trace-api#customizing-stack-traces
Inherited from
Error.prepareStackTraceNonRetryableError
Defined in: packages/worker/src/errors.ts:46
Non-retryable errors - permanent failures that should not be retried Examples: invalid data, business rule violations, permanent external failures
Use this error type when retrying would not help - the message will be immediately sent to the dead letter queue (DLQ) if configured.
Extends
Constructors
Constructor
new NonRetryableError(message, cause?): NonRetryableError;Defined in: packages/worker/src/errors.ts:13
Parameters
| Parameter | Type |
|---|---|
message | string |
cause? | unknown |
Returns
Inherited from
Properties
| Property | Modifier | Type | Description | Overrides | Inherited from | Defined in |
|---|---|---|---|---|---|---|
cause? | readonly | unknown | - | - | HandlerError.cause | packages/worker/src/errors.ts:15 |
message | public | string | - | - | HandlerError.message | node_modules/.pnpm/typescript@6.0.3/node_modules/typescript/lib/lib.es5.d.ts:1075 |
name | readonly | "NonRetryableError" | - | HandlerError.name | - | packages/worker/src/errors.ts:47 |
stack? | public | string | - | - | HandlerError.stack | node_modules/.pnpm/typescript@6.0.3/node_modules/typescript/lib/lib.es5.d.ts:1076 |
stackTraceLimit | static | number | The Error.stackTraceLimit property specifies the number of stack frames collected by a stack trace (whether generated by new Error().stack or Error.captureStackTrace(obj)). The default value is 10 but may be set to any valid JavaScript number. Changes will affect any stack trace captured after the value has been changed. If set to a non-number value, or set to a negative number, stack traces will not capture any frames. | - | HandlerError.stackTraceLimit | node_modules/.pnpm/@types+node@24.12.2/node_modules/@types/node/globals.d.ts:68 |
Methods
captureStackTrace()
static captureStackTrace(targetObject, constructorOpt?): void;Defined in: node_modules/.pnpm/@types+node@24.12.2/node_modules/@types/node/globals.d.ts:52
Creates a .stack property on targetObject, which when accessed returns a string representing the location in the code at which Error.captureStackTrace() was called.
const myObject = {};
Error.captureStackTrace(myObject);
myObject.stack; // Similar to `new Error().stack`The first line of the trace will be prefixed with ${myObject.name}: ${myObject.message}.
The optional constructorOpt argument accepts a function. If given, all frames above constructorOpt, including constructorOpt, will be omitted from the generated stack trace.
The constructorOpt argument is useful for hiding implementation details of error generation from the user. For instance:
function a() {
b();
}
function b() {
c();
}
function c() {
// Create an error without stack trace to avoid calculating the stack trace twice.
const { stackTraceLimit } = Error;
Error.stackTraceLimit = 0;
const error = new Error();
Error.stackTraceLimit = stackTraceLimit;
// Capture the stack trace above function b
Error.captureStackTrace(error, b); // Neither function c, nor b is included in the stack trace
throw error;
}
a();Parameters
| Parameter | Type |
|---|---|
targetObject | object |
constructorOpt? | Function |
Returns
void
Inherited from
HandlerError.captureStackTrace
prepareStackTrace()
static prepareStackTrace(err, stackTraces): any;Defined in: node_modules/.pnpm/@types+node@24.12.2/node_modules/@types/node/globals.d.ts:56
Parameters
| Parameter | Type |
|---|---|
err | Error |
stackTraces | CallSite[] |
Returns
any
See
https://v8.dev/docs/stack-trace-api#customizing-stack-traces
Inherited from
HandlerError.prepareStackTrace
RetryableError
Defined in: packages/worker/src/errors.ts:35
Retryable errors - transient failures that may succeed on retry Examples: network timeouts, rate limiting, temporary service unavailability
Use this error type when the operation might succeed if retried. The worker will apply exponential backoff and retry the message.
Extends
Constructors
Constructor
new RetryableError(message, cause?): RetryableError;Defined in: packages/worker/src/errors.ts:13
Parameters
| Parameter | Type |
|---|---|
message | string |
cause? | unknown |
Returns
Inherited from
Properties
| Property | Modifier | Type | Description | Overrides | Inherited from | Defined in |
|---|---|---|---|---|---|---|
cause? | readonly | unknown | - | - | HandlerError.cause | packages/worker/src/errors.ts:15 |
message | public | string | - | - | HandlerError.message | node_modules/.pnpm/typescript@6.0.3/node_modules/typescript/lib/lib.es5.d.ts:1075 |
name | readonly | "RetryableError" | - | HandlerError.name | - | packages/worker/src/errors.ts:36 |
stack? | public | string | - | - | HandlerError.stack | node_modules/.pnpm/typescript@6.0.3/node_modules/typescript/lib/lib.es5.d.ts:1076 |
stackTraceLimit | static | number | The Error.stackTraceLimit property specifies the number of stack frames collected by a stack trace (whether generated by new Error().stack or Error.captureStackTrace(obj)). The default value is 10 but may be set to any valid JavaScript number. Changes will affect any stack trace captured after the value has been changed. If set to a non-number value, or set to a negative number, stack traces will not capture any frames. | - | HandlerError.stackTraceLimit | node_modules/.pnpm/@types+node@24.12.2/node_modules/@types/node/globals.d.ts:68 |
Methods
captureStackTrace()
static captureStackTrace(targetObject, constructorOpt?): void;Defined in: node_modules/.pnpm/@types+node@24.12.2/node_modules/@types/node/globals.d.ts:52
Creates a .stack property on targetObject, which when accessed returns a string representing the location in the code at which Error.captureStackTrace() was called.
const myObject = {};
Error.captureStackTrace(myObject);
myObject.stack; // Similar to `new Error().stack`The first line of the trace will be prefixed with ${myObject.name}: ${myObject.message}.
The optional constructorOpt argument accepts a function. If given, all frames above constructorOpt, including constructorOpt, will be omitted from the generated stack trace.
The constructorOpt argument is useful for hiding implementation details of error generation from the user. For instance:
function a() {
b();
}
function b() {
c();
}
function c() {
// Create an error without stack trace to avoid calculating the stack trace twice.
const { stackTraceLimit } = Error;
Error.stackTraceLimit = 0;
const error = new Error();
Error.stackTraceLimit = stackTraceLimit;
// Capture the stack trace above function b
Error.captureStackTrace(error, b); // Neither function c, nor b is included in the stack trace
throw error;
}
a();Parameters
| Parameter | Type |
|---|---|
targetObject | object |
constructorOpt? | Function |
Returns
void
Inherited from
HandlerError.captureStackTrace
prepareStackTrace()
static prepareStackTrace(err, stackTraces): any;Defined in: node_modules/.pnpm/@types+node@24.12.2/node_modules/@types/node/globals.d.ts:56
Parameters
| Parameter | Type |
|---|---|
err | Error |
stackTraces | CallSite[] |
Returns
any
See
https://v8.dev/docs/stack-trace-api#customizing-stack-traces
Inherited from
HandlerError.prepareStackTrace
TypedAmqpWorker
Defined in: packages/worker/src/worker.ts:182
Type-safe AMQP worker for consuming messages from RabbitMQ.
This class provides automatic message validation, connection management, and error handling for consuming messages based on a contract definition.
Example
import { TypedAmqpWorker } from '@amqp-contract/worker';
import { defineQueue, defineMessage, defineContract, defineConsumer } from '@amqp-contract/contract';
import { okAsync } from 'neverthrow';
import { z } from 'zod';
const orderQueue = defineQueue('order-processing');
const orderMessage = defineMessage(z.object({
orderId: z.string(),
amount: z.number()
}));
const contract = defineContract({
consumers: {
processOrder: defineConsumer(orderQueue, orderMessage)
}
});
const result = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: ({ payload }) => {
console.log('Processing order', payload.orderId);
return okAsync(undefined);
},
},
urls: ['amqp://localhost'],
});
if (result.isErr()) throw result.error;
const worker = result.value;
// Close when done
await worker.close();Type Parameters
| Type Parameter | Description |
|---|---|
TContract extends ContractDefinition | The contract definition type |
Methods
close()
close(): ResultAsync<void, TechnicalError>;Defined in: packages/worker/src/worker.ts:342
Close the AMQP channel and connection.
This gracefully closes the connection to the AMQP broker, stopping all message consumption and cleaning up resources.
Returns
ResultAsync<void, TechnicalError>
Example
const closeResult = await worker.close();
if (closeResult.isOk()) {
console.log('Worker closed successfully');
}create()
static create<TContract>(__namedParameters): ResultAsync<TypedAmqpWorker<TContract>, TechnicalError>;Defined in: packages/worker/src/worker.ts:281
Create a type-safe AMQP worker from a contract.
Connection management (including automatic reconnection) is handled internally by amqp-connection-manager via the AmqpClient. The worker will set up consumers for all contract-defined handlers asynchronously in the background once the underlying connection and channels are ready.
Connections are automatically shared across clients and workers with the same URLs and connection options, following RabbitMQ best practices.
Type Parameters
| Type Parameter |
|---|
TContract extends ContractDefinition |
Parameters
| Parameter | Type |
|---|---|
__namedParameters | CreateWorkerOptions<TContract> |
Returns
ResultAsync<TypedAmqpWorker<TContract>, TechnicalError>
A ResultAsync that resolves to the worker or a TechnicalError.
Example
const result = await TypedAmqpWorker.create({
contract: myContract,
handlers: {
processOrder: ({ payload }) => okAsync(undefined),
},
urls: ['amqp://localhost'],
});Type Aliases
ConsumerOptions
type ConsumerOptions = AmqpClientConsumerOptions;Defined in: packages/worker/src/worker.ts:50
CreateWorkerOptions
type CreateWorkerOptions<TContract> = object;Defined in: packages/worker/src/worker.ts:97
Options for creating a type-safe AMQP worker.
Example
const options: CreateWorkerOptions<typeof contract> = {
contract: myContract,
handlers: {
// Simple handler
processOrder: ({ payload }) => {
console.log('Processing order:', payload.orderId);
return okAsync(undefined);
},
// Handler with prefetch configuration
processPayment: [
({ payload }) => {
console.log('Processing payment:', payload.paymentId);
return okAsync(undefined);
},
{ prefetch: 10 }
]
},
urls: ['amqp://localhost'],
defaultConsumerOptions: {
prefetch: 5,
},
connectionOptions: {
heartbeatIntervalInSeconds: 30
},
logger: myLogger
};Note: Retry configuration is defined at the queue level in the contract, not at the handler level. See QueueDefinition.retry for configuration options.
Type Parameters
| Type Parameter | Description |
|---|---|
TContract extends ContractDefinition | The contract definition type |
Properties
| Property | Type | Description | Defined in |
|---|---|---|---|
connectionOptions? | AmqpConnectionManagerOptions | Optional connection configuration (heartbeat, reconnect settings, etc.) | packages/worker/src/worker.ts:114 |
connectTimeoutMs? | number | null | Maximum time in ms to wait for the AMQP connection to become ready before create() resolves to an err(TechnicalError). Defaults to 30s (the AmqpClient's DEFAULT_CONNECT_TIMEOUT_MS). Pass null to disable the timeout and let amqp-connection-manager retry indefinitely. | packages/worker/src/worker.ts:134 |
contract | TContract | The AMQP contract definition specifying consumers and their message schemas | packages/worker/src/worker.ts:99 |
defaultConsumerOptions? | ConsumerOptions | Optional default consumer options applied to all consumer handlers. Handler-specific options provided in tuple form override these defaults. | packages/worker/src/worker.ts:127 |
handlers | WorkerInferHandlers<TContract> | Handlers for each consumers and rpcs entry in the contract. - Regular consumers return ResultAsync<void, HandlerError>. - RPC handlers return ResultAsync<TResponse, HandlerError> where TResponse is inferred from the RPC's response message schema. Use defineHandler / defineHandlers to create handlers with full type inference. | packages/worker/src/worker.ts:110 |
logger? | Logger | Optional logger for logging message consumption and errors | packages/worker/src/worker.ts:116 |
telemetry? | TelemetryProvider | Optional telemetry provider for tracing and metrics. If not provided, uses the default provider which attempts to load OpenTelemetry. OpenTelemetry instrumentation is automatically enabled if @opentelemetry/api is installed. | packages/worker/src/worker.ts:122 |
urls | ConnectionUrl[] | AMQP broker URL(s). Multiple URLs provide failover support | packages/worker/src/worker.ts:112 |
WorkerConsumedMessage
type WorkerConsumedMessage<TPayload, THeaders> = object;Defined in: packages/worker/src/types.ts:156
A consumed message containing parsed payload and headers.
This type represents the first argument passed to consumer handlers. It contains the validated payload and (if defined in the message schema) the validated headers.
Example
const handler = defineHandler(contract, 'processOrder', (message, rawMessage) => {
console.log(message.payload.orderId); // Typed payload
console.log(message.headers?.priority); // Typed headers (if defined)
console.log(rawMessage.fields.deliveryTag); // Raw AMQP message
return okAsync(undefined);
});Type Parameters
| Type Parameter | Default type | Description |
|---|---|---|
TPayload | - | The inferred payload type from the message schema |
THeaders | undefined | The inferred headers type from the message schema (undefined if not defined) |
Properties
| Property | Type | Description | Defined in |
|---|---|---|---|
headers | THeaders extends undefined ? undefined : THeaders | The validated message headers (present only when headers schema is defined) | packages/worker/src/types.ts:160 |
payload | TPayload | The validated message payload | packages/worker/src/types.ts:158 |
WorkerInferConsumedMessage
type WorkerInferConsumedMessage<TContract, TName> = WorkerConsumedMessage<WorkerInferConsumerPayload<TContract, TName>, WorkerInferConsumerHeaders<TContract, TName>>;Defined in: packages/worker/src/types.ts:166
Infer the full consumed message type for a regular consumer.
Type Parameters
| Type Parameter |
|---|
TContract extends ContractDefinition |
TName extends InferConsumerNames<TContract> |
WorkerInferConsumerHandler
type WorkerInferConsumerHandler<TContract, TName> = (message, rawMessage) => ResultAsync<void, HandlerError>;Defined in: packages/worker/src/types.ts:197
Handler signature for a regular consumer (event/command). Returns ResultAsync<void, HandlerError> — there is no response message.
Type Parameters
| Type Parameter |
|---|
TContract extends ContractDefinition |
TName extends InferConsumerNames<TContract> |
Parameters
| Parameter | Type |
|---|---|
message | WorkerInferConsumedMessage<TContract, TName> |
rawMessage | ConsumeMessage |
Returns
ResultAsync<void, HandlerError>
WorkerInferConsumerHandlerEntry
type WorkerInferConsumerHandlerEntry<TContract, TName> =
| WorkerInferConsumerHandler<TContract, TName>
| readonly [WorkerInferConsumerHandler<TContract, TName>, ConsumerOptions];Defined in: packages/worker/src/types.ts:223
Handler entry for a regular consumer — function or [handler, options].
Type Parameters
| Type Parameter |
|---|
TContract extends ContractDefinition |
TName extends InferConsumerNames<TContract> |
WorkerInferConsumerHeaders
type WorkerInferConsumerHeaders<TContract, TName> = ConsumerInferHeadersOutput<InferConsumer<TContract, TName>>;Defined in: packages/worker/src/types.ts:85
Infer the headers type for a regular consumer. Returns undefined if no headers schema is defined.
Type Parameters
| Type Parameter |
|---|
TContract extends ContractDefinition |
TName extends InferConsumerNames<TContract> |
WorkerInferHandlers
type WorkerInferHandlers<TContract> = [InferConsumerNames<TContract>] extends [never] ? object : { [K in InferConsumerNames<TContract>]: WorkerInferConsumerHandlerEntry<TContract, K> } & [InferRpcNames<TContract>] extends [never] ? object : { [K in InferRpcNames<TContract>]: WorkerInferRpcHandlerEntry<TContract, K> };Defined in: packages/worker/src/types.ts:257
All handlers for a contract: one entry per consumers key plus one entry per rpcs key. The two name spaces are disjoint so the resulting object type is unambiguous.
Type Parameters
| Type Parameter |
|---|
TContract extends ContractDefinition |
Example
const handlers: WorkerInferHandlers<typeof contract> = {
processOrder: ({ payload }) =>
ResultAsync.fromPromise(
processPayment(payload),
(error) => new RetryableError('Payment failed', error),
).map(() => undefined),
calculate: ({ payload }) => okAsync({ sum: payload.a + payload.b }),
};WorkerInferRpcConsumedMessage
type WorkerInferRpcConsumedMessage<TContract, TName> = WorkerConsumedMessage<WorkerInferRpcRequest<TContract, TName>, WorkerInferRpcHeaders<TContract, TName>>;Defined in: packages/worker/src/types.ts:178
Infer the consumed message type for an RPC handler — payload + headers from the request side of the RPC.
Type Parameters
| Type Parameter |
|---|
TContract extends ContractDefinition |
TName extends InferRpcNames<TContract> |
WorkerInferRpcHandler
type WorkerInferRpcHandler<TContract, TName> = (message, rawMessage) => ResultAsync<WorkerInferRpcResponse<TContract, TName>, HandlerError>;Defined in: packages/worker/src/types.ts:212
Handler signature for an RPC. Returns ResultAsync<TResponse, HandlerError> where TResponse is the inferred response payload. The worker validates the response against the RPC's response schema and publishes it back to msg.properties.replyTo with the same correlationId.
Type Parameters
| Type Parameter |
|---|
TContract extends ContractDefinition |
TName extends InferRpcNames<TContract> |
Parameters
| Parameter | Type |
|---|---|
message | WorkerInferRpcConsumedMessage<TContract, TName> |
rawMessage | ConsumeMessage |
Returns
ResultAsync<WorkerInferRpcResponse<TContract, TName>, HandlerError>
WorkerInferRpcHandlerEntry
type WorkerInferRpcHandlerEntry<TContract, TName> =
| WorkerInferRpcHandler<TContract, TName>
| readonly [WorkerInferRpcHandler<TContract, TName>, ConsumerOptions];Defined in: packages/worker/src/types.ts:233
Handler entry for an RPC — function or [handler, options].
Type Parameters
| Type Parameter |
|---|
TContract extends ContractDefinition |
TName extends InferRpcNames<TContract> |
WorkerInferRpcHeaders
type WorkerInferRpcHeaders<TContract, TName> = InferRpc<TContract, TName> extends RpcDefinition<infer TRequest, MessageDefinition> ? TRequest extends MessageDefinition<infer _TPayload, infer THeaders> ? THeaders extends StandardSchemaV1<Record<string, unknown>> ? InferSchemaOutput<THeaders> : undefined : undefined : undefined;Defined in: packages/worker/src/types.ts:107
Infer the request headers type for an RPC. Returns undefined unless the RPC's request MessageDefinition declares a headers schema.
Type Parameters
| Type Parameter |
|---|
TContract extends ContractDefinition |
TName extends InferRpcNames<TContract> |
WorkerInferRpcRequest
type WorkerInferRpcRequest<TContract, TName> = InferRpc<TContract, TName> extends RpcDefinition<infer TRequest, MessageDefinition> ? TRequest extends MessageDefinition ? InferSchemaOutput<TRequest["payload"]> : never : never;Defined in: packages/worker/src/types.ts:93
Infer the request payload type for an RPC.
Type Parameters
| Type Parameter |
|---|
TContract extends ContractDefinition |
TName extends InferRpcNames<TContract> |
WorkerInferRpcResponse
type WorkerInferRpcResponse<TContract, TName> = InferRpc<TContract, TName> extends RpcDefinition<MessageDefinition, infer TResponse> ? TResponse extends MessageDefinition ? InferSchemaOutput<TResponse["payload"]> : never : never;Defined in: packages/worker/src/types.ts:123
Infer the response payload type for an RPC. The handler must return a ResultAsync<TResponse, HandlerError> matching this shape.
Type Parameters
| Type Parameter |
|---|
TContract extends ContractDefinition |
TName extends InferRpcNames<TContract> |
Functions
defineHandler()
Call Signature
function defineHandler<TContract, TName>(
contract,
name,
handler): WorkerInferConsumerHandlerEntry<TContract, TName>;Defined in: packages/worker/src/handlers.ts:121
Define a type-safe handler for a specific consumer or RPC in a contract.
Recommended: This function creates handlers that return ResultAsync<void, HandlerError> (consumers) or ResultAsync<TResponse, HandlerError> (RPCs), providing explicit error handling and better control over retry behavior.
Supports two patterns:
- Simple handler: just the function
- Handler with options:
[handler, { prefetch: 10 }]
Type Parameters
| Type Parameter | Description |
|---|---|
TContract extends ContractDefinition | The contract definition type |
TName extends string | number | symbol | The consumer or RPC name from the contract |
Parameters
| Parameter | Type | Description |
|---|---|---|
contract | TContract | The contract definition containing the consumer or RPC |
name | TName | The name of the consumer or RPC from the contract |
handler | WorkerInferConsumerHandler<TContract, TName> | The handler function — for consumers, returns ResultAsync<void, HandlerError>; for RPCs, returns ResultAsync<TResponse, HandlerError>. |
Returns
WorkerInferConsumerHandlerEntry<TContract, TName>
A type-safe handler that can be used with TypedAmqpWorker
Examples
import { defineHandler, RetryableError, NonRetryableError } from '@amqp-contract/worker';
import { errAsync, okAsync, ResultAsync } from 'neverthrow';
const processOrderHandler = defineHandler(
orderContract,
'processOrder',
({ payload }) =>
ResultAsync.fromPromise(
processPayment(payload),
(error) => new RetryableError('Payment failed', error),
).map(() => undefined),
);const calculateHandler = defineHandler(
rpcContract,
'calculate',
({ payload }) => okAsync({ sum: payload.a + payload.b }),
);Call Signature
function defineHandler<TContract, TName>(
contract,
name,
handler,
options): WorkerInferConsumerHandlerEntry<TContract, TName>;Defined in: packages/worker/src/handlers.ts:129
Define a type-safe handler for a specific consumer or RPC in a contract.
Recommended: This function creates handlers that return ResultAsync<void, HandlerError> (consumers) or ResultAsync<TResponse, HandlerError> (RPCs), providing explicit error handling and better control over retry behavior.
Supports two patterns:
- Simple handler: just the function
- Handler with options:
[handler, { prefetch: 10 }]
Type Parameters
| Type Parameter | Description |
|---|---|
TContract extends ContractDefinition | The contract definition type |
TName extends string | number | symbol | The consumer or RPC name from the contract |
Parameters
| Parameter | Type | Description |
|---|---|---|
contract | TContract | The contract definition containing the consumer or RPC |
name | TName | The name of the consumer or RPC from the contract |
handler | WorkerInferConsumerHandler<TContract, TName> | The handler function — for consumers, returns ResultAsync<void, HandlerError>; for RPCs, returns ResultAsync<TResponse, HandlerError>. |
options | ConsumerOptions | Optional consumer options (prefetch) |
Returns
WorkerInferConsumerHandlerEntry<TContract, TName>
A type-safe handler that can be used with TypedAmqpWorker
Examples
import { defineHandler, RetryableError, NonRetryableError } from '@amqp-contract/worker';
import { errAsync, okAsync, ResultAsync } from 'neverthrow';
const processOrderHandler = defineHandler(
orderContract,
'processOrder',
({ payload }) =>
ResultAsync.fromPromise(
processPayment(payload),
(error) => new RetryableError('Payment failed', error),
).map(() => undefined),
);const calculateHandler = defineHandler(
rpcContract,
'calculate',
({ payload }) => okAsync({ sum: payload.a + payload.b }),
);Call Signature
function defineHandler<TContract, TName>(
contract,
name,
handler): WorkerInferRpcHandlerEntry<TContract, TName>;Defined in: packages/worker/src/handlers.ts:138
Define a type-safe handler for a specific consumer or RPC in a contract.
Recommended: This function creates handlers that return ResultAsync<void, HandlerError> (consumers) or ResultAsync<TResponse, HandlerError> (RPCs), providing explicit error handling and better control over retry behavior.
Supports two patterns:
- Simple handler: just the function
- Handler with options:
[handler, { prefetch: 10 }]
Type Parameters
| Type Parameter | Description |
|---|---|
TContract extends ContractDefinition | The contract definition type |
TName extends string | number | symbol | The consumer or RPC name from the contract |
Parameters
| Parameter | Type | Description |
|---|---|---|
contract | TContract | The contract definition containing the consumer or RPC |
name | TName | The name of the consumer or RPC from the contract |
handler | WorkerInferRpcHandler<TContract, TName> | The handler function — for consumers, returns ResultAsync<void, HandlerError>; for RPCs, returns ResultAsync<TResponse, HandlerError>. |
Returns
WorkerInferRpcHandlerEntry<TContract, TName>
A type-safe handler that can be used with TypedAmqpWorker
Examples
import { defineHandler, RetryableError, NonRetryableError } from '@amqp-contract/worker';
import { errAsync, okAsync, ResultAsync } from 'neverthrow';
const processOrderHandler = defineHandler(
orderContract,
'processOrder',
({ payload }) =>
ResultAsync.fromPromise(
processPayment(payload),
(error) => new RetryableError('Payment failed', error),
).map(() => undefined),
);const calculateHandler = defineHandler(
rpcContract,
'calculate',
({ payload }) => okAsync({ sum: payload.a + payload.b }),
);Call Signature
function defineHandler<TContract, TName>(
contract,
name,
handler,
options): WorkerInferRpcHandlerEntry<TContract, TName>;Defined in: packages/worker/src/handlers.ts:146
Define a type-safe handler for a specific consumer or RPC in a contract.
Recommended: This function creates handlers that return ResultAsync<void, HandlerError> (consumers) or ResultAsync<TResponse, HandlerError> (RPCs), providing explicit error handling and better control over retry behavior.
Supports two patterns:
- Simple handler: just the function
- Handler with options:
[handler, { prefetch: 10 }]
Type Parameters
| Type Parameter | Description |
|---|---|
TContract extends ContractDefinition | The contract definition type |
TName extends string | number | symbol | The consumer or RPC name from the contract |
Parameters
| Parameter | Type | Description |
|---|---|---|
contract | TContract | The contract definition containing the consumer or RPC |
name | TName | The name of the consumer or RPC from the contract |
handler | WorkerInferRpcHandler<TContract, TName> | The handler function — for consumers, returns ResultAsync<void, HandlerError>; for RPCs, returns ResultAsync<TResponse, HandlerError>. |
options | ConsumerOptions | Optional consumer options (prefetch) |
Returns
WorkerInferRpcHandlerEntry<TContract, TName>
A type-safe handler that can be used with TypedAmqpWorker
Examples
import { defineHandler, RetryableError, NonRetryableError } from '@amqp-contract/worker';
import { errAsync, okAsync, ResultAsync } from 'neverthrow';
const processOrderHandler = defineHandler(
orderContract,
'processOrder',
({ payload }) =>
ResultAsync.fromPromise(
processPayment(payload),
(error) => new RetryableError('Payment failed', error),
).map(() => undefined),
);const calculateHandler = defineHandler(
rpcContract,
'calculate',
({ payload }) => okAsync({ sum: payload.a + payload.b }),
);defineHandlers()
function defineHandlers<TContract>(contract, handlers): WorkerInferHandlers<TContract>;Defined in: packages/worker/src/handlers.ts:198
Define multiple type-safe handlers for consumers and RPCs in a contract.
Recommended: This function creates handlers that return ResultAsync<void, HandlerError> (consumers) or ResultAsync<TResponse, HandlerError> (RPCs), providing explicit error handling and better control over retry behavior.
The handlers object must contain exactly one entry per consumers and rpcs key in the contract — see WorkerInferHandlers.
Type Parameters
| Type Parameter | Description |
|---|---|
TContract extends ContractDefinition | The contract definition type |
Parameters
| Parameter | Type | Description |
|---|---|---|
contract | TContract | The contract definition containing the consumers and RPCs |
handlers | WorkerInferHandlers<TContract> | An object with handler functions for each consumer and RPC |
Returns
WorkerInferHandlers<TContract>
A type-safe handlers object that can be used with TypedAmqpWorker
Example
import { defineHandlers, RetryableError } from '@amqp-contract/worker';
import { okAsync, ResultAsync } from 'neverthrow';
const handlers = defineHandlers(orderContract, {
processOrder: ({ payload }) =>
ResultAsync.fromPromise(
processPayment(payload),
(error) => new RetryableError('Payment failed', error),
).map(() => undefined),
calculate: ({ payload }) => okAsync({ sum: payload.a + payload.b }),
});isHandlerError()
function isHandlerError(error): error is HandlerError;Defined in: packages/worker/src/errors.ts:124
Type guard to check if an error is any HandlerError (RetryableError or NonRetryableError).
Parameters
| Parameter | Type | Description |
|---|---|---|
error | unknown | The error to check |
Returns
error is HandlerError
True if the error is a HandlerError
Example
import { isHandlerError } from '@amqp-contract/worker';
function handleError(error: unknown) {
if (isHandlerError(error)) {
// error is RetryableError | NonRetryableError
console.log('Handler error:', error.name, error.message);
}
}isNonRetryableError()
function isNonRetryableError(error): error is NonRetryableError;Defined in: packages/worker/src/errors.ts:102
Type guard to check if an error is a NonRetryableError.
Use this to check error types in catch blocks or error handlers.
Parameters
| Parameter | Type | Description |
|---|---|---|
error | unknown | The error to check |
Returns
error is NonRetryableError
True if the error is a NonRetryableError
Example
import { isNonRetryableError } from '@amqp-contract/worker';
try {
await processMessage();
} catch (error) {
if (isNonRetryableError(error)) {
console.log('Will not retry:', error.message);
}
}isRetryableError()
function isRetryableError(error): error is RetryableError;Defined in: packages/worker/src/errors.ts:77
Type guard to check if an error is a RetryableError.
Use this to check error types in catch blocks or error handlers.
Parameters
| Parameter | Type | Description |
|---|---|---|
error | unknown | The error to check |
Returns
error is RetryableError
True if the error is a RetryableError
Example
import { isRetryableError } from '@amqp-contract/worker';
try {
await processMessage();
} catch (error) {
if (isRetryableError(error)) {
console.log('Will retry:', error.message);
} else {
console.log('Permanent failure:', error);
}
}nonRetryable()
function nonRetryable(message, cause?): NonRetryableError;Defined in: packages/worker/src/errors.ts:187
Create a NonRetryableError with less verbosity.
This is a shorthand factory function for creating NonRetryableError instances. Use it for cleaner error creation in handlers.
Parameters
| Parameter | Type | Description |
|---|---|---|
message | string | Error message describing the failure |
cause? | unknown | Optional underlying error that caused this failure |
Returns
A new NonRetryableError instance
Example
import { nonRetryable } from '@amqp-contract/worker';
import { errAsync, okAsync } from 'neverthrow';
const handler = ({ payload }) => {
if (!isValidPayload(payload)) {
return errAsync(nonRetryable('Invalid payload format'));
}
return okAsync(undefined);
};
// Equivalent to:
// return errAsync(new NonRetryableError('Invalid payload format'));retryable()
function retryable(message, cause?): RetryableError;Defined in: packages/worker/src/errors.ts:157
Create a RetryableError with less verbosity.
This is a shorthand factory function for creating RetryableError instances. Use it for cleaner error creation in handlers.
Parameters
| Parameter | Type | Description |
|---|---|---|
message | string | Error message describing the failure |
cause? | unknown | Optional underlying error that caused this failure |
Returns
A new RetryableError instance
Example
import { retryable } from '@amqp-contract/worker';
import { ResultAsync } from 'neverthrow';
const handler = ({ payload }) =>
ResultAsync.fromPromise(
processPayment(payload),
(e) => retryable('Payment service unavailable', e),
).map(() => undefined);
// Equivalent to:
// ResultAsync.fromPromise(processPayment(payload), (e) => new RetryableError('...', e))