@amqp-contract/core
Classes
AmqpClient
Defined in: packages/core/src/amqp-client.ts:141
AMQP client that manages connections and channels with automatic topology setup.
This class handles:
- Connection management with automatic reconnection via amqp-connection-manager
- Connection pooling and sharing across instances with the same URLs
- Automatic AMQP topology setup (exchanges, queues, bindings) from contract
- Channel creation with JSON serialization enabled by default
All operations return ResultAsync<T, TechnicalError> for consistent error handling.
Example
const client = new AmqpClient(contract, {
urls: ['amqp://localhost'],
connectionOptions: { heartbeatIntervalInSeconds: 30 }
});
// Wait for connection (ResultAsync is thenable)
await client.waitForConnect();
// Publish a message
const result = await client.publish('exchange', 'routingKey', { data: 'value' });
// Close when done
await client.close();
Constructors
Constructor
new AmqpClient(contract, options): AmqpClient;
Defined in: packages/core/src/amqp-client.ts:169
Create a new AMQP client instance.
The client will automatically:
- Get or create a shared connection using the singleton pattern
- Set up AMQP topology (exchanges, queues, bindings) from the contract
- Create a channel with JSON serialization enabled
Parameters
| Parameter | Type | Description |
|---|---|---|
| contract | ContractDefinition | The contract definition specifying the AMQP topology |
| options | AmqpClientOptions | Client configuration options |
Returns
AmqpClient
Methods
ack()
ack(msg, allUpTo?): void;
Defined in: packages/core/src/amqp-client.ts:426
Acknowledge a message.
Parameters
| Parameter | Type | Default value | Description |
|---|---|---|---|
| msg | ConsumeMessage | undefined | The message to acknowledge |
| allUpTo | boolean | false | If true, acknowledge all messages up to and including this one |
Returns
void
addSetup()
addSetup(setup): void;
Defined in: packages/core/src/amqp-client.ts:448
Add a setup function to be called when the channel is created or reconnected.
This is useful for setting up channel-level configuration like prefetch.
Parameters
| Parameter | Type | Description |
|---|---|---|
| setup | (channel) => void \| Promise<void> | The setup function to add |
Returns
void
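As a rough illustration of why setup functions are useful, the sketch below models a wrapper that replays every registered setup function each time a new channel is created. `FakeChannel` and `SetupRegistry` are hypothetical stand-ins written for this example; they are not types from this package or from amqp-connection-manager.

```typescript
// Hypothetical stand-in for an AMQP channel; it only records prefetch calls.
class FakeChannel {
  prefetchCalls: Array<{ count: number; global: boolean }> = [];

  prefetch(count: number, global: boolean): void {
    this.prefetchCalls.push({ count, global });
  }
}

type SetupFn = (channel: FakeChannel) => void;

// Hypothetical registry that mimics "run every setup on each (re)connect".
class SetupRegistry {
  private readonly setups: SetupFn[] = [];

  addSetup(setup: SetupFn): void {
    this.setups.push(setup);
  }

  // Called once per new channel, for example after a reconnect.
  onChannelCreated(channel: FakeChannel): void {
    for (const setup of this.setups) setup(channel);
  }
}

const registry = new SetupRegistry();
registry.addSetup((channel) => channel.prefetch(10, false));

const firstChannel = new FakeChannel();
registry.onChannelCreated(firstChannel);

// Simulate a reconnect: a fresh channel receives the same configuration.
const secondChannel = new FakeChannel();
registry.onChannelCreated(secondChannel);
```

Because the configuration lives in the registry rather than on a single channel object, it survives the channel being replaced.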
cancel()
cancel(consumerTag): ResultAsync<void, TechnicalError>;
Defined in: packages/core/src/amqp-client.ts:395
Cancel a consumer by its consumer tag.
Parameters
| Parameter | Type |
|---|---|
| consumerTag | string |
Returns
ResultAsync<void, TechnicalError>
close()
close(): ResultAsync<void, TechnicalError>;
Defined in: packages/core/src/amqp-client.ts:478
Close the channel and release the connection reference.
This will:
- Close the channel wrapper
- Decrease the reference count on the shared connection
- Close the connection if this was the last client using it
Both steps run regardless of each other's outcome; if both fail, the errors are wrapped in an AggregateError.
Returns
ResultAsync<void, TechnicalError>
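The "both steps run regardless" behaviour can be pictured with the following synchronous sketch. It is a simplification under stated assumptions: `runAllSteps` is a hypothetical helper, the real steps are asynchronous, and returning the lone error when only one step fails is a choice made for this example rather than documented behaviour.

```typescript
// Run every cleanup step even if an earlier one throws, then report failures.
function runAllSteps(steps: Array<() => void>): Error | undefined {
  const failures: unknown[] = [];
  for (const step of steps) {
    try {
      step();
    } catch (error) {
      failures.push(error);
    }
  }
  if (failures.length === 0) return undefined;
  if (failures.length === 1) {
    // Assumption for this sketch: a single failure is returned as-is.
    const only = failures[0];
    return only instanceof Error ? only : new Error(String(only));
  }
  // Several failures are combined so that none of them is lost.
  return new AggregateError(failures, "Multiple cleanup steps failed");
}

const log: string[] = [];
const oneFailure = runAllSteps([
  () => {
    throw new Error("channel close failed");
  },
  () => {
    log.push("connection released"); // still runs after the first failure
  },
]);
```

The point of the pattern is that a failing channel close cannot prevent the connection reference from being released.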
consume()
consume(
queue,
callback,
options?): ResultAsync<string, TechnicalError>;
Defined in: packages/core/src/amqp-client.ts:321
Start consuming messages from a queue.
If options.prefetch is set, a per-consumer prefetch count is applied via channel.prefetch(count, false), which is registered as a setup function on the channel wrapper before the underlying consume call. Registering it via addSetup ensures the prefetch is reapplied automatically on channel reconnect. Passing global=false scopes the limit to each subsequent consumer on the channel; under RabbitMQ semantics, and contrary to intuition, false means per-consumer and true means channel-wide.
prefetch is stripped from the options handed to channelWrapper.consume because it is not a valid amqplib Options.Consume field; if left in, it would only travel as a no-op key-value pair on the consume frame.
Parameters
| Parameter | Type |
|---|---|
| queue | string |
| callback | ConsumeCallback |
| options? | ConsumerOptions |
Returns
ResultAsync<string, TechnicalError>
ResultAsync resolving to the consumer tag.
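The stripping of prefetch described above can be sketched with object rest destructuring. `SketchConsumerOptions` and `splitConsumerOptions` are hypothetical names for this example; only `consumerTag` and `noAck` are borrowed from amqplib's consume options, and the actual implementation may differ.

```typescript
// Hypothetical option shape: two consume fields plus the extra `prefetch` key.
interface SketchConsumerOptions {
  consumerTag?: string;
  noAck?: boolean;
  prefetch?: number;
}

// Split the options into the prefetch count and the fields safe to forward.
function splitConsumerOptions(options: SketchConsumerOptions = {}) {
  const { prefetch, ...consumeOptions } = options;
  return { prefetch, consumeOptions };
}

const split = splitConsumerOptions({ consumerTag: "orders", noAck: false, prefetch: 5 });
// split.prefetch holds the count; split.consumeOptions no longer has a prefetch key.
```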
getConnection()
getConnection(): IAmqpConnectionManager;
Defined in: packages/core/src/amqp-client.ts:224
Get the underlying connection manager.
This method exposes the AmqpConnectionManager instance that this client uses. The connection is automatically shared across all AmqpClient instances that use the same URLs and connection options.
Returns
IAmqpConnectionManager
The AmqpConnectionManager instance used by this client
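To make the sharing and reference counting concrete, here is a minimal sketch of a pool keyed by the URL list. `ConnectionPool` and `PooledConnection` are hypothetical and exist only for illustration; the sketch also ignores connection options, which the real pool takes into account according to the description above.

```typescript
// Hypothetical connection handle; a real one would wrap a broker connection.
interface PooledConnection {
  key: string;
  closed: boolean;
}

class ConnectionPool {
  private readonly entries = new Map<string, { connection: PooledConnection; refCount: number }>();

  // Return the existing connection for these URLs, or create a new one.
  acquire(urls: string[]): PooledConnection {
    const key = JSON.stringify(urls);
    const entry = this.entries.get(key);
    if (entry) {
      entry.refCount += 1;
      return entry.connection;
    }
    const connection: PooledConnection = { key, closed: false };
    this.entries.set(key, { connection, refCount: 1 });
    return connection;
  }

  // Drop one reference; close the connection once nobody uses it anymore.
  release(connection: PooledConnection): void {
    const entry = this.entries.get(connection.key);
    if (!entry) return;
    entry.refCount -= 1;
    if (entry.refCount === 0) {
      connection.closed = true;
      this.entries.delete(connection.key);
    }
  }
}

const pool = new ConnectionPool();
const a = pool.acquire(["amqp://localhost"]);
const b = pool.acquire(["amqp://localhost"]); // same URLs, same connection
pool.release(a);
const stillOpenAfterFirstRelease = !a.closed;
pool.release(b); // last reference: the connection is closed
```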
nack()
nack(
msg,
allUpTo?,
requeue?): void;
Defined in: packages/core/src/amqp-client.ts:437
Negative acknowledge a message.
Parameters
| Parameter | Type | Default value | Description |
|---|---|---|---|
| msg | ConsumeMessage | undefined | The message to nack |
| allUpTo | boolean | false | If true, nack all messages up to and including this one |
| requeue | boolean | true | If true, requeue the message(s) |
Returns
void
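One way to combine ack and nack in a handler is sketched below: acknowledge on success, requeue once on failure, and reject without requeue if the message was already redelivered. `SketchMessage`, `Acker`, and `settle` are hypothetical and only mirror the parameter order documented above; the retry policy is an assumption, not something this package prescribes.

```typescript
// Hypothetical minimal shapes standing in for ConsumeMessage and AmqpClient.
interface SketchMessage {
  redelivered: boolean;
  body: string;
}

interface Acker {
  ack(msg: SketchMessage, allUpTo?: boolean): void;
  nack(msg: SketchMessage, allUpTo?: boolean, requeue?: boolean): void;
}

// Ack on success; on failure requeue once, then reject without requeue.
function settle(client: Acker, msg: SketchMessage, handler: (body: string) => void): void {
  try {
    handler(msg.body);
    client.ack(msg);
  } catch {
    client.nack(msg, false, !msg.redelivered);
  }
}

// A recording fake makes the decisions visible.
const calls: string[] = [];
const recorder: Acker = {
  ack: () => {
    calls.push("ack");
  },
  nack: (_msg, _allUpTo, requeue) => {
    calls.push(`nack requeue=${requeue}`);
  },
};

const fail = () => {
  throw new Error("handler failed");
};
settle(recorder, { redelivered: false, body: "ok" }, () => {});
settle(recorder, { redelivered: false, body: "bad" }, fail);
settle(recorder, { redelivered: true, body: "bad" }, fail);
```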
on()
on(event, listener): void;
Defined in: packages/core/src/amqp-client.ts:463
Register an event listener on the channel wrapper.
Available events:
- 'connect': Emitted when the channel is (re)connected
- 'close': Emitted when the channel is closed
- 'error': Emitted when an error occurs
Parameters
| Parameter | Type | Description |
|---|---|---|
| event | string | The event name |
| listener | (...args) => void | The event listener |
Returns
void
publish()
publish(
exchange,
routingKey,
content,
options?): ResultAsync<boolean, TechnicalError>;
Defined in: packages/core/src/amqp-client.ts:276
Publish a message to an exchange.
Parameters
| Parameter | Type |
|---|---|
| exchange | string |
| routingKey | string |
| content | unknown |
| options? | Publish |
Returns
ResultAsync<boolean, TechnicalError>
ResultAsync resolving to true if the message was sent, false if the channel buffer is full.
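A publish therefore has three outcomes worth telling apart: a technical error, a successful send, and a send refused because the channel buffer is full. The sketch below uses a small local `SketchResult` type as a stand-in for the awaited ResultAsync value; the type and `classifyPublish` are hypothetical and not part of this package.

```typescript
// Minimal local Result type, standing in for the awaited ResultAsync value.
type SketchResult<T, E> = { ok: true; value: T } | { ok: false; error: E };

type PublishOutcome = "sent" | "buffer-full" | "failed";

// Map the three documented outcomes onto an explicit label.
function classifyPublish(result: SketchResult<boolean, Error>): PublishOutcome {
  if (!result.ok) return "failed";
  return result.value ? "sent" : "buffer-full";
}

const sent = classifyPublish({ ok: true, value: true });
const backpressure = classifyPublish({ ok: true, value: false });
const failed = classifyPublish({ ok: false, error: new Error("channel closed") });
```

Treating "buffer-full" separately from "failed" lets a caller slow down or retry later instead of reporting an error.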
sendToQueue()
sendToQueue(
queue,
content,
options?): ResultAsync<boolean, TechnicalError>;
Defined in: packages/core/src/amqp-client.ts:293
Publish a message directly to a queue.
Parameters
| Parameter | Type |
|---|---|
| queue | string |
| content | unknown |
| options? | Publish |
Returns
ResultAsync<boolean, TechnicalError>
ResultAsync resolving to true if the message was sent, false if the channel buffer is full.
waitForConnect()
waitForConnect(): ResultAsync<void, TechnicalError>;
Defined in: packages/core/src/amqp-client.ts:242
Wait for the channel to be connected and ready.
If a connect timeout is in effect (connectTimeoutMs in the constructor options, which defaults to DEFAULT_CONNECT_TIMEOUT_MS), the returned ResultAsync resolves to err(TechnicalError) once the timeout elapses. With the timeout disabled (connectTimeoutMs: null), this waits forever: amqp-connection-manager retries connections indefinitely and never errors on its own.
NOTE: When using AmqpClient directly (not via TypedAmqpClient / TypedAmqpWorker), the constructor has already incremented the pooled connection's reference count. Callers must invoke close() on the error path to release the connection — waitForConnect does not do this automatically. The typed factories handle this cleanup for you.
Returns
ResultAsync<void, TechnicalError>
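The cleanup obligation in the note above can be sketched as follows. `ConnectableClient`, `SketchResult`, and `connectOrRelease` are hypothetical; the sketch models results as plain promises of a local result type instead of the library's ResultAsync, so it shows the control flow only.

```typescript
// Minimal local Result type, standing in for the awaited ResultAsync value.
type SketchResult<T, E> = { ok: true; value: T } | { ok: false; error: E };

// Hypothetical subset of the client surface used by this sketch.
interface ConnectableClient {
  waitForConnect(): Promise<SketchResult<void, Error>>;
  close(): Promise<SketchResult<void, Error>>;
}

// Wait for the connection; if that fails, release the pooled connection.
async function connectOrRelease(
  client: ConnectableClient,
): Promise<SketchResult<void, Error>> {
  const connected = await client.waitForConnect();
  if (!connected.ok) {
    // Release the reference taken when the client was constructed.
    await client.close();
  }
  return connected;
}
```

The original connection error is returned unchanged, so the caller still sees why the connection attempt failed.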
MessageValidationError
Defined in: packages/core/src/errors.ts:33
Error thrown when message validation fails (payload or headers).
Used by both the client (publish-time payload validation) and the worker (consume-time payload and headers validation).
Param
source: The name of the publisher or consumer that triggered the validation
Param
issues: The validation issues from the Standard Schema validation
Extends
Error
Constructors
Constructor
new MessageValidationError(source, issues): MessageValidationError;
Defined in: packages/core/src/errors.ts:34
Parameters
| Parameter | Type |
|---|---|
| source | string |
| issues | unknown |
Returns
MessageValidationError
Overrides
Error.constructor
Properties
| Property | Modifier | Type | Description | Inherited from | Defined in |
|---|---|---|---|---|---|
| cause? | public | unknown | - | Error.cause | node_modules/.pnpm/typescript@6.0.3/node_modules/typescript/lib/lib.es2022.error.d.ts:24 |
| issues | readonly | unknown | - | - | packages/core/src/errors.ts:36 |
| message | public | string | - | Error.message | node_modules/.pnpm/typescript@6.0.3/node_modules/typescript/lib/lib.es5.d.ts:1075 |
| name | public | string | - | Error.name | node_modules/.pnpm/typescript@6.0.3/node_modules/typescript/lib/lib.es5.d.ts:1074 |
| source | readonly | string | - | - | packages/core/src/errors.ts:35 |
| stack? | public | string | - | Error.stack | node_modules/.pnpm/typescript@6.0.3/node_modules/typescript/lib/lib.es5.d.ts:1076 |
| stackTraceLimit | static | number | The Error.stackTraceLimit property specifies the number of stack frames collected by a stack trace (whether generated by new Error().stack or Error.captureStackTrace(obj)). The default value is 10 but may be set to any valid JavaScript number. Changes will affect any stack trace captured after the value has been changed. If set to a non-number value, or set to a negative number, stack traces will not capture any frames. | Error.stackTraceLimit | node_modules/.pnpm/@types+node@25.6.0/node_modules/@types/node/globals.d.ts:67 |
Methods
captureStackTrace()
Call Signature
static captureStackTrace(targetObject, constructorOpt?): void;
Defined in: node_modules/.pnpm/@types+node@25.6.0/node_modules/@types/node/globals.d.ts:51
Creates a .stack property on targetObject, which when accessed returns a string representing the location in the code at which Error.captureStackTrace() was called.
const myObject = {};
Error.captureStackTrace(myObject);
myObject.stack; // Similar to `new Error().stack`
The first line of the trace will be prefixed with ${myObject.name}: ${myObject.message}.
The optional constructorOpt argument accepts a function. If given, all frames above constructorOpt, including constructorOpt, will be omitted from the generated stack trace.
The constructorOpt argument is useful for hiding implementation details of error generation from the user. For instance:
function a() {
b();
}
function b() {
c();
}
function c() {
// Create an error without stack trace to avoid calculating the stack trace twice.
const { stackTraceLimit } = Error;
Error.stackTraceLimit = 0;
const error = new Error();
Error.stackTraceLimit = stackTraceLimit;
// Capture the stack trace above function b
Error.captureStackTrace(error, b); // Neither function c, nor b is included in the stack trace
throw error;
}
a();
Parameters
| Parameter | Type |
|---|---|
| targetObject | object |
| constructorOpt? | Function |
Returns
void
Inherited from
Error.captureStackTrace
prepareStackTrace()
Call Signature
static prepareStackTrace(err, stackTraces): any;
Defined in: node_modules/.pnpm/@types+node@25.6.0/node_modules/@types/node/globals.d.ts:55
Parameters
| Parameter | Type |
|---|---|
| err | Error |
| stackTraces | CallSite[] |
Returns
any
See
https://v8.dev/docs/stack-trace-api#customizing-stack-traces
Inherited from
Error.prepareStackTrace
TechnicalError
Defined in: packages/core/src/errors.ts:7
Error for technical/runtime failures that cannot be prevented by TypeScript.
This includes AMQP connection failures, channel issues, validation failures, and other runtime errors. This error is shared across core, worker, and client packages.
Extends
Error
Constructors
Constructor
new TechnicalError(message, cause?): TechnicalError;
Defined in: packages/core/src/errors.ts:8
Parameters
| Parameter | Type |
|---|---|
| message | string |
| cause? | unknown |
Returns
TechnicalError
Overrides
Error.constructor
Properties
| Property | Modifier | Type | Description | Inherited from | Defined in |
|---|---|---|---|---|---|
| cause? | readonly | unknown | - | Error.cause | packages/core/src/errors.ts:10 |
| message | public | string | - | Error.message | node_modules/.pnpm/typescript@6.0.3/node_modules/typescript/lib/lib.es5.d.ts:1075 |
| name | public | string | - | Error.name | node_modules/.pnpm/typescript@6.0.3/node_modules/typescript/lib/lib.es5.d.ts:1074 |
| stack? | public | string | - | Error.stack | node_modules/.pnpm/typescript@6.0.3/node_modules/typescript/lib/lib.es5.d.ts:1076 |
| stackTraceLimit | static | number | The Error.stackTraceLimit property specifies the number of stack frames collected by a stack trace (whether generated by new Error().stack or Error.captureStackTrace(obj)). The default value is 10 but may be set to any valid JavaScript number. Changes will affect any stack trace captured after the value has been changed. If set to a non-number value, or set to a negative number, stack traces will not capture any frames. | Error.stackTraceLimit | node_modules/.pnpm/@types+node@25.6.0/node_modules/@types/node/globals.d.ts:67 |
Methods
captureStackTrace()
Call Signature
static captureStackTrace(targetObject, constructorOpt?): void;
Defined in: node_modules/.pnpm/@types+node@25.6.0/node_modules/@types/node/globals.d.ts:51
Creates a .stack property on targetObject, which when accessed returns a string representing the location in the code at which Error.captureStackTrace() was called.
const myObject = {};
Error.captureStackTrace(myObject);
myObject.stack; // Similar to `new Error().stack`
The first line of the trace will be prefixed with ${myObject.name}: ${myObject.message}.
The optional constructorOpt argument accepts a function. If given, all frames above constructorOpt, including constructorOpt, will be omitted from the generated stack trace.
The constructorOpt argument is useful for hiding implementation details of error generation from the user. For instance:
function a() {
b();
}
function b() {
c();
}
function c() {
// Create an error without stack trace to avoid calculating the stack trace twice.
const { stackTraceLimit } = Error;
Error.stackTraceLimit = 0;
const error = new Error();
Error.stackTraceLimit = stackTraceLimit;
// Capture the stack trace above function b
Error.captureStackTrace(error, b); // Neither function c, nor b is included in the stack trace
throw error;
}
a();
Parameters
| Parameter | Type |
|---|---|
| targetObject | object |
| constructorOpt? | Function |
Returns
void
Inherited from
Error.captureStackTrace
prepareStackTrace()
Call Signature
static prepareStackTrace(err, stackTraces): any;
Defined in: node_modules/.pnpm/@types+node@25.6.0/node_modules/@types/node/globals.d.ts:55
Parameters
| Parameter | Type |
|---|---|
| err | Error |
| stackTraces | CallSite[] |
Returns
any
See
https://v8.dev/docs/stack-trace-api#customizing-stack-traces
Inherited from
Error.prepareStackTrace
Type Aliases
AmqpClientOptions
type AmqpClientOptions = object;
Defined in: packages/core/src/amqp-client.ts:73
Options for creating an AMQP client.
Properties
| Property | Type | Description | Defined in |
|---|---|---|---|
| channelOptions? | Partial<CreateChannelOpts> | Optional channel configuration options. | packages/core/src/amqp-client.ts:76 |
| connectionOptions? | AmqpConnectionManagerOptions | Optional connection configuration (heartbeat, reconnect settings, etc.). | packages/core/src/amqp-client.ts:75 |
| connectTimeoutMs? | number \| null | Maximum time in ms to wait for the channel to become ready in waitForConnect. Defaults to DEFAULT_CONNECT_TIMEOUT_MS. Pass null to disable the timeout entirely (amqp-connection-manager will retry indefinitely). | packages/core/src/amqp-client.ts:77 |
| urls | ConnectionUrl[] | AMQP broker URL(s). Multiple URLs provide failover support. | packages/core/src/amqp-client.ts:74 |
ConsumeCallback
type ConsumeCallback = (msg) => void | Promise<void>;
Defined in: packages/core/src/amqp-client.ts:83
Callback type for consuming messages.
Parameters
| Parameter | Type |
|---|---|
| msg | ConsumeMessage \| null |
Returns
void | Promise<void>
ConsumerOptions
type ConsumerOptions = Options.Consume & object;
Defined in: packages/core/src/amqp-client.ts:108
Consume options that extend amqplib's Options.Consume with an optional per-consumer prefetch count.
prefetch is intercepted by AmqpClient.consume. It is stripped from the options handed to the underlying channelWrapper.consume(...) call, since amqplib's Options.Consume does not include it. It is then applied via channel.prefetch(count, false), registered through addSetup before the consume call, so the value is in effect when the consumer starts and is reapplied automatically on channel reconnect.
Type Declaration
| Name | Type | Description | Defined in |
|---|---|---|---|
| prefetch? | number | Per-consumer prefetch count. Applied before channel.consume(...). | packages/core/src/amqp-client.ts:110 |
Logger
type Logger = object;
Defined in: packages/core/src/logger.ts:30
Logger interface for amqp-contract packages.
Provides a simple logging abstraction that can be implemented by users to integrate with their preferred logging framework.
Example
// Simple console logger implementation
const logger: Logger = {
debug: (message, context) => console.debug(message, context),
info: (message, context) => console.info(message, context),
warn: (message, context) => console.warn(message, context),
error: (message, context) => console.error(message, context),
};
Methods
debug()
debug(message, context?): void;
Defined in: packages/core/src/logger.ts:36
Log debug level messages
Parameters
| Parameter | Type | Description |
|---|---|---|
| message | string | The log message |
| context? | LoggerContext | Optional context to include with the log |
Returns
void
error()
error(message, context?): void;
Defined in: packages/core/src/logger.ts:57
Log error level messages
Parameters
| Parameter | Type | Description |
|---|---|---|
| message | string | The log message |
| context? | LoggerContext | Optional context to include with the log |
Returns
void
info()
info(message, context?): void;
Defined in: packages/core/src/logger.ts:43
Log info level messages
Parameters
| Parameter | Type | Description |
|---|---|---|
| message | string | The log message |
| context? | LoggerContext | Optional context to include with the log |
Returns
void
warn()
warn(message, context?): void;
Defined in: packages/core/src/logger.ts:50
Log warning level messages
Parameters
| Parameter | Type | Description |
|---|---|---|
| message | string | The log message |
| context? | LoggerContext | Optional context to include with the log |
Returns
void
LoggerContext
type LoggerContext = Record<string, unknown> & object;
Defined in: packages/core/src/logger.ts:9
Context object for logger methods.
This type includes reserved keys that provide consistent naming for common logging context properties.
Type Declaration
| Name | Type | Defined in |
|---|---|---|
| error? | unknown | packages/core/src/logger.ts:10 |
PublishOptions
type PublishOptions = Options.Publish;
Defined in: packages/core/src/amqp-client.ts:95
Publish options for AmqpClient.publish / AmqpClient.sendToQueue.
Currently a re-export of amqplib's Options.Publish. A previous version of this type also exposed a timeout field, but that field never had a meaningful AMQP-level effect in this codebase and has been removed to avoid suggesting behaviour we do not provide. (amqp-connection-manager's own publishTimeout channel option is unrelated and is configured at channel creation, not per-publish.)
TelemetryProvider
type TelemetryProvider = object;
Defined in: packages/core/src/telemetry.ts:54
Telemetry provider for AMQP operations. Uses lazy loading to gracefully handle cases where OpenTelemetry is not installed.
Properties
| Property | Type | Description | Defined in |
|---|---|---|---|
| getConsumeCounter | () => Counter \| undefined | Get a counter for messages consumed. Returns undefined if OpenTelemetry is not available. | packages/core/src/telemetry.ts:71 |
| getConsumeLatencyHistogram | () => Histogram \| undefined | Get a histogram for consume/process latency. Returns undefined if OpenTelemetry is not available. | packages/core/src/telemetry.ts:83 |
| getLateRpcReplyCounter | () => Counter \| undefined | Get a counter for RPC replies that arrive after the caller has gone away (timeout, cancellation, or unknown correlationId). Returns undefined if OpenTelemetry is not available. | packages/core/src/telemetry.ts:90 |
| getPublishCounter | () => Counter \| undefined | Get a counter for messages published. Returns undefined if OpenTelemetry is not available. | packages/core/src/telemetry.ts:65 |
| getPublishLatencyHistogram | () => Histogram \| undefined | Get a histogram for publish latency. Returns undefined if OpenTelemetry is not available. | packages/core/src/telemetry.ts:77 |
| getTracer | () => Tracer \| undefined | Get a tracer instance for creating spans. Returns undefined if OpenTelemetry is not available. | packages/core/src/telemetry.ts:59 |
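The "returns undefined if OpenTelemetry is not available" contract can be approximated with a memoized loader that swallows load failures. This is only a sketch of the general technique; `lazyOptional` is a hypothetical helper, and how this package actually loads OpenTelemetry is not shown in this reference.

```typescript
// Build a getter that runs `load` at most once and maps failures to undefined.
function lazyOptional<T>(load: () => T): () => T | undefined {
  let loaded = false;
  let value: T | undefined;
  return () => {
    if (!loaded) {
      loaded = true;
      try {
        value = load();
      } catch {
        value = undefined; // dependency missing: degrade to "no telemetry"
      }
    }
    return value;
  };
}

// Simulate an optional dependency that is not installed.
let attempts = 0;
const getMissingTracer = lazyOptional<object>(() => {
  attempts += 1;
  throw new Error("Cannot find module");
});

const firstLookup = getMissingTracer();
const secondLookup = getMissingTracer(); // cached: the loader is not retried
```

Callers then guard each use with a simple undefined check instead of wrapping every telemetry call in try/catch.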
Variables
DEFAULT_CONNECT_TIMEOUT_MS
const DEFAULT_CONNECT_TIMEOUT_MS: 30000 = 30_000;
Defined in: packages/core/src/amqp-client.ts:47
Default time waitForConnect will wait for the broker before erroring out. Defaulting to a finite value (rather than waiting forever) gives a fail-fast developer experience: a misconfigured URL, a down broker, or wrong credentials surface as an err within 30 seconds. Pass null explicitly to disable the timeout. Infinity and other non-finite values are also coerced to "no timeout" because Node's setTimeout only supports delays up to 2^31 - 1 ms (about 24.8 days); larger values, including Infinity, are reset to 1 ms and fire almost immediately.
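The coercion rules described here can be summarised in a few lines. `resolveConnectTimeout` and `SKETCH_DEFAULT_TIMEOUT_MS` are hypothetical names for this sketch; how the library treats zero or negative values is not documented here, so the sketch simply passes finite numbers through.

```typescript
const SKETCH_DEFAULT_TIMEOUT_MS = 30_000; // mirrors the documented default

// Returns the timeout to arm in ms, or null when no timer should be set.
function resolveConnectTimeout(value: number | null | undefined): number | null {
  if (value === undefined) return SKETCH_DEFAULT_TIMEOUT_MS; // option omitted
  if (value === null || !Number.isFinite(value)) return null; // explicit opt-out
  return value;
}

const omitted = resolveConnectTimeout(undefined);
const disabled = resolveConnectTimeout(null);
const infinite = resolveConnectTimeout(Infinity);
const custom = resolveConnectTimeout(5_000);
```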
defaultTelemetryProvider
const defaultTelemetryProvider: TelemetryProvider;
Defined in: packages/core/src/telemetry.ts:229
Default telemetry provider that uses OpenTelemetry API if available.
MessagingSemanticConventions
const MessagingSemanticConventions: object;
Defined in: packages/core/src/telemetry.ts:26
Semantic conventions for AMQP messaging following OpenTelemetry standards.
Type Declaration
| Name | Type | Default value | Defined in |
|---|---|---|---|
| AMQP_CONSUMER_NAME | "amqp.consumer.name" | "amqp.consumer.name" | packages/core/src/telemetry.ts:37 |
| AMQP_PUBLISHER_NAME | "amqp.publisher.name" | "amqp.publisher.name" | packages/core/src/telemetry.ts:36 |
| ERROR_TYPE | "error.type" | "error.type" | packages/core/src/telemetry.ts:40 |
| MESSAGING_DESTINATION | "messaging.destination.name" | "messaging.destination.name" | packages/core/src/telemetry.ts:29 |
| MESSAGING_DESTINATION_KIND | "messaging.destination.kind" | "messaging.destination.kind" | packages/core/src/telemetry.ts:30 |
| MESSAGING_DESTINATION_KIND_EXCHANGE | "exchange" | "exchange" | packages/core/src/telemetry.ts:44 |
| MESSAGING_DESTINATION_KIND_QUEUE | "queue" | "queue" | packages/core/src/telemetry.ts:45 |
| MESSAGING_OPERATION | "messaging.operation" | "messaging.operation" | packages/core/src/telemetry.ts:31 |
| MESSAGING_OPERATION_PROCESS | "process" | "process" | packages/core/src/telemetry.ts:47 |
| MESSAGING_OPERATION_PUBLISH | "publish" | "publish" | packages/core/src/telemetry.ts:46 |
| MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG | "messaging.rabbitmq.message.delivery_tag" | "messaging.rabbitmq.message.delivery_tag" | packages/core/src/telemetry.ts:35 |
| MESSAGING_RABBITMQ_ROUTING_KEY | "messaging.rabbitmq.destination.routing_key" | "messaging.rabbitmq.destination.routing_key" | packages/core/src/telemetry.ts:34 |
| MESSAGING_SYSTEM | "messaging.system" | "messaging.system" | packages/core/src/telemetry.ts:28 |
| MESSAGING_SYSTEM_RABBITMQ | "rabbitmq" | "rabbitmq" | packages/core/src/telemetry.ts:43 |
See
https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/
Functions
endSpanError()
function endSpanError(span, error): void;
Defined in: packages/core/src/telemetry.ts:324
End a span with error status.
Parameters
| Parameter | Type |
|---|---|
| span | Span \| undefined |
| error | Error |
Returns
void
endSpanSuccess()
function endSpanSuccess(span): void;
Defined in: packages/core/src/telemetry.ts:309
End a span with success status.
Parameters
| Parameter | Type |
|---|---|
| span | Span \| undefined |
Returns
void
recordConsumeMetric()
function recordConsumeMetric(
provider,
queueName,
consumerName,
success,
durationMs): void;
Defined in: packages/core/src/telemetry.ts:368
Record a consume metric.
Parameters
| Parameter | Type |
|---|---|
| provider | TelemetryProvider |
| queueName | string |
| consumerName | string |
| success | boolean |
| durationMs | number |
Returns
void
recordLateRpcReply()
function recordLateRpcReply(provider, reason): void;
Defined in: packages/core/src/telemetry.ts:398
Record an RPC reply that arrived after the caller stopped waiting.
Parameters
| Parameter | Type | Description |
|---|---|---|
| provider | TelemetryProvider | - |
| reason | "unknown-correlation-id" \| "missing-correlation-id" | Why the reply was orphaned. "unknown-correlation-id" is the typical "caller already timed out" case; "missing-correlation-id" means the broker delivered a reply with no correlationId at all (a protocol violation by the responder). |
Returns
void
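How a caller might decide which reason to report is sketched below. `classifyReply` and the pending-set bookkeeping are hypothetical; the sketch only assumes that in-flight RPC calls are tracked by correlationId.

```typescript
type LateReplyReason = "unknown-correlation-id" | "missing-correlation-id";

// Returns why a reply is orphaned, or undefined when a caller is still waiting.
function classifyReply(
  correlationId: string | undefined,
  pending: ReadonlySet<string>,
): LateReplyReason | undefined {
  if (correlationId === undefined) return "missing-correlation-id";
  return pending.has(correlationId) ? undefined : "unknown-correlation-id";
}

// One call is still in flight; everything else is treated as late.
const pendingCalls = new Set(["req-1"]);

const matched = classifyReply("req-1", pendingCalls);
const late = classifyReply("req-0", pendingCalls);
const malformed = classifyReply(undefined, pendingCalls);
```

Only the two non-undefined results would be passed on as the `reason` argument.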
recordPublishMetric()
function recordPublishMetric(
provider,
exchangeName,
routingKey,
success,
durationMs): void;
Defined in: packages/core/src/telemetry.ts:341
Record a publish metric.
Parameters
| Parameter | Type |
|---|---|
| provider | TelemetryProvider |
| exchangeName | string |
| routingKey | string \| undefined |
| success | boolean |
| durationMs | number |
Returns
void
safeJsonParse()
function safeJsonParse<E>(buffer, errorFn): Result<unknown, E>;
Defined in: packages/core/src/parsing.ts:24
Parse a Buffer as JSON, mapping any JSON.parse exception to the caller-supplied error type.
Use this in consume / reply paths where a parse failure is a typed value, not a thrown exception — the caller decides how to translate the raw error into a domain-level error (e.g. TechnicalError).
Type Parameters
| Type Parameter | Description |
|---|---|
| E | The error type produced by errorFn. |
Parameters
| Parameter | Type | Description |
|---|---|---|
| buffer | Buffer | The raw message body to parse. |
| errorFn | (raw) => E | Callback invoked with the underlying JSON.parse error. |
Returns
Result<unknown, E>
A Result containing the parsed unknown value or the mapped error.
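The underlying technique, turning a thrown parse exception into a typed value, can be sketched without any dependencies. `parseJsonSafely` and `SketchResult` are hypothetical stand-ins, not the library's implementation; the sketch takes a Uint8Array instead of a Buffer so that it does not depend on Node typings.

```typescript
// Minimal local Result type, standing in for the library's Result.
type SketchResult<T, E> = { ok: true; value: T } | { ok: false; error: E };

// Parse raw bytes as JSON; route any parse exception through `errorFn`.
function parseJsonSafely<E>(
  bytes: Uint8Array,
  errorFn: (raw: unknown) => E,
): SketchResult<unknown, E> {
  try {
    const text = new TextDecoder().decode(bytes);
    return { ok: true, value: JSON.parse(text) as unknown };
  } catch (raw) {
    return { ok: false, error: errorFn(raw) };
  }
}

const encoder = new TextEncoder();
const good = parseJsonSafely(
  encoder.encode('{"id":1}'),
  (raw) => new Error(`bad JSON: ${String(raw)}`),
);
const bad = parseJsonSafely(encoder.encode("{not json"), () => "parse-failed");
```

Because the error type is chosen by the caller, the same helper can feed different error models on the consume and reply paths.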
Example
const parsed = safeJsonParse(
msg.content,
(error) => new TechnicalError("Failed to parse JSON", error),
);
setupAmqpTopology()
function setupAmqpTopology(channel, contract): Promise<void>;
Defined in: packages/core/src/setup.ts:26
Setup AMQP topology (exchanges, queues, and bindings) from a contract definition.
This function sets up the complete AMQP topology in the correct order:
- Assert all exchanges defined in the contract
- Validate dead letter exchanges are declared before referencing them
- Assert all queues with their configurations (including dead letter settings)
- Create all bindings (queue-to-exchange and exchange-to-exchange)
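The dead letter validation step in the list above could look roughly like this. `SketchContract` is a deliberately simplified, hypothetical shape and does not match the real ContractDefinition; the sketch only shows the check "every referenced dead letter exchange must be declared".

```typescript
// Hypothetical, heavily simplified contract shape used only for this sketch.
interface SketchContract {
  exchanges: string[];
  queues: Array<{ name: string; deadLetterExchange?: string }>;
}

// List the queues whose dead letter exchange is not declared in the contract.
function findUndeclaredDeadLetterExchanges(contract: SketchContract): string[] {
  const declared = new Set(contract.exchanges);
  return contract.queues
    .filter((q) => q.deadLetterExchange !== undefined && !declared.has(q.deadLetterExchange))
    .map((q) => `${q.name} -> ${q.deadLetterExchange}`);
}

const problems = findUndeclaredDeadLetterExchanges({
  exchanges: ["orders"],
  queues: [
    { name: "orders.q", deadLetterExchange: "orders.dlx" }, // not declared
    { name: "audit.q" }, // no dead letter exchange at all
    { name: "retry.q", deadLetterExchange: "orders" }, // declared
  ],
});
```

Running such a check before asserting queues lets a setup routine fail with a clear message instead of a broker-side error.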
Parameters
| Parameter | Type | Description |
|---|---|---|
| channel | Channel | The AMQP channel to use for topology setup |
| contract | ContractDefinition | The contract definition containing the topology specification |
Returns
Promise<void>
Throws
If any exchanges, queues, or bindings fail to be created
Throws
If a queue references a dead letter exchange not declared in the contract
Example
const channel = await connection.createChannel();
await setupAmqpTopology(channel, contract);
startConsumeSpan()
function startConsumeSpan(
provider,
queueName,
consumerName,
attributes?): Span | undefined;
Defined in: packages/core/src/telemetry.ts:277
Create a span for a consume/process operation. Returns undefined if OpenTelemetry is not available.
Parameters
| Parameter | Type |
|---|---|
| provider | TelemetryProvider |
| queueName | string |
| consumerName | string |
| attributes? | Attributes |
Returns
Span | undefined
startPublishSpan()
function startPublishSpan(
provider,
exchangeName,
routingKey,
attributes?): Span | undefined;
Defined in: packages/core/src/telemetry.ts:242
Create a span for a publish operation. Returns undefined if OpenTelemetry is not available.
Parameters
| Parameter | Type |
|---|---|
| provider | TelemetryProvider |
| exchangeName | string |
| routingKey | string \| undefined |
| attributes? | Attributes |
Returns
Span | undefined