
AnQueue is a robust, lightweight task queue system for Node.js that discovers task executors from a directory, executes tasks in isolated worker processes, optionally persists state to a database, and provides comprehensive retry/validation hooks with automatic worker management.
npm install anqueue
import Queue, { Task, PrismaAdapter } from "anqueue";
import { PrismaClient } from "@prisma/client";

const prisma = new PrismaClient();

// Point the queue at your task executors directory
const queue = new Queue("./tasks", {
  db: new PrismaAdapter(prisma),
  workerPrefix: "MyAppWorker",
  maxWorkers: 2
});

// Initialize the queue (spawns workers and registers executors)
await queue.init();

// Start automatic task processing every 5 seconds
queue.runAutomatically(5);

// Create and enqueue a task
const task = new Task({
  name: "Send Welcome Email",
  type: "test-task", // matches file ./tasks/test-task.ts
  description: "Send welcome email to new user",
  priority: 1,
  data: {
    // Email is an application-defined model class; import it from your own code
    email: new Email({ to: [], from: "", cc: [] })
  },
  runAt: new Date(Date.now() + 5000) // Delay execution by 5 seconds
});

queue.add(task);
Each file in your task directory (e.g., ./tasks) must export a default class extending TaskExecutor. The executor's type is derived from the filename.
import { Task, TaskExecutor, TaskValidationRule, TaskResult, hasProperty } from "anqueue";
import Email from "../src/models/Email.js";

export interface TestTaskData {
  email: Email;
}

export interface TestTaskResult {
  data?: boolean;
}

export default class TestTask extends TaskExecutor<TestTaskData, TestTaskResult> {
  retrySchema(): string[] {
    return [];
  }

  validationSchema(): TaskValidationRule[] {
    return [
      (task: Task<any>) => {
        return hasProperty(task.data, "email") && task.data.email instanceof Email;
      },
    ];
  }

  async exec(_task: Task<TestTaskData>): Promise<TaskResult<TestTaskResult>> {
    return { processed: true };
  }

  async onComplete(_task: Task<TestTaskData>, _result: TaskResult<TestTaskResult>, _db: any): Promise<void> {
    // Optional: Handle successful completion
  }

  async onFailure(_task: Task<TestTaskData>, _result: TaskResult<TestTaskResult> | null, _error: unknown, _db: any): Promise<void> {
    // Optional: Handle task failure
  }

  async saveResult(_task: Task<TestTaskData>, _result: TaskResult<TestTaskResult>, _db: any): Promise<void> {
    // Optional: Save result to database
  }
}
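The filename-to-type rule can be pictured with a small helper. This is only a sketch: `executorTypeFromFile` is a hypothetical name, and the assumption that the type is the file's base name without its extension is inferred from the `"test-task"` / `./tasks/test-task.ts` pairing in the quick start, not from AnQueue's source.

```typescript
// HYPOTHETICAL helper, not part of anqueue's public API.
// Assumption: executor type = file base name with the final extension removed.
function executorTypeFromFile(filePath: string): string {
  // Keep only the last path segment, accepting both "/" and "\" separators.
  const fileName = filePath.split(/[\\/]/).pop() ?? filePath;
  // Drop the final extension (".ts", ".js", ...), if there is one.
  const dot = fileName.lastIndexOf(".");
  return dot > 0 ? fileName.slice(0, dot) : fileName;
}

console.log(executorTypeFromFile("./tasks/test-task.ts")); // test-task
```

Under that assumption, a task created with `type: "test-task"` would be routed to the executor exported by `test-task.ts`.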
import { Task, TaskExecutor, TaskResult, TaskValidationRule } from "anqueue";
import { PrismaClient } from "@prisma/client";
import sharp from "sharp"; // unused while the work is simulated; a real implementation would resize with it

export interface CompressImageTaskData {
  image: File;
  sizes: { width: number; height: number }[];
}

export interface CompressImageTaskResult {
  files?: File[];
}

export default class CompressImageTask extends TaskExecutor<CompressImageTaskData, CompressImageTaskResult> {
  override retrySchema(): string[] {
    return [];
  }

  override validationSchema(): TaskValidationRule[] {
    return [
      // The task must carry an image
      (task) => {
        const t = task as Task<CompressImageTaskData>;
        return typeof t.data?.image !== "undefined";
      },
      // The task must carry a list of target sizes
      (task) => {
        const t = task as Task<CompressImageTaskData>;
        return Array.isArray(t.data?.sizes);
      },
    ];
  }

  override async exec(task: Task<CompressImageTaskData>) {
    try {
      const image = task.data.image;
      const sizes = task.data.sizes;
      const progressRate = 100 / sizes.length;
      const processedFiles: File[] = [];

      // Update progress as work is done
      for (let i = 0; i < sizes.length; i++) {
        // Simulate work
        await task.sleep(200);
        const compressedImage = new File([], image.name);
        processedFiles.push(compressedImage);
        task.updateProgress((i + 1) * progressRate);
      }

      return {
        files: processedFiles,
        processed: true
      };
    } catch (error) {
      // Attach the error to the task for debugging, then let the queue handle it
      task.addError(error instanceof Error ? error : new Error(String(error)));
      throw error;
    }
  }

  override async onComplete(task: Task<CompressImageTaskData>) {
    // Handle completion
  }

  override async saveResult(task: Task<CompressImageTaskData>, result: CompressImageTaskResult, db: PrismaClient) {
    // Save to database
  }
}
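Both executors above return an empty `retrySchema()` and a list of validator functions. The sketch below illustrates how such hooks could be evaluated. It uses stand-in types rather than AnQueue's own classes, and it assumes that each retry pattern is a regular-expression source tested against the error message and that an empty list means "never retry"; the README does not specify either point.

```typescript
// Stand-in types; the real Task and TaskValidationRule come from "anqueue".
interface SketchTask<T> {
  data: T;
  retryCount: number;
  maxRetries: number;
}
type SketchRule<T> = (task: SketchTask<T>) => boolean;

// A task is valid only if every validator returns true.
function isValid<T>(task: SketchTask<T>, rules: SketchRule<T>[]): boolean {
  return rules.every((rule) => rule(task) === true);
}

// ASSUMPTION: each pattern is a regular-expression source matched against the
// error message, and an empty pattern list never triggers a retry.
function shouldRetry<T>(task: SketchTask<T>, error: unknown, patterns: string[]): boolean {
  if (task.retryCount >= task.maxRetries) return false; // retry budget exhausted
  const message = error instanceof Error ? error.message : String(error);
  return patterns.some((pattern) => new RegExp(pattern, "i").test(message));
}

const sketchTask: SketchTask<{ sizes: number[] }> = {
  data: { sizes: [64, 128] },
  retryCount: 0,
  maxRetries: 3,
};

console.log(isValid(sketchTask, [(t) => Array.isArray(t.data.sizes)])); // true
console.log(shouldRetry(sketchTask, new Error("ECONNRESET while uploading"), ["ECONNRESET", "timeout"])); // true
console.log(shouldRetry(sketchTask, new Error("Invalid image"), ["ECONNRESET", "timeout"])); // false
```

Keeping the retry decision a pure function of the error and the patterns makes it easy to unit test separately from the worker processes.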
- queue.init() spawns worker processes and initializes the executor registry from your task directory
- queue.add(task) enqueues a Task instance in memory
- queue.runAutomatically(seconds) periodically processes pending tasks
- Workers receive Task instances and run executor hooks:
  - validationSchema() → each validator function must return true
  - exec(task) → must return { processed: boolean, ... }
  - onComplete(task, result, db) on success
  - onFailure(task, result, error, db) on error
- If exec() throws, the task decides whether to retry based on retrySchema() patterns

Tasks support various configuration options:
const task = new Task({
  name: "Task Name",
  type: "task-type",
  description: "Task description",
  priority: 1, // Higher numbers = higher priority
  maxRetries: 3, // Default: 3
  delay: 1000, // Delay before execution (ms)
  timeout: 30000, // Execution timeout (ms, default: 30s)
  runAt: new Date(), // Schedule for specific time
  data: { /* your data */ },
  userId: 123, // Optional user association
  metadata: { /* custom metadata */ }
});
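To show how `priority`, `delay` and `runAt` could interact, here is a minimal scheduling sketch. `ScheduledTask`, `createdAt` and `nextTasks` are hypothetical names, and the rule that a task becomes ready once both its `runAt` time and its `delay` (counted from creation) have passed is an assumption; AnQueue's own `readyToRun()` and `scheduleTasks()` may behave differently.

```typescript
// HYPOTHETICAL task shape for illustration; not anqueue's Task class.
interface ScheduledTask {
  uid: string;
  priority: number;   // higher numbers = higher priority
  createdAt: number;  // epoch ms (assumed field)
  delay?: number;     // ms to wait after creation
  runAt?: Date;       // earliest allowed start time
}

// ASSUMPTION: ready once both runAt and createdAt + delay are in the past.
function readyToRun(task: ScheduledTask, now: number): boolean {
  const notBefore = task.runAt?.getTime() ?? 0;
  const afterDelay = task.createdAt + (task.delay ?? 0);
  return now >= Math.max(notBefore, afterDelay);
}

// Ready tasks, highest priority first; the input array is left untouched.
function nextTasks(tasks: ScheduledTask[], now: number): ScheduledTask[] {
  return tasks
    .filter((task) => readyToRun(task, now))
    .sort((a, b) => b.priority - a.priority);
}

const pending: ScheduledTask[] = [
  { uid: "a", priority: 1, createdAt: 0 },
  { uid: "b", priority: 5, createdAt: 0, delay: 1000 },
  { uid: "c", priority: 9, createdAt: 0, runAt: new Date(20_000) },
];

// At t = 10 000 ms, "c" is still waiting for its runAt time.
console.log(nextTasks(pending, 10_000).map((task) => task.uid)); // [ 'b', 'a' ]
```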
const queue = new Queue("./tasks", {
  workerPrefix: "MyAppWorker", // Default: "anqueue-worker-"
  maxWorkers: 3, // Default: 3
  // Workers automatically handle up to 3 concurrent tasks each
});
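With `maxWorkers: 3` and up to 3 concurrent tasks per worker, at most 9 tasks run at once. The sketch below illustrates that capacity limit with a simple least-loaded assignment. The assignment policy and the `assignTasks` helper are assumptions made for illustration; the README does not describe how AnQueue actually distributes tasks.

```typescript
// Per-worker concurrency taken from the comment in the queue options above.
const TASKS_PER_WORKER = 3;

// HYPOTHETICAL policy: give each task to the least-loaded worker with a free slot.
function assignTasks(
  taskIds: string[],
  maxWorkers: number,
): { assigned: string[][]; waiting: string[] } {
  const assigned: string[][] = Array.from({ length: maxWorkers }, () => []);
  const waiting: string[] = [];

  for (const id of taskIds) {
    // Find the worker currently holding the fewest tasks.
    let target = 0;
    for (let w = 1; w < assigned.length; w++) {
      if (assigned[w].length < assigned[target].length) target = w;
    }
    if (assigned.length > 0 && assigned[target].length < TASKS_PER_WORKER) {
      assigned[target].push(id);
    } else {
      waiting.push(id); // no free slot: stays pending until a later run
    }
  }
  return { assigned, waiting };
}

const ids = Array.from({ length: 10 }, (_, i) => `t${i + 1}`);
const plan = assignTasks(ids, 3);
console.log(plan.assigned.map((w) => w.length)); // [ 3, 3, 3 ]
console.log(plan.waiting); // [ 't10' ]
```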
AnQueue supports optional database integration for persistent task storage and tracking. By connecting a database adapter (such as the provided PrismaAdapter), tasks are automatically saved, updated, and synchronized between memory and your database. This enables reliable task recovery, auditing, and coordination across multiple processes or restarts.
CREATE TABLE tasks (
  uid VARCHAR PRIMARY KEY,
  type VARCHAR NOT NULL,
  name VARCHAR,
  description VARCHAR,
  status VARCHAR NOT NULL,
  data JSON,
  error TEXT,
  started_at TIMESTAMP,
  finished_at TIMESTAMP
);
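As an illustration of how an in-memory task could map onto these columns, the sketch below converts a task snapshot into a row. `TaskSnapshot`, `TaskRow` and `toRow` are hypothetical names, and the choices to store `data` as JSON text and timestamps as ISO 8601 strings are assumptions; the provided PrismaAdapter may serialize differently.

```typescript
// Row shape mirroring the columns of the tasks table above.
interface TaskRow {
  uid: string;
  type: string;
  name: string | null;
  description: string | null;
  status: string;
  data: string | null;        // JSON text
  error: string | null;
  started_at: string | null;  // ISO 8601 timestamp
  finished_at: string | null; // ISO 8601 timestamp
}

// HYPOTHETICAL in-memory shape; field names are assumptions, not anqueue's Task class.
interface TaskSnapshot {
  uid: string;
  type: string;
  status: string;
  name?: string;
  description?: string;
  data?: unknown;
  error?: Error;
  startedAt?: Date;
  finishedAt?: Date;
}

// Convert a snapshot to a row, using null for every missing optional value.
function toRow(task: TaskSnapshot): TaskRow {
  return {
    uid: task.uid,
    type: task.type,
    name: task.name ?? null,
    description: task.description ?? null,
    status: task.status,
    data: task.data === undefined ? null : JSON.stringify(task.data),
    error: task.error ? task.error.message : null,
    started_at: task.startedAt ? task.startedAt.toISOString() : null,
    finished_at: task.finishedAt ? task.finishedAt.toISOString() : null,
  };
}

const row = toRow({
  uid: "t1",
  type: "test-task",
  status: "pending",
  data: { userId: 123 },
  startedAt: new Date(0),
});
console.log(row.data);       // {"userId":123}
console.log(row.started_at); // 1970-01-01T00:00:00.000Z
```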
Note: Currently only a Prisma adapter is available; more are planned.
When using a database adapter, tasks are automatically persisted and updated. Pass your generated PrismaClient to the adapter to enable task persistence:
const queue = new Queue("./tasks", {
  db: new PrismaAdapter(new PrismaClient())
});
Queue:
- constructor(taskDirectory: string, options?: QueueOptions)
- init(): Promise<Queue> – spawns workers and registers executors
- setDatabase(adaptor: AdapterImplementation): void – set/replace database connection
- runAutomatically(timeoutSeconds: number): Promise<void> – periodic processing loop
- runTasks(tasks?: Task[]): Promise<void> – send tasks to workers; defaults to all pending in-memory tasks if none specified
- scheduleTasks(): Promise<void> – sort in-memory tasks by priority
- add(task: Task): this – add task to queue
- remove(taskId: string): boolean – remove task by ID
- cancel(taskId: string): boolean – cancel pending task
- getTask(taskId: string): Task | undefined – get task by ID
- getPendingTasks(): Task[] – get all pending tasks
- getTaskStatuses(): TaskStatus[] – get status of all tasks
- clear(): void – clear all tasks

Task:
- constructor(options: TaskOptions<TData>)
- Properties: uid, name, type, description, status, progress, priority, retryCount, maxRetries, delay, timeout, runAt, data, userId, metadata
- sleep(ms) – pause execution
- validate(validationSchema) – run validation rules
- execute(executor, retrySchema) – execute with timeout and retry handling
- retry() – prepare for retry attempt
- cancel() – cancel execution
- updateProgress(0..100) – update progress percentage
- getStatus() – get current task status
- readyToRun() – check if task is ready to execute
- addError(error) – add error context

TaskExecutor:
- constructor(taskType: string)
- validationSchema(): TaskValidationRule[] – validation functions
- retrySchema(): string[] – retry patterns
- exec(task): Promise<TaskResult<R>> – main execution logic
- onFailure(task, result, error, db) – failure handling
- onComplete(task, result, db) – completion handling
- saveResult(task, result, db) – result persistence

Environment variables:
- ANQUEUE_GENERATE_TYPES – Set to "false" to disable automatic type generation
- MAX_TASK_RETRIES – Default maximum retry attempts (default: 3)
- TASK_TIMEOUT_MS – Default task timeout in milliseconds (default: 30000)

AnQueue automatically generates TypeScript types for your task directory at startup. This is controlled by the ANQUEUE_GENERATE_TYPES environment variable and stores a hash in .anqueue-types.hash inside your task folder.
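A stored hash usually serves to skip regeneration when nothing has changed. The sketch below shows that idea under explicit assumptions: that the hash covers the sorted list of executor file names, and that generation is skipped when ANQUEUE_GENERATE_TYPES is "false" or the hash is unchanged. The README does not say what AnQueue actually hashes, and the FNV-1a-style function here is a dependency-free stand-in, not the library's algorithm.

```typescript
// Small non-cryptographic FNV-1a-style hash over UTF-16 code units.
// Stand-in only; a real tool would more likely use a cryptographic digest.
function fnv1a(text: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash.toString(16);
}

// HYPOTHETICAL decision function; names and inputs are assumptions.
// storedHash is the content of .anqueue-types.hash, or null if the file is absent.
function shouldGenerateTypes(
  env: Record<string, string | undefined>,
  fileNames: string[],
  storedHash: string | null,
): { generate: boolean; hash: string } {
  // Sort a copy so that directory listing order does not affect the hash.
  const hash = fnv1a([...fileNames].sort().join("\n"));
  if (env.ANQUEUE_GENERATE_TYPES === "false") return { generate: false, hash };
  return { generate: hash !== storedHash, hash };
}

const first = shouldGenerateTypes({}, ["test-task.ts", "compress-image.ts"], null);
console.log(first.generate); // true: no stored hash yet

const second = shouldGenerateTypes({}, ["compress-image.ts", "test-task.ts"], first.hash);
console.log(second.generate); // false: same files, same hash
```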
- Use task.addError(error) to attach context for debugging
- Use task.updateProgress() for long-running tasks
- Implement onComplete and onFailure hooks for complex tasks

The src directory contains the core components that power AnQueue:
- database-adapter.ts: Defines the interface for database interactions, allowing AnQueue to be decoupled from specific ORMs or database clients.
- task.ts: Represents a single task in the queue, encapsulating its state, data, and lifecycle methods.
- task-executor.ts: The base class for all task executors, defining the hooks and methods for task execution, validation, and retry logic.
- task-registry.ts: Manages the discovery and registration of TaskExecutor classes from the specified task directory.
- task-store.ts: Handles in-memory storage and retrieval of tasks, acting as the central source of truth for pending and active tasks.
- task-strategies.ts: Contains various strategies for task management, such as scheduling, retry policies, and worker assignment.
- worker.ts: Defines the individual worker process responsible for executing tasks in isolation.
- worker-manager.ts: Manages the lifecycle of worker processes, including spawning, monitoring, and restarting them as needed.
- worker-script.ts: The entry point script executed within each isolated worker process to set up the task execution environment.
- lib/files.ts: Utility functions for file system operations, primarily used for task executor discovery.
- lib/util.ts: General utility functions used throughout the AnQueue codebase.

License: MIT