

A high-performance job queue for Node.js and PostgreSQL, with support for alternative database backends. Written in
TypeScript. Also available for Perl.
```js
import Minion from '@minionjs/core';

// Use the default PostgreSQL backend
const minion = new Minion('postgres://user:password@localhost:5432/database');

// Update the database schema to the latest version
await minion.update();

// Add tasks
minion.addTask('somethingSlow', async (job, ...args) => {
  console.log('This is a background worker process.');
});

// Enqueue jobs, with and without options
await minion.enqueue('somethingSlow', ['foo', 'bar']);
await minion.enqueue('somethingSlow', [1, 2, 3], {priority: 5});

// Perform jobs in the current process, for testing
await minion.enqueue('somethingSlow', ['foo', 'bar']);
await minion.performJobs();

// Start a worker that can process up to 12 jobs concurrently
const worker = minion.worker();
worker.status.jobs = 12;
await worker.start();
```
Features
- Multiple named queues
- Priorities
- High priority fast lane
- Delayed jobs
- Job dependencies
- Job progress
- Job results
- Retries with backoff
- Rate limiting
- Unique jobs
- Expiring jobs
- Statistics
- Distributed workers
- Parallel processing
- Remote control
- Multiple backends (such as PostgreSQL)
- mojo.js admin ui
Job Queue
Job queues allow you to process time and/or computationally intensive tasks in background processes, outside of the
request/response lifecycle of web applications. Among those tasks you'll commonly find image resizing, spam filtering,
HTTP downloads, building tarballs, warming caches and basically everything else you can imagine that's not super fast.
```
Web Applications                  +--------------+                  Minion
|- Node.js [1]     enqueue job -> |              | -> dequeue job   |- Worker [1]
|- Node.js [2]                    |  PostgreSQL  |                  |- Worker [2]
|- Node.js [3] retrieve result <- |              | <- store result  |- Worker [3]
+- Node.js [4]                    +--------------+                  |- Worker [4]
                                                                    +- Worker [5]
```
They are not to be confused with time-based job schedulers, such as cron or systemd timers. Both serve very different
purposes, and cron jobs are in fact commonly used to enqueue Minion jobs that need to follow a schedule, for example
to perform regular maintenance tasks.
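As a sketch, a crontab entry could enqueue a nightly maintenance job via the `minion-job` command (assuming, as with its Perl counterpart, that it supports an `-e` option for enqueuing; the path and task name here are illustrative):

```
# Enqueue the hypothetical "nightlyMaintenance" task every day at 3:00
0 3 * * * cd /home/sri/myapp && /usr/bin/node index.js minion-job -e nightlyMaintenance
```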
Consistency
Every new job starts out as `inactive`, then progresses to `active` when it is dequeued by a worker, and finally ends
up as `finished` or `failed`, depending on its result. Every `failed` job can then be retried to progress back to the
`inactive` state and start all over again.
```
                                                         +----------+
                                                         |          |
                                            +----------> | finished |
   +----------+            +--------+       |            |          |
   |          |            |        |       |            +----------+
   | inactive | ---------> | active | ------+
   |          |            |        |       |            +----------+
   +----------+            +--------+       |            |          |
        ^                                   +----------> |  failed  | -----+
        |                                                |          |      |
        |                                                +----------+      |
        |                                                                  |
        +------------------------------------------------------------------+
```
The system is eventually consistent and will preserve job results for as long as you like, depending on the value of
the `minion.removeAfter` property. But be aware that `failed` results are preserved indefinitely and need to be
removed manually by an administrator once they are out of automatic retries.

While individual workers can fail in the middle of processing a job, the system will detect this and ensure that no job
is left in an uncertain state, depending on the value of the `minion.missingAfter` property. Jobs that do not get
processed after a certain amount of time will be considered stuck and fail automatically, depending on the value of
the `minion.stuckAfter` property, so an administrator can take a look and resolve the issue.
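These properties are plain millisecond values; deriving them from readable units avoids magic numbers. A small sketch, matching the values used in the API example (30 minutes and 2 days):

```javascript
// Millisecond helpers for Minion's timeout properties
const MINUTE = 60 * 1000;
const HOUR = 60 * MINUTE;
const DAY = 24 * HOUR;

const options = {
  missingAfter: 30 * MINUTE, // 1800000: workers without a heartbeat are considered missing
  removeAfter: 2 * DAY,      // 172800000: how long finished job results are preserved
  stuckAfter: 2 * DAY        // 172800000: inactive jobs older than this fail automatically
};

console.log(options.missingAfter, options.removeAfter, options.stuckAfter);
// 1800000 172800000 172800000
```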
Examples
This distribution also contains a great example application you can use for inspiration. The
link checker will show you how to integrate
background jobs into well-structured mojo.js applications.
API
Minion uses a PostgreSQL backend by default, but allows for third-party implementations of alternative backends. See
the PgBackend class and its tests for inspiration.
```js
const minion = new Minion('postgres://user:password@localhost:5432/database', {
  missingAfter: 1800000,   // workers without a heartbeat are considered missing after 30 minutes
  removeAfter: 172800000,  // finished job results are removed after 2 days
  stuckAfter: 172800000    // inactive jobs are considered stuck and fail after 2 days
});
```

An alternative backend can be selected with the `backendClass` option.

```js
const minion = new Minion('sqlite:test.db', {backendClass: SQLiteBackend});
```
Enqueue
New jobs are created with the minion.enqueue()
method, which requires a task name to tell the worker what kind of
workload the job represents, an array with job arguments, and an object with optional features to use for processing
this job. Every newly created job has a unique id that can be used to check its current status.
```js
const jobId = await minion.enqueue('task', ['arg1', 'arg2', 'arg3'], {
  attempts: 5,            // number of times performing this job will be attempted
  delay: 5000,            // delay this job for 5 seconds
  expire: 10000,          // this job is only valid for 10 seconds
  lax: true,              // dependencies are satisfied even if parent jobs failed
  notes: {foo: 'bar'},    // metadata stored with the job
  parents: [23, 24, 25],  // jobs this job depends on
  priority: 9,            // higher priority jobs are dequeued first
  queue: 'important'      // named queue to enqueue this job in
});
```
Tasks
Tasks are created with `minion.addTask()`, and are async functions that represent the individual workloads workers can
perform. Not all workers need to have the same tasks, but it is recommended for easier-to-maintain code. If you want to
route individual jobs to specific workers, it is better to use named queues for that.
```js
minion.addTask('somethingSlow', async job => {
  console.log('This is a background worker process.');
});

minion.addTask('somethingWithResult', async (job, num1, num2) => {
  const result = num1 + num2;
  await job.finish(result);
});
```
Jobs
Individual jobs are represented as instances of the `Job` class, which is passed as the first argument to all task
functions. To check the current status of a specific job you can use the `minion.job()` method.
```js
// Retrieve a job object by id
const job = await minion.job(23);

// Basic job information
const jobId = job.id;
const task = job.task;
const args = job.args;
const retries = job.retries;

// Detailed job information
const info = await job.info();
const attempts = info.attempts;
const children = info.children;
const created = info.created;
const delayed = info.delayed;
const expires = info.expires;
const finished = info.finished;
const lax = info.lax;
const notes = info.notes;
const parents = info.parents;
const priority = info.priority;
const queue = info.queue;
const result = info.result;
const retried = info.retried;
const started = info.started;
const state = info.state;
const time = info.time;
const worker = info.worker;

// Merge notes into the job (a null value removes a note)
const noted = await job.note({just: 'a note', another: ['note'], foo: null});

// Remove the job from the database
const removed = await job.remove();

// Finish the job with a result
const finishedOk = await job.finish('Huge success!');

// Fail the job with an error
const failed = await job.fail('Something went wrong!');
```
Every job still in the database can be retried at any time; this is also the only way to change most of the available
processing options. A worker already processing the job will not be able to assign a result afterwards, but it will
not stop processing.
```js
const success = await job.retry({
  attempts: 3,
  delay: 3000,
  expire: 5000,
  lax: false,
  parents: [23, 25],
  priority: 5,
  queue: 'unimportant'
});
```
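The features list mentions retries with backoff; one way to get that is to compute the `delay` passed to `job.retry()` exponentially from `job.retries`. A sketch (the formula and cap are illustrative, not part of the API):

```javascript
// Illustrative exponential backoff: 1s, 2s, 4s, ... capped at 5 minutes
function backoffDelay(retries, baseMs = 1000, capMs = 300000) {
  return Math.min(capMs, baseMs * 2 ** retries);
}

console.log(backoffDelay(0));  // 1000
console.log(backoffDelay(3));  // 8000
console.log(backoffDelay(10)); // 300000 (capped)
```

Inside a failing task this could be used as `await job.retry({delay: backoffDelay(job.retries)})`.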
The iterator API allows you to search through all jobs currently in the database.
```js
const jobs = minion.jobs({
  ids: [23, 24],
  notes: ['foo', 'bar'],
  queues: ['important', 'unimportant'],
  states: ['inactive', 'active'],
  tasks: ['foo', 'bar']
});

const total = await jobs.total();
for await (const info of jobs) {
  const {id, state} = info;
  console.log(`${id}: ${state}`);
}
```
Locks
Named locks are a tool that can be used for many things, inside and outside of task functions. They expire
automatically after a certain amount of time in milliseconds. You can release a lock manually with `minion.unlock()`
to limit concurrency, or let it expire for rate limiting.
```js
// Acquire a lock for 5 seconds, shared by up to 5 concurrent holders
const success = await minion.lock('fragile_backend_service', 5000, {limit: 5});

// Check if a named lock is currently held
const locked = await minion.isLocked('fragile_backend_service');

// Release a named lock manually
const unlocked = await minion.unlock('fragile_backend_service');
```
The most common use for named locks is limiting access to shared resources.
```js
minion.addTask('do_unique_stuff', async job => {
  if (await minion.lock('fragile_web_service', 7200000) !== true) {
    await job.finish('Previous job still active');
    return;
  }
  ...
  await minion.unlock('fragile_web_service');
});

minion.addTask('do_concurrent_stuff', async job => {
  while (await minion.lock('some_web_service', 60000, {limit: 5}) !== true) {
    await sleep(1000);
  }
  ...
  await minion.unlock('some_web_service');
});

minion.addTask('do_rate_limit_stuff', async job => {
  if (await minion.lock('another_web_service', 360000, {limit: 100}) !== true) {
    await job.retry({delay: 3600000});
    return;
  }
  ...
});
```
An expiration time of `0` can be used to check if a named lock could have been acquired without creating one.

```js
const success = await minion.lock('foo', 0);
```
Utilities
```js
// Retry job 23 in the "minion_foreground" queue and perform it right away in the foreground (for debugging)
const success = await minion.foreground(23);

// Broadcast the "jobs" remote control command to worker 23, changing its concurrency to 0
const sent = await minion.broadcast('jobs', [0], [23]);

// Reset the whole job queue, including locks
await minion.reset({
  all: true,
  locks: true
});
```
mojo.js
You can use Minion as a standalone job queue, or integrate it into mojo.js applications with `minionPlugin`.
```js
import {minionPlugin} from '@minionjs/core';
import mojo from '@mojojs/core';

export const app = mojo();

app.plugin(minionPlugin, {config: 'postgres://user:password@localhost:5432/database'});

app.models.minion.addTask('poke_mojo', async job => {
  await job.app.ua.get('mojolicious.org');
  job.app.log.debug('We have poked mojolicious.org for a visitor');
});

app.get('/', async ctx => {
  await ctx.models.minion.enqueue('poke_mojo');
  await ctx.render({text: 'We will poke mojolicious.org for you soon.'});
});

app.start();
```
Background worker processes are usually started with the `minion-worker` command, which automatically becomes
available when an application loads `minionPlugin`.

```
$ node index.js minion-worker
```

Jobs can be managed right from the command line with the `minion-job` command.

```
$ node index.js minion-job
```
You can also add an admin UI to your application by loading `minionAdminPlugin`. Just make sure to secure access
before making your application publicly accessible.
```js
import {minionPlugin, minionAdminPlugin} from '@minionjs/core';
import mojo from '@mojojs/core';

export const app = mojo();

const minionPrefix = app.any('/minion');
app.plugin(minionPlugin, {config: 'postgres://user:password@localhost:5432/database'});
app.plugin(minionAdminPlugin, {route: minionPrefix});

app.start();
```
Deployment
To manage background worker processes with systemd, you can use a unit configuration file like this.
```
[Unit]
Description=My mojo.js application worker
After=postgresql.service

[Service]
Type=simple
Environment=NODE_ENV=production
ExecStart=/usr/bin/node /home/sri/myapp/myapp.js minion-worker
KillMode=process

[Install]
WantedBy=multi-user.target
```
Installation
All you need is Node.js 16.0.0 (or newer).
```
$ npm install @minionjs/core
```
Support
If you have any questions the documentation might not yet answer, don't hesitate to ask in the
Forum, on Matrix, or
IRC.