Comparing version 0.0.6 to 0.0.7
@@ -13,20 +13,11 @@ 'use strict'; | ||
////////////////////////////////////////////////////////////////// | ||
// static data | ||
var _s_mongo_conn = null; | ||
var _s_opts = null; | ||
class SimpleMongoQueue extends AsyncQueue { | ||
////////////////////////////////////////////// | ||
constructor (name, opts) { | ||
constructor (name, factory, opts) { | ||
////////////////////////////////////////////// | ||
if (!_s_mongo_conn) { | ||
throw new Error ('MongoDB not initialized, call init()'); | ||
} | ||
super (name, opts); | ||
this._col = _s_mongo_conn.collection (name); | ||
this._factory = factory; | ||
this._col = factory._mongo_conn.collection (name); | ||
this.ensureIndexes (function (err) {}); | ||
@@ -232,27 +223,19 @@ } | ||
} | ||
//////////////////////////////////////////////////////////////////////////////// | ||
// statics | ||
////////////////////////////////////////////////////////////////// | ||
static init (opts, cb) { | ||
////////////////////////////////////////////////////////////////// | ||
_s_opts = opts; | ||
if (!_s_opts) _s_opts = {}; | ||
var m_url = _s_opts.url || 'mongodb://localhost:27017/keuss'; | ||
MongoClient.connect (m_url, function (err, db) { | ||
_s_mongo_conn = db; | ||
cb (err); | ||
}); | ||
}; | ||
class Factory { | ||
constructor (opts, mongo_conn) { | ||
this._opts = opts; | ||
this._mongo_conn = mongo_conn; | ||
} | ||
////////////////////////////////////////////////////////////////// | ||
static end (cb) { | ||
////////////////////////////////////////////////////////////////// | ||
if (_s_mongo_conn) { | ||
_s_mongo_conn.close (); | ||
_s_mongo_conn = null; | ||
queue (name, opts) { | ||
return new SimpleMongoQueue (name, this, opts); | ||
} | ||
close (cb) { | ||
if (this._mongo_conn) { | ||
this._mongo_conn.close (); | ||
this._mongo_conn = null; | ||
} | ||
@@ -264,7 +247,6 @@ | ||
} | ||
////////////////////////////////////////////////////////////////// | ||
static list (cb) { | ||
////////////////////////////////////////////////////////////////// | ||
_s_mongo_conn.collections (function (err, collections) { | ||
list (cb) { | ||
this._mongo_conn.collections (function (err, collections) { | ||
if (err) { | ||
@@ -283,6 +265,15 @@ return cb (err); | ||
} | ||
}; | ||
} | ||
function creator (opts, cb) { | ||
var _opts = opts || {}; | ||
var m_url = _opts.url || 'mongodb://localhost:27017/keuss'; | ||
MongoClient.connect (m_url, function (err, db) { | ||
if (err) return cb (err); | ||
return cb (null, new Factory (_opts, db)); | ||
}); | ||
} | ||
module.exports = SimpleMongoQueue; | ||
module.exports = creator; | ||
@@ -289,0 +280,0 @@ |
@@ -10,19 +10,11 @@ 'use strict'; | ||
////////////////////////////////////////////////////////////////// | ||
// static data | ||
var _s_rediscl = null; | ||
var _s_opts = null; | ||
class RedisListQueue extends AsyncQueue { | ||
////////////////////////////////////////////// | ||
constructor (name, opts) { | ||
constructor (name, factory, opts) { | ||
////////////////////////////////////////////// | ||
if (!_s_rediscl) { | ||
throw new Error ('Redis not initialized, call init()'); | ||
} | ||
super (name, opts); | ||
this._factory = factory; | ||
this._rediscl = factory._rediscl; | ||
this._redis_l_name = 'keuss:q:list:' + this._name; | ||
@@ -56,3 +48,3 @@ } | ||
_s_rediscl.lpush (this._redis_l_name, JSON.stringify (pl), function (err, res) { | ||
this._rediscl.lpush (this._redis_l_name, JSON.stringify (pl), function (err, res) { | ||
if (err) { | ||
@@ -73,3 +65,3 @@ return callback (err); | ||
var self = this; | ||
_s_rediscl.rpop (this._redis_l_name, function (err, res) { | ||
this._rediscl.rpop (this._redis_l_name, function (err, res) { | ||
if (err) { | ||
@@ -98,3 +90,3 @@ return callback (err); | ||
////////////////////////////////// | ||
_s_rediscl.llen (this._redis_l_name, callback); | ||
this._rediscl.llen (this._redis_l_name, callback); | ||
} | ||
@@ -107,3 +99,3 @@ | ||
////////////////////////////////// | ||
_s_rediscl.llen (this._redis_l_name, callback); | ||
this._rediscl.llen (this._redis_l_name, callback); | ||
} | ||
@@ -126,21 +118,16 @@ | ||
} | ||
//////////////////////////////////////////////////////////////////////////////// | ||
// statics | ||
////////////////////////////////////////////////////////////////// | ||
static init (opts, cb) { | ||
////////////////////////////////////////////////////////////////// | ||
_s_opts = opts; | ||
if (!_s_opts) _s_opts = {}; | ||
_s_rediscl = RedisConn.conn (_s_opts); | ||
cb (); | ||
}; | ||
class Factory { | ||
constructor (opts, rediscl) { | ||
this._opts = opts || {}; | ||
this._rediscl = rediscl; | ||
} | ||
////////////////////////////////////////////////////////////////// | ||
static end (cb) { | ||
////////////////////////////////////////////////////////////////// | ||
if (_s_rediscl) _s_rediscl.quit(); | ||
queue (name, opts) { | ||
return new RedisListQueue (name, this, opts); | ||
} | ||
close (cb) { | ||
if (this._rediscl) this._rediscl.quit(); | ||
@@ -151,9 +138,8 @@ if (cb) { | ||
} | ||
////////////////////////////////////////////////////////////////// | ||
static list (cb) { | ||
////////////////////////////////////////////////////////////////// | ||
list (cb) { | ||
var colls = []; | ||
_s_rediscl.keys ('keuss:q:list:?*', function (err, collections) { | ||
var self = this; | ||
this._rediscl.keys ('keuss:q:list:?*', function (err, collections) { | ||
if (err) return cb (err); | ||
@@ -166,3 +152,3 @@ | ||
// add "keuss:stats:redis:list:*" to try to add empty queues | ||
_s_rediscl.keys ('keuss:stats:redis:list:?*', function (err, collections) { | ||
self._rediscl.keys ('keuss:stats:redis:list:?*', function (err, collections) { | ||
if (err) return cb (err); | ||
@@ -179,7 +165,13 @@ | ||
} | ||
}; | ||
} | ||
module.exports = RedisListQueue; | ||
function creator (opts, cb) { | ||
var _opts = opts || {}; | ||
var _rediscl = RedisConn.conn (_opts); | ||
return cb (null, new Factory (_opts, _rediscl)); | ||
} | ||
module.exports = creator; | ||
@@ -189,1 +181,2 @@ | ||
@@ -12,20 +12,12 @@ 'use strict'; | ||
////////////////////////////////////////////////////////////////// | ||
// static data | ||
var _s_rediscl = null; | ||
var _s_opts = null; | ||
class RedisOQ extends AsyncQueue { | ||
////////////////////////////////////////////// | ||
constructor (name, opts) { | ||
constructor (name, factory, opts) { | ||
////////////////////////////////////////////// | ||
if (!_s_rediscl) { | ||
throw new Error ('Redis not initialized, call init()'); | ||
} | ||
super (name, opts); | ||
this._roq = new RedisOrderedQueue (this._name); | ||
this._factory = factory; | ||
this._rediscl = factory._rediscl; | ||
this._roq = factory._roq_factory.roq (this._name); | ||
} | ||
@@ -147,28 +139,5 @@ | ||
//////////////////////////////////////////////////////////////////////////////// | ||
// statics | ||
////////////////////////////////////////////////////////////////// | ||
static init (opts, cb) { | ||
////////////////////////////////////////////////////////////////// | ||
_s_opts = opts; | ||
if (!_s_opts) _s_opts = {}; | ||
_s_rediscl = RedisConn.conn (_s_opts); | ||
RedisOrderedQueue.init (_s_rediscl, cb); | ||
} | ||
////////////////////////////////////////////////////////////////// | ||
static end (cb) { | ||
////////////////////////////////////////////////////////////////// | ||
if (_s_rediscl) _s_rediscl.quit(); | ||
if (cb) { | ||
return cb (); | ||
} | ||
} | ||
////////////////////////////////////////////////////////////////// | ||
static list (cb) { | ||
@@ -201,7 +170,58 @@ ////////////////////////////////////////////////////////////////// | ||
module.exports = RedisOQ; | ||
class Factory { | ||
constructor (opts, rediscl, roq_factory) { | ||
this._opts = opts || {}; | ||
this._rediscl = rediscl; | ||
this._roq_factory = roq_factory; | ||
} | ||
queue (name, opts) { | ||
return new RedisOQ (name, this, opts); | ||
} | ||
close (cb) { | ||
if (this._rediscl) this._rediscl.quit(); | ||
if (cb) { | ||
return cb (); | ||
} | ||
} | ||
list (cb) { | ||
var colls = []; | ||
var self = this; | ||
this._rediscl.keys ('keuss:q:ordered_queue:index:?*', function (err, collections) { | ||
if (err) return cb (err); | ||
collections.forEach (function (coll) { | ||
colls.push (coll.substring (28)) | ||
}); | ||
// add "keuss:stats:redis:list:*" to try to add empty queues | ||
this._rediscl.keys ('keuss:stats:redis:oq:?*', function (err, collections) { | ||
if (err) return cb (err); | ||
collections.forEach (function (coll) { | ||
var qname = coll.substring (21); | ||
if (_.indexOf (colls, qname) == -1) colls.push (qname); | ||
}); | ||
cb (null, colls); | ||
}); | ||
}); | ||
} | ||
} | ||
function creator (opts, cb) { | ||
var _opts = opts || {}; | ||
var _rediscl = RedisConn.conn (_opts); | ||
var _roq_factory = new RedisOrderedQueue (_rediscl); | ||
return cb (null, new Factory (_opts, _rediscl, _roq_factory)); | ||
} | ||
module.exports = creator; | ||
@@ -51,3 +51,3 @@ var async = require ('async'); | ||
MQ.init (opts, function (err) { | ||
MQ (opts, function (err, factory) { | ||
if (err) { | ||
@@ -57,3 +57,3 @@ return logger.error (err); | ||
var q = new MQ('test_queue', opts); | ||
var q = factory.queue('test_queue', opts); | ||
@@ -60,0 +60,0 @@ run_consumer (q); |
@@ -40,3 +40,3 @@ var async = require ('async'); | ||
MQ.init (opts, function (err) { | ||
MQ (opts, function (err, factory) { | ||
if (err) { | ||
@@ -56,5 +56,5 @@ return logger.error (err); | ||
var q = new MQ ('test_queue', q_opts); | ||
var q = factory.queue ('test_queue', q_opts); | ||
run_consumer (q); | ||
}); |
@@ -32,3 +32,3 @@ var async = require ('async'); | ||
run_producer (q); | ||
}, 10); | ||
}, 333); | ||
}); | ||
@@ -44,3 +44,3 @@ } | ||
MQ.init (opts, function (err) { | ||
MQ (opts, function (err, factory) { | ||
if (err) { | ||
@@ -60,5 +60,5 @@ return logger.error (err); | ||
var q = new MQ('test_queue', q_opts); | ||
var q = factory.queue ('test_queue', q_opts); | ||
run_producer (q); | ||
}); |
@@ -113,3 +113,3 @@ var winston = require ('winston'); | ||
MQ.init ({logger: logger}, function (err) { | ||
MQ ({logger: logger}, function (err, factory) { | ||
if (err) { | ||
@@ -143,3 +143,3 @@ return logger.error ('MQ.init: %s', err, {}); | ||
var q = new MQ ('test', q_opts); | ||
var q = factory.queue ('test', q_opts); | ||
@@ -146,0 +146,0 @@ if (program.consumer) { |
{ | ||
"name": "keuss", | ||
"version": "0.0.6", | ||
"version": "0.0.7", | ||
"keywords": ["queue", "job"], | ||
@@ -5,0 +5,0 @@ "homepage":"https://github.com/pepmartinez/keuss", |
# keuss | ||
Job Queues on selectable backends (mongodb, redis) | ||
An attempt to provide a serverless, persistent and high-available queue middleware supporting delays, | ||
using mongodb and redis to provide most of the backend needs | ||
Still very pre-alpha, basic structure in constant flux |
@@ -8,2 +8,3 @@ | ||
var factory = null; | ||
@@ -29,3 +30,7 @@ describe ('MongoDB queue backend', function () { | ||
MQ.init (opts, done); | ||
MQ (opts, function (err, fct) { | ||
if (err) return done (err); | ||
factory = fct; | ||
done(); | ||
}); | ||
}); | ||
@@ -35,7 +40,7 @@ | ||
after (function (done) { | ||
MQ.end (done); | ||
factory.close (done); | ||
}); | ||
it ('queue is created empty and ok', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue ('test_queue'); | ||
should.equal (q.nextMatureDate (), null); | ||
@@ -56,3 +61,3 @@ q.name ().should.equal ('test_queue'); | ||
it ('sequential push & pops with no delay, go as expected', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
@@ -103,3 +108,3 @@ async.series([ | ||
it ('sequential push & pops with delays, go as expected', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
@@ -160,3 +165,3 @@ async.series([ | ||
it ('timed-out pops work as expected', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
@@ -221,3 +226,3 @@ async.series([ | ||
it ('pop cancellation works as expected', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
@@ -262,3 +267,3 @@ async.series([ | ||
it ('simultaneous timed out pops on delayed items go in the expected order', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
@@ -318,3 +323,3 @@ var hrTime = process.hrtime() | ||
it ('should do raw reserve & commit as expected', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
var id = null; | ||
@@ -372,3 +377,3 @@ | ||
it ('should do raw reserve & rollback as expected', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
var id = null; | ||
@@ -461,3 +466,3 @@ | ||
it ('should do get.reserve & ok as expected', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
var id = null; | ||
@@ -464,0 +469,0 @@ |
@@ -8,2 +8,3 @@ | ||
var factory = null; | ||
@@ -28,4 +29,8 @@ describe ('redis-list queue backend', function () { | ||
}); | ||
MQ.init (opts, done); | ||
MQ (opts, function (err, fct) { | ||
if (err) return done (err); | ||
factory = fct; | ||
done(); | ||
}); | ||
}); | ||
@@ -35,7 +40,7 @@ | ||
after (function (done) { | ||
MQ.end (done); | ||
factory.close (done); | ||
}); | ||
it ('queue is created empty and ok', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
should.equal (q.nextMatureDate (), null); | ||
@@ -56,3 +61,3 @@ q.name ().should.equal ('test_queue'); | ||
it ('sequential push & pops with no delay, go as expected', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
@@ -99,3 +104,3 @@ async.series([ | ||
it ('sequential push & pops with delays, go as expected (delays are ignored)', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
@@ -144,3 +149,3 @@ async.series([ | ||
it ('pop cancellation works as expected', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
@@ -147,0 +152,0 @@ async.series([ |
@@ -8,2 +8,3 @@ | ||
var factory = null; | ||
@@ -28,4 +29,8 @@ describe ('Redis OrderedQueue backend', function () { | ||
}); | ||
MQ.init (opts, done); | ||
MQ (opts, function (err, fct) { | ||
if (err) return done (err); | ||
factory = fct; | ||
done(); | ||
}); | ||
}); | ||
@@ -35,7 +40,7 @@ | ||
after (function (done) { | ||
MQ.end (done); | ||
factory.close (done); | ||
}); | ||
it ('queue is created empty and ok', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
should.equal (q.nextMatureDate (), null); | ||
@@ -56,3 +61,3 @@ q.name ().should.equal ('test_queue'); | ||
it ('sequential push & pops with no delay, go as expected', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
@@ -103,3 +108,3 @@ async.series([ | ||
it ('sequential push & pops with delays, go as expected', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
@@ -160,3 +165,3 @@ async.series([ | ||
it ('timed-out pops work as expected', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
@@ -221,3 +226,3 @@ async.series([ | ||
it ('pop cancellation works as expected', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
@@ -262,3 +267,3 @@ async.series([ | ||
it ('simultaneous timed out pops on delayed items go in the expected order', function (done){ | ||
var q = new MQ('test_queue'); | ||
var q = factory.queue('test_queue'); | ||
@@ -265,0 +270,0 @@ var hrTime = process.hrtime() |
'use strict'; | ||
var async = require ('async'); | ||
const _s_lua_code_push = ` | ||
@@ -43,83 +40,24 @@ -- qname in KEYS[1] | ||
var _s_sha_push = undefined; | ||
var _s_sha_pop = undefined; | ||
var _s_rediscl = undefined; | ||
function _s_load (lua_code, done) { | ||
_s_rediscl.script ('load', lua_code, done); | ||
} | ||
class RedisOrderedQueue { | ||
constructor (name) { | ||
this._rediscl = _s_rediscl; | ||
constructor (name, factory) { | ||
this._factory = factory; | ||
this._rediscl = factory._rediscl; | ||
this._name = name; | ||
} | ||
static init (rediscl, done) { | ||
_s_rediscl = rediscl; | ||
var self = this; | ||
async.series ([ | ||
function (cb) {_s_load (_s_lua_code_push, cb)}, | ||
function (cb) {_s_load (_s_lua_code_pop, cb)} | ||
], function (err, res) { | ||
if (err) { | ||
return done (err); | ||
} | ||
push (id, mature, obj, done) { | ||
this._rediscl.roq_push (this._name, id, mature, obj, function (err, res) { | ||
if (err) return done (err); | ||
_s_sha_push = res [0]; | ||
_s_sha_pop = res [1]; | ||
console.log ('RedisOQ funcs loaded'); | ||
done (); | ||
// res is 1 | ||
done (null, res); | ||
}); | ||
} | ||
push (id, mature, obj, done) { | ||
var self = this; | ||
this._rediscl.evalsha (_s_sha_push, 1, this._name, id, mature, obj, function (err, res) { | ||
if (err) { | ||
if (err.message.split(' ')[0] == 'NOSCRIPT') { | ||
RedisOrderedQueue.init (_s_rediscl, function (err) { | ||
if (err) { | ||
return done (err); | ||
} | ||
self.push (id, mature, obj, done); | ||
}); | ||
} | ||
else { | ||
done (err); | ||
} | ||
} | ||
else { | ||
// res is '1' | ||
done (null, res); | ||
} | ||
}); | ||
} | ||
pop (done) { | ||
var self = this; | ||
this._rediscl.evalsha (_s_sha_pop, 1, this._name, function (err, res) { | ||
if (err) { | ||
if (err.message.split(' ')[0] == 'NOSCRIPT') { | ||
RedisOrderedQueue.init (_s_rediscl, function (err) { | ||
if (err) { | ||
return done (err); | ||
} | ||
self.pop (done); | ||
}); | ||
} | ||
else { | ||
done (err); | ||
} | ||
} | ||
else { | ||
// res is [id, mature, text] | ||
done (null, res); | ||
} | ||
this._rediscl.roq_pop (this._name, function (err, res) { | ||
if (err) return done (err); | ||
// res is [id, mature, text] | ||
done (null, res); | ||
}); | ||
@@ -163,2 +101,22 @@ } | ||
module.exports = RedisOrderedQueue; | ||
class Factory { | ||
constructor (rediscl) { | ||
this._rediscl = rediscl; | ||
this._rediscl.defineCommand('roq_push', { | ||
numberOfKeys: 1, | ||
lua: _s_lua_code_push | ||
}); | ||
this._rediscl.defineCommand('roq_pop', { | ||
numberOfKeys: 1, | ||
lua: _s_lua_code_pop | ||
}); | ||
} | ||
roq (name) { | ||
return new RedisOrderedQueue (name, this); | ||
} | ||
} | ||
module.exports = Factory; |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
8
127209
2747