
prisma-extension-pgmq
A Prisma Client extension for PostgreSQL Message Queue (PGMQ) providing type-safe message queue operations
A TypeScript library that provides a Prisma Client extension for PostgreSQL Message Queue (PGMQ), enabling type-safe message queue operations in your Prisma-based applications.
npm install prisma-extension-pgmq
# or
pnpm add prisma-extension-pgmq
# or
yarn add prisma-extension-pgmq
The Prisma Client extension provides automatic transaction management and seamless integration:
import { PrismaClient } from '@prisma/client';
import { pgmqExtension } from 'prisma-extension-pgmq';
const prisma = new PrismaClient().$extends(pgmqExtension);
// Create a queue
await prisma.$pgmq.createQueue('my-work-queue');
// Send a message
const msgId = await prisma.$pgmq.send('my-work-queue', {
userId: 123,
action: 'send-email',
email: 'user@example.com'
});
// Read and process messages
const messages = await prisma.$pgmq.read('my-work-queue', 30, 5);
for (const message of messages) {
console.log('Processing message:', message.message);
// Process the message...
// Delete the message when done
await prisma.$pgmq.deleteMessage('my-work-queue', message.msg_id);
}
For advanced usage, you can access the core functions directly within transactions:
import { PrismaClient } from '@prisma/client';
import { pgmq } from 'prisma-extension-pgmq';
const prisma = new PrismaClient();
// Create a queue
await prisma.$transaction(async (tx) => {
await pgmq.createQueue(tx, 'my-work-queue');
});
// Send a message
await prisma.$transaction(async (tx) => {
const msgId = await pgmq.send(tx, 'my-work-queue', {
userId: 123,
action: 'send-email',
email: 'user@example.com'
});
console.log('Message sent with ID:', msgId);
});
send(queueName, message, delay?)
Send a single message to a queue.
// Send immediately
const msgId = await prisma.$pgmq.send('my-queue', { data: 'hello' });
// Send with a delay (in seconds)
const delayedMsgId = await prisma.$pgmq.send('my-queue', { data: 'hello' }, 30);
// Send at a specific time
const scheduledMsgId = await prisma.$pgmq.send('my-queue', { data: 'hello' }, new Date('2024-01-01T10:00:00Z'));
sendBatch(queueName, messages, delay?)
Send multiple messages to a queue in a single operation.
const msgIds = await prisma.$pgmq.sendBatch('my-queue', [
{ id: 1, data: 'message 1' },
{ id: 2, data: 'message 2' },
{ id: 3, data: 'message 3' }
]);
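When the list of messages is very large, it may be worth splitting it into smaller batches before calling sendBatch. The sketch below is not part of the library: `chunk` is a hypothetical helper, and the batch size of 500 in the usage comment is an arbitrary assumption rather than a documented limit.

```typescript
// Hypothetical helper (not provided by prisma-extension-pgmq):
// split an array into consecutive chunks of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Usage sketch, assuming `prisma` is the extended client shown earlier
// and `allMessages` is an array of message payloads:
// for (const batch of chunk(allMessages, 500)) {
//   await prisma.$pgmq.sendBatch('my-queue', batch);
// }
```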
read(queueName, vt, qty?, conditional?)
Read messages from a queue with a visibility timeout.
// Read up to 5 messages with a 30-second visibility timeout
const messages = await prisma.$pgmq.read('my-queue', 30, 5);
// Read with conditional filtering
const highPriority = await prisma.$pgmq.read('my-queue', 30, 5, { priority: 'high' });
readWithPoll(queueName, vt, qty?, maxPollSeconds?, pollIntervalMs?, conditional?)
Read messages with polling (wait for messages if none are available).
// Poll for up to 10 seconds, checking every 500ms
const messages = await prisma.$pgmq.readWithPoll('my-queue', 30, 1, 10, 500);
pop(queueName)
Read and immediately delete a message (atomic operation).
const messages = await prisma.$pgmq.pop('my-queue');
deleteMessage(queueName, msgId)
Delete a specific message.
const deleted = await prisma.$pgmq.deleteMessage('my-queue', 123);
deleteBatch(queueName, msgIds)
Delete multiple messages.
const deletedIds = await prisma.$pgmq.deleteBatch('my-queue', [123, 124, 125]);
archive(queueName, msgId)
Archive a message (move it to the archive table).
const archived = await prisma.$pgmq.archive('my-queue', 123);
archiveBatch(queueName, msgIds)
Archive multiple messages.
const archivedIds = await prisma.$pgmq.archiveBatch('my-queue', [123, 124, 125]);
createQueue(queueName)
Create a new queue.
await prisma.$pgmq.createQueue('my-new-queue');
createPartitionedQueue(queueName, partitionInterval?, retentionInterval?)
Create a partitioned queue for high-throughput scenarios.
await prisma.$pgmq.createPartitionedQueue('high-volume-queue', '10000', '100000');
createUnloggedQueue(queueName)
Create an unlogged queue (better performance, less durability).
await prisma.$pgmq.createUnloggedQueue('temp-queue');
dropQueue(queueName)
Delete a queue and all its messages.
const dropped = await prisma.$pgmq.dropQueue('old-queue');
purgeQueue(queueName)
Remove all messages from a queue.
const messageCount = await prisma.$pgmq.purgeQueue('my-queue');
setVt(queueName, msgId, vtOffset)
Set the visibility timeout for a specific message.
const message = await prisma.$pgmq.setVt('my-queue', 123, 60); // 60 seconds
listQueues()
Get information about all queues.
const queues = await prisma.$pgmq.listQueues();
console.log(queues); // [{ queue_name: 'my-queue', created_at: ..., is_partitioned: false }]
metrics(queueName)
Get metrics for a specific queue.
const metrics = await prisma.$pgmq.metrics('my-queue');
console.log(metrics);
// {
// queue_name: 'my-queue',
// queue_length: 5n,
// newest_msg_age_sec: 10,
// oldest_msg_age_sec: 300,
// total_messages: 1000n,
// scrape_time: 2024-01-01T10:00:00.000Z
// }
metricsAll()
Get metrics for all queues.
const allMetrics = await prisma.$pgmq.metricsAll();
Task
type Task = Record<string, unknown>;
MessageRecord
interface MessageRecord {
msg_id: number;
read_ct: number;
enqueued_at: Date;
vt: Date;
message: Task;
}
QueueMetrics
interface QueueMetrics {
queue_name: string;
queue_length: number;
newest_msg_age_sec: number | null;
oldest_msg_age_sec: number | null;
total_messages: number;
scrape_time: Date;
}
QueueInfo
interface QueueInfo {
queue_name: string;
created_at: Date;
is_partitioned: boolean;
is_unlogged: boolean;
}
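Because Task is declared as Record<string, unknown>, a consumer has to narrow message.message before using its fields. One possible approach is a type guard, sketched below. `SendEmailTask` and `isSendEmailTask` are hypothetical names chosen to match the 'send-email' payload used in the earlier examples; they are not exported by the library.

```typescript
// Hypothetical payload shape, mirroring the 'send-email' example payload.
type SendEmailTask = {
  userId: number;
  action: 'send-email';
  email: string;
};

// Hypothetical type guard: checks the runtime shape of an untyped payload.
function isSendEmailTask(value: unknown): value is SendEmailTask {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const v = value as Record<string, unknown>;
  return (
    typeof v['userId'] === 'number' &&
    v['action'] === 'send-email' &&
    typeof v['email'] === 'string'
  );
}

// Usage sketch inside a consumer loop:
// if (isSendEmailTask(message.message)) {
//   console.log('Sending email to', message.message.email);
// }
```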
The library includes both unit and integration tests:
# Run all tests
pnpm test
# Run only unit tests (no database required)
pnpm test:unit
# Run only integration tests (requires database)
pnpm test:db
# Run tests with coverage
pnpm test:coverage
# Watch mode for development
pnpm test:watch
Integration tests require a PostgreSQL database with the PGMQ extension. Set up your test database and configure the connection string in your environment.
Choose visibility timeouts based on how long a message takes to process:
// For quick operations (30 seconds)
const quickMessages = await prisma.$pgmq.read('quick-tasks', 30);
// For longer operations (5 minutes)
const heavyMessages = await prisma.$pgmq.read('heavy-tasks', 300);
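One way to derive a timeout instead of hard-coding it is to start from the expected processing time and add headroom. The helper below is a sketch under that assumption: `visibilityTimeoutSeconds` is a hypothetical name, and the default safety factor of 2 is an illustrative choice, not a recommendation from the library.

```typescript
// Hypothetical helper: turn an expected processing time (milliseconds) into
// a visibility timeout (whole seconds) with a safety margin.
function visibilityTimeoutSeconds(
  expectedProcessingMs: number,
  safetyFactor = 2 // assumed headroom multiplier
): number {
  if (!(expectedProcessingMs > 0)) {
    throw new RangeError('expectedProcessingMs must be positive');
  }
  if (!(safetyFactor >= 1)) {
    throw new RangeError('safetyFactor must be at least 1');
  }
  // Round up to whole seconds and never return less than 1 second.
  return Math.max(1, Math.ceil((expectedProcessingMs * safetyFactor) / 1000));
}

// Usage sketch, assuming `prisma` is the extended client shown earlier:
// const vt = visibilityTimeoutSeconds(15_000); // 30
// const messages = await prisma.$pgmq.read('quick-tasks', vt);
```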
Always delete or archive messages after successful processing:
const messages = await prisma.$pgmq.read('my-queue', 30, 10);
for (const message of messages) {
try {
await processMessage(message.message);
await prisma.$pgmq.deleteMessage('my-queue', message.msg_id);
} catch (error) {
// Handle error - message will become visible again after timeout
console.error('Failed to process message:', error);
// Optionally archive failed messages
await prisma.$pgmq.archive('my-queue', message.msg_id);
}
}
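The catch block above archives on the first failure. If transient errors are common, one alternative is to let the message reappear after its visibility timeout and archive it only after several deliveries, using the read_ct field of MessageRecord. The sketch below assumes read_ct counts deliveries including the current one; `decideOnFailure` and the default of 3 attempts are hypothetical.

```typescript
type FailureAction = 'retry' | 'archive';

// Hypothetical policy: retry until the message has been delivered
// `maxAttempts` times, then archive it for later analysis.
function decideOnFailure(readCount: number, maxAttempts = 3): FailureAction {
  return readCount >= maxAttempts ? 'archive' : 'retry';
}

// Usage sketch inside the catch block of a consumer loop:
// if (decideOnFailure(message.read_ct) === 'archive') {
//   await prisma.$pgmq.archive('my-queue', message.msg_id);
// }
// Otherwise do nothing: the message becomes visible again after the timeout.
```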
Group related PGMQ operations in transactions:
await prisma.$pgmq.transaction(async (pgmq) => {
// Send notification message
const notificationId = await pgmq.send('notifications', {
type: 'order-confirmation',
orderId: order.id
});
// Send processing message
const processingId = await pgmq.send('order-processing', {
orderId: order.id,
notificationId
});
return { notificationId, processingId };
});
Regularly check queue metrics to ensure healthy operation:
const metrics = await prisma.$pgmq.metrics('my-queue');
if (metrics.queue_length > 1000) {
console.warn('Queue is getting full:', metrics.queue_length);
}
if (metrics.oldest_msg_age_sec && metrics.oldest_msg_age_sec > 3600) {
console.warn('Messages are getting stale:', metrics.oldest_msg_age_sec);
}
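The same checks can be collected into a small pure function, which makes the thresholds easier to unit-test without a database. This is a sketch: `queueWarnings` and `QueueMetricsLike` are hypothetical names, and the function accepts either number or bigint for queue_length because the example output above prints bigint values while the QueueMetrics interface declares number.

```typescript
// Hypothetical subset of the metrics object needed for the checks.
type QueueMetricsLike = {
  queue_length: number | bigint;
  oldest_msg_age_sec: number | null;
};

// Returns a list of human-readable warnings; an empty list means healthy.
function queueWarnings(
  metrics: QueueMetricsLike,
  maxLength = 1000, // same threshold as the example above
  maxAgeSec = 3600 // same threshold as the example above
): string[] {
  const warnings: string[] = [];
  const length = Number(metrics.queue_length);
  if (length > maxLength) {
    warnings.push(`Queue is getting full: ${length}`);
  }
  const age = metrics.oldest_msg_age_sec;
  if (age !== null && age > maxAgeSec) {
    warnings.push(`Messages are getting stale: ${age}`);
  }
  return warnings;
}

// Usage sketch, assuming `prisma` is the extended client shown earlier:
// const metrics = await prisma.$pgmq.metrics('my-queue');
// queueWarnings(metrics).forEach((w) => console.warn(w));
```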
import { PrismaClient } from '@prisma/client';
import { pgmqExtension } from 'prisma-extension-pgmq';
const prisma = new PrismaClient().$extends(pgmqExtension);
// Producer
async function sendTask(taskData: any) {
await prisma.$pgmq.send('work-queue', {
type: 'process-user-data',
data: taskData,
timestamp: Date.now()
});
}
// Consumer
async function processMessages() {
while (true) {
const messages = await prisma.$pgmq.readWithPoll('work-queue', 30, 5, 10, 1000);
for (const message of messages) {
try {
// Process the message
await handleTask(message.message);
// Delete on success
await prisma.$pgmq.deleteMessage('work-queue', message.msg_id);
} catch (error) {
console.error('Task failed:', error);
// Archive failed messages for later analysis
await prisma.$pgmq.archive('work-queue', message.msg_id);
}
}
}
}
async function handleTask(task: any) {
// Your business logic here
console.log('Processing task:', task.type);
}
// Schedule a message for later processing
const futureDate = new Date(Date.now() + 24 * 60 * 60 * 1000); // 24 hours
await prisma.$pgmq.send('scheduled-tasks', {
type: 'send-reminder',
userId: 123,
reminder: 'Your subscription expires tomorrow'
}, futureDate);
// Send high-priority message
await prisma.$pgmq.send('tasks', {
priority: 'high',
type: 'urgent-processing',
data: urgentData
});
// Read high-priority messages first
const highPriorityMessages = await prisma.$pgmq.read('tasks', 30, 10, { priority: 'high' });
if (highPriorityMessages.length === 0) {
// Fall back to normal priority
const normalMessages = await prisma.$pgmq.read('tasks', 30, 10, { priority: 'normal' });
}
Create a feature branch (git checkout -b feature/amazing-feature)
Commit your changes (git commit -m 'feat: add amazing feature')
Push the branch (git push origin feature/amazing-feature)
# Clone the repository
git clone https://github.com/your-username/prisma-extension-pgmq.git
cd prisma-extension-pgmq
# Install dependencies
pnpm install
# Run tests
pnpm test
# Build the library
pnpm build
# Watch for changes during development
pnpm dev
This project is licensed under the MIT License - see the LICENSE file for details.