Socket
Socket
Sign inDemoInstall

keuss

Package Overview
Dependencies
Maintainers
1
Versions
76
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

keuss - npm Package Compare versions

Comparing version 0.0.6 to 0.0.7

79

backends/mongo.js

@@ -13,20 +13,11 @@ 'use strict';

//////////////////////////////////////////////////////////////////
// static data
var _s_mongo_conn = null;
var _s_opts = null;
class SimpleMongoQueue extends AsyncQueue {
//////////////////////////////////////////////
constructor (name, opts) {
constructor (name, factory, opts) {
//////////////////////////////////////////////
if (!_s_mongo_conn) {
throw new Error ('MongoDB not initialized, call init()');
}
super (name, opts);
this._col = _s_mongo_conn.collection (name);
this._factory = factory;
this._col = factory._mongo_conn.collection (name);
this.ensureIndexes (function (err) {});

@@ -232,27 +223,19 @@ }

}
////////////////////////////////////////////////////////////////////////////////
// statics
//////////////////////////////////////////////////////////////////
static init (opts, cb) {
//////////////////////////////////////////////////////////////////
_s_opts = opts;
if (!_s_opts) _s_opts = {};
var m_url = _s_opts.url || 'mongodb://localhost:27017/keuss';
MongoClient.connect (m_url, function (err, db) {
_s_mongo_conn = db;
cb (err);
});
};
class Factory {
constructor (opts, mongo_conn) {
this._opts = opts;
this._mongo_conn = mongo_conn;
}
//////////////////////////////////////////////////////////////////
static end (cb) {
//////////////////////////////////////////////////////////////////
if (_s_mongo_conn) {
_s_mongo_conn.close ();
_s_mongo_conn = null;
queue (name, opts) {
return new SimpleMongoQueue (name, this, opts);
}
close (cb) {
if (this._mongo_conn) {
this._mongo_conn.close ();
this._mongo_conn = null;
}

@@ -264,7 +247,6 @@

}
//////////////////////////////////////////////////////////////////
static list (cb) {
//////////////////////////////////////////////////////////////////
_s_mongo_conn.collections (function (err, collections) {
list (cb) {
this._mongo_conn.collections (function (err, collections) {
if (err) {

@@ -283,6 +265,15 @@ return cb (err);

}
};
}
function creator (opts, cb) {
var _opts = opts || {};
var m_url = _opts.url || 'mongodb://localhost:27017/keuss';
MongoClient.connect (m_url, function (err, db) {
if (err) return cb (err);
return cb (null, new Factory (_opts, db));
});
}
module.exports = SimpleMongoQueue;
module.exports = creator;

@@ -289,0 +280,0 @@

@@ -10,19 +10,11 @@ 'use strict';

//////////////////////////////////////////////////////////////////
// static data
var _s_rediscl = null;
var _s_opts = null;
class RedisListQueue extends AsyncQueue {
//////////////////////////////////////////////
constructor (name, opts) {
constructor (name, factory, opts) {
//////////////////////////////////////////////
if (!_s_rediscl) {
throw new Error ('Redis not initialized, call init()');
}
super (name, opts);
this._factory = factory;
this._rediscl = factory._rediscl;
this._redis_l_name = 'keuss:q:list:' + this._name;

@@ -56,3 +48,3 @@ }

_s_rediscl.lpush (this._redis_l_name, JSON.stringify (pl), function (err, res) {
this._rediscl.lpush (this._redis_l_name, JSON.stringify (pl), function (err, res) {
if (err) {

@@ -73,3 +65,3 @@ return callback (err);

var self = this;
_s_rediscl.rpop (this._redis_l_name, function (err, res) {
this._rediscl.rpop (this._redis_l_name, function (err, res) {
if (err) {

@@ -98,3 +90,3 @@ return callback (err);

//////////////////////////////////
_s_rediscl.llen (this._redis_l_name, callback);
this._rediscl.llen (this._redis_l_name, callback);
}

@@ -107,3 +99,3 @@

//////////////////////////////////
_s_rediscl.llen (this._redis_l_name, callback);
this._rediscl.llen (this._redis_l_name, callback);
}

@@ -126,21 +118,16 @@

}
////////////////////////////////////////////////////////////////////////////////
// statics
//////////////////////////////////////////////////////////////////
static init (opts, cb) {
//////////////////////////////////////////////////////////////////
_s_opts = opts;
if (!_s_opts) _s_opts = {};
_s_rediscl = RedisConn.conn (_s_opts);
cb ();
};
class Factory {
constructor (opts, rediscl) {
this._opts = opts || {};
this._rediscl = rediscl;
}
//////////////////////////////////////////////////////////////////
static end (cb) {
//////////////////////////////////////////////////////////////////
if (_s_rediscl) _s_rediscl.quit();
queue (name, opts) {
return new RedisListQueue (name, this, opts);
}
close (cb) {
if (this._rediscl) this._rediscl.quit();

@@ -151,9 +138,8 @@ if (cb) {

}
//////////////////////////////////////////////////////////////////
static list (cb) {
//////////////////////////////////////////////////////////////////
list (cb) {
var colls = [];
_s_rediscl.keys ('keuss:q:list:?*', function (err, collections) {
var self = this;
this._rediscl.keys ('keuss:q:list:?*', function (err, collections) {
if (err) return cb (err);

@@ -166,3 +152,3 @@

// add "keuss:stats:redis:list:*" to try to add empty queues
_s_rediscl.keys ('keuss:stats:redis:list:?*', function (err, collections) {
self._rediscl.keys ('keuss:stats:redis:list:?*', function (err, collections) {
if (err) return cb (err);

@@ -179,7 +165,13 @@

}
};
}
module.exports = RedisListQueue;
function creator (opts, cb) {
var _opts = opts || {};
var _rediscl = RedisConn.conn (_opts);
return cb (null, new Factory (_opts, _rediscl));
}
module.exports = creator;

@@ -189,1 +181,2 @@

@@ -12,20 +12,12 @@ 'use strict';

//////////////////////////////////////////////////////////////////
// static data
var _s_rediscl = null;
var _s_opts = null;
class RedisOQ extends AsyncQueue {
//////////////////////////////////////////////
constructor (name, opts) {
constructor (name, factory, opts) {
//////////////////////////////////////////////
if (!_s_rediscl) {
throw new Error ('Redis not initialized, call init()');
}
super (name, opts);
this._roq = new RedisOrderedQueue (this._name);
this._factory = factory;
this._rediscl = factory._rediscl;
this._roq = factory._roq_factory.roq (this._name);
}

@@ -147,28 +139,5 @@

////////////////////////////////////////////////////////////////////////////////
// statics
//////////////////////////////////////////////////////////////////
static init (opts, cb) {
//////////////////////////////////////////////////////////////////
_s_opts = opts;
if (!_s_opts) _s_opts = {};
_s_rediscl = RedisConn.conn (_s_opts);
RedisOrderedQueue.init (_s_rediscl, cb);
}
//////////////////////////////////////////////////////////////////
static end (cb) {
//////////////////////////////////////////////////////////////////
if (_s_rediscl) _s_rediscl.quit();
if (cb) {
return cb ();
}
}
//////////////////////////////////////////////////////////////////
static list (cb) {

@@ -201,7 +170,58 @@ //////////////////////////////////////////////////////////////////

module.exports = RedisOQ;
class Factory {
constructor (opts, rediscl, roq_factory) {
this._opts = opts || {};
this._rediscl = rediscl;
this._roq_factory = roq_factory;
}
queue (name, opts) {
return new RedisOQ (name, this, opts);
}
close (cb) {
if (this._rediscl) this._rediscl.quit();
if (cb) {
return cb ();
}
}
list (cb) {
var colls = [];
var self = this;
this._rediscl.keys ('keuss:q:ordered_queue:index:?*', function (err, collections) {
if (err) return cb (err);
collections.forEach (function (coll) {
colls.push (coll.substring (28))
});
// add "keuss:stats:redis:list:*" to try to add empty queues
this._rediscl.keys ('keuss:stats:redis:oq:?*', function (err, collections) {
if (err) return cb (err);
collections.forEach (function (coll) {
var qname = coll.substring (21);
if (_.indexOf (colls, qname) == -1) colls.push (qname);
});
cb (null, colls);
});
});
}
}
function creator (opts, cb) {
var _opts = opts || {};
var _rediscl = RedisConn.conn (_opts);
var _roq_factory = new RedisOrderedQueue (_rediscl);
return cb (null, new Factory (_opts, _rediscl, _roq_factory));
}
module.exports = creator;

@@ -51,3 +51,3 @@ var async = require ('async');

MQ.init (opts, function (err) {
MQ (opts, function (err, factory) {
if (err) {

@@ -57,3 +57,3 @@ return logger.error (err);

var q = new MQ('test_queue', opts);
var q = factory.queue('test_queue', opts);

@@ -60,0 +60,0 @@ run_consumer (q);

@@ -40,3 +40,3 @@ var async = require ('async');

MQ.init (opts, function (err) {
MQ (opts, function (err, factory) {
if (err) {

@@ -56,5 +56,5 @@ return logger.error (err);

var q = new MQ ('test_queue', q_opts);
var q = factory.queue ('test_queue', q_opts);
run_consumer (q);
});

@@ -32,3 +32,3 @@ var async = require ('async');

run_producer (q);
}, 10);
}, 333);
});

@@ -44,3 +44,3 @@ }

MQ.init (opts, function (err) {
MQ (opts, function (err, factory) {
if (err) {

@@ -60,5 +60,5 @@ return logger.error (err);

var q = new MQ('test_queue', q_opts);
var q = factory.queue ('test_queue', q_opts);
run_producer (q);
});

@@ -113,3 +113,3 @@ var winston = require ('winston');

MQ.init ({logger: logger}, function (err) {
MQ ({logger: logger}, function (err, factory) {
if (err) {

@@ -143,3 +143,3 @@ return logger.error ('MQ.init: %s', err, {});

var q = new MQ ('test', q_opts);
var q = factory.queue ('test', q_opts);

@@ -146,0 +146,0 @@ if (program.consumer) {

{
"name": "keuss",
"version": "0.0.6",
"version": "0.0.7",
"keywords": ["queue", "job"],

@@ -5,0 +5,0 @@ "homepage":"https://github.com/pepmartinez/keuss",

# keuss
Job Queues on selectable backends (mongodb, redis)
An attempt to provide a serverless, persistent and high-available queue middleware supporting delays,
using mongodb and redis to provide most of the backend needs
Still very pre-alpha, basic structure in constant flux

@@ -8,2 +8,3 @@

var factory = null;

@@ -29,3 +30,7 @@ describe ('MongoDB queue backend', function () {

MQ.init (opts, done);
MQ (opts, function (err, fct) {
if (err) return done (err);
factory = fct;
done();
});
});

@@ -35,7 +40,7 @@

after (function (done) {
MQ.end (done);
factory.close (done);
});
it ('queue is created empty and ok', function (done){
var q = new MQ('test_queue');
var q = factory.queue ('test_queue');
should.equal (q.nextMatureDate (), null);

@@ -56,3 +61,3 @@ q.name ().should.equal ('test_queue');

it ('sequential push & pops with no delay, go as expected', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');

@@ -103,3 +108,3 @@ async.series([

it ('sequential push & pops with delays, go as expected', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');

@@ -160,3 +165,3 @@ async.series([

it ('timed-out pops work as expected', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');

@@ -221,3 +226,3 @@ async.series([

it ('pop cancellation works as expected', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');

@@ -262,3 +267,3 @@ async.series([

it ('simultaneous timed out pops on delayed items go in the expected order', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');

@@ -318,3 +323,3 @@ var hrTime = process.hrtime()

it ('should do raw reserve & commit as expected', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');
var id = null;

@@ -372,3 +377,3 @@

it ('should do raw reserve & rollback as expected', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');
var id = null;

@@ -461,3 +466,3 @@

it ('should do get.reserve & ok as expected', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');
var id = null;

@@ -464,0 +469,0 @@

@@ -8,2 +8,3 @@

var factory = null;

@@ -28,4 +29,8 @@ describe ('redis-list queue backend', function () {

});
MQ.init (opts, done);
MQ (opts, function (err, fct) {
if (err) return done (err);
factory = fct;
done();
});
});

@@ -35,7 +40,7 @@

after (function (done) {
MQ.end (done);
factory.close (done);
});
it ('queue is created empty and ok', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');
should.equal (q.nextMatureDate (), null);

@@ -56,3 +61,3 @@ q.name ().should.equal ('test_queue');

it ('sequential push & pops with no delay, go as expected', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');

@@ -99,3 +104,3 @@ async.series([

it ('sequential push & pops with delays, go as expected (delays are ignored)', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');

@@ -144,3 +149,3 @@ async.series([

it ('pop cancellation works as expected', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');

@@ -147,0 +152,0 @@ async.series([

@@ -8,2 +8,3 @@

var factory = null;

@@ -28,4 +29,8 @@ describe ('Redis OrderedQueue backend', function () {

});
MQ.init (opts, done);
MQ (opts, function (err, fct) {
if (err) return done (err);
factory = fct;
done();
});
});

@@ -35,7 +40,7 @@

after (function (done) {
MQ.end (done);
factory.close (done);
});
it ('queue is created empty and ok', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');
should.equal (q.nextMatureDate (), null);

@@ -56,3 +61,3 @@ q.name ().should.equal ('test_queue');

it ('sequential push & pops with no delay, go as expected', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');

@@ -103,3 +108,3 @@ async.series([

it ('sequential push & pops with delays, go as expected', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');

@@ -160,3 +165,3 @@ async.series([

it ('timed-out pops work as expected', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');

@@ -221,3 +226,3 @@ async.series([

it ('pop cancellation works as expected', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');

@@ -262,3 +267,3 @@ async.series([

it ('simultaneous timed out pops on delayed items go in the expected order', function (done){
var q = new MQ('test_queue');
var q = factory.queue('test_queue');

@@ -265,0 +270,0 @@ var hrTime = process.hrtime()

'use strict';
var async = require ('async');
const _s_lua_code_push = `

@@ -43,83 +40,24 @@ -- qname in KEYS[1]

var _s_sha_push = undefined;
var _s_sha_pop = undefined;
var _s_rediscl = undefined;
function _s_load (lua_code, done) {
_s_rediscl.script ('load', lua_code, done);
}
class RedisOrderedQueue {
constructor (name) {
this._rediscl = _s_rediscl;
constructor (name, factory) {
this._factory = factory;
this._rediscl = factory._rediscl;
this._name = name;
}
static init (rediscl, done) {
_s_rediscl = rediscl;
var self = this;
async.series ([
function (cb) {_s_load (_s_lua_code_push, cb)},
function (cb) {_s_load (_s_lua_code_pop, cb)}
], function (err, res) {
if (err) {
return done (err);
}
push (id, mature, obj, done) {
this._rediscl.roq_push (this._name, id, mature, obj, function (err, res) {
if (err) return done (err);
_s_sha_push = res [0];
_s_sha_pop = res [1];
console.log ('RedisOQ funcs loaded');
done ();
// res is 1
done (null, res);
});
}
push (id, mature, obj, done) {
var self = this;
this._rediscl.evalsha (_s_sha_push, 1, this._name, id, mature, obj, function (err, res) {
if (err) {
if (err.message.split(' ')[0] == 'NOSCRIPT') {
RedisOrderedQueue.init (_s_rediscl, function (err) {
if (err) {
return done (err);
}
self.push (id, mature, obj, done);
});
}
else {
done (err);
}
}
else {
// res is '1'
done (null, res);
}
});
}
pop (done) {
var self = this;
this._rediscl.evalsha (_s_sha_pop, 1, this._name, function (err, res) {
if (err) {
if (err.message.split(' ')[0] == 'NOSCRIPT') {
RedisOrderedQueue.init (_s_rediscl, function (err) {
if (err) {
return done (err);
}
self.pop (done);
});
}
else {
done (err);
}
}
else {
// res is [id, mature, text]
done (null, res);
}
this._rediscl.roq_pop (this._name, function (err, res) {
if (err) return done (err);
// res is [id, mature, text]
done (null, res);
});

@@ -163,2 +101,22 @@ }

module.exports = RedisOrderedQueue;
class Factory {
constructor (rediscl) {
this._rediscl = rediscl;
this._rediscl.defineCommand('roq_push', {
numberOfKeys: 1,
lua: _s_lua_code_push
});
this._rediscl.defineCommand('roq_pop', {
numberOfKeys: 1,
lua: _s_lua_code_pop
});
}
roq (name) {
return new RedisOrderedQueue (name, this);
}
}
module.exports = Factory;
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc