Comparing version 1.3.8 to 1.4.0
{ | ||
"name": "keuss", | ||
"version": "1.3.8", | ||
"version": "1.4.0", | ||
"keywords": [ | ||
@@ -5,0 +5,0 @@ "queue", |
@@ -12,2 +12,4 @@ 'use strict'; | ||
this._opts = opts || {}; | ||
this._name = opts.name || 'N'; | ||
@@ -40,2 +42,6 @@ if (!this._opts.stats) this._opts.stats = {}; | ||
name () { | ||
return this._name; | ||
} | ||
type () { | ||
@@ -51,3 +57,3 @@ return 'none'; | ||
// use stats factory | ||
this._stats_factory.queues (this.type (), opts, cb); | ||
this._stats_factory.queues (this.name (), opts, cb); | ||
} | ||
@@ -54,0 +60,0 @@ |
@@ -36,3 +36,3 @@ 'use strict'; | ||
this._signaller = factory._signaller_factory.signal (this, this._opts.signaller.opts); | ||
this._stats = factory._stats_factory.stats (this.type (), this.name (), this._opts.stats.opts); | ||
this._stats = factory._stats_factory.stats (factory.name(), this.name (), this._opts.stats.opts); | ||
@@ -89,2 +89,3 @@ // save opts minus stats & signaller | ||
name () {return this._name;} | ||
ns () {return this._factory.name();} | ||
type () {return 'queue:base';} | ||
@@ -91,0 +92,0 @@ |
@@ -8,3 +8,3 @@ 'use strict'; | ||
this._master = master; | ||
this._name = 'signal:' + master._name; | ||
this._name = 'signal:' + master.ns() + ':' + master.name(); | ||
@@ -11,0 +11,0 @@ this._bufferTime = this._opts.bufferTime || 50; //msec |
@@ -11,3 +11,3 @@ 'use strict'; | ||
this._factory = factory; | ||
this._channel = 'keuss:q:signal:' + queue.type () + ':' + queue.name (); | ||
this._channel = 'keuss:signal:' + queue.ns () + ':' + queue.name (); | ||
@@ -14,0 +14,0 @@ var self = this; |
@@ -13,3 +13,3 @@ 'use strict'; | ||
this._topic_name = 'keuss:q:signal:' + queue.type () + ':' + queue.name (); | ||
this._topic_name = 'keuss:signal:' + queue.ns () + ':' + queue.name (); | ||
this._opts = opts || {}; | ||
@@ -16,0 +16,0 @@ var self = this; |
@@ -13,3 +13,3 @@ 'use strict'; | ||
this._channel = 'keuss:q:signal:' + queue.type () + ':' + queue.name (); | ||
this._channel = 'keuss:signal:' + queue.ns () + ':' + queue.name (); | ||
this._opts = opts || {}; | ||
@@ -16,0 +16,0 @@ var self = this; |
@@ -6,5 +6,7 @@ 'use strict'; | ||
class MemStats { | ||
constructor (factory) { | ||
constructor (ns, name, factory) { | ||
this._factory = factory; | ||
this._s = { | ||
ns: ns, | ||
name: name, | ||
counters: {}, | ||
@@ -19,3 +21,11 @@ opts: {}, | ||
} | ||
ns () { | ||
return this._s.ns; | ||
} | ||
name () { | ||
return this._s.name; | ||
} | ||
values (cb) { | ||
@@ -67,8 +77,6 @@ cb (null, this._s.counters); | ||
clear (cb) { | ||
this._s = { | ||
counters: {}, | ||
opts: {}, | ||
topology: {} | ||
}; | ||
this._s.counters = {} | ||
this._s.opts = {}; | ||
this._s.topology = {}; | ||
// TODO remove from factory | ||
@@ -82,3 +90,3 @@ | ||
constructor (opts) { | ||
// map of created queues' stats: root -> qclass -> queues | ||
// map of created queues' stats: root -> ns -> queues | ||
this._queues = {}; | ||
@@ -95,3 +103,3 @@ } | ||
queues (qclass, opts, cb) { | ||
queues (ns, opts, cb) { | ||
if (!cb) { | ||
@@ -102,3 +110,3 @@ cb = opts; | ||
var cls = this._queues[qclass]; | ||
var cls = this._queues[ns]; | ||
@@ -129,6 +137,6 @@ if (opts.full) { | ||
stats (qclass, name, opts) { | ||
if (!this._queues[qclass]) this._queues[qclass] = {}; | ||
var st = new MemStats (this); | ||
this._queues[qclass][name] = st; | ||
stats (ns, name, opts) { | ||
if (!this._queues[ns]) this._queues[ns] = {}; | ||
var st = new MemStats (ns, name, this); | ||
this._queues[ns][name] = st; | ||
return st; | ||
@@ -135,0 +143,0 @@ } |
@@ -15,7 +15,22 @@ 'use strict'; | ||
class MongoStats { | ||
constructor(name, factory, opts) { | ||
this._name = 'keuss:stats:' + name; | ||
constructor(ns, name, factory, opts) { | ||
this._ns = ns; | ||
this._name = name; | ||
this._id = 'keuss:stats:' + ns + ':' + name; | ||
this._opts = opts || {}; | ||
this._factory = factory; | ||
this._cache = {}; | ||
this._ensure_conn (err => { | ||
if (err) return; | ||
var upd = { | ||
$set: { | ||
ns: this._ns, | ||
name: this._name, | ||
} | ||
}; | ||
this._coll().updateOne ({_id: this._id}, upd, {upsert: true}, (err, ret) => {}); | ||
}); | ||
} | ||
@@ -28,3 +43,11 @@ | ||
ns () { | ||
return this._ns; | ||
} | ||
name () { | ||
return this._name; | ||
} | ||
values(cb) { | ||
@@ -35,3 +58,3 @@ var self = this; | ||
self._coll().findOne ({_id: self._name}, {fields: {counters: 1}}, function (err, res) { | ||
self._coll().findOne ({_id: self._id}, {fields: {counters: 1}}, function (err, res) { | ||
if (err) return cb (err); | ||
@@ -65,3 +88,3 @@ // ('mongo stats: get %s -> %j', self._name, res); | ||
self._coll().updateOne ({_id: self._name}, upd, {upsert: true}, function (err) { | ||
self._coll().updateOne ({_id: self._id}, upd, {upsert: true}, function (err) { | ||
// ('mongo stats: updated %s -> %j', self._name, upd); | ||
@@ -122,3 +145,3 @@ }); | ||
self._coll().findOne ({_id: self._name}, {fields: {opts: 1}}, function (err, res) { | ||
self._coll().findOne ({_id: self._id}, {fields: {opts: 1}}, function (err, res) { | ||
if (err) return cb (err); | ||
@@ -136,3 +159,3 @@ // ('mongo stats - opts: get %s -> %j', self._name, res); | ||
var upd = {$set: {opts : opts}}; | ||
self._coll().updateOne ({_id: self._name}, upd, {upsert: true}, function (err) { | ||
self._coll().updateOne ({_id: self._id}, upd, {upsert: true}, function (err) { | ||
// ('mongo stats: updated %s -> %j', self._name, upd); | ||
@@ -154,3 +177,3 @@ cb (err); | ||
self._coll().findOne ({_id: self._name}, {fields: {topology: 1}}, function (err, res) { | ||
self._coll().findOne ({_id: self._id}, {fields: {topology: 1}}, function (err, res) { | ||
if (err) return cb (err); | ||
@@ -168,3 +191,4 @@ // ('mongo stats - topology: get %s -> %j', self._name, res); | ||
var upd = {$set: {topology : tplg}}; | ||
self._coll().updateOne ({_id: self._name}, upd, {upsert: true}, function (err) { | ||
self._coll().updateOne ({_id: self._id}, upd, {upsert: true}, function (err) { | ||
// ('mongo stats: updated %s -> %j', self._name, upd); | ||
@@ -185,3 +209,11 @@ cb (err); | ||
self._coll().deleteOne ({_id: self._name}, function (err) { | ||
var upd = { | ||
$unset: { | ||
counters: 1, | ||
opts: 1, | ||
topology: 1 | ||
} | ||
}; | ||
self._coll().updateOne ({_id: self._id}, upd, function (err) { | ||
cb (err); | ||
@@ -203,7 +235,7 @@ }); | ||
stats(qclass, name, opts) { | ||
return new MongoStats (qclass + ':' + name, this); | ||
stats(ns, name, opts) { | ||
return new MongoStats (ns, name, this); | ||
} | ||
queues (qclass, opts, cb) { | ||
queues (ns, opts, cb) { | ||
if (!cb) { | ||
@@ -220,3 +252,3 @@ cb = opts; | ||
if (opts.full) { | ||
self._coll.find().toArray (function (err, arr) { | ||
self._coll.find({_id: {$regex: '^keuss:stats:' + ns}}).toArray (function (err, arr) { | ||
if (err) return cb (err); | ||
@@ -226,3 +258,5 @@ | ||
arr.forEach (function (elem){ | ||
res [elem._id.substring(13 + qclass.length)] = { | ||
res [elem.name] = { | ||
ns: elem.ns, | ||
name: elem.name, | ||
counters: elem.counters, | ||
@@ -238,3 +272,3 @@ topology: elem.topology, | ||
else { | ||
self._coll.find().project ({_id: 1}).toArray (function (err, arr) { | ||
self._coll.find({_id: {$regex: '^keuss:stats:' + ns}}).project ({_id: 1, name: 1}).toArray (function (err, arr) { | ||
if (err) return cb (err); | ||
@@ -244,3 +278,3 @@ | ||
arr.forEach (function (elem){ | ||
res.push (elem._id.substring(13 + qclass.length)); | ||
res.push (elem.name); | ||
}); | ||
@@ -247,0 +281,0 @@ |
@@ -15,9 +15,11 @@ 'use strict'; | ||
* stores in: | ||
* - counters in keuss:stats:<qclass>:<name>:counter_<counter> -> int | ||
* - opts (queue creation opts) in keuss:stats:<qclass>:<name>:opts -> string/json | ||
* - topology in keuss:stats:<qclass>:<name>:topology -> string/json | ||
* - counters in keuss:stats:<ns>:<name>:counter_<counter> -> int | ||
* - opts (queue creation opts) in keuss:stats:<ns>:<name>:opts -> string/json | ||
* - topology in keuss:stats:<ns>:<name>:topology -> string/json | ||
*/ | ||
class RedisStats { | ||
constructor(name, factory, opts) { | ||
this._name = 'keuss:stats:' + name; | ||
constructor(ns, name, factory, opts) { | ||
this._ns = ns; | ||
this._name = name; | ||
this._id = 'keuss:stats:' + ns + ':' + name; | ||
this._opts = opts || {}; | ||
@@ -27,2 +29,5 @@ this._factory = factory; | ||
this._cache = {}; | ||
this._rediscl.hset (this._id, 'name', this._name); | ||
this._rediscl.hset (this._id, 'ns', this._ns); | ||
} | ||
@@ -35,5 +40,13 @@ | ||
ns () { | ||
return this._ns; | ||
} | ||
name () { | ||
return this._name; | ||
} | ||
values(cb) { | ||
this._rediscl.hgetall (this._name, function (err, v) { | ||
this._rediscl.hgetall (this._id, function (err, v) { | ||
if (err) { | ||
@@ -62,3 +75,3 @@ return cb(err); | ||
if (value) { | ||
self._rediscl.hincrby(self._name, 'counter_' + key, value); | ||
self._rediscl.hincrby(self._id, 'counter_' + key, value); | ||
// ('stats-redis: flushed (%s) %d -> %s', self._name, value, key); | ||
@@ -103,3 +116,3 @@ self._cache[key] = 0; | ||
cb = opts; | ||
this._rediscl.hget (this._name, 'opts', function (err, res){ | ||
this._rediscl.hget (this._id, 'opts', function (err, res){ | ||
if (err) return cb(err); | ||
@@ -118,3 +131,3 @@ if (!res) return cb(null, {}); | ||
// set | ||
this._rediscl.hset (this._name, 'opts', JSON.stringify (opts || {}), cb); | ||
this._rediscl.hset (this._id, 'opts', JSON.stringify (opts || {}), cb); | ||
} | ||
@@ -127,3 +140,3 @@ } | ||
cb = tplg; | ||
this._rediscl.hget (this._name, 'topology', function (err, res){ | ||
this._rediscl.hget (this._id, 'topology', function (err, res){ | ||
if (err) return cb(err); | ||
@@ -142,3 +155,3 @@ if (!res) return cb(null, {}); | ||
// set | ||
this._rediscl.hset (this._name, 'topology', JSON.stringify (tplg), cb); | ||
this._rediscl.hset (this._id, 'topology', JSON.stringify (tplg), cb); | ||
} | ||
@@ -151,5 +164,18 @@ } | ||
var self = this; | ||
var tasks = [ | ||
function (cb) {self._rediscl.hdel(self._id, 'opts', cb);}, | ||
function (cb) {self._rediscl.hdel(self._id, 'topology', cb);} | ||
]; | ||
this._rediscl.del(this._name, function (err, res) { | ||
if (cb) cb(err); | ||
this.values ((err, vals) => { | ||
_.forEach (vals, (v, k) => { | ||
tasks.push (function (cb) { | ||
self._rediscl.hdel(self._id, 'counter_' + k, cb); | ||
}); | ||
}); | ||
async.series (tasks, function (err) { | ||
if (cb) cb(err); | ||
}); | ||
}); | ||
@@ -168,7 +194,7 @@ } | ||
stats(qclass, name, opts) { | ||
return new RedisStats (qclass + ':' + name, this); | ||
stats(ns, name, opts) { | ||
return new RedisStats (ns, name, this); | ||
} | ||
queues (qclass, opts, cb) { | ||
queues (ns, opts, cb) { | ||
if (!cb) { | ||
@@ -181,3 +207,3 @@ cb = opts; | ||
this._rediscl.keys('keuss:stats:' + qclass + ':?*', function (err, queues) { | ||
this._rediscl.keys('keuss:stats:' + ns + ':?*', function (err, queues) { | ||
if (err) return cb(err); | ||
@@ -188,3 +214,3 @@ | ||
queues.forEach(function (q) { | ||
tasks[q.substring(13 + qclass.length)] = function (cb) { | ||
tasks[q.substring(13 + ns.length)] = function (cb) { | ||
self._rediscl.hgetall (q, function (err, v) { | ||
@@ -221,3 +247,3 @@ if (err) { | ||
queues.forEach(function (q) { | ||
var qname = q.substring(13 + qclass.length); | ||
var qname = q.substring(13 + ns.length); | ||
ret.push(qname); | ||
@@ -224,0 +250,0 @@ }); |
@@ -11,3 +11,3 @@ | ||
var qclass = 'some-class'; | ||
var ns = 'some-class'; | ||
var name = 'test-stats'; | ||
@@ -28,3 +28,3 @@ | ||
var ftry = new CL (); | ||
var mem = ftry.stats (qclass, name); | ||
var mem = ftry.stats (ns, name); | ||
mem.values (function (err, vals) { | ||
@@ -39,3 +39,3 @@ vals.should.eql ({}); | ||
var ftry = new CL (); | ||
var mem = ftry.stats (qclass, name); | ||
var mem = ftry.stats (ns, name); | ||
@@ -48,2 +48,4 @@ async.series([ | ||
setTimeout (function () { | ||
mem.ns().should.equal (ns); | ||
mem.name().should.equal (name); | ||
mem.values (function (err, vals) { | ||
@@ -62,3 +64,3 @@ vals.should.eql ({v1: 1, v2: 1, v3: 1}); | ||
var ftry = new CL (); | ||
var mem = ftry.stats (qclass, name); | ||
var mem = ftry.stats (ns, name); | ||
@@ -85,3 +87,3 @@ async.series([ | ||
var ftry = new CL (); | ||
var mem = ftry.stats (qclass, name); | ||
var mem = ftry.stats (ns, name); | ||
@@ -106,3 +108,3 @@ async.series([ | ||
var ftry = new CL (); | ||
var mem = ftry.stats (qclass, name); | ||
var mem = ftry.stats (ns, name); | ||
@@ -127,3 +129,3 @@ async.series([ | ||
var ftry = new CL (); | ||
var mem = ftry.stats (qclass, name); | ||
var mem = ftry.stats (ns, name); | ||
@@ -154,3 +156,3 @@ async.series([ | ||
var ftry = new CL (); | ||
var mem = ftry.stats (qclass, name); | ||
var mem = ftry.stats (ns, name); | ||
var topology = {a:1, b: {t:'yy', tt: 99}}; | ||
@@ -183,4 +185,4 @@ | ||
var ftry = new CL (); | ||
var mem1 = ftry.stats (qclass, name); | ||
var mem2 = ftry.stats (qclass, name + '-2'); | ||
var mem1 = ftry.stats (ns, name); | ||
var mem2 = ftry.stats (ns, name + '-2'); | ||
var opts1 = {s:0, a: 'yy'}; | ||
@@ -204,3 +206,3 @@ var opts2 = {s:7, at: 'yy--j'}; | ||
setTimeout (function () { | ||
ftry.queues (qclass, function (err, res) { | ||
ftry.queues (ns, function (err, res) { | ||
if (err) return cb (err); | ||
@@ -214,7 +216,7 @@ res.sort().should.eql ([ 'test-stats', 'test-stats-2' ]); | ||
setTimeout (function () { | ||
ftry.queues (qclass, {full: true}, function (err, res) { | ||
ftry.queues (ns, {full: true}, function (err, res) { | ||
if (err) return cb (err); | ||
res.should.eql ({ | ||
'test-stats': { topology: topology1, opts: opts1, counters: {v1: 8, v2: 6 } }, | ||
'test-stats-2': { topology: topology2, opts: opts2, counters: {v1: 4, v3: 45} } | ||
'test-stats': { name: 'test-stats', ns: 'some-class', topology: topology1, opts: opts1, counters: {v1: 8, v2: 6 } }, | ||
'test-stats-2': { name: 'test-stats-2', ns: 'some-class', topology: topology2, opts: opts2, counters: {v1: 4, v3: 45} } | ||
}); | ||
@@ -234,3 +236,3 @@ | ||
var ftry = new CL (); | ||
var mem = ftry.stats (qclass, name); | ||
var mem = ftry.stats (ns, name); | ||
@@ -237,0 +239,0 @@ async.series([ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
196976
4565