Job queues and pipelines on selectable backends (for now: MongoDB and Redis) for Node.js
Keuss is an attempt (or experiment) to provide a serverless, persistent and highly available queue middleware supporting delays/scheduling, using MongoDB and Redis to cover most of the backend needs. By now it has evolved into a rather capable and complete queue middleware.
The underlying idea is that the key to providing persistence, HA and load balancing is to rely on a storage subsystem that already provides those, and to build the rest on top. Instead of reinventing the wheel by building such a storage layer, I simply tried to adapt what's already out there.
Modelling a queue with MongoDB, for example, proved easy enough. It turned out simple and cheap, and it provides great persistence, HA and decent support for load balancing. Using Redis produced similar results. In both cases, though, the load-balancing part was somewhat incomplete: the whole thing lacked a bus to signal all clients about events such as an insertion into a particular queue. Without this layer a certain amount of polling is needed, so it is obviously a Nice Thing To Have.
Keuss ended up being a somewhat serverless queue system, where the server (or common) part is a bare storage system such as Redis or MongoDB. There is no need for any extra Keuss server between clients and storage (although an actual keuss-server does exist, serving a different purpose on top of plain Keuss). Thus, all of Keuss actually lives on the client side.
A Queue in Keuss is more of an interface: a definition of what it can do. Keuss queues are capable of:
Note: element here translates to any JS object; internally, it is usually managed as JSON.
A pipeline is an enhanced queue that provides one extra operation: passing an element to another queue atomically. In a scenario where processors are linked with queues, it is usually good to have the 'commit element in incoming queue, insert element in the next queue' step be atomic. This removes the chance of race conditions or message loss.
The pipeline concept is, indeed, an extension of the reserve-commit model; it is so far implemented only atop MongoDB, and it is in any case considered a low-level feature, best used by means of specialized classes that encapsulate the aforementioned processors.
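To see why the atomic pass matters, here is a hypothetical sketch of moving an element between two queues *without* pipelines, using only the reserve/commit primitives documented below. The two steps (push into the next queue, commit in the incoming one) are not atomic, so a crash between them can duplicate the element; pipelines remove that window. Queue names, the `mongo` backend choice, and the assumption that the popped element carries its payload under `res.payload` are illustrative.

```javascript
// Non-atomic queue-to-queue move, sketched with documented primitives.
// A pipeline (pl-mongo backend) would make this a single atomic step.
const factory_opts = { url: 'mongodb://localhost/keuss_demo' };

function moveOne() {
  const MQ = require('keuss/backends/mongo');
  MQ(factory_opts, (err, factory) => {
    if (err) return console.error(err);
    const incoming = factory.queue('stage-1');
    const next = factory.queue('stage-2');

    incoming.pop('mover', { reserve: true }, (err, res) => {
      if (err) return console.error(err);
      // assumption: the original payload is available at res.payload
      next.push(res.payload, err => {      // step 1: insert in next queue
        if (err) return incoming.ko(res);  // rollback if the insert fails
        incoming.ok(res);                  // step 2: commit in incoming queue
      });
    });
  });
}

module.exports = { factory_opts, moveOne };
```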
Storage (or Backend) provides almost-complete queue primitives, fully functional and usable as is. Keuss comes with 7 backends, with various levels of features and performance:
As mentioned before, persistence and HA depend exclusively on the underlying system: MongoDB provides production-grade HA and persistence while allowing potentially gigantic queues, and with Redis one can balance performance and simplicity against reliability and durability by using standalone Redis, Redis Sentinel or Redis Cluster. Keuss uses ioredis as its Redis driver, which supports all 3 cases.
The following table shows the capabilities of each backend:
| backend           | delay/schedule | reserve/commit | pipelining | history | throughput |
|-------------------|----------------|----------------|------------|---------|------------|
| redis-list        | -              | -              | -          | -       | ++++       |
| redis-oq          | x              | x              | -          | -       | +++        |
| mongo             | x              | x              | -          | -       | ++         |
| pl-mongo          | x              | x              | x          | -       | +          |
| ps-mongo          | x              | x              | -          | x       | ++         |
| bucket-mongo      | -              | -              | -          | -       | +++++      |
| bucket-mongo-safe | x              | x              | -          | -       | +++++      |
Signaller provides a bus interconnecting all keuss clients, so events can be shared. Keuss provides 3 signallers:
So far, the only events published by keuss are:
Stats provides counters and metrics on queues, shared among keuss clients. The supported stats are:
Three options are provided to store the stats:
The concept of deadletter is very common in queue middleware: when reserve/commit/rollback is used to consume, a maximum number of failures (reserve-rollback cycles) can be set per element; if an element sees more rollbacks than allowed, it is moved to a special queue (the dead letter queue) for later, offline inspection.
By default, Keuss uses no deadletter queue; it can be activated by passing a deadletter object at factory creation time, inside the options:
```js
var factory_opts = {
  url: 'mongodb://localhost/qeus',
  deadletter: {
    max_ko: 3
  }
};

// initialize factory
MQ (factory_opts, (err, factory) => {
  ...
});
```
This object must not be empty, and can contain the following keys:
* `max_ko`: maximum number of rollbacks per element allowed. The next rollback will cause the element to be moved to the deadletter queue. Defaults to 0, which means infinite.
* `queue`: name of the deadletter queue. Defaults to `__deadletter__`.
All storage backends support deadletter. In ps-mongo the move-to-deadletter (as is the case with other move-to-queue operations) is atomic; in the rest, the element is first committed in the original queue and then pushed into deadletter.
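The deadletter flow can be sketched end to end: with `max_ko: 3` at the factory, repeatedly rolling back the same element eventually moves it to the `__deadletter__` queue instead of requeueing it. The backend choice, queue names and consumer id below are illustrative.

```javascript
// Sketch: trigger the deadletter path by rolling back an element
// until it exceeds max_ko. Requires a reachable MongoDB.
const factory_opts = {
  url: 'mongodb://localhost/keuss_demo',
  deadletter: { max_ko: 3 }
};

function demo() {
  const MQ = require('keuss/backends/mongo');
  MQ(factory_opts, (err, factory) => {
    if (err) return console.error(err);
    const q = factory.queue('jobs');

    q.push({ a: 1 }, err => {
      if (err) return console.error(err);
      // roll back repeatedly; once rollbacks exceed max_ko,
      // the element is moved to the deadletter queue
      failTimes(q, 4);
    });
  });
}

function failTimes(q, left) {
  if (left === 0) return;
  q.pop('worker-1', { reserve: true }, (err, res) => {
    if (err) return console.error(err);
    // note: the WHOLE res must be passed to ko() for deadletter to work
    q.ko(res, null, () => failTimes(q, left - 1));
  });
}

module.exports = { factory_opts, demo };
```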
More info on pipelines here
```sh
npm install keuss
```
Backends, which work as queue factories, have the following operations:
```js
var MQ = require ('keuss/backends/<backend>');

MQ (opts, (err, factory) => {
  // factory contains the actual factory, initialized
})
```
where `opts` is an object containing initialization options. Options common to all backends are:
* `name`: name for the factory. Defaults to `N`.
* `stats`:
  * `provider`: stats backend to use, as the result of `require ('keuss/stats/<provider>')`. Defaults to `require ('keuss/stats/mem')`.
  * `opts`: options for the provider.
* `signaller`:
  * `provider`: signaller provider to use, as the result of `require ('keuss/signal/<provider>')`. Defaults to `require ('keuss/signal/local')`.
  * `opts`: options for the provider.
* `deadletter`: deadletter options, described above.
  * `max_ko`: max rollbacks per element.
  * `queue`: deadletter queue name.

Plus the following backend-dependent values:
* `url`: MongoDB url to use. Defaults to `mongodb://localhost:27017/keuss`.
* `redis`: data to create a Redis connection to the Redis instance acting as backend, see below.
* `ttl`: time to keep consumed elements in the collection after being removed. Defaults to 3600 secs.
```js
var q = factory.queue (<name>, <options>);
```
Where:
* `name`: string to be used as queue name. Queues with the same name are in fact the same queue if they are backed by the same factory type using the same initialization data (MongoDB url or Redis conn-data).
* `options`: the options passed at backend initialization are used as default values:
  * `signaller`: signaller to use for the queue.
    * `provider`: signaller factory.
    * `opts`: options for the signaller factory (see below).
  * `stats`: stats store to use for this queue.
    * `provider`: stats factory.
    * `opts`: options for the stats factory (see below).

```js
factory.close (err => {...});
```
Frees up resources on the factory. Queues created with the factory become unusable afterwards. See 'Shutdown process' below for more info.
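Putting factory and queue creation together, a minimal end-to-end sketch looks like this: create a factory, get a queue, push one element, pop it back, and close the factory when done. The backend choice, queue name and consumer id are illustrative.

```javascript
// Minimal end-to-end sketch of the factory/queue lifecycle.
// Requires a reachable MongoDB at the given (assumed) url.
function endToEnd(done) {
  const MQ = require('keuss/backends/mongo');
  MQ({ url: 'mongodb://localhost/keuss_demo' }, (err, factory) => {
    if (err) return done(err);
    const q = factory.queue('demo-queue');

    q.push({ hello: 'world' }, err => {
      if (err) return done(err);
      q.pop('consumer-1', (err, res) => {
        if (err) return done(err);
        console.log('got element:', res);
        factory.close(done);  // release resources when finished
      });
    });
  });
}

module.exports = { endToEnd };
```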
Signaller factory is passed to queues either at queue creation or at backend init, inside opts.signaller. Note that the result of the 'new' operation is indeed the factory; the result of the require is therefore a metafactory.
```js
var signal_redis_pubsub = require ('keuss/signal/redis-pubsub');

var local_redis_opts = {
  Redis: {
    port: 6379,
    host: 'localhost',
    db: 6
  }
};

var f_opts = {
  signaller: {
    provider: signal_redis_pubsub,
    opts: local_redis_opts
  }
  .
  .
  .
}

MQ (f_opts, (err, factory) => {
  // queues created by factory here will use a redis pubsub signaller, hosted at redis at localhost, db 6
})
```
The signaller has no public API per se; it is considered just a piece of infrastructure to glue queues together.
Stats factory is passed to queues either at queue creation or at backend init, inside opts.stats. Note that the result of the 'new' operation is indeed the factory; the result of the require is therefore a metafactory.
```js
var local_redis_pubsub = require ('keuss/signal/redis-pubsub');

var local_redis_opts = {
  Redis: {
    port: 6379,
    host: 'localhost',
    db: 6
  }
};

var f_opts = {
  stats: {
    provider: local_redis_pubsub,
    opts: local_redis_opts
  }
  .
  .
  .
}

MQ (f_opts, (err, factory) => {
  // queues created by factory here will use a redis-backed stats, hosted at redis at localhost, db 6
})
```
Stats objects, as of now, store the number of elements inserted and the number of elements extracted; they are created behind the scenes and tied to queue instances, and the stats-related interface is in fact part of the queues' interface.
```js
q.stats ((err, res) => {
  ...
})
```

```js
var qname = q.name ()
```

```js
var qtype = q.type ()
```
Returns a string with the type of the queue (the type of backend which was used to create it).
```js
q.size ((err, res) => {
  ...
})
```
Returns the number of elements in the queue that are already eligible (that is, excluding scheduled elements with a schedule time in the future).

```js
q.totalSize ((err, res) => {
  ...
})
```
Returns the total number of elements in the queue (that is, including scheduled elements with a schedule time in the future).

```js
q.schedSize ((err, res) => {
  ...
})
```
Returns the number of scheduled elements in the queue (that is, those with a schedule time in the future). Returns 0 if the queue does not support scheduling.

```js
q.resvSize ((err, res) => {
  ...
})
```
Returns the number of reserved elements in the queue. Returns null if the queue does not support reserve.
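The size calls above compose naturally; the sketch below reads the counters of a queue in sequence (the callback chain is hypothetical glue, `printSizes` is not part of Keuss).

```javascript
// Sketch: read the documented size counters of a queue in sequence.
// Per the docs, totalSize includes scheduled elements; size excludes them.
function printSizes(q, done) {
  q.size((err, eligible) => {
    if (err) return done(err);
    q.totalSize((err, total) => {
      if (err) return done(err);
      q.schedSize((err, scheduled) => {
        if (err) return done(err);
        console.log({ eligible, total, scheduled });
        done();
      });
    });
  });
}

module.exports = { printSizes };
```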
```js
// pauses the queue
q.pause (true)

// resumes the queue
q.pause (false)

// gets paused status of queue
q.paused ((err, is_paused) => {
  ...
})
```
Pauses/resumes all consumers on this queue (that is, calls to pop()). Producers are not affected (calls to push()).
The pause/resume condition is propagated via the signallers, so if a redis-pubsub or mongo-capped signaller is used this affects all consumers, not only those local to the process.
Also, the paused condition is stored in the stats, so any new call to pop() will honor it.
```js
q.next_t ((err, res) => {
  ...
})
```
Returns a Date, or null if the queue is empty. Queues with no support for schedule/delay always return null.
```js
q.push (payload, [opts,] (err, res) => {
  ...
})
```
Adds payload to the queue and calls the passed callback upon completion. The callback's res will contain the id assigned to the inserted element, if the backend provides one.
Possible opts:
Note: mature and delay have no effect if the backend does not support delay/schedule.
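The text above names `mature` and `delay` as push options. A hedged sketch, assuming `delay` is expressed in seconds (worth checking against the backend in use):

```javascript
// Sketch: push an element that becomes eligible later.
// Assumption: opts.delay is a delay in seconds from now.
function pushScheduled(q, done) {
  q.push({ job: 'send-email' }, { delay: 60 }, (err, id) => {
    if (err) return done(err);
    console.log('inserted with id', id);
    done();
  });
}

module.exports = { pushScheduled };
```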
```js
var tr = q.pop (cid, [opts,] (err, res) => {
  ...
})
```
Obtains an element from the queue. The callback is called with the element obtained, if any, or with an error if one happened. If opts.timeout is defined, the operation will wait up to opts.timeout seconds for an element to appear in the queue before bailing out (with both err and res being null). However, it immediately returns an id that can be used to cancel the operation at any time.
cid is a string that identifies the consumer entity; it is used only for debugging purposes.
Possible opts:
```js
var tr = q.pop (cid, opts, (err, res) => {...});

.
.
.

q.cancel (tr);
```
Cancels a pending pop operation, identified by the value returned by pop(). If no tr is passed, or it is null, all pending pop operations on the queue are cancelled. Cancelled pop operations will get 'cancel' (a string) as the error in the callback.
```js
q.ok (id, (err, res) => {
  ...
})
```
Commits a reserved element by its id (the id would be at res._id on the res param of the pop() operation). This effectively erases the element from the queue.
Alternatively, you can pass the entire res object from the pop() operation:
```js
var tr = q.pop ('my-consumer-id', {reserve: true}, (err, res) => {
  // do something with res
  ...
  // commit it
  q.ok (res, (err, res) => {
    ...
  });
});
```
```js
q.ko (id, next_t, (err, res) => {
  ...
})
```
Rolls back a reserved element by its id (the id would be at res._id on the res param of the pop() operation). This effectively makes the element available again in the queue, marking it to be mature at next_t (next_t being a millisecond unix time). If no next_t is specified, or null is passed, now() is assumed.
As with ok(), you can use the entire res instead:
```js
var tr = q.pop ('my-consumer-id', {reserve: true}, (err, res) => {
  // do something with res
  ...
  // commit or rollback it
  if (succeed) q.ok (res, (err, res) => {
    ...
  })
  else q.ko (res, (err, res) => {
    ...
  })
});
```
NOTE: you must pass the entire res for the deadletter feature to work; even if activated at the factory, ko() will not honor deadletter if you only pass the res._id as id.
```js
q.drain (err => {
  ...
})
```
Drains a queue. This is a needed operation when a backend does read-ahead upon a pop(), or buffers push() operations for later; in that case, you may want to make sure that all extra elements read ahead are actually popped, and that all pending pushes are committed.
drain() immediately inhibits push(): any call to push() will immediately result in a 'drain' (a string) error. The callback is called when all pending pushes are committed and all read-ahead elements have actually been popped.
Also, drain() will call cancel() on the queue immediately before finishing, in case of success.
Keuss relies on ioredis to connect to Redis. Anytime a Redis connection is needed, Keuss will create it from the opts object passed:
Examples:
```js
var MQ = require ('keuss/backends/redis-list');

var factory_opts = {};

MQ (factory_opts, (err, factory) => {
  ...
});
```

```js
var MQ = require ('keuss/backends/redis-list');

var factory_opts = {
  redis: {
    Redis: {
      port: 12293,
      host: 'some-redis-instance.somewhere.com',
      family: 4,
      password: 'xxxx',
      db: 0
    }
  }
};

MQ (factory_opts, (err, factory) => {
  ...
});
```

```js
var MQ = require ('keuss/backends/redis-list');
var Redis = require ('ioredis');

var factory_opts = {
  redis: function () {
    return new Redis ({
      port: 12293,
      host: 'some-redis-instance.somewhere.com',
      family: 4,
      password: 'xxxx',
      db: 0
    })
  }
};

MQ (factory_opts, (err, factory) => {
  ...
});
```
It is good practice to call close(cb) on the factories to release all resources once you're done, or at shutdown if you want your shutdowns clean and graceful. Also, you should loop over your queues and perform a drain() on them before calling close() on their factories: this ensures any un-consumed data is popped and any unwritten data is written. It also ensures all your (local) waiting consumers will end (with a 'cancel' error).
Factories do not keep track of the queues they create, so this cannot be done internally as part of close(); this may change in the future.
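The shutdown rules above can be sketched as a helper: the application keeps its own list of queues (since factories do not track them), drains each one, and closes the factory once all drains complete. The helper name is hypothetical glue, not part of Keuss.

```javascript
// Sketch of a graceful shutdown: drain every queue, then close the factory.
function gracefulShutdown(factory, queues, done) {
  let pending = queues.length;
  if (pending === 0) return factory.close(done);

  queues.forEach(q => {
    q.drain(err => {
      if (err) console.error('drain error on', q.name(), err);
      if (--pending === 0) factory.close(done);  // close once all are drained
    });
  });
}

module.exports = { gracefulShutdown };
```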
Even when using signallers, get (pop) operations on queues never block or wait forever; waiting get operations rearm themselves every 15000 milliseconds (or whatever is specified in pollInterval). This makes it possible to work with more than one process even without signallers, at the cost of a maximum latency of pollInterval milliseconds, and it also provides a safe fallback in the event of signalling loss.
Up to version 1.4.x all backends worked in the same way, one element at a time: pushing and popping elements fired one or more operations per element on the underlying storage. This means the bottleneck would end up being the storage's I/O; Redis and MongoDB both allow quite high I/O rates, enough to work at thousands of operations per second, but the limit was still there.
Starting with v1.5.2 Keuss includes 2 backends that do not share this limitation: they work by packing many elements inside a single 'storage unit'. Sure enough, this adds some complexity and extra risks, but the throughput improvement is staggering: on MongoDB it goes from 3-4 Ktps to 35-40 Ktps, and the bottleneck shifts from mongod to the client's CPU, busy serializing and deserializing payloads.
Two bucket-based backends were added, both based on MongoDB: bucket-mongo and bucket-mongo-safe. Both are usable, but there is little gain in using the first over the second: bucket-mongo was used as a prototyping area, and although perfectly usable, it turned out bucket-mongo-safe is better in almost every aspect: it provides better guarantees and more features, at about the same performance.
In addition to the general options, the factory accepts the following extra options:
bucket-mongo-safe works by packing many payloads in a single mongodb object:
Thus, it is important to call drain() on queues of this backend: this call ensures all pending write buckets are inserted in MongoDB, and also ensures all remaining in-memory buckets are completely read (served through pop/reserve).
Also, there is little difference in performance and I/O between pop and reserve/commit; performance is no longer a reason to prefer one over the other.
Scheduling on bucket-mongo-safe is perfectly possible, but with a twist: the effective mature_t of a message will be the oldest in the whole bucket it resides in. This applies to both insert and rollback/ko. In practice this is usually not a big deal, since anyway the mature_t is a 'not before' time, and that's all Keuss (or any other queuing middleware) would guarantee.
This is a simpler version of buckets-on-mongodb; for all purposes bucket-mongo-safe should be preferred. It does not provide reserve, nor schedule. It is, however, a tad faster and lighter on I/O.
It is provided only for historical and educational purposes.
A set of functioning examples can be found inside the examples directory: