keuss - npm Package Compare versions

Comparing version 0.0.10 to 0.0.11

examples/simple-consumer-producer.js


AsyncQueue.js

@@ -39,3 +39,4 @@ 'use strict';

- this._signaller = new (this._opts.signaller.provider || LocalSignal) (this, this._opts.signaller.opts);
+ var signaller_factory = this._opts.signaller.provider || new LocalSignal ();
+ this._signaller = signaller_factory.signal (this, this._opts.signaller.opts);

@@ -42,0 +43,0 @@ this._stats.incr ('get', 0);

@@ -168,3 +168,3 @@ 'use strict';

var _opts = opts || {};
- var rediscl = RedisConn.conn (_opts);
+ var rediscl = RedisConn.conn (_opts.redis);

@@ -171,0 +171,0 @@ return cb (null, new Factory (_opts, rediscl));

@@ -223,3 +223,3 @@ 'use strict';

var _opts = opts || {};
- var _rediscl = RedisConn.conn (_opts);
+ var _rediscl = RedisConn.conn (_opts.redis);
var _roq_factory = new RedisOrderedQueue (_rediscl);

@@ -226,0 +226,0 @@ return cb (null, new Factory (_opts, _rediscl, _roq_factory));

@@ -27,5 +27,5 @@ var async = require ('async');

- setTimeout (function () {
+ // setTimeout (function () {
run_consumer (q);
- }, random.from0to (2000) + 100);
+ // }, random.from0to (2000) + 100);
});

@@ -41,3 +41,3 @@ }

run_producer (q);
- }, (random.from0to (30) + 100) * 10);
+ }, (random.from0to (10) + 1) * 1000);
});

@@ -48,5 +48,13 @@ }

var MQ = require ('../backends/redis-oq');
+ var redis_signaller = require ('../signal/redis-pubsub');
+ var redis_stats = require ('../stats/redis');
var opts = {
- logger: logger
+ logger: logger,
+ signaller: {
+ provider: new redis_signaller ()
+ },
+ stats: {
+ provider: new redis_stats ()
+ }
};

@@ -59,3 +67,3 @@

- var q = factory.queue('test_queue', opts);
+ var q = factory.queue('bench_test_queue', opts);

@@ -62,0 +70,0 @@ run_consumer (q);

@@ -129,4 +129,5 @@ var winston = require ('winston');

if (program.signaller) {
+ var signal_provider = require ('../signal/' + program.signaller);
q_opts.signaller = {
- provider: require ('../signal/' + program.signaller)
+ provider: new signal_provider ()
}

@@ -136,4 +137,5 @@ }

if (program.stats) {
+ var stats_provider = require ('../stats/' + program.stats);
q_opts.stats = {
- provider: require ('../stats/' + program.stats)
+ provider: new stats_provider ()
}

@@ -140,0 +142,0 @@ }

{
"name": "keuss",
"version": "0.0.10",
"version": "0.0.11",
"keywords": ["queue", "job"],

@@ -5,0 +5,0 @@ "homepage":"https://github.com/pepmartinez/keuss",

@@ -35,3 +35,4 @@ 'use strict';

// stats
- this._stats = new (this._opts.stats.provider || MemStats) (this.type () + ':' + this.name (), this._opts.stats.opts);
+ var stats_factory = this._opts.stats.provider || new MemStats ();
+ this._stats = stats_factory.stats (this.type () + ':' + this.name (), this._opts.stats.opts);

@@ -38,0 +39,0 @@ // most mature

# keuss
Job Queues on selectable backends (for now: mongodb, redis) for node.js

Still very pre-alpha, basic structure in constant flux
## About
Keuss is an attempt (or experiment) to provide a serverless, persistent and highly available
queue middleware supporting delays, using mongodb and redis to provide most of the backend
needs.
### Aim
As it turns out, the key to providing persistence, HA and load balancing is to have a storage subsystem
that already provides those properties, and to use it to store your queues. Instead of reinventing the wheel by
building such a storage system, I simply tried to adapt what's already out there.

Modelling a queue with mongodb, for example, proved easy enough. The result was simple and cheap, and mongodb provides great persistence and HA, plus decent support for load balancing. Using Redis produced similar results, but in both cases the load-balancing part was somewhat incomplete: the whole thing lacked a *bus* to signal all clients when, for example, an insertion takes place in a particular queue. Without this layer a certain amount of polling is needed, so such a bus is obviously a Nice Thing To Have.

Keuss ended up being a somewhat *serverless* queue system, where the *server* or common parts are bare storage systems such as redis or mongodb. There is no need for any extra *keuss server* between clients and storage (although an actual keuss-server does exist, it serves a different purpose on top of plain keuss). Thus, all of keuss actually lives on the *client* side.
### Concepts
#### Queue
A **Queue** is more of an interface, a definition of what it can do. Keuss queues are capable of:
* insert one element
* schedule an element: insert one element with a not-before datetime
* get an element, and block for some specified time if no element is available
* reserve an element, and block for some specified time if no element is available
* commit (remove) or rollback (return back) a previously reserved element
* get element count
* get element count whose not-before datetime is in the future (scheduled elements)
* get usage stats: elements inserted, elements extracted
*element* here translates to any js object. Internally, it's usually managed as json
#### Storage
**Storage** or **Backend** provides an almost-complete set of queue primitives, fully functional and already usable as is. Keuss comes with 3 backends, with various levels of features and performance:
* *mongo*, a mongodb-based backend that provides the full set of queue features, still with decent performance
* *redis-oq*, backed by an ordered queue on top of redis (made in turn with a sorted set, a hash and some lua). Provides all queue features except reserve-commit-rollback (although this support is planned). Noticeably faster than mongodb
* *redis-list*, backed by a redis list. Does not offer reserve-commit-rollback nor the ability to schedule, but is much faster than redis-oq

As mentioned before, persistence and HA depend exclusively on the underlying system: mongodb provides production-grade HA and persistence while allowing potentially gigantic queues, and with redis one can balance performance and simplicity against reliability and durability by using standalone redis, redis sentinel or redis cluster. Keuss uses [ioredis](https://github.com/luin/ioredis) as its redis driver, which supports all 3 setups.
The following table shows the capabilities of each backend:
backend | delay/schedule | reserve/commit
-----------|:--------------:|:--------------:
redis-list | - | -
redis-oq | x | -
mongo | x | x
#### Signaller
**Signaller** provides a bus interconnecting all keuss clients, so events can be shared. Keuss provides 2 signallers:
* *local*: provides in-process messaging, useful only for simple cases or testing
* *redis-pubsub*: uses the pubsub subsystem provided by redis

So far, the only event published by keuss is *element inserted in queue X*, which allows other clients waiting for elements to become available to wake up and retry. A client will not fire an event if another one of the same type (same client, same queue) was fired less than 50ms before.
#### Stats
**Stats** provides counters and metrics on queues, shared among keuss clients. So far, only 'elements inserted' and 'elements got' are maintained. Two options are provided:
* *mem*: very simple, in-process, memory-based
* *redis*: backed by redis hashes. Modifications are buffered in memory and flushed every 100ms
### How all fits together
* *Queues*, or rather clients to individual queues, are created using a *backend* as a factory
* *Backends* need to be initialized before being used. Exact initialization details depend on each backend
* When creating a *queue*, a *signaller* and a *stats* provider are assigned to it. The actual class/type to use can be specified when the queue is created or when the backend is initialized; by default *local* and *mem*, respectively, are used (see the example below)
* *Queues* are created on demand, and are never destroyed as far as keuss is concerned. They exist as long as the underlying backend keeps them in existence: for example, redis queues disappear as such when they become empty
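
As an illustration of the above, here is a minimal sketch using the redis-oq backend together with the redis-pubsub signaller and redis stats. The `keuss/signal/...` and `keuss/stats/...` require paths follow the same layout as `keuss/backends/...` used elsewhere in this README, and redis is assumed to run on localhost with default options:

```javascript
var MQ             = require ('keuss/backends/redis-oq');
var RedisSignaller = require ('keuss/signal/redis-pubsub');
var RedisStats     = require ('keuss/stats/redis');

// initialize the backend (factory)
MQ ({}, function (err, factory) {
  if (err) return console.error (err);

  // in 0.0.11, signaller and stats providers are factory instances
  var q_opts = {
    signaller: {provider: new RedisSignaller ()},
    stats:     {provider: new RedisStats ()}
  };

  var q = factory.queue ('test_queue', q_opts);

  // clients blocked on a pop() of this queue are woken up via redis pubsub
  q.push ({elem: 1}, function (err, res) {
    if (err) return console.error (err);
  });
});
```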
## Install

@@ -21,5 +75,136 @@ ```bash

## Usage
### Quickstart
```javascript
// create a simple producer on top of redis-list, no signaller, in-mem stats
var MQ = require ('keuss/backends/redis-list');

var factory_opts = {};

// initialize factory
MQ (factory_opts, function (err, factory) {
  if (err) {
    return console.error (err);
  }

  // factory ready, create one queue
  var q_opts = {};
  var q = factory.queue ('test_queue', q_opts);

  // insert element
  q.push ({a:1, b:'666'}, function (err, res) {
    if (err) {
      return console.error (err);
    }

    // element inserted at this point. pop it again
    var pop_opts = {};
    q.pop ('consumer-one', pop_opts, function (err, res) {
      if (err) {
        return console.error (err);
      }

      console.log ('got this: ', res.payload);
    });
  });
});
```
### Factory API
Backends, which work as queue factories, have the following operations
#### Initialization
```javascript
var MQ = require ('keuss/backends/<backend>');

MQ (opts, function (err, factory) {
  // factory contains the actual factory, initialized
})
```
where 'opts' is an object containing default values for queue creation, plus the following backend-dependent values:
* backend *mongo*
* url: mongodb url to use, defaults to `mongodb://localhost:27017/keuss`
* backends *redis-list* and *redis-oq*
* redis: data to create a redis connection to the Redis acting as backend, see below
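
For example, a redis-backed factory could be initialized like this (a sketch; the shape accepted under `redis` is described in *Redis connections* below, and the values are purely illustrative):

```javascript
var MQ = require ('keuss/backends/redis-oq');

var factory_opts = {
  redis: {
    Redis: {host: 'localhost', port: 6379, db: 6}
  }
};

MQ (factory_opts, function (err, factory) {
  // factory is bound to the specified redis instance
});
```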
#### Queue creation
```javascript
// factory has been initialized
var q = factory.queue (<name>, <options>);
```
Where:
* name: string to be used as the queue name. Queues with the same name are in fact the same queue if they're backed by the same factory type using the same initialization data (mongodb url or redis conn-data)
* options: the options passed at backend initialization are used as default values:
  * pollInterval: rearm/poll period in milliseconds for get operations, defaults to 15000 (see *Working with no signallers* below)
  * signaller: signaller to use for the queue
    * provider: signaller factory instance (the required signaller module, instantiated with `new`)
    * opts: options for the signaller factory (see below)
  * stats: stats store to use for this queue
    * provider: stats factory instance (the required stats module, instantiated with `new`)
    * opts: options for the stats factory (see below)
### Signaller
### Stats
### Queue API
#### Get Stats
```javascript
q.stats (function (err, res) {
...
})
```
* res contains usage stats (elements inserted, elements extracted)
#### Queue name
```javascript
var qname = q.name ()
```
#### Queue type
```javascript
var qtype = q.type ()
```
Returns a string with the type of the queue (the type of the backend that was used to create it)
#### Queue occupation
```javascript
q.size (function (err, res){
...
})
```
res contains the number of elements in the queue that are already eligible (that is, excluding scheduled elements with a schedule time in the future)
#### Total Queue occupation
```javascript
q.totalSize (function (err, res){
...
})
```
res contains the number of elements in the queue (that is, including scheduled elements with a schedule time in the future)
#### Time of schedule of next message
```javascript
q.next_t (function (err, res){
...
})
```
Returns a Date, or null if the queue is empty. Queues with no support for schedule/delay always return null.
#### push (payload, opts, callback)
#### pop (cid, opts, callback)
#### cancel (tid, opts)
#### reserve (callback)
#### ok (id, cb)
#### ko (id, cb)
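These methods are not documented yet in this version. As an orientation only, here is a sketch of the reserve-commit-rollback flow on a backend that supports it (mongo); the exact `reserve` signature, the field carrying the reserved element's id (`res._id` below) and the `process_element` helper are assumptions made for illustration, not part of the documented API:

```javascript
q.reserve (function (err, res) {
  if (err) return console.error (err);

  // process_element is a placeholder for your own processing logic
  process_element (res.payload, function (process_err) {
    if (process_err) {
      // rollback: return the element to the queue (id field assumed)
      return q.ko (res._id, function (err) {});
    }

    // commit: remove the element for good (id field assumed)
    q.ok (res._id, function (err) {});
  });
});
```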
### Redis connections
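This section is still to be written. Judging from the `redis-conn` helper shipped in 0.0.11 (its diff appears further down this page), the redis connection data handed to the redis backends, signaller and stats seems to accept any of the following forms; host/port/db values here are purely illustrative:

```javascript
// 1) a plain object, passed as-is to ioredis' Redis constructor
var conn = {host: 'localhost', port: 6379};

// 2) ioredis options wrapped in a Redis key
var conn = {Redis: {host: 'localhost', port: 6379, db: 6}};

// 3) cluster setup, passed to new Redis.Cluster (...)
var conn = {Cluster: [{host: 'localhost', port: 6379}]};

// 4) a function returning an already-built ioredis client
var Redis = require ('ioredis');
var conn = function () {return new Redis ();};
```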
### Logging
### Working with no signallers
Even when using signallers, get operations on a queue never block or wait forever; waiting get operations rearm themselves
every 15000 milliseconds (or whatever is specified in *pollInterval*). This makes it possible to work with more than one process
without signallers, with a maximum added latency of *pollInterval* milliseconds, and it also provides a safe backup in case signalling is lost for whatever reason.
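
For example, a sketch of shortening the poll period at queue creation to reduce that latency (the value is illustrative):

```javascript
// rearm waiting gets every 5 seconds instead of the default 15000 ms
var q = factory.queue ('test_queue', {pollInterval: 5000});
```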

@@ -21,4 +21,3 @@ 'use strict';

}
signalInsertion (mature, cb) {

@@ -25,0 +24,0 @@ var emit = false;

@@ -6,12 +6,10 @@ 'use strict';

class LocalSignal extends Signal {
- constructor (master, opts) {
- super (master, opts);
- this._verbose ('created local signaller on queue [%s]', master.name());
+ constructor (queue, factory, opts) {
+ super (queue, opts);
+ this._factory = factory;
+ this._verbose ('created local signaller on queue [%s]', queue.name());
}
- type () {
- return 'signal:local';
- }
+ type () {return LocalSignalFactory.Type ()}
// to be extended:
emitInsertion (mature, cb) {

@@ -24,2 +22,14 @@ this._verbose ('calling master.emitInsertion(%d)', mature);

- module.exports = LocalSignal;
+ class LocalSignalFactory {
+ constructor (opts) {
+ }
+ static Type () {return 'signal:local'}
+ type () {return LocalSignalFactory.Type ()}
+ signal (queue, opts) {
+ return new LocalSignal (queue, this, opts);
+ }
+ }
+ module.exports = LocalSignalFactory;

@@ -6,12 +6,12 @@ 'use strict';

class RPSSignal extends Signal {
- constructor (master, opts) {
- super (master, opts);
+ constructor (queue, factory, opts) {
+ super (queue, opts);
+ this._factory = factory;
- this._channel = 'keuss:q:signal:' + master.type () + ':' + master.name ();
+ this._channel = 'keuss:q:signal:' + queue.type () + ':' + queue.name ();
this._opts = opts || {};
- this._rediscl_pub = RedisConn.conn (this._opts);
- this._rediscl_sub = RedisConn.conn (this._opts);
+ this._rediscl_pub = this._factory._rediscl_pub;
+ this._rediscl_sub = this._factory._rediscl_sub;

@@ -21,6 +21,5 @@ this._rediscl_sub.subscribe (this._channel);

var self = this;
this._rediscl_sub.on ("message", function (channel, message) {
this._rediscl_sub.on ('message', function (channel, message) {
var mature = parseInt (message);
self._verbose ('got redis pubsub event on ch [%s], mature is %d (%s)', channel, mature, message);
self._verbose ('calling master.emitInsertion(%d)', mature);
self._verbose ('got redis pubsub event on ch [%s], mature is %d (%s),calling master.emitInsertion(%d)', channel, mature, message, mature);
self._master.signalInsertion (new Date (mature));

@@ -31,7 +30,5 @@ });

}
+ type () {return RPSSignalFactory.Type ()}
- type () {
- return 'signal:redis-pubsub';
- }
emitInsertion (mature, cb) {

@@ -43,3 +40,18 @@ this._verbose ('emit redis pubsub on channel [%s] mature %d)', this._channel, mature);

+ class RPSSignalFactory {
+ constructor (opts) {
+ this._opts = opts || {};
- module.exports = RPSSignal;
+ this._rediscl_pub = RedisConn.conn (this._opts);
+ this._rediscl_sub = RedisConn.conn (this._opts);
+ }
+ static Type () {return 'signal:redis-pubsub'}
+ type () {return Type ()}
+ signal (channel, opts) {
+ return new RPSSignal (channel, this, opts);
+ }
+ }
+ module.exports = RPSSignalFactory;
'use strict';
class MemStats {
- constructor () {
+ constructor (factory) {
+ this._factory = factory;
this._s = {};
}
- static type () {return 'mem'}
- static init (opts) {}
+ type () {return this._factory.type ()}

@@ -36,3 +36,14 @@ values (cb) {

+ class MemStatsFactory {
+ constructor (opts) {
+ }
- module.exports = MemStats;
+ static Type () {return 'mem'}
+ type () {return Type ()}
+ stats () {
+ return new MemStats (this);
+ }
+ }
+ module.exports = MemStatsFactory;

@@ -12,17 +12,11 @@ 'use strict';

/*
*
* redis using HINCRBY
*
*/
class RedisStats extends WithLog {
- constructor (name, opts) {
+ constructor (name, factory, opts) {
super (opts);
this._name = 'keuss:stats:' + name;
- this._opts = opts || {};
- if (!_s_rediscl) {
- RedisStats.init (opts);
- }
- this._rediscl = _s_rediscl;
+ this._factory = factory
+ this._rediscl = factory._rediscl;
this._cache = {};

@@ -33,9 +27,4 @@

- static type () {return 'redis'}
+ type () {return this._factory.type ()}
- static init (opts) {
- _s_opts = opts || {};
- _s_rediscl = RedisConn.conn (_s_opts);
- }
values (cb) {

@@ -111,3 +100,16 @@ this._rediscl.hgetall (this._name, function (err, v) {

+ class RedisStatsFactory {
+ constructor (opts) {
+ this._opts = opts || {};
+ this._rediscl = RedisConn.conn (this._opts);
+ }
- module.exports = RedisStats;
+ static Type () {return 'redis'}
+ type () {return Type ()}
+ stats (name) {
+ return new RedisStats (name, this);
+ }
+ }
+ module.exports = RedisStatsFactory;

@@ -10,6 +10,6 @@ var should = require ('should');

function run_tests_on_class (CL) {
- describe (CL.type () + ' stats provider', function () {
+ describe (CL.Type () + ' stats provider', function () {
before (function (done) {
- var mem = new CL (name);
+ var mem = new CL ().stats (name);
mem.clear (done);

@@ -19,3 +19,3 @@ });

after (function (done) {
- var mem = new CL (name);
+ var mem = new CL ().stats (name);
mem.clear (done);

@@ -25,3 +25,3 @@ });

it ('creates ok', function (done) {
- var mem = new CL (name);
+ var mem = new CL ().stats (name);
mem.values (function (err, vals) {

@@ -34,3 +34,3 @@ vals.should.eql ({});

it ('initializes ok', function (done) {
- var mem = new CL (name);
+ var mem = new CL ().stats (name);

@@ -52,3 +52,3 @@ async.series([

it ('increments (default by 1) ok', function (done) {
- var mem = new CL (name);
+ var mem = new CL ().stats (name);

@@ -71,3 +71,3 @@ async.series([

it ('increments (explicit deltas) ok', function (done) {
- var mem = new CL (name);
+ var mem = new CL ().stats (name);

@@ -90,3 +90,3 @@ async.series([

it ('decrements (default by 1) ok', function (done) {
- var mem = new CL (name);
+ var mem = new CL ().stats (name);

@@ -109,3 +109,3 @@ async.series([

it ('decrements (explicit deltas) ok', function (done) {
- var mem = new CL (name);
+ var mem = new CL ().stats (name);

@@ -134,3 +134,3 @@ async.series([

it ('clears ok', function (done) {
- var mem = new CL (name);
+ var mem = new CL ().stats (name);

@@ -137,0 +137,0 @@ async.series([

var Redis = require ('ioredis');
+ var _ = require ('lodash');
function conn (opts) {
if (!opts) opts = {};
- if (!opts.retryStrategy) {
- opts.retryStrategy = function (times) {
- console.log ('redis-conn: redis reconnect!, retry #%d', times);
- return Math.min (times * (opts.retry_factor || 15), (opts.retry_max || 15000));
- };
+ if (_.isFunction (opts)) {
+ return opts ();
}
- if (!opts.reconnectOnError) {
- opts.reconnectOnError = function (err) {return true;};
+ if (opts.Cluster) {
+ return new Redis.Cluster (opts.Cluster);
}
- var rediscl = new Redis (opts);
+ if (opts.Redis) {
+ return new Redis (opts.Redis);
+ }
- /*
- rediscl.on ('ready', function () {console.log ('RedisConn: rediscl ready');});
- rediscl.on ('connect', function () {console.log ('RedisConn: rediscl connect');});
- rediscl.on ('reconnecting', function () {console.log ('RedisConn: rediscl reconnecting');});
- rediscl.on ('error', function (err) {console.log ('RedisConn: rediscl error: ' + err);});
- rediscl.on ('end', function () {console.log ('RedisConn: rediscl end');});
- */
- return rediscl;
+ return new Redis (opts);
}

@@ -30,0 +23,0 @@

