@amqp-contract/core
Classes
AmqpClient
Defined in: packages/core/src/amqp-client.ts:84
AMQP client that manages connections and channels with automatic topology setup.
This class handles:
- Connection management with automatic reconnection via amqp-connection-manager
- Connection pooling and sharing across instances with the same URLs and connection options
- Automatic AMQP topology setup (exchanges, queues, bindings) from contract
- Channel creation with JSON serialization enabled by default
All asynchronous operations (waitForConnect, publish, consume, cancel, close) return Future<Result<T, TechnicalError>> for consistent error handling.
Example
const client = new AmqpClient(contract, {
  urls: ['amqp://localhost'],
  connectionOptions: { heartbeatIntervalInSeconds: 30 }
});
// Wait for connection
await client.waitForConnect().resultToPromise();
// Publish a message
const result = await client.publish('exchange', 'routingKey', { data: 'value' }).resultToPromise();
// Close when done
await client.close().resultToPromise();
Constructors
Constructor
new AmqpClient(contract, options): AmqpClient;
Defined in: packages/core/src/amqp-client.ts:101
Create a new AMQP client instance.
The client will automatically:
- Get or create a shared connection using the singleton pattern
- Set up AMQP topology (exchanges, queues, bindings) from the contract
- Create a channel with JSON serialization enabled
Parameters
| Parameter | Type | Description |
|---|---|---|
| contract | ContractDefinition | The contract definition specifying the AMQP topology |
| options | AmqpClientOptions | Client configuration options |
Returns
AmqpClient
Methods
ack()
ack(msg, allUpTo?): void;
Defined in: packages/core/src/amqp-client.ts:219
Acknowledge a message.
Parameters
| Parameter | Type | Default value | Description |
|---|---|---|---|
| msg | ConsumeMessage | undefined | The message to acknowledge |
| allUpTo | boolean | false | If true, acknowledge all messages up to and including this one |
Returns
void
addSetup()
addSetup(setup): void;
Defined in: packages/core/src/amqp-client.ts:241
Add a setup function to be called when the channel is created or reconnected.
This is useful for setting up channel-level configuration like prefetch.
Parameters
| Parameter | Type | Description |
|---|---|---|
| setup | (channel) => void \| Promise<void> | The setup function to add |
Returns
void
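To illustrate why setup functions are registered rather than run once, here is a minimal sketch. It assumes a hypothetical `FakeChannelWrapper` and a stand-in `DemoChannel` type (neither is part of @amqp-contract/core) and simply replays the registered functions on every simulated (re)connect.

```typescript
// Stand-in for the AMQP channel; assumes a prefetch(count) method like amqplib's.
type DemoChannel = { prefetch(count: number): void };
type SetupFn = (channel: DemoChannel) => void | Promise<void>;

// Hypothetical wrapper that stores setup functions and replays them on every connect.
class FakeChannelWrapper {
  private setups: SetupFn[] = [];

  addSetup(setup: SetupFn): void {
    this.setups.push(setup);
  }

  // Simulates the channel being created, or re-created after a reconnect.
  simulateConnect(channel: DemoChannel): void {
    for (const setup of this.setups) void setup(channel);
  }
}

// Record every prefetch call so the effect is observable without a broker.
const prefetchCalls: number[] = [];
const demoChannel: DemoChannel = {
  prefetch: (count) => {
    prefetchCalls.push(count);
  },
};

const wrapper = new FakeChannelWrapper();
wrapper.addSetup((channel) => channel.prefetch(10));
wrapper.simulateConnect(demoChannel); // initial connection
wrapper.simulateConnect(demoChannel); // reconnection replays the setup
console.log(prefetchCalls);
```

Because the setup runs again after each reconnect, channel-level settings such as prefetch do not silently revert to their defaults when the connection drops.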
cancel()
cancel(consumerTag): Future<Result<void, TechnicalError>>;
Defined in: packages/core/src/amqp-client.ts:207
Cancel a consumer by its consumer tag.
Parameters
| Parameter | Type | Description |
|---|---|---|
| consumerTag | string | The consumer tag to cancel |
Returns
Future<Result<void, TechnicalError>>
A Future that resolves when the consumer is cancelled
close()
close(): Future<Result<void, TechnicalError>>;
Defined in: packages/core/src/amqp-client.ts:270
Close the channel and release the connection reference.
This will:
- Close the channel wrapper
- Decrease the reference count on the shared connection
- Close the connection if this was the last client using it
Returns
Future<Result<void, TechnicalError>>
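A Future that resolves when the channel is closed and the connection reference is released (the connection itself is closed only if this was the last client using it)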
consume()
consume(queue, callback, options?): Future<Result<string, TechnicalError>>;
Defined in: packages/core/src/amqp-client.ts:191
Start consuming messages from a queue.
Parameters
| Parameter | Type | Description |
|---|---|---|
| queue | string | The queue name |
| callback | ConsumeCallback | The callback to invoke for each message |
| options? | Consume | Optional consume options |
Returns
Future<Result<string, TechnicalError>>
A Future with Result<string> - the consumer tag
getConnection()
getConnection(): IAmqpConnectionManager;
Defined in: packages/core/src/amqp-client.ts:148
Get the underlying connection manager.
This method exposes the AmqpConnectionManager instance that this client uses. The connection is automatically shared across all AmqpClient instances that use the same URLs and connection options.
Returns
IAmqpConnectionManager
The AmqpConnectionManager instance used by this client
nack()
nack(msg, allUpTo?, requeue?): void;
Defined in: packages/core/src/amqp-client.ts:230
Negatively acknowledge a message.
Parameters
| Parameter | Type | Default value | Description |
|---|---|---|---|
| msg | ConsumeMessage | undefined | The message to nack |
| allUpTo | boolean | false | If true, nack all messages up to and including this one |
| requeue | boolean | true | If true, requeue the message(s) |
Returns
void
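ack() and nack() are typically called from inside a consume callback. The following is a minimal sketch under stated assumptions: `AckNackClient` is a hypothetical interface that only mirrors the two signatures documented above, `DemoMessage` is a simplified stand-in for ConsumeMessage, and an in-memory fake replaces the real client so the example runs without a broker.

```typescript
// Stand-in for amqplib's ConsumeMessage (the real type carries a Buffer in `content`).
type DemoMessage = { content: string };

// Hypothetical interface mirroring the documented ack() and nack() signatures.
interface AckNackClient {
  ack(msg: DemoMessage, allUpTo?: boolean): void;
  nack(msg: DemoMessage, allUpTo?: boolean, requeue?: boolean): void;
}

// Build a consume callback that acks on success and nacks without requeue on failure.
function makeCallback(client: AckNackClient, handle: (payload: unknown) => void) {
  return (msg: DemoMessage | null): void => {
    if (msg === null) return; // amqplib passes null when the broker cancels the consumer
    try {
      handle(JSON.parse(msg.content));
      client.ack(msg);
    } catch {
      // requeue = false so a message that always fails is not redelivered forever
      client.nack(msg, false, false);
    }
  };
}

// In-memory fake that records calls.
const calls: string[] = [];
const fakeClient: AckNackClient = {
  ack: () => {
    calls.push("ack");
  },
  nack: (_msg, _allUpTo, requeue) => {
    calls.push(`nack requeue=${requeue}`);
  },
};

const onMessage = makeCallback(fakeClient, (payload) => {
  if (typeof payload !== "object" || payload === null) throw new Error("bad payload");
});
onMessage({ content: '{"data":"value"}' }); // valid JSON object: acked
onMessage({ content: "not json" }); // JSON.parse throws: nacked without requeue
onMessage(null); // consumer cancelled: ignored
console.log(calls);
```

Passing requeue = false overrides the documented default of true; whether rejected messages should be requeued or dead-lettered depends on the queue configuration in the contract.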
on()
on(event, listener): void;
Defined in: packages/core/src/amqp-client.ts:256
Register an event listener on the channel wrapper.
Available events:
- 'connect': Emitted when the channel is (re)connected
- 'close': Emitted when the channel is closed
- 'error': Emitted when an error occurs
Parameters
| Parameter | Type | Description |
|---|---|---|
| event | string | The event name |
| listener | (...args) => void | The event listener |
Returns
void
publish()
publish(exchange, routingKey, content, options?): Future<Result<boolean, TechnicalError>>;
Defined in: packages/core/src/amqp-client.ts:172
Publish a message to an exchange.
Parameters
| Parameter | Type | Description |
|---|---|---|
| exchange | string | The exchange name |
| routingKey | string | The routing key |
| content | unknown | The message content (will be JSON serialized if json: true) |
| options? | Publish | Optional publish options |
Returns
Future<Result<boolean, TechnicalError>>
A Future with Result<boolean> - true if message was sent, false if channel buffer is full
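The return value therefore has three distinct outcomes: sent, buffer full, and technical failure. The sketch below shows one way to tell them apart. `DemoResult` is a hypothetical discriminated union used only for illustration; it is not the library's actual Result type.

```typescript
// Hypothetical stand-in for the Result carried by the Future; not the library's actual type.
type DemoResult<T, E> = { tag: "Ok"; value: T } | { tag: "Error"; error: E };
type PublishOutcome = "sent" | "buffer-full" | "failed";

// Map a publish result onto the three outcomes described above.
function classifyPublish(result: DemoResult<boolean, Error>): PublishOutcome {
  if (result.tag === "Error") return "failed"; // technical failure (connection, channel, ...)
  return result.value ? "sent" : "buffer-full"; // false signals a full buffer, not an error
}

const outcomes = [
  classifyPublish({ tag: "Ok", value: true }),
  classifyPublish({ tag: "Ok", value: false }),
  classifyPublish({ tag: "Error", error: new Error("channel closed") }),
];
console.log(outcomes);
```

Treating a false value separately from an error lets a caller slow down or retry later instead of reporting a failure.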
waitForConnect()
waitForConnect(): Future<Result<void, TechnicalError>>;
Defined in: packages/core/src/amqp-client.ts:157
Wait for the channel to be connected and ready.
Returns
Future<Result<void, TechnicalError>>
A Future that resolves when the channel is connected
ConnectionManagerSingleton
Defined in: packages/core/src/connection-manager.ts:23
Connection manager singleton for sharing AMQP connections across clients.
This singleton implements connection pooling to avoid creating multiple connections to the same broker, which is a RabbitMQ best practice. Connections are identified by their URLs and connection options, and reference counting ensures connections are only closed when all clients have released them.
Example
const manager = ConnectionManagerSingleton.getInstance();
const connection = manager.getConnection(['amqp://localhost']);
// ... use connection ...
await manager.releaseConnection(['amqp://localhost']);
Methods
getConnection()
getConnection(urls, connectionOptions?): IAmqpConnectionManager;
Defined in: packages/core/src/connection-manager.ts:52
Get or create a connection for the given URLs and options.
If a connection already exists with the same URLs and options, it is reused and its reference count is incremented. Otherwise, a new connection is created.
Parameters
| Parameter | Type | Description |
|---|---|---|
| urls | ConnectionUrl[] | AMQP broker URL(s) |
| connectionOptions? | AmqpConnectionManagerOptions | Optional connection configuration |
Returns
IAmqpConnectionManager
The AMQP connection manager instance
releaseConnection()
releaseConnection(urls, connectionOptions?): Promise<void>;
Defined in: packages/core/src/connection-manager.ts:81
Release a connection reference.
Decrements the reference count for the connection. If the count reaches zero, the connection is closed and removed from the pool.
Parameters
| Parameter | Type | Description |
|---|---|---|
| urls | ConnectionUrl[] | AMQP broker URL(s) used to identify the connection |
| connectionOptions? | AmqpConnectionManagerOptions | Optional connection configuration used to identify the connection |
Returns
Promise<void>
A promise that resolves when the connection is released (and closed if necessary)
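The get/release pair described above amounts to reference counting over a keyed pool. The following is a simplified, synchronous sketch of that idea, not the library's implementation: `RefCountedPool`, `DemoConnection`, and the JSON-based key derivation are all assumptions made for illustration.

```typescript
// Hypothetical connection shape; the real pool holds amqp-connection-manager instances.
type DemoConnection = { key: string; closed: boolean };

class RefCountedPool {
  private entries = new Map<string, { conn: DemoConnection; refs: number }>();

  // Assumption: the key is derived from URLs plus options; the real key format is not documented here.
  private keyOf(urls: string[], options?: Record<string, unknown>): string {
    return JSON.stringify({ urls, options: options ?? null });
  }

  acquire(urls: string[], options?: Record<string, unknown>): DemoConnection {
    const key = this.keyOf(urls, options);
    const entry = this.entries.get(key);
    if (entry) {
      entry.refs += 1; // reuse the connection and count the new holder
      return entry.conn;
    }
    const conn: DemoConnection = { key, closed: false };
    this.entries.set(key, { conn, refs: 1 });
    return conn;
  }

  release(urls: string[], options?: Record<string, unknown>): void {
    const key = this.keyOf(urls, options);
    const entry = this.entries.get(key);
    if (!entry) return; // unknown key: nothing to release
    entry.refs -= 1;
    if (entry.refs === 0) {
      entry.conn.closed = true; // last holder gone: close and drop from the pool
      this.entries.delete(key);
    }
  }

  get size(): number {
    return this.entries.size;
  }
}

const pool = new RefCountedPool();
const first = pool.acquire(["amqp://localhost"]);
const second = pool.acquire(["amqp://localhost"]); // same key, so the same connection
pool.release(["amqp://localhost"]);
const closedAfterOneRelease = first.closed; // still held by one client
pool.release(["amqp://localhost"]);
console.log(first === second, closedAfterOneRelease, first.closed, pool.size);
```

The sketch also shows why release must be called with the same URLs and options used to acquire: a different key would leave the original entry's count untouched.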
getInstance()
static getInstance(): ConnectionManagerSingleton;
Defined in: packages/core/src/connection-manager.ts:35
Get the singleton instance of the connection manager.
Returns
ConnectionManagerSingleton
The singleton instance
MessageValidationError
Defined in: packages/core/src/errors.ts:33
Error thrown when message validation fails (payload or headers).
Used by both the client (publish-time payload validation) and the worker (consume-time payload and headers validation).
Param source
The name of the publisher or consumer that triggered the validation
Param issues
The validation issues from the Standard Schema validation
Extends
Error
Constructors
Constructor
new MessageValidationError(source, issues): MessageValidationError;
Defined in: packages/core/src/errors.ts:34
Parameters
| Parameter | Type |
|---|---|
| source | string |
| issues | unknown |
Returns
MessageValidationError
Overrides
Error.constructor
Properties
| Property | Modifier | Type | Description | Inherited from | Defined in |
|---|---|---|---|---|---|
| cause? | public | unknown | - | Error.cause | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es2022.error.d.ts:26 |
| issues | readonly | unknown | - | - | packages/core/src/errors.ts:36 |
| message | public | string | - | Error.message | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1077 |
| name | public | string | - | Error.name | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1076 |
| source | readonly | string | - | - | packages/core/src/errors.ts:35 |
| stack? | public | string | - | Error.stack | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1078 |
| stackTraceLimit | static | number | The Error.stackTraceLimit property specifies the number of stack frames collected by a stack trace (whether generated by new Error().stack or Error.captureStackTrace(obj)). The default value is 10 but may be set to any valid JavaScript number. Changes will affect any stack trace captured after the value has been changed. If set to a non-number value, or set to a negative number, stack traces will not capture any frames. | Error.stackTraceLimit | node_modules/.pnpm/@types+node@25.3.2/node_modules/@types/node/globals.d.ts:67 |
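Since source and issues are plain readonly properties, a caller can narrow an unknown error with instanceof and read them directly. The sketch below uses a local stand-in class, `DemoValidationError`, that mirrors the documented constructor; the message format and the assumption that issues is usually an array are illustrative, not taken from the library.

```typescript
// Local stand-in mirroring the documented constructor (source, issues) and readonly properties.
class DemoValidationError extends Error {
  readonly source: string;
  readonly issues: unknown;

  constructor(source: string, issues: unknown) {
    super(`Message validation failed for "${source}"`); // message format is an assumption
    this.name = "DemoValidationError";
    this.source = source;
    this.issues = issues;
  }
}

// Narrow an unknown error and summarize it for logging.
function summarizeError(err: unknown): string {
  if (err instanceof DemoValidationError) {
    // issues is typed unknown, so check its shape before counting
    const count = Array.isArray(err.issues) ? err.issues.length : 1;
    return `${err.source}: ${count} validation issue(s)`;
  }
  return err instanceof Error ? err.message : String(err);
}

const validationSummary = summarizeError(
  new DemoValidationError("orderCreated", [{ message: "Required", path: ["orderId"] }]),
);
const otherSummary = summarizeError(new Error("boom"));
console.log(validationSummary, otherSummary);
```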
Methods
captureStackTrace()
static captureStackTrace(targetObject, constructorOpt?): void;
Defined in: node_modules/.pnpm/@types+node@25.3.2/node_modules/@types/node/globals.d.ts:51
Creates a .stack property on targetObject, which when accessed returns a string representing the location in the code at which Error.captureStackTrace() was called.
const myObject = {};
Error.captureStackTrace(myObject);
myObject.stack; // Similar to `new Error().stack`
The first line of the trace will be prefixed with ${myObject.name}: ${myObject.message}.
The optional constructorOpt argument accepts a function. If given, all frames above constructorOpt, including constructorOpt, will be omitted from the generated stack trace.
The constructorOpt argument is useful for hiding implementation details of error generation from the user. For instance:
function a() {
  b();
}
function b() {
  c();
}
function c() {
  // Create an error without stack trace to avoid calculating the stack trace twice.
  const { stackTraceLimit } = Error;
  Error.stackTraceLimit = 0;
  const error = new Error();
  Error.stackTraceLimit = stackTraceLimit;
  // Capture the stack trace above function b
  Error.captureStackTrace(error, b); // Neither function c, nor b is included in the stack trace
  throw error;
}
a();
Parameters
| Parameter | Type |
|---|---|
| targetObject | object |
| constructorOpt? | Function |
Returns
void
Inherited from
Error.captureStackTrace
prepareStackTrace()
static prepareStackTrace(err, stackTraces): any;
Defined in: node_modules/.pnpm/@types+node@25.3.2/node_modules/@types/node/globals.d.ts:55
Parameters
| Parameter | Type |
|---|---|
| err | Error |
| stackTraces | CallSite[] |
Returns
any
See
https://v8.dev/docs/stack-trace-api#customizing-stack-traces
Inherited from
Error.prepareStackTrace
TechnicalError
Defined in: packages/core/src/errors.ts:7
Error for technical/runtime failures that cannot be prevented by TypeScript.
This includes AMQP connection failures, channel issues, validation failures, and other runtime errors. This error is shared across core, worker, and client packages.
Extends
Error
Constructors
Constructor
new TechnicalError(message, cause?): TechnicalError;
Defined in: packages/core/src/errors.ts:8
Parameters
| Parameter | Type |
|---|---|
| message | string |
| cause? | unknown |
Returns
TechnicalError
Overrides
Error.constructor
Properties
| Property | Modifier | Type | Description | Inherited from | Defined in |
|---|---|---|---|---|---|
| cause? | readonly | unknown | - | Error.cause | packages/core/src/errors.ts:10 |
| message | public | string | - | Error.message | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1077 |
| name | public | string | - | Error.name | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1076 |
| stack? | public | string | - | Error.stack | node_modules/.pnpm/typescript@5.9.3/node_modules/typescript/lib/lib.es5.d.ts:1078 |
| stackTraceLimit | static | number | The Error.stackTraceLimit property specifies the number of stack frames collected by a stack trace (whether generated by new Error().stack or Error.captureStackTrace(obj)). The default value is 10 but may be set to any valid JavaScript number. Changes will affect any stack trace captured after the value has been changed. If set to a non-number value, or set to a negative number, stack traces will not capture any frames. | Error.stackTraceLimit | node_modules/.pnpm/@types+node@25.3.2/node_modules/@types/node/globals.d.ts:67 |
Methods
captureStackTrace()
static captureStackTrace(targetObject, constructorOpt?): void;
Defined in: node_modules/.pnpm/@types+node@25.3.2/node_modules/@types/node/globals.d.ts:51
Creates a .stack property on targetObject, which when accessed returns a string representing the location in the code at which Error.captureStackTrace() was called.
const myObject = {};
Error.captureStackTrace(myObject);
myObject.stack; // Similar to `new Error().stack`
The first line of the trace will be prefixed with ${myObject.name}: ${myObject.message}.
The optional constructorOpt argument accepts a function. If given, all frames above constructorOpt, including constructorOpt, will be omitted from the generated stack trace.
The constructorOpt argument is useful for hiding implementation details of error generation from the user. For instance:
function a() {
  b();
}
function b() {
  c();
}
function c() {
  // Create an error without stack trace to avoid calculating the stack trace twice.
  const { stackTraceLimit } = Error;
  Error.stackTraceLimit = 0;
  const error = new Error();
  Error.stackTraceLimit = stackTraceLimit;
  // Capture the stack trace above function b
  Error.captureStackTrace(error, b); // Neither function c, nor b is included in the stack trace
  throw error;
}
a();
Parameters
| Parameter | Type |
|---|---|
| targetObject | object |
| constructorOpt? | Function |
Returns
void
Inherited from
Error.captureStackTrace
prepareStackTrace()
static prepareStackTrace(err, stackTraces): any;
Defined in: node_modules/.pnpm/@types+node@25.3.2/node_modules/@types/node/globals.d.ts:55
Parameters
| Parameter | Type |
|---|---|
| err | Error |
| stackTraces | CallSite[] |
Returns
any
See
https://v8.dev/docs/stack-trace-api#customizing-stack-traces
Inherited from
Error.prepareStackTrace
Type Aliases
AmqpClientOptions
type AmqpClientOptions = object;
Defined in: packages/core/src/amqp-client.ts:45
Options for creating an AMQP client.
Properties
| Property | Type | Description | Defined in |
|---|---|---|---|
| channelOptions? | Partial<CreateChannelOpts> | Optional channel configuration options. | packages/core/src/amqp-client.ts:48 |
| connectionOptions? | AmqpConnectionManagerOptions | Optional connection configuration (heartbeat, reconnect settings, etc.). | packages/core/src/amqp-client.ts:47 |
| urls | ConnectionUrl[] | AMQP broker URL(s). Multiple URLs provide failover support. | packages/core/src/amqp-client.ts:46 |
ConsumeCallback()
type ConsumeCallback = (msg) => void | Promise<void>;
Defined in: packages/core/src/amqp-client.ts:54
Callback type for consuming messages.
Parameters
| Parameter | Type |
|---|---|
| msg | ConsumeMessage \| null |
Returns
void | Promise<void>
Logger
type Logger = object;
Defined in: packages/core/src/logger.ts:30
Logger interface for amqp-contract packages.
Provides a simple logging abstraction that can be implemented by users to integrate with their preferred logging framework.
Example
// Simple console logger implementation
const logger: Logger = {
  debug: (message, context) => console.debug(message, context),
  info: (message, context) => console.info(message, context),
  warn: (message, context) => console.warn(message, context),
  error: (message, context) => console.error(message, context),
};
Methods
debug()
debug(message, context?): void;
Defined in: packages/core/src/logger.ts:36
Log debug-level messages.
Parameters
| Parameter | Type | Description |
|---|---|---|
| message | string | The log message |
| context? | LoggerContext | Optional context to include with the log |
Returns
void
error()
error(message, context?): void;
Defined in: packages/core/src/logger.ts:57
Log error-level messages.
Parameters
| Parameter | Type | Description |
|---|---|---|
| message | string | The log message |
| context? | LoggerContext | Optional context to include with the log |
Returns
void
info()
info(message, context?): void;
Defined in: packages/core/src/logger.ts:43
Log info-level messages.
Parameters
| Parameter | Type | Description |
|---|---|---|
| message | string | The log message |
| context? | LoggerContext | Optional context to include with the log |
Returns
void
warn()
warn(message, context?): void;
Defined in: packages/core/src/logger.ts:50
Log warning-level messages.
Parameters
| Parameter | Type | Description |
|---|---|---|
| message | string | The log message |
| context? | LoggerContext | Optional context to include with the log |
Returns
void
LoggerContext
type LoggerContext = Record<string, unknown> & object;
Defined in: packages/core/src/logger.ts:9
Context object for logger methods.
This type includes reserved keys that provide consistent naming for common logging context properties.
Type Declaration
| Name | Type | Defined in |
|---|---|---|
| error? | unknown | packages/core/src/logger.ts:10 |
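The reserved error key gives logger implementations one predictable place to look for a failure. As a sketch of how an implementation might use it, the example below writes one JSON line per call and extracts the error message explicitly. `DemoLogger` and `DemoLoggerContext` are local mirrors of the documented types, and `createJsonLogger` is a hypothetical helper, not part of the package.

```typescript
// Local mirrors of the documented Logger and LoggerContext types, so the sketch is self-contained.
type DemoLoggerContext = Record<string, unknown> & { error?: unknown };
type DemoLogger = {
  debug(message: string, context?: DemoLoggerContext): void;
  info(message: string, context?: DemoLoggerContext): void;
  warn(message: string, context?: DemoLoggerContext): void;
  error(message: string, context?: DemoLoggerContext): void;
};

// Build a logger that writes one JSON line per call to the given sink.
function createJsonLogger(sink: (line: string) => void): DemoLogger {
  const write = (level: string) => (message: string, context?: DemoLoggerContext): void => {
    const ctx: DemoLoggerContext = context ?? {};
    const { error, ...rest } = ctx;
    // Error objects serialize to {} with JSON.stringify, so extract the message explicitly.
    const errorField =
      error === undefined ? {} : { error: error instanceof Error ? error.message : String(error) };
    sink(JSON.stringify({ level, message, ...rest, ...errorField }));
  };
  return { debug: write("debug"), info: write("info"), warn: write("warn"), error: write("error") };
}

// Collect lines in memory instead of printing them directly.
const logLines: string[] = [];
const jsonLogger = createJsonLogger((line) => {
  logLines.push(line);
});
jsonLogger.error("publish failed", { exchange: "orders", error: new Error("channel closed") });
jsonLogger.info("connected");
console.log(logLines.join("\n"));
```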
TelemetryProvider
type TelemetryProvider = object;
Defined in: packages/core/src/telemetry.ts:53
Telemetry provider for AMQP operations. Uses lazy loading to gracefully handle cases where OpenTelemetry is not installed.
Properties
| Property | Type | Description | Defined in |
|---|---|---|---|
| getConsumeCounter | () => Counter \| undefined | Get a counter for messages consumed. Returns undefined if OpenTelemetry is not available. | packages/core/src/telemetry.ts:70 |
| getConsumeLatencyHistogram | () => Histogram \| undefined | Get a histogram for consume/process latency. Returns undefined if OpenTelemetry is not available. | packages/core/src/telemetry.ts:82 |
| getPublishCounter | () => Counter \| undefined | Get a counter for messages published. Returns undefined if OpenTelemetry is not available. | packages/core/src/telemetry.ts:64 |
| getPublishLatencyHistogram | () => Histogram \| undefined | Get a histogram for publish latency. Returns undefined if OpenTelemetry is not available. | packages/core/src/telemetry.ts:76 |
| getTracer | () => Tracer \| undefined | Get a tracer instance for creating spans. Returns undefined if OpenTelemetry is not available. | packages/core/src/telemetry.ts:58 |
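Because every getter may return undefined, callers need a guard before recording anything. The sketch below illustrates that pattern with reduced stand-in types: `DemoCounter` and `DemoTelemetry` are hypothetical, cover only the publish counter, and are not the package's or OpenTelemetry's real types.

```typescript
// Minimal stand-ins; the real Counter type comes from the OpenTelemetry API package.
type DemoCounter = { add(value: number, attributes?: Record<string, string>): void };
type DemoTelemetry = { getPublishCounter: () => DemoCounter | undefined };

// Increment the publish counter when one exists; report whether anything was recorded.
function countPublish(provider: DemoTelemetry, exchangeName: string): boolean {
  const counter = provider.getPublishCounter();
  if (counter === undefined) return false; // telemetry unavailable: skip silently
  counter.add(1, { "messaging.destination.name": exchangeName });
  return true;
}

// Provider for environments without OpenTelemetry.
const noopProvider: DemoTelemetry = { getPublishCounter: () => undefined };

// Provider backed by an in-memory counter, for demonstration.
let published = 0;
const memoryProvider: DemoTelemetry = {
  getPublishCounter: () => ({
    add: (value) => {
      published += value;
    },
  }),
};

const recordedWithNoop = countPublish(noopProvider, "orders");
const recordedWithMemory = countPublish(memoryProvider, "orders");
console.log(recordedWithNoop, recordedWithMemory, published);
```

A provider whose getters all return undefined is a valid way to switch telemetry off, for example in tests.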
Variables
defaultTelemetryProvider
const defaultTelemetryProvider: TelemetryProvider;
Defined in: packages/core/src/telemetry.ts:194
Default telemetry provider that uses OpenTelemetry API if available.
MessagingSemanticConventions
const MessagingSemanticConventions: object;
Defined in: packages/core/src/telemetry.ts:25
Semantic conventions for AMQP messaging following OpenTelemetry standards.
Type Declaration
| Name | Type | Default value | Defined in |
|---|---|---|---|
| AMQP_CONSUMER_NAME | "amqp.consumer.name" | "amqp.consumer.name" | packages/core/src/telemetry.ts:36 |
| AMQP_PUBLISHER_NAME | "amqp.publisher.name" | "amqp.publisher.name" | packages/core/src/telemetry.ts:35 |
| ERROR_TYPE | "error.type" | "error.type" | packages/core/src/telemetry.ts:39 |
| MESSAGING_DESTINATION | "messaging.destination.name" | "messaging.destination.name" | packages/core/src/telemetry.ts:28 |
| MESSAGING_DESTINATION_KIND | "messaging.destination.kind" | "messaging.destination.kind" | packages/core/src/telemetry.ts:29 |
| MESSAGING_DESTINATION_KIND_EXCHANGE | "exchange" | "exchange" | packages/core/src/telemetry.ts:43 |
| MESSAGING_DESTINATION_KIND_QUEUE | "queue" | "queue" | packages/core/src/telemetry.ts:44 |
| MESSAGING_OPERATION | "messaging.operation" | "messaging.operation" | packages/core/src/telemetry.ts:30 |
| MESSAGING_OPERATION_PROCESS | "process" | "process" | packages/core/src/telemetry.ts:46 |
| MESSAGING_OPERATION_PUBLISH | "publish" | "publish" | packages/core/src/telemetry.ts:45 |
| MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG | "messaging.rabbitmq.message.delivery_tag" | "messaging.rabbitmq.message.delivery_tag" | packages/core/src/telemetry.ts:34 |
| MESSAGING_RABBITMQ_ROUTING_KEY | "messaging.rabbitmq.destination.routing_key" | "messaging.rabbitmq.destination.routing_key" | packages/core/src/telemetry.ts:33 |
| MESSAGING_SYSTEM | "messaging.system" | "messaging.system" | packages/core/src/telemetry.ts:27 |
| MESSAGING_SYSTEM_RABBITMQ | "rabbitmq" | "rabbitmq" | packages/core/src/telemetry.ts:42 |
See
https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/
Functions
endSpanError()
function endSpanError(span, error): void;
Defined in: packages/core/src/telemetry.ts:288
End a span with error status.
Parameters
| Parameter | Type |
|---|---|
| span | Span \| undefined |
| error | Error |
Returns
void
endSpanSuccess()
function endSpanSuccess(span): void;
Defined in: packages/core/src/telemetry.ts:273
End a span with success status.
Parameters
| Parameter | Type |
|---|---|
| span | Span \| undefined |
Returns
void
recordConsumeMetric()
function recordConsumeMetric(provider, queueName, consumerName, success, durationMs): void;
Defined in: packages/core/src/telemetry.ts:332
Record a consume metric.
Parameters
| Parameter | Type |
|---|---|
| provider | TelemetryProvider |
| queueName | string |
| consumerName | string |
| success | boolean |
| durationMs | number |
Returns
void
recordPublishMetric()
function recordPublishMetric(provider, exchangeName, routingKey, success, durationMs): void;
Defined in: packages/core/src/telemetry.ts:305
Record a publish metric.
Parameters
| Parameter | Type |
|---|---|
| provider | TelemetryProvider |
| exchangeName | string |
| routingKey | string \| undefined |
| success | boolean |
| durationMs | number |
Returns
void
setupAmqpTopology()
function setupAmqpTopology(channel, contract): Promise<void>;
Defined in: packages/core/src/setup.ts:26
Setup AMQP topology (exchanges, queues, and bindings) from a contract definition.
This function sets up the complete AMQP topology in the correct order:
- Assert all exchanges defined in the contract
- Validate dead letter exchanges are declared before referencing them
- Assert all queues with their configurations (including dead letter settings)
- Create all bindings (queue-to-exchange and exchange-to-exchange)
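The dead letter validation step can be pictured as a set-membership check performed before any queue is asserted. The sketch below is an illustration under assumptions: `DemoTopology` is a hypothetical, simplified shape (the real ContractDefinition is not shown in this reference), and `findUndeclaredDeadLetters` is not a function exported by the package.

```typescript
// Hypothetical, simplified contract shape for illustration; the real ContractDefinition differs.
type DemoTopology = {
  exchanges: { name: string }[];
  queues: { name: string; deadLetterExchange?: string }[];
};

// Return the names of queues whose dead letter exchange is not declared in the topology.
function findUndeclaredDeadLetters(topology: DemoTopology): string[] {
  const declared = new Set(topology.exchanges.map((exchange) => exchange.name));
  return topology.queues
    .filter(
      (queue) =>
        queue.deadLetterExchange !== undefined && !declared.has(queue.deadLetterExchange),
    )
    .map((queue) => queue.name);
}

const demoTopology: DemoTopology = {
  exchanges: [{ name: "orders" }, { name: "orders.dlx" }],
  queues: [
    { name: "orders.created", deadLetterExchange: "orders.dlx" },
    { name: "orders.audit", deadLetterExchange: "audit.dlx" }, // not declared above
    { name: "orders.plain" }, // no dead letter exchange configured
  ],
};

const offenders = findUndeclaredDeadLetters(demoTopology);
console.log(offenders);
```

Running this check before asserting queues means a misconfigured contract fails early, rather than leaving a queue that points at an exchange that does not exist.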
Parameters
| Parameter | Type | Description |
|---|---|---|
| channel | Channel | The AMQP channel to use for topology setup |
| contract | ContractDefinition | The contract definition containing the topology specification |
Returns
Promise<void>
Throws
If any exchanges, queues, or bindings fail to be created
Throws
If a queue references a dead letter exchange not declared in the contract
Example
const channel = await connection.createChannel();
await setupAmqpTopology(channel, contract);
startConsumeSpan()
function startConsumeSpan(provider, queueName, consumerName, attributes?): Span | undefined;
Defined in: packages/core/src/telemetry.ts:241
Create a span for a consume/process operation. Returns undefined if OpenTelemetry is not available.
Parameters
| Parameter | Type |
|---|---|
| provider | TelemetryProvider |
| queueName | string |
| consumerName | string |
| attributes? | Attributes |
Returns
Span | undefined
startPublishSpan()
function startPublishSpan(provider, exchangeName, routingKey, attributes?): Span | undefined;
Defined in: packages/core/src/telemetry.ts:206
Create a span for a publish operation. Returns undefined if OpenTelemetry is not available.
Parameters
| Parameter | Type |
|---|---|
| provider | TelemetryProvider |
| exchangeName | string |
| routingKey | string \| undefined |
| attributes? | Attributes |
Returns
Span | undefined