@amqp-contract/worker
Type-safe AMQP worker for consuming messages.
Installation
pnpm add @amqp-contract/workerMain Exports
TypedAmqpWorker.create
Creates a type-safe AMQP worker from a contract with message handlers. Automatically connects to RabbitMQ and starts consuming all messages.
Signature:
static async create<TContract>(
options: CreateWorkerOptions<TContract>
): Promise<TypedAmqpWorker<TContract>>Example:
import { TypedAmqpWorker } from '@amqp-contract/worker';
import { contract } from './contract';
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: async (message) => {
console.log('Processing:', message.orderId);
},
},
connection: 'amqp://localhost',
});Parameters:
options- Configuration object:contract- Contract definition created withdefineContracthandlers- Object with handler functions for each consumerconnection- AMQP connection URL (string) or connection options (Options.Connect)
Returns: Promise that resolves to a type-safe AMQP worker
TypedAmqpWorker API
close
Closes the worker, stops consuming, and closes the connection.
Signature:
async close(): Promise<void>Example:
await worker.close();Types
CreateWorkerOptions
interface CreateWorkerOptions<TContract> {
contract: TContract;
handlers: Handlers<TContract>;
connection: string | Options.Connect;
}Message Handlers
Handler Signature
type MessageHandler<TMessage> = (
message: TMessage,
context?: HandlerContext
) => Promise<void> | voidHandlerContext
The handler context provides message acknowledgment functions:
interface HandlerContext {
ack(): void;
nack(options?: NackOptions): void;
reject(options?: RejectOptions): void;
}Methods:
ack()- Acknowledge the message (mark as successfully processed)nack(options)- Negative acknowledge (reject with optional requeue)requeue- Whether to requeue the message (default:false)
reject(options)- Reject the messagerequeue- Whether to requeue the message (default:false)
Basic Example
import { TypedAmqpWorker } from '@amqp-contract/worker';
import { contract } from './contract';
async function main() {
// Create worker with handlers (automatically connects and starts consuming)
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: async (message) => {
console.log(`Processing order ${message.orderId}`);
console.log(`Customer: ${message.customerId}`);
console.log(`Amount: $${message.amount}`);
// Your business logic here
await saveToDatabase(message);
},
notifyOrder: async (message) => {
console.log(`Sending notification for ${message.orderId}`);
await sendEmail(message);
},
},
connection: 'amqp://localhost',
});
console.log('Worker ready, waiting for messages...');
}
main();Manual Acknowledgment
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: async (message, { ack, nack }) => {
try {
await processOrder(message);
// Explicitly acknowledge
ack();
} catch (error) {
console.error('Processing failed:', error);
// Reject and requeue
nack({ requeue: true });
}
},
},
connection,
});Error Handling
By default, errors in handlers are caught and logged:
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: async (message) => {
// If this throws, the message is NOT acknowledged
// and remains in the queue
await riskyOperation(message);
},
},
connection,
});With manual acknowledgment for better control:
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: async (message, { ack, nack, reject }) => {
try {
await processOrder(message);
ack();
} catch (error) {
if (isRetryable(error)) {
// Requeue for retry
nack({ requeue: true });
} else {
// Permanent failure, don't requeue
nack({ requeue: false });
}
}
},
},
connection,
});Graceful Shutdown
async function shutdown() {
console.log('Shutting down...');
// Stop consuming new messages
await worker.close();
// Close connection
await connection.close();
console.log('Shutdown complete');
process.exit(0);
}
// Handle shutdown signals
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);Multiple Workers
Run multiple workers for different consumers:
const processingWorker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: async (message) => {
// Heavy processing
await processOrder(message);
},
},
connection: connection1,
});
const notificationWorker = await TypedAmqpWorker.create({
contract,
handlers: {
notifyOrder: async (message) => {
// Fast notifications
await sendNotification(message);
},
},
connection: connection2,
});Retry Logic
Implement custom retry logic:
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: async (message, { ack, nack }) => {
try {
await processOrder(message);
ack();
} catch (error) {
console.error('Processing failed:', error);
// Requeue for retry (simple approach)
// For sophisticated retry logic, use dead letter queues
// with TTL to track retry attempts
nack({ requeue: true });
}
},
},
connection,
});Note: For production-grade retry logic with retry counting, configure Dead Letter Queues with message TTL and routing.
Dead Letter Queues
Configure dead letter queues in your contract:
const contract = defineContract({
queues: {
orderProcessing: defineQueue('order-processing', {
durable: true,
arguments: {
'x-dead-letter-exchange': 'orders-dlx',
'x-dead-letter-routing-key': 'orders-dead',
},
}),
},
// ... rest of contract
});Consumer Options
Configure consumer behavior in the contract:
const contract = defineContract({
consumers: {
processOrder: defineConsumer('order-processing', orderSchema, {
prefetch: 10, // Process up to 10 messages concurrently
noAck: false, // Require explicit acknowledgment
exclusive: false, // Allow multiple consumers
}),
},
});Type Inference
The worker provides full type inference for consumer handlers:
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: async (message) => {
// message is fully typed based on the schema
message.orderId; // string
message.amount; // number
message.items; // array
},
// ❌ TypeScript error: missing required handler
// notifyOrder: ...
},
connection,
});Complete Example
import { TypedAmqpWorker } from '@amqp-contract/worker';
import { connect } from 'amqplib';
import { contract } from './contract';
async function main() {
const connection = await connect('amqp://localhost');
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: async (message, { ack, nack }) => {
try {
console.log(`[PROCESS] Order ${message.orderId}`);
// Your business logic
await saveToDatabase(message);
await sendConfirmationEmail(message);
ack();
console.log(`[PROCESS] Order ${message.orderId} completed`);
} catch (error) {
console.error(`[PROCESS] Error:`, error);
nack({ requeue: true });
}
},
notifyOrder: async (message) => {
console.log(`[NOTIFY] Order ${message.orderId} event`);
await sendNotification(message);
},
},
connection,
});
console.log('Worker ready, waiting for messages...');
// Graceful shutdown
const shutdown = async () => {
console.log('Shutting down...');
await worker.close();
await connection.close();
process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
}
main().catch(console.error);See Also
- Contract API - Defining contracts
- Client API - Publishing messages
- Worker Usage Guide