pgdb-queue
A lightweight Kafka-style queue built using a PostgreSQL database. Designed for simplicity, performance, and low cost, with no need for Kafka, Redis, or additional infrastructure.
Ideal for startups and small apps that need reliable background task processing without operational overhead.
Features
- Kafka-like producer/consumer API
- Uses PostgreSQL for persistence
- Efficient LISTEN/NOTIFY-based message wake-up
- Zero polling after the queue is drained (saves DB read costs)
- Row-level locking using FOR UPDATE SKIP LOCKED
- Automatic retries for failed messages
- FIFO message ordering
- Minimal setup
Installation
npm install pgdb-queue
Setup
Before producing or consuming messages, initialize the queue:
import { initQueue } from "pgdb-queue";

await initQueue(
  "postgres://user:password@host:port/database",
  "schemaName.tableName"
);
This sets up the connection pool and ensures the queue table (the schemaName.tableName passed as the second argument) exists.
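Passwords that contain characters such as @ or / must be percent-encoded before they go into a connection URL. The sketch below shows one way to assemble the string passed to initQueue; `PgConnectionParts` and `buildConnectionString` are hypothetical helpers for illustration, not part of pgdb-queue.

```typescript
// Hypothetical helper, not part of pgdb-queue: assembles a postgres:// URL
// from its parts and percent-encodes the fields that may contain
// reserved characters.
interface PgConnectionParts {
  user: string;
  password: string;
  host: string;
  port: number;
  database: string;
}

function buildConnectionString(parts: PgConnectionParts): string {
  const user = encodeURIComponent(parts.user);
  const password = encodeURIComponent(parts.password);
  const database = encodeURIComponent(parts.database);
  return `postgres://${user}:${password}@${parts.host}:${parts.port}/${database}`;
}

const connectionString = buildConnectionString({
  user: "app",
  password: "p@ss/word",
  host: "localhost",
  port: 5432,
  database: "jobs",
});
console.log(connectionString);
```

The resulting string can then be passed as the first argument to initQueue, with the table name as the second.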
Producing Messages
import { produce } from "pgdb-queue";

await produce(
  "email-topic",
  JSON.stringify({
    to: "user@example.com",
    subject: "Welcome!",
  })
);
The message must be a string. Use JSON.stringify() to send structured data.
Parameters:
- topic: string
- message: string (recommended: JSON.stringify(object))
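Because the message has to be a string, it can help to fail early when a payload does not serialize. JSON.stringify returns undefined at runtime for inputs such as undefined or a function, which would otherwise reach the queue as a non-string. The `encodeMessage` helper below is a hypothetical wrapper, not part of pgdb-queue.

```typescript
// Hypothetical helper, not part of pgdb-queue: serializes a payload and
// rejects values that JSON.stringify cannot turn into a string.
function encodeMessage(payload: unknown): string {
  const text: unknown = JSON.stringify(payload);
  if (typeof text !== "string") {
    throw new TypeError("payload is not JSON-serializable");
  }
  return text;
}

const encoded = encodeMessage({ to: "user@example.com", subject: "Welcome!" });
console.log(encoded);
```

The returned string can be passed as the message argument of produce.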
Consume a Single Message (Manual)
This gives you full control (useful in cron jobs or custom workers):
import { consume } from "pgdb-queue";

await consume("email-topic", async (msg, id) => {
  const data = JSON.parse(msg);
  console.log("Received:", data);
});
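JSON.parse returns an untyped value, so a handler may want to check the shape before using it. The sketch below validates the email payload from the producing example; `EmailJob`, `isEmailJob`, and `parseEmailJob` are hypothetical names, and how pgdb-queue treats an error thrown inside a handler (for example, whether it counts toward the automatic retries) is not covered here.

```typescript
// Hypothetical payload type matching the producing example above.
interface EmailJob {
  to: string;
  subject: string;
}

// Type guard: true only if the value has string `to` and `subject` fields.
function isEmailJob(value: unknown): value is EmailJob {
  if (typeof value !== "object" || value === null) return false;
  const record = value as Record<string, unknown>;
  return typeof record["to"] === "string" && typeof record["subject"] === "string";
}

// Parses a raw queue message and throws if it is not a valid EmailJob.
function parseEmailJob(msg: string): EmailJob {
  const data: unknown = JSON.parse(msg);
  if (!isEmailJob(data)) {
    throw new Error("unexpected message shape");
  }
  return data;
}

const job = parseEmailJob('{"to":"user@example.com","subject":"Welcome!"}');
console.log("Received:", job.to, job.subject);
```

Inside a consume callback, `parseEmailJob(msg)` would take the place of the bare `JSON.parse(msg)` call.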
startConsumer: Auto-Wake Continuous Consumer
Automatically listens for new messages and drains the queue efficiently:
import { startConsumer } from "pgdb-queue";

await startConsumer(
  "email-topic",
  async (msg, id) => {
    const data = JSON.parse(msg);
    console.log("Processing:", data);
  },
  {
    rateLimitMs: 10,
  }
);
How it saves DB read costs
- Uses PostgreSQL's LISTEN/NOTIFY to be notified only when a new message is inserted.
- Drains messages until the queue is empty.
- Once the queue is empty, it stops polling and resumes only when a new message is pushed.
No setInterval polling means minimal read overhead.
Perfect for scalable background jobs with low or bursty volume.
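The wake-up pattern described above can be modelled without a database. The sketch below is an in-memory stand-in, not pgdb-queue's implementation: `InMemoryQueue`, `startInMemoryConsumer`, and the `reads` counter are hypothetical names, an EventEmitter event plays the role of NOTIFY, and `take()` plays the role of a SELECT against the queue table.

```typescript
import { EventEmitter } from "node:events";

// In-memory stand-in for the queue table plus a NOTIFY channel.
class InMemoryQueue extends EventEmitter {
  private rows: string[] = [];
  reads = 0; // number of simulated SELECTs issued so far

  // Plays the role of INSERT followed by NOTIFY.
  push(message: string): void {
    this.rows.push(message);
    this.emit("notify");
  }

  // Plays the role of a SELECT that takes the oldest row (FIFO).
  take(): string | undefined {
    this.reads += 1;
    return this.rows.shift();
  }
}

// Drains the queue on start and on every notification, and does nothing
// in between: there is no timer, so an idle queue costs no reads.
function startInMemoryConsumer(
  queue: InMemoryQueue,
  handler: (msg: string) => void
): void {
  let draining = false;
  const drain = (): void => {
    if (draining) return; // the running drain will pick up the new row
    draining = true;
    try {
      let msg: string | undefined;
      while ((msg = queue.take()) !== undefined) {
        handler(msg);
      }
    } finally {
      draining = false;
    }
  };
  queue.on("notify", drain);
  drain(); // pick up rows queued before the listener was attached
}

const queue = new InMemoryQueue();
const seen: string[] = [];
queue.push("early");
startInMemoryConsumer(queue, (msg) => {
  seen.push(msg);
});
queue.push("a");
queue.push("b");
console.log(seen, queue.reads);
```

Each drain ends with exactly one read that finds the queue empty; after that the read counter stays flat until the next push, which is the property the bullets above describe.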
Table Schema
The following table is automatically created (if missing):
CREATE TABLE IF NOT EXISTS schemaName.tableName (
  id SERIAL PRIMARY KEY,
  topic TEXT NOT NULL,
  message TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'queued',
  retry_count INTEGER NOT NULL DEFAULT 0,
  created_at TIMESTAMP DEFAULT NOW(),
  updated_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_topic_status ON schemaName.tableName (topic, status);
message is stored as TEXT.
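To show how this schema supports FIFO ordering and FOR UPDATE SKIP LOCKED, the sketch below builds a query that claims the oldest queued row for a topic. It is an illustration under assumptions, not the query pgdb-queue actually runs: `buildClaimQuery` is a hypothetical helper, and the 'processing' status value is assumed (only 'queued' appears in the schema above).

```typescript
// Hypothetical helper, not part of pgdb-queue: builds a statement that
// claims the oldest queued row for a topic ($1) and returns it.
function buildClaimQuery(table: string): string {
  // Accept only plain identifiers (optionally schema-qualified), because
  // the table name is interpolated into the SQL text.
  const identifier = /^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?$/;
  if (!identifier.test(table)) {
    throw new Error("invalid table name: " + table);
  }
  return [
    `UPDATE ${table}`,
    // 'processing' is an assumed status value, not taken from the source.
    `SET status = 'processing', updated_at = NOW()`,
    `WHERE id = (`,
    `  SELECT id FROM ${table}`,
    `  WHERE topic = $1 AND status = 'queued'`,
    `  ORDER BY id`, // lowest id first gives FIFO order
    `  LIMIT 1`,
    `  FOR UPDATE SKIP LOCKED`, // skip rows locked by other consumers
    `)`,
    `RETURNING id, message`,
  ].join("\n");
}

const claimSql = buildClaimQuery("schemaName.tableName");
console.log(claimSql);
```

With SKIP LOCKED, concurrent consumers running the same statement each receive a different row instead of blocking on one another, and the (topic, status) index keeps the inner SELECT cheap.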
Roadmap
Planned features:
- Delayed message delivery
- Per-topic concurrency limit
- Visibility timeouts
- Dead-letter queue for persistent failures
Example Use Cases
- Email sending
- Webhook dispatching
- Background jobs (e.g. invoice generation)
- Microservice coordination
- Simple job queue for frontend apps (via API)
License
MIT License
© 2025 Shivaji Kumar