Socket
Socket
Sign inDemoInstall

keuss

Package Overview
Dependencies
Maintainers
1
Versions
76
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

keuss - npm Package Compare versions

Comparing version 1.3.5 to 1.3.6

8

backends/mongo.js

@@ -253,2 +253,10 @@ 'use strict';

}
capabilities () {
return {
sched: true,
reserve: true,
pipeline: false
};
}
}

@@ -255,0 +263,0 @@

19

backends/pl-mongo.js

@@ -17,3 +17,2 @@ 'use strict';

constructor (name, pipeline, opts) {
//////////////////////////////////////////////
super (name, pipeline._factory, opts);

@@ -32,3 +31,2 @@

pipeline () {
/////////////////////////////////////////
return this._pipeline;

@@ -39,3 +37,2 @@ }

static Type () {
/////////////////////////////////////////
return 'mongo:pipeline';

@@ -46,3 +43,2 @@ }

type () {
/////////////////////////////////////////
return 'mongo:pipeline';

@@ -54,3 +50,2 @@ }

insert (entry, callback) {
/////////////////////////////////////////
var self = this;

@@ -74,3 +69,2 @@ entry._q = this._name;

get (callback) {
/////////////////////////////////////////
var self = this;

@@ -218,3 +212,2 @@ this._col.findOneAndDelete ({_q: this._name, mature: {$lte: Queue.nowPlusSecs (0)}}, {sort: {mature : 1}}, function (err, result) {

totalSize (callback) {
//////////////////////////////////
var q = {_q: this._name};

@@ -229,3 +222,2 @@ var opts = {};

size (callback) {
//////////////////////////////////
var q = {

@@ -245,3 +237,2 @@ _q: this._name,

schedSize (callback) {
//////////////////////////////////
var q = {

@@ -261,3 +252,2 @@ _q: this._name,

next_t (callback) {
/////////////////////////////////////////
var self = this;

@@ -303,3 +293,2 @@ this._col.find ({_q: this._name}).limit(1).sort ({mature:1}).project ({mature:1}).next (function (err, result) {

ensureIndexes (cb) {
//////////////////////////////////////////////////////////////////
this._col.ensureIndex ({_q : 1, mature : 1}, function (err) {

@@ -350,2 +339,10 @@ return cb (err);

}
capabilities () {
return {
sched: true,
reserve: true,
pipeline: true
};
}
}

@@ -352,0 +349,0 @@

@@ -15,3 +15,2 @@ 'use strict';

constructor (name, factory, opts) {
//////////////////////////////////////////////
super (name, factory, opts);

@@ -26,3 +25,2 @@

static Type () {
/////////////////////////////////////////
return 'redis:list';

@@ -34,3 +32,2 @@ }

type () {
/////////////////////////////////////////
return 'redis:list';

@@ -43,3 +40,2 @@ }

insert (entry, callback) {
/////////////////////////////////////////
var self = this;

@@ -64,3 +60,2 @@ var pl = {

get (callback) {
/////////////////////////////////////////
var self = this;

@@ -88,3 +83,2 @@ this._rediscl.rpop (this._redis_l_name, function (err, res) {

totalSize (callback) {
//////////////////////////////////
this._rediscl.llen (this._redis_l_name, callback);

@@ -97,3 +91,2 @@ }

size (callback) {
//////////////////////////////////
this._rediscl.llen (this._redis_l_name, callback);

@@ -106,3 +99,2 @@ }

schedSize (callback) {
//////////////////////////////////
callback (null, 0);

@@ -115,3 +107,2 @@ }

next_t (callback) {
//////////////////////////////////
callback (null, null)

@@ -144,2 +135,10 @@ }

}
capabilities () {
return {
sched: false,
reserve: false,
pipeline: false
};
}
}

@@ -146,0 +145,0 @@

@@ -16,3 +16,2 @@ 'use strict';

constructor (name, factory, opts) {
//////////////////////////////////////////////
super (name, factory, opts);

@@ -27,3 +26,2 @@

static Type () {
/////////////////////////////////////////
return 'redis:oq';

@@ -35,3 +33,2 @@ }

type () {
/////////////////////////////////////////
return 'redis:oq';

@@ -44,3 +41,2 @@ }

insert (entry, callback) {
/////////////////////////////////////////
var self = this;

@@ -54,3 +50,2 @@ this._roq.push (entry, callback);

get (callback) {
/////////////////////////////////////////
var self = this;

@@ -64,3 +59,2 @@ this._roq.pop (callback);

reserve (callback) {
/////////////////////////////////////////
var self = this;

@@ -76,3 +70,2 @@ var delay = this._opts.reserve_delay || 120;

commit (id, callback) {
/////////////////////////////////////////
var self = this;

@@ -93,3 +86,2 @@

rollback (id, next_t, callback) {
/////////////////////////////////////////
if (_.isFunction (next_t)) {

@@ -115,3 +107,2 @@ callback = next_t;

totalSize (callback) {
//////////////////////////////////
this._roq.totalSize (callback);

@@ -124,3 +115,2 @@ }

size (callback) {
//////////////////////////////////
this._roq.size (callback);

@@ -133,3 +123,2 @@ }

schedSize (callback) {
//////////////////////////////////
this._roq.schedSize (callback);

@@ -142,3 +131,2 @@ }

next_t (callback) {
//////////////////////////////////
this._roq.peek (function (err, res) {

@@ -161,3 +149,2 @@ if (err) {

static list (cb) {
//////////////////////////////////////////////////////////////////
var colls = [];

@@ -214,2 +201,10 @@

}
capabilities () {
return {
sched: true,
reserve: true,
pipeline: false
};
}
}

@@ -216,0 +211,0 @@

{
"name": "keuss",
"version": "1.3.5",
"version": "1.3.6",
"keywords": [

@@ -5,0 +5,0 @@ "queue",

@@ -43,2 +43,6 @@ 'use strict';

capabilities () {
return {};
}
list (opts, cb) {

@@ -45,0 +49,0 @@ // use stats factory

@@ -23,2 +23,6 @@ 'use strict';

// most mature
// values:
// * null: unknown, triggers read this.next_t() from backend
// * 0: known to be no element in queue, accepts values from insertNotifs
// * <non-zero int>: a know value, to be trusted. accepts values from insertNotifs if lower
this._next_mature_t = null;

@@ -86,2 +90,7 @@

type () {return 'queue:base';}
// capabilities
capabilities () {
return this._factory.capabilities ();
}

@@ -125,46 +134,60 @@ // T of next mature

/////////////////////////////////////////
// console.log ('%s - %s: signalInsertion received signalled insertion with mature %s', new Date().toISOString(), this._name, mature.toISOString ());
if (this._next_mature_t && (this._next_mature_t <= mature.getTime ())) {
// console.log ('%s - %s: signalInsertion, this msg mature (%s) is set to after _next_mature (%s). Not triggering any get', new Date().toISOString(), this._name, mature, new Date(this._next_mature_t).toISOString())
if (cb) cb ();
if (_.isNull (this._next_mature_t)) {
// totally ignore it, let pop() get it via next_t()
//console.log ('%s - %s: signalInsertion received signalled insertion with mature %s. _next_mature_t is null, ignoring', new Date().toISOString(), this._name, mature.toISOString ());
if (cb) return cb ();
else return;
}
else if (this._next_mature_t == 0) {
// next_t() forced a read and the result was 'empty': trust the notif
//console.log ('%s - %s: signalInsertion received signalled insertion with mature %s. _next_mature_t is 0, trusting notif', new Date().toISOString(), this._name, mature.toISOString ());
this._next_mature_t = mature.getTime ();
}
else if (this._next_mature_t <= mature.getTime ()) {
// _next_mature_t is numeric and non-null, so an active wait is happening. ignore it
//console.log ('%s - %s: signalInsertion received signalled insertion with mature %s. _next_mature_t (%s) is lower, ignoring', new Date().toISOString(), this._name, mature.toISOString (), new Date(this._next_mature_t).toISOString());
if (cb) return cb ();
else return;
}
else {
// console.log ('%s - %s: signalInsertion about to wake up sleepers', new Date().toISOString(), this._name);
// _next_mature_t is numeric and non-null, so an active wait is happening. ignore it
//console.log ('%s - %s: signalInsertion received signalled insertion with mature %s. _next_mature_t (%s) is higher, trusting notif', new Date().toISOString(), this._name, mature.toISOString (), new Date(this._next_mature_t).toISOString());
this._next_mature_t = mature.getTime ();
var self = this;
this._nextDelta (function (delta_ms) {
// run a wakeup on all consumers with the wakeup timer set
// console.log ('%s - %s: signalInsertion sees that the delta_ms is now %d', new Date().toISOString(), self._name, delta_ms);
// console.log ('%s - %s: signalInsertion : cosumers: %d', new Date().toISOString(), self._name, self.nConsumers());
self._consumers_by_tid.forEach (function (consumer, tid) {
// console.log ('%s - %s: signalInsertion checking wakeup state for consumer %j', new Date().toISOString(), self._name, consumer);
if (consumer.wakeup_timeout) {
// console.log ('%s - %s: signalInsertion rescheduling consumer %j', new Date().toISOString(), self._name, consumer);
clearTimeout (consumer.wakeup_timeout);
consumer.wakeup_timeout = null;
}
if (delta_ms > 0) {
consumer.wakeup_timeout = setTimeout (
function () {
consumer.wakeup_timeout = null;
self._onetime_pop (consumer);
},
delta_ms
);
var self = this;
this._nextDelta (function (delta_ms) {
// run a wakeup on all consumers with the wakeup timer set
//console.log ('%s - %s: signalInsertion sees that the delta_ms is now %d', new Date().toISOString(), self._name, delta_ms);
// //console.log ('%s - %s: signalInsertion : consumers: %d', new Date().toISOString(), self._name, self.nConsumers());
self._consumers_by_tid.forEach (function (consumer, tid) {
// //console.log ('%s - %s: signalInsertion checking wakeup state for consumer %j', new Date().toISOString(), self._name, consumer);
if (consumer.wakeup_timeout) {
// //console.log ('%s - %s: signalInsertion rescheduling consumer %j', new Date().toISOString(), self._name, consumer);
clearTimeout (consumer.wakeup_timeout);
consumer.wakeup_timeout = null;
// console.log ('%s - %s: signalInsertion rescheduled consumer %j', new Date().toISOString(), self._name, consumer);
}
else {
setImmediate (function () {self._onetime_pop (consumer)});
// console.log ('%s - %s: signalInsertion immediately waking up consumer %j', new Date().toISOString(), self._name, consumer);
}
if (delta_ms > 0) {
consumer.wakeup_timeout = setTimeout (
function () {
consumer.wakeup_timeout = null;
self._onetime_pop (consumer);
},
delta_ms
);
//console.log ('%s - %s: signalInsertion rescheduled consumer %j', new Date().toISOString(), self._name, consumer);
}
});
else {
setImmediate (function () {self._onetime_pop (consumer)});
//console.log ('%s - %s: signalInsertion immediately waking up consumer %j', new Date().toISOString(), self._name, consumer);
}
}
});
});
if (cb) cb ();
}
if (cb) cb ();
}

@@ -268,3 +291,3 @@

// attempt a read
// console.log ('%s - %s: calling initial onetime_pop on %j', new Date().toISOString(), self._name, consumer_data)
//console.log ('%s - %s: calling initial onetime_pop on %j', new Date().toISOString(), self._name, consumer_data)
this._onetime_pop (consumer_data);

@@ -387,3 +410,4 @@

var getOrReserve_cb = function (err, result) {
// console.log ('%s - %s - %s: called getOrReserve_cb : err %j, result %j', new Date().toISOString(), self._name, consumer.tid, err, result);
//console.log ('%s - %s - %s: called getOrReserve_cb : err %j, result %j', new Date().toISOString(), self._name, consumer.tid, err, result);
if (!consumer.callback) {

@@ -400,3 +424,3 @@ // consumer was cancelled mid-flight

// get/reserve in error
// console.log ('%s - %s - %s: getOrReserve_cb in error: err %j', new Date().toISOString(), self._name, consumer.tid, err);
//console.log ('%s - %s - %s: getOrReserve_cb in error: err %j', new Date().toISOString(), self._name, consumer.tid, err);

@@ -432,3 +456,3 @@ // clean timeout timer

// TODO cancel previous wakeup_timeout if not null?
// console.log ('%s - %s - %s: getOrReserve_cb : set wakeup in %d ms', new Date().toISOString(), self._name, consumer.tid, delta_ms);
//console.log ('%s - %s - %s: getOrReserve_cb : set wakeup in %d ms', new Date().toISOString(), self._name, consumer.tid, delta_ms);

@@ -452,3 +476,3 @@ consumer.wakeup_timeout = setTimeout (

// console.log ('%s - %s - %s: getOrReserve_cb : got result %j', new Date().toISOString(), self._name, consumer.tid, result);
//console.log ('%s - %s - %s: getOrReserve_cb : got result %j', new Date().toISOString(), self._name, consumer.tid, result);

@@ -467,3 +491,3 @@ // clean timeout timer

return;
}
};

@@ -482,5 +506,10 @@ if (consumer.reserve) {

/////////////////////////////
if (!this._next_mature_t) {
if (this._next_mature_t == 0) {
//console.log ('%s - %s : _nextDelta: _next_mature_t is zero, serving default', new Date().toISOString(), this._name);
return cb (this._pollInterval);
}
if (_.isNull (this._next_mature_t)) {
// there's no precalculated value, get it from backend
// console.log ('%s - %s : _nextDelta has no available _next_mature_t, getting it from backend', new Date().toISOString(), this._name);
//console.log ('%s - %s : _nextDelta: null _next_mature_t, getting it from backend', new Date().toISOString(), this._name);
var self = this;

@@ -490,19 +519,19 @@

if (err) {
var delta = self._next_mature_t || self._pollInterval;
// console.log ('%s - %s : _nextDelta error from backend, calc delta is %d', new Date().toISOString(), self._name, delta);
return cb (delta);
//console.log ('%s - %s : _nextDelta error from backend, serving default', new Date().toISOString(), self._name);
return cb (self._pollInterval);
}
if (res) {
// always trust backend's next_t, no matter what is in self._next_mature_t
// got a res, use it
self._next_mature_t = res;
var delta = res - Queue.now ().getTime();
if (delta > self._pollInterval) delta = self._pollInterval;
// console.log ('%s - %s : _nextDelta _next_mature_t from backend is %d, calc delta is %d', new Date().toISOString(), self._name, res, delta);
//console.log ('%s - %s : _nextDelta: _next_mature_t from backend is %d, calc delta is %d', new Date().toISOString(), self._name, res, delta);
return cb (delta);
}
else {
var delta = self._next_mature_t || self._pollInterval;
// console.log ('%s - %s : _nextDelta no _next_mature_t from backend, calc delta is %d', new Date().toISOString(), self._name, delta);
return cb (delta);
// no res, set _next_mature_t to 0, serve default
//console.log ('%s - %s : _nextDelta: no _next_mature_t from backend, use default', new Date().toISOString(), self._name);
self._next_mature_t = 0;
return cb (self._pollInterval);
}

@@ -512,3 +541,3 @@ });

else {
// console.log ('%s - %s : _nextDelta has available _next_mature_t %d', new Date().toISOString(), this._name, this._next_mature_t);
// _next_mature_t is non-zero numeric, use it
var delta = this._next_mature_t - Queue.now ().getTime();

@@ -518,3 +547,3 @@ if (delta > this._pollInterval) delta = this._pollInterval;

// console.log ('%s - %s : _nextDelta calc delta is %d', new Date().toISOString(), this._name, delta);
//console.log ('%s - %s : _nextDelta: using _next_mature_t %s, calc delta is %d', new Date().toISOString(), this._name, new Date(this._next_mature_t).toISOString (), delta);

@@ -521,0 +550,0 @@ return cb (delta);

@@ -15,3 +15,3 @@ 'use strict';

// ('Signaller created with bufferTime %d msecs', this._bufferTime);
// console.log ('Signaller created with bufferTime %d msecs', this._bufferTime);
}

@@ -53,2 +53,3 @@

// last hit too close in the past, not emitting
// console.log ('%s: last hit too close in the past, not emitting (%s)', new Date().toISOString(), this._buffered_mature);
if (cb) cb ();

@@ -55,0 +56,0 @@ }

@@ -196,3 +196,3 @@

if (err) return cb (err);
res.should.eql ([ 'test-stats', 'test-stats-2' ]);
res.sort().should.eql ([ 'test-stats', 'test-stats-2' ]);
cb ();

@@ -199,0 +199,0 @@ })

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc