Skip to content

@amqp-contract/core


@amqp-contract/core

Classes

AmqpClient

Defined in: packages/core/src/amqp-client.ts:84

AMQP client that manages connections and channels with automatic topology setup.

This class handles:

  • Connection management with automatic reconnection via amqp-connection-manager
  • Connection pooling and sharing across instances with the same URLs
  • Automatic AMQP topology setup (exchanges, queues, bindings) from contract
  • Channel creation with JSON serialization enabled by default

All operations return Future<Result<T, TechnicalError>> for consistent error handling.

Example

typescript
const client = new AmqpClient(contract, {
  urls: ['amqp://localhost'],
  connectionOptions: { heartbeatIntervalInSeconds: 30 }
});

// Wait for connection
await client.waitForConnect().resultToPromise();

// Publish a message
const result = await client.publish('exchange', 'routingKey', { data: 'value' }).resultToPromise();

// Close when done
await client.close().resultToPromise();

Constructors

Constructor
ts
new AmqpClient(contract, options): AmqpClient;

Defined in: packages/core/src/amqp-client.ts:101

Create a new AMQP client instance.

The client will automatically:

  • Get or create a shared connection using the singleton pattern
  • Set up AMQP topology (exchanges, queues, bindings) from the contract
  • Create a channel with JSON serialization enabled
Parameters
ParameterTypeDescription
contractContractDefinitionThe contract definition specifying the AMQP topology
optionsAmqpClientOptionsClient configuration options
Returns

AmqpClient

Methods

ack()
ts
ack(msg, allUpTo?): void;

Defined in: packages/core/src/amqp-client.ts:219

Acknowledge a message.

Parameters
ParameterTypeDefault valueDescription
msgConsumeMessageundefinedThe message to acknowledge
allUpTobooleanfalseIf true, acknowledge all messages up to and including this one
Returns

void

addSetup()
ts
addSetup(setup): void;

Defined in: packages/core/src/amqp-client.ts:241

Add a setup function to be called when the channel is created or reconnected.

This is useful for setting up channel-level configuration like prefetch.

Parameters
ParameterTypeDescription
setup(channel) => void | Promise<void>The setup function to add
Returns

void

cancel()
ts
cancel(consumerTag): Future<Result<void, TechnicalError>>;

Defined in: packages/core/src/amqp-client.ts:207

Cancel a consumer by its consumer tag.

Parameters
ParameterTypeDescription
consumerTagstringThe consumer tag to cancel
Returns

Future<Result<void, TechnicalError>>

A Future that resolves when the consumer is cancelled

close()
ts
close(): Future<Result<void, TechnicalError>>;

Defined in: packages/core/src/amqp-client.ts:270

Close the channel and release the connection reference.

This will:

  • Close the channel wrapper
  • Decrease the reference count on the shared connection
  • Close the connection if this was the last client using it
Returns

Future<Result<void, TechnicalError>>

A Future that resolves when the channel and connection are closed

consume()
ts
consume(
   queue, 
   callback, 
   options?): Future<Result<string, TechnicalError>>;

Defined in: packages/core/src/amqp-client.ts:191

Start consuming messages from a queue.

Parameters
ParameterTypeDescription
queuestringThe queue name
callbackConsumeCallbackThe callback to invoke for each message
options?ConsumeOptional consume options
Returns

Future<Result<string, TechnicalError>>

A Future with Result<string> - the consumer tag

getConnection()
ts
getConnection(): IAmqpConnectionManager;

Defined in: packages/core/src/amqp-client.ts:148

Get the underlying connection manager

This method exposes the AmqpConnectionManager instance that this client uses. The connection is automatically shared across all AmqpClient instances that use the same URLs and connection options.

Returns

IAmqpConnectionManager

The AmqpConnectionManager instance used by this client

nack()
ts
nack(
   msg, 
   allUpTo?, 
   requeue?): void;

Defined in: packages/core/src/amqp-client.ts:230

Negative acknowledge a message.

Parameters
ParameterTypeDefault valueDescription
msgConsumeMessageundefinedThe message to nack
allUpTobooleanfalseIf true, nack all messages up to and including this one
requeuebooleantrueIf true, requeue the message(s)
Returns

void

on()
ts
on(event, listener): void;

Defined in: packages/core/src/amqp-client.ts:256

Register an event listener on the channel wrapper.

Available events:

  • 'connect': Emitted when the channel is (re)connected
  • 'close': Emitted when the channel is closed
  • 'error': Emitted when an error occurs
Parameters
ParameterTypeDescription
eventstringThe event name
listener(...args) => voidThe event listener
Returns

