# Message Compression

Learn how to use message compression to reduce bandwidth and improve performance for large AMQP payloads.
## Overview
The `@amqp-contract` library supports optional message compression using industry-standard algorithms (gzip and deflate). Compression is applied at publish time as a runtime decision, giving you the flexibility to compress messages based on size, content type, or other runtime conditions.
## Key Features
- **Runtime Decision**: Choose whether to compress each message when publishing
- **Automatic Decompression**: Workers automatically decompress messages based on the `contentEncoding` header
- **Type-Safe**: Compression options are fully type-checked
- **Zero Consumer Config**: No configuration needed on the consumer side
- **Multiple Algorithms**: Support for gzip and deflate compression
## When to Use Compression
Compression is beneficial for:

- ✅ **Large messages (>1KB)**: Reduces network bandwidth and transmission time
- ✅ **Text-heavy payloads**: JSON, XML, and text compress very well
- ✅ **High-volume messaging**: Reduces network costs and improves throughput
- ✅ **Limited bandwidth**: Useful in constrained network environments
Compression may not be beneficial for:

- ❌ **Small messages (<500 bytes)**: Compression overhead may exceed the savings
- ❌ **Already compressed data**: Images, videos, or pre-compressed files
- ❌ **CPU-constrained systems**: Compression requires CPU resources
## Basic Usage

### Publishing with Compression

Compression is specified in the publish options:
```typescript
import { TypedAmqpClient } from "@amqp-contract/client";
import { contract } from "./contract";

const client = await TypedAmqpClient.create({
  contract,
  urls: ["amqp://localhost"],
}).resultToPromise();

// Publish with gzip compression
await client
  .publish(
    "orderCreated",
    {
      orderId: "ORD-123",
      items: [...], // Large array of items
    },
    {
      compression: "gzip",
    },
  )
  .resultToPromise();

// Publish with deflate compression
await client
  .publish(
    "orderCreated",
    {
      orderId: "ORD-124",
      items: [...],
    },
    {
      compression: "deflate",
    },
  )
  .resultToPromise();

// Publish without compression
await client
  .publish("orderCreated", {
    orderId: "ORD-125",
    items: [],
  })
  .resultToPromise();
```

### Consuming Compressed Messages
No configuration needed! Workers automatically detect and decompress messages:
```typescript
import { TypedAmqpWorker } from "@amqp-contract/worker";
import { contract } from "./contract";

const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: ({ payload }) => {
      // Message is automatically decompressed
      console.log("Processing order:", payload.orderId);
      console.log("Items:", payload.items); // Already decompressed
      return Future.value(Result.Ok(undefined));
    },
  },
  urls: ["amqp://localhost"],
}).resultToPromise();
```

The worker automatically:

- Reads the `contentEncoding` header
- Decompresses the payload if needed
- Validates and passes the decompressed message to your handler
## Conditional Compression
Compress messages based on runtime conditions:
```typescript
class OrderPublisher {
  constructor(private client: TypedAmqpClient<typeof contract>) {}

  async publishOrder(order: Order) {
    // Estimate the serialized message size
    const messageSize = JSON.stringify(order).length;

    // Compress if the message is larger than 1KB
    const shouldCompress = messageSize > 1024;

    await this.client
      .publish("orderCreated", order, {
        compression: shouldCompress ? "gzip" : undefined,
        persistent: true,
      })
      .resultToPromise();
  }
}
```

## Compression Algorithms
### gzip

- **Best for**: General-purpose compression
- **Compression ratio**: High (typically 70-80% reduction for text)
- **Speed**: Moderate
- **Compatibility**: Widely supported

```typescript
client.publish("event", data, { compression: "gzip" });
```

### deflate
- **Best for**: Faster compression with a slightly lower ratio
- **Compression ratio**: Good (typically 65-75% reduction for text)
- **Speed**: Fast
- **Compatibility**: Widely supported

```typescript
client.publish("event", data, { compression: "deflate" });
```

## How It Works
### Publishing Flow

1. **Message validation**: Schema validation happens first
2. **Serialization**: The message is converted to JSON
3. **Compression**: If specified, the payload is compressed using the chosen algorithm
4. **Header setting**: The `contentEncoding` header is set to the algorithm name
5. **Publishing**: The compressed payload is sent to RabbitMQ
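The publish-side steps above can be sketched with Node's built-in `zlib`. This is a simplified illustration, not the library's actual internals; the `preparePayload` helper and its return shape are hypothetical:

```typescript
import { gzipSync } from "node:zlib";

// Hypothetical sketch of the publish pipeline: serialize, optionally
// compress, and tag the payload with a contentEncoding header.
function preparePayload(
  message: unknown,
  compression?: "gzip",
): { body: Buffer; headers: Record<string, string> } {
  const json = JSON.stringify(message); // serialization step
  if (compression === "gzip") {
    return {
      body: gzipSync(Buffer.from(json)), // compression step
      headers: { contentEncoding: "gzip" }, // header-setting step
    };
  }
  return { body: Buffer.from(json), headers: {} };
}

const prepared = preparePayload({ orderId: "ORD-123", items: [] }, "gzip");
console.log(prepared.headers.contentEncoding); // "gzip"
```

Because the header travels with the message, the consumer never needs to be told in advance which messages were compressed.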
### Consumption Flow

1. **Message received**: The worker receives the message
2. **Header check**: The `contentEncoding` header is read
3. **Decompression**: If the header is present, the payload is decompressed
4. **Deserialization**: The JSON is parsed
5. **Validation**: Schema validation runs
6. **Handler invocation**: Your handler receives the validated message
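The consume-side steps can be sketched the same way. Again, this is a hypothetical illustration of the behavior described above, not the worker's actual code; the `decodePayload` helper is an assumption:

```typescript
import { gunzipSync, inflateSync } from "node:zlib";

// Hypothetical sketch of the consume pipeline: read the contentEncoding
// header, decompress accordingly, then parse the JSON.
function decodePayload(body: Buffer, contentEncoding?: string): unknown {
  let json: string;
  if (contentEncoding === "gzip") {
    json = gunzipSync(body).toString();
  } else if (contentEncoding === "deflate") {
    json = inflateSync(body).toString();
  } else if (contentEncoding === undefined) {
    json = body.toString(); // uncompressed message
  } else {
    // Mirrors the documented behavior: unknown encodings are rejected
    throw new Error(`Unsupported contentEncoding: ${contentEncoding}`);
  }
  return JSON.parse(json);
}
```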
## Error Handling

### Compression Errors

Compression errors are returned in the `Result` type:
```typescript
await client
  .publish("event", data, {
    compression: "gzip",
  })
  .resultToPromise();
```

### Decompression Errors
Unsupported encodings throw errors during consumption:
```typescript
// Workers automatically handle known encodings (gzip, deflate).
// Unsupported encodings will throw an error and reject the message.
```

## Best Practices
### 1. Measure Before Compressing
Test compression with your actual data:
```typescript
import { gzip } from "node:zlib";
import { promisify } from "node:util";

const gzipAsync = promisify(gzip);

const testData = {
  /* your typical message */
};
const json = JSON.stringify(testData);
console.log("Original size:", json.length, "bytes");

// Test compression
const compressed = await gzipAsync(Buffer.from(json));
console.log("Compressed size:", compressed.length, "bytes");
console.log(
  "Reduction:",
  ((1 - compressed.length / json.length) * 100).toFixed(1) + "%",
);
```

### 2. Set a Size Threshold
Only compress messages above a certain size:
```typescript
const SIZE_THRESHOLD = 1024; // 1KB

function shouldCompress(message: unknown): boolean {
  // Note: string length counts UTF-16 code units; use
  // Buffer.byteLength(...) if you need an exact byte count
  return JSON.stringify(message).length > SIZE_THRESHOLD;
}

await client
  .publish("event", data, {
    compression: shouldCompress(data) ? "gzip" : undefined,
  })
  .resultToPromise();
```

### 3. Monitor Performance
Track compression ratios and performance:
```typescript
const startTime = Date.now();
const originalSize = JSON.stringify(data).length;

await client
  .publish("event", data, {
    compression: "gzip",
  })
  .resultToPromise();

const duration = Date.now() - startTime;
console.log("Published in", duration, "ms");
console.log("Original size:", originalSize, "bytes");
```

### 4. Consider Content Type
Compress text-heavy content, skip binary data:
```typescript
function getCompressionForContent(data: unknown): "gzip" | undefined {
  // Check if data is text-heavy (strings, arrays, plain objects)
  if (typeof data === "string" || (typeof data === "object" && data !== null)) {
    return "gzip";
  }

  // Skip compression for binary or already compressed data
  return undefined;
}
```

### 5. Document Compression Usage
Document which messages use compression in your contract comments:
```typescript
/**
 * Order created event
 *
 * @remarks
 * Consider using gzip compression for large orders with many items
 */
const orderMessage = defineMessage(orderSchema, {
  summary: "Order created event",
});
```

## Troubleshooting
### Message Size Still Large

- Verify compression is actually being applied (check the `contentEncoding` header in the RabbitMQ management UI)
- Try a different algorithm (gzip vs. deflate)
- Consider whether your data is already compressed (images, etc.)
### Performance Issues

- Compression adds CPU overhead; monitor your application's CPU usage
- Consider skipping compression for time-sensitive messages
- Use deflate instead of gzip for faster (but slightly less effective) compression
### Compatibility Issues

- Ensure all consumers support decompression (built into `@amqp-contract/worker`)
- Check that your RabbitMQ version supports content-encoding headers (3.x+)
- Verify network proxies don't interfere with compressed payloads
## Next Steps

- Learn about Client Usage
- Explore Worker Usage
- See Testing strategies