workmatic 1.0.5 (npm) · 16 weekly downloads · 1 maintainer

Workmatic

A persistent job queue for Node.js using SQLite. Simple, reliable, and zero external dependencies beyond SQLite.

Why?

I love fastq - it's fast, simple, and has a great API. But it's in-memory only, which can be frustrating when you need jobs to survive process restarts or crashes.

Workmatic combines the simplicity of fastq with SQLite persistence. No Redis, no external services - just a single file that keeps your jobs safe. Perfect for small to medium workloads where you want durability without infrastructure complexity.

Features

  • Persistent: Jobs survive restarts via SQLite storage
  • Concurrent: Process multiple jobs simultaneously with fastq
  • Priority: Process high-priority jobs first
  • Delayed: Schedule jobs to run in the future
  • Retries: Automatic retries with exponential backoff
  • Lease-based: Prevents double processing with lease locks
  • State Persistence: Optionally persist worker state across restarts
  • Dashboard: Built-in web UI for monitoring
  • Type-safe: Full TypeScript support with Kysely

Installation

npm install workmatic

Quick Start

import { createDatabase, createClient, createWorker } from 'workmatic';

// Create database (use file path for persistence)
const db = createDatabase({ filename: './jobs.db' });

// Create a client to add jobs
const client = createClient({ db, queue: 'emails' });

// Add a job
const { id } = await client.add({ 
  to: 'user@example.com', 
  subject: 'Hello!' 
});
console.log(`Job created: ${id}`);

// Create a worker to process jobs
const worker = createWorker({
  db,
  queue: 'emails',
  concurrency: 4,
});

// Define the processor
worker.process(async (job) => {
  console.log(`Sending email to ${job.payload.to}`);
  await sendEmail(job.payload);
});

// Start processing
worker.start();

API Reference

createDatabase(options)

Initialize the database connection and schema.

// Option 1: File path (creates or opens existing)
const db = createDatabase({ filename: './jobs.db' });

// Option 2: In-memory (for testing)
const memDb = createDatabase({ filename: ':memory:' });

// Option 3: Existing better-sqlite3 instance
const wrappedDb = createDatabase({ db: existingSqliteInstance });

createClient(options)

Create a client for adding jobs to a queue.

const client = createClient({
  db,                    // Required: Database instance
  queue: 'default',      // Optional: Queue name (default: 'default')
});

client.add(payload, options?)

Add a job to the queue.

const { ok, id } = await client.add(
  { email: 'user@example.com' },  // Payload (must be JSON-serializable)
  {
    priority: 0,      // Lower = higher priority (default: 0)
    delayMs: 5000,    // Delay before job becomes available (default: 0)
    maxAttempts: 3,   // Max retry attempts (default: 3)
  }
);

client.stats()

Get job statistics for the queue.

const stats = await client.stats();
// { ready: 5, running: 2, done: 100, failed: 0, dead: 1, total: 108 }
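
The returned counts are handy for lightweight monitoring. A sketch of a health check built on `client.stats()` (the threshold and `notify` callback are illustrative, not part of workmatic):

```javascript
// Hypothetical health check: flag the queue when dead jobs pile up.
// Works with any object exposing the documented stats() shape.
async function alertOnDeadJobs(client, threshold, notify) {
  const stats = await client.stats();
  if (stats.dead >= threshold) {
    notify(`queue has ${stats.dead} dead jobs`);
    return true;
  }
  return false;
}
```

Run it on an interval (or from a cron job) to catch jobs that exhausted their retries.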

client.clear(options?)

Clear all jobs from the queue.

// Clear all jobs
const deleted = await client.clear();
console.log(`Deleted ${deleted} jobs`);

// Clear only jobs with a specific status
const deletedDone = await client.clear({ status: 'done' });
console.log(`Deleted ${deletedDone} done jobs`);

createWorker(options)

Create a worker to process jobs from a queue.

const worker = createWorker({
  db,                     // Required: Database instance
  queue: 'default',       // Optional: Queue name (default: 'default')
  concurrency: 1,         // Optional: Parallel job count (default: 1)
  leaseMs: 30000,         // Optional: Job lease duration in ms (default: 30000)
  pollMs: 1000,           // Optional: Poll interval when idle (default: 1000)
  timeoutMs: 60000,       // Optional: Job execution timeout in ms (default: none)
  backoff: (n) => 1000 * Math.pow(2, n),  // Optional: Retry backoff function
  persistState: false,    // Optional: Persist worker state to database (default: false)
  autoRestore: true,      // Optional: Auto-restore state on creation (default: true)
});

worker.process(fn)

Set the job processor function.

worker.process(async (job) => {
  console.log(`Processing job ${job.id}`);
  console.log(`Payload:`, job.payload);
  console.log(`Attempt ${job.attempts + 1} of ${job.maxAttempts}`);
  
  // Do work here
  // Throw an error to trigger retry
});

worker.start()

Start processing jobs.

worker.start();

worker.stop()

Stop processing and wait for current jobs to finish.

await worker.stop();
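
Since `stop()` waits for in-flight jobs, it pairs naturally with process signals for graceful shutdown. A sketch (the helper name and `exit` parameter are illustrative; it works with anything exposing an async `stop()`):

```javascript
// Hypothetical graceful-shutdown helper: on SIGINT/SIGTERM, stop the
// worker (waiting for current jobs, per worker.stop() above) then exit.
function onShutdown(worker, exit = (code) => process.exit(code)) {
  const handler = async () => {
    await worker.stop(); // resolves once in-flight jobs finish
    exit(0);
  };
  process.once('SIGINT', handler);
  process.once('SIGTERM', handler);
  return handler; // returned so it can also be invoked manually
}
```

Call `onShutdown(worker)` once after `worker.start()`.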

worker.pause() / worker.resume()

Pause and resume job processing.

worker.pause();   // Stop claiming new jobs
worker.resume();  // Resume claiming jobs

worker.stats()

Get job statistics for the queue.

const stats = await worker.stats();

worker.clear(options?)

Clear all jobs from the queue.

// Clear all jobs in the queue
const deleted = await worker.clear();
console.log(`Deleted ${deleted} jobs`);

// Clear only jobs with a specific status
const deletedDead = await worker.clear({ status: 'dead' });
console.log(`Deleted ${deletedDead} dead jobs`);

Worker properties

worker.isRunning;  // boolean
worker.isPaused;   // boolean
worker.queue;      // string

Persist State Mode

When persistState: true, the worker's state (running/paused/stopped) is saved to the database. This is useful when you want the worker to remember its state across app restarts.

const worker = createWorker({
  db,
  queue: 'emails',
  persistState: true,   // Save state to database
  autoRestore: true,    // Auto-restore on creation (default: true)
});

worker.process(async (job) => {
  // ...
});

// Start the worker - state is saved as "running"
worker.start();

// Later, stop the worker - state is saved as "stopped"
await worker.stop();

// If the app restarts, the worker will NOT auto-start
// because the last saved state was "stopped"

With autoRestore: false, you can manually control when to restore:

const worker = createWorker({
  db,
  persistState: true,
  autoRestore: false,  // Don't auto-restore
});

worker.process(async (job) => { /* ... */ });

// Manually check and restore saved state
const savedState = await worker.restoreState();
console.log(`Restored state: ${savedState}`); // 'running' | 'paused' | 'stopped' | null

createDashboard(options)

Create a standalone web dashboard server for monitoring and control.

const dashboard = createDashboard({
  db,                    // Required: Database instance
  port: 3000,            // Optional: HTTP port (default: 3000)
  workers: [worker1],    // Optional: Workers to control
});

console.log(`Dashboard at http://localhost:${dashboard.port}`);

// Later, close the server
await dashboard.close();

createDashboardMiddleware(options)

Create an Express-compatible middleware to mount the dashboard on an existing app.

import express from 'express';
import { createDashboardMiddleware } from 'workmatic';

const app = express();

// Mount dashboard at /workmatic
app.use(createDashboardMiddleware({
  db,                        // Required: Database instance
  basePath: '/workmatic',    // Optional: URL prefix (default: '')
  workers: [worker],         // Optional: Workers to control
}));

app.listen(3000);
// Dashboard available at http://localhost:3000/workmatic

Works with any framework that supports Node.js (req, res, next) middleware:

// Fastify
import fastify from 'fastify';
import middie from '@fastify/middie';

const app = fastify();
await app.register(middie);
app.use(createDashboardMiddleware({ db, basePath: '/jobs' }));

// Hono (Node.js runtime)
import { Hono } from 'hono';

const app = new Hono();
const dashboard = createDashboardMiddleware({ db, basePath: '/workmatic' });
app.use('/workmatic/*', (c) => {
  return new Promise((resolve) => {
    dashboard(c.env.incoming, c.env.outgoing, resolve);
  });
});

Job Object

The job object passed to processors has these properties:

interface Job<TPayload> {
  id: string;           // Unique public ID (nanoid)
  queue: string;        // Queue name
  payload: TPayload;    // Your job data
  status: JobStatus;    // 'ready' | 'running' | 'done' | 'failed' | 'dead'
  priority: number;     // Priority value
  attempts: number;     // Current attempt count (starts at 0)
  maxAttempts: number;  // Maximum attempts allowed
  createdAt: number;    // Unix timestamp (ms)
  lastError: string | null;  // Last error message
}
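
The documented shape makes fully typed processors straightforward. A standalone sketch (the interface is copied locally here; in a real app you would import workmatic's exported types, whose exact export names are an assumption):

```typescript
// Local copy of the Job shape documented above.
type JobStatus = 'ready' | 'running' | 'done' | 'failed' | 'dead';

interface Job<TPayload> {
  id: string;
  queue: string;
  payload: TPayload;
  status: JobStatus;
  priority: number;
  attempts: number;
  maxAttempts: number;
  createdAt: number;
  lastError: string | null;
}

// Illustrative payload type for an email queue.
interface EmailPayload {
  to: string;
  subject: string;
}

// With a typed Job, job.payload is fully inferred inside the processor.
function describeJob(job: Job<EmailPayload>): string {
  return `email "${job.payload.subject}" to ${job.payload.to} ` +
    `(attempt ${job.attempts + 1}/${job.maxAttempts})`;
}
```
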

Durability Model

Workmatic provides at-least-once delivery:

  • Jobs are persisted to SQLite before add() returns
  • A job may be processed multiple times if:
    • The worker crashes during processing
    • The lease expires before completion
  • Jobs are only marked done after successful processing

Idempotency Recommendation

Design your job handlers to be idempotent (safe to run multiple times):

worker.process(async (job) => {
  // Check if already processed (checkProcessed / markProcessed are
  // app-defined helpers, e.g. backed by a table of completed job ids)
  const exists = await checkProcessed(job.id);
  if (exists) return;

  // Process the job
  await processPayment(job.payload);

  // Mark as processed
  await markProcessed(job.id);
});

Job Lifecycle

┌─────────┐     add()      ┌─────────┐
│  NEW    │ ─────────────▶ │  READY  │
└─────────┘                └────┬────┘
                                │
                         claim  │
                                ▼
                          ┌─────────┐
                          │ RUNNING │
                          └────┬────┘
                               │
              ┌────────────────┼────────────────┐
              │                │                │
         success          failure          failure
              │          (retries           (max
              │           left)           attempts)
              ▼                │                │
        ┌─────────┐           │                ▼
        │  DONE   │           │          ┌─────────┐
        └─────────┘           │          │  DEAD   │
                              │          └─────────┘
                              │
                              ▼
                    ┌─────────────────┐
                    │  READY (retry)  │
                    │  with backoff   │
                    └─────────────────┘

Options Glossary

| Option | Default | Description |
| --- | --- | --- |
| queue | 'default' | Queue name for job isolation |
| concurrency | 1 | Number of jobs to process in parallel |
| leaseMs | 30000 | How long a job is "locked" during processing |
| pollMs | 1000 | How often to check for new jobs when idle |
| timeoutMs | undefined | Job execution timeout (fails job if exceeded) |
| priority | 0 | Job priority (lower = processed first) |
| delayMs | 0 | Delay before job becomes available |
| maxAttempts | 3 | Maximum processing attempts |
| backoff | 2^n * 1000 | Function returning retry delay in ms |
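
The `backoff` option accepts any function from attempt count to delay. A sketch of a common variant: the default exponential curve, capped at one minute, with ±10% jitter to avoid retry stampedes (the cap and jitter values are illustrative; `n` is assumed 0-based, matching the default `2^n * 1000`):

```javascript
// Hypothetical custom backoff: exponential, capped, jittered.
function backoffWithJitter(n) {
  const base = Math.min(1000 * 2 ** n, 60_000); // cap at one minute
  const jitter = base * 0.1 * (Math.random() * 2 - 1); // ±10%
  return Math.round(base + jitter);
}
```

Pass it when creating the worker: `createWorker({ db, backoff: backoffWithJitter })`.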

Dashboard

The built-in dashboard provides:

  • Real-time job statistics
  • Job list with filtering and pagination
  • Worker status and control (pause/resume)
  • Auto-refresh every 2 seconds

Examples

See the examples/ directory:

  • basic.ts - Simple job processing
  • advanced.ts - Priority, delays, retries
  • with-dashboard.ts - Dashboard monitoring

Run examples:

npx tsx examples/basic.ts
npx tsx examples/with-dashboard.ts

Benchmarks

Run performance benchmarks:

# In-memory (fastest, for testing)
npm run bench

# File-based (realistic, persistent)
npm run bench -- --file

Results Comparison

| Benchmark | In-Memory | File-based |
| --- | --- | --- |
| Sequential Insert | 27,000/s | 13,000/s |
| Parallel Insert | 23,000/s | 12,000/s |
| Process (concurrency=1) | 1,100/s | 1,100/s |
| Process (concurrency=4) | 4,800/s | 4,800/s |
| Process (concurrency=8) | 10,000/s | 8,300/s |
| Process (concurrency=16) | 18,000/s | 5,700/s |
| Mixed Insert+Process | 7,500/s | 3,500/s |
| Claim + Process Batch | 23,600/s | 11,800/s |

Note: File-based performance degrades at high concurrency due to disk I/O. For file-based databases, concurrency=8 is often optimal. Performance varies by hardware.

CLI

Workmatic includes a command-line tool for managing jobs directly from the database file.

# Show job statistics
npx workmatic stats ./jobs.db

# List queues with pause status
npx workmatic queues ./jobs.db

# Pause/resume a queue (workers stop/start claiming new jobs)
npx workmatic pause ./jobs.db emails
npx workmatic resume ./jobs.db emails

# List jobs (with filters)
npx workmatic list ./jobs.db --status=dead --limit=10

# Export jobs to CSV
npx workmatic export ./jobs.db backup.csv
npx workmatic export ./jobs.db --status=failed > failed.csv

# Import jobs from CSV
npx workmatic import ./jobs.db backup.csv

# Delete jobs by status
npx workmatic purge ./jobs.db --status=done

# Retry dead/failed jobs (reset to ready)
npx workmatic retry ./jobs.db --status=dead

CLI Commands

| Command | Description |
| --- | --- |
| stats <db> | Show job counts by status and queue |
| queues <db> | List all queues with pause status |
| pause <db> <queue> | Pause a queue (workers stop claiming) |
| resume <db> <queue> | Resume a paused queue |
| list <db> | List jobs with optional filters |
| export <db> [file] | Export jobs to CSV (stdout if no file) |
| import <db> <file> | Import jobs from CSV |
| purge <db> --status=X | Delete jobs with specific status |
| retry <db> --status=X | Reset jobs to ready status |

CLI Options

| Option | Description |
| --- | --- |
| --status=<status> | Filter by status (ready/running/done/failed/dead) |
| --queue=<queue> | Filter by queue name |
| --limit=<n> | Limit results (default: 100) |

Live Pause/Resume

The pause and resume commands work on running workers in real-time. When you pause a queue:

  • Running workers immediately stop claiming new jobs
  • Jobs currently being processed will complete
  • The queue resumes when you run resume

This allows you to manage workers without restarting your application.

CSV Import for AI Workflows

The CSV import feature makes Workmatic particularly useful for AI-powered automation:

public_id,queue,payload,status,priority,run_at,attempts,max_attempts,lease_until,created_at,updated_at,last_error
job_001,emails,"{""to"":""user@example.com"",""template"":""welcome""}",ready,0,1704067200000,0,3,0,1704067200000,1704067200000,
job_002,emails,"{""to"":""other@example.com"",""template"":""reminder""}",ready,5,1704067200000,0,3,0,1704067200000,1704067200000,

Use cases:

  • AI Agents: Tools like Claude, GPT, or custom agents can generate CSV files with batch jobs. Simply ask an AI to "create 100 email jobs for these users" and import the result.
  • Spreadsheet workflows: Edit jobs in Excel/Google Sheets, export to CSV, and import into the queue.
  • Migration: Move jobs between environments or recover from backups.
  • Testing: Generate test datasets with specific job configurations.
  • Bulk operations: Create thousands of jobs without writing code.

# AI generates jobs.csv, then:
npx workmatic import ./jobs.db jobs.csv

# Or pipe directly from another tool:
cat ai-generated-jobs.csv | npx workmatic import ./jobs.db /dev/stdin

The simple CSV format means any tool that can output text can create jobs for your queue.
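
A sketch of one such tool: a helper that turns a list of payloads into rows matching the CSV layout shown above (column names follow the sample; the id scheme and timestamps are illustrative):

```javascript
// Hypothetical generator for workmatic's CSV import format.
function jobsToCsv(queue, payloads) {
  const header = 'public_id,queue,payload,status,priority,run_at,attempts,' +
    'max_attempts,lease_until,created_at,updated_at,last_error';
  const now = Date.now();
  const rows = payloads.map((payload, i) => {
    // CSV-escape the JSON payload by doubling embedded quotes
    const json = JSON.stringify(payload).replace(/"/g, '""');
    const id = `job_${String(i + 1).padStart(3, '0')}`;
    return `${id},${queue},"${json}",ready,0,${now},0,3,0,${now},${now},`;
  });
  return [header, ...rows].join('\n');
}
```

Write the result to a file and run `npx workmatic import ./jobs.db jobs.csv`.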

Architecture

┌──────────────────────────────────────────────────────────┐
│                        Your App                          │
├────────────────────┬─────────────────────────────────────┤
│      Client        │              Worker                 │
│  ┌─────────────┐   │   ┌─────────────┐  ┌─────────────┐  │
│  │   add()     │   │   │   pump()    │  │   fastq     │  │
│  │   stats()   │   │   │   claim()   │  │   pool      │  │
│  └─────────────┘   │   └─────────────┘  └─────────────┘  │
├────────────────────┴─────────────────────────────────────┤
│                    Kysely (Query Builder)                │
├──────────────────────────────────────────────────────────┤
│                  better-sqlite3 (SQLite)                 │
├──────────────────────────────────────────────────────────┤
│                      jobs.db (File)                      │
└──────────────────────────────────────────────────────────┘

License

MIT

Keywords: job-queue

Package last updated on 24 Jan 2026