void

publish()
ts
publish(
   exchange, 
   routingKey, 
   content, 
   options?): Future<Result<boolean, TechnicalError>>;

Defined in: packages/core/src/amqp-client.ts:172

Publish a message to an exchange.

Parameters
ParameterTypeDescription
exchangestringThe exchange name
routingKeystringThe routing key
contentunknownThe message content (will be JSON serialized if json: true)
options?PublishOptional publish options
Returns

Future<Result<boolean, TechnicalError>>

A Future with Result<boolean> - true if message was sent, false if channel buffer is full

waitForConnect()
ts
waitForConnect(): Future<Result<void, TechnicalError>>;

Defined in: packages/core/src/amqp-client.ts:157

Wait for the channel to be connected and ready.

Returns

Future<Result<void, TechnicalError>>

A Future that resolves when the channel is connected


ConnectionManagerSingleton

Defined in: packages/core/src/connection-manager.ts:23

Connection manager singleton for sharing AMQP connections across clients.

This singleton implements connection pooling to avoid creating multiple connections to the same broker, which is a RabbitMQ best practice. Connections are identified by their URLs and connection options, and reference counting ensures connections are only closed when all clients have released them.

Example

typescript
const manager = ConnectionManagerSingleton.getInstance();
const connection = manager.getConnection(['amqp://localhost']);
// ... use connection ...
await manager.releaseConnection(['amqp://localhost']);

Methods

getConnection()
ts
getConnection(urls, connectionOptions?): IAmqpConnectionManager;

Defined in: packages/core/src/connection-manager.ts:52

Get or create a connection for the given URLs and options.

If a connection already exists with the same URLs and options, it is reused and its reference count is incremented. Otherwise, a new connection is created.

Parameters
ParameterTypeDescription
urlsConnectionUrl[]AMQP broker URL(s)
connectionOptions?AmqpConnectionManagerOptionsOptional connection configuration
Returns

IAmqpConnectionManager

The AMQP connection manager instance

releaseConnection()
ts
releaseConnection(urls, connectionOptions?): Promise<void>;

Defined in: packages/core/src/connection-manager.ts:81

Release a connection reference.

Decrements the reference count for the connection. If the count reaches zero, the connection is closed and removed from the pool.

Parameters
ParameterTypeDescription
urlsConnectionUrl[]AMQP broker URL(s) used to identify the connection
connectionOptions?AmqpConnectionManagerOptionsOptional connection configuration used to identify the connection
Returns

Promise<void>

A promise that resolves when the connection is released (and closed if necessary)

getInstance()
ts
static getInstance(): ConnectionManagerSingleton;

Defined in: packages/core/src/connection-manager.ts:35

Get the singleton instance of the connection manager.

Returns

ConnectionManagerSingleton

The singleton instance


MessageValidationError

Defined in: packages/core/src/errors.ts:33

Error thrown when message validation fails (payload or headers).

Used by both the client (publish-time payload validation) and the worker (consume-time payload and headers validation).

Param

The name of the publisher or consumer that triggered the validation

Param

The validation issues from the Standard Schema validation

Extends

  • Error

Constructors

Constructor
ts
new MessageValidationError(source, issues): MessageValidationError;

Defined in: packages/core/src/errors.ts:34

Parameters
ParameterType
sourcestring
issuesunknown
Returns

MessageValidationError

Overrides
ts
Error.constructor

Properties

PropertyModifierTypeDescriptionInherited fromDefined in
cause?publicunknown-Error.causenode_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es2022.error.d.ts:26
issuesreadonlyunknown--packages/core/src/errors.ts:36
messagepublicstring-Error.messagenode_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1077
namepublicstring-Error.namenode_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1076
sourcereadonlystring--packages/core/src/errors.ts:35
stack?publicstring-Error.stacknode_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1078
stackTraceLimitstaticnumberThe Error.stackTraceLimit property specifies the number of stack frames collected by a stack trace (whether generated by new Error().stack or Error.captureStackTrace(obj)). The default value is 10 but may be set to any valid JavaScript number. Changes will affect any stack trace captured after the value has been changed. If set to a non-number value, or set to a negative number, stack traces will not capture any frames.Error.stackTraceLimitnode_modules/.pnpm/@types+node@25.3.2/node_modules/@types/node/globals.d.ts:67

Methods

captureStackTrace()
ts
static captureStackTrace(targetObject, constructorOpt?): void;

Defined in: node_modules/.pnpm/@types+node@25.3.2/node_modules/@types/node/globals.d.ts:51

