simple-worker
A drop-dead simple priority job queue, based on kue
Install
npm install simple-worker
Basic usage
Producer
The producer queues jobs for the consumer to work on later, when processing power is available. You can queue jobs anywhere in your application or have a dedicated process for it, e.g. to schedule jobs at regular time intervals.
import {setup, queueJob} from 'simple-worker'
setup()
queueJob({
  name: 'send-email',
  title: 'Sending an email to bib',
  data: {receiver: 'bib@bob.com'}
})
queueJob({
  name: 'check-ping',
  title: 'Check the ping of our website',
  data: {url: 'http://mywebsite.com'},
  schedule: '* * * * *'
})
Consumer / Worker
The consumer registers handler functions for the jobs by name and then works on incoming jobs. This should be a dedicated process, because it may block the event loop.
import {setup, registerJob, processJobs} from 'simple-worker'
setup()
registerJob('send-email', (job, done) => {
  console.log('Sending an email to ' + job.data.receiver)
  done()
})
processJobs()
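To make the register-then-process flow concrete, here is a minimal, self-contained sketch of a name-to-handler registry. It is only a conceptual model and not how simple-worker is implemented: `handlers`, `register` and `dispatch` are hypothetical names, and no redis or queue is involved.

```javascript
// Conceptual model only: maps job names to handler functions.
// `handlers`, `register` and `dispatch` are hypothetical, not part of simple-worker.
const handlers = new Map()

function register (name, handler) {
  if (typeof handler !== 'function') {
    throw new TypeError('Handler for "' + name + '" must be a function')
  }
  handlers.set(name, handler)
}

// Looks up the handler for an incoming job by name and runs it
// with a node-style `done` callback.
function dispatch (job, done) {
  const handler = handlers.get(job.name)
  if (!handler) {
    return done(new Error('No handler registered for "' + job.name + '"'))
  }
  handler(job, done)
}

register('send-email', (job, done) => {
  done(null, 'Sent to ' + job.data.receiver)
})
```

A job whose name has no registered handler ends with an error here. How simple-worker itself treats such jobs is not covered by this sketch.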
CLI
With some basic setup you can get a CLI for queueing and executing jobs straight from the command line.
import {setup, registerJob, cli} from 'simple-worker'
setup()
registerJob(...)
cli()
// Or, with options:
cli({
  validNames: ['send-email']
})
Monitoring CLI
With some basic setup you can get a CLI for monitoring jobs straight from the command line.
import {setup, monitoringCli} from 'simple-worker'
setup()
monitoringCli()
Web Interface
You can also easily set up a web interface for the worker, which will show you statistics about running, queued and finished / failed jobs.
import {setup, webInterface} from 'simple-worker'
setup()
webInterface(3000)
// Or, with additional arguments:
webInterface(3000, 'foo', 'bar')
Advanced Usage
Options for setting up the worker
setup takes an object of options with which you can, for example, customise the connection to redis:
worker.setup({
  prefix: 'sw',
  redis: {
    port: 1234,
    host: '10.0.50.20',
    password: 'password'
  }
})
Options for creating jobs
createJob takes multiple options with which you can customise its behaviour:
name (required) - The identifier of the job, as needed for the registerJob function
title (required) - A human-readable title for the job; this is the display name in the web interface
data - An object with data the job can access via job.data. Useful for giving parameters to the job. Defaults to {}.
priority - Any of low, normal, medium, high and critical. Determines the priority with which the job gets processed. Jobs with a higher priority always get processed earlier. Defaults to normal.
attempts - How many times a job may be processed before getting marked as failed. Default: 1
backoff - How long re-attempts of a job are delayed after a failure. Default: {delay: 30 * 1000, type: 'exponential'}
schedule - A schedule for this job, similar to cronjobs. The format is described here. Default: false
ttl - How long a job may be processed before failing as "timed out" (in ms). Default: 60 * 60 * 1000
delay - How long a job should be delayed before getting processed (in ms), or a Date in the future. Default: false
callback - A function that gets called when the job exits with done. Typical node callback with the structure (err, result). Default: noop
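As a rough illustration of the defaults listed above, the following sketch merges user-supplied options with those defaults and checks the two required fields. `withDefaults` and `PRIORITIES` are hypothetical names introduced for this example. They are not part of simple-worker, and the library may validate or merge options differently.

```javascript
// Hypothetical helper, not part of simple-worker: applies the defaults
// documented above and rejects options that lack the required fields.
const PRIORITIES = ['low', 'normal', 'medium', 'high', 'critical']

function withDefaults (options) {
  if (!options || !options.name || !options.title) {
    throw new Error('Both "name" and "title" are required')
  }
  const job = Object.assign({
    data: {},
    priority: 'normal',
    attempts: 1,
    backoff: {delay: 30 * 1000, type: 'exponential'},
    schedule: false,
    ttl: 60 * 60 * 1000, // one hour in ms
    delay: false,
    callback: () => {} // noop
  }, options)
  if (!PRIORITIES.includes(job.priority)) {
    throw new Error('Unknown priority "' + job.priority + '"')
  }
  return job
}

// Only the overridden options differ from the documented defaults.
const job = withDefaults({
  name: 'send-email',
  title: 'Sending an email to bib',
  priority: 'high',
  attempts: 3
})
console.log(job.priority, job.attempts, job.ttl) // high 3 3600000
```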
Job handler function
registerJob takes a handler function with a job object and a done function. They offer the following functions:
job.log(string) - Add a job-specific log string, which gets displayed in the web interface
job.process(completed, total [, data]) - Update a job's progress, which gets displayed in the web interface
done(new Error('Oh no.')) - Exit a job with an error (gets passed to the optional callback)
done(null, 'Yay!') - Exit a job with a successful result (gets passed to the optional callback)
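A handler that uses job.log and both forms of done might look like the sketch below. `sendEmail`, `deliver` and `makeStubJob` are hypothetical names: `deliver` stands in for real mail-sending code, and the stub job only mimics the data and log parts of a job object so the handler can be tried without a queue.

```javascript
// Hypothetical stand-in for real mail-sending code.
function deliver (receiver) {
  if (!receiver) throw new Error('Missing receiver')
  return 'Delivered to ' + receiver
}

// Example handler: logs its steps, then exits via done(err) or done(null, result).
function sendEmail (job, done) {
  job.log('Preparing email for ' + job.data.receiver)
  let result
  try {
    result = deliver(job.data.receiver)
  } catch (err) {
    return done(err) // exit with an error
  }
  job.log('Email sent')
  done(null, result) // exit with a successful result
}

// Stub job for trying the handler in isolation; it only records log lines.
function makeStubJob (data) {
  const logs = []
  return {data, logs, log: (line) => logs.push(line)}
}

const stub = makeStubJob({receiver: 'bib@bob.com'})
sendEmail(stub, (err, result) => {
  console.log(err, result, stub.logs)
})
```

Such a handler could then be registered with registerJob('send-email', sendEmail), as in the consumer example.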
Helpers
listJobs(status) - Lists the current jobs for a specific status (e.g. "inactive")
clearJobs(status, [limit]) - Clears jobs of a specific status (e.g. "failed"), with an optional limit on the number of jobs
Example for clustering
Below is an example of how you might cluster the processing part of this module. Another (recommended) option would be using something like pm2.
import cluster from 'cluster'
import os from 'os'
import worker from 'simple-worker'
const clusterSize = os.cpus().length * 3
if (cluster.isMaster) {
  for (let i = 0; i < clusterSize; i++) {
    cluster.fork()
  }
} else {
  worker.setup()
  worker.registerJob('some-name', someFunction)
  worker.processJobs()
}
Debugging
This module uses debug, so you can inspect what it does after setting the environment variable DEBUG='simple-worker'.
Testing
You can test simple-worker the same way that kue can be tested.
Tests
npm test
Note: Be aware that the tests flush the redis database they use.
Licence
MIT