
groupmq-plus
A fast, reliable Redis-backed per-group FIFO queue for Node + TypeScript with guaranteed job ordering and parallel processing across groups.
Created by OpenPanel.dev
npm i groupmq ioredis
import Redis from "ioredis";
import { Queue, Worker } from "groupmq";
const redis = new Redis("redis://127.0.0.1:6379");
const queue = new Queue({
redis,
namespace: "orders", // Will be prefixed with 'groupmq:'
jobTimeoutMs: 30_000, // How long before job times out
logger: true, // Enable logging (optional)
});
await queue.add({
groupId: "user:42",
data: { type: "charge", amount: 999 },
orderMs: Date.now(), // or event.createdAtMs
maxAttempts: 5,
});
const worker = new Worker({
queue,
concurrency: 1, // Process 1 job at a time (can increase for parallel processing)
handler: async (job) => {
console.log(`Processing:`, job.data);
},
});
worker.run();
GroupMQ is heavily inspired by BullMQ, a fantastic library and one of the most popular Redis-based job queue libraries for Node.js. We've taken many core concepts and design patterns from BullMQ while adapting them for our specific use case of per-group FIFO processing. We're grateful to the BullMQ team for their excellent work and the foundation they've provided for the Redis job queue ecosystem.
While GroupMQ is inspired by BullMQ's design and concepts, we have also directly copied some code from BullMQ:
src/async-fifo-queue.ts - This file contains code copied from BullMQ's AsyncFifoQueue implementation. BullMQ's implementation is well-designed and fits our needs perfectly, so we've used it directly rather than reimplementing it. This code is used under the MIT License. The original copyright notice and license can be found at:
Original copyright: Copyright (c) Taskforce.sh and contributors
type QueueOptions = {
redis: Redis; // Redis client instance (required)
namespace: string; // Unique queue name, gets 'groupmq:' prefix (required)
logger?: boolean | LoggerInterface; // Enable logging (default: false)
jobTimeoutMs?: number; // Job processing timeout (default: 30000ms)
maxAttempts?: number; // Default max retry attempts (default: 3)
reserveScanLimit?: number; // Groups to scan when reserving (default: 20)
keepCompleted?: number; // Number of completed jobs to retain (default: 0)
keepFailed?: number; // Number of failed jobs to retain (default: 0)
schedulerLockTtlMs?: number; // Scheduler lock TTL (default: 1500ms)
orderingMethod?: OrderingMethod; // Ordering strategy (default: 'none')
orderingWindowMs?: number; // Time window for ordering (required for non-'none' methods)
orderingMaxWaitMultiplier?: number; // Max grace period multiplier for in-memory (default: 3)
orderingGracePeriodDecay?: number; // Grace period decay factor for in-memory (default: 1.0)
orderingMaxBatchSize?: number; // Max jobs to collect in batch for in-memory (default: 10)
};
type OrderingMethod = 'none' | 'scheduler' | 'in-memory';
Ordering Methods:
'none' - No ordering guarantees (fastest, zero overhead, no extra latency)
'scheduler' - Redis buffering for large windows (≥1000ms, requires scheduler, adds latency)
'in-memory' - Worker collection for small windows (50-500ms, no scheduler, adds latency per batch)
See Ordering Methods for a detailed comparison.
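To make the 'in-memory' method concrete, here is a toy model of what an ordering window conceptually does: jobs arriving within the window are buffered, then released sorted by orderMs. All names below are illustrative, not GroupMQ internals.

```typescript
// Illustrative sketch only: buffer jobs for a window, release by orderMs.
type BufferedJob = { id: string; orderMs: number };

function releaseInOrder(buffer: BufferedJob[]): BufferedJob[] {
  // Array.prototype.sort is stable, so equal orderMs values keep their
  // arrival order, matching the documented tie-breaking behavior.
  return [...buffer].sort((a, b) => a.orderMs - b.orderMs);
}

const buffer: BufferedJob[] = [
  { id: "b", orderMs: 2000 },
  { id: "a", orderMs: 1000 },
  { id: "c", orderMs: 2000 }, // same orderMs as "b", arrived later
];
const ordered = releaseInOrder(buffer).map((j) => j.id);
// ordered is ["a", "b", "c"]: earlier orderMs first, ties keep arrival order
```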
type WorkerOptions<T> = {
queue: Queue<T>; // Queue instance to process jobs from (required)
handler: (job: ReservedJob<T>) => Promise<unknown>; // Job processing function (required)
name?: string; // Worker name for logging (default: queue.name)
logger?: boolean | LoggerInterface; // Enable logging (default: false)
concurrency?: number; // Number of jobs to process in parallel (default: 1)
heartbeatMs?: number; // Heartbeat interval (default: Math.max(1000, jobTimeoutMs/3))
onError?: (err: unknown, job?: ReservedJob<T>) => void; // Error handler
maxAttempts?: number; // Max retry attempts (default: queue.maxAttempts)
backoff?: BackoffStrategy; // Retry backoff function (default: exponential with jitter)
enableCleanup?: boolean; // Periodic cleanup (default: true)
cleanupIntervalMs?: number; // Cleanup frequency (default: 60000ms)
schedulerIntervalMs?: number; // Scheduler frequency (default: adaptive)
blockingTimeoutSec?: number; // Blocking reserve timeout (default: 5s)
atomicCompletion?: boolean; // Atomic completion + next reserve (default: true)
stalledInterval?: number; // Check if stalled every N ms (default: 30000)
maxStalledCount?: number; // Fail after N stalls (default: 1)
stalledGracePeriod?: number; // Grace period before considering stalled (default: 0)
};
type BackoffStrategy = (attempt: number) => number; // returns delay in ms
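Since backoff accepts any function of the attempt number, a custom strategy is easy to pass. The sketch below shows one plausible shape, exponential growth with full jitter and a cap; GroupMQ's built-in default may differ in its exact constants.

```typescript
// A sketch of a custom BackoffStrategy: exponential with full jitter, capped
// at 30s. The constants here are illustrative, not GroupMQ's defaults.
type BackoffStrategy = (attempt: number) => number;

const expBackoff: BackoffStrategy = (attempt) => {
  const base = 1000 * 2 ** (attempt - 1); // 1s, 2s, 4s, 8s, ...
  const capped = Math.min(base, 30_000); // never wait more than 30s
  return Math.floor(Math.random() * capped); // full jitter in [0, capped)
};
```

It would then be passed as `backoff: expBackoff` in WorkerOptions.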
When adding a job to the queue:
await queue.add({
groupId: string; // Required: Group ID for FIFO processing
data: T; // Required: Job payload data
orderMs?: number; // Timestamp for ordering (default: Date.now())
maxAttempts?: number; // Max retry attempts (default: queue.maxAttempts)
jobId?: string; // Custom job ID (default: auto-generated UUID)
delay?: number; // Delay in ms before job becomes available
runAt?: Date | number; // Specific time to run the job
repeat?: RepeatOptions; // Repeating job configuration (cron or interval)
});
type RepeatOptions =
| { every: number } // Repeat every N milliseconds
| { pattern: string }; // Cron pattern (standard 5-field format)
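A pre-flight check before enqueueing a repeat can catch obvious mistakes. The helper below is hypothetical and its cron regex is only a crude shape check for the 5-field format (minute hour day month weekday), not GroupMQ's actual parser.

```typescript
// Hypothetical validation for RepeatOptions; not part of the GroupMQ API.
type RepeatOptions = { every: number } | { pattern: string };

function validateRepeat(r: RepeatOptions): boolean {
  if ("every" in r) {
    return Number.isInteger(r.every) && r.every > 0;
  }
  // Crude check: exactly 5 fields, each made of digits, '*', ',', '/', '-'.
  const fields = r.pattern.trim().split(/\s+/);
  return fields.length === 5 && fields.every((f) => /^[\d*,\/-]+$/.test(f));
}
```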
Example with delay:
await queue.add({
groupId: 'user:123',
data: { action: 'send-reminder' },
delay: 3600000, // Run in 1 hour
});
Example with specific time:
await queue.add({
groupId: 'user:123',
data: { action: 'scheduled-report' },
runAt: new Date('2025-12-31T23:59:59Z'),
});
Workers support configurable concurrency to process multiple jobs in parallel from different groups:
const worker = new Worker({
queue,
concurrency: 8, // Process up to 8 jobs simultaneously
handler: async (job) => {
// Jobs from different groups can run in parallel
// Jobs from the same group still run sequentially
},
});
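The guarantee in the comments above can be pictured as partitioning jobs into per-group lanes: each lane is strictly ordered, while distinct lanes may run in parallel. This is a toy model of the semantics, not GroupMQ's scheduler.

```typescript
// Toy model of the concurrency guarantee: order within a group is preserved;
// separate groups form independent lanes that may interleave.
type LaneJob = { groupId: string; seq: number };

function perGroupOrder(jobs: LaneJob[]): Map<string, number[]> {
  const lanes = new Map<string, number[]>();
  for (const j of jobs) {
    const lane = lanes.get(j.groupId) ?? [];
    lane.push(j.seq); // enqueue order within the group is kept
    lanes.set(j.groupId, lane);
  }
  return lanes;
}

const lanes = perGroupOrder([
  { groupId: "user:1", seq: 1 },
  { groupId: "user:2", seq: 1 },
  { groupId: "user:1", seq: 2 },
]);
// lanes.get("user:1") keeps [1, 2] in order; "user:2" runs independently
```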
Benefits:
Considerations:
Both Queue and Worker support optional logging for debugging and monitoring:
// Enable default logger
const queue = new Queue({
redis,
namespace: 'orders',
logger: true, // Logs to console with queue name prefix
});
const worker = new Worker({
queue,
logger: true, // Logs to console with worker name prefix
handler: async (job) => { /* ... */ },
});
Custom logger:
Works out of the box with both pino and winston
import type { LoggerInterface } from 'groupmq';
const customLogger: LoggerInterface = {
debug: (msg: string, ...args: any[]) => { /* custom logging */ },
info: (msg: string, ...args: any[]) => { /* custom logging */ },
warn: (msg: string, ...args: any[]) => { /* custom logging */ },
error: (msg: string, ...args: any[]) => { /* custom logging */ },
};
const queue = new Queue({
redis,
namespace: 'orders',
logger: customLogger,
});
What gets logged:
GroupMQ supports simple repeatable jobs using either a fixed interval (every) or a basic cron pattern (pattern). Repeats are materialized by a lightweight scheduler that runs as part of the worker's periodic cleanup cycle.
await queue.add({
groupId: 'reports',
data: { type: 'daily-summary' },
repeat: { every: 5000 }, // run every 5 seconds
});
const worker = new Worker({
queue,
handler: async (job) => {
// process...
},
// IMPORTANT: For timely repeats, run the scheduler frequently
cleanupIntervalMs: 1000, // <= repeat.every (recommended 1–2s for 5s repeats)
});
worker.run();
await queue.add({
groupId: 'emails',
data: { type: 'weekly-digest' },
repeat: { pattern: '0 9 * * 1-5' }, // 09:00 Mon–Fri
});
await queue.removeRepeatingJob('reports', { every: 5000 });
// or
await queue.removeRepeatingJob('emails', { pattern: '0 9 * * 1-5' });
Each scheduler pass runs cleanup(), promoteDelayedJobs(), and processRepeatingJobs(), guarded by a Redis lock (schedulerLockTtlMs, default: 1500ms) so that only one worker runs it at a time. For sub-second repeats, lower the lock TTL and the worker intervals:
const queue = new Queue({
redis,
namespace: 'fast',
schedulerLockTtlMs: 50, // Allow fast scheduler lock
});
const worker = new Worker({
queue,
schedulerIntervalMs: 10, // Check every 10ms
cleanupIntervalMs: 100, // Cleanup every 100ms
handler: async (job) => { /* ... */ },
});
⚠️ Fast repeats (< 1s) increase Redis load and should be used sparingly. Each repeat run is enqueued as a regular job with its own jobId, preserving per-group FIFO semantics.
// Stop worker gracefully - waits for current job to finish
await worker.close(gracefulTimeoutMs);
// Wait for queue to be empty
const isEmpty = await queue.waitForEmpty(timeoutMs);
// Recover groups that might be stuck due to ordering delays
const recoveredCount = await queue.recoverDelayedGroups();
// Job counts and status
const counts = await queue.getJobCounts();
// { active: 5, waiting: 12, delayed: 3, total: 20, uniqueGroups: 8 }
const activeCount = await queue.getActiveCount();
const waitingCount = await queue.getWaitingCount();
const delayedCount = await queue.getDelayedCount();
const completedCount = await queue.getCompletedCount();
const failedCount = await queue.getFailedCount();
// Get job IDs by status
const activeJobIds = await queue.getActiveJobs();
const waitingJobIds = await queue.getWaitingJobs();
const delayedJobIds = await queue.getDelayedJobs();
// Get Job instances by status
const completedJobs = await queue.getCompletedJobs(limit); // returns Job[]
const failedJobs = await queue.getFailedJobs(limit);
// Group information
const groups = await queue.getUniqueGroups(); // ['user:123', 'order:456']
const groupCount = await queue.getUniqueGroupsCount();
const jobsInGroup = await queue.getGroupJobCount('user:123');
// Get specific job
const job = await queue.getJob(jobId); // returns Job instance
// Job manipulation
await queue.remove(jobId);
await queue.retry(jobId); // Re-enqueue a failed job
await queue.promote(jobId); // Promote delayed job to waiting
await queue.changeDelay(jobId, newDelayMs);
await queue.updateData(jobId, newData);
// Scheduler operations
await queue.runSchedulerOnce(); // Manual scheduler run
await queue.promoteDelayedJobs(); // Promote delayed jobs
await queue.recoverDelayedGroups(); // Recover stuck groups
// Cleanup and shutdown
await queue.waitForEmpty(timeoutMs);
await queue.close();
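The shutdown methods above compose naturally into a drain-then-close routine. The helper below is a sketch, typed structurally so it relies only on the waitForEmpty and close methods documented here.

```typescript
// Sketch of a drain-then-close helper built on the documented queue methods.
interface Drainable {
  waitForEmpty(timeoutMs: number): Promise<boolean>;
  close(): Promise<void>;
}

async function drainAndClose(q: Drainable, timeoutMs = 30_000): Promise<boolean> {
  const emptied = await q.waitForEmpty(timeoutMs); // false if the timeout hit
  await q.close(); // close either way so the Redis connection is released
  return emptied;
}
```

Calling `await drainAndClose(queue)` before process exit avoids dropping in-flight work.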
Jobs returned from queue.getJob(), queue.getCompletedJobs(), etc. have these methods:
const job = await queue.getJob(jobId);
// Manipulate the job
await job.remove();
await job.retry();
await job.promote();
await job.changeDelay(newDelayMs);
await job.updateData(newData);
await job.update(newData); // Alias for updateData
// Get job state
const state = await job.getState(); // 'active' | 'waiting' | 'delayed' | 'completed' | 'failed'
// Serialize job
const json = job.toJSON();
// Check worker status
const isProcessing = worker.isProcessing();
// Get current job(s) being processed
const currentJob = worker.getCurrentJob();
// { job: ReservedJob, processingTimeMs: 1500 } | null
// For concurrency > 1
const currentJobs = worker.getCurrentJobs();
// [{ job: ReservedJob, processingTimeMs: 1500 }, ...]
// Get worker metrics
const metrics = worker.getWorkerMetrics();
// { jobsInProgress: 2, lastJobPickupTime: 1234567890, ... }
// Graceful shutdown
await worker.close(gracefulTimeoutMs);
Workers emit events that you can listen to:
worker.on('ready', () => {
console.log('Worker is ready');
});
worker.on('completed', (job: Job) => {
console.log('Job completed:', job.id);
});
worker.on('failed', (job: Job) => {
console.log('Job failed:', job.id, job.failedReason);
});
worker.on('error', (error: Error) => {
console.error('Worker error:', error);
});
worker.on('closed', () => {
console.log('Worker closed');
});
worker.on('graceful-timeout', (job: Job) => {
console.log('Job exceeded graceful timeout:', job.id);
});
// Remove event listeners
worker.off('completed', handler);
worker.removeAllListeners();
GroupMQ provides a BullBoard adapter for visual monitoring and management:
import { createBullBoard } from '@bull-board/api';
import { ExpressAdapter } from '@bull-board/express';
import { BullBoardGroupMQAdapter } from 'groupmq';
import express from 'express';
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [
new BullBoardGroupMQAdapter(queue, {
displayName: 'Order Processing',
description: 'Processes customer orders',
readOnlyMode: false, // Allow job manipulation through UI
}),
],
serverAdapter,
});
const app = express();
app.use('/admin/queues', serverAdapter.getRouter());
app.listen(3000, () => {
console.log('BullBoard running at http://localhost:3000/admin/queues');
});
GroupMQ uses these Redis keys (all prefixed with groupmq:{namespace}:):
:g:{groupId}, sorted set of job IDs in a group, ordered by score (derived from orderMs and seq)
:ready, sorted set of group IDs that have jobs available, ordered by lowest job score
:job:{jobId}, hash containing job data (id, groupId, data, attempts, status, etc.)
:lock:{groupId}, string with job ID that currently owns the group lock (with TTL)
:processing, sorted set of active job IDs, ordered by deadline
:processing:{jobId}, hash with processing metadata (groupId, deadlineAt)
:delayed, sorted set of delayed jobs, ordered by runAt timestamp
:completed, sorted set of completed job IDs (for retention)
:failed, sorted set of failed job IDs (for retention)
:repeats, hash of repeating job definitions (groupId → config)
A job's status maps onto these keys:
waiting: job is in :g:{groupId} and its group is in :ready
delayed: job is in :delayed (scheduled for the future)
active: job is in :processing and its group is locked
completed: job is in :completed (retention)
failed: job is in :failed (retention)
The worker runs a continuous loop optimized for both single and concurrent processing:
For concurrency = 1 (sequential):
while (!stopping) {
// 1. Blocking reserve (waits for job, efficient)
const job = await queue.reserveBlocking(timeoutSec);
// 2. Process job synchronously
if (job) {
await processOne(job);
}
// 3. Periodic scheduler run (every schedulerIntervalMs)
await queue.runSchedulerOnce(); // Promotes delayed jobs, processes repeats
}
For concurrency > 1 (parallel):
while (!stopping) {
// 1. Run lightweight scheduler periodically
await queue.runSchedulerOnce();
// 2. Try batch reservation if we have capacity
const capacity = concurrency - jobsInProgress.size;
if (capacity > 0) {
const jobs = await queue.reserveBatch(capacity);
// Process all jobs concurrently (fire and forget)
for (const job of jobs) {
void processOne(job);
}
}
// 3. Blocking reserve for remaining capacity
const job = await queue.reserveBlocking(blockingTimeoutSec);
if (job) {
void processOne(job); // Process async
}
}
Key optimizations:
All critical operations use Lua scripts for atomicity:
enqueue.lua, adds job to group queue, adds group to ready set
reserve.lua, finds ready group, pops head job, locks group
reserve-batch.lua, reserves one job from multiple groups atomically
complete.lua, marks job complete, unlocks group, re-adds group to ready if more jobs
complete-and-reserve-next.lua, atomic completion + reservation from same group
retry.lua, increments attempts, re-adds job to group with backoff delay
remove.lua, removes job from all data structures
When a worker reserves a job:
1. ZRANGE :ready 0 0 gets the lowest-score group
2. PTTL :lock:{groupId} ensures the group isn't locked
3. ZPOPMIN :g:{groupId} 1 gets the head job atomically
4. SET :lock:{groupId} {jobId} PX {timeout}
5. The job is added to the :processing sorted set with a deadline
6. ZADD :ready {score} {groupId}
When a job completes successfully:
1. DEL :processing:{jobId}, ZREM :processing {jobId}
2. HSET :job:{jobId} status completed
3. ZADD :completed {now} {jobId}
4. DEL :lock:{groupId} (only if this job owns the lock)
5. ZCARD :g:{groupId}
6. ZADD :ready {nextScore} {groupId}
The critical fix in step 6 ensures that after a job completes, the group becomes available again for other workers to pick up the next job in the queue.
Jobs are ordered using a composite score:
score = (orderMs - baseEpoch) * 1000 + seq
orderMs, user-provided timestamp for event ordering
baseEpoch, fixed epoch timestamp (1704067200000) to keep scores manageable
seq, auto-incrementing sequence for tiebreaking (resets daily to prevent overflow)
This ensures:
Jobs with an earlier orderMs process first
Jobs with the same orderMs process in submission order
concurrency = 1 (Sequential):
concurrency > 1 (Parallel):
Important: Per-group FIFO ordering is maintained regardless of concurrency level. Multiple jobs from the same group never run in parallel.
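The composite score formula above can be sketched directly. The baseEpoch constant (1704067200000, i.e. 2024-01-01 UTC) and the formula are from the docs; the function name is illustrative.

```typescript
// Sketch of the composite ordering score: (orderMs - baseEpoch) * 1000 + seq.
const baseEpoch = 1_704_067_200_000; // fixed epoch quoted in the docs

function jobScore(orderMs: number, seq: number): number {
  return (orderMs - baseEpoch) * 1000 + seq;
}

// An earlier orderMs always wins, regardless of the seq tiebreaker:
const earlier = jobScore(baseEpoch + 1_000, 999);
const later = jobScore(baseEpoch + 2_000, 0);
// earlier < later, so the older event is popped first
```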
When a job fails:
1. HINCRBY :job:{jobId} attempts 1
2. If attempts >= maxAttempts, mark the job as failed
3. Otherwise, re-add the job to :g:{groupId} with a backoff delay
If a job times out (visibility timeout expires):
The lock set via SET :lock:{groupId} {jobId} PX {timeout} expires on its own, so the group becomes reservable again.
Periodic cleanup runs:
Promote jobs from :delayed to waiting when their runAt arrives
Detect stalled jobs in :processing and unlock their groups
Latest Benchmarks (MacBook M2, 500 jobs, 4 workers, multi-process):
GroupMQ maintains competitive performance while adding per-group FIFO ordering guarantees:
For detailed benchmark results and comparisons over time, see our Performance Benchmarks page.
Optimizations:
reserveBatch reduces round-trips for concurrent workers
Contributions are welcome! When making changes:
Requires a local Redis at 127.0.0.1:6379 (no auth).
npm i
npm run build
npm test
Optionally:
docker run --rm -p 6379:6379 redis:7