
prisma-pgmq
A TypeScript library that provides type-safe methods for PostgreSQL Message Queue (PGMQ) operations in your Prisma-based applications.
npm install prisma-pgmq
# or
pnpm add prisma-pgmq
# or
yarn add prisma-pgmq
Enabling the PGMQ extension via Prisma
You can manage PostgreSQL extensions (including PGMQ) directly in your Prisma schema using the postgresqlExtensions preview feature. Add the extension to your datasource block in schema.prisma:
generator client {
  provider = "prisma-client-js"
  previewFeatures = ["postgresqlExtensions"]
}
datasource db {
  provider = "postgresql"
  url = env("DATABASE_URL")
  extensions = [pgmq]
}
For more details, see the Prisma documentation on PostgreSQL extensions.
import { PrismaClient } from '@prisma/client';
import { pgmq } from 'prisma-pgmq';
const prisma = new PrismaClient();
// Create a queue
await pgmq.createQueue(prisma, 'my-work-queue');
// Send a message
await pgmq.send(prisma, 'my-work-queue', {
  userId: 123,
  action: 'send-email',
  email: 'user@example.com'
});
send(tx, queueName, message, delay?)
Send a single message to a queue.
const msgId = await pgmq.send(tx, 'my-queue', { data: 'hello' });
// Send with a delay (in seconds)
const delayedMsgId = await pgmq.send(tx, 'my-queue', { data: 'hello' }, 30);
// Send at a specific time
const scheduledMsgId = await pgmq.send(
  tx,
  'my-queue',
  { data: 'hello' },
  new Date('2024-01-01T10:00:00Z')
);
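The two delay forms above (a number of seconds or an absolute Date) can be thought of as resolving to a single "visible at" timestamp. The sketch below only illustrates that idea: `resolveVisibleAt` is a hypothetical helper, not part of prisma-pgmq, and the library may handle delays differently internally.

```typescript
// Hypothetical helper, not part of prisma-pgmq: maps a delay given either
// as seconds or as an absolute Date to one timestamp.
type Delay = number | Date | undefined;

function resolveVisibleAt(delay: Delay, now: Date): Date {
  if (delay === undefined) {
    return now; // no delay: visible immediately
  }
  if (delay instanceof Date) {
    return delay; // absolute time: used as-is
  }
  // numeric delay: interpreted as seconds from `now`
  return new Date(now.getTime() + delay * 1000);
}

const referenceTime = new Date('2024-01-01T09:59:30Z');
console.log(resolveVisibleAt(30, referenceTime).toISOString());
// 2024-01-01T10:00:00.000Z
```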
sendBatch(tx, queueName, messages, delay?)
Send multiple messages to a queue in a single operation.
const msgIds = await pgmq.sendBatch(tx, 'my-queue', [
  { id: 1, data: 'message 1' },
  { id: 2, data: 'message 2' },
  { id: 3, data: 'message 3' }
]);
read(tx, queueName, vt, qty?, conditional?)
Read messages from a queue with a visibility timeout.
// Read up to 5 messages with a 30-second visibility timeout
const messages = await pgmq.read(tx, 'my-queue', 30, 5);
// Read with conditional filtering
const highPriorityMessages = await pgmq.read(tx, 'my-queue', 30, 5, { priority: 'high' });
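To make the visibility timeout concrete, here is a toy in-memory model. It is an illustration only, not how PGMQ or prisma-pgmq is implemented: `ToyQueue` and its methods are hypothetical, and time is passed in explicitly (in seconds) so the example stays deterministic.

```typescript
// Toy model of a visibility timeout; all names here are hypothetical.
interface ToyMessage {
  msgId: number;
  readCt: number;
  visibleAt: number; // seconds on the toy clock
  body: Record<string, unknown>;
}

class ToyQueue {
  private items: ToyMessage[] = [];
  private nextId = 1;

  send(body: Record<string, unknown>, nowSec: number): number {
    const msgId = this.nextId++;
    this.items.push({ msgId, readCt: 0, visibleAt: nowSec, body });
    return msgId;
  }

  // Return up to `qty` visible messages and hide each one for `vt` seconds.
  read(vt: number, qty: number, nowSec: number): ToyMessage[] {
    const visible = this.items
      .filter((m) => m.visibleAt <= nowSec)
      .slice(0, qty);
    for (const m of visible) {
      m.readCt += 1;
      m.visibleAt = nowSec + vt;
    }
    return visible;
  }

  // Remove a message permanently; returns true if it existed.
  remove(msgId: number): boolean {
    const before = this.items.length;
    this.items = this.items.filter((m) => m.msgId !== msgId);
    return this.items.length < before;
  }
}

const toyQueue = new ToyQueue();
toyQueue.send({ job: 'a' }, 0);
const firstRead = toyQueue.read(30, 5, 0);   // message is delivered
const hiddenRead = toyQueue.read(30, 5, 10); // still inside the 30 s window
const secondRead = toyQueue.read(30, 5, 31); // window expired, delivered again
console.log(firstRead.length, hiddenRead.length, secondRead.length); // 1 0 1
```

In this model, a consumer that never removes a message simply sees it again once the timeout expires, which is the behavior a visibility timeout is meant to provide.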
readWithPoll(tx, queueName, vt, qty?, maxPollSeconds?, pollIntervalMs?, conditional?)
Read messages with polling (wait for messages if none available).
// Poll for up to 10 seconds, checking every 500ms
const messages = await pgmq.readWithPoll(tx, 'my-queue', 30, 1, 10, 500);
pop(tx, queueName)
Read and immediately delete a message (atomic operation).
const messages = await pgmq.pop(tx, 'my-queue');
deleteMessage(tx, queueName, msgId)
Delete a specific message.
const deleted = await pgmq.deleteMessage(tx, 'my-queue', 123);
deleteBatch(tx, queueName, msgIds)
Delete multiple messages.
const deletedIds = await pgmq.deleteBatch(tx, 'my-queue', [123, 124, 125]);
archive(tx, queueName, msgId)
Archive a message (move to archive table).
const archived = await pgmq.archive(tx, 'my-queue', 123);
archiveBatch(tx, queueName, msgIds)
Archive multiple messages.
const archivedIds = await pgmq.archiveBatch(tx, 'my-queue', [123, 124, 125]);
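Assuming the batch calls return only the IDs that were actually deleted or archived (as the variable names `deletedIds` and `archivedIds` suggest), a caller may want to know which requested IDs were not affected. A minimal sketch, where `missingIds` is a hypothetical helper and not part of prisma-pgmq:

```typescript
// Hypothetical helper: which requested IDs are absent from the returned list?
function missingIds(requested: number[], returned: number[]): number[] {
  const done = new Set(returned);
  return requested.filter((id) => !done.has(id));
}

// Example: 124 was requested but does not appear in the result.
const notAffected = missingIds([123, 124, 125], [123, 125]);
console.log(notAffected); // [ 124 ]
```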
createQueue(tx, queueName)
Create a new queue.
await pgmq.createQueue(tx, 'my-new-queue');
createPartitionedQueue(tx, queueName, partitionInterval?, retentionInterval?)
Create a partitioned queue for high-throughput scenarios.
await pgmq.createPartitionedQueue(tx, 'high-volume-queue', '10000', '100000');
createUnloggedQueue(tx, queueName)
Create an unlogged queue (better performance, less durability).
await pgmq.createUnloggedQueue(tx, 'temp-queue');
dropQueue(tx, queueName)
Delete a queue and all its messages.
const dropped = await pgmq.dropQueue(tx, 'old-queue');
purgeQueue(tx, queueName)
Remove all messages from a queue.
const messageCount = await pgmq.purgeQueue(tx, 'my-queue');
setVt(tx, queueName, msgId, vtOffset)
Set visibility timeout for a specific message.
const message = await pgmq.setVt(tx, 'my-queue', 123, 60); // 60 seconds
listQueues(tx)
Get information about all queues.
const queues = await pgmq.listQueues(tx);
console.log(queues); // [{ queue_name: 'my-queue', created_at: ..., is_partitioned: false, is_unlogged: false }]
metrics(tx, queueName)
Get metrics for a specific queue.
const metrics = await pgmq.metrics(tx, 'my-queue');
console.log(metrics);
// {
//   queue_name: 'my-queue',
//   queue_length: 5,
//   newest_msg_age_sec: 10,
//   oldest_msg_age_sec: 300,
//   total_messages: 1000,
//   scrape_time: 2024-01-01T10:00:00.000Z
// }
metricsAll(tx)
Get metrics for all queues.
const allMetrics = await pgmq.metricsAll(tx);
Task
type Task = Record<string, unknown>;
MessageRecord
interface MessageRecord {
  msg_id: number;
  read_ct: number;
  enqueued_at: Date;
  vt: Date;
  message: Task;
}
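Because `Task` is `Record<string, unknown>`, the fields of `message` are `unknown` until they are narrowed. One way to do that is a type guard, sketched below. The `SendEmailTask` shape and the `isSendEmailTask` function are hypothetical names for this example (the field names follow the quick-start payload); neither is exported by prisma-pgmq.

```typescript
type Task = Record<string, unknown>; // same alias as declared above

// Hypothetical payload shape used only for this example.
type SendEmailTask = Task & {
  action: 'send-email';
  userId: number;
  email: string;
};

// Type guard: checks the fields at runtime and narrows the static type.
function isSendEmailTask(task: Task): task is SendEmailTask {
  return (
    task.action === 'send-email' &&
    typeof task.userId === 'number' &&
    typeof task.email === 'string'
  );
}

const examplePayload: Task = {
  action: 'send-email',
  userId: 123,
  email: 'user@example.com'
};

if (isSendEmailTask(examplePayload)) {
  // Inside this branch, `email` is known to be a string.
  console.log(examplePayload.email.toLowerCase());
}
```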
QueueMetrics
interface QueueMetrics {
  queue_name: string;
  queue_length: number;
  newest_msg_age_sec: number | null;
  oldest_msg_age_sec: number | null;
  total_messages: number;
  scrape_time: Date;
}
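Given the `QueueMetrics` shape above, a monitoring job might derive a simple backlog check. This is a sketch under assumptions: `isBacklogged` and both thresholds are hypothetical and should be tuned per workload. Note that `oldest_msg_age_sec` can be `null` and has to be handled explicitly.

```typescript
// Same shape as the QueueMetrics interface documented above.
interface QueueMetrics {
  queue_name: string;
  queue_length: number;
  newest_msg_age_sec: number | null;
  oldest_msg_age_sec: number | null;
  total_messages: number;
  scrape_time: Date;
}

// Hypothetical check: a queue counts as backlogged if it is too long
// or if its oldest message has waited longer than allowed.
function isBacklogged(
  m: QueueMetrics,
  maxLength: number,
  maxOldestAgeSec: number
): boolean {
  const tooLong = m.queue_length > maxLength;
  const tooOld =
    m.oldest_msg_age_sec !== null && m.oldest_msg_age_sec > maxOldestAgeSec;
  return tooLong || tooOld;
}

// Sample values taken from the metrics example earlier in this document.
const sampleMetrics: QueueMetrics = {
  queue_name: 'my-queue',
  queue_length: 5,
  newest_msg_age_sec: 10,
  oldest_msg_age_sec: 300,
  total_messages: 1000,
  scrape_time: new Date('2024-01-01T10:00:00.000Z')
};

console.log(isBacklogged(sampleMetrics, 100, 120)); // true (oldest message is 300 s old)
```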
QueueInfo
interface QueueInfo {
  queue_name: string;
  created_at: Date;
  is_partitioned: boolean;
  is_unlogged: boolean;
}
import { PrismaClient } from '@prisma/client';
import { pgmq } from 'prisma-pgmq';
const prisma = new PrismaClient();
// Producer
async function sendTask(taskData: any) {
  await pgmq.send(prisma, 'work-queue', {
    type: 'process-user-data',
    data: taskData,
    timestamp: Date.now()
  });
}
// Consumer
async function processMessages() {
  const messages = await pgmq.readWithPoll(prisma, 'work-queue', 30, 5, 10, 1000);
  for (const message of messages) {
    try {
      // Process the message
      await handleTask(message.message);
      // Delete on success
      await pgmq.deleteMessage(prisma, 'work-queue', message.msg_id);
    } catch (error) {
      console.error('Task failed:', error);
      // Archive failed messages for later analysis
      await pgmq.archive(prisma, 'work-queue', message.msg_id);
    }
  }
}
async function handleTask(task: any) {
  // Your business logic here
  console.log('Processing task:', task.type);
}
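The consumer above archives a message on its first failure. A possible variant is to leave a failed message in the queue, where it becomes readable again once its visibility timeout expires, and to archive it only after several attempts. The sketch below assumes that `read_ct` (from `MessageRecord`) counts how many times the message has been read; `decideOnFailure` and `maxAttempts` are hypothetical names, not part of prisma-pgmq.

```typescript
type FailureAction = 'retry' | 'archive';

// Hypothetical policy: retry until the message has been read
// `maxAttempts` times, then archive it for later analysis.
function decideOnFailure(
  message: { read_ct: number },
  maxAttempts: number
): FailureAction {
  return message.read_ct >= maxAttempts ? 'archive' : 'retry';
}

console.log(decideOnFailure({ read_ct: 1 }, 3)); // retry
console.log(decideOnFailure({ read_ct: 3 }, 3)); // archive
```

In the `catch` block, the consumer would call `pgmq.archive` only when this function returns `'archive'`, and otherwise do nothing so that the message is picked up again later.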
// Schedule a message for later processing
const futureDate = new Date(Date.now() + 24 * 60 * 60 * 1000); // 24 hours
await pgmq.send(prisma, 'scheduled-tasks', {
  type: 'send-reminder',
  userId: 123,
  reminder: 'Your subscription expires tomorrow'
}, futureDate);
Create a feature branch (git checkout -b feature/amazing-feature)
Commit your changes (git commit -m 'feat: add amazing feature')
Push the branch (git push origin feature/amazing-feature)
# Clone the repository
git clone https://github.com/dvlkv/prisma-pgmq.git
cd prisma-pgmq
# Install dependencies
pnpm install
# Run tests
pnpm test
# Build the library
pnpm build
# Watch for changes during development
pnpm dev
This project is licensed under the MIT License - see the LICENSE file for details.