DBQueue
A simple job queue that prioritizes infrastructural simplicity and common requirements over speed and scalability, inspired by TheSchwartz.
Usage
See usage in the tests, or see the example below:
Overview
```js
var DBQueue = require('dbqueue');

var queue_options = {
  // database connection settings
  host:       '127.0.0.1',
  port:       3306,
  user:       'root',
  password:   '',
  database:   'dbqueue_testing_db',

  // table to store jobs in
  table_name: 'custom_jobs_table',
};

DBQueue.connect(queue_options, function(err, queue) {
  if (err) {
    // likely a connection error
  }

  var job_details = {
    example: 'job data',
  };

  // in your producer
  queue.insert('queue_name_here', JSON.stringify(job_details), function(err) {
    if (err) {
      // likely a connection error
    }
  });

  // in your consumer
  queue.consume('queue_name_here', function(err, job, finished) {
    if (err) {
      // likely a connection error
    }

    if (!job) {
      // there were no jobs waiting on the queue
    }

    var job_data = JSON.parse(job);

    // do something with the job data, then let the queue know the job is complete
    // (passing an error leaves the job on the queue to be retried)
    finished(some_err);

    // or, to get confirmation that the job was marked as complete:
    finished(null, function(err) {
      if (err) {
        // the job is likely still on the queue
      }
    });
  });
});
```
Connecting
Connect asynchronously to discover connectivity issues as soon as possible:
```js
var queue_options = {
  // connection settings, as in the overview example
};

DBQueue.connect(queue_options, function(err, queue) {
  if (err) {
    // likely a connection error
  }
});
```
Connect lazily for less boilerplate
```js
var queue_options = {
  // connection settings, as in the overview example
};
var queue = new DBQueue(queue_options);
```
Inserting messages into the queue
```js
var queue_name   = 'example queue';
var message_data = { example: 'message data' };

queue.insert(queue_name, message_data, function(err) {
  // message enqueued
});
```
Consuming messages from the queue
Message consumption currently reserves the message for five minutes. If the message is not ACK'ed within that time, the message may be processed by another worker.
A customizable reservation time is a forthcoming feature.
```js
var queue_name = 'example queue';
queue.consume(queue_name, function(err, message_data, ackMessageCallback) {
  // process message_data, then call ackMessageCallback
});
```
An optional options object can be provided with the following attributes:
- count: the number of messages to attempt to consume
- lock_time: how long to lock the messages in the queue, in seconds
```js
var queue_name = 'example queue';

var options = {
  count:     10,    // attempt to consume up to 10 messages
  lock_time: 60*60, // lock the messages for longer than the default five minutes
};

function consumer(err, message_data, ackMessageCallback) {
  console.log("message:", message_data);

  ackMessageCallback();
}

queue.consume(queue_name, options, consumer);
```
ACK'ing and NACK'ing messages
Calling the ackMessageCallback without an error will remove the message from the queue.
Calling the ackMessageCallback with an error will leave the message on the queue to be processed again after some time.
Not calling the ackMessageCallback at all will also leave the message on the queue to be processed again after some time.
```js
var queue_name = 'example queue';

queue.consume(queue_name, function(err, message_data, ackMessageCallback) {
  doSomethingWithMessage(message_data, function(err) {
    // ACKs on success; passing an error NACKs the message
    ackMessageCallback(err);
  });
});
```
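For example, a minimal sketch that ACKs on success and leaves the message to be retried on failure (processMessage is a stand-in for your own handler):

```js
queue.consume(queue_name, function(err, message_data, ackMessageCallback) {
  processMessage(message_data, function(err) {
    if (err) {
      // NACK: the message stays on the queue and will be retried later
      return ackMessageCallback(err);
    }

    // ACK: the message is removed from the queue
    ackMessageCallback();
  });
});
```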
Listening to the queue
```js
var queue_name = 'default queue configuration';

var options = {
  interval:              1000, // milliseconds between polls of the queue
  max_outstanding:       1,    // maximum un-ACKed messages outstanding at once
  max_jobs_per_interval: 0,    // maximum messages consumed per interval (0 = no limit)
};

function consumer(err, message_data, ackMessageCallback) {
  // process message_data, then call ackMessageCallback
}

queue.listen(queue_name, options, consumer);
```
Example rate-limited consumer for slow jobs
Consume at a steady rate of ~4 messages/sec (2 messages every 500 ms), with up to 10,000 jobs in flight.
```js
var queue_name = 'slow job queue with high concurrency';

var options = {
  interval:              500,   // poll the queue every 500 ms
  max_jobs_per_interval: 2,     // at most 2 messages per interval => ~4 messages/sec
  max_outstanding:       10000, // up to 10,000 un-ACKed messages in flight
  lock_time:             10*60, // reserve messages for longer than the default five minutes
};

function consumer(err, message_data, ackMessageCallback) {
  // process message_data, then call ackMessageCallback
}

queue.listen(queue_name, options, consumer);
```
Custom serialization
If you would like to use something other than JSON.stringify and JSON.parse for serialization, provide your own serialization methods.
Note that binary formats are currently not supported.
```js
var yaml = require('js-yaml');

var queue_options = {
  // connection settings, as in the overview example
  serializer:   yaml.dump,
  deserializer: yaml.load,
};

var queue = new DBQueue(queue_options);
```
When this might be a useful library
- You don't want to introduce another dependency for simple/trivial functionality
- You need a simple, durable queue
- You are okay with at-least-once semantics
- You would like message deferral without dead letter queue complexity
When this is NOT the solution for you
- You need guarantees that a job will be delivered once and only once (your jobs are not idempotent)
- You need near-realtime performance
- You need to scale to large numbers of jobs and/or very high throughput
Performance improvements
- fetch batches of jobs rather than one at a time
  - when #pop is called
    - and we have no items in the working batch
      - look for N jobs to work on
      - reserve them all
      - shift the first off and return it
    - and we do have items in the working batch
      - shift the first off and return it
      - reserve another N?
  - so long as we can process the jobs quickly, this should be okay
    - but if we're too slow, we might have stale jobs that someone else is working on
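The batching idea above might look roughly like the following sketch. This is illustrative only, not how the library currently works; reserveJobs and working_batch are hypothetical names.

```js
// Illustrative sketch only -- not DBQueue's actual implementation.
// reserveJobs() stands in for a query that selects and locks up to N rows.
var working_batch = [];

function reserveJobs(queue_name, count, callback) {
  // hypothetical: SELECT up to `count` unlocked rows, mark them reserved,
  // and return them; here we just return an empty batch
  callback(null, []);
}

function pop(queue_name, callback) {
  if (working_batch.length) {
    // we already have reserved jobs in hand: shift the first off and return it
    return callback(null, working_batch.shift());
  }

  // no items in the working batch: look for N jobs, reserve them all,
  // then shift the first off and return it
  reserveJobs(queue_name, 10, function(err, jobs) {
    if (err) {
      return callback(err);
    }

    working_batch = jobs;
    callback(null, working_batch.shift());
  });
}
```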