Prisma PGMQ
A TypeScript library that provides type-safe methods for PostgreSQL Message Queue (PGMQ) operations in your Prisma-based applications.
Features
- 🔒 Type-safe: Full TypeScript support with proper type definitions
- 📦 Easy to use: Simple API with functional methods
- 🔌 Prisma Integration: Seamless integration with your existing Prisma setup
Installation
npm install prisma-pgmq
pnpm add prisma-pgmq
yarn add prisma-pgmq
Prerequisites
- PostgreSQL database with the PGMQ extension installed
- Prisma Client v5.0.0 or higher
- Node.js 16+
Enabling the PGMQ extension via Prisma
You can manage PostgreSQL extensions (including PGMQ) directly in your Prisma schema using the postgresqlExtensions preview feature. Add the extension to your datasource block in schema.prisma:
generator client {
  provider        = "prisma-client-js"
  previewFeatures = ["postgresqlExtensions"]
}
datasource db {
  provider   = "postgresql"
  url        = env("DATABASE_URL")
  extensions = [pgmq]
}
For more details, see the Prisma documentation on PostgreSQL extensions.
Quick Start
Functional API
import { PrismaClient } from '@prisma/client';
import { pgmq } from 'prisma-pgmq';
const prisma = new PrismaClient();
// Create a queue
await pgmq.createQueue(prisma, 'my-work-queue');
// Send a message
await pgmq.send(prisma, 'my-work-queue', {
  userId: 123,
  action: 'send-email',
  email: 'user@example.com'
});
API Reference
Message Operations
send(tx, queueName, message, delay?)
Send a single message to a queue.
// Send immediately
const msgId = await pgmq.send(tx, 'my-queue', { data: 'hello' });
// Send with a 30-second delay
const delayedId = await pgmq.send(tx, 'my-queue', { data: 'hello' }, 30);
// Send at a specific time
const scheduledId = await pgmq.send(
  tx,
  'my-queue',
  { data: 'hello' },
  new Date('2024-01-01T10:00:00Z')
);
sendBatch(tx, queueName, messages, delay?)
Send multiple messages to a queue in a single operation.
const msgIds = await pgmq.sendBatch(tx, 'my-queue', [
  { id: 1, data: 'message 1' },
  { id: 2, data: 'message 2' },
  { id: 3, data: 'message 3' }
]);
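When the number of messages is large, it can help to split them into smaller batches before calling sendBatch. The following is a minimal sketch of such a splitter; the chunk helper is hypothetical and not part of prisma-pgmq, and any particular batch size is an assumption to tune for your workload.

```typescript
// Hypothetical helper (not part of prisma-pgmq): split an array into
// consecutive slices of at most `size` items, preserving order.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}
```

Each batch could then be passed to sendBatch in turn, for example `for (const batch of chunk(allMessages, 500)) await pgmq.sendBatch(tx, 'my-queue', batch);`, where allMessages and the size of 500 are placeholders.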
read(tx, queueName, vt, qty?, conditional?)
Read messages from a queue with visibility timeout.
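// Read up to 5 messages with a 30-second visibility timeout
const messages = await pgmq.read(tx, 'my-queue', 30, 5);
// Read only messages that match the conditional filter
const highPriority = await pgmq.read(tx, 'my-queue', 30, 5, { priority: 'high' });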
readWithPoll(tx, queueName, vt, qty?, maxPollSeconds?, pollIntervalMs?, conditional?)
Read messages with polling (wait for messages if none available).
// Wait up to 10 seconds for 1 message, polling every 500 ms
const messages = await pgmq.readWithPoll(tx, 'my-queue', 30, 1, 10, 500);
pop(tx, queueName)
Read and immediately delete a message (atomic operation).
const messages = await pgmq.pop(tx, 'my-queue');
Message Management
deleteMessage(tx, queueName, msgId)
Delete a specific message.
const deleted = await pgmq.deleteMessage(tx, 'my-queue', 123);
deleteBatch(tx, queueName, msgIds)
Delete multiple messages.
const deletedIds = await pgmq.deleteBatch(tx, 'my-queue', [123, 124, 125]);
archive(tx, queueName, msgId)
Archive a message (move to archive table).
const archived = await pgmq.archive(tx, 'my-queue', 123);
archiveBatch(tx, queueName, msgIds)
Archive multiple messages.
const archivedIds = await pgmq.archiveBatch(tx, 'my-queue', [123, 124, 125]);
Queue Management
createQueue(tx, queueName)
Create a new queue.
await pgmq.createQueue(tx, 'my-new-queue');
createPartitionedQueue(tx, queueName, partitionInterval?, retentionInterval?)
Create a partitioned queue for high-throughput scenarios.
// Start a new partition every 10,000 messages and retain the most recent 100,000
await pgmq.createPartitionedQueue(tx, 'high-volume-queue', '10000', '100000');
createUnloggedQueue(tx, queueName)
Create an unlogged queue (better performance, less durability).
await pgmq.createUnloggedQueue(tx, 'temp-queue');
dropQueue(tx, queueName)
Delete a queue and all its messages.
const dropped = await pgmq.dropQueue(tx, 'old-queue');
purgeQueue(tx, queueName)
Remove all messages from a queue.
const messageCount = await pgmq.purgeQueue(tx, 'my-queue');
Utilities
setVt(tx, queueName, msgId, vtOffset)
Set visibility timeout for a specific message.
// Set the message's visibility timeout to 60 seconds from now
const message = await pgmq.setVt(tx, 'my-queue', 123, 60);
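One possible use of setVt is to push a failed message further into the future on each attempt. The sketch below computes an exponentially growing offset from the message's read_ct. It assumes vtOffset is expressed in seconds and that read_ct is 1 on the first read; backoffSeconds and its default values are hypothetical and not part of prisma-pgmq.

```typescript
// Hypothetical helper (not part of prisma-pgmq): exponential backoff in seconds.
// Assumes readCt is 1 on the first read, so the first retry waits `baseSec`.
function backoffSeconds(readCt: number, baseSec = 5, maxSec = 300): number {
  const attempt = Math.max(0, Math.floor(readCt) - 1);
  // Double the wait on every further attempt, capped at maxSec.
  return Math.min(maxSec, baseSec * 2 ** attempt);
}
```

The result could be passed as the last argument, for example `await pgmq.setVt(tx, 'my-queue', message.msg_id, backoffSeconds(message.read_ct));`.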
listQueues(tx)
Get information about all queues.
const queues = await pgmq.listQueues(tx);
console.log(queues);
metrics(tx, queueName)
Get metrics for a specific queue.
const metrics = await pgmq.metrics(tx, 'my-queue');
console.log(metrics);
metricsAll(tx)
Get metrics for all queues.
const allMetrics = await pgmq.metricsAll(tx);
Type Definitions
Task
type Task = Record<string, unknown>;
MessageRecord
interface MessageRecord {
  msg_id: number;
  read_ct: number;
  enqueued_at: Date;
  vt: Date;
  message: Task;
}
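Because message is typed as the open-ended Task, a consumer usually needs to narrow it before using its fields. The following is a minimal sketch of a runtime type guard. EmailTask, isEmailTask, and recipientOf are hypothetical names introduced for illustration; Task and MessageRecord are repeated from the definitions above so the snippet stands alone.

```typescript
// Types repeated from the definitions above so this sketch is self-contained.
type Task = Record<string, unknown>;

interface MessageRecord {
  msg_id: number;
  read_ct: number;
  enqueued_at: Date;
  vt: Date;
  message: Task;
}

// Hypothetical payload shape, used only for illustration.
interface EmailTask extends Task {
  action: 'send-email';
  userId: number;
  email: string;
}

// Runtime check that narrows an untyped payload to EmailTask.
function isEmailTask(task: Task): task is EmailTask {
  return (
    task.action === 'send-email' &&
    typeof task.userId === 'number' &&
    typeof task.email === 'string'
  );
}

// Returns the recipient address, or null when the payload has another shape.
function recipientOf(record: MessageRecord): string | null {
  return isEmailTask(record.message) ? record.message.email : null;
}
```

A guard like this keeps malformed payloads from reaching handler code; what to do with a message that fails the check (archive it, log it, delete it) is left to the application.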
QueueMetrics
interface QueueMetrics {
  queue_name: string;
  queue_length: number;
  newest_msg_age_sec: number | null;
  oldest_msg_age_sec: number | null;
  total_messages: number;
  scrape_time: Date;
}
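These fields are enough for a simple backlog check. The sketch below flags a queue that is too long or whose oldest message has waited too long. BacklogThresholds and backlogWarnings are hypothetical names, and the thresholds are assumptions; QueueMetrics is repeated from above so the snippet stands alone.

```typescript
// Repeated from the definition above so this sketch is self-contained.
interface QueueMetrics {
  queue_name: string;
  queue_length: number;
  newest_msg_age_sec: number | null;
  oldest_msg_age_sec: number | null;
  total_messages: number;
  scrape_time: Date;
}

// Hypothetical alert thresholds, chosen by the application.
interface BacklogThresholds {
  maxLength: number;
  maxOldestAgeSec: number;
}

// Returns one human-readable warning per exceeded threshold.
function backlogWarnings(m: QueueMetrics, t: BacklogThresholds): string[] {
  const warnings: string[] = [];
  if (m.queue_length > t.maxLength) {
    warnings.push(`${m.queue_name}: ${m.queue_length} messages waiting (limit ${t.maxLength})`);
  }
  // The age fields may be null (for example, when the queue holds no messages).
  if (m.oldest_msg_age_sec !== null && m.oldest_msg_age_sec > t.maxOldestAgeSec) {
    warnings.push(`${m.queue_name}: oldest message is ${m.oldest_msg_age_sec}s old (limit ${t.maxOldestAgeSec}s)`);
  }
  return warnings;
}
```

The function could be applied to the result of metrics for one queue, or mapped over the result of metricsAll.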
QueueInfo
interface QueueInfo {
  queue_name: string;
  created_at: Date;
  is_partitioned: boolean;
  is_unlogged: boolean;
}
Examples
Basic Worker Pattern
import { PrismaClient } from '@prisma/client';
import { pgmq } from 'prisma-pgmq';

const prisma = new PrismaClient();

// Producer: enqueue a task
async function sendTask(taskData: any) {
  await pgmq.send(prisma, 'work-queue', {
    type: 'process-user-data',
    data: taskData,
    timestamp: Date.now()
  });
}

// Consumer: read up to 5 messages with a 30-second visibility timeout,
// polling for up to 10 seconds at 1-second intervals
async function processMessages() {
  const messages = await pgmq.readWithPoll(prisma, 'work-queue', 30, 5, 10, 1000);
  for (const message of messages) {
    try {
      await handleTask(message.message);
      // Success: remove the message from the queue
      await pgmq.deleteMessage(prisma, 'work-queue', message.msg_id);
    } catch (error) {
      console.error('Task failed:', error);
      // Failure: move the message to the archive table
      await pgmq.archive(prisma, 'work-queue', message.msg_id);
    }
  }
}

async function handleTask(task: any) {
  // Replace with real task handling
  console.log('Processing task:', task.type);
}
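The worker above archives a message on its first failure. A common variation is to leave a failed message in the queue so it becomes visible again once its visibility timeout expires, and to archive it only after several attempts. The sketch below is one way to express that decision. Outcome and decideOutcome are hypothetical names, the default of 3 attempts is an assumption, and read_ct is assumed to count the current read.

```typescript
// Hypothetical decision helper (not part of prisma-pgmq).
type Outcome = 'delete' | 'retry' | 'archive';

// readCt: the message's read_ct, assumed to include the current read.
// succeeded: whether the handler finished without throwing.
function decideOutcome(readCt: number, succeeded: boolean, maxAttempts = 3): Outcome {
  if (succeeded) return 'delete';
  // Give up once the message has been read maxAttempts times.
  return readCt >= maxAttempts ? 'archive' : 'retry';
}
```

In the catch block, the worker would call pgmq.archive only when `decideOutcome(message.read_ct, false)` returns 'archive'. For 'retry' it would do nothing and let the visibility timeout expire.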
Delayed Message Scheduling
// Deliver the message 24 hours from now
const futureDate = new Date(Date.now() + 24 * 60 * 60 * 1000);
await pgmq.send(prisma, 'scheduled-tasks', {
  type: 'send-reminder',
  userId: 123,
  reminder: 'Your subscription expires tomorrow'
}, futureDate);
Contributing
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'feat: add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
Development Setup
git clone https://github.com/dvlkv/prisma-pgmq.git
cd prisma-pgmq
pnpm install
pnpm test
pnpm build
pnpm dev
License
This project is licensed under the MIT License - see the LICENSE file for details.
Acknowledgments
- PGMQ - PostgreSQL Message Queue extension
- Prisma - Next-generation ORM for TypeScript & Node.js