Comparing version 1.6.15 to 1.7.0
@@ -17,2 +17,4 @@ var async = require ('async'); | ||
if (!this._opts.ttl) this._opts.ttl = 3600; | ||
this._factory = factory; | ||
@@ -54,3 +56,6 @@ this._col = factory._db.collection (name); | ||
var updt = { | ||
$set: {processed: new Date ()} | ||
$set: { | ||
processed: new Date (), | ||
mature: Queue.nowPlusSecs (100 * this._opts.ttl) | ||
} | ||
}; | ||
@@ -83,3 +88,6 @@ | ||
var update = { | ||
$set: {mature: Queue.nowPlusSecs (delay), reserved: new Date ()}, | ||
$set: { | ||
mature: Queue.nowPlusSecs (delay), | ||
reserved: new Date () | ||
}, | ||
$inc: {tries: 1} | ||
@@ -119,3 +127,6 @@ }; | ||
var updt = { | ||
$set: {processed: new Date ()}, | ||
$set: { | ||
processed: new Date (), | ||
mature: Queue.nowPlusSecs (100 * this._opts.ttl) | ||
}, | ||
$unset: {reserved: ''} | ||
@@ -233,3 +244,7 @@ }; | ||
var updt = { | ||
$set: {processed: new Date (), removed: true}, | ||
$set: { | ||
processed: new Date (), | ||
mature: Queue.nowPlusSecs (100 * this._opts.ttl), | ||
removed: true | ||
}, | ||
}; | ||
@@ -269,3 +284,3 @@ | ||
if (err) return cb (err); | ||
this._col.createIndex({processed: 1}, {expireAfterSeconds: this._opts.ttl || 3600}, err => cb (err)); | ||
this._col.createIndex({processed: 1}, {expireAfterSeconds: this._opts.ttl}, err => cb (err)); | ||
}); | ||
@@ -272,0 +287,0 @@ } |
{ | ||
"name": "keuss", | ||
"version": "1.6.15", | ||
"version": "1.7.0", | ||
"keywords": [ | ||
@@ -5,0 +5,0 @@ "queue", |
26
Queue.js
@@ -101,2 +101,5 @@ var async = require ('async'); | ||
// extra information & status | ||
extra_info (cb) {cb (null, {});} | ||
// end of expected redefinitions on subclasses | ||
@@ -117,3 +120,26 @@ //////////////////////////////////////////////////////////////////////////// | ||
} | ||
info (cb) { | ||
async.parallel ({ | ||
size: cb => this.size (cb), | ||
totalSize: cb => this.totalSize (cb), | ||
schedSize: cb => this.schedSize (cb), | ||
resvSize: cb => this.resvSize (cb), | ||
next_t: cb => this.next_t (cb), | ||
stats: cb => this.stats (cb), | ||
paused: cb => this.paused (cb), | ||
extra: cb => this.extra_info (cb), | ||
}, (err, res) => { | ||
if (err) return cb (err); | ||
res.name = this.name(); | ||
res.ns = this.ns(); | ||
res.type = this.type(); | ||
res.capabilities = this.capabilities(); | ||
cb (null, res); | ||
}); | ||
} | ||
// T of next mature | ||
@@ -120,0 +146,0 @@ nextMatureDate () {return this._next_mature_t;} |
@@ -39,11 +39,8 @@ var _ = require ('lodash'); | ||
incr (v, delta, cb) { | ||
if (!this._s.counters[v]) { | ||
this._s.counters[v] = 0; | ||
} | ||
if ((delta == null) || (delta == undefined)) delta = 1; | ||
var old_v = this._s.counters[v] | ||
debug ('incr %s by %d: %d --> %d', v, delta, old_v, this._s.counters[v]); | ||
this._s.counters[v] = this._s.counters[v] + delta; | ||
if (cb) cb(null, this._s.counters[v]); | ||
const old_v = _.get (this._s.counters, v, 0); | ||
_.set (this._s.counters, v, old_v + delta); | ||
const new_v = _.get (this._s.counters, v); | ||
debug ('incr %s by %d: %d --> %d', v, delta, old_v, new_v); | ||
if (cb) cb(null, new_v); | ||
} | ||
@@ -50,0 +47,0 @@ |
@@ -40,3 +40,3 @@ var _ = require ('lodash'); | ||
// filter counters | ||
if (k.startsWith ('counter_')) ret[k.substr (8)] = parseInt(v[k]); | ||
if (k.startsWith ('counter_')) _.set (ret, k.substring (8), parseInt(v[k])); | ||
} | ||
@@ -48,2 +48,16 @@ | ||
_raw_values (cb) { | ||
this._rediscl.hgetall (this._id, (err, v) => { | ||
if (err) return cb(err); | ||
var ret = {}; | ||
for (let k in v) { | ||
if (k.startsWith ('counter_')) ret[k.substring (8)] = v[k]; | ||
} | ||
cb (null, ret); | ||
}); | ||
} | ||
paused (val, cb) { | ||
@@ -149,3 +163,3 @@ if (!cb) { | ||
this.values ((err, vals) => { | ||
this._raw_values ((err, vals) => { | ||
_.forEach (vals, (v, k) => { | ||
@@ -152,0 +166,0 @@ tasks.push (cb => this._rediscl.hdel (this._id, 'counter_' + k, cb)); |
@@ -83,2 +83,3 @@ var async = require ('async'); | ||
{label: 'Tape MongoDB', mq: require ('../backends/ps-mongo')}, | ||
{label: 'Stream MongoDB', mq: require ('../backends/stream-mongo')}, | ||
{label: 'Redis OrderedQueue', mq: require ('../backends/redis-oq')}, | ||
@@ -163,4 +164,4 @@ {label: 'MongoDB SafeBucket', mq: require ('../backends/bucket-mongo-safe')} | ||
], (err, res) => { | ||
res[1].tsize.should.equal (0); | ||
res[2].tsize.should.equal (0); | ||
res[1].tsize.should.equal (0); | ||
res[2].tsize.should.equal (0); | ||
@@ -214,4 +215,4 @@ cb (err, q, factory); | ||
], (err, res) => { | ||
res[1].tsize.should.equal (0); | ||
res[2].tsize.should.equal (0); | ||
res[1].tsize.should.equal (0); | ||
res[2].tsize.should.equal (0); | ||
@@ -218,0 +219,0 @@ cb (err, q, factory); |
@@ -16,3 +16,3 @@ var async = require ('async'); | ||
{label: 'Tape MongoDB', mq: require ('../backends/ps-mongo')}, | ||
{label: 'Simple MongoDB Buckets', mq: require ('../backends/bucket-mongo')}, | ||
{label: 'Stream MongoDB', mq: require ('../backends/stream-mongo')}, | ||
{label: 'Safe MongoDB Buckets', mq: require ('../backends/bucket-mongo-safe')}, | ||
@@ -268,2 +268,3 @@ {label: 'Redis List', mq: require ('../backends/redis-list')}, | ||
{label: 'Tape MongoDB', mq: require ('../backends/ps-mongo')}, | ||
{label: 'Stream MongoDB', mq: require ('../backends/stream-mongo')}, | ||
{label: 'Safe MongoDB Buckets', mq: require ('../backends/bucket-mongo-safe')}, | ||
@@ -270,0 +271,0 @@ {label: 'Redis OrderedQueue', mq: require ('../backends/redis-oq')}, |
@@ -16,2 +16,3 @@ var async = require ('async'); | ||
{label: 'Tape MongoDB', mq: require ('../backends/ps-mongo')}, | ||
{label: 'Stream MongoDB', mq: require ('../backends/stream-mongo')}, | ||
// {label: 'Safe MongoDB Buckets', mq: require ('../backends/bucket-mongo-safe')}, | ||
@@ -83,3 +84,3 @@ {label: 'Redis OrderedQueue', mq: require ('../backends/redis-oq')} | ||
cb => q.stats((err, res) => { | ||
res.should.eql({ | ||
res.should.match({ | ||
get: 0, | ||
@@ -104,3 +105,3 @@ put: 2, | ||
cb => q.stats((err, res) => { | ||
res.should.eql({ | ||
res.should.match({ | ||
get: 1, | ||
@@ -124,3 +125,3 @@ put: 2, | ||
cb => q.stats((err, res) => { | ||
res.should.eql({ | ||
res.should.match({ | ||
get: 2, | ||
@@ -177,3 +178,3 @@ put: 2, | ||
q.stats(function (err, res) { | ||
res.should.eql({ | ||
res.should.match({ | ||
get: 0, | ||
@@ -212,3 +213,3 @@ put: 2, | ||
q.stats(function (err, res) { | ||
res.should.eql({ | ||
res.should.match({ | ||
get: 1, | ||
@@ -247,3 +248,3 @@ put: 2, | ||
q.stats(function (err, res) { | ||
res.should.eql({ | ||
res.should.match({ | ||
get: 2, | ||
@@ -303,3 +304,3 @@ put: 2, | ||
q.stats(function (err, res) { | ||
res.should.eql({ | ||
res.should.match({ | ||
get: 0, | ||
@@ -331,3 +332,3 @@ put: 2, | ||
q.stats(function (err, res) { | ||
res.should.eql({ | ||
res.should.match({ | ||
get: 0, | ||
@@ -359,3 +360,3 @@ put: 2, | ||
q.stats(function (err, res) { | ||
res.should.eql({ | ||
res.should.match({ | ||
get: 0, | ||
@@ -401,3 +402,3 @@ put: 2, | ||
q.stats(function (err, res) { | ||
res.should.eql({ | ||
res.should.match({ | ||
get: 2, | ||
@@ -483,3 +484,3 @@ put: 2, | ||
q.stats(function (err, res) { | ||
res.should.eql({ | ||
res.should.match({ | ||
get: 2, | ||
@@ -541,3 +542,3 @@ put: 2, | ||
q.stats(function (err, res) { | ||
res.should.eql({ | ||
res.should.match({ | ||
get: 0, | ||
@@ -600,3 +601,3 @@ put: 3, | ||
q.stats(function (err, res) { | ||
res.should.eql({ | ||
res.should.match({ | ||
get: 3, | ||
@@ -875,3 +876,3 @@ put: 3, | ||
cb => q.stats((err, res) => { | ||
res.should.eql({ | ||
res.should.match({ | ||
get: 0, | ||
@@ -1014,3 +1015,3 @@ put: 1, | ||
cb => q.stats((err, res) => { | ||
res.should.eql({ | ||
res.should.match({ | ||
get: 0, | ||
@@ -1017,0 +1018,0 @@ put: 1, |
@@ -14,4 +14,4 @@ | ||
{label: 'persistent MongoDB', backend: require ('../backends/ps-mongo')}, | ||
{label: 'Stream MongoDB', backend: require ('../backends/stream-mongo')}, | ||
{label: 'plain MongoDB', backend: require ('../backends/mongo')}, | ||
{label: 'MongoDB Bucket', backend: require ('../backends/bucket-mongo')}, | ||
{label: 'Safe MongoDB Bucket', backend: require ('../backends/bucket-mongo-safe')}, | ||
@@ -106,3 +106,3 @@ ].forEach (backend_item => { | ||
if (err) return done (err); | ||
res[4].should.eql ({ put: 6, get: 6 }); | ||
res[4].should.match ({ put: 6, get: 6 }); | ||
res[5].should.equal (0); | ||
@@ -136,3 +136,3 @@ q.nConsumers().should.equal (0); | ||
if (err) return done (err); | ||
res[4].should.eql ({ put: 1, get: 1 }); | ||
res[4].should.match ({ put: 1, get: 1 }); | ||
res[5].should.equal (0); | ||
@@ -171,3 +171,3 @@ q.nConsumers().should.equal (0); | ||
if (err) return done (err); | ||
res[4].should.eql ({ put: 1 }); | ||
res[4].should.match ({ put: 1 }); | ||
res[5].should.equal (1); | ||
@@ -208,3 +208,3 @@ res[2][0].timeout.should.eql (true); | ||
if (err) return done (err); | ||
res[6].should.eql ({ put: 1 }); | ||
res[6].should.match ({ put: 1 }); | ||
res[7].should.equal (1); | ||
@@ -211,0 +211,0 @@ res[4].timeout.should.eql (true); |
@@ -43,4 +43,39 @@ const should = require ('should'); | ||
}); | ||
it ('signals insertion ok', done => { | ||
CL ({}, (err, factory) => { | ||
if (err) return done(err); | ||
const q = { | ||
ns() {return 'the-ns'}, | ||
name () {return 'the-queue'}, | ||
signalInsertion (d) { | ||
d.getTime().should.equal (1234567890); | ||
factory.close(done); | ||
} | ||
}; | ||
const signal = factory.signal (q, {}); | ||
setTimeout (() => signal.emitInsertion (new Date(1234567890)), 500); | ||
}); | ||
}); | ||
it ('signals pause ok', done => { | ||
CL ({}, (err, factory) => { | ||
if (err) return done(err); | ||
const q = { | ||
ns() {return 'the-ns'}, | ||
name () {return 'the-queue'}, | ||
signalPaused(d) { | ||
d.should.equal (true); | ||
factory.close(done); | ||
} | ||
}; | ||
const signal = factory.signal (q, {}); | ||
setTimeout (() => signal.emitPaused (true), 500); | ||
}); | ||
}); | ||
describe (`extra/generic pubsub`, () => { | ||
@@ -47,0 +82,0 @@ it ('subscribes and receives info ok on 3 subscribers', done => { |
@@ -42,5 +42,10 @@ | ||
var mem = ftry.stats (ns, name); | ||
mem.values (function (err, vals) { | ||
vals.should.eql ({}); | ||
ftry.close(done); | ||
async.series([ | ||
cb => mem.clear (cb), | ||
], () => { | ||
mem.values (function (err, vals) { | ||
vals.should.eql ({}); | ||
ftry.close(done); | ||
}); | ||
}); | ||
@@ -56,2 +61,3 @@ }); | ||
async.series([ | ||
cb => mem.clear (cb), | ||
cb => mem.incr ('v1', 1, cb), | ||
@@ -256,3 +262,38 @@ cb => mem.incr ('v2', 1, cb), | ||
}); | ||
it ('managed hierarchy (dotted keys) ok', done => { | ||
CL ((err, ftry) => { | ||
if (err) return done(err); | ||
var mem = ftry.stats (ns, name); | ||
async.series([ | ||
cb => mem.clear (cb), | ||
cb => mem.incr ('l1.l2.c', -1, cb), | ||
cb => mem.incr ('l1.l2.b', 1, cb), | ||
cb => mem.incr ('l1.l3.a', 7, cb), | ||
cb => mem.incr ('v1', 3, cb), | ||
], (err, results) => { | ||
setTimeout ( () => mem.values ((err, vals) => { | ||
vals.should.eql ({ | ||
l1: { | ||
l2: { | ||
b: 1, | ||
c: -1 | ||
}, | ||
l3: { | ||
a: 7 | ||
} | ||
}, | ||
v1: 3 | ||
}); | ||
ftry.close(done); | ||
}), 200); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
399821
77
10790