Job Queues on selectable backends (for now: mongodb, redis) for node.js
Still beta, basic structure may be in flux
Keuss is an attempt, or experiment, to provide a serverless, persistent and highly available queue middleware supporting delays, using mongodb and redis to provide most of the backend needs
As it seems, the key to providing persistence, HA and load balancing is to have a storage subsystem that provides them, and to use it to store your queues. Instead of reinventing the wheel by building such a storage, I simply tried to adapt what's already out there
Modelling a queue with mongodb, for example, proved easy enough. It turned out simple and cheap, and mongodb provides great persistence and HA, and decent support for load balancing. Although using Redis provided similar results, in both cases the load balancing part was somewhat incomplete: the whole thing lacked a bus to signal all clients about, for example, when an insertion in a particular queue takes place. Without this layer a certain amount of polling is needed, so it's obviously a Nice Thing To Have
Keuss ended up being a somewhat serverless queue system, where the server or common parts are bare storage systems such as redis or mongodb. There is no need for any extra keuss server in between clients and storage (although an actual keuss-server does exist, serving a different purpose on top of plain keuss). Thus, all of keuss actually lies at the client side
A Queue is more of an interface, a definition of what it can do. Keuss queues are capable of:
Here, element means any js object. Internally, it's usually managed as json
Storage or Backend provides almost-complete queue primitives, functionally complete and already usable as is. Keuss comes with 3 backends, with various levels of features and performance:
As mentioned before, persistence and HA depend exclusively on the underlying system: mongodb provides production-grade HA and persistence while supporting potentially gigantic queues, and with redis one can balance performance and simplicity against reliability and durability, by using standalone redis, redis sentinel or redis cluster. Keuss uses ioredis as its redis driver, which supports all 3 cases
The following table shows the capabilities of each backend:
backend | delay/schedule | reserve/commit |
---|---|---|
redis-list | - | - |
redis-oq | x | x |
mongo | x | x |
Signaller provides a bus interconnecting all keuss clients, so events can be shared. Keuss provides 2 signallers:
So far, the only event published by keuss is 'element inserted in queue X', which allows other clients waiting for elements to become available to wake up and retry. A client will not fire an event if another one of the same type (same client, same queue) was already fired less than 50ms ago
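The 50ms suppression rule described above can be sketched in a few lines of plain JavaScript. This is only an illustration of the idea, not keuss's actual code: the class name `ThrottledEmitter`, its `publish` callback and the injectable `now` clock are all assumptions made for the example.

```javascript
// Hypothetical sketch of per-queue event throttling; not keuss's implementation.
class ThrottledEmitter {
  // publish: function (queueName) that really sends the event on the bus
  // windowMs: minimum gap between two published events for the same queue
  // now: injectable clock returning milliseconds (makes the sketch testable)
  constructor(publish, windowMs = 50, now = Date.now) {
    this.publish = publish;
    this.windowMs = windowMs;
    this.now = now;
    this.lastFired = new Map(); // queue name -> time of last published event
  }

  // Returns true if the event was published, false if it was suppressed
  emitInsertion(queueName) {
    const t = this.now();
    const last = this.lastFired.get(queueName);
    if (last !== undefined && t - last < this.windowMs) return false;
    this.lastFired.set(queueName, t);
    this.publish(queueName);
    return true;
  }
}

// Usage with a fake clock
let clock = 0;
const sent = [];
const emitter = new ThrottledEmitter((q) => sent.push(q), 50, () => clock);
emitter.emitInsertion('orders'); // published
clock = 20;
emitter.emitInsertion('orders'); // suppressed: only 20 ms since the last event
clock = 70;
emitter.emitInsertion('orders'); // published: 70 ms since the last published event
```

Waiting clients only need to know that something was inserted, so dropping events that closely follow another one for the same queue loses no information.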
Stats provides counters and metrics on queues, shared among keuss clients. So far, only 'elements inserted' and 'elements got' are maintained. Two options are provided:
npm install keuss
Backends, which work as queue factories, have the following operations
var MQ = require ('keuss/backends/<backend>');
MQ (opts, function (err, factory) {
// factory contains the actual factory, initialized
})
where 'opts' is an object containing default values for queue creation (such as pollInterval, signaller or stats), plus the following backend-dependent values:
mongodb://localhost:27017/keuss
// factory has been initialized
var q = factory.queue (<name>, <options>);
Where:
Signaller factory is passed to queues either in queue creation or in backend init, inside opts.signaller. Note that the result of the new operation is indeed the factory; the result of the require is therefore a metafactory
var signal_redis_pubsub = require ('../signal/redis-pubsub');
var local_redis_opts = {
Redis: {
port: 6379,
host: 'localhost',
db: 6
}
};
var f_opts = {
signaller: {
provider: new signal_redis_pubsub (local_redis_opts)
}
.
.
.
}
MQ (f_opts, function (err, factory) {
// queues created by factory here will use a redis pubsub signaller, hosted at redis at localhost, db 6
});
The signaller has no public api per se; it is considered just a piece of infrastructure to glue queues together
Stats factory is passed to queues either in queue creation or in backend init, inside opts.stats. Note that the result of the new operation is indeed the factory; the result of the require is therefore a metafactory
var signal_redis_pubsub = require ('../signal/redis-pubsub');
var local_redis_opts = {
Redis: {
port: 6379,
host: 'localhost',
db: 6
}
};
var f_opts = {
stats: {
provider: new signal_redis_pubsub (local_redis_opts)
}
.
.
.
}
MQ (f_opts, function (err, factory) {
// queues created by factory here will use a redis-backed stats, hosted at redis at localhost, db 6
});
Stats objects, as of now, store the number of elements inserted and the number of elements extracted; they are created behind the scenes and tied to queue instances, and the stats-related interface is in fact part of the queues' interface
q.stats (function (err, res) {
...
})
var qname = q.name ()
var qtype = q.type ()
Returns a string with the type of the queue (the type of backend that was used to create it)
q.size (function (err, res){
...
})
res contains the number of elements in the queue that are already eligible (that is, excluding scheduled elements with a schedule time in the future)
q.totalSize (function (err, res){
...
})
res contains the number of elements in the queue (that is, including scheduled elements with a schedule time in the future)
q.next_t (function (err, res){
...
})
Returns a Date, or null if queue is empty. Queues with no support for schedule/delay always return null
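To make the difference between size, totalSize and next_t concrete, here is a small in-memory model of a queue with schedule support. It is a sketch under assumptions, not keuss code: the class `ScheduledQueueModel`, its synchronous methods and the `delayMs` parameter are invented for illustration, and the model assumes that next_t reports the earliest schedule time in the queue.

```javascript
// Hypothetical in-memory model of a queue with delay/schedule support.
class ScheduledQueueModel {
  // now: injectable clock returning milliseconds since epoch
  constructor(now = Date.now) {
    this.now = now;
    this.items = []; // each item: { payload, mature }, mature in ms since epoch
  }

  // delayMs is illustrative only; keuss's own push opts may differ in name and unit
  push(payload, delayMs = 0) {
    this.items.push({ payload, mature: this.now() + delayMs });
  }

  // Elements already eligible: schedule time is not in the future
  size() {
    const t = this.now();
    return this.items.filter((it) => it.mature <= t).length;
  }

  // All elements, including those scheduled in the future
  totalSize() {
    return this.items.length;
  }

  // Earliest schedule time as a Date, or null if the queue is empty (assumed semantics)
  nextT() {
    if (this.items.length === 0) return null;
    return new Date(Math.min(...this.items.map((it) => it.mature)));
  }
}

// Usage with a fake clock fixed at t = 1000
let fakeNow = 1000;
const model = new ScheduledQueueModel(() => fakeNow);
model.push('a');       // eligible immediately
model.push('b', 5000); // becomes eligible at t = 6000
const eligible = model.size();   // counts only 'a'
const total = model.totalSize(); // counts 'a' and 'b'
```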
q.push (payload, opts, function (err, res) {
...
})
Adds payload to the queue and calls the passed callback upon completion. The callback's res will contain the id assigned to the inserted element, if the backend provides one
Possible opts:
note: mature and delay have no effect if the backend does not support delay/schedule
var tr = q.pop (cid, opts, function (err, res) {
...
})
Obtains an element from the queue. The callback is called with the element obtained, if any, or with an error if one occurred. If opts.timeout is defined, the operation waits up to opts.timeout seconds for an element to appear in the queue before bailing out (with both err and res being null). In any case, pop() immediately returns an id that can be used to cancel the operation at any time
cid is a string that identifies the consumer entity, which is considered to be a simple label
Possible opts:
var tr = q.pop (cid, opts, function (err, res) {...});
.
.
.
q.cancel (tr);
Cancels a pending pop operation, identified by the value returned by pop()
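The interplay between a waiting pop, its timeout and cancel can be sketched with a tiny in-memory model. This is not how keuss implements it; `WaitingPopModel` and its `timeoutMs` parameter are hypothetical names, and the model ignores cid and backends entirely.

```javascript
// Hypothetical in-memory model of pop-with-wait, timeout and cancel.
class WaitingPopModel {
  constructor() {
    this.items = [];          // elements waiting to be consumed
    this.waiters = new Map(); // pop id -> { cb, timer }, in arrival order
    this.nextId = 1;
  }

  // Always returns an id; cb (err, res) is called with an element,
  // or with (null, null) if timeoutMs elapses first
  pop(cb, timeoutMs) {
    const id = this.nextId++;
    if (this.items.length > 0) {
      cb(null, this.items.shift());
      return id;
    }
    const waiter = { cb, timer: null };
    if (timeoutMs !== undefined) {
      waiter.timer = setTimeout(() => {
        this.waiters.delete(id);
        cb(null, null); // timed out: both err and res are null
      }, timeoutMs);
    }
    this.waiters.set(id, waiter);
    return id;
  }

  // Cancels a pending pop; returns false if the id is no longer pending
  cancel(id) {
    const waiter = this.waiters.get(id);
    if (!waiter) return false;
    clearTimeout(waiter.timer);
    this.waiters.delete(id);
    return true;
  }

  // Hands the element to the oldest pending pop, or stores it
  push(payload) {
    const first = this.waiters.entries().next();
    if (!first.done) {
      const [id, waiter] = first.value;
      clearTimeout(waiter.timer);
      this.waiters.delete(id);
      waiter.cb(null, payload);
      return;
    }
    this.items.push(payload);
  }
}

// Usage
const wq = new WaitingPopModel();
const results = [];
const tr = wq.pop((err, res) => results.push(res)); // queue empty: waits
wq.cancel(tr);                                      // callback will never be called
wq.push('x');                                       // no waiter left: stored
wq.pop((err, res) => results.push(res));            // delivers 'x' right away
```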
q.ok (id, function (err, res) {
...
})
Commits a reserved element by its id (the id is found at res._id on the res param of the pop() operation). This effectively erases the element from the queue
q.ko (id, function (err, res) {
...
})
Rolls back a reserved element by its id (the id is found at res._id on the res param of the pop() operation). This effectively makes the element available again in the queue, but it's up to the backend to decide whether to apply a delay to it (as if it were inserted with opts.delay)
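The reserve/commit/rollback cycle can be pictured with the following in-memory sketch. It is an assumption-laden model, not keuss's implementation: `ReservableQueueModel` and `popReserve` are invented names, the methods are synchronous, and a rolled-back element is simply re-queued with no delay.

```javascript
// Hypothetical in-memory model of reserve / commit (ok) / rollback (ko).
class ReservableQueueModel {
  constructor() {
    this.available = [];       // elements ready to be popped
    this.reserved = new Map(); // _id -> element currently held by a consumer
    this.nextId = 1;
  }

  // Stores the payload and returns the id assigned to it
  push(payload) {
    const elem = { _id: String(this.nextId++), payload };
    this.available.push(elem);
    return elem._id;
  }

  // Reserves the head element instead of removing it for good
  popReserve() {
    const elem = this.available.shift();
    if (!elem) return null;
    this.reserved.set(elem._id, elem);
    return elem;
  }

  // Commit: the reserved element is erased from the queue
  ok(id) {
    return this.reserved.delete(id);
  }

  // Rollback: the reserved element becomes available again
  ko(id) {
    const elem = this.reserved.get(id);
    if (!elem) return false;
    this.reserved.delete(id);
    this.available.push(elem);
    return true;
  }
}

// Usage
const rq = new ReservableQueueModel();
rq.push({ task: 'resize-image' });
const first = rq.popReserve(); // element is reserved, not yet gone
rq.ko(first._id);              // processing failed: give it back
const retry = rq.popReserve(); // same element comes out again
rq.ok(retry._id);              // processing succeeded: erase it
```

Reserving instead of removing means an element is not lost if its consumer fails before committing.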
Keuss relies on ioredis for connecting to redis. Anytime a redis connection is needed, keuss will create it from the opts object passed:
Examples:
var MQ = require ('keuss/backends/redis-list');
var factory_opts = {};
MQ (factory_opts, function (err, factory) {
...
});
var MQ = require ('keuss/backends/redis-list');
var factory_opts = {
redis: {
Redis: {
port: 12293,
host: 'some-redis-instance.somewhere.com',
family: 4,
password: 'xxxx',
db: 0
}
}
};
MQ (factory_opts, function (err, factory) {
...
});
var MQ = require ('keuss/backends/redis-list');
var Redis = require ('ioredis');
var factory_opts = {
redis: function () {
return new Redis ({
port: 12293,
host: 'some-redis-instance.somewhere.com',
family: 4,
password: 'xxxx',
db: 0
})
}
};
MQ (factory_opts, function (err, factory) {
...
});
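Judging only from the three examples above, the redis option seems to accept nothing at all, an object with connection options under Redis, or a function returning a ready connection. The sketch below shows one way such a resolution could work. It is a guess for illustration, not keuss's actual logic: `resolveRedisConnection` is a hypothetical helper, and `FakeRedis` stands in for the ioredis constructor so the sketch runs without a redis server.

```javascript
// Hypothetical resolution of a redis connection from an opts object.
// RedisCtor stands in for the ioredis constructor, i.e. require ('ioredis')
function resolveRedisConnection(opts, RedisCtor) {
  const spec = opts && opts.redis;
  if (typeof spec === 'function') return spec();            // caller-supplied factory
  if (spec && spec.Redis) return new RedisCtor(spec.Redis); // plain connection options
  return new RedisCtor();                                   // no options: driver defaults
}

// Stand-in for ioredis that just records the options it was given
class FakeRedis {
  constructor(options = {}) {
    this.options = options;
  }
}

// Usage, mirroring the three examples
const byDefault = resolveRedisConnection({}, FakeRedis);
const byOptions = resolveRedisConnection(
  { redis: { Redis: { port: 12293, host: 'some-redis-instance.somewhere.com', db: 0 } } },
  FakeRedis
);
const byFactory = resolveRedisConnection(
  { redis: function () { return new FakeRedis({ port: 12293 }); } },
  FakeRedis
);
```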
Almost all objects that are created with an opts object can receive and use a winston logger as opts.logger
Even when using signallers, get operations on a queue never block or wait forever; waiting get operations rearm themselves every 15000 milliseconds (or whatever is specified in pollInterval). This makes it possible to work with more than one process without signallers, with a maximum latency of pollInterval milliseconds, and also provides a safe fallback in case signalling is lost for whatever reason
A set of functioning examples can be found inside the examples directory
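The rearm behaviour can be sketched as a waiter that retries either when an insertion event arrives or when its poll timer fires, whichever comes first. This is a simplified illustration, not keuss's code: `PollingWaiter`, `tryGet` and `signal` are hypothetical names.

```javascript
// Hypothetical sketch of a waiting get that rearms itself every pollIntervalMs.
class PollingWaiter {
  // tryGet: function returning an element, or null if the queue is empty
  // onElement: called once, with the element finally obtained
  constructor(tryGet, onElement, pollIntervalMs = 15000) {
    this.tryGet = tryGet;
    this.onElement = onElement;
    this.pollIntervalMs = pollIntervalMs;
    this.timer = null;
    this.done = false;
  }

  start() {
    this._attempt();
  }

  // Called when an 'element inserted' event arrives from the signaller
  signal() {
    this._attempt();
  }

  // Abandons the wait and clears any pending timer
  stop() {
    this.done = true;
    clearTimeout(this.timer);
  }

  _attempt() {
    if (this.done) return;
    clearTimeout(this.timer);
    const elem = this.tryGet();
    if (elem !== null) {
      this.done = true;
      this.onElement(elem);
      return;
    }
    // Nothing yet: rearm, so a lost signal costs at most pollIntervalMs
    this.timer = setTimeout(() => this._attempt(), this.pollIntervalMs);
  }
}

// Usage
const store = [];
const got = [];
const waiter = new PollingWaiter(
  () => (store.length > 0 ? store.shift() : null),
  (elem) => got.push(elem),
  15000
);
waiter.start();       // queue empty: timer armed
store.push('job-1');
waiter.signal();      // insertion event: retry right away, delivers 'job-1'
```

Without the call to signal(), the same element would still be delivered, but only when the timer fires, which is where the maximum latency of pollInterval comes from.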