Core Concepts
Understanding these core concepts will help you use amqp-contract effectively.
Contract-First Design
Everything starts with a contract that defines:
- Exchanges - Where messages are published
- Queues - Where messages are stored
- Bindings - How queues connect to exchanges
- Publishers - What messages can be published
- Consumers - What messages can be consumed
Define once, use everywhere with full type safety.
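To make "define once, use everywhere" concrete, here is a minimal sketch of how a single contract object can drive the types of every call site. The names `PublisherDef`, `MessageOf`, `sketchContract`, and `describePublish` are hypothetical and exist only for this illustration; they are not amqp-contract's actual types or API.

```typescript
// Hypothetical, simplified shapes for illustration; not amqp-contract's real types.
type PublisherDef<T> = {
  exchange: string;
  routingKey: string;
  // The return type of `parse` carries the message type T.
  parse: (input: unknown) => T;
};
type Contract = { publishers: Record<string, PublisherDef<unknown>> };

// Look up the message type of a named publisher from the contract's type.
type MessageOf<C extends Contract, K extends keyof C['publishers']> =
  C['publishers'][K] extends PublisherDef<infer T> ? T : never;

type Order = { orderId: string; amount: number };

// Hand-written runtime check standing in for a schema library.
function parseOrder(input: unknown): Order {
  if (typeof input !== 'object' || input === null) {
    throw new Error('invalid order message');
  }
  const record = input as { orderId?: unknown; amount?: unknown };
  if (typeof record.orderId !== 'string' || typeof record.amount !== 'number') {
    throw new Error('invalid order message');
  }
  return { orderId: record.orderId, amount: record.amount };
}

// The single source of truth: defined once, referenced by name afterwards.
const sketchContract = {
  publishers: {
    orderCreated: { exchange: 'orders', routingKey: 'order.created', parse: parseOrder },
  },
};
type SketchContract = typeof sketchContract;

// The type of `message` is derived from the publisher name, not written by hand.
function describePublish<K extends keyof SketchContract['publishers']>(
  name: K,
  message: MessageOf<SketchContract, K>,
): string {
  const def = sketchContract.publishers[name];
  return `${def.exchange} <- ${def.routingKey}: ${JSON.stringify(def.parse(message))}`;
}

const sketchLine = describePublish('orderCreated', { orderId: 'ORD-123', amount: 99.99 });
// describePublish('orderCreated', { orderId: 123 }); // would not compile
```

The design point is that the publisher name is the only thing a caller supplies by hand; the compiler looks up everything else from the contract's type.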
End-to-End Type Safety
Type safety flows automatically from your contract:
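// Assumed package layout (@amqp-contract/contract, @amqp-contract/client); adjust to your installation.
import {
  defineContract,
  defineExchange,
  defineMessage,
  definePublisherFirst,
  defineQueue,
} from '@amqp-contract/contract';
import { TypedAmqpClient } from '@amqp-contract/client';
import { z } from 'zod';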
// 1. Define resources and message
const ordersExchange = defineExchange('orders', 'topic', { durable: true });
const orderMessage = defineMessage(
z.object({
orderId: z.string(),
amount: z.number(),
})
);
// 2. Publisher-first pattern (recommended for events)
const { publisher: orderCreatedPublisher, createConsumer: createOrderCreatedConsumer } = definePublisherFirst(
ordersExchange,
orderMessage,
{ routingKey: 'order.created' }
);
// 3. Create consumer from event
const orderProcessingQueue = defineQueue('order-processing', { durable: true });
const { consumer: processOrderConsumer, binding: orderBinding } =
createOrderCreatedConsumer(orderProcessingQueue);
// 4. Compose contract
const contract = defineContract({
exchanges: { orders: ordersExchange },
queues: { orderProcessing: orderProcessingQueue },
bindings: { orderBinding },
publishers: {
orderCreated: orderCreatedPublisher,
},
consumers: {
processOrder: processOrderConsumer,
},
});
// 5. Client knows exact types
const clientResult = await TypedAmqpClient.create({
contract,
urls: ['amqp://localhost']
});
if (clientResult.isError()) {
throw clientResult.error; // Handle connection error
}
const client = clientResult.get();
const result = await client.publish('orderCreated', {
orderId: 'ORD-123', // ✅ TypeScript knows!
amount: 99.99, // ✅ TypeScript knows!
// invalid: true, // ❌ TypeScript error!
});
result.match({
Ok: () => console.log('Published'),
Error: (error) => console.error('Failed:', error),
});
Automatic Validation
Messages are validated automatically at network boundaries:
- On publish: Client validates before sending
- On consume: Worker validates before calling handlers
Invalid messages are caught early with clear error messages.
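As a rough illustration of "validate before sending", the sketch below checks a message and returns a result value instead of throwing, so nothing reaches the transport when validation fails. The `Result` type, `validateOrder`, and `publishValidated` are hypothetical names for this sketch; the library's own result and error types may look different.

```typescript
// Hypothetical result type; amqp-contract's own result type may differ.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

type OrderMessage = { orderId: string; amount: number };

// Hand-written validator standing in for a schema library.
function validateOrder(input: unknown): Result<OrderMessage, string[]> {
  if (typeof input !== 'object' || input === null) {
    return { ok: false, error: ['message: expected object'] };
  }
  const record = input as { orderId?: unknown; amount?: unknown };
  const issues: string[] = [];
  if (typeof record.orderId !== 'string') issues.push('orderId: expected string');
  if (typeof record.amount !== 'number') issues.push('amount: expected number');
  if (typeof record.orderId === 'string' && typeof record.amount === 'number') {
    return { ok: true, value: { orderId: record.orderId, amount: record.amount } };
  }
  return { ok: false, error: issues };
}

// Validate first; only hand the message to the transport when it is valid.
function publishValidated(
  input: unknown,
  send: (message: OrderMessage) => void,
): Result<void, string[]> {
  const checked = validateOrder(input);
  if (checked.ok === true) {
    send(checked.value);
    return { ok: true, value: undefined };
  }
  return { ok: false, error: checked.error };
}
```

Returning the failure as a value keeps the error path explicit at the call site, which is the same style the `result.match` examples on this page rely on.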
// This returns a validation error (doesn't throw)
const result = await client.publish('orderCreated', {
orderId: 'ORD-123',
amount: 'not-a-number', // ❌ Validation error at runtime (TypeScript also flags this at compile time)
});
result.match({
Ok: () => console.log('Published'),
Error: (error) => {
// Handle MessageValidationError or TechnicalError
console.error('Failed:', error.message);
},
});
Schema Libraries
amqp-contract uses Standard Schema, supporting:
- Zod
- Valibot
- ArkType
All examples use Zod, but you can use any compatible library:
import { z } from 'zod';
import * as v from 'valibot';
import { type } from 'arktype';
const ordersExchange = defineExchange('orders', 'topic', { durable: true });
// All work the same way with definePublisherFirst:
const { publisher: zodPublisher, createConsumer: createZodConsumer } = definePublisherFirst(
ordersExchange,
defineMessage(z.object({ id: z.string() })),
{ routingKey: 'order.created' }
);
const { publisher: valibotPublisher, createConsumer: createValibotConsumer } = definePublisherFirst(
ordersExchange,
defineMessage(v.object({ id: v.string() })),
{ routingKey: 'order.created' }
);
const { publisher: arktypePublisher, createConsumer: createArktypeConsumer } = definePublisherFirst(
ordersExchange,
defineMessage(type({ id: 'string' })),
{ routingKey: 'order.created' }
);
AMQP Resources
Exchanges
Exchanges receive and route messages to queues:
const ordersExchange = defineExchange(
'orders', // name
'topic', // type: direct, fanout, or topic
{ durable: true } // options
);
Exchange Types:
- direct - Exact routing key match
- topic - Pattern matching with wildcards (*, #)
- fanout - Broadcast to all bound queues
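The routing rules above can be sketched in a few lines. This is an illustrative re-implementation of the AMQP matching semantics, where `*` stands for exactly one dot-separated word and `#` for zero or more words; it is not code from amqp-contract or from a broker, and `matchesTopic` and `routes` are hypothetical helper names.

```typescript
type ExchangeType = 'direct' | 'topic' | 'fanout';

// Topic matching: '*' matches exactly one word, '#' matches zero or more words.
function matchesTopic(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');
  const match = (pi: number, ki: number): boolean => {
    // Pattern exhausted: succeed only if the key is exhausted too.
    if (pi === p.length) return ki === k.length;
    if (p[pi] === '#') {
      // Try letting '#' consume 0, 1, 2, ... of the remaining words.
      for (let next = ki; next <= k.length; next++) {
        if (match(pi + 1, next)) return true;
      }
      return false;
    }
    // Any other pattern word needs a key word to compare against.
    if (ki === k.length) return false;
    return (p[pi] === '*' || p[pi] === k[ki]) && match(pi + 1, ki + 1);
  };
  return match(0, 0);
}

// Decide whether a binding on the given exchange type receives a message.
function routes(type: ExchangeType, bindingKey: string, routingKey: string): boolean {
  switch (type) {
    case 'direct':
      return bindingKey === routingKey; // exact match
    case 'fanout':
      return true; // routing key is ignored
    case 'topic':
      return matchesTopic(bindingKey, routingKey);
  }
}
```

For example, a topic binding of `order.*` would receive `order.created` but not `order.created.eu`, while `order.#` would receive both.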
Queues
Queues store messages until consumed:
const orderProcessingQueue = defineQueue(
'order-processing', // name
{
durable: true, // survives broker restart
exclusive: false, // shared across connections
}
);
Messages
Messages combine schemas with optional metadata:
const orderMessage = defineMessage(
z.object({
orderId: z.string(),
amount: z.number(),
}),
{
summary: 'Order created event',
description: 'Emitted when a new order is created',
}
);
Bindings
Bindings connect queues to exchanges:
const orderBinding = defineQueueBinding(
orderProcessingQueue, // queue
ordersExchange, // exchange
{
routingKey: 'order.created', // routing pattern
}
);
Publishers
Publishers define what messages can be published:
const orderCreatedPublisher = definePublisher(
ordersExchange, // exchange
orderMessage, // message definition
{
routingKey: 'order.created',
}
);
Consumers
Consumers define what messages can be consumed:
const processOrderConsumer = defineConsumer(
orderProcessingQueue, // queue
orderMessage // message definition
);
Message Flow
Here's how messages flow through the system:
- Client publishes a message
- Message is validated against schema
- Message sent to exchange
- Exchange routes to queues via bindings
- Worker consumes from queue
- Message is validated again
- Handler called with typed message
- Message acknowledged
All with automatic type safety and validation!
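The flow above can be traced with a small in-memory simulation. This is only a sketch: no broker is involved, routing uses exact key matching for brevity, and every name in it (`isOrder`, `bindings`, `queues`, `publish`, `consume`) is hypothetical rather than part of amqp-contract.

```typescript
type Order = { orderId: string; amount: number };

// Hand-written check standing in for schema validation.
function isOrder(value: unknown): value is Order {
  if (typeof value !== 'object' || value === null) return false;
  const record = value as { orderId?: unknown; amount?: unknown };
  return typeof record.orderId === 'string' && typeof record.amount === 'number';
}

type Binding = { queue: string; routingKey: string };

// One exchange with one bound queue, held in memory.
const bindings: Binding[] = [{ queue: 'order-processing', routingKey: 'order.created' }];
const queues = new Map<string, string[]>([['order-processing', []]]);

// Publish side: validate, serialize, then route to every matching queue.
function publish(routingKey: string, message: unknown): boolean {
  if (!isOrder(message)) return false; // rejected before anything is sent
  const body = JSON.stringify(message);
  for (const binding of bindings) {
    if (binding.routingKey === routingKey) queues.get(binding.queue)?.push(body);
  }
  return true;
}

// Consume side: validate again, call the typed handler, count acknowledgements.
function consume(queue: string, handler: (order: Order) => void): number {
  const pending = queues.get(queue) ?? [];
  let acknowledged = 0;
  let body = pending.shift();
  while (body !== undefined) {
    const parsed: unknown = JSON.parse(body);
    if (isOrder(parsed)) {
      handler(parsed); // handler receives a fully typed Order
      acknowledged += 1;
    }
    body = pending.shift();
  }
  return acknowledged;
}
```

Calling `publish('order.created', { orderId: 'ORD-123', amount: 99.99 })` followed by `consume('order-processing', handler)` walks one message through each step; a message that fails the check on either side never reaches the handler.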
Next Steps
- Learn about Defining Contracts
- Explore Client Usage
- Understand Worker Usage