Creates a .stack property on targetObject, which when accessed returns a string representing the location in the code at which Error.captureStackTrace() was called.

js
const myObject = {};
Error.captureStackTrace(myObject);
myObject.stack;  // Similar to `new Error().stack`

The first line of the trace will be prefixed with ${myObject.name}: ${myObject.message}.

The optional constructorOpt argument accepts a function. If given, all frames above constructorOpt, including constructorOpt, will be omitted from the generated stack trace.

The constructorOpt argument is useful for hiding implementation details of error generation from the user. For instance:

js
function a() {
  b();
}

function b() {
  c();
}

function c() {
  // Create an error without stack trace to avoid calculating the stack trace twice.
  const { stackTraceLimit } = Error;
  Error.stackTraceLimit = 0;
  const error = new Error();
  Error.stackTraceLimit = stackTraceLimit;

  // Capture the stack trace above function b
  Error.captureStackTrace(error, b); // Neither function c, nor b is included in the stack trace
  throw error;
}

a();
Parameters
ParameterType
targetObjectobject
constructorOpt?Function
Returns

void

Inherited from
ts
Error.captureStackTrace
prepareStackTrace()
ts
static prepareStackTrace(err, stackTraces): any;

Defined in: node_modules/.pnpm/@types+node@25.3.2/node_modules/@types/node/globals.d.ts:55

Parameters
ParameterType
errError
stackTracesCallSite[]
Returns

any

See

https://v8.dev/docs/stack-trace-api#customizing-stack-traces

Inherited from
ts
Error.prepareStackTrace

TechnicalError

Defined in: packages/core/src/errors.ts:7

Error for technical/runtime failures that cannot be prevented by TypeScript.

This includes AMQP connection failures, channel issues, validation failures, and other runtime errors. This error is shared across core, worker, and client packages.

Extends

  • Error

Constructors

Constructor
ts
new TechnicalError(message, cause?): TechnicalError;

Defined in: packages/core/src/errors.ts:8

Parameters
ParameterType
messagestring
cause?unknown
Returns

TechnicalError

Overrides
ts
Error.constructor

Properties

PropertyModifierTypeDescriptionInherited fromDefined in
cause?readonlyunknown-Error.causepackages/core/src/errors.ts:10
messagepublicstring-Error.messagenode_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1077
namepublicstring-Error.namenode_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1076
stack?publicstring-Error.stacknode_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1078
stackTraceLimitstaticnumberThe Error.stackTraceLimit property specifies the number of stack frames collected by a stack trace (whether generated by new Error().stack or Error.captureStackTrace(obj)). The default value is 10 but may be set to any valid JavaScript number. Changes will affect any stack trace captured after the value has been changed. If set to a non-number value, or set to a negative number, stack traces will not capture any frames.Error.stackTraceLimitnode_modules/.pnpm/@types+node@25.3.2/node_modules/@types/node/globals.d.ts:67

Methods

captureStackTrace()
ts
static captureStackTrace(targetObject, constructorOpt?): void;

Defined in: node_modules/.pnpm/@types+node@25.3.2/node_modules/@types/node/globals.d.ts:51

Creates a .stack property on targetObject, which when accessed returns a string representing the location in the code at which Error.captureStackTrace() was called.

js
const myObject = {};
Error.captureStackTrace(myObject);
myObject.stack;  // Similar to `new Error().stack`

The first line of the trace will be prefixed with ${myObject.name}: ${myObject.message}.

The optional constructorOpt argument accepts a function. If given, all frames above constructorOpt, including constructorOpt, will be omitted from the generated stack trace.

The constructorOpt argument is useful for hiding implementation details of error generation from the user. For instance:

js
function a() {
  b();
}

function b() {
  c();
}

function c() {
  // Create an error without stack trace to avoid calculating the stack trace twice.
  const { stackTraceLimit } = Error;
  Error.stackTraceLimit = 0;
  const error = new Error();
  Error.stackTraceLimit = stackTraceLimit;

  // Capture the stack trace above function b
  Error.captureStackTrace(error, b); // Neither function c, nor b is included in the stack trace
  throw error;
}

a();
Parameters
ParameterType
targetObjectobject
constructorOpt?Function
Returns

void

Inherited from
ts
Error.captureStackTrace
prepareStackTrace()
ts
static prepareStackTrace(err, stackTraces): any;

Defined in: node_modules/.pnpm/@types+node@25.3.2/node_modules/@types/node/globals.d.ts:55

Parameters
ParameterType
errError
stackTracesCallSite[]
Returns

any

See

https://v8.dev/docs/stack-trace-api#customizing-stack-traces

