mongoose-jobqueue
A modernized fork of https://www.npmjs.com/package/mongoose-jobqueue that is compatible with mongoose v5 and the latest mongodb driver and implements performance optimizations for querying mongodb.
Some of the differences from the original library:
- No lodash and No external dependencies besides mongoose
- Uses ES6 Promise instead of Bluebird
- Uses object spread instead of underscore/lodash dependency
- Compatible with mongoose v5
- Uses arrow functions instead of
var self = this;
- Creates Mongodb indexes for faster access and query performance
- uses
.lean()
in mongoose for faster query performance - Supports sorting query results in CosmosDB
- Maintained & Open-Source with an available repository link.
mongoose-jobqueue
A very simple job queue, using mongoosejs for storage.
Quickstart
Create a queue object by handing over your mongoose instance:
const mongoose = require('mongoose');
const mongooseJobQueue = require('mongoose-jobqueue');
const queue = mongooseJobQueue(mongoose, 'job-queue');
All functions of the queue return a Promise
.
Add a job to a queue:
queue.add({ message: 'Hey' }).then((job) => {
}, (err) => {
});
Get a job from the queue:
queue.checkout().then((job) => {
console.log('job._id=' + job._id);
console.log('job.ack=' + job.ack);
console.log('job.payload=' + job.payload);
console.log('job.tries=' + job.tries);
});
Ping a job to keep it's visibility open for long-running tasks:
queue.ping(job.ack).then((job) => {
})
When pinging a job we can specify by how many seconds the window is extended,
and the progress of the job in percent if we want:
queue.ping(job.ack, 60, 15).then((job) => {
})
Acknowledge a job (and remove it from the queue):
queue.ack(job.ack).then((job) => {
})
By default, all finished jobs are left in the queue, and are only marked as
deleted. You can call the following function to remove processed jobs:
queue.cleanup().then((delCount) => {
});
In-Code-Docs
The source code is documented using JSDoc tags.
Creating a Queue
To create a queue, call the exported function with the Mongoose instance,
the name of the collection and a set of options.
const mongoose = require('mongoose');
const mongooseJobQueue = require('mongoose-jobqueue');
const queueA = mongooseJobQueue(mongoose, 'a-queue');
const queueB = mongoDbQueue(mongoose, 'b-queue');
Note: You can start using the queue right away, since mongoose stores requests
until a connection to the MongoDB is established.
To pass in options for the queue:
const myQueue = mongooseJobQueue(mongoose, 'my-queue', {
visibility : 30,
delay : 15
});
This example shows a queue with a job visibility of 30s and a insert-delay
of 15s.
Options
name - Collection Name
This is the name of the MongoDB Collection you wish to use to store the jobs.
Each queue you create will be it's own collection.
e.g.
const queueA = mongooseJobQueue(mongoose, 'a-queue');
const queueB = mongooseJobQueue(mongoose, 'b-queue');
This will create two collections in MongoDB called a-queue
and b-queue
.
visibility - Job Visibility Window
Default: 30
By default, if you don't ack a job within the first 30s after checking it out
of the queue, it is placed back in the queue so it can be fetched again.
This is called the visibility window.
You may set this visibility window on a per queue basis. For example, to set the
visibility to 15 seconds:
const queue = mongooseJobQueue(mongoose, 'queue', { visibility : 15 });
All jobs in this queue now have a visibility window of 15s, instead of the
default 30s.
You can also specify the visibility window when checking out a job of the queue:
const queue = mongooseJobQueue(mongoose, 'queue', { visibility : 15 });
queue.checkout(90).then((job) => {
});
The returned job now has a visibility window of 90s. This does not change the
visibility window of the queue or other jobs in the queue.
delay - Delay Jobs on Queue
Default: 0
When a job is added to a queue, it is immediately available for checkout.
However, there are times when you might like to delay jobs coming off a queue.
If you set delay to be 10
, then every job will only be available for
checkout 10s after being added.
To delay all jobs by 10 seconds, do this:
const queue = mongooseJobQueue(mongoose, 'queue', { delay : 10 });
This is now the default for every job added to the queue.
deadQueue - Dead Job Queue
Default: null
Jobs that have been retried over maxRetries
will be pushed to this queue so
you can debug problematic jobs.
Pass in a collection name onto which these jobs will be pushed:
const queue = mongooseJobQueue(mongoose, 'queue', { deadQueue : 'dead-jobs' });
If you checkout a job out of the queue
over maxRetries
times and have still
not acked it, it will be pushed onto the deadQueue
for you.
This happens when you call .checkout()
the next time. (not when you miss
acking a job within it's visibility window).
maxRetries - Maximum Retries per Job
Default: 5
This option only comes into effect if you pass in a deadQueue
as shown above.
What this means is that if job is checked out of the queue maxRetries
times
(e.g. 5) and not acked, it will be moved to the deadQueue
the next time it is
checked out.
strictAck - Disallow ack if Visibility Window timed out
Default: true
By default you are only allowed to acknowledge a checked out job within the
visibility window. Set this option to false
if you want to allow acks of a job
outside of the visibility window, given that the job was not already checked out
a second time by another user.
raw - Return raw JavaScript Objects
Default: true
By default all functions return plain JavaScript objects. Set this option to
false
if you want the original mongoose documents returned. For example, this
gives you the ability to edit a returned job and call the .save()
method on in.
Use this option at own risk.
cosmosDb - Use compatibility mode for Azure CosmosDB
Default: false
Operations
.add()
You can add a string to the queue:
queue.add('Hey').then((job) => {
});
Or add an object:
queue.add({ message: 'Hey' }).then((job) => {
});
Or add multiple jobs (strings or objects):
queue.add(['One', 'Two', 'Three']).then(jobs => {
});
You can delay jobs from being visible by passing the second delay
parameter:
queue.add('Later', 120).then((job) => {
});
.checkout()
Retrieve a job from the queue:
queue.checkout().then((job) => {
});
You can choose the visibility window of an individual retrieved job by passing
the visibility
parameter:
queue.checkout(90).then((job) => {
});
Jobs will have the following structure:
{
_id: '533b1eb64ee78a57664cc76c',
ack: 'c8a3cc585cbaaacf549d746d7db72f69',
payload: 'Hey',
tries: 1
}
.ack()
After you have received a job from a queue and processed it, you can delete it
by calling .ack()
with the unique ack
key returned:
queue.checkout().then((job) => {
queue.ack(job.ack).then((job) => {
});
});
.ping()
After you have checked out a job from a queue and you are taking a while
to process it, you can .ping()
the job to tell the queue that you are
still alive and continuing to process the job:
queue.checkout().then((job) => {
queue.ping(job.ack).then((job) => {
});
});
You can also choose the visibility time that gets added by the ping operation by
passing the visibility
parameter:
queue.checkout().then((job) => {
queue.ping(job.ack, 120).then((job) => {
});
});
You can pass an optional progress
parameter, when pinging a job to show the
progress of your operation in percent:
queue.checkout().then((job) => {
queue.ping(job.ack, 120, 12).then((job) => {
});
});
Like the progress
parameter you can pass the payload
parameter to update the
payload of a job. This will overwrite the old payload value.
queue.checkout().then((job) => {
queue.ping(job.ack, null, null, { message: 'my new payload' }).then((job) => {
});
});
.get()
Get a list of jobs in the queue. Does not check out any jobs.
queue.get().then(jobs => {
});
You can pass a filter object to only retrieve a subset of jobs.
The filter object follows the MongoDB query syntax.
queue.get({
deleted: null
}).then(jobs => {
});
This example returns only unfinished jobs.
queue.get({
tries: { $gt: 1, $lt: 4 }
}).then(jobs => {
});
This example returns jobs with 2-3 tries.
queue.get({
'payload.message': 'Hey'
}).then(jobs => {
});
This example returns jobs where the payload contains the message Hey
.
.cleanup()
By default, all finished jobs are left in the queue, and are only marked as
deleted. You can call the cleanup()
function to remove processed jobs:
queue.cleanup().then(delCount => {
});
You can specify the minimum age of jobs to be deleted by using the age
parameter:
queue.cleanup(120).then(delCount => {
});
.cleanupDead()
Removes all jobs from the deadQueue.
Unlike the cleanup()
function, this operation does not have a age parameter.
queue.cleanupDead().then(delCount => {
});
.reset()
Deletes ALL jobs from the queue (and the deadQueue if configured), regardless of
checked out jobs.
queue.reset().then(totalDeleted => {
});
Author
Originally Written by Michael Sperk
Updated by Khaled Osman
License
MIT - https://choosealicense.com/licenses/mit/