Core Concepts
Understanding these core concepts will help you use amqp-contract effectively.
Contract-First Design
Everything starts with a contract that defines:
- Exchanges - Where messages are published
- Queues - Where messages are stored
- Bindings - How queues connect to exchanges
- Publishers - What messages can be published
- Consumers - What messages can be consumed
Define once, use everywhere with full type safety.
End-to-End Type Safety
Type safety flows automatically from your contract:
import { z } from "zod";
// 1. Define resources and message
const ordersExchange = defineExchange("orders");
const orderMessage = defineMessage(
z.object({
orderId: z.string(),
amount: z.number(),
}),
);
// 2. Event pattern (recommended for broadcasts)
const orderCreatedEvent = defineEventPublisher(ordersExchange, orderMessage, {
routingKey: "order.created",
});
// 3. Define queue for processing
const orderProcessingQueue = defineQueue("order-processing");
// 4. Compose contract - only publishers and consumers needed
// Exchanges, queues, and bindings are automatically extracted
const contract = defineContract({
publishers: {
orderCreated: orderCreatedEvent,
},
consumers: {
processOrder: defineEventConsumer(orderCreatedEvent, orderProcessingQueue),
},
});
// 5. Client knows exact types
const client = (
await TypedAmqpClient.create({
contract,
urls: ["amqp://localhost"],
})
)._unsafeUnwrap();
const result = await client.publish("orderCreated", {
orderId: "ORD-123", // ✅ TypeScript knows!
amount: 99.99, // ✅ TypeScript knows!
// invalid: true, // ❌ TypeScript error!
});
result.match(
() => console.log("Published"),
(error) => console.error("Failed:", error),
);
Automatic Validation
Messages are validated automatically at network boundaries:
- On publish: Client validates before sending
- On consume: Worker validates before calling handlers
Invalid messages are caught early with clear error messages.
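To make the "validate, then return an error instead of throwing" idea concrete, here is a minimal sketch. It is not amqp-contract's implementation: `validateBeforePublish`, `PublishResult`, and the hand-written `orderSchema` are hypothetical names used only for illustration. The `~standard.validate` shape follows the Standard Schema v1 interface that libraries such as Zod expose.

```typescript
// Hypothetical sketch: validate at the boundary and return a result instead of throwing.
// Only the `~standard` shape mirrors Standard Schema v1; every other name is illustrative.
type Issue = { message: string };
type ValidationResult<T> = { value: T; issues?: undefined } | { issues: readonly Issue[] };

interface Schema<T> {
  readonly "~standard": {
    readonly version: 1;
    readonly vendor: string;
    readonly validate: (value: unknown) => ValidationResult<T> | Promise<ValidationResult<T>>;
  };
}

type Order = { orderId: string; amount: number };

// Hand-written stand-in for z.object({ orderId: z.string(), amount: z.number() }).
const orderSchema: Schema<Order> = {
  "~standard": {
    version: 1,
    vendor: "example",
    validate: (value) => {
      const issues: Issue[] = [];
      const { orderId, amount } = (value ?? {}) as { orderId?: unknown; amount?: unknown };
      if (typeof orderId !== "string") issues.push({ message: "orderId must be a string" });
      if (typeof amount !== "number") issues.push({ message: "amount must be a number" });
      return issues.length > 0 ? { issues } : { value: value as Order };
    },
  },
};

type PublishResult = { ok: true } | { ok: false; error: Error };

// Validate first; `send` is only called when the payload passes.
async function validateBeforePublish<T>(
  schema: Schema<T>,
  payload: unknown,
  send: (message: T) => void,
): Promise<PublishResult> {
  const result = await schema["~standard"].validate(payload);
  if (result.issues) {
    const text = result.issues.map((issue) => issue.message).join("; ");
    return { ok: false, error: new Error(`Validation failed: ${text}`) };
  }
  send(result.value);
  return { ok: true };
}
```

Because the failure is a value, callers branch on it explicitly, and an invalid payload never reaches the `send` callback.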
// This returns a validation error (doesn't throw)
const result = await client.publish("orderCreated", {
orderId: "ORD-123",
amount: "not-a-number", // ❌ Validation error!
});
result.match(
() => console.log("Published"),
(error) => {
// Handle MessageValidationError or TechnicalError
console.error("Failed:", error.message);
},
);
Schema Libraries
amqp-contract uses Standard Schema, supporting:
- Zod
- Valibot
- ArkType
All examples use Zod, but you can use any compatible library:
import { z } from "zod";
import * as v from "valibot";
import { type } from "arktype";
const ordersExchange = defineExchange("orders");
// All work the same way with defineEventPublisher:
const zodEvent = defineEventPublisher(ordersExchange, defineMessage(z.object({ id: z.string() })), {
routingKey: "order.created",
});
const valibotEvent = defineEventPublisher(
ordersExchange,
defineMessage(v.object({ id: v.string() })),
{ routingKey: "order.created" },
);
const arktypeEvent = defineEventPublisher(ordersExchange, defineMessage(type({ id: "string" })), {
routingKey: "order.created",
});
AMQP Resources
Exchanges
Exchanges receive and route messages to queues. By default, exchanges are created as topic exchanges and are durable:
// Default exchange: topic type, durable
const ordersExchange = defineExchange("orders");
// Define exchange with custom options
const tasksExchange = defineExchange("tasks", {
type: "direct", // one of "topic", "direct", "fanout", "headers" (default: "topic")
durable: false, // (default: true)
});
Exchange Types:
- topic (default) - Pattern matching with wildcards (*, #)
- direct - Exact routing key match
- fanout - Broadcast to all bound queues
- headers - Routes based on message headers
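To show what topic pattern matching means in practice, the following sketch implements the standard AMQP wildcard rules: `*` matches exactly one dot-separated word and `#` matches zero or more words. The `topicMatches` function is a hypothetical helper written for illustration. It is not part of amqp-contract, and the broker performs this matching itself.

```typescript
// Illustrative matcher for AMQP topic patterns; not part of amqp-contract.
// "*" matches exactly one dot-separated word, "#" matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const patternWords = pattern.split(".");
  const keyWords = routingKey === "" ? [] : routingKey.split(".");

  const match = (pi: number, ki: number): boolean => {
    // Pattern exhausted: succeed only if the routing key is exhausted too.
    if (pi === patternWords.length) return ki === keyWords.length;
    if (patternWords[pi] === "#") {
      // Let "#" consume zero, one, or more of the remaining words.
      for (let next = ki; next <= keyWords.length; next++) {
        if (match(pi + 1, next)) return true;
      }
      return false;
    }
    if (ki === keyWords.length) return false;
    const wordMatches = patternWords[pi] === "*" || patternWords[pi] === keyWords[ki];
    return wordMatches && match(pi + 1, ki + 1);
  };

  return match(0, 0);
}

topicMatches("order.*", "order.created"); // true
topicMatches("order.*", "order.created.eu"); // false: "*" covers one word only
topicMatches("order.#", "order.created.eu"); // true
```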
Queues
Queues store messages until consumed. By default, queues are created as quorum queues for better durability:
// Quorum queue (default, recommended)
const orderProcessingQueue = defineQueue("order-processing");
// Classic queue (for special cases)
const tempQueue = defineQueue("temp-queue", {
type: "classic",
durable: false,
});
Queue Types:
- quorum (default) - Better durability and high availability via Raft consensus (always durable; does not support exclusive, auto-deleting, or priority queues)
- classic - Traditional queues for non-durable, exclusive, auto-deleting, or priority queue use cases
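The choice between the two types can be summarized as a small decision rule. The sketch below encodes only the constraints listed above. `QueueNeeds` and `chooseQueueType` are hypothetical names for illustration and do not correspond to amqp-contract options.

```typescript
// Hypothetical requirement flags; the names are illustrative, not amqp-contract options.
type QueueNeeds = {
  nonDurable?: boolean;
  exclusive?: boolean;
  autoDelete?: boolean;
  priority?: boolean;
};

// Quorum is the default; fall back to classic only when a feature
// that quorum queues do not support is required.
function chooseQueueType(needs: QueueNeeds): "quorum" | "classic" {
  const needsClassic = Boolean(
    needs.nonDurable || needs.exclusive || needs.autoDelete || needs.priority,
  );
  return needsClassic ? "classic" : "quorum";
}

chooseQueueType({}); // "quorum"
chooseQueueType({ exclusive: true }); // "classic"
```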
Messages
Messages combine schemas with optional metadata:
const orderMessage = defineMessage(
z.object({
orderId: z.string(),
amount: z.number(),
}),
{
summary: "Order created event",
description: "Emitted when a new order is created",
},
);
Bindings
Bindings connect queues to exchanges:
const orderBinding = defineQueueBinding(
orderProcessingQueue, // queue
ordersExchange, // exchange
{
routingKey: "order.created", // routing pattern
},
);
Publishers
Publishers define what messages can be published:
const orderCreatedPublisher = definePublisher(
ordersExchange, // exchange
orderMessage, // message definition
{
routingKey: "order.created",
},
);
Consumers
Consumers define what messages can be consumed:
const processOrderConsumer = defineConsumer(
orderProcessingQueue, // queue
orderMessage, // message definition
);
Message Flow
Here's how messages flow through the system:
- Client publishes a message
- Message is validated against schema
- Message sent to exchange
- Exchange routes to queues via bindings
- Worker consumes from queue
- Message is validated again
- Handler called with typed message
- Message acknowledged
All with automatic type safety and validation!
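The eight steps above can be traced with a small in-memory simulation. This is only a sketch: there is no broker and no amqp-contract API involved, routing is simplified to exact routing key matches, and `parseOrder`, `publish`, and `consumeOne` are hypothetical helpers.

```typescript
// In-memory walk-through of the message flow; no broker or amqp-contract API is used.
type Order = { orderId: string; amount: number };

// Stand-in for a schema: returns the typed value, or null when the payload is invalid.
function parseOrder(raw: unknown): Order | null {
  if (typeof raw !== "object" || raw === null) return null;
  const { orderId, amount } = raw as { orderId?: unknown; amount?: unknown };
  return typeof orderId === "string" && typeof amount === "number" ? { orderId, amount } : null;
}

type Binding = { queue: string; routingKey: string };
const bindings: Binding[] = [{ queue: "order-processing", routingKey: "order.created" }];
const queues = new Map<string, string[]>([["order-processing", []]]);
const acked: string[] = [];

function publish(routingKey: string, payload: unknown): boolean {
  const order = parseOrder(payload); // steps 1-2: publish call, validate against schema
  if (order === null) return false;
  const body = JSON.stringify(order); // step 3: message sent to the exchange
  for (const binding of bindings) {
    // step 4: the exchange routes to queues via bindings (exact match only in this sketch)
    if (binding.routingKey === routingKey) queues.get(binding.queue)?.push(body);
  }
  return true;
}

function consumeOne(queue: string, handler: (order: Order) => void): boolean {
  const body = queues.get(queue)?.shift(); // step 5: worker takes a message from the queue
  if (body === undefined) return false;
  const order = parseOrder(JSON.parse(body)); // step 6: validate again
  if (order === null) return false; // invalid messages never reach the handler
  handler(order); // step 7: handler receives a typed message
  acked.push(order.orderId); // step 8: acknowledge
  return true;
}

publish("order.created", { orderId: "ORD-123", amount: 99.99 });
consumeOne("order-processing", (order) => console.log("Processing", order.orderId));
```

Validating on both sides means a handler only ever sees well-formed data, even when a message was placed on the queue by a producer that skipped validation.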
Next Steps
- Learn about Defining Contracts
- Explore Client Usage
- Understand Worker Usage