
Product
Reachability for Ruby Now in Beta
Reachability analysis for Ruby is now in beta, helping teams identify which vulnerabilities are truly exploitable in their applications.
prisma-pgmq
Advanced tools
A TypeScript library that provides type-safe methods for PostgreSQL Message Queue (PGMQ) operations in your Prisma-based applications.
npm install prisma-pgmq
# or
pnpm add prisma-pgmq
# or
yarn add prisma-pgmq
Enabling the PGMQ extension via Prisma
You can manage PostgreSQL extensions (including PGMQ) directly in your Prisma schema using the
postgresqlExtensions preview feature. Add the extension to your datasource block in schema.prisma:
generator client {
  provider = "prisma-client-js"
  previewFeatures = ["postgresqlExtensions"]
}
datasource db {
  provider = "postgresql"
  url = env("DATABASE_URL")
  extensions = [pgmq]
}
For more details, see the Prisma documentation on PostgreSQL extensions.
import { PrismaClient } from '@prisma/client';
import { pgmq } from 'prisma-pgmq';
const prisma = new PrismaClient();
// Create a queue
await pgmq.createQueue(prisma, 'my-work-queue');
// Send a message
await pgmq.send(prisma, 'my-work-queue', {
userId: 123,
action: 'send-email',
email: 'user@example.com'
});
send(tx, queueName, message, delay?)
Send a single message to a queue.
const msgId = await pgmq.send(tx, 'my-queue', { data: 'hello' });
// Send with delay (seconds)
const msgId = await pgmq.send(tx, 'my-queue', { data: 'hello' }, 30);
// Send with specific time
const msgId = await pgmq.send(tx, 'my-queue', { data: 'hello' }, new Date('2024-01-01T10:00:00Z'));
sendBatch(tx, queueName, messages, delay?)
Send multiple messages to a queue in a single operation.
const msgIds = await pgmq.sendBatch(tx, 'my-queue', [
{ id: 1, data: 'message 1' },
{ id: 2, data: 'message 2' },
{ id: 3, data: 'message 3' }
]);
read(tx, queueName, vt, qty?, conditional?)
Read messages from a queue with a visibility timeout.
// Read up to 5 messages with 30 second visibility timeout
const messages = await pgmq.read(tx, 'my-queue', 30, 5);
// Read with conditional filtering
const messages = await pgmq.read(tx, 'my-queue', 30, 5, { priority: 'high' });
readWithPoll(tx, queueName, vt, qty?, maxPollSeconds?, pollIntervalMs?, conditional?)
Read messages with polling (wait for messages if none are available).
// Poll for up to 10 seconds, checking every 500ms
const messages = await pgmq.readWithPoll(tx, 'my-queue', 30, 1, 10, 500);
pop(tx, queueName)
Read and immediately delete a message (atomic operation).
const messages = await pgmq.pop(tx, 'my-queue');
deleteMessage(tx, queueName, msgId)
Delete a specific message.
const deleted = await pgmq.deleteMessage(tx, 'my-queue', 123);
deleteBatch(tx, queueName, msgIds)
Delete multiple messages.
const deletedIds = await pgmq.deleteBatch(tx, 'my-queue', [123, 124, 125]);
archive(tx, queueName, msgId)
Archive a message (move it to the archive table).
const archived = await pgmq.archive(tx, 'my-queue', 123);
archiveBatch(tx, queueName, msgIds)
Archive multiple messages.
const archivedIds = await pgmq.archiveBatch(tx, 'my-queue', [123, 124, 125]);
createQueue(tx, queueName)
Create a new queue.
await pgmq.createQueue(tx, 'my-new-queue');
createPartitionedQueue(tx, queueName, partitionInterval?, retentionInterval?)
Create a partitioned queue for high-throughput scenarios.
await pgmq.createPartitionedQueue(tx, 'high-volume-queue', '10000', '100000');
createUnloggedQueue(tx, queueName)
Create an unlogged queue (better performance, less durability).
await pgmq.createUnloggedQueue(tx, 'temp-queue');
dropQueue(tx, queueName)
Delete a queue and all its messages.
const dropped = await pgmq.dropQueue(tx, 'old-queue');
purgeQueue(tx, queueName)
Remove all messages from a queue.
const messageCount = await pgmq.purgeQueue(tx, 'my-queue');
setVt(tx, queueName, msgId, vtOffset)
Set the visibility timeout for a specific message.
const message = await pgmq.setVt(tx, 'my-queue', 123, 60); // 60 seconds
listQueues(tx)
Get information about all queues.
const queues = await pgmq.listQueues(tx);
console.log(queues); // [{ queue_name: 'my-queue', created_at: ..., is_partitioned: false, is_unlogged: false }]
metrics(tx, queueName)
Get metrics for a specific queue.
const metrics = await pgmq.metrics(tx, 'my-queue');
console.log(metrics);
// {
// queue_name: 'my-queue',
// queue_length: 5,
// newest_msg_age_sec: 10,
// oldest_msg_age_sec: 300,
// total_messages: 1000,
// scrape_time: 2024-01-01T10:00:00.000Z
// }
metricsAll(tx)
Get metrics for all queues.
const allMetrics = await pgmq.metricsAll(tx);
Task
type Task = Record<string, unknown>;
MessageRecord
interface MessageRecord {
msg_id: number;
read_ct: number;
enqueued_at: Date;
vt: Date;
message: Task;
}
QueueMetrics
interface QueueMetrics {
queue_name: string;
queue_length: number;
newest_msg_age_sec: number | null;
oldest_msg_age_sec: number | null;
total_messages: number;
scrape_time: Date;
}
QueueInfo
interface QueueInfo {
queue_name: string;
created_at: Date;
is_partitioned: boolean;
is_unlogged: boolean;
}
Choose visibility timeouts based on your message processing time:
// For quick operations (30 seconds)
const messages = await pgmq.read(prisma, 'quick-tasks', 30);
// For longer operations (5 minutes)
const messages = await pgmq.read(prisma, 'heavy-tasks', 300);
Always delete or archive messages after successful processing:
const messages = await pgmq.read(prisma, 'my-queue', 30, 10);
for (const message of messages) {
  try {
    await processMessage(message.message);
    await pgmq.deleteMessage(prisma, 'my-queue', message.msg_id);
  } catch (error) {
    // Handle error - message will become visible again after timeout
    console.error('Failed to process message:', error);
    // Optionally archive failed messages
    await pgmq.archive(prisma, 'my-queue', message.msg_id);
  }
}
import { PrismaClient } from '@prisma/client';
import { pgmq } from 'prisma-pgmq';
const prisma = new PrismaClient();
// Producer
async function sendTask(taskData: any) {
await pgmq.send(prisma, 'work-queue', {
type: 'process-user-data',
data: taskData,
timestamp: Date.now()
});
}
// Consumer
async function processMessages() {
const messages = await pgmq.readWithPoll(prisma, 'work-queue', 30, 5, 10, 1000);
for (const message of messages) {
try {
// Process the message
await handleTask(message.message);
// Delete on success
await pgmq.deleteMessage(prisma, 'work-queue', message.msg_id);
} catch (error) {
console.error('Task failed:', error);
// Archive failed messages for later analysis
await pgmq.archive(prisma, 'work-queue', message.msg_id);
}
}
}
async function handleTask(task: any) {
// Your business logic here
console.log('Processing task:', task.type);
}
// Schedule a message for later processing
const futureDate = new Date(Date.now() + 24 * 60 * 60 * 1000); // 24 hours
await pgmq.send(prisma, 'scheduled-tasks', {
type: 'send-reminder',
userId: 123,
reminder: 'Your subscription expires tomorrow'
}, futureDate);
Create your feature branch (git checkout -b feature/amazing-feature)
Commit your changes (git commit -m 'feat: add amazing feature')
Push to the branch (git push origin feature/amazing-feature)
# Clone the repository
git clone https://github.com/dvlkv/prisma-pgmq.git
cd prisma-pgmq
# Install dependencies
pnpm install
# Run tests
pnpm test
# Build the library
pnpm build
# Watch for changes during development
pnpm dev
This project is licensed under the MIT License - see the LICENSE file for details.
FAQs
A Prisma PGMQ implementation providing type-safe message queue operations
We found that prisma-pgmq demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Product
Reachability analysis for Ruby is now in beta, helping teams identify which vulnerabilities are truly exploitable in their applications.

Research
/Security News
Malicious npm packages use Adspect cloaking and fake CAPTCHAs to fingerprint visitors and redirect victims to crypto-themed scam sites.

Security News
Recent coverage mislabels the latest TEA protocol spam as a worm. Here’s what’s actually happening.