Socket
Socket
Sign inDemoInstall

keuss

Package Overview
Dependencies
Maintainers
1
Versions
76
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

keuss - npm Package Compare versions

Comparing version 1.5.12 to 1.6.0

.nyc_output/4d184b7a-5a65-4754-ae4d-edc1ab7ce017.json

2

.nyc_output/processinfo/index.json

@@ -1,1 +0,1 @@

{"processes":{"625903ce-ee0a-4ca4-bf41-7fe36a24f6d0":{"parent":null,"children":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"]},"ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee":{"parent":"625903ce-ee0a-4ca4-bf41-7fe36a24f6d0","children":[]}},"files":{"/home/pepe/git/keuss/backends/bucket-mongo-safe.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/Queue.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/QFactory.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/signal/local.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/Signal.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/stats/mem.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/backends/bucket-mongo.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/backends/mongo.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/backends/pl-mongo.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/backends/ps-mongo.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/backends/redis-oq.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/utils/RedisConn.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/utils/RedisOrderedQueue.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/backends/redis-list.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/stats/mongo.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/stats/redis.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/signal/redis-pubsub.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/signal/mongo-capped.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/Pipeline/DirectLink.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/Pipeline/BaseLink.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/Pipeline/Sink.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"]},"externalIds":{}}
{"processes":{"4d184b7a-5a65-4754-ae4d-edc1ab7ce017":{"parent":"b24575bb-b456-49b7-b8a9-c2f9d2cbc2f3","children":[]},"b24575bb-b456-49b7-b8a9-c2f9d2cbc2f3":{"parent":null,"children":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"]}},"files":{"/home/pepe/git/keuss/signal/local.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Signal.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/stats/mem.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Stats.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/backends/bucket-mongo-safe.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Queue.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/QFactory-MongoDB-defaults.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/QFactory.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/signal/mongo-capped.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/stats/mongo.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/backends/bucket-mongo.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/backends/mongo.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/backends/pl-mongo.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Pipeline/Builder.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Pipeline/DirectLink.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Pipeline/BaseLink.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Pipeline/ChoiceLink.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Pipeline/Sink.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Pipeline/Pipeline.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Pipeline/Queue.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/backends/ps-mongo.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/backen
ds/redis-oq.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/utils/RedisConn.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/utils/RedisOrderedQueue.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/backends/redis-list.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/stats/redis.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/signal/redis-pubsub.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"]},"externalIds":{}}

@@ -11,4 +11,4 @@ var async = require ('async');

var Queue = require ('../Queue');
var QFactory = require ('../QFactory');
var Queue = require ('../Queue');
var QFactory_MongoDB_defaults = require ('../QFactory-MongoDB-defaults');

@@ -528,5 +528,5 @@ var State = {

//////////////////////////////////////////////
constructor (name, factory, opts) {
constructor (name, factory, opts, orig_opts) {
//////////////////////////////////////////////
super (name, factory, opts);
super (name, factory, opts, orig_opts);

@@ -857,3 +857,3 @@ this._factory = factory;

class Factory extends QFactory {
class Factory extends QFactory_MongoDB_defaults {
constructor (opts, mongo_conn) {

@@ -868,3 +868,3 @@ super (opts);

_.merge(full_opts, this._opts, opts);
return new BucketMongoSafeQueue (name, this, full_opts);
return new BucketMongoSafeQueue (name, this, full_opts, opts);
}

@@ -871,0 +871,0 @@

@@ -10,12 +10,11 @@ var async = require ('async');

var Queue = require ('../Queue');
var QFactory = require ('../QFactory');
var Queue = require ('../Queue');
var QFactory_MongoDB_defaults = require ('../QFactory-MongoDB-defaults');
class BucketMongoQueue extends Queue {
//////////////////////////////////////////////
constructor (name, factory, opts) {
constructor (name, factory, opts, orig_opts) {
//////////////////////////////////////////////
super (name, factory, opts);
super (name, factory, opts, orig_opts);

@@ -298,3 +297,3 @@ this._factory = factory;

class Factory extends QFactory {
class Factory extends QFactory_MongoDB_defaults {
constructor (opts, mongo_conn) {

@@ -309,3 +308,3 @@ super (opts);

_.merge(full_opts, this._opts, opts);
return new BucketMongoQueue (name, this, full_opts);
return new BucketMongoQueue (name, this, full_opts, opts);
}

@@ -312,0 +311,0 @@

@@ -6,4 +6,4 @@ var _ = require ('lodash');

var Queue = require ('../Queue');
var QFactory = require ('../QFactory');
var Queue = require ('../Queue');
var QFactory_MongoDB_defaults = require ('../QFactory-MongoDB-defaults');

@@ -13,4 +13,4 @@ class SimpleMongoQueue extends Queue {

//////////////////////////////////////////////
constructor (name, factory, opts) {
super (name, factory, opts);
constructor (name, factory, opts, orig_opts) {
super (name, factory, opts, orig_opts);

@@ -198,3 +198,3 @@ this._factory = factory;

class Factory extends QFactory {
class Factory extends QFactory_MongoDB_defaults {
constructor (opts, mongo_conn) {

@@ -207,5 +207,5 @@ super (opts);

queue (name, opts) {
var full_opts = {}
var full_opts = {};
_.merge(full_opts, this._opts, opts);
return new SimpleMongoQueue (name, this, full_opts);
return new SimpleMongoQueue (name, this, full_opts, opts);
}

@@ -212,0 +212,0 @@

@@ -1,49 +0,34 @@

var _ = require ('lodash');
const _ = require ('lodash');
const async = require ('async');
const vm = require ('vm');
var MongoClient = require ('mongodb').MongoClient;
var mongo = require ('mongodb');
const MongoClient = require ('mongodb').MongoClient;
var Queue = require ('../Queue');
var QFactory = require ('../QFactory');
const QFactory_MongoDB_defaults = require ('../QFactory-MongoDB-defaults');
const PipelineBuilder = require ('../Pipeline/Builder');
const Pipeline = require ('../Pipeline/Pipeline');
const PipelinedMongoQueue = require ('../Pipeline/Queue');
class PipelinedMongoQueue extends Queue {
const debug = require('debug')('keuss:Pipeline:Main');
//////////////////////////////////////////////
constructor (name, pipeline, opts) {
super (name, pipeline._factory, opts);
this._pipeline = pipeline;
this._col = this._pipeline._col;
///////////////////////////////////////////////////////////
class Factory extends QFactory_MongoDB_defaults {
constructor (opts, mongo_data_conn, mongo_topology_conn) {
super (opts);
this._mongo_data_conn = mongo_data_conn;
this._mongo_topology_conn = mongo_topology_conn;
this._db = mongo_data_conn.db();
this._topology_db = mongo_topology_conn.db();
this._pipelines = {};
// set topology
this._stats.topology ({
pipeline: pipeline.name()
}, () => {});
}
/////////////////////////////////////////
pipeline () {
return this._pipeline;
}
/////////////////////////////////////////
static Type () {
return 'mongo:pipeline';
}
/////////////////////////////////////////
type () {
return 'mongo:pipeline';
}
/////////////////////////////////////////
// add element to queue
insert (entry, callback) {
entry._q = this._name;
this._col.insertOne (entry, {}, (err, result) => {
if (err) return callback (err);
// TODO result.insertedCount must be 1
callback (null, result.insertedId);
this._topology_db.collection ('factory').updateOne ({
_id: this._name
}, {
$set: {
opts: opts
}
}, {
upsert: true
});

@@ -53,264 +38,90 @@ }

/////////////////////////////////////////
// get element from queue
get (callback) {
this._col.findOneAndDelete ({_q: this._name, mature: {$lte: Queue.nowPlusSecs (0)}}, {sort: {mature : 1}}, (err, result) => {
if (err) return callback (err);
callback (null, result && result.value);
});
///////////////////////////////////////////////////////////
builder () {
return new PipelineBuilder (this);
}
//////////////////////////////////
// reserve element: call cb (err, pl) where pl has an id
reserve (callback) {
var delay = this._opts.reserve_delay || 120;
var query = {
_q: this._name,
mature: {$lte: Queue.nowPlusSecs (0)}
///////////////////////////////////////////////////////////
pipelineFromRecipe (name, bs_src_array, setup_src_array, opts, cb) {
const context = {
Buffer,
clearImmediate,
clearInterval,
clearTimeout,
require,
setImmediate,
setTimeout,
setInterval,
TextEncoder,
TextDecoder,
URL,
URLSearchParams,
builder: this.builder ().pipeline (name),
done: cb
};
var update = {
$set: {mature: Queue.nowPlusSecs (delay), reserved: new Date ()},
$inc: {tries: 1}
};
if (opts && opts.context) _.assign (context, opts.context);
var opts = {
sort: {mature : 1},
returnOriginal: true
};
vm.createContext (context);
this._col.findOneAndUpdate (query, update, opts, (err, result) => {
if (err) return callback (err);
callback (null, result && result.value);
});
}
//////////////////////////////////
// commit previous reserve, by p.id
commit (id, callback) {
var query;
try {
query = {
_id: (_.isString(id) ? new mongo.ObjectID (id) : id),
_q: this._name,
reserved: {$exists: true}
};
}
catch (e) {
return callback ('id [' + id + '] can not be used as rollback id: ' + e);
}
_.each (bs_src_array, (elem, idx) => {
const src = elem.src || elem;
const sname = elem.name || `bootstrap[${idx}]`;
this._col.deleteOne (query, {}, (err, result) => {
if (err) return callback (err);
callback (null, result && (result.deletedCount == 1));
});
}
debug ('Loading BS script %s', sname);
const script = new vm.Script (src, {filename: sname});
script.runInContext (context);
debug ('BS script %s loaded', sname);
});
_.each (setup_src_array, (elem, idx) => {
const src = elem.src || elem;
const sname = elem.name || `setup[${idx}]`;
//////////////////////////////////
// rollback previous reserve, by p.id
rollback (id, next_t, callback) {
if (_.isFunction (next_t)) {
callback = next_t;
next_t = null;
debug ('Loading Setup script %s', sname);
const script = new vm.Script (src, {filename: sname});
script.runInContext (context);
debug ('Setup script %s loaded', sname);
});
}
var query;
try {
query = {
_id: (_.isString(id) ? new mongo.ObjectID (id) : id),
_q: this._name,
reserved: {$exists: true}
};
catch (err) {
return cb (err);
}
catch (e) {
return callback ('id [' + id + '] can not be used as rollback id: ' + e);
}
var update = {
$set: {mature: (next_t ? new Date (next_t) : Queue.now ())},
$unset: {reserved: ''}
};
this._col.updateOne (query, update, {}, (err, result) => {
if (err) return callback (err);
callback (null, result && (result.modifiedCount == 1));
});
}
//////////////////////////////////
// passes element to the next queue in pipeline
pl_step (id, next_queue, opts, callback) {
var q = {
_id: id,
_q: this._name,
reserved: {$exists: true}
};
var upd = {
$set: {
mature: opts.mature || Queue.now (),
tries: opts.tries || 0,
_q: next_queue.name ()
},
$unset: {reserved: ''}
};
if (opts.payload) {
upd.$set.payload = opts.payload;
}
else if (opts.update) {
this._embed_update_for_payload (upd, opts.update);
}
this._col.updateOne (q, upd, {}, (err, result) => {
if (err) return callback (err);
callback (null, result && (result.modifiedCount == 1));
});
}
//////////////////////////////////
// queue size including non-mature elements
totalSize (callback) {
var q = {_q: this._name};
var opts = {};
this._col.countDocuments (q, opts, callback);
}
//////////////////////////////////
// queue size NOT including non-mature elements
size (callback) {
var q = {
_q: this._name,
mature : {$lte : Queue.now ()}
};
var opts = {};
this._col.countDocuments (q, opts, callback);
}
//////////////////////////////////
// queue size of non-mature elements only
schedSize (callback) {
var q = {
_q: this._name,
mature : {$gt : Queue.now ()},
reserved: {$exists: false}
};
var opts = {};
this._col.countDocuments (q, opts, callback);
}
//////////////////////////////////
// queue size of reserved elements only
resvSize (callback) {
var q = {
_q: this._name,
mature : {$gt : Queue.now ()},
reserved: {$exists: true}
};
var opts = {};
this._col.countDocuments (q, opts, callback);
}
/////////////////////////////////////////
// get element from queue
next_t (callback) {
this._col.find ({_q: this._name}).limit(1).sort ({mature:1}).project ({mature:1}).next ((err, result) => {
if (err) return callback (err);
callback (null, result && result.mature);
});
}
//////////////////////////////////////////////
_embed_update_for_payload (dst, src) {
_.each (src, (v, k) => {
if (k.startsWith ('$')) {
_.each (v, (fv, fk) => {
if (!dst[k]) dst[k] = {};
dst[k]['payload.' + fk] = fv;
});
this._topology_db.collection ('pipelines').updateOne ({
_id: name
}, {
$set: {
bs: bs_src_array,
setup: setup_src_array
}
else {
dst['payload.' + k] = v;
}
}, {
upsert: true
});
}
//////////////////////////////////////////////
// redefnition
_move_to_deadletter (obj, cb) {
this.pl_step (obj._id, this._factory.deadletter_queue (), {}, (err, res) => {
if (err) return cb (err);
this._stats.incr ('get');
this._factory.deadletter_queue ()._stats.incr ('put');
this._factory.deadletter_queue ()._signaller.signalInsertion (Queue.now());
cb (null, false);
});
}
}
///////////////////////////////////////////////////////////
pipeline (name) {
if (this._pipelines[name]) {
debug ('returning existing pipeline [%s]', name);
return this._pipelines[name];
}
class Pipeline {
constructor (name, factory) {
this._name = name;
this._factory = factory;
this._col = factory._db.collection (this._name);
this.ensureIndexes (err => {});
const pl = new Pipeline (name, this);
debug ('created pipeline [%s]', name);
this._pipelines[name] = pl;
return pl;
}
name () {
return this._name;
}
///////////////////////////////////////////////////////////
queue (name, opts) {
return new PipelinedMongoQueue (name, this, opts);
}
list (cb) {
// TODO
cb (null, []);
}
///////////////////////////////////////////////////////////////////////////////
// private parts
//////////////////////////////////////////////////////////////////
// create needed indexes for O(1) functioning
ensureIndexes (cb) {
this._col.createIndex ({_q : 1, mature : 1}, err => cb (err));
}
}
class Factory extends QFactory {
constructor (opts, mongo_conn) {
super (opts);
this._mongo_conn = mongo_conn;
this._db = mongo_conn.db();
this._pipelines = {};
}
queue (name, opts) {
if (!opts) opts = {};
if (!opts.pipeline) opts.pipeline = 'default';
var pl_name = opts.pipeline;
var pl_name = opts.pipeline || 'default';
var pipeline = this._pipelines[pl_name];

@@ -323,18 +134,22 @@

var full_opts = {};
_.merge(full_opts, this._opts, opts);
return pipeline.queue (name, full_opts);
return this._queue_from_pipeline (name, pipeline, opts);
}
///////////////////////////////////////////////////////////
close (cb) {
super.close (() => {
if (this._mongo_conn) {
this._mongo_conn.close ();
this._mongo_conn = null;
}
if (cb) return cb ();
async.parallel ([
cb => this._mongo_data_conn.close (cb),
cb => this._mongo_topology_conn.close (cb)
], err => {
this._mongo_data_conn = null;
this._mongo_topology_conn = null;
if (cb) return cb (err);
});
});
}
///////////////////////////////////////////////////////////
type () {

@@ -344,2 +159,4 @@ return PipelinedMongoQueue.Type ();

///////////////////////////////////////////////////////////
capabilities () {

@@ -352,11 +169,50 @@ return {

}
///////////////////////////////////////////////////////////
_queue_from_pipeline (name, pipeline, opts) {
if (!opts) opts = {};
var full_opts = {};
_.merge(full_opts, this._opts, opts);
return pipeline.queue (name, full_opts, opts);
}
}
///////////////////////////////////////////////////////////
function creator (opts, cb) {
var _opts = opts || {};
var m_url = _opts.url || 'mongodb://localhost:27017/keuss';
const _opts = opts || {};
const m_url = _opts.url || 'mongodb://localhost:27017/keuss';
let m_topology_url = _opts.topology_url;
MongoClient.connect (m_url, { useNewUrlParser: true }, (err, cl) => {
if (!m_topology_url) {
let arr = m_url.split ('?');
arr[0] += '_status';
m_topology_url = arr.join ('?');
}
async.series ([
cb => MongoClient.connect (m_url, { useNewUrlParser: true }, (err, cl) => {
if (err) {
debug ('error while connecting to data mongoDB [%s]', m_url, err);
return cb (err);
}
debug ('connected OK to data mongoDB %s', m_url);
cb (null, cl);
}),
cb => MongoClient.connect (m_topology_url, { useNewUrlParser: true }, (err, cl) => {
if (err) {
debug ('error while connecting to topology mongoDB [%s]', m_topology_url, err);
return cb (err);
}
debug ('connected OK to topology mongoDB %s', m_topology_url);
cb (null, cl);
}),
], (err, res) => {
if (err) return cb (err);
var F = new Factory (_opts, cl);
var F = new Factory (_opts, res[0], res[1]);
F.async_init (err => cb (null, F));

@@ -363,0 +219,0 @@ });

@@ -7,10 +7,11 @@ var async = require ('async');

var Queue = require ('../Queue');
var QFactory = require ('../QFactory');
var Queue = require ('../Queue');
var QFactory_MongoDB_defaults = require ('../QFactory-MongoDB-defaults');
class PersistentMongoQueue extends Queue {
//////////////////////////////////////////////
constructor (name, factory, opts) {
super (name, factory, opts);
constructor (name, factory, opts, orig_opts) {
super (name, factory, opts, orig_opts);

@@ -235,3 +236,3 @@ this._factory = factory;

class Factory extends QFactory {
class Factory extends QFactory_MongoDB_defaults {
constructor (opts, mongo_conn) {

@@ -244,5 +245,5 @@ super (opts);

queue (name, opts) {
var full_opts = {}
var full_opts = {};
_.merge(full_opts, this._opts, opts);
return new PersistentMongoQueue (name, this, full_opts);
return new PersistentMongoQueue (name, this, full_opts, opts);
}

@@ -249,0 +250,0 @@

@@ -12,4 +12,4 @@ var async = require ('async');

//////////////////////////////////////////////
constructor (name, factory, opts) {
super (name, factory, opts);
constructor (name, factory, opts, orig_opts) {
super (name, factory, opts, orig_opts);

@@ -97,5 +97,5 @@ this._rediscl = factory._rediscl;

queue (name, opts) {
var full_opts = {}
var full_opts = {};
_.merge(full_opts, this._opts, opts);
return new RedisListQueue (name, this, full_opts);
return new RedisListQueue (name, this, full_opts, opts);
}

@@ -128,3 +128,2 @@

var F = new Factory (_opts, rediscl);
// console.log ('factory created with opts %j, calling async_init', opts)
F.async_init ((err) => cb (null, F));

@@ -131,0 +130,0 @@ }

@@ -12,4 +12,4 @@ var _ = require ('lodash')

//////////////////////////////////////////////
constructor (name, factory, opts) {
super (name, factory, opts);
constructor (name, factory, opts, orig_opts) {
super (name, factory, opts, orig_opts);

@@ -148,5 +148,5 @@ this._rediscl = factory._rediscl;

queue (name, opts) {
var full_opts = {}
var full_opts = {};
_.merge(full_opts, this._opts, opts);
return new RedisOQ (name, this, full_opts);
return new RedisOQ (name, this, full_opts, opts);
}

@@ -153,0 +153,0 @@

# Changelog
* v1.6.0
* added sane defaults for stats and signal for mongodb-based backends (using mongo stats and signal)
* added pipeline builder
* added ability to create a full pipeline from text (making it trivial to be stored in file)
* v1.5.12

@@ -3,0 +7,0 @@ * corrected small pipeline-related issues

@@ -21,3 +21,3 @@ var MQ = require ('../../../backends/pl-mongo');

pdl.start ((elem, done) => {
pdl.start (function (elem, done) {
// pass element to next queue, set payload.passed to true

@@ -32,6 +32,6 @@ done (null, {

// insert elements in the entry queue
async.timesLimit (111, 3, (n, next) => q1.push ({a:n, b:'see it spin...'}, next));
async.timesLimit (3, 3, (n, next) => q1.push ({a:n, b:'see it spin...'}, next));
// read elements at the outer end
async.timesLimit (111, 3, (n, next) => q2.pop ('exit', (err, res) => {
async.timesLimit (3, 3, (n, next) => q2.pop ('exit', (err, res) => {
console.log ('end point get', res);

@@ -38,0 +38,0 @@ next ();

// mongodb: create a consumer and a producer
const Chance = require ('chance');
const MQ = require ('../../../backends/pl-mongo');
const signal = require ('../../../signal/mongo-capped');
const stats = require ('../../../stats/mongo');
const DCT = require ('../../../Pipeline/DirectLink');
const SNK = require ('../../../Pipeline/Sink');
const CHC = require ('../../../Pipeline/ChoiceLink');
const MQ = require ('../../../backends/pl-mongo');
const DCT = require ('../../../Pipeline/DirectLink');
const SNK = require ('../../../Pipeline/Sink');
const CHC = require ('../../../Pipeline/ChoiceLink');

@@ -14,3 +12,3 @@

const num_elems = 1000;
const num_elems = 100;
let processed = 0;

@@ -49,16 +47,5 @@

const factory_opts = {
url: 'mongodb://localhost/qeus_pl',
signaller: {
provider: signal,
opts: {
url: 'mongodb://localhost/qeus_pl_signal',
channel: 'das_channel'
}
},
stats: {
provider: stats,
opts: {
url: 'mongodb://localhost/qeus_pl_stats'
}
},
// url: 'mongodb+srv://keuss:dCxvqgFs3umuZenb@cluster0-e1vob.mongodb.net/qeus?retryWrites=true&w=majority',
url: 'mongodb://localhost/keuss_pl_example_simfork',
options: { useNewUrlParser: true },
deadletter: {

@@ -74,3 +61,3 @@ max_ko: 3

// factory ready, create queues on default pipeline
const q_opts = {};
const q_opts = {aaa: 666, b: 'yy'};
const q1 = factory.queue ('pl_many_q_1', q_opts);

@@ -77,0 +64,0 @@ const q2 = factory.queue ('pl_many_q_2', q_opts);

@@ -1,5 +0,15 @@

var MQ = require ('../backends/bucket-mongo-safe');
var signal_mongo_capped = require ('../signal/mongo-capped');
var stats_mongo = require ('../stats/mongo');
/*
*
* Runs a set of producers and consumers, where consumers do a reserve-commit-rollback cycle. In each cycle a
* consumer would randomly choose whether to commit or rollback (10% chance of rollback). No deadletter is used
* so elements are retried ad infinitum until processed ok
*
* Upon completion (all items generated and consumed ok) some stats will be shown
*
* It uses bucket-mongo-safe backend for high throughput
*
*/
var MQ = require ('../../backends/bucket-mongo-safe');
var _ = require ('lodash');

@@ -12,22 +22,16 @@ var async = require ('async');

var factory_opts = {
url: 'mongodb://localhost/qeus',
// url: 'mongodb://localhost/qeus',
url: 'mongodb+srv://keuss:dCxvqgFs3umuZenb@cluster0-e1vob.mongodb.net/qeus?retryWrites=true&w=majority',
name: 'Random-NS',
reject_delta_base: 2000,
reject_delta_factor: 2000,
signaller: {
provider: signal_mongo_capped,
opts: {
url: 'mongodb://localhost/qeus_signal',
channel: 'a_channel'
}
},
stats: {
provider: stats_mongo,
opts: {
url: 'mongodb://localhost/qeus_stats'
}
}
reject_delta_factor: 2000
};
// tets dimensions: elems to produce and consume, number of consumers, number of producers
const num_elems = 10000000;
const num_producers = 3;
const num_consumers = 7;
const commit_likelihood = 95;
// stats holder
var selfs = {

@@ -83,3 +87,3 @@ consumers: {},

var tasks = [];
for (var i = 0; i < 7; i++) tasks.push ((cb) => run_a_pop_consumer (shared_ctx, cb));
for (var i = 0; i < num_consumers; i++) tasks.push ((cb) => run_a_pop_consumer (shared_ctx, cb));
async.parallel (tasks, cb);

@@ -104,3 +108,3 @@ }

if (chance.bool({likelihood: 40})) {
if (chance.bool({likelihood: commit_likelihood})) {
shared_ctx.q.ok (res._id, (err) => {

@@ -162,3 +166,3 @@ if (err) return cb (err);

var tasks = [];
for (var i = 0; i < 7; i++)
for (var i = 0; i < num_consumers; i++)
tasks.push ((cb) => run_a_rcr_consumer (shared_ctx, cb));

@@ -211,3 +215,3 @@

var tasks = [];
for (var i = 0; i < 3; i++) tasks.push ((cb) => run_a_producer (shared_ctx, cb));
for (var i = 0; i < num_producers; i++) tasks.push ((cb) => run_a_producer (shared_ctx, cb));
async.parallel (tasks, cb);

@@ -238,5 +242,5 @@ }

async.parallel ([
(cb) => run_producers (q, 10000000, cb),
(cb) => run_producers (q, num_elems, cb),
// (cb) => run_consumers (q, 250000, cb),
(cb) => run_rcr_consumers (q, 10000000, cb),
(cb) => run_rcr_consumers (q, num_elems, cb),
], (err) => {

@@ -243,0 +247,0 @@ if (err) {

{
"name": "keuss",
"version": "1.5.12",
"version": "1.6.0",
"keywords": [

@@ -30,14 +30,14 @@ "queue",

"async": "~3.2.0",
"ioredis": "~4.16.3",
"lodash": "~4.17.15",
"mitt": "~1.2.0",
"mongodb": "~3.5.0",
"uuid": "~8.0.0",
"@nodebb/mubsub": "~1.6.0",
"ioredis": "~4.17.3",
"lodash": "~4.17.19",
"mitt": "~2.1.0",
"mongodb": "~3.5.9",
"uuid": "~8.2.0",
"@nodebb/mubsub": "~1.6.2",
"debug": "~4.1.1",
"async-lock": "~1.2.2"
"async-lock": "~1.2.4"
},
"devDependencies": {
"mocha": "~7.1.2",
"chance": "~1.1.4",
"mocha": "~8.0.1",
"chance": "~1.1.6",
"should": "~13.2.3"

@@ -44,0 +44,0 @@ },

@@ -20,3 +20,3 @@ const _ = require ('lodash');

// check queues are pipelined
if (! src_q.pipeline) throw Error ('source queue is not pipelined');
if (!src_q.pipeline) throw Error ('source queue is not pipelined');

@@ -30,4 +30,18 @@ this._opts = opts || {};

static Type () {return 'pipeline:processor:BaseLink';}
type () {return BaseLink.Type();}
/////////////////////////////////////////
desc () {
return {
type: this.type (),
src: this.src().name()
};
}
/////////////////////////////////////////
on_data (ondata) {
this._ondata_orig = ondata;
this._ondata = ondata.bind (this);

@@ -40,2 +54,3 @@ }

this._process (this._ondata);
debug ('%s: started', this._name);
}

@@ -48,3 +63,10 @@

/////////////////////////////////////////
_add_to_pipeline () {
this._src.pipeline()._add_processor (this);
}
/////////////////////////////////////////
_mature (opts) {

@@ -61,3 +83,26 @@ if (opts.mature) {

/////////////////////////////////////////
_on_error (err, elem, ondata) {
// error: drop or retry?
if (err.drop === true) {
// drop: commit and forget
this.src().ok (elem, err => {
if (err) this.emit ('error', {on: 'src-queue-commit-on-error', elem, err});
debug ('%s: in error, marked to be dropped: %s', this._name, elem._id);
this._process (ondata);
});
}
else {
// retry: rollback
this.src().ko (elem, this._rollback_next_t (elem), err => {
if (err) this.emit ('error', {on: 'src-queue-rollback-on-error', elem, err});
debug ('%s: in error, rolled back: %s', this._name, elem._id);
this._process (ondata);
});
}
}
/////////////////////////////////////////
_process (ondata) {

@@ -70,3 +115,7 @@ debug ('%s: attempting reserve', this._name);

if (err) {
if (err == 'cancel') return; // end the process loop
if (err == 'cancel') {
debug ('%s: pipeline processor cancelled', this._name);
return; // end the process loop
}
debug ('%s: error in reserve:', this._name, err);

@@ -83,27 +132,8 @@ this.emit ('error', {on: 'src-queue-pop', err});

// do something
try {
ondata (elem, (err, res) => {
debug ('%s: processed: %s', this._name, elem._id);
if (err) {
// error: drop or retry?
if (err.drop === true) {
// drop: commit and forget
this.src().ok (elem, err => {
if (err) this.emit ('error', {on: 'src-queue-commit-on-error', elem, err});
debug ('%s: in error, marked to be dropped: %s', this._name, elem._id);
this._process (ondata);
});
}
else {
// retry: rollback
this.src().ko (elem, this._rollback_next_t (elem), err => {
if (err) this.emit ('error', {on: 'src-queue-rollback-on-error', elem, err});
debug ('%s: in error, rolled back: %s', this._name, elem._id);
this._process (ondata);
});
}
if (err) return this._on_error (err, elem, ondata);
return;
}
// drop it (act as sink) ?

@@ -141,2 +171,9 @@ if ((res === false) || (res && res.drop)) {

});
}
catch (e) {
debug ('catch error, emitting it: ', e);
console.log ('catch error, emitting it: ', e);
this.emit (e);
this._on_error (e, elem, ondata);
}
});

@@ -143,0 +180,0 @@ }

@@ -12,5 +12,3 @@ var Queue = require ('../Queue');

if (! _.isArray (dst_q_array)) {
throw Error ('destination-queues is not an array');
}
if (!_.isArray (dst_q_array)) throw Error ('destination-queues is not an array');

@@ -31,2 +29,3 @@ this._dst_q_array = dst_q_array;

this._add_to_pipeline ();
debug ('created Pipeline/ChoiceLink %s', this._name);

@@ -41,4 +40,15 @@ }

static Type () {return 'pipeline:processor:ChoiceLink';}
type () {return ChoiceLink.Type();}
/////////////////////////////////////////
desc () {
return _.merge (super.desc(), {
dst: _.map (this._dst_q_array, q => q.name ())
});
}
/////////////////////////////////////////
_next (id, opts, cb) {

@@ -45,0 +55,0 @@ let dst = null;

@@ -24,2 +24,3 @@ var Queue = require ('../Queue');

this._add_to_pipeline ();
debug ('created Pipeline/DirectLink %s', this._name);

@@ -30,4 +31,15 @@ }

static Type () {return 'pipeline:processor:DirectLink';}
type () {return DirectLink.Type();}
/////////////////////////////////////////
desc () {
return _.merge (super.desc(), {
dst: this.dst().name()
});
}
/////////////////////////////////////////
_next (id, opts, cb) {

@@ -34,0 +46,0 @@ this.src().pl_step (id, this.dst(), opts, (err, res) => {

@@ -10,5 +10,9 @@ var debug = require('debug')('keuss:Pipeline:Sink');

this._name = src_q.name () + '->(sink)';
this._add_to_pipeline ();
debug ('created Pipeline/Sink %s', this._name);
}
static Type () {return 'pipeline:processor:Sink';}
type () {return Sink.Type();}
/////////////////////////////////////////

@@ -15,0 +19,0 @@ _next (id, opts, callback) {

@@ -13,5 +13,8 @@ var async = require ('async');

this._opts = opts || {};
this._name = opts.name || 'N';
debug ('created QFactory %s with opts %o', this._name, this._opts);
}
async_init (cb) {
if (!this._opts.stats) this._opts.stats = {};

@@ -23,6 +26,2 @@ if (!this._opts.stats.opts) this._opts.stats.opts = {};

debug ('created QFactory %s with opts %o', this._name, this._opts);
}
async_init (cb) {
var signal_provider = this._opts.signaller.provider || LocalSignal;

@@ -112,4 +111,23 @@ var stats_provider = this._opts.stats.provider || MemStats;

}
to_descriptor_obj () {
let obj = {
name: this.name (),
type: this.type (),
opts: _.omit (this._opts, ['stats', 'signaller']),
signaller: {
type: this.signaller_factory().type(),
opts: this._opts.signaller.opts
},
stats: {
type: this.stats_factory().type(),
opts: this._opts.stats.opts
}
};
return obj;
}
}
module.exports = QFactory;

@@ -11,3 +11,3 @@ var async = require ('async');

//////////////////////////////////////////////
constructor (name, factory, opts) {
constructor (name, factory, opts, orig_opts) {
//////////////////////////////////////////////

@@ -19,2 +19,3 @@ if (!name) {

this._opts = opts || {};
this._orig_opts = orig_opts || {};

@@ -40,5 +41,4 @@ this._name = name;

// save opts minus stats & signaller
var __opts = _.omit (this._opts, ['signaller', 'stats']);
this._stats.opts (__opts, () => {});
// save original options
this._stats.opts (orig_opts || {}, () => {});

@@ -98,3 +98,2 @@ this._stats.incr ('get', 0);

stats (cb) {this._stats.values (cb);}
topology (cb) {this._stats.topology (cb);}
paused (cb) {this._stats.paused (cb);}

@@ -528,3 +527,2 @@

stats: cb => this.stats (cb),
topology: cb => this.topology (cb),
paused: cb => this.paused (cb),

@@ -531,0 +529,0 @@ next_mature_t: cb => this.next_t (cb),

@@ -20,2 +20,3 @@ # keuss

- [Initialization](#initialization)
- [MongoDB defaults](#mongodb-defaults)
- [Queue creation](#queue-creation)

@@ -196,2 +197,8 @@ - [Factory close](#factory-close)

##### MongoDB defaults
On MongoDB-based backends, `signaller` and `stats` default to:
* `signaller`: uses `mongo-capped`, using the same mongodb url as the backend, but postfixing the db with `_signal`
* `stats`: uses `mongo-capped`, using the same mongodb url as the backend, but postfixing the db with `_stats`
This allows cleaner and more concise initialization, using sane defaults
#### Queue creation

@@ -198,0 +205,0 @@ ```javascript

@@ -83,3 +83,3 @@ var mubsub = require('@nodebb/mubsub');

static Type () {return 'signal:mongo-capped'}
type () {return Type ()}
type () {return MCSignalFactory.Type ()}

@@ -86,0 +86,0 @@ signal (channel, opts) {

var _ = require ('lodash');
var Stats = require ('../Stats');
var debug = require('debug')('keuss:Stats:Mem');
class MemStats {
class MemStats extends Stats {
constructor (ns, name, factory) {
this._factory = factory;
super (ns, name, factory);
this._s = {

@@ -13,3 +16,2 @@ ns: ns,

opts: {},
topology: {},
paused: false

@@ -19,14 +21,2 @@ };

type () {
return this._factory.type ();
}
ns () {
return this._s.ns;
}
name () {
return this._s.name;
}
values (cb) {

@@ -78,19 +68,5 @@ cb (null, this._s.counters);

topology (tplg, cb) {
if (!cb) {
// get
cb = tplg;
cb (null, this._s.topology);
}
else {
// set
this._s.topology = tplg;
cb ();
}
}
clear (cb) {
this._s.counters = {}
this._s.opts = {};
this._s.topology = {};
this._s.paused = false;

@@ -119,3 +95,3 @@

type () {
return Type ();
return MemStatsFactory.Type ();
}

@@ -122,0 +98,0 @@

@@ -5,2 +5,4 @@ var _ = require ('lodash');

var Stats = require ('../Stats');
var debug = require('debug')('keuss:Stats:Mongo');

@@ -11,9 +13,7 @@

*/
class MongoStats {
class MongoStats extends Stats {
constructor(ns, name, factory, opts) {
this._ns = ns;
this._name = name;
super (ns, name, factory);
this._id = 'keuss:stats:' + ns + ':' + name;
this._opts = opts || {};
this._factory = factory;
this._cache = {};

@@ -33,16 +33,2 @@

type() {
return this._factory.type();
}
ns () {
return this._ns;
}
name () {
return this._name;
}
values(cb) {

@@ -170,24 +156,2 @@ this._coll().findOne ({_id: this._id}, {projection: {counters: 1}}, (err, res) => {

// Dual accessor for the topology stored in the per-queue stats document.
// Called as topology(cb) -> get, or topology(tplg, cb) -> set.
topology (tplg, cb) {
  if (!cb) {
    // get: single-argument form, tplg is actually the callback
    cb = tplg;
    this._coll().findOne ({_id: this._id}, {projection: {topology: 1}}, (err, res) => {
      if (err) return cb (err);
      debug ('mongo stats - topology: get %s -> %j', this._name, res);
      // default to {} when the doc or its topology field is missing
      cb (null, (res && res.topology) || {});
    });
  }
  else {
    // set: upsert so the stats document is created on first write
    var upd = {$set: {topology : tplg}};
    this._coll().updateOne ({_id: this._id}, upd, {upsert: true}, (err) => {
      debug ('mongo stats: updated %s -> %j', this._name, upd);
      cb (err);
    });
  }
}
clear(cb) {

@@ -200,4 +164,3 @@ this._cancelFlush();

counters: 1,
opts: 1,
topology: 1
opts: 1
}

@@ -232,3 +195,3 @@ };

static Type() { return 'mongo' }
type() { return Type() }
type() { return MongoStatsFactory.Type() }

@@ -262,3 +225,2 @@

counters: elem.counters,
topology: elem.topology,
opts: elem.opts,

@@ -265,0 +227,0 @@ paused: elem.paused || false

@@ -1,4 +0,6 @@

var _ = require('lodash');
var async = require('async');
var _ = require ('lodash');
var async = require ('async');
var Stats = require ('../Stats');
var RedisConn = require('../utils/RedisConn');

@@ -14,11 +16,9 @@

* - opts (queue creation opts) in keuss:stats:<ns>:<name>:opts -> string/json
* - topology in keuss:stats:<ns>:<name>:topology -> string/json
*/
class RedisStats {
class RedisStats extends Stats {
constructor(ns, name, factory, opts) {
this._ns = ns;
this._name = name;
super (ns, name, factory);
this._id = 'keuss:stats:' + ns + ':' + name;
this._opts = opts || {};
this._factory = factory;
this._rediscl = factory._rediscl;

@@ -33,15 +33,2 @@ this._cache = {};

type() {
return this._factory.type();
}
ns () {
return this._ns;
}
name () {
return this._name;
}
values(cb) {

@@ -154,25 +141,2 @@ this._rediscl.hgetall (this._id, (err, v) => {

topology (tplg, cb) {
if (!cb) {
// get
cb = tplg;
this._rediscl.hget (this._id, 'topology', (err, res) => {
if (err) return cb(err);
if (!res) return cb(null, {});
try {
if (res) res = JSON.parse(res);
cb (null, res);
}
catch (e) {
cb(e);
}
});
}
else {
// set
this._rediscl.hset (this._id, 'topology', JSON.stringify (tplg), cb);
}
}
clear(cb) {

@@ -183,4 +147,3 @@ this._cancelFlush();

var tasks = [
cb => this._rediscl.hdel (this._id, 'opts', cb),
cb => this._rediscl.hdel (this._id, 'topology', cb)
cb => this._rediscl.hdel (this._id, 'opts', cb)
];

@@ -216,3 +179,3 @@

static Type() { return 'redis' }
type() { return Type() }
type() { return RedisStatsFactory.Type() }

@@ -252,11 +215,5 @@

}
else if (k == 'topology') {
ret[k] = JSON.parse (v[k]);
}
else if (k == 'opts') {
ret[k] = JSON.parse (v[k]);
}
else if (k == 'opts') {
ret[k] = JSON.parse (v[k]);
}
else if (k == 'paused') {

@@ -263,0 +220,0 @@ ret[k] = (v[k] == 'true' ? true : false);

@@ -5,2 +5,7 @@

var LocalSignal = require ('../signal/local');
var MemStats = require ('../stats/mem');
const MongoClient = require ('mongodb').MongoClient;
var factory = null;

@@ -16,3 +21,7 @@

before (function (done) {
var opts = {url: 'mongodb://localhost/keuss_test_bucket_at_least_once'};
var opts = {
url: 'mongodb://localhost/keuss_test_bucket_at_least_once',
signaller: { provider: LocalSignal},
stats: {provider: MemStats}
};

@@ -29,3 +38,7 @@ MQ (opts, function (err, fct) {

cb => setTimeout (cb, 1000),
cb => factory.close (cb)
cb => factory.close (cb),
cb => MongoClient.connect ('mongodb://localhost/keuss_test_bucket_at_least_once', (err, cl) => {
if (err) return done (err);
cl.db().dropDatabase (() => cl.close (cb))
})
], done));

@@ -32,0 +45,0 @@

@@ -5,2 +5,7 @@

var LocalSignal = require ('../signal/local');
var MemStats = require ('../stats/mem');
const MongoClient = require ('mongodb').MongoClient;
var factory = null;

@@ -16,3 +21,7 @@

before (function (done) {
var opts = {url: 'mongodb://localhost/keuss_test_bucket_at_most_once'};
var opts = {
url: 'mongodb://localhost/keuss_test_bucket_at_most_once',
signaller: { provider: LocalSignal},
stats: {provider: MemStats}
};

@@ -29,3 +38,7 @@ MQ (opts, function (err, fct) {

cb => setTimeout (cb, 1000),
cb => factory.close (cb)
cb => factory.close (cb),
cb => MongoClient.connect ('mongodb://localhost/keuss_test_bucket_at_most_once', (err, cl) => {
if (err) return done (err);
cl.db().dropDatabase (() => cl.close (cb))
})
], done));

@@ -32,0 +45,0 @@

@@ -5,7 +5,8 @@ var async = require ('async');

var LocalSignal = require ('../signal/local');
var MemStats = require ('../stats/mem');
const MongoClient = require ('mongodb').MongoClient;
function stats (q, cb) {

@@ -58,2 +59,4 @@ async.series ({

url: 'mongodb://localhost/keuss_test_backends_deadletter',
signaller: { provider: LocalSignal},
stats: {provider: MemStats},
deadletter: {

@@ -60,0 +63,0 @@ }

@@ -5,2 +5,5 @@ var async = require ('async');

var LocalSignal = require ('../signal/local');
var MemStats = require ('../stats/mem');
var MongoClient = require ('mongodb').MongoClient;

@@ -22,2 +25,4 @@

url: 'mongodb://localhost/keuss_test_backends_rcr',
signaller: { provider: LocalSignal},
stats: {provider: MemStats}
};

@@ -42,3 +47,3 @@

it('queue is created empty and ok', done => {
var q = factory.queue('test_queue');
var q = factory.queue('test_queue_1');
should.equal(q.nextMatureDate(), null);

@@ -63,3 +68,3 @@

it('sequential push & pops with no delay, go as expected', function (done) {
var q = factory.queue('test_queue');
var q = factory.queue('test_queue_2');

@@ -148,3 +153,3 @@ async.series([

it('sequential push & pops with delays, go as expected', function (done) {
var q = factory.queue('test_queue');
var q = factory.queue('test_queue_3');

@@ -260,3 +265,3 @@ async.series([

it('timed-out pops work as expected', function (done) {
var q = factory.queue('test_queue');
var q = factory.queue('test_queue_4');

@@ -389,3 +394,3 @@ async.series([

it('pop cancellation works as expected', function (done) {
var q = factory.queue('test_queue');
var q = factory.queue('test_queue_5');

@@ -467,3 +472,3 @@ async.series([

it('simultaneous timed out pops on delayed items go in the expected order', function (done) {
var q = factory.queue('test_queue');
var q = factory.queue('test_queue_6');

@@ -575,3 +580,3 @@ var hrTime = process.hrtime()

it('should do raw reserve & commit as expected', function (done) {
var q = factory.queue('test_queue');
var q = factory.queue('test_queue_7');
var id = null;

@@ -657,3 +662,3 @@

it('should do raw reserve & rollback as expected', function (done) {
var q = factory.queue('test_queue');
var q = factory.queue('test_queue_8');
var id = null;

@@ -792,3 +797,3 @@

it('should do get.reserve & ok as expected', function (done) {
var q = factory.queue('test_queue');
var q = factory.queue('test_queue_9');
var id = null;

@@ -866,3 +871,3 @@

var q = factory.queue('test_queue');
var q = factory.queue('test_queue_10');

@@ -894,3 +899,3 @@ async.series([

var q = factory.queue('test_queue');
var q = factory.queue('test_queue_11');
var state = {};

@@ -940,3 +945,3 @@

it('should manage rollback on unknown id as expected', function (done) {
var q = factory.queue('test_queue');
var q = factory.queue('test_queue_12');

@@ -958,3 +963,3 @@ async.series([

it('should rollback ok using full object', done => {
var q = factory.queue('test_queue');
var q = factory.queue('test_queue_13');
var state = {};

@@ -961,0 +966,0 @@

@@ -35,6 +35,8 @@

signaller: {
provider: signal_item.signal
provider: signal_item.signal,
opts: {url: 'mongodb://localhost/keuss_test_pause_signal'}
},
stats: {
provider: stats_item.stats
provider: stats_item.stats,
opts: {url: 'mongodb://localhost/keuss_test_pause_stats'}
}

@@ -57,2 +59,10 @@ };

cl.db().dropDatabase (() => cl.close (cb))
}),
cb => MongoClient.connect ('mongodb://localhost/keuss_test_pause_stats', (err, cl) => {
if (err) return done (err);
cl.db().dropDatabase (() => cl.close (cb))
}),
cb => MongoClient.connect ('mongodb://localhost/keuss_test_pause_signal', (err, cl) => {
if (err) return done (err);
cl.db().dropDatabase (() => cl.close (cb))
})

@@ -59,0 +69,0 @@ ], done);

@@ -5,2 +5,5 @@ const async = require ('async');

var LocalSignal = require ('../signal/local');
var MemStats = require ('../stats/mem');
const MongoClient = require('mongodb').MongoClient;

@@ -32,3 +35,5 @@

url: 'mongodb://localhost/__test_pipeline_choicelink__',
opts: { useUnifiedTopology: true }
opts: { useUnifiedTopology: true },
signaller: { provider: LocalSignal},
stats: {provider: MemStats}
};

@@ -46,2 +51,6 @@

cb => factory.close (cb),
cb => MongoClient.connect ('mongodb://localhost/__test_pipeline_choicelink__', (err, cl) => {
if (err) return done (err);
cl.db().dropDatabase (() => cl.close (cb))
})
], done));

@@ -92,3 +101,3 @@

done ();
setTimeout (done, 250);
}, 100);

@@ -160,6 +169,12 @@ }

const client = new MongoClient('mongodb://localhost/__test_pipeline_choicelink__');
client.connect(err => {
client.db().collection ('pl2').deleteMany ({}, () => done ());
});
sk1.stop ();
sk2.stop ();
sk3.stop ();
setTimeout (() => {
const client = new MongoClient('mongodb://localhost/__test_pipeline_choicelink__');
client.connect(err => {
client.db().collection ('pl2').deleteMany ({}, () => done ());
});
}, 250);
});

@@ -210,6 +225,12 @@

const client = new MongoClient('mongodb://localhost/__test_pipeline_choicelink__');
client.connect(err => {
client.db().collection ('pl3').deleteMany ({}, () => done ());
});
sk1.stop ();
sk2.stop ();
sk3.stop ();
setTimeout (() => {
const client = new MongoClient('mongodb://localhost/__test_pipeline_choicelink__');
client.connect(err => {
client.db().collection ('pl3').deleteMany ({}, () => done ());
});
}, 250);
});

@@ -216,0 +237,0 @@

@@ -5,4 +5,9 @@

var LocalSignal = require ('../signal/local');
var MemStats = require ('../stats/mem');
var PDL = require ('../Pipeline/DirectLink');
const MongoClient = require ('mongodb').MongoClient;
var factory = null;

@@ -17,3 +22,8 @@

before (done => {
var opts = {};
var opts = {
url: 'mongodb://localhost/__test_pipeline_directlink__',
opts: { useUnifiedTopology: true },
signaller: { provider: LocalSignal},
stats: {provider: MemStats}
};

@@ -29,3 +39,7 @@ MQ (opts, (err, fct) => {

cb => setTimeout (cb, 1000),
cb => factory.close (cb)
cb => factory.close (cb),
cb => MongoClient.connect ('mongodb://localhost/__test_pipeline_directlink__', (err, cl) => {
if (err) return done (err);
cl.db().dropDatabase (() => cl.close (cb))
})
], done));

@@ -63,3 +77,3 @@

done ();
setTimeout (done, 250);
});

@@ -104,3 +118,3 @@

done ();
setTimeout (done, 250);
});

@@ -142,3 +156,3 @@

done ();
setTimeout (done, 250);
});

@@ -215,3 +229,3 @@

done ();
setTimeout (done, 250);
});

@@ -264,3 +278,3 @@

res.should.eql ([0,0,0]);
done ();
setTimeout (done, 250);
});

@@ -313,3 +327,3 @@ }, 500);

res.should.eql ([0,0,0]);
done ();
setTimeout (done, 250);
});

@@ -362,3 +376,3 @@ }, 500);

res.should.eql ([0,0,0]);
done ();
setTimeout (done, 250);
});

@@ -371,4 +385,3 @@ }, 500);

});
});

@@ -5,5 +5,10 @@

var LocalSignal = require ('../signal/local');
var MemStats = require ('../stats/mem');
var PDL = require ('../Pipeline/DirectLink');
var PS = require ('../Pipeline/Sink');
const MongoClient = require ('mongodb').MongoClient;
var factory = null;

@@ -18,3 +23,8 @@

before (done => {
var opts = {};
var opts = {
url: 'mongodb://localhost/__test_pipeline_sink__',
opts: { useUnifiedTopology: true },
signaller: { provider: LocalSignal},
stats: {provider: MemStats}
};

@@ -30,3 +40,7 @@ MQ (opts, (err, fct) => {

cb => setTimeout (cb, 1000),
cb => factory.close (cb)
cb => factory.close (cb),
cb => MongoClient.connect ('mongodb://localhost/__test_pipeline_sink__', (err, cl) => {
if (err) return done (err);
cl.db().dropDatabase (() => cl.close (cb))
})
], done));

@@ -33,0 +47,0 @@

@@ -12,2 +12,4 @@

const MongoClient = require ('mongodb').MongoClient;
var ns = 'some-class';

@@ -17,10 +19,8 @@ var name = 'test-stats';

var tests = {
_.forEach ({
Mem: Mem,
Redis: Redis,
Mongo: Mongo
};
}, (CL, CLName) => {
_.forEach (tests, (CL, CLName) => {
describe (CLName + ' stats provider', function () {

@@ -34,2 +34,6 @@

cb => setTimeout (cb, 1000),
cb => MongoClient.connect ('mongodb://localhost/keuss_stats', (err, cl) => {
if (err) return done (err);
cl.db().dropDatabase (() => cl.close (cb))
})
], done));

@@ -157,24 +161,3 @@

it ('manages topology ok', done => {
CL ((err, ftry) => {
if (err) return done(err);
var mem = ftry.stats (ns, name);
var topology = {a:1, b: {t:'yy', tt: 99}};
async.series([
cb => mem.clear (cb),
cb => mem.topology (topology, cb),
cb => setTimeout (() => mem.topology ((err, tplg) => {
tplg.should.eql (topology);
cb();
}), 200),
cb => mem.clear (cb),
cb => setTimeout (cb, 200)
], (err, results) => {
ftry.close();
done (err);
});
});
});
it ('manages pause/resume ok', done => {

@@ -218,4 +201,2 @@ CL ((err, ftry) => {

var opts2 = {s:7, at: 'yy--j'};
var topology1 = {a:1, b: {t:'yy', tt: 99}};
var topology2 = {a:17, b: {t:'yyuyyrtyurt', tt: 77777}, cc: 7};

@@ -225,4 +206,2 @@ async.series([

cb => mem2.clear (cb),
cb => mem1.topology (topology1, cb),
cb => mem2.topology (topology2, cb),
cb => mem1.opts (opts1, cb),

@@ -242,4 +221,4 @@ cb => mem2.opts (opts2, cb),

res.should.eql ({
'test-stats': { name: 'test-stats', ns: 'some-class', topology: topology1, opts: opts1, counters: {v1: 8, v2: 6 }, paused: false },
'test-stats-2': { name: 'test-stats-2', ns: 'some-class', topology: topology2, opts: opts2, counters: {v1: 4, v3: 45}, paused: false }
'test-stats': { name: 'test-stats', ns: 'some-class', opts: opts1, counters: {v1: 8, v2: 6 }, paused: false },
'test-stats-2': { name: 'test-stats-2', ns: 'some-class', opts: opts2, counters: {v1: 4, v3: 45}, paused: false }
});

@@ -246,0 +225,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc