Message Queues in Node
Introduction
When work must survive restarts, scale across processes, or decouple services, teams use message queues—RabbitMQ, Redis queues, Amazon SQS, Kafka, and others. Node producers publish messages; consumers process them asynchronously. This chapter explains concepts and a minimal Redis-style pattern without requiring a running broker in every exercise.
Prerequisites
Why Queues Exist
HTTP request/response is synchronous from the client’s view. Background jobs—send email, resize images, rebuild search index—should not block the API response.
Benefits:
- Buffer spikes — absorb traffic bursts
- Retry — failed jobs requeued with backoff
- Scale consumers — add worker processes independently
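The retry benefit usually pairs with a growing delay between attempts. As a minimal sketch, assuming exponential backoff with a cap (the name `backoffDelayMs` and the default values are illustrative and not taken from any particular broker), the wait before each retry could be computed like this:

```javascript
// Hypothetical helper: delay before retry number `attempt` (1-based).
// The delay doubles on each attempt and is capped at maxMs.
// The defaults are illustrative assumptions, not broker settings.
function backoffDelayMs(attempt, baseMs = 1000, maxMs = 60000) {
  const delay = baseMs * 2 ** (attempt - 1);
  return Math.min(delay, maxMs);
}

for (let attempt = 1; attempt <= 8; attempt++) {
  console.log(`attempt ${attempt}: wait ${backoffDelayMs(attempt)} ms`);
}
```

Production systems often add random jitter on top of this so that many failed jobs do not all retry at the same instant.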
Core Terms
| Term | Meaning |
|---|---|
| Producer | Publishes a message |
| Consumer | Reads and handles a message |
| Broker | Intermediary service that stores and routes messages (RabbitMQ, Redis, SQS) |
| Queue | Named buffer where messages wait until consumed |
| Topic / exchange | Routing rules (varies by product) |
Flow Diagram (Conceptual)
API (Node) --publish--> [Queue] --consume--> Worker (Node)
API returns 202 Accepted quickly; worker completes job later.
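A framework-free sketch of that flow is shown below. It assumes a hypothetical `publish` function supplied by whatever queue client is in use; the handler name `handleSignup` and the response shape are illustrative only.

```javascript
// Hypothetical request handler: enqueue the job, then answer immediately.
// `publish` is injected so the sketch runs without a broker.
function handleSignup(body, publish) {
  const job = { type: "send_email", payload: { to: body.email }, enqueuedAt: Date.now() };
  publish(job);
  // 202 Accepted: the request was received, the work itself happens later.
  return { status: 202, body: { queued: true } };
}

// Stand-in for a real queue: an array in process memory.
const pending = [];
const response = handleSignup({ email: "user@example.com" }, (job) => pending.push(job));
console.log(response.status, pending.length);
```

With a real client, `publish` would typically return a promise, and the handler would await it before responding so that a failed enqueue can be reported to the caller.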
Redis List as Simple Queue (Pattern)
With Redis running locally:
npm install ioredis

// producer.mjs
import Redis from "ioredis";
const redis = new Redis(process.env.REDIS_URL ?? "redis://127.0.0.1:6379");
// Serialize the job and push it onto the head of the "jobs:email" list.
async function enqueueEmailJob(payload) {
  const job = JSON.stringify({ type: "send_email", payload, enqueuedAt: Date.now() });
  await redis.lpush("jobs:email", job);
  console.log("enqueued");
}
await enqueueEmailJob({ to: "user@example.com", subject: "Welcome" });
await redis.quit();

// consumer.mjs
import Redis from "ioredis";
const redis = new Redis(process.env.REDIS_URL ?? "redis://127.0.0.1:6379");
// BRPOP pops from the tail, so LPUSH plus BRPOP gives first-in, first-out order.
async function processOne() {
  // A timeout of 0 blocks indefinitely; the reply is [listName, value].
  const raw = await redis.brpop("jobs:email", 0);
  const job = JSON.parse(raw[1]);
  console.log("processing", job.type, job.payload);
}
await processOne();
await redis.quit();
BRPOP blocks until a job arrives. Real workers run this in a loop, with a timeout and error handling.
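A worker loop along those lines might look like the sketch below. To keep it runnable without a broker, the pop operation is injected: `popJob` is a hypothetical function that resolves to a raw JSON string, or to `null` when its timeout elapses (with ioredis, a `brpop` call with a non-zero timeout resolves to `null` on timeout, so a thin wrapper around it could play this role). `runWorker` and `shouldStop` are likewise illustrative names.

```javascript
// Hypothetical worker loop.
// popJob() resolves to a raw JSON string, or null when its timeout elapses.
async function runWorker(popJob, handler, shouldStop) {
  while (!shouldStop()) {
    const raw = await popJob();
    if (raw === null) continue; // timed out: loop again and re-check shouldStop
    try {
      await handler(JSON.parse(raw));
    } catch (err) {
      // One bad job must not crash the worker; log it and keep going.
      console.error("job failed:", err.message);
    }
  }
}

// In-memory stand-in for the broker so the sketch runs anywhere.
const rawJobs = ['{"type":"send_email"}', "not json", '{"type":"resize"}'];
const handled = [];
const workerDone = runWorker(
  async () => rawJobs.shift() ?? null,
  async (job) => handled.push(job.type),
  () => rawJobs.length === 0
);
workerDone.then(() => console.log("handled", handled));
```

The malformed second entry is logged and skipped, while the jobs on either side of it are still processed. A production worker would also decide what to do with the failed job, for example requeue it or park it for inspection.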
RabbitMQ / SQS (High Level)
- RabbitMQ: AMQP, exchanges, routing keys; amqplib in Node
- Amazon SQS: managed queue in AWS; @aws-sdk/client-sqs
- Kafka: log streaming at scale; heavier ops setup
Pick based on ops capacity and delivery guarantees (at-least-once vs exactly-once).
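To make the at-least-once guarantee concrete, here is a toy simulation. It is not how any real broker is implemented: `ToyBroker` and its methods are hypothetical, and the only behavior modeled is that a message stays pending until the consumer acknowledges it, so a failure before the acknowledgement leads to redelivery.

```javascript
// Toy broker illustrating at-least-once delivery (hypothetical, in-memory).
class ToyBroker {
  constructor() {
    this.pending = [];
  }
  publish(msg) {
    this.pending.push(msg);
  }
  // Deliver the oldest message; remove it only if the consumer returns normally.
  deliverNext(consumer) {
    if (this.pending.length === 0) return false;
    const msg = this.pending[0];
    try {
      consumer(msg);
      this.pending.shift(); // acknowledged: safe to forget the message
    } catch {
      // Not acknowledged: the message stays at the head and is delivered again.
    }
    return true;
  }
}

const broker = new ToyBroker();
broker.publish({ id: "m1" });

let deliveries = 0;
const flakyConsumer = () => {
  deliveries += 1;
  if (deliveries === 1) throw new Error("simulated crash before ack");
};

while (broker.deliverNext(flakyConsumer)) {
  // keep delivering until nothing is pending
}
console.log("deliveries:", deliveries);
```

A single published message reaches the consumer twice here, which is the situation a consumer has to be prepared for under at-least-once delivery.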
Idempotent Consumers
Messages may deliver more than once. Consumers should tolerate duplicates:
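// Use an idempotency key stored in the database.
// findProcessed and chargeAndRecord stand in for your own data-access functions.
async function handlePaymentJob({ paymentId }) {
  const existing = await findProcessed(paymentId);
  if (existing) return; // duplicate delivery: already handled
  await chargeAndRecord(paymentId);
}
Dead Letter Queue (DLQ)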
After N failed attempts, move the message to a DLQ for manual inspection instead of retrying it forever.
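The routine below is a minimal in-memory sketch of that rule. `MAX_ATTEMPTS`, `processWithDlq`, and the two arrays are hypothetical; with a real broker the queues live in the broker, and some products can perform the move to the DLQ through configuration rather than application code.

```javascript
// Hypothetical retry-then-DLQ routine using arrays as stand-in queues.
const MAX_ATTEMPTS = 3;
const mainQueue = [];
const deadLetterQueue = [];

async function processWithDlq(handler) {
  while (mainQueue.length > 0) {
    const msg = mainQueue.shift();
    try {
      await handler(msg.body);
    } catch (err) {
      const attempts = msg.attempts + 1;
      if (attempts >= MAX_ATTEMPTS) {
        // Give up: park the message with the last error for inspection.
        deadLetterQueue.push({ ...msg, attempts, lastError: err.message });
      } else {
        mainQueue.push({ ...msg, attempts }); // requeue for another try
      }
    }
  }
}

mainQueue.push({ body: { id: 1 }, attempts: 0 }, { body: { id: 2 }, attempts: 0 });
const dlqDone = processWithDlq(async (body) => {
  if (body.id === 2) throw new Error("always fails");
});
dlqDone.then(() => console.log("DLQ:", deadLetterQueue));
```

Job 1 succeeds on its first attempt, while job 2 fails three times and ends up in `deadLetterQueue` together with its attempt count and last error message.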
Mini Example: In-Memory Queue (Dev Only)
// Not for production: process memory is lost on restart
const queue = [];
export function publish(task) {
  queue.push(task);
}
export async function drain(handler) {
  while (queue.length) {
    const task = queue.shift();
    await handler(task);
  }
}
publish({ id: 1, action: "resize" });
await drain(async (task) => console.log("run", task));
FAQ
Queue vs cron?
Cron runs on a schedule; queues react to events as they arrive.
Queue vs Web Workers?
Web Workers are in-browser threads; message queues usually span processes or servers.
Ordering?
Some brokers guarantee order per partition/key; others do not—design accordingly.
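One common way to get per-key ordering is to route every message with the same key to the same partition. The sketch below assumes a simple string hash; `partitionFor` is a hypothetical helper, and real brokers use their own hash functions and partition assignment.

```javascript
// Hypothetical partitioner: messages with the same key always map to the
// same partition, so their relative order is preserved within it.
function partitionFor(key, partitionCount) {
  let hash = 0;
  for (const ch of String(key)) {
    hash = (hash * 31 + ch.codePointAt(0)) >>> 0; // simple unsigned 32-bit string hash
  }
  return hash % partitionCount;
}

// Arrays stand in for broker partitions.
const partitions = [[], [], []];
const events = [
  { key: "order-1", step: "created" },
  { key: "order-2", step: "created" },
  { key: "order-1", step: "paid" },
  { key: "order-1", step: "shipped" },
];
for (const event of events) {
  partitions[partitionFor(event.key, partitions.length)].push(event);
}
console.log(partitions);
```

All `order-1` events land in one partition in the order they were published. No ordering is implied between events that sit in different partitions.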