Inherited from
ts
Error.prepareStackTrace

Type Aliases

AmqpClientOptions

ts
type AmqpClientOptions = object;

Defined in: packages/core/src/amqp-client.ts:45

Options for creating an AMQP client.

Properties

PropertyTypeDescriptionDefined in
channelOptions?Partial<CreateChannelOpts>Optional channel configuration options.packages/core/src/amqp-client.ts:48
connectionOptions?AmqpConnectionManagerOptionsOptional connection configuration (heartbeat, reconnect settings, etc.).packages/core/src/amqp-client.ts:47
urlsConnectionUrl[]AMQP broker URL(s). Multiple URLs provide failover support.packages/core/src/amqp-client.ts:46

ConsumeCallback()

ts
type ConsumeCallback = (msg) => void | Promise<void>;

Defined in: packages/core/src/amqp-client.ts:54

Callback type for consuming messages.

Parameters

ParameterType
msgConsumeMessage | null

Returns

void | Promise<void>


Logger

ts
type Logger = object;

Defined in: packages/core/src/logger.ts:30

Logger interface for amqp-contract packages.

Provides a simple logging abstraction that can be implemented by users to integrate with their preferred logging framework.

Example

typescript
// Simple console logger implementation
const logger: Logger = {
  debug: (message, context) => console.debug(message, context),
  info: (message, context) => console.info(message, context),
  warn: (message, context) => console.warn(message, context),
  error: (message, context) => console.error(message, context),
};

Methods

debug()
ts
debug(message, context?): void;

Defined in: packages/core/src/logger.ts:36

Log debug level messages

Parameters
ParameterTypeDescription
messagestringThe log message
context?LoggerContextOptional context to include with the log
Returns

void

error()
ts
error(message, context?): void;

Defined in: packages/core/src/logger.ts:57

Log error level messages

Parameters
ParameterTypeDescription
messagestringThe log message
context?LoggerContextOptional context to include with the log
Returns

void

info()
ts
info(message, context?): void;

Defined in: packages/core/src/logger.ts:43

Log info level messages

Parameters
ParameterTypeDescription
messagestringThe log message
context?LoggerContextOptional context to include with the log
Returns

void

warn()
ts
warn(message, context?): void;

Defined in: packages/core/src/logger.ts:50

Log warning level messages

Parameters
ParameterTypeDescription
messagestringThe log message
context?LoggerContextOptional context to include with the log
Returns

void


LoggerContext

ts
type LoggerContext = Record<string, unknown> & object;

Defined in: packages/core/src/logger.ts:9

Context object for logger methods.

This type includes reserved keys that provide consistent naming for common logging context properties.

Type Declaration

NameTypeDefined in
error?unknownpackages/core/src/logger.ts:10

TelemetryProvider

ts
type TelemetryProvider = object;

Defined in: packages/core/src/telemetry.ts:53

Telemetry provider for AMQP operations. Uses lazy loading to gracefully handle cases where OpenTelemetry is not installed.

Properties

PropertyTypeDescriptionDefined in
getConsumeCounter() => Counter | undefinedGet a counter for messages consumed. Returns undefined if OpenTelemetry is not available.packages/core/src/telemetry.ts:70
getConsumeLatencyHistogram() => Histogram | undefinedGet a histogram for consume/process latency. Returns undefined if OpenTelemetry is not available.packages/core/src/telemetry.ts:82
getPublishCounter() => Counter | undefinedGet a counter for messages published. Returns undefined if OpenTelemetry is not available.packages/core/src/telemetry.ts:64
getPublishLatencyHistogram() => Histogram | undefinedGet a histogram for publish latency. Returns undefined if OpenTelemetry is not available.packages/core/src/telemetry.ts:76
getTracer() => Tracer | undefinedGet a tracer instance for creating spans. Returns undefined if OpenTelemetry is not available.packages/core/src/telemetry.ts:58

Variables

defaultTelemetryProvider

ts
const defaultTelemetryProvider: TelemetryProvider;

Defined in: packages/core/src/telemetry.ts:194

Default telemetry provider that uses OpenTelemetry API if available.


MessagingSemanticConventions

ts
const MessagingSemanticConventions: object;

Defined in: packages/core/src/telemetry.ts:25

Semantic conventions for AMQP messaging following OpenTelemetry standards.

Type Declaration

