Note: this package was removed from the registry.

prisma-extension-pgmq

A Prisma Client extension for PostgreSQL Message Queue (PGMQ) providing type-safe message queue operations

Version: 0.1.2 (latest)
Source: npm
Maintainers: 1

Prisma PGMQ

A TypeScript library that provides a Prisma Client extension for PostgreSQL Message Queue (PGMQ), enabling type-safe message queue operations in your Prisma-based applications.

Features

  • 🔒 Type-safe: Full TypeScript support with proper type definitions
  • 📦 Easy to use: Simple API with both functional and class-based interfaces
  • 🔌 Prisma Integration: Seamless integration with your existing Prisma setup

Installation

npm install prisma-extension-pgmq
# or
pnpm add prisma-extension-pgmq
# or
yarn add prisma-extension-pgmq

Prerequisites

  • PostgreSQL database with PGMQ extension installed
  • Prisma Client v5.0.0 or higher
  • Node.js 16+

Quick Start

The Prisma Client extension provides automatic transaction management and seamless integration:

import { PrismaClient } from '@prisma/client';
import { pgmqExtension } from 'prisma-extension-pgmq';

const prisma = new PrismaClient().$extends(pgmqExtension);

// Create a queue
await prisma.$pgmq.createQueue('my-work-queue');

// Send a message
const msgId = await prisma.$pgmq.send('my-work-queue', {
  userId: 123,
  action: 'send-email',
  email: 'user@example.com'
});

// Read and process messages
const messages = await prisma.$pgmq.read('my-work-queue', 30, 5);

for (const message of messages) {
  console.log('Processing message:', message.message);
  
  // Process the message...
  
  // Delete the message when done
  await prisma.$pgmq.deleteMessage('my-work-queue', message.msg_id);
}

Functional API (Advanced Usage)

For advanced usage, you can access the core functions directly within transactions:

import { PrismaClient } from '@prisma/client';
import { pgmq } from 'prisma-extension-pgmq';

const prisma = new PrismaClient();

// Create a queue
await prisma.$transaction(async (tx) => {
  await pgmq.createQueue(tx, 'my-work-queue');
});

// Send a message
await prisma.$transaction(async (tx) => {
  const msgId = await pgmq.send(tx, 'my-work-queue', {
    userId: 123,
    action: 'send-email',
    email: 'user@example.com'
  });
  console.log('Message sent with ID:', msgId);
});

API Reference

Message Operations

send(queueName, message, delay?)

Send a single message to a queue.

// Send immediately
const msgId = await prisma.$pgmq.send('my-queue', { data: 'hello' });

// Send with delay (seconds)
const msgId = await prisma.$pgmq.send('my-queue', { data: 'hello' }, 30);

// Send with specific time
const msgId = await prisma.$pgmq.send('my-queue', { data: 'hello' }, new Date('2024-01-01T10:00:00Z'));
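
The three forms above differ only in when the message first becomes readable. The sketch below spells that out as a small pure function. It is an illustration, not library code: `visibleAt` is a hypothetical helper, and treating a `Date` in the past as "visible immediately" is an assumption about the expected behaviour rather than something this README states.

```typescript
// Hypothetical helper: given the send time and the optional delay argument,
// compute when the message would first become visible to readers.
function visibleAt(now: Date, delay?: number | Date): Date {
  if (delay === undefined) {
    // No delay: visible as soon as it is sent.
    return new Date(now.getTime());
  }
  if (delay instanceof Date) {
    // Absolute time; assumed to clamp to "now" when the date is in the past.
    return new Date(Math.max(delay.getTime(), now.getTime()));
  }
  // Numeric delay is interpreted as seconds from now.
  return new Date(now.getTime() + delay * 1000);
}
```

Under that assumption, a fixed date such as the `2024-01-01` value in the example only delays delivery while it is still in the future, so computing the date relative to the current time is usually the safer choice.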

sendBatch(queueName, messages, delay?)

Send multiple messages to a queue in a single operation.

const msgIds = await prisma.$pgmq.sendBatch('my-queue', [
  { id: 1, data: 'message 1' },
  { id: 2, data: 'message 2' },
  { id: 3, data: 'message 3' }
]);
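
If a producer accumulates a large number of messages, one option is to split them into smaller batches before calling sendBatch. The helper below is a generic sketch and not part of the library; the name `chunk` and any batch size you pick are assumptions to tune for your workload.

```typescript
// Hypothetical helper (not part of prisma-extension-pgmq): split an array
// into consecutive slices of at most `size` items, preserving order.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}
```

Each resulting batch could then be passed to `prisma.$pgmq.sendBatch('my-queue', batch)` in turn.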

read(queueName, vt, qty?, conditional?)

Read messages from a queue with visibility timeout.

// Read up to 5 messages with 30 second visibility timeout
const messages = await prisma.$pgmq.read('my-queue', 30, 5);

// Read with conditional filtering
const messages = await prisma.$pgmq.read('my-queue', 30, 5, { priority: 'high' });
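
To make the visibility timeout concrete, here is a toy in-memory model of the behaviour: a message that has been read stays hidden until its timeout elapses, then becomes readable again unless it was deleted in the meantime. This is not how the extension or PGMQ is implemented. The class name `ToyQueue`, the injected clock, and the millisecond representation of `vt` are assumptions made for illustration.

```typescript
// Toy model of visibility-timeout semantics. Illustration only: real queues
// live in PostgreSQL and are managed by the PGMQ extension.
interface ToyMessage {
  msg_id: number;
  read_ct: number;
  vt: number; // epoch milliseconds at which the message becomes visible
  message: Record<string, unknown>;
}

class ToyQueue {
  private messages: ToyMessage[] = [];
  private nextId = 1;
  private now: () => number;

  // The clock is injected so the behaviour can be shown without waiting.
  constructor(now: () => number) {
    this.now = now;
  }

  send(message: Record<string, unknown>): number {
    const msg_id = this.nextId++;
    this.messages.push({ msg_id, read_ct: 0, vt: this.now(), message });
    return msg_id;
  }

  // Return up to `qty` visible messages and hide them for `vtSeconds`.
  read(vtSeconds: number, qty = 1): ToyMessage[] {
    const t = this.now();
    const visible = this.messages.filter((m) => m.vt <= t).slice(0, qty);
    for (const m of visible) {
      m.vt = t + vtSeconds * 1000;
      m.read_ct += 1;
    }
    // Return copies so callers cannot mutate queue state.
    return visible.map((m) => ({ ...m }));
  }

  deleteMessage(msgId: number): boolean {
    const before = this.messages.length;
    this.messages = this.messages.filter((m) => m.msg_id !== msgId);
    return this.messages.length < before;
  }
}
```

In this model, a consumer that reads with a 30 second timeout and then crashes leaves the message hidden for 30 seconds, after which another `read` returns it with `read_ct` incremented.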

readWithPoll(queueName, vt, qty?, maxPollSeconds?, pollIntervalMs?, conditional?)

Read messages with polling (wait for messages if none available).

// Poll for up to 10 seconds, checking every 500ms
const messages = await prisma.$pgmq.readWithPoll('my-queue', 30, 1, 10, 500);

pop(queueName)

Read and immediately delete a message (atomic operation).

const messages = await prisma.$pgmq.pop('my-queue');

Message Management

deleteMessage(queueName, msgId)

Delete a specific message.

const deleted = await prisma.$pgmq.deleteMessage('my-queue', 123);

deleteBatch(queueName, msgIds)

Delete multiple messages.

const deletedIds = await prisma.$pgmq.deleteBatch('my-queue', [123, 124, 125]);

archive(queueName, msgId)

Archive a message (move to archive table).

const archived = await prisma.$pgmq.archive('my-queue', 123);

archiveBatch(queueName, msgIds)

Archive multiple messages.

const archivedIds = await prisma.$pgmq.archiveBatch('my-queue', [123, 124, 125]);

Queue Management

createQueue(queueName)

Create a new queue.

await prisma.$pgmq.createQueue('my-new-queue');

createPartitionedQueue(queueName, partitionInterval?, retentionInterval?)

Create a partitioned queue for high-throughput scenarios.

await prisma.$pgmq.createPartitionedQueue('high-volume-queue', '10000', '100000');

createUnloggedQueue(queueName)

Create an unlogged queue (better performance, less durability).

await prisma.$pgmq.createUnloggedQueue('temp-queue');

dropQueue(queueName)

Delete a queue and all its messages.

const dropped = await prisma.$pgmq.dropQueue('old-queue');

purgeQueue(queueName)

Remove all messages from a queue.

const messageCount = await prisma.$pgmq.purgeQueue('my-queue');

Utilities

setVt(queueName, msgId, vtOffset)

Set visibility timeout for a specific message.

const message = await prisma.$pgmq.setVt('my-queue', 123, 60); // 60 seconds

listQueues()

Get information about all queues.

const queues = await prisma.$pgmq.listQueues();
console.log(queues); // [{ queue_name: 'my-queue', created_at: ..., is_partitioned: false, is_unlogged: false }]

metrics(queueName)

Get metrics for a specific queue.

const metrics = await prisma.$pgmq.metrics('my-queue');
console.log(metrics);
// {
//   queue_name: 'my-queue',
//   queue_length: 5n,
//   newest_msg_age_sec: 10,
//   oldest_msg_age_sec: 300,
//   total_messages: 1000n,
//   scrape_time: 2024-01-01T10:00:00.000Z
// }

metricsAll()

Get metrics for all queues.

const allMetrics = await prisma.$pgmq.metricsAll();

Type Definitions

Task

type Task = Record<string, unknown>;

MessageRecord

interface MessageRecord {
  msg_id: number;
  read_ct: number;
  enqueued_at: Date;
  vt: Date;
  message: Task;
}
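
Because Task is a plain `Record<string, unknown>`, consumers generally need to narrow `message.message` before using its fields. One way to do that is a hand-written type guard, sketched below. The `EmailTask` shape mirrors the Quick Start payload; the interface and guard names are hypothetical and not exported by the library.

```typescript
type Task = Record<string, unknown>;

// Payload shape taken from the Quick Start example.
interface EmailTask extends Task {
  userId: number;
  action: 'send-email';
  email: string;
}

// Hypothetical guard: checks the fields at runtime and narrows the type
// so the compiler knows which properties are safe to read.
function isEmailTask(task: Task): task is EmailTask {
  return (
    typeof task.userId === 'number' &&
    task.action === 'send-email' &&
    typeof task.email === 'string'
  );
}
```

Inside a consumer loop, `if (isEmailTask(message.message)) { ... }` gives typed access to `userId` and `email`, while anything that fails the check can be archived or logged.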

QueueMetrics

interface QueueMetrics {
  queue_name: string;
  queue_length: number;
  newest_msg_age_sec: number | null;
  oldest_msg_age_sec: number | null;
  total_messages: number;
  scrape_time: Date;
}
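
The sample output under metrics(queueName) shows queue_length and total_messages as bigint literals (5n, 1000n), while the interface here declares them as number. If your code may receive either, one defensive option is to normalise the value before comparing or serialising it. This is a sketch; `toSafeNumber` is a hypothetical helper, not part of the library.

```typescript
// Hypothetical helper: accept a number or bigint and return a plain number,
// refusing values that cannot be represented exactly.
function toSafeNumber(value: number | bigint): number {
  if (typeof value === 'number') {
    return value;
  }
  if (
    value > BigInt(Number.MAX_SAFE_INTEGER) ||
    value < BigInt(Number.MIN_SAFE_INTEGER)
  ) {
    throw new RangeError('value does not fit in a safe integer');
  }
  return Number(value);
}
```

Normalising also avoids the `TypeError` that `JSON.stringify` raises when it encounters a bigint.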

QueueInfo

interface QueueInfo {
  queue_name: string;
  created_at: Date;
  is_partitioned: boolean;
  is_unlogged: boolean;
}

Testing

The library includes both unit and integration tests:

# Run all tests
pnpm test

# Run only unit tests (no database required)
pnpm test:unit

# Run only integration tests (requires database)
pnpm test:db

# Run tests with coverage
pnpm test:coverage

# Watch mode for development
pnpm test:watch

Setting up Integration Tests

Integration tests require a PostgreSQL database with the PGMQ extension. Set up your test database and configure the connection string in your environment.

Best Practices

1. Use Appropriate Visibility Timeouts

Choose visibility timeouts based on your message processing time:

// For quick operations (30 seconds)
const messages = await prisma.$pgmq.read('quick-tasks', 30);

// For longer operations (5 minutes)
const messages = await prisma.$pgmq.read('heavy-tasks', 300);

2. Handle Message Processing Failures

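Delete messages after successful processing. If processing fails, either leave the message alone so it becomes visible again after the timeout, or archive it for later analysis: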

const messages = await prisma.$pgmq.read('my-queue', 30, 10);

for (const message of messages) {
  try {
    await processMessage(message.message);
    await prisma.$pgmq.deleteMessage('my-queue', message.msg_id);
  } catch (error) {
    // Without further action, the message becomes visible again after the timeout
    console.error('Failed to process message:', error);
    // Optionally archive the failed message instead, so it is not retried
    await prisma.$pgmq.archive('my-queue', message.msg_id);
  }
}
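
The snippet above archives on the first failure. An alternative is to let a failed message reappear after its visibility timeout and archive it only once it has been read a set number of times, using the read_ct field from MessageRecord. The sketch below expresses that decision as a pure function; the names and the default limit of 3 attempts are assumptions.

```typescript
type FailureAction = 'retry' | 'archive';

// read_ct counts how many times the message has been read, so it is at
// least 1 by the time a consumer is handling the message.
function onFailure(readCt: number, maxAttempts = 3): FailureAction {
  return readCt >= maxAttempts ? 'archive' : 'retry';
}
```

In the catch block, call `prisma.$pgmq.archive(...)` only when `onFailure(message.read_ct)` returns `'archive'`; for `'retry'`, do nothing and the message becomes visible again once the timeout expires.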

3. Use Transactions for Related Operations

Group related PGMQ operations in transactions:

await prisma.$pgmq.transaction(async (pgmq) => {
  // Send notification message
  const notificationId = await pgmq.send('notifications', {
    type: 'order-confirmation',
    orderId: order.id
  });
  
  // Send processing message
  const processingId = await pgmq.send('order-processing', {
    orderId: order.id,
    notificationId
  });
  
  return { notificationId, processingId };
});

4. Monitor Queue Metrics

Regularly check queue metrics to ensure healthy operation:

const metrics = await prisma.$pgmq.metrics('my-queue');

if (metrics.queue_length > 1000) {
  console.warn('Queue is getting full:', metrics.queue_length);
}

if (metrics.oldest_msg_age_sec && metrics.oldest_msg_age_sec > 3600) {
  console.warn('Messages are getting stale:', metrics.oldest_msg_age_sec);
}
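
The same checks can be pulled into a small function that returns warnings instead of logging them, which makes the thresholds configurable and the logic easy to test. This is a sketch: `checkQueueHealth`, `HealthThresholds`, and the trimmed-down `QueueMetricsLike` shape are hypothetical names, with fields taken from the QueueMetrics interface.

```typescript
// Subset of QueueMetrics that the check needs.
interface QueueMetricsLike {
  queue_name: string;
  queue_length: number;
  oldest_msg_age_sec: number | null;
}

interface HealthThresholds {
  maxLength: number;
  maxOldestAgeSec: number;
}

// Returns one warning string per exceeded threshold; empty when healthy.
function checkQueueHealth(m: QueueMetricsLike, t: HealthThresholds): string[] {
  const warnings: string[] = [];
  if (m.queue_length > t.maxLength) {
    warnings.push(`${m.queue_name}: length ${m.queue_length} exceeds ${t.maxLength}`);
  }
  // Explicit null check: an empty queue reports no oldest message.
  if (m.oldest_msg_age_sec !== null && m.oldest_msg_age_sec > t.maxOldestAgeSec) {
    warnings.push(`${m.queue_name}: oldest message is ${m.oldest_msg_age_sec}s old`);
  }
  return warnings;
}
```

Applied to each entry returned by `metricsAll()`, this gives a single list of warnings that can be forwarded to whatever alerting you already use.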

Examples

Basic Worker Pattern

import { PrismaClient } from '@prisma/client';
import { pgmqExtension } from 'prisma-extension-pgmq';

const prisma = new PrismaClient().$extends(pgmqExtension);

// Producer
async function sendTask(taskData: any) {
  await prisma.$pgmq.send('work-queue', {
    type: 'process-user-data',
    data: taskData,
    timestamp: Date.now()
  });
}

// Consumer
async function processMessages() {
  while (true) {
    const messages = await prisma.$pgmq.readWithPoll('work-queue', 30, 5, 10, 1000);
    
    for (const message of messages) {
      try {
        // Process the message
        await handleTask(message.message);
        
        // Delete on success
        await prisma.$pgmq.deleteMessage('work-queue', message.msg_id);
      } catch (error) {
        console.error('Task failed:', error);
        // Archive failed messages for later analysis
        await prisma.$pgmq.archive('work-queue', message.msg_id);
      }
    }
  }
}

async function handleTask(task: any) {
  // Your business logic here
  console.log('Processing task:', task.type);
}
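
The consumer above loops forever, which makes it hard to shut down cleanly or to test. A variant that checks a stop condition between batches is sketched below. It is written against a minimal interface shaped after the calls used in this example, so that a fake can stand in for the database; `WorkerQueue`, `runWorker`, and `shouldStop` are hypothetical names, and whether `prisma.$pgmq` satisfies this interface structurally is an assumption.

```typescript
interface PolledMessage {
  msg_id: number;
  message: Record<string, unknown>;
}

// Minimal surface the loop needs, modelled on the calls shown above.
interface WorkerQueue {
  readWithPoll(
    queue: string,
    vt: number,
    qty: number,
    maxPollSeconds: number,
    pollIntervalMs: number,
  ): Promise<PolledMessage[]>;
  deleteMessage(queue: string, msgId: number): Promise<unknown>;
  archive(queue: string, msgId: number): Promise<unknown>;
}

// Processes batches until shouldStop() returns true. The flag is checked
// only between batches, so a batch already read is always finished.
async function runWorker(
  q: WorkerQueue,
  queueName: string,
  handle: (task: Record<string, unknown>) => Promise<void>,
  shouldStop: () => boolean,
): Promise<number> {
  let processed = 0;
  while (!shouldStop()) {
    const messages = await q.readWithPoll(queueName, 30, 5, 10, 1000);
    for (const m of messages) {
      try {
        await handle(m.message);
        await q.deleteMessage(queueName, m.msg_id);
        processed += 1;
      } catch {
        // Same policy as the example above: archive failed messages.
        await q.archive(queueName, m.msg_id);
      }
    }
  }
  return processed;
}
```

In a real process, `shouldStop` could read a flag set by a SIGTERM handler, letting the worker drain its current batch before exiting.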

Delayed Message Scheduling

// Schedule a message for later processing
const futureDate = new Date(Date.now() + 24 * 60 * 60 * 1000); // 24 hours

await prisma.$pgmq.send('scheduled-tasks', {
  type: 'send-reminder',
  userId: 123,
  reminder: 'Your subscription expires tomorrow'
}, futureDate);

Priority Queue Pattern

// Send high-priority message
await prisma.$pgmq.send('tasks', {
  priority: 'high',
  type: 'urgent-processing',
  data: urgentData
});

// Read high-priority messages first
const highPriorityMessages = await prisma.$pgmq.read('tasks', 30, 10, { priority: 'high' });

if (highPriorityMessages.length === 0) {
  // Fall back to normal priority
  const normalMessages = await prisma.$pgmq.read('tasks', 30, 10, { priority: 'normal' });
}
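
The fallback above can be generalised to any ordered list of priorities: try each level in turn and return the first non-empty batch. The sketch below is written against a minimal interface modelled on the `read` call used in this example; `PriorityReader` and `readByPriority` are hypothetical names, and the defaults of 30 seconds and 10 messages simply repeat the values used above.

```typescript
// Minimal surface needed, modelled on read(queueName, vt, qty, conditional).
interface PriorityReader<M> {
  read(
    queue: string,
    vt: number,
    qty: number,
    conditional: Record<string, unknown>,
  ): Promise<M[]>;
}

// Tries each priority in order and returns the first non-empty batch.
// Lower priorities are not queried once a higher one yields messages.
async function readByPriority<M>(
  q: PriorityReader<M>,
  queueName: string,
  priorities: readonly string[],
  vt = 30,
  qty = 10,
): Promise<M[]> {
  for (const priority of priorities) {
    const messages = await q.read(queueName, vt, qty, { priority });
    if (messages.length > 0) {
      return messages;
    }
  }
  return [];
}
```

Note that each level costs one query, so with many priority levels and a mostly empty queue it may be cheaper to use separate queues per priority.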

Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'feat: add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

Development Setup

# Clone the repository
git clone https://github.com/your-username/prisma-extension-pgmq.git
cd prisma-extension-pgmq

# Install dependencies
pnpm install

# Run tests
pnpm test

# Build the library
pnpm build

# Watch for changes during development
pnpm dev

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

  • PGMQ - PostgreSQL Message Queue extension
  • Prisma - Next-generation ORM for TypeScript & Node.js

Keywords

prisma

Package last updated on 26 Jun 2025
