Queue Runtime
A powerful, flexible multi-queue job processing runtime with global worker pool, automatic worker redistribution, and support for multiple queue drivers.
Features
Core Features
- ✅ Multi-queue support - Manage multiple job queues independently
- ✅ Global worker pool - Shared worker pool across all queues with
maxWorkers limit
- ✅ Per-queue concurrency limits - Control concurrency for each queue
- ✅ Worker redistribution - Automatically redistribute idle workers to busy queues
- ✅ Rebalancing on release - Rebalance workers when queues become idle
- ✅ Graceful shutdown - Wait for running jobs to complete before shutdown
Error Handling
- ✅ Retry mechanism - Automatic retry with exponential backoff
- ✅ Dead Letter Queue (DLQ) - Failed jobs after max attempts sent to DLQ
- ✅ Error tracking - Track and report errors with detailed statistics
Queue Drivers
- ✅ Redis - Production-ready Redis driver (default)
- ✅ RabbitMQ - RabbitMQ driver (optional dependency)
- ✅ AWS SQS - AWS SQS driver (optional dependency)
Installation
npm install queue-runtime
Optional Dependencies
For RabbitMQ support:
npm install amqplib @types/amqplib
For AWS SQS support:
npm install @aws-sdk/client-sqs
Quick Start
import { RedisDriver, QueueRuntime } from 'queue-runtime';
const driver = new RedisDriver('redis://localhost:6379');
await driver.connect();
const runtime = new QueueRuntime(driver, {
maxWorkers: 15,
enableRedistributeIdleWorkers: true,
enableRebalanceOnRelease: true,
pollInterval: 50,
});
runtime.registerJob({
queueName: 'send-email',
concurrency: 8,
maxAttempts: 3,
handler: async (payload) => {
console.log('Sending email to:', payload.to);
},
});
await runtime.enqueue('send-email', { to: 'user@example.com' });
runtime.start();
Configuration
QueueRuntimeConfig
interface QueueRuntimeConfig {
maxWorkers: number;
enableRedistributeIdleWorkers?: boolean;
enableRebalanceOnRelease?: boolean;
pollInterval?: number;
errorHandler?: ErrorHandlerConfig;
}
JobDefinition
interface JobDefinition {
queueName: string;
concurrency: number;
handler: (payload: any) => Promise<any>;
maxAttempts?: number;
retryBackoffMs?: number;
retryBackoffMultiplier?: number;
maxRetryDelayMs?: number;
dlqEnabled?: boolean;
}
Queue Drivers
Redis Driver
import { RedisDriver } from 'queue-runtime/drivers/redis';
const driver = new RedisDriver('redis://localhost:6379');
await driver.connect();
RabbitMQ Driver
import { RabbitMQDriver } from 'queue-runtime';
const driver = new RabbitMQDriver({
url: 'amqp://localhost',
durable: true,
prefetch: 1,
});
await driver.connect();
AWS SQS Driver
import { SQSDriver } from 'queue-runtime';
const driver = new SQSDriver({
region: 'us-east-1',
accessKeyId: 'your-key',
secretAccessKey: 'your-secret',
});
await driver.connect();
Memory Driver (Testing)
import { MemoryDriver } from 'queue-runtime';
const driver = new MemoryDriver();
await driver.connect();
Statistics & Monitoring
const stats = runtime.getStats();
console.log(stats);
const errors = runtime.getErrorReports(10);
Error Handling
Jobs automatically retry on failure with exponential backoff. After max attempts, jobs are sent to a Dead Letter Queue (DLQ).
runtime.registerJob({
queueName: 'process-order',
concurrency: 5,
maxAttempts: 3,
retryBackoffMs: 1000,
retryBackoffMultiplier: 2,
maxRetryDelayMs: 30000,
dlqEnabled: true,
handler: async (payload) => {
},
});
Worker Redistribution
The runtime automatically redistributes idle workers to busy queues:
- Idle Worker Redistribution: When total concurrency < maxWorkers, idle workers are distributed evenly across queues
- Rebalance on Release: When a queue releases all workers, they're redistributed to other active queues
License
MIT
Requirements