Core Concepts
Understanding these core concepts will help you use amqp-contract effectively.
Contract-First Design
Everything starts with a contract that defines:
- Exchanges - Where messages are published
- Queues - Where messages are stored
- Bindings - How queues connect to exchanges
- Publishers - What messages can be published
- Consumers - What messages can be consumed
Define once, use everywhere with full type safety.
End-to-End Type Safety
Type safety flows automatically from your contract:
import { z } from "zod";
// 1. Define resources and message
const ordersExchange = defineExchange("orders", "topic", { durable: true });
const orderMessage = defineMessage(
z.object({
orderId: z.string(),
amount: z.number(),
}),
);
// 2. Event pattern (recommended for broadcasts)
const orderCreatedEvent = defineEventPublisher(ordersExchange, orderMessage, {
routingKey: "order.created",
});
// 3. Define queue for processing
const orderProcessingQueue = defineQueue("order-processing", { durable: true });
// 4. Compose contract - configs go directly, bindings auto-generated
const contract = defineContract({
exchanges: { orders: ordersExchange },
queues: { orderProcessing: orderProcessingQueue },
publishers: {
// EventPublisherConfig → auto-extracted to publisher
orderCreated: orderCreatedEvent,
},
consumers: {
// EventConsumerResult → auto-extracted to consumer + binding
processOrder: defineEventConsumer(orderCreatedEvent, orderProcessingQueue),
},
});
// 5. Client knows exact types
const client = await TypedAmqpClient.create({
contract,
urls: ["amqp://localhost"],
}).resultToPromise();
const result = await client.publish("orderCreated", {
orderId: "ORD-123", // ✅ TypeScript knows!
amount: 99.99, // ✅ TypeScript knows!
// invalid: true, // ❌ TypeScript error!
});
result.match({
Ok: () => console.log("Published"),
Error: (error) => console.error("Failed:", error),
});Automatic Validation
Messages are validated automatically at network boundaries:
- On publish: Client validates before sending
- On consume: Worker validates before calling handlers
Invalid messages are caught early with clear error messages.
// This returns a validation error (doesn't throw)
const result = await client.publish("orderCreated", {
orderId: "ORD-123",
amount: "not-a-number", // ❌ Validation error!
});
result.match({
Ok: () => console.log("Published"),
Error: (error) => {
// Handle MessageValidationError or TechnicalError
console.error("Failed:", error.message);
},
});Schema Libraries
amqp-contract uses Standard Schema, supporting:
All examples use Zod, but you can use any compatible library:
import { z } from "zod";
import * as v from "valibot";
import { type } from "arktype";
const ordersExchange = defineExchange("orders", "topic", { durable: true });
// All work the same way with defineEventPublisher:
const zodEvent = defineEventPublisher(ordersExchange, defineMessage(z.object({ id: z.string() })), {
routingKey: "order.created",
});
const valibotEvent = defineEventPublisher(
ordersExchange,
defineMessage(v.object({ id: v.string() })),
{ routingKey: "order.created" },
);
const arktypeEvent = defineEventPublisher(ordersExchange, defineMessage(type({ id: "string" })), {
routingKey: "order.created",
});AMQP Resources
Exchanges
Exchanges receive and route messages to queues:
const ordersExchange = defineExchange(
"orders", // name
"topic", // type: direct, fanout, or topic
{ durable: true }, // options
);Exchange Types:
direct- Exact routing key matchtopic- Pattern matching with wildcards (*,#)fanout- Broadcast to all bound queues
Queues
Queues store messages until consumed. By default, queues are created as quorum queues for better durability:
// Quorum queue (default, recommended)
const orderProcessingQueue = defineQueue("order-processing");
// Classic queue (for special cases)
const tempQueue = defineQueue("temp-queue", {
type: "classic",
durable: false,
});Queue Types:
quorum(default) - Better durability and high-availability via Raft consensusclassic- Traditional queues for non-durable, exclusive, or priority queue use cases
Messages
Messages combine schemas with optional metadata:
const orderMessage = defineMessage(
z.object({
orderId: z.string(),
amount: z.number(),
}),
{
summary: "Order created event",
description: "Emitted when a new order is created",
},
);Bindings
Bindings connect queues to exchanges:
const orderBinding = defineQueueBinding(
orderProcessingQueue, // queue
ordersExchange, // exchange
{
routingKey: "order.created", // routing pattern
},
);Publishers
Publishers define what messages can be published:
const orderCreatedPublisher = definePublisher(
ordersExchange, // exchange
orderMessage, // message definition
{
routingKey: "order.created",
},
);Consumers
Consumers define what messages can be consumed:
const processOrderConsumer = defineConsumer(
orderProcessingQueue, // queue
orderMessage, // message definition
);Message Flow
Here's how messages flow through the system:
- Client publishes a message
- Message is validated against schema
- Message sent to exchange
- Exchange routes to queues via bindings
- Worker consumes from queue
- Message is validated again
- Handler called with typed message
- Message acknowledged
All with automatic type safety and validation!
Next Steps
- Learn about Defining Contracts
- Explore Client Usage
- Understand Worker Usage