Basic Order Processing
A complete example demonstrating type-safe AMQP messaging with the RabbitMQ topic pattern.
Overview
This example showcases:
- ✅ Contract definition with Zod schemas
- ✅ Type-safe message publishing
- ✅ Type-safe message consumption
- ✅ RabbitMQ topic exchange with wildcards
- ✅ Multiple consumers with different routing patterns
- ✅ Full end-to-end type safety
Architecture
The example consists of three packages:
- Contract - Shared contract definition
- Client - Publisher application
- Worker - Consumer application with multiple handlers
Topic Exchange Pattern
This example demonstrates RabbitMQ's powerful topic exchange pattern for flexible message routing.
Routing Diagram
Routing Keys
The example uses these routing keys:
order.created- New ordersorder.updated- Regular status updatesorder.shipped- Shipped ordersorder.*.urgent- Urgent updates (wildcard pattern)
Routing Patterns
Exact Match
order.created→ matches onlyorder.createdmessagesorder.shipped→ matches onlyorder.shippedmessages
Multiple Word Wildcard (#)
order.#→ matches zero or more words after "order."- ✅ Matches:
order.created,order.updated,order.shipped,order.updated.urgent
- ✅ Matches:
Single Word Wildcard (*)
order.*.urgent→ matches any single word between "order." and ".urgent"- ✅ Matches:
order.created.urgent,order.updated.urgent - ❌ Does NOT match:
order.created,order.updated
- ✅ Matches:
Running the Example
Prerequisites
Start RabbitMQ:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4-managementSetup
Install dependencies and build:
pnpm install
pnpm buildRun
Open two terminals:
Terminal 1 - Start the worker:
pnpm --filter @amqp-contract-examples/basic-order-processing-worker devTerminal 2 - Run the client:
pnpm --filter @amqp-contract-examples/basic-order-processing-client devExpected Output
The client publishes 5 messages, and you'll see the worker process them according to the routing patterns:
Client Output:
1️⃣ Publishing NEW ORDER (order.created)
✓ Published order ORD-001
→ Will be received by: processing & notifications queues
2️⃣ Publishing ORDER UPDATE (order.updated)
✓ Published update for ORD-001
→ Will be received by: notifications queue only
3️⃣ Publishing ORDER SHIPPED (order.shipped)
✓ Published shipment for ORD-001
→ Will be received by: notifications & shipping queues
4️⃣ Publishing ANOTHER NEW ORDER (order.created)
✓ Published order ORD-002
→ Will be received by: processing & notifications queues
5️⃣ Publishing URGENT ORDER UPDATE (order.updated.urgent)
✓ Published urgent update for ORD-002
→ Will be received by: notifications & urgent queuesWorker Output:
Subscribed to:
• order.created → processOrder handler
• order.# → notifyOrder handler (all events)
• order.shipped → shipOrder handler
• order.*.urgent → handleUrgentOrder handler
[PROCESSING] New order received (ORD-001)
[NOTIFICATIONS] Event received (new_order: ORD-001)
[NOTIFICATIONS] Event received (status_update: ORD-001)
[SHIPPING] Shipment notification received (ORD-001)
[NOTIFICATIONS] Event received (new_order: ORD-002)
[PROCESSING] New order received (ORD-002)
[URGENT] Priority order update received! (ORD-002)
[NOTIFICATIONS] Event received (status_update: ORD-002)Contract Definition
The contract is defined in a separate package (@amqp-contract-examples/basic-order-processing-contract) that is shared between the client and worker.
Message Schemas
Order Schema (for new orders):
const orderSchema = z.object({
orderId: z.string(),
customerId: z.string(),
items: z.array(
z.object({
productId: z.string(),
quantity: z.number().int().positive(),
price: z.number().positive(),
}),
),
totalAmount: z.number().positive(),
createdAt: z.string().datetime(),
});Order Status Schema (for updates):
const orderStatusSchema = z.object({
orderId: z.string(),
status: z.enum(["processing", "shipped", "delivered", "cancelled"]),
updatedAt: z.string().datetime(),
});Contract Structure
// 1. Define resources first
const ordersExchange = defineExchange("orders", "topic", { durable: true });
const orderProcessingQueue = defineQueue("order-processing", { durable: true });
const orderNotificationsQueue = defineQueue("order-notifications", { durable: true });
const orderShippingQueue = defineQueue("order-shipping", { durable: true });
const orderUrgentQueue = defineQueue("order-urgent", { durable: true });
// 2. Define messages
const orderMessage = defineMessage(orderSchema, {
summary: "Order created event",
description: "Emitted when a new order is created",
});
const orderStatusMessage = defineMessage(orderStatusSchema);
const orderUnionMessage = defineMessage(z.union([orderSchema, orderStatusSchema]));
// 3. Compose contract using object references
export const orderContract = defineContract({
exchanges: {
orders: ordersExchange,
},
queues: {
orderProcessing: orderProcessingQueue,
orderNotifications: orderNotificationsQueue,
orderShipping: orderShippingQueue,
orderUrgent: orderUrgentQueue,
},
bindings: {
orderProcessingBinding: defineQueueBinding(orderProcessingQueue, ordersExchange, {
routingKey: "order.created",
}),
orderNotificationsBinding: defineQueueBinding(orderNotificationsQueue, ordersExchange, {
routingKey: "order.#",
}),
orderShippingBinding: defineQueueBinding(orderShippingQueue, ordersExchange, {
routingKey: "order.shipped",
}),
orderUrgentBinding: defineQueueBinding(orderUrgentQueue, ordersExchange, {
routingKey: "order.*.urgent",
}),
},
publishers: {
orderCreated: definePublisher(ordersExchange, orderMessage, {
routingKey: "order.created",
}),
orderUpdated: definePublisher(ordersExchange, orderStatusMessage, {
routingKey: "order.updated",
}),
orderShipped: definePublisher(ordersExchange, orderStatusMessage, {
routingKey: "order.shipped",
}),
orderUrgentUpdate: definePublisher(ordersExchange, orderStatusMessage, {
routingKey: "order.updated.urgent",
}),
},
consumers: {
processOrder: defineConsumer(orderProcessingQueue, orderMessage),
notifyOrder: defineConsumer(orderNotificationsQueue, orderUnionMessage),
shipOrder: defineConsumer(orderShippingQueue, orderStatusMessage),
handleUrgentOrder: defineConsumer(orderUrgentQueue, orderStatusMessage),
},
});Client Implementation
The client is in a separate package (@amqp-contract-examples/basic-order-processing-client) that imports the contract:
import { TypedAmqpClient } from "@amqp-contract/client";
import { orderContract } from "@amqp-contract-examples/basic-order-processing-contract";
const client = await TypedAmqpClient.create({
contract: orderContract,
urls: ["amqp://localhost"],
}).resultToPromise();
// Publish new order with explicit error handling
const result = await client.publish("orderCreated", {
orderId: "ORD-001",
customerId: "CUST-123",
items: [{ productId: "PROD-A", quantity: 2, price: 29.99 }],
totalAmount: 59.98,
createdAt: new Date().toISOString(),
});
result.match({
Ok: () => console.log("Order published successfully"),
Error: (error) => {
console.error("Failed to publish:", error.message);
// Handle error appropriately
},
});
// Publish status update
const updateResult = await client.publish("orderUpdated", {
orderId: "ORD-001",
status: "processing",
updatedAt: new Date().toISOString(),
});
updateResult.match({
Ok: () => console.log("Status update published"),
Error: (error) => console.error("Failed:", error),
});Worker Implementation
The worker is in a separate package (@amqp-contract-examples/basic-order-processing-worker) that imports the contract:
import { TypedAmqpWorker } from "@amqp-contract/worker";
import { connect } from "amqplib";
import { orderContract } from "@amqp-contract-examples/basic-order-processing-contract";
const connection = await connect("amqp://localhost");
const worker = await TypedAmqpWorker.create({
contract: orderContract,
handlers: {
processOrder: ({ payload }) => {
console.log(`[PROCESSING] Order ${payload.orderId}`);
console.log(` Customer: ${payload.customerId}`);
console.log(` Total: $${payload.totalAmount}`);
return Future.value(Result.Ok(undefined));
},
notifyOrder: ({ payload }) => {
console.log(`[NOTIFICATION] Order ${payload.orderId} event`);
return Future.value(Result.Ok(undefined));
},
shipOrder: ({ payload }) => {
console.log(`[SHIPPING] Order ${payload.orderId} - ${payload.status}`);
return Future.value(Result.Ok(undefined));
},
handleUrgentOrder: ({ payload }) => {
console.log(`[URGENT] Order ${payload.orderId} - ${payload.status}`);
return Future.value(Result.Ok(undefined));
},
},
connection,
});Message Routing Table
| Message Published | Routing Key | Queues Receiving | Handlers Triggered |
|---|---|---|---|
| New Order | order.created | ✅ order-processing ✅ order-notifications | processOrder notifyOrder |
| Regular Update | order.updated | ✅ order-notifications | notifyOrder |
| Shipped Order | order.shipped | ✅ order-notifications ✅ order-shipping | notifyOrder shipOrder |
| Urgent Update | order.updated.urgent | ✅ order-notifications ✅ order-urgent | notifyOrder handleUrgentOrder |
Message Flow Example
This sequence diagram shows how a message flows through the system:
Key Takeaways
- Flexible Routing - Topic patterns enable complex routing without code changes
- Type Safety - TypeScript ensures correctness at compile time
- Validation - Zod validates all messages at runtime
- Decoupling - Publishers don't need to know about consumers
- Scalability - Easy to add new routing patterns
Source Code
The complete source code is available in the repository:
Next Steps
- Try modifying the routing keys
- Add new publishers or consumers
- Learn about Client Usage and Worker Usage