NameTypeDefault valueDefined in
AMQP_CONSUMER_NAME"amqp.consumer.name""amqp.consumer.name"packages/core/src/telemetry.ts:36
AMQP_PUBLISHER_NAME"amqp.publisher.name""amqp.publisher.name"packages/core/src/telemetry.ts:35
ERROR_TYPE"error.type""error.type"packages/core/src/telemetry.ts:39
MESSAGING_DESTINATION"messaging.destination.name""messaging.destination.name"packages/core/src/telemetry.ts:28
MESSAGING_DESTINATION_KIND"messaging.destination.kind""messaging.destination.kind"packages/core/src/telemetry.ts:29
MESSAGING_DESTINATION_KIND_EXCHANGE"exchange""exchange"packages/core/src/telemetry.ts:43
MESSAGING_DESTINATION_KIND_QUEUE"queue""queue"packages/core/src/telemetry.ts:44
MESSAGING_OPERATION"messaging.operation""messaging.operation"packages/core/src/telemetry.ts:30
MESSAGING_OPERATION_PROCESS"process""process"packages/core/src/telemetry.ts:46
MESSAGING_OPERATION_PUBLISH"publish""publish"packages/core/src/telemetry.ts:45
MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG"messaging.rabbitmq.message.delivery_tag""messaging.rabbitmq.message.delivery_tag"packages/core/src/telemetry.ts:34
MESSAGING_RABBITMQ_ROUTING_KEY"messaging.rabbitmq.destination.routing_key""messaging.rabbitmq.destination.routing_key"packages/core/src/telemetry.ts:33
MESSAGING_SYSTEM"messaging.system""messaging.system"packages/core/src/telemetry.ts:27
MESSAGING_SYSTEM_RABBITMQ"rabbitmq""rabbitmq"packages/core/src/telemetry.ts:42

See

https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/

Functions

endSpanError()

ts
function endSpanError(span, error): void;

Defined in: packages/core/src/telemetry.ts:288

End a span with error status.

Parameters

ParameterType
spanSpan | undefined
errorError

Returns

void


endSpanSuccess()

ts
function endSpanSuccess(span): void;

Defined in: packages/core/src/telemetry.ts:273

End a span with success status.

Parameters

ParameterType
spanSpan | undefined

Returns

void


recordConsumeMetric()

ts
function recordConsumeMetric(
   provider, 
   queueName, 
   consumerName, 
   success, 
   durationMs): void;

Defined in: packages/core/src/telemetry.ts:332

Record a consume metric.

Parameters

ParameterType
providerTelemetryProvider
queueNamestring
consumerNamestring
successboolean
durationMsnumber

Returns

void


recordPublishMetric()

ts
function recordPublishMetric(
   provider, 
   exchangeName, 
   routingKey, 
   success, 
   durationMs): void;

Defined in: packages/core/src/telemetry.ts:305

Record a publish metric.

Parameters

ParameterType
providerTelemetryProvider
exchangeNamestring
routingKeystring | undefined
successboolean
durationMsnumber

Returns

void


setupAmqpTopology()

ts
function setupAmqpTopology(channel, contract): Promise<void>;

Defined in: packages/core/src/setup.ts:26

Setup AMQP topology (exchanges, queues, and bindings) from a contract definition.

This function sets up the complete AMQP topology in the correct order:

  1. Assert all exchanges defined in the contract
  2. Validate dead letter exchanges are declared before referencing them
  3. Assert all queues with their configurations (including dead letter settings)
  4. Create all bindings (queue-to-exchange and exchange-to-exchange)

Parameters

ParameterTypeDescription
channelChannelThe AMQP channel to use for topology setup
contractContractDefinitionThe contract definition containing the topology specification

Returns

Promise<void>

Throws

If any exchanges, queues, or bindings fail to be created

Throws

If a queue references a dead letter exchange not declared in the contract

Example

typescript
const channel = await connection.createChannel();
await setupAmqpTopology(channel, contract);

startConsumeSpan()

ts
function startConsumeSpan(
   provider, 
   queueName, 
   consumerName, 
   attributes?): Span | undefined;

Defined in: packages/core/src/telemetry.ts:241

Create a span for a consume/process operation. Returns undefined if OpenTelemetry is not available.

Parameters

ParameterType
providerTelemetryProvider
queueNamestring
consumerNamestring
attributes?Attributes

Returns

Span | undefined


startPublishSpan()

ts
function startPublishSpan(
   provider, 
   exchangeName, 
   routingKey, 
   attributes?): Span | undefined;

Defined in: packages/core/src/telemetry.ts:206

Create a span for a publish operation. Returns undefined if OpenTelemetry is not available.

Parameters

ParameterType
providerTelemetryProvider
exchangeNamestring
routingKeystring | undefined
attributes?Attributes

Returns

Span | undefined

Released under the MIT License.