Comparing version 1.3.5 to 1.3.6
@@ -253,2 +253,10 @@ 'use strict'; | ||
} | ||
capabilities () { | ||
return { | ||
sched: true, | ||
reserve: true, | ||
pipeline: false | ||
}; | ||
} | ||
} | ||
@@ -255,0 +263,0 @@ |
@@ -17,3 +17,2 @@ 'use strict'; | ||
constructor (name, pipeline, opts) { | ||
////////////////////////////////////////////// | ||
super (name, pipeline._factory, opts); | ||
@@ -32,3 +31,2 @@ | ||
pipeline () { | ||
///////////////////////////////////////// | ||
return this._pipeline; | ||
@@ -39,3 +37,2 @@ } | ||
static Type () { | ||
///////////////////////////////////////// | ||
return 'mongo:pipeline'; | ||
@@ -46,3 +43,2 @@ } | ||
type () { | ||
///////////////////////////////////////// | ||
return 'mongo:pipeline'; | ||
@@ -54,3 +50,2 @@ } | ||
insert (entry, callback) { | ||
///////////////////////////////////////// | ||
var self = this; | ||
@@ -74,3 +69,2 @@ entry._q = this._name; | ||
get (callback) { | ||
///////////////////////////////////////// | ||
var self = this; | ||
@@ -218,3 +212,2 @@ this._col.findOneAndDelete ({_q: this._name, mature: {$lte: Queue.nowPlusSecs (0)}}, {sort: {mature : 1}}, function (err, result) { | ||
totalSize (callback) { | ||
////////////////////////////////// | ||
var q = {_q: this._name}; | ||
@@ -229,3 +222,2 @@ var opts = {}; | ||
size (callback) { | ||
////////////////////////////////// | ||
var q = { | ||
@@ -245,3 +237,2 @@ _q: this._name, | ||
schedSize (callback) { | ||
////////////////////////////////// | ||
var q = { | ||
@@ -261,3 +252,2 @@ _q: this._name, | ||
next_t (callback) { | ||
///////////////////////////////////////// | ||
var self = this; | ||
@@ -303,3 +293,2 @@ this._col.find ({_q: this._name}).limit(1).sort ({mature:1}).project ({mature:1}).next (function (err, result) { | ||
ensureIndexes (cb) { | ||
////////////////////////////////////////////////////////////////// | ||
this._col.ensureIndex ({_q : 1, mature : 1}, function (err) { | ||
@@ -350,2 +339,10 @@ return cb (err); | ||
} | ||
capabilities () { | ||
return { | ||
sched: true, | ||
reserve: true, | ||
pipeline: true | ||
}; | ||
} | ||
} | ||
@@ -352,0 +349,0 @@ |
@@ -15,3 +15,2 @@ 'use strict'; | ||
constructor (name, factory, opts) { | ||
////////////////////////////////////////////// | ||
super (name, factory, opts); | ||
@@ -26,3 +25,2 @@ | ||
static Type () { | ||
///////////////////////////////////////// | ||
return 'redis:list'; | ||
@@ -34,3 +32,2 @@ } | ||
type () { | ||
///////////////////////////////////////// | ||
return 'redis:list'; | ||
@@ -43,3 +40,2 @@ } | ||
insert (entry, callback) { | ||
///////////////////////////////////////// | ||
var self = this; | ||
@@ -64,3 +60,2 @@ var pl = { | ||
get (callback) { | ||
///////////////////////////////////////// | ||
var self = this; | ||
@@ -88,3 +83,2 @@ this._rediscl.rpop (this._redis_l_name, function (err, res) { | ||
totalSize (callback) { | ||
////////////////////////////////// | ||
this._rediscl.llen (this._redis_l_name, callback); | ||
@@ -97,3 +91,2 @@ } | ||
size (callback) { | ||
////////////////////////////////// | ||
this._rediscl.llen (this._redis_l_name, callback); | ||
@@ -106,3 +99,2 @@ } | ||
schedSize (callback) { | ||
////////////////////////////////// | ||
callback (null, 0); | ||
@@ -115,3 +107,2 @@ } | ||
next_t (callback) { | ||
////////////////////////////////// | ||
callback (null, null) | ||
@@ -144,2 +135,10 @@ } | ||
} | ||
capabilities () { | ||
return { | ||
sched: false, | ||
reserve: false, | ||
pipeline: false | ||
}; | ||
} | ||
} | ||
@@ -146,0 +145,0 @@ |
@@ -16,3 +16,2 @@ 'use strict'; | ||
constructor (name, factory, opts) { | ||
////////////////////////////////////////////// | ||
super (name, factory, opts); | ||
@@ -27,3 +26,2 @@ | ||
static Type () { | ||
///////////////////////////////////////// | ||
return 'redis:oq'; | ||
@@ -35,3 +33,2 @@ } | ||
type () { | ||
///////////////////////////////////////// | ||
return 'redis:oq'; | ||
@@ -44,3 +41,2 @@ } | ||
insert (entry, callback) { | ||
///////////////////////////////////////// | ||
var self = this; | ||
@@ -54,3 +50,2 @@ this._roq.push (entry, callback); | ||
get (callback) { | ||
///////////////////////////////////////// | ||
var self = this; | ||
@@ -64,3 +59,2 @@ this._roq.pop (callback); | ||
reserve (callback) { | ||
///////////////////////////////////////// | ||
var self = this; | ||
@@ -76,3 +70,2 @@ var delay = this._opts.reserve_delay || 120; | ||
commit (id, callback) { | ||
///////////////////////////////////////// | ||
var self = this; | ||
@@ -93,3 +86,2 @@ | ||
rollback (id, next_t, callback) { | ||
///////////////////////////////////////// | ||
if (_.isFunction (next_t)) { | ||
@@ -115,3 +107,2 @@ callback = next_t; | ||
totalSize (callback) { | ||
////////////////////////////////// | ||
this._roq.totalSize (callback); | ||
@@ -124,3 +115,2 @@ } | ||
size (callback) { | ||
////////////////////////////////// | ||
this._roq.size (callback); | ||
@@ -133,3 +123,2 @@ } | ||
schedSize (callback) { | ||
////////////////////////////////// | ||
this._roq.schedSize (callback); | ||
@@ -142,3 +131,2 @@ } | ||
next_t (callback) { | ||
////////////////////////////////// | ||
this._roq.peek (function (err, res) { | ||
@@ -161,3 +149,2 @@ if (err) { | ||
static list (cb) { | ||
////////////////////////////////////////////////////////////////// | ||
var colls = []; | ||
@@ -214,2 +201,10 @@ | ||
} | ||
capabilities () { | ||
return { | ||
sched: true, | ||
reserve: true, | ||
pipeline: false | ||
}; | ||
} | ||
} | ||
@@ -216,0 +211,0 @@ |
{ | ||
"name": "keuss", | ||
"version": "1.3.5", | ||
"version": "1.3.6", | ||
"keywords": [ | ||
@@ -5,0 +5,0 @@ "queue", |
@@ -43,2 +43,6 @@ 'use strict'; | ||
capabilities () { | ||
return {}; | ||
} | ||
list (opts, cb) { | ||
@@ -45,0 +49,0 @@ // use stats factory |
137
Queue.js
@@ -23,2 +23,6 @@ 'use strict'; | ||
// most mature | ||
// values: | ||
// * null: unknown, triggers read this.next_t() from backend | ||
// * 0: known to be no element in queue, accepts values from insertNotifs | ||
// * <non-zero int>: a know value, to be trusted. accepts values from insertNotifs if lower | ||
this._next_mature_t = null; | ||
@@ -86,2 +90,7 @@ | ||
type () {return 'queue:base';} | ||
// capabilities | ||
capabilities () { | ||
return this._factory.capabilities (); | ||
} | ||
@@ -125,46 +134,60 @@ // T of next mature | ||
///////////////////////////////////////// | ||
// console.log ('%s - %s: signalInsertion received signalled insertion with mature %s', new Date().toISOString(), this._name, mature.toISOString ()); | ||
if (this._next_mature_t && (this._next_mature_t <= mature.getTime ())) { | ||
// console.log ('%s - %s: signalInsertion, this msg mature (%s) is set to after _next_mature (%s). Not triggering any get', new Date().toISOString(), this._name, mature, new Date(this._next_mature_t).toISOString()) | ||
if (cb) cb (); | ||
if (_.isNull (this._next_mature_t)) { | ||
// totally ignore it, let pop() get it via next_t() | ||
//console.log ('%s - %s: signalInsertion received signalled insertion with mature %s. _next_mature_t is null, ignoring', new Date().toISOString(), this._name, mature.toISOString ()); | ||
if (cb) return cb (); | ||
else return; | ||
} | ||
else if (this._next_mature_t == 0) { | ||
// next_t() forced a read and the result was 'empty': trust the notif | ||
//console.log ('%s - %s: signalInsertion received signalled insertion with mature %s. _next_mature_t is 0, trusting notif', new Date().toISOString(), this._name, mature.toISOString ()); | ||
this._next_mature_t = mature.getTime (); | ||
} | ||
else if (this._next_mature_t <= mature.getTime ()) { | ||
// _next_mature_t is numeric and non-null, so an active wait is happening. ignore it | ||
//console.log ('%s - %s: signalInsertion received signalled insertion with mature %s. _next_mature_t (%s) is lower, ignoring', new Date().toISOString(), this._name, mature.toISOString (), new Date(this._next_mature_t).toISOString()); | ||
if (cb) return cb (); | ||
else return; | ||
} | ||
else { | ||
// console.log ('%s - %s: signalInsertion about to wake up sleepers', new Date().toISOString(), this._name); | ||
// _next_mature_t is numeric and non-null, so an active wait is happening. ignore it | ||
//console.log ('%s - %s: signalInsertion received signalled insertion with mature %s. _next_mature_t (%s) is higher, trusting notif', new Date().toISOString(), this._name, mature.toISOString (), new Date(this._next_mature_t).toISOString()); | ||
this._next_mature_t = mature.getTime (); | ||
var self = this; | ||
this._nextDelta (function (delta_ms) { | ||
// run a wakeup on all consumers with the wakeup timer set | ||
// console.log ('%s - %s: signalInsertion sees that the delta_ms is now %d', new Date().toISOString(), self._name, delta_ms); | ||
// console.log ('%s - %s: signalInsertion : cosumers: %d', new Date().toISOString(), self._name, self.nConsumers()); | ||
self._consumers_by_tid.forEach (function (consumer, tid) { | ||
// console.log ('%s - %s: signalInsertion checking wakeup state for consumer %j', new Date().toISOString(), self._name, consumer); | ||
if (consumer.wakeup_timeout) { | ||
// console.log ('%s - %s: signalInsertion rescheduling consumer %j', new Date().toISOString(), self._name, consumer); | ||
clearTimeout (consumer.wakeup_timeout); | ||
consumer.wakeup_timeout = null; | ||
} | ||
if (delta_ms > 0) { | ||
consumer.wakeup_timeout = setTimeout ( | ||
function () { | ||
consumer.wakeup_timeout = null; | ||
self._onetime_pop (consumer); | ||
}, | ||
delta_ms | ||
); | ||
var self = this; | ||
this._nextDelta (function (delta_ms) { | ||
// run a wakeup on all consumers with the wakeup timer set | ||
//console.log ('%s - %s: signalInsertion sees that the delta_ms is now %d', new Date().toISOString(), self._name, delta_ms); | ||
// //console.log ('%s - %s: signalInsertion : consumers: %d', new Date().toISOString(), self._name, self.nConsumers()); | ||
self._consumers_by_tid.forEach (function (consumer, tid) { | ||
// //console.log ('%s - %s: signalInsertion checking wakeup state for consumer %j', new Date().toISOString(), self._name, consumer); | ||
if (consumer.wakeup_timeout) { | ||
// //console.log ('%s - %s: signalInsertion rescheduling consumer %j', new Date().toISOString(), self._name, consumer); | ||
clearTimeout (consumer.wakeup_timeout); | ||
consumer.wakeup_timeout = null; | ||
// console.log ('%s - %s: signalInsertion rescheduled consumer %j', new Date().toISOString(), self._name, consumer); | ||
} | ||
else { | ||
setImmediate (function () {self._onetime_pop (consumer)}); | ||
// console.log ('%s - %s: signalInsertion immediately waking up consumer %j', new Date().toISOString(), self._name, consumer); | ||
} | ||
if (delta_ms > 0) { | ||
consumer.wakeup_timeout = setTimeout ( | ||
function () { | ||
consumer.wakeup_timeout = null; | ||
self._onetime_pop (consumer); | ||
}, | ||
delta_ms | ||
); | ||
//console.log ('%s - %s: signalInsertion rescheduled consumer %j', new Date().toISOString(), self._name, consumer); | ||
} | ||
}); | ||
else { | ||
setImmediate (function () {self._onetime_pop (consumer)}); | ||
//console.log ('%s - %s: signalInsertion immediately waking up consumer %j', new Date().toISOString(), self._name, consumer); | ||
} | ||
} | ||
}); | ||
}); | ||
if (cb) cb (); | ||
} | ||
if (cb) cb (); | ||
} | ||
@@ -268,3 +291,3 @@ | ||
// attempt a read | ||
// console.log ('%s - %s: calling initial onetime_pop on %j', new Date().toISOString(), self._name, consumer_data) | ||
//console.log ('%s - %s: calling initial onetime_pop on %j', new Date().toISOString(), self._name, consumer_data) | ||
this._onetime_pop (consumer_data); | ||
@@ -387,3 +410,4 @@ | ||
var getOrReserve_cb = function (err, result) { | ||
// console.log ('%s - %s - %s: called getOrReserve_cb : err %j, result %j', new Date().toISOString(), self._name, consumer.tid, err, result); | ||
//console.log ('%s - %s - %s: called getOrReserve_cb : err %j, result %j', new Date().toISOString(), self._name, consumer.tid, err, result); | ||
if (!consumer.callback) { | ||
@@ -400,3 +424,3 @@ // consumer was cancelled mid-flight | ||
// get/reserve in error | ||
// console.log ('%s - %s - %s: getOrReserve_cb in error: err %j', new Date().toISOString(), self._name, consumer.tid, err); | ||
//console.log ('%s - %s - %s: getOrReserve_cb in error: err %j', new Date().toISOString(), self._name, consumer.tid, err); | ||
@@ -432,3 +456,3 @@ // clean timeout timer | ||
// TODO cancel previous wakeup_timeout if not null? | ||
// console.log ('%s - %s - %s: getOrReserve_cb : set wakeup in %d ms', new Date().toISOString(), self._name, consumer.tid, delta_ms); | ||
//console.log ('%s - %s - %s: getOrReserve_cb : set wakeup in %d ms', new Date().toISOString(), self._name, consumer.tid, delta_ms); | ||
@@ -452,3 +476,3 @@ consumer.wakeup_timeout = setTimeout ( | ||
// console.log ('%s - %s - %s: getOrReserve_cb : got result %j', new Date().toISOString(), self._name, consumer.tid, result); | ||
//console.log ('%s - %s - %s: getOrReserve_cb : got result %j', new Date().toISOString(), self._name, consumer.tid, result); | ||
@@ -467,3 +491,3 @@ // clean timeout timer | ||
return; | ||
} | ||
}; | ||
@@ -482,5 +506,10 @@ if (consumer.reserve) { | ||
///////////////////////////// | ||
if (!this._next_mature_t) { | ||
if (this._next_mature_t == 0) { | ||
//console.log ('%s - %s : _nextDelta: _next_mature_t is zero, serving default', new Date().toISOString(), this._name); | ||
return cb (this._pollInterval); | ||
} | ||
if (_.isNull (this._next_mature_t)) { | ||
// there's no precalculated value, get it from backend | ||
// console.log ('%s - %s : _nextDelta has no available _next_mature_t, getting it from backend', new Date().toISOString(), this._name); | ||
//console.log ('%s - %s : _nextDelta: null _next_mature_t, getting it from backend', new Date().toISOString(), this._name); | ||
var self = this; | ||
@@ -490,19 +519,19 @@ | ||
if (err) { | ||
var delta = self._next_mature_t || self._pollInterval; | ||
// console.log ('%s - %s : _nextDelta error from backend, calc delta is %d', new Date().toISOString(), self._name, delta); | ||
return cb (delta); | ||
//console.log ('%s - %s : _nextDelta error from backend, serving default', new Date().toISOString(), self._name); | ||
return cb (self._pollInterval); | ||
} | ||
if (res) { | ||
// always trust backend's next_t, no matter what is in self._next_mature_t | ||
// got a res, use it | ||
self._next_mature_t = res; | ||
var delta = res - Queue.now ().getTime(); | ||
if (delta > self._pollInterval) delta = self._pollInterval; | ||
// console.log ('%s - %s : _nextDelta _next_mature_t from backend is %d, calc delta is %d', new Date().toISOString(), self._name, res, delta); | ||
//console.log ('%s - %s : _nextDelta: _next_mature_t from backend is %d, calc delta is %d', new Date().toISOString(), self._name, res, delta); | ||
return cb (delta); | ||
} | ||
else { | ||
var delta = self._next_mature_t || self._pollInterval; | ||
// console.log ('%s - %s : _nextDelta no _next_mature_t from backend, calc delta is %d', new Date().toISOString(), self._name, delta); | ||
return cb (delta); | ||
// no res, set _next_mature_t to 0, serve default | ||
//console.log ('%s - %s : _nextDelta: no _next_mature_t from backend, use default', new Date().toISOString(), self._name); | ||
self._next_mature_t = 0; | ||
return cb (self._pollInterval); | ||
} | ||
@@ -512,3 +541,3 @@ }); | ||
else { | ||
// console.log ('%s - %s : _nextDelta has available _next_mature_t %d', new Date().toISOString(), this._name, this._next_mature_t); | ||
// _next_mature_t is non-zero numeric, use it | ||
var delta = this._next_mature_t - Queue.now ().getTime(); | ||
@@ -518,3 +547,3 @@ if (delta > this._pollInterval) delta = this._pollInterval; | ||
// console.log ('%s - %s : _nextDelta calc delta is %d', new Date().toISOString(), this._name, delta); | ||
//console.log ('%s - %s : _nextDelta: using _next_mature_t %s, calc delta is %d', new Date().toISOString(), this._name, new Date(this._next_mature_t).toISOString (), delta); | ||
@@ -521,0 +550,0 @@ return cb (delta); |
@@ -15,3 +15,3 @@ 'use strict'; | ||
// ('Signaller created with bufferTime %d msecs', this._bufferTime); | ||
// console.log ('Signaller created with bufferTime %d msecs', this._bufferTime); | ||
} | ||
@@ -53,2 +53,3 @@ | ||
// last hit too close in the past, not emitting | ||
// console.log ('%s: last hit too close in the past, not emitting (%s)', new Date().toISOString(), this._buffered_mature); | ||
if (cb) cb (); | ||
@@ -55,0 +56,0 @@ } |
@@ -196,3 +196,3 @@ | ||
if (err) return cb (err); | ||
res.should.eql ([ 'test-stats', 'test-stats-2' ]); | ||
res.sort().should.eql ([ 'test-stats', 'test-stats-2' ]); | ||
cb (); | ||
@@ -199,0 +199,0 @@ }) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
187911
4264