Comparing version 1.5.12 to 1.6.0
@@ -1,1 +0,1 @@ | ||
{"processes":{"625903ce-ee0a-4ca4-bf41-7fe36a24f6d0":{"parent":null,"children":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"]},"ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee":{"parent":"625903ce-ee0a-4ca4-bf41-7fe36a24f6d0","children":[]}},"files":{"/home/pepe/git/keuss/backends/bucket-mongo-safe.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/Queue.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/QFactory.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/signal/local.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/Signal.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/stats/mem.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/backends/bucket-mongo.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/backends/mongo.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/backends/pl-mongo.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/backends/ps-mongo.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/backends/redis-oq.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/utils/RedisConn.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/utils/RedisOrderedQueue.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/backends/redis-list.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/stats/mongo.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/stats/redis.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/signal/redis-pubsub.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/signal/mongo-capped.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/Pipeline/DirectLink.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/Pipeline/BaseLink.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"],"/home/pepe/git/keuss/Pipeline/Sink.js":["ccab1e3b-d9e6-4b1a-bfd6-eace3b9b09ee"]},"externalIds":{}} | ||
{"processes":{"4d184b7a-5a65-4754-ae4d-edc1ab7ce017":{"parent":"b24575bb-b456-49b7-b8a9-c2f9d2cbc2f3","children":[]},"b24575bb-b456-49b7-b8a9-c2f9d2cbc2f3":{"parent":null,"children":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"]}},"files":{"/home/pepe/git/keuss/signal/local.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Signal.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/stats/mem.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Stats.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/backends/bucket-mongo-safe.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Queue.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/QFactory-MongoDB-defaults.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/QFactory.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/signal/mongo-capped.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/stats/mongo.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/backends/bucket-mongo.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/backends/mongo.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/backends/pl-mongo.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Pipeline/Builder.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Pipeline/DirectLink.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Pipeline/BaseLink.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Pipeline/ChoiceLink.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Pipeline/Sink.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Pipeline/Pipeline.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/Pipeline/Queue.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/backends/ps-mongo.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/backends/redis-oq.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/utils/RedisConn.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/utils/RedisOrderedQueue.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/backends/redis-list.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/stats/redis.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"],"/home/pepe/git/keuss/signal/redis-pubsub.js":["4d184b7a-5a65-4754-ae4d-edc1ab7ce017"]},"externalIds":{}} |
@@ -11,4 +11,4 @@ var async = require ('async'); | ||
var Queue = require ('../Queue'); | ||
var QFactory = require ('../QFactory'); | ||
var Queue = require ('../Queue'); | ||
var QFactory_MongoDB_defaults = require ('../QFactory-MongoDB-defaults'); | ||
@@ -528,5 +528,5 @@ var State = { | ||
////////////////////////////////////////////// | ||
constructor (name, factory, opts) { | ||
constructor (name, factory, opts, orig_opts) { | ||
////////////////////////////////////////////// | ||
super (name, factory, opts); | ||
super (name, factory, opts, orig_opts); | ||
@@ -857,3 +857,3 @@ this._factory = factory; | ||
class Factory extends QFactory { | ||
class Factory extends QFactory_MongoDB_defaults { | ||
constructor (opts, mongo_conn) { | ||
@@ -868,3 +868,3 @@ super (opts); | ||
_.merge(full_opts, this._opts, opts); | ||
return new BucketMongoSafeQueue (name, this, full_opts); | ||
return new BucketMongoSafeQueue (name, this, full_opts, opts); | ||
} | ||
@@ -871,0 +871,0 @@ |
@@ -10,12 +10,11 @@ var async = require ('async'); | ||
var Queue = require ('../Queue'); | ||
var QFactory = require ('../QFactory'); | ||
var Queue = require ('../Queue'); | ||
var QFactory_MongoDB_defaults = require ('../QFactory-MongoDB-defaults'); | ||
class BucketMongoQueue extends Queue { | ||
////////////////////////////////////////////// | ||
constructor (name, factory, opts) { | ||
constructor (name, factory, opts, orig_opts) { | ||
////////////////////////////////////////////// | ||
super (name, factory, opts); | ||
super (name, factory, opts, orig_opts); | ||
@@ -298,3 +297,3 @@ this._factory = factory; | ||
class Factory extends QFactory { | ||
class Factory extends QFactory_MongoDB_defaults { | ||
constructor (opts, mongo_conn) { | ||
@@ -309,3 +308,3 @@ super (opts); | ||
_.merge(full_opts, this._opts, opts); | ||
return new BucketMongoQueue (name, this, full_opts); | ||
return new BucketMongoQueue (name, this, full_opts, opts); | ||
} | ||
@@ -312,0 +311,0 @@ |
@@ -6,4 +6,4 @@ var _ = require ('lodash'); | ||
var Queue = require ('../Queue'); | ||
var QFactory = require ('../QFactory'); | ||
var Queue = require ('../Queue'); | ||
var QFactory_MongoDB_defaults = require ('../QFactory-MongoDB-defaults'); | ||
@@ -13,4 +13,4 @@ class SimpleMongoQueue extends Queue { | ||
////////////////////////////////////////////// | ||
constructor (name, factory, opts) { | ||
super (name, factory, opts); | ||
constructor (name, factory, opts, orig_opts) { | ||
super (name, factory, opts, orig_opts); | ||
@@ -198,3 +198,3 @@ this._factory = factory; | ||
class Factory extends QFactory { | ||
class Factory extends QFactory_MongoDB_defaults { | ||
constructor (opts, mongo_conn) { | ||
@@ -207,5 +207,5 @@ super (opts); | ||
queue (name, opts) { | ||
var full_opts = {} | ||
var full_opts = {}; | ||
_.merge(full_opts, this._opts, opts); | ||
return new SimpleMongoQueue (name, this, full_opts); | ||
return new SimpleMongoQueue (name, this, full_opts, opts); | ||
} | ||
@@ -212,0 +212,0 @@ |
@@ -1,49 +0,34 @@ | ||
var _ = require ('lodash'); | ||
const _ = require ('lodash'); | ||
const async = require ('async'); | ||
const vm = require ('vm'); | ||
var MongoClient = require ('mongodb').MongoClient; | ||
var mongo = require ('mongodb'); | ||
const MongoClient = require ('mongodb').MongoClient; | ||
var Queue = require ('../Queue'); | ||
var QFactory = require ('../QFactory'); | ||
const QFactory_MongoDB_defaults = require ('../QFactory-MongoDB-defaults'); | ||
const PipelineBuilder = require ('../Pipeline/Builder'); | ||
const Pipeline = require ('../Pipeline/Pipeline'); | ||
const PipelinedMongoQueue = require ('../Pipeline/Queue'); | ||
class PipelinedMongoQueue extends Queue { | ||
const debug = require('debug')('keuss:Pipeline:Main'); | ||
////////////////////////////////////////////// | ||
constructor (name, pipeline, opts) { | ||
super (name, pipeline._factory, opts); | ||
this._pipeline = pipeline; | ||
this._col = this._pipeline._col; | ||
/////////////////////////////////////////////////////////// | ||
class Factory extends QFactory_MongoDB_defaults { | ||
constructor (opts, mongo_data_conn, mongo_topology_conn) { | ||
super (opts); | ||
this._mongo_data_conn = mongo_data_conn; | ||
this._mongo_topology_conn = mongo_topology_conn; | ||
this._db = mongo_data_conn.db(); | ||
this._topology_db = mongo_topology_conn.db(); | ||
this._pipelines = {}; | ||
// set topology | ||
this._stats.topology ({ | ||
pipeline: pipeline.name() | ||
}, () => {}); | ||
} | ||
///////////////////////////////////////// | ||
pipeline () { | ||
return this._pipeline; | ||
} | ||
///////////////////////////////////////// | ||
static Type () { | ||
return 'mongo:pipeline'; | ||
} | ||
///////////////////////////////////////// | ||
type () { | ||
return 'mongo:pipeline'; | ||
} | ||
///////////////////////////////////////// | ||
// add element to queue | ||
insert (entry, callback) { | ||
entry._q = this._name; | ||
this._col.insertOne (entry, {}, (err, result) => { | ||
if (err) return callback (err); | ||
// TODO result.insertedCount must be 1 | ||
callback (null, result.insertedId); | ||
this._topology_db.collection ('factory').updateOne ({ | ||
_id: this._name | ||
}, { | ||
$set: { | ||
opts: opts | ||
} | ||
}, { | ||
upsert: true | ||
}); | ||
@@ -53,264 +38,90 @@ } | ||
///////////////////////////////////////// | ||
// get element from queue | ||
get (callback) { | ||
this._col.findOneAndDelete ({_q: this._name, mature: {$lte: Queue.nowPlusSecs (0)}}, {sort: {mature : 1}}, (err, result) => { | ||
if (err) return callback (err); | ||
callback (null, result && result.value); | ||
}); | ||
/////////////////////////////////////////////////////////// | ||
builder () { | ||
return new PipelineBuilder (this); | ||
} | ||
////////////////////////////////// | ||
// reserve element: call cb (err, pl) where pl has an id | ||
reserve (callback) { | ||
var delay = this._opts.reserve_delay || 120; | ||
var query = { | ||
_q: this._name, | ||
mature: {$lte: Queue.nowPlusSecs (0)} | ||
/////////////////////////////////////////////////////////// | ||
pipelineFromRecipe (name, bs_src_array, setup_src_array, opts, cb) { | ||
const context = { | ||
Buffer, | ||
clearImmediate, | ||
clearInterval, | ||
clearTimeout, | ||
require, | ||
setImmediate, | ||
setTimeout, | ||
setInterval, | ||
TextEncoder, | ||
TextDecoder, | ||
URL, | ||
URLSearchParams, | ||
builder: this.builder ().pipeline (name), | ||
done: cb | ||
}; | ||
var update = { | ||
$set: {mature: Queue.nowPlusSecs (delay), reserved: new Date ()}, | ||
$inc: {tries: 1} | ||
}; | ||
if (opts && opts.context) _.assign (context, opts.context); | ||
var opts = { | ||
sort: {mature : 1}, | ||
returnOriginal: true | ||
}; | ||
vm.createContext (context); | ||
this._col.findOneAndUpdate (query, update, opts, (err, result) => { | ||
if (err) return callback (err); | ||
callback (null, result && result.value); | ||
}); | ||
} | ||
////////////////////////////////// | ||
// commit previous reserve, by p.id | ||
commit (id, callback) { | ||
var query; | ||
try { | ||
query = { | ||
_id: (_.isString(id) ? new mongo.ObjectID (id) : id), | ||
_q: this._name, | ||
reserved: {$exists: true} | ||
}; | ||
} | ||
catch (e) { | ||
return callback ('id [' + id + '] can not be used as rollback id: ' + e); | ||
} | ||
_.each (bs_src_array, (elem, idx) => { | ||
const src = elem.src || elem; | ||
const sname = elem.name || `bootstrap[${idx}]`; | ||
this._col.deleteOne (query, {}, (err, result) => { | ||
if (err) return callback (err); | ||
callback (null, result && (result.deletedCount == 1)); | ||
}); | ||
} | ||
debug ('Loading BS script %s', sname); | ||
const script = new vm.Script (src, {filename: sname}); | ||
script.runInContext (context); | ||
debug ('BS script %s loaded', sname); | ||
}); | ||
_.each (setup_src_array, (elem, idx) => { | ||
const src = elem.src || elem; | ||
const sname = elem.name || `setup[${idx}]`; | ||
////////////////////////////////// | ||
// rollback previous reserve, by p.id | ||
rollback (id, next_t, callback) { | ||
if (_.isFunction (next_t)) { | ||
callback = next_t; | ||
next_t = null; | ||
debug ('Loading Setup script %s', sname); | ||
const script = new vm.Script (src, {filename: sname}); | ||
script.runInContext (context); | ||
debug ('Setup script %s loaded', sname); | ||
}); | ||
} | ||
var query; | ||
try { | ||
query = { | ||
_id: (_.isString(id) ? new mongo.ObjectID (id) : id), | ||
_q: this._name, | ||
reserved: {$exists: true} | ||
}; | ||
catch (err) { | ||
return cb (err); | ||
} | ||
catch (e) { | ||
return callback ('id [' + id + '] can not be used as rollback id: ' + e); | ||
} | ||
var update = { | ||
$set: {mature: (next_t ? new Date (next_t) : Queue.now ())}, | ||
$unset: {reserved: ''} | ||
}; | ||
this._col.updateOne (query, update, {}, (err, result) => { | ||
if (err) return callback (err); | ||
callback (null, result && (result.modifiedCount == 1)); | ||
}); | ||
} | ||
////////////////////////////////// | ||
// passes element to the next queue in pipeline | ||
pl_step (id, next_queue, opts, callback) { | ||
var q = { | ||
_id: id, | ||
_q: this._name, | ||
reserved: {$exists: true} | ||
}; | ||
var upd = { | ||
$set: { | ||
mature: opts.mature || Queue.now (), | ||
tries: opts.tries || 0, | ||
_q: next_queue.name () | ||
}, | ||
$unset: {reserved: ''} | ||
}; | ||
if (opts.payload) { | ||
upd.$set.payload = opts.payload; | ||
} | ||
else if (opts.update) { | ||
this._embed_update_for_payload (upd, opts.update); | ||
} | ||
this._col.updateOne (q, upd, {}, (err, result) => { | ||
if (err) return callback (err); | ||
callback (null, result && (result.modifiedCount == 1)); | ||
}); | ||
} | ||
////////////////////////////////// | ||
// queue size including non-mature elements | ||
totalSize (callback) { | ||
var q = {_q: this._name}; | ||
var opts = {}; | ||
this._col.countDocuments (q, opts, callback); | ||
} | ||
////////////////////////////////// | ||
// queue size NOT including non-mature elements | ||
size (callback) { | ||
var q = { | ||
_q: this._name, | ||
mature : {$lte : Queue.now ()} | ||
}; | ||
var opts = {}; | ||
this._col.countDocuments (q, opts, callback); | ||
} | ||
////////////////////////////////// | ||
// queue size of non-mature elements only | ||
schedSize (callback) { | ||
var q = { | ||
_q: this._name, | ||
mature : {$gt : Queue.now ()}, | ||
reserved: {$exists: false} | ||
}; | ||
var opts = {}; | ||
this._col.countDocuments (q, opts, callback); | ||
} | ||
////////////////////////////////// | ||
// queue size of reserved elements only | ||
resvSize (callback) { | ||
var q = { | ||
_q: this._name, | ||
mature : {$gt : Queue.now ()}, | ||
reserved: {$exists: true} | ||
}; | ||
var opts = {}; | ||
this._col.countDocuments (q, opts, callback); | ||
} | ||
///////////////////////////////////////// | ||
// get element from queue | ||
next_t (callback) { | ||
this._col.find ({_q: this._name}).limit(1).sort ({mature:1}).project ({mature:1}).next ((err, result) => { | ||
if (err) return callback (err); | ||
callback (null, result && result.mature); | ||
}); | ||
} | ||
////////////////////////////////////////////// | ||
_embed_update_for_payload (dst, src) { | ||
_.each (src, (v, k) => { | ||
if (k.startsWith ('$')) { | ||
_.each (v, (fv, fk) => { | ||
if (!dst[k]) dst[k] = {}; | ||
dst[k]['payload.' + fk] = fv; | ||
}); | ||
this._topology_db.collection ('pipelines').updateOne ({ | ||
_id: name | ||
}, { | ||
$set: { | ||
bs: bs_src_array, | ||
setup: setup_src_array | ||
} | ||
else { | ||
dst['payload.' + k] = v; | ||
} | ||
}, { | ||
upsert: true | ||
}); | ||
} | ||
////////////////////////////////////////////// | ||
// redefnition | ||
_move_to_deadletter (obj, cb) { | ||
this.pl_step (obj._id, this._factory.deadletter_queue (), {}, (err, res) => { | ||
if (err) return cb (err); | ||
this._stats.incr ('get'); | ||
this._factory.deadletter_queue ()._stats.incr ('put'); | ||
this._factory.deadletter_queue ()._signaller.signalInsertion (Queue.now()); | ||
cb (null, false); | ||
}); | ||
} | ||
} | ||
/////////////////////////////////////////////////////////// | ||
pipeline (name) { | ||
if (this._pipelines[name]) { | ||
debug ('returning existing pipeline [%s]', name); | ||
return this._pipelines[name]; | ||
} | ||
class Pipeline { | ||
constructor (name, factory) { | ||
this._name = name; | ||
this._factory = factory; | ||
this._col = factory._db.collection (this._name); | ||
this.ensureIndexes (err => {}); | ||
const pl = new Pipeline (name, this); | ||
debug ('created pipeline [%s]', name); | ||
this._pipelines[name] = pl; | ||
return pl; | ||
} | ||
name () { | ||
return this._name; | ||
} | ||
/////////////////////////////////////////////////////////// | ||
queue (name, opts) { | ||
return new PipelinedMongoQueue (name, this, opts); | ||
} | ||
list (cb) { | ||
// TODO | ||
cb (null, []); | ||
} | ||
/////////////////////////////////////////////////////////////////////////////// | ||
// private parts | ||
////////////////////////////////////////////////////////////////// | ||
// create needed indexes for O(1) functioning | ||
ensureIndexes (cb) { | ||
this._col.createIndex ({_q : 1, mature : 1}, err => cb (err)); | ||
} | ||
} | ||
class Factory extends QFactory { | ||
constructor (opts, mongo_conn) { | ||
super (opts); | ||
this._mongo_conn = mongo_conn; | ||
this._db = mongo_conn.db(); | ||
this._pipelines = {}; | ||
} | ||
queue (name, opts) { | ||
if (!opts) opts = {}; | ||
if (!opts.pipeline) opts.pipeline = 'default'; | ||
var pl_name = opts.pipeline; | ||
var pl_name = opts.pipeline || 'default'; | ||
var pipeline = this._pipelines[pl_name]; | ||
@@ -323,18 +134,22 @@ | ||
var full_opts = {}; | ||
_.merge(full_opts, this._opts, opts); | ||
return pipeline.queue (name, full_opts); | ||
return this._queue_from_pipeline (name, pipeline, opts); | ||
} | ||
/////////////////////////////////////////////////////////// | ||
close (cb) { | ||
super.close (() => { | ||
if (this._mongo_conn) { | ||
this._mongo_conn.close (); | ||
this._mongo_conn = null; | ||
} | ||
if (cb) return cb (); | ||
async.parallel ([ | ||
cb => this._mongo_data_conn.close (cb), | ||
cb => this._mongo_topology_conn.close (cb) | ||
], err => { | ||
this._mongo_data_conn = null; | ||
this._mongo_topology_conn = null; | ||
if (cb) return cb (err); | ||
}); | ||
}); | ||
} | ||
/////////////////////////////////////////////////////////// | ||
type () { | ||
@@ -344,2 +159,4 @@ return PipelinedMongoQueue.Type (); | ||
/////////////////////////////////////////////////////////// | ||
capabilities () { | ||
@@ -352,11 +169,50 @@ return { | ||
} | ||
/////////////////////////////////////////////////////////// | ||
_queue_from_pipeline (name, pipeline, opts) { | ||
if (!opts) opts = {}; | ||
var full_opts = {}; | ||
_.merge(full_opts, this._opts, opts); | ||
return pipeline.queue (name, full_opts, opts); | ||
} | ||
} | ||
/////////////////////////////////////////////////////////// | ||
function creator (opts, cb) { | ||
var _opts = opts || {}; | ||
var m_url = _opts.url || 'mongodb://localhost:27017/keuss'; | ||
const _opts = opts || {}; | ||
const m_url = _opts.url || 'mongodb://localhost:27017/keuss'; | ||
let m_topology_url = _opts.topology_url; | ||
MongoClient.connect (m_url, { useNewUrlParser: true }, (err, cl) => { | ||
if (!m_topology_url) { | ||
let arr = m_url.split ('?'); | ||
arr[0] += '_status'; | ||
m_topology_url = arr.join ('?'); | ||
} | ||
async.series ([ | ||
cb => MongoClient.connect (m_url, { useNewUrlParser: true }, (err, cl) => { | ||
if (err) { | ||
debug ('error while connecting to data mongoDB [%s]', m_url, err); | ||
return cb (err); | ||
} | ||
debug ('connected OK to data mongoDB %s', m_url); | ||
cb (null, cl); | ||
}), | ||
cb => MongoClient.connect (m_topology_url, { useNewUrlParser: true }, (err, cl) => { | ||
if (err) { | ||
debug ('error while connecting to topology mongoDB [%s]', m_topology_url, err); | ||
return cb (err); | ||
} | ||
debug ('connected OK to topology mongoDB %s', m_topology_url); | ||
cb (null, cl); | ||
}), | ||
], (err, res) => { | ||
if (err) return cb (err); | ||
var F = new Factory (_opts, cl); | ||
var F = new Factory (_opts, res[0], res[1]); | ||
F.async_init (err => cb (null, F)); | ||
@@ -363,0 +219,0 @@ }); |
@@ -7,10 +7,11 @@ var async = require ('async'); | ||
var Queue = require ('../Queue'); | ||
var QFactory = require ('../QFactory'); | ||
var Queue = require ('../Queue'); | ||
var QFactory_MongoDB_defaults = require ('../QFactory-MongoDB-defaults'); | ||
class PersistentMongoQueue extends Queue { | ||
////////////////////////////////////////////// | ||
constructor (name, factory, opts) { | ||
super (name, factory, opts); | ||
constructor (name, factory, opts, orig_opts) { | ||
super (name, factory, opts, orig_opts); | ||
@@ -235,3 +236,3 @@ this._factory = factory; | ||
class Factory extends QFactory { | ||
class Factory extends QFactory_MongoDB_defaults { | ||
constructor (opts, mongo_conn) { | ||
@@ -244,5 +245,5 @@ super (opts); | ||
queue (name, opts) { | ||
var full_opts = {} | ||
var full_opts = {}; | ||
_.merge(full_opts, this._opts, opts); | ||
return new PersistentMongoQueue (name, this, full_opts); | ||
return new PersistentMongoQueue (name, this, full_opts, opts); | ||
} | ||
@@ -249,0 +250,0 @@ |
@@ -12,4 +12,4 @@ var async = require ('async'); | ||
////////////////////////////////////////////// | ||
constructor (name, factory, opts) { | ||
super (name, factory, opts); | ||
constructor (name, factory, opts, orig_opts) { | ||
super (name, factory, opts, orig_opts); | ||
@@ -97,5 +97,5 @@ this._rediscl = factory._rediscl; | ||
queue (name, opts) { | ||
var full_opts = {} | ||
var full_opts = {}; | ||
_.merge(full_opts, this._opts, opts); | ||
return new RedisListQueue (name, this, full_opts); | ||
return new RedisListQueue (name, this, full_opts, opts); | ||
} | ||
@@ -128,3 +128,2 @@ | ||
var F = new Factory (_opts, rediscl); | ||
// console.log ('factory created with opts %j, calling async_init', opts) | ||
F.async_init ((err) => cb (null, F)); | ||
@@ -131,0 +130,0 @@ } |
@@ -12,4 +12,4 @@ var _ = require ('lodash') | ||
////////////////////////////////////////////// | ||
constructor (name, factory, opts) { | ||
super (name, factory, opts); | ||
constructor (name, factory, opts, orig_opts) { | ||
super (name, factory, opts, orig_opts); | ||
@@ -148,5 +148,5 @@ this._rediscl = factory._rediscl; | ||
queue (name, opts) { | ||
var full_opts = {} | ||
var full_opts = {}; | ||
_.merge(full_opts, this._opts, opts); | ||
return new RedisOQ (name, this, full_opts); | ||
return new RedisOQ (name, this, full_opts, opts); | ||
} | ||
@@ -153,0 +153,0 @@ |
# Changelog | ||
* v1.6.0 | ||
* added sane defaults for stats and signal for mongodb-based backends (using mongo stats and signal) | ||
* added pipeline builder | ||
* added ability to create a full pipeline from text (making it trivial to be stored in file) | ||
* v1.5.12 | ||
@@ -3,0 +7,0 @@ * corrected small pipeline-related issues |
@@ -21,3 +21,3 @@ var MQ = require ('../../../backends/pl-mongo'); | ||
pdl.start ((elem, done) => { | ||
pdl.start (function (elem, done) { | ||
// pass element to next queue, set payload.passed to true | ||
@@ -32,6 +32,6 @@ done (null, { | ||
// insert elements in the entry queue | ||
async.timesLimit (111, 3, (n, next) => q1.push ({a:n, b:'see it spin...'}, next)); | ||
async.timesLimit (3, 3, (n, next) => q1.push ({a:n, b:'see it spin...'}, next)); | ||
// read elements at the outer end | ||
async.timesLimit (111, 3, (n, next) => q2.pop ('exit', (err, res) => { | ||
async.timesLimit (3, 3, (n, next) => q2.pop ('exit', (err, res) => { | ||
console.log ('end point get', res); | ||
@@ -38,0 +38,0 @@ next (); |
// mongodb: create a consumer and a producer | ||
const Chance = require ('chance'); | ||
const MQ = require ('../../../backends/pl-mongo'); | ||
const signal = require ('../../../signal/mongo-capped'); | ||
const stats = require ('../../../stats/mongo'); | ||
const DCT = require ('../../../Pipeline/DirectLink'); | ||
const SNK = require ('../../../Pipeline/Sink'); | ||
const CHC = require ('../../../Pipeline/ChoiceLink'); | ||
const MQ = require ('../../../backends/pl-mongo'); | ||
const DCT = require ('../../../Pipeline/DirectLink'); | ||
const SNK = require ('../../../Pipeline/Sink'); | ||
const CHC = require ('../../../Pipeline/ChoiceLink'); | ||
@@ -14,3 +12,3 @@ | ||
const num_elems = 1000; | ||
const num_elems = 100; | ||
let processed = 0; | ||
@@ -49,16 +47,5 @@ | ||
const factory_opts = { | ||
url: 'mongodb://localhost/qeus_pl', | ||
signaller: { | ||
provider: signal, | ||
opts: { | ||
url: 'mongodb://localhost/qeus_pl_signal', | ||
channel: 'das_channel' | ||
} | ||
}, | ||
stats: { | ||
provider: stats, | ||
opts: { | ||
url: 'mongodb://localhost/qeus_pl_stats' | ||
} | ||
}, | ||
// url: 'mongodb+srv://keuss:dCxvqgFs3umuZenb@cluster0-e1vob.mongodb.net/qeus?retryWrites=true&w=majority', | ||
url: 'mongodb://localhost/keuss_pl_example_simfork', | ||
options: { useNewUrlParser: true }, | ||
deadletter: { | ||
@@ -74,3 +61,3 @@ max_ko: 3 | ||
// factory ready, create queues on default pipeline | ||
const q_opts = {}; | ||
const q_opts = {aaa: 666, b: 'yy'}; | ||
const q1 = factory.queue ('pl_many_q_1', q_opts); | ||
@@ -77,0 +64,0 @@ const q2 = factory.queue ('pl_many_q_2', q_opts); |
@@ -1,5 +0,15 @@ | ||
var MQ = require ('../backends/bucket-mongo-safe'); | ||
var signal_mongo_capped = require ('../signal/mongo-capped'); | ||
var stats_mongo = require ('../stats/mongo'); | ||
/* | ||
* | ||
* Runs a set of producers and consumers, where consumers do a reserve-commit-rollback cycle. In each cycle a | ||
* consumer would randomly choose whether to commit or rollback (10% chance of rollback). No deadletter is used | ||
* so elements are retried ad infinitum until processed ok | ||
* | ||
* Upon completion (all items generated and consumed ok) some stats will be shown | ||
* | ||
* It uses bucket-mongo-safe backend for high throughput | ||
* | ||
*/ | ||
var MQ = require ('../../backends/bucket-mongo-safe'); | ||
var _ = require ('lodash'); | ||
@@ -12,22 +22,16 @@ var async = require ('async'); | ||
var factory_opts = { | ||
url: 'mongodb://localhost/qeus', | ||
// url: 'mongodb://localhost/qeus', | ||
url: 'mongodb+srv://keuss:dCxvqgFs3umuZenb@cluster0-e1vob.mongodb.net/qeus?retryWrites=true&w=majority', | ||
name: 'Random-NS', | ||
reject_delta_base: 2000, | ||
reject_delta_factor: 2000, | ||
signaller: { | ||
provider: signal_mongo_capped, | ||
opts: { | ||
url: 'mongodb://localhost/qeus_signal', | ||
channel: 'a_channel' | ||
} | ||
}, | ||
stats: { | ||
provider: stats_mongo, | ||
opts: { | ||
url: 'mongodb://localhost/qeus_stats' | ||
} | ||
} | ||
reject_delta_factor: 2000 | ||
}; | ||
// tets dimensions: elems to produce and consume, number of consumers, number of producers | ||
const num_elems = 10000000; | ||
const num_producers = 3; | ||
const num_consumers = 7; | ||
const commit_likelihood = 95; | ||
// stats holder | ||
var selfs = { | ||
@@ -83,3 +87,3 @@ consumers: {}, | ||
var tasks = []; | ||
for (var i = 0; i < 7; i++) tasks.push ((cb) => run_a_pop_consumer (shared_ctx, cb)); | ||
for (var i = 0; i < num_consumers; i++) tasks.push ((cb) => run_a_pop_consumer (shared_ctx, cb)); | ||
async.parallel (tasks, cb); | ||
@@ -104,3 +108,3 @@ } | ||
if (chance.bool({likelihood: 40})) { | ||
if (chance.bool({likelihood: commit_likelihood})) { | ||
shared_ctx.q.ok (res._id, (err) => { | ||
@@ -162,3 +166,3 @@ if (err) return cb (err); | ||
var tasks = []; | ||
for (var i = 0; i < 7; i++) | ||
for (var i = 0; i < num_consumers; i++) | ||
tasks.push ((cb) => run_a_rcr_consumer (shared_ctx, cb)); | ||
@@ -211,3 +215,3 @@ | ||
var tasks = []; | ||
for (var i = 0; i < 3; i++) tasks.push ((cb) => run_a_producer (shared_ctx, cb)); | ||
for (var i = 0; i < num_producers; i++) tasks.push ((cb) => run_a_producer (shared_ctx, cb)); | ||
async.parallel (tasks, cb); | ||
@@ -238,5 +242,5 @@ } | ||
async.parallel ([ | ||
(cb) => run_producers (q, 10000000, cb), | ||
(cb) => run_producers (q, num_elems, cb), | ||
// (cb) => run_consumers (q, 250000, cb), | ||
(cb) => run_rcr_consumers (q, 10000000, cb), | ||
(cb) => run_rcr_consumers (q, num_elems, cb), | ||
], (err) => { | ||
@@ -243,0 +247,0 @@ if (err) { |
{ | ||
"name": "keuss", | ||
"version": "1.5.12", | ||
"version": "1.6.0", | ||
"keywords": [ | ||
@@ -30,14 +30,14 @@ "queue", | ||
"async": "~3.2.0", | ||
"ioredis": "~4.16.3", | ||
"lodash": "~4.17.15", | ||
"mitt": "~1.2.0", | ||
"mongodb": "~3.5.0", | ||
"uuid": "~8.0.0", | ||
"@nodebb/mubsub": "~1.6.0", | ||
"ioredis": "~4.17.3", | ||
"lodash": "~4.17.19", | ||
"mitt": "~2.1.0", | ||
"mongodb": "~3.5.9", | ||
"uuid": "~8.2.0", | ||
"@nodebb/mubsub": "~1.6.2", | ||
"debug": "~4.1.1", | ||
"async-lock": "~1.2.2" | ||
"async-lock": "~1.2.4" | ||
}, | ||
"devDependencies": { | ||
"mocha": "~7.1.2", | ||
"chance": "~1.1.4", | ||
"mocha": "~8.0.1", | ||
"chance": "~1.1.6", | ||
"should": "~13.2.3" | ||
@@ -44,0 +44,0 @@ }, |
@@ -20,3 +20,3 @@ const _ = require ('lodash'); | ||
// check queues are pipelined | ||
if (! src_q.pipeline) throw Error ('source queue is not pipelined'); | ||
if (!src_q.pipeline) throw Error ('source queue is not pipelined'); | ||
@@ -30,4 +30,18 @@ this._opts = opts || {}; | ||
static Type () {return 'pipeline:processor:BaseLink';} | ||
type () {return BaseLink.Type();} | ||
///////////////////////////////////////// | ||
desc () { | ||
return { | ||
type: this.type (), | ||
src: this.src().name() | ||
}; | ||
} | ||
///////////////////////////////////////// | ||
on_data (ondata) { | ||
this._ondata_orig = ondata; | ||
this._ondata = ondata.bind (this); | ||
@@ -40,2 +54,3 @@ } | ||
this._process (this._ondata); | ||
debug ('%s: started', this._name); | ||
} | ||
@@ -48,3 +63,10 @@ | ||
///////////////////////////////////////// | ||
_add_to_pipeline () { | ||
this._src.pipeline()._add_processor (this); | ||
} | ||
///////////////////////////////////////// | ||
_mature (opts) { | ||
@@ -61,3 +83,26 @@ if (opts.mature) { | ||
///////////////////////////////////////// | ||
_on_error (err, elem, ondata) { | ||
// error: drop or retry? | ||
if (err.drop === true) { | ||
// drop: commit and forget | ||
this.src().ok (elem, err => { | ||
if (err) this.emit ('error', {on: 'src-queue-commit-on-error', elem, err}); | ||
debug ('%s: in error, marked to be dropped: %s', this._name, elem._id); | ||
this._process (ondata); | ||
}); | ||
} | ||
else { | ||
// retry: rollback | ||
this.src().ko (elem, this._rollback_next_t (elem), err => { | ||
if (err) this.emit ('error', {on: 'src-queue-rollback-on-error', elem, err}); | ||
debug ('%s: in error, rolled back: %s', this._name, elem._id); | ||
this._process (ondata); | ||
}); | ||
} | ||
} | ||
///////////////////////////////////////// | ||
_process (ondata) { | ||
@@ -70,3 +115,7 @@ debug ('%s: attempting reserve', this._name); | ||
if (err) { | ||
if (err == 'cancel') return; // end the process loop | ||
if (err == 'cancel') { | ||
debug ('%s: pipeline processor cancelled', this._name); | ||
return; // end the process loop | ||
} | ||
debug ('%s: error in reserve:', this._name, err); | ||
@@ -83,27 +132,8 @@ this.emit ('error', {on: 'src-queue-pop', err}); | ||
// do something | ||
try { | ||
ondata (elem, (err, res) => { | ||
debug ('%s: processed: %s', this._name, elem._id); | ||
if (err) { | ||
// error: drop or retry? | ||
if (err.drop === true) { | ||
// drop: commit and forget | ||
this.src().ok (elem, err => { | ||
if (err) this.emit ('error', {on: 'src-queue-commit-on-error', elem, err}); | ||
debug ('%s: in error, marked to be dropped: %s', this._name, elem._id); | ||
this._process (ondata); | ||
}); | ||
} | ||
else { | ||
// retry: rollback | ||
this.src().ko (elem, this._rollback_next_t (elem), err => { | ||
if (err) this.emit ('error', {on: 'src-queue-rollback-on-error', elem, err}); | ||
debug ('%s: in error, rolled back: %s', this._name, elem._id); | ||
this._process (ondata); | ||
}); | ||
} | ||
if (err) return this._on_error (err, elem, ondata); | ||
return; | ||
} | ||
// drop it (act as sink) ? | ||
@@ -141,2 +171,9 @@ if ((res === false) || (res && res.drop)) { | ||
}); | ||
} | ||
catch (e) { | ||
debug ('catch error, emitting it: ', e); | ||
console.log ('catch error, emitting it: ', e); | ||
this.emit (e); | ||
this._on_error (e, elem, ondata); | ||
} | ||
}); | ||
@@ -143,0 +180,0 @@ } |
@@ -12,5 +12,3 @@ var Queue = require ('../Queue'); | ||
if (! _.isArray (dst_q_array)) { | ||
throw Error ('destination-queues is not an array'); | ||
} | ||
if (!_.isArray (dst_q_array)) throw Error ('destination-queues is not an array'); | ||
@@ -31,2 +29,3 @@ this._dst_q_array = dst_q_array; | ||
this._add_to_pipeline (); | ||
debug ('created Pipeline/ChoiceLink %s', this._name); | ||
@@ -41,4 +40,15 @@ } | ||
static Type () {return 'pipeline:processor:ChoiceLink';} | ||
type () {return ChoiceLink.Type();} | ||
///////////////////////////////////////// | ||
desc () { | ||
return _.merge (super.desc(), { | ||
dst: _.map (this._dst_q_array, q => q.name ()) | ||
}); | ||
} | ||
///////////////////////////////////////// | ||
_next (id, opts, cb) { | ||
@@ -45,0 +55,0 @@ let dst = null; |
@@ -24,2 +24,3 @@ var Queue = require ('../Queue'); | ||
this._add_to_pipeline (); | ||
debug ('created Pipeline/DirectLink %s', this._name); | ||
@@ -30,4 +31,15 @@ } | ||
static Type () {return 'pipeline:processor:DirectLink';} | ||
type () {return DirectLink.Type();} | ||
///////////////////////////////////////// | ||
desc () { | ||
return _.merge (super.desc(), { | ||
dst: this.dst().name() | ||
}); | ||
} | ||
///////////////////////////////////////// | ||
_next (id, opts, cb) { | ||
@@ -34,0 +46,0 @@ this.src().pl_step (id, this.dst(), opts, (err, res) => { |
@@ -10,5 +10,9 @@ var debug = require('debug')('keuss:Pipeline:Sink'); | ||
this._name = src_q.name () + '->(sink)'; | ||
this._add_to_pipeline (); | ||
debug ('created Pipeline/Sink %s', this._name); | ||
} | ||
static Type () {return 'pipeline:processor:Sink';} | ||
type () {return Sink.Type();} | ||
///////////////////////////////////////// | ||
@@ -15,0 +19,0 @@ _next (id, opts, callback) { |
@@ -13,5 +13,8 @@ var async = require ('async'); | ||
this._opts = opts || {}; | ||
this._name = opts.name || 'N'; | ||
debug ('created QFactory %s with opts %o', this._name, this._opts); | ||
} | ||
async_init (cb) { | ||
if (!this._opts.stats) this._opts.stats = {}; | ||
@@ -23,6 +26,2 @@ if (!this._opts.stats.opts) this._opts.stats.opts = {}; | ||
debug ('created QFactory %s with opts %o', this._name, this._opts); | ||
} | ||
async_init (cb) { | ||
var signal_provider = this._opts.signaller.provider || LocalSignal; | ||
@@ -112,4 +111,23 @@ var stats_provider = this._opts.stats.provider || MemStats; | ||
} | ||
to_descriptor_obj () { | ||
let obj = { | ||
name: this.name (), | ||
type: this.type (), | ||
opts: _.omit (this._opts, ['stats', 'signaller']), | ||
signaller: { | ||
type: this.signaller_factory().type(), | ||
opts: this._opts.signaller.opts | ||
}, | ||
stats: { | ||
type: this.stats_factory().type(), | ||
opts: this._opts.stats.opts | ||
} | ||
}; | ||
return obj; | ||
} | ||
} | ||
module.exports = QFactory; |
10
Queue.js
@@ -11,3 +11,3 @@ var async = require ('async'); | ||
////////////////////////////////////////////// | ||
constructor (name, factory, opts) { | ||
constructor (name, factory, opts, orig_opts) { | ||
////////////////////////////////////////////// | ||
@@ -19,2 +19,3 @@ if (!name) { | ||
this._opts = opts || {}; | ||
this._orig_opts = orig_opts || {}; | ||
@@ -40,5 +41,4 @@ this._name = name; | ||
// save opts minus stats & signaller | ||
var __opts = _.omit (this._opts, ['signaller', 'stats']); | ||
this._stats.opts (__opts, () => {}); | ||
// save original options | ||
this._stats.opts (orig_opts || {}, () => {}); | ||
@@ -98,3 +98,2 @@ this._stats.incr ('get', 0); | ||
stats (cb) {this._stats.values (cb);} | ||
topology (cb) {this._stats.topology (cb);} | ||
paused (cb) {this._stats.paused (cb);} | ||
@@ -528,3 +527,2 @@ | ||
stats: cb => this.stats (cb), | ||
topology: cb => this.topology (cb), | ||
paused: cb => this.paused (cb), | ||
@@ -531,0 +529,0 @@ next_mature_t: cb => this.next_t (cb), |
@@ -20,2 +20,3 @@ # keuss | ||
- [Initialization](#initialization) | ||
- [MongoDB defaults](#mongodb-defaults) | ||
- [Queue creation](#queue-creation) | ||
@@ -196,2 +197,8 @@ - [Factory close](#factory-close) | ||
##### MongoDB defaults | ||
On MongoDB-based backends, `signaller` and `stats` default to: | ||
* `signaller`: uses `mongo-capped`, using the same mongodb url than the backend, but postfixing the db with `_signal` | ||
* `stats`: uses `mongo-capped`, using the same mongodb url than the backend, but postfixing the db with `_stats` | ||
This alows cleaner and more concise initialization, using a sane default | ||
#### Queue creation | ||
@@ -198,0 +205,0 @@ ```javascript |
@@ -83,3 +83,3 @@ var mubsub = require('@nodebb/mubsub'); | ||
static Type () {return 'signal:mongo-capped'} | ||
type () {return Type ()} | ||
type () {return MCSignalFactory.Type ()} | ||
@@ -86,0 +86,0 @@ signal (channel, opts) { |
var _ = require ('lodash'); | ||
var Stats = require ('../Stats'); | ||
var debug = require('debug')('keuss:Stats:Mem'); | ||
class MemStats { | ||
class MemStats extends Stats { | ||
constructor (ns, name, factory) { | ||
this._factory = factory; | ||
super (ns, name, factory); | ||
this._s = { | ||
@@ -13,3 +16,2 @@ ns: ns, | ||
opts: {}, | ||
topology: {}, | ||
paused: false | ||
@@ -19,14 +21,2 @@ }; | ||
type () { | ||
return this._factory.type (); | ||
} | ||
ns () { | ||
return this._s.ns; | ||
} | ||
name () { | ||
return this._s.name; | ||
} | ||
values (cb) { | ||
@@ -78,19 +68,5 @@ cb (null, this._s.counters); | ||
topology (tplg, cb) { | ||
if (!cb) { | ||
// get | ||
cb = tplg; | ||
cb (null, this._s.topology); | ||
} | ||
else { | ||
// set | ||
this._s.topology = tplg; | ||
cb (); | ||
} | ||
} | ||
clear (cb) { | ||
this._s.counters = {} | ||
this._s.opts = {}; | ||
this._s.topology = {}; | ||
this._s.paused = false; | ||
@@ -119,3 +95,3 @@ | ||
type () { | ||
return Type (); | ||
return MemStatsFactory.Type (); | ||
} | ||
@@ -122,0 +98,0 @@ |
@@ -5,2 +5,4 @@ var _ = require ('lodash'); | ||
var Stats = require ('../Stats'); | ||
var debug = require('debug')('keuss:Stats:Mongo'); | ||
@@ -11,9 +13,7 @@ | ||
*/ | ||
class MongoStats { | ||
class MongoStats extends Stats { | ||
constructor(ns, name, factory, opts) { | ||
this._ns = ns; | ||
this._name = name; | ||
super (ns, name, factory); | ||
this._id = 'keuss:stats:' + ns + ':' + name; | ||
this._opts = opts || {}; | ||
this._factory = factory; | ||
this._cache = {}; | ||
@@ -33,16 +33,2 @@ | ||
type() { | ||
return this._factory.type(); | ||
} | ||
ns () { | ||
return this._ns; | ||
} | ||
name () { | ||
return this._name; | ||
} | ||
values(cb) { | ||
@@ -170,24 +156,2 @@ this._coll().findOne ({_id: this._id}, {projection: {counters: 1}}, (err, res) => { | ||
topology (tplg, cb) { | ||
if (!cb) { | ||
// get | ||
cb = tplg; | ||
this._coll().findOne ({_id: this._id}, {projection: {topology: 1}}, (err, res) => { | ||
if (err) return cb (err); | ||
debug ('mongo stats - topology: get %s -> %j', this._name, res); | ||
cb (null, (res && res.topology) || {}); | ||
}); | ||
} | ||
else { | ||
// set | ||
var upd = {$set: {topology : tplg}}; | ||
this._coll().updateOne ({_id: this._id}, upd, {upsert: true}, (err) => { | ||
debug ('mongo stats: updated %s -> %j', this._name, upd); | ||
cb (err); | ||
}); | ||
} | ||
} | ||
clear(cb) { | ||
@@ -200,4 +164,3 @@ this._cancelFlush(); | ||
counters: 1, | ||
opts: 1, | ||
topology: 1 | ||
opts: 1 | ||
} | ||
@@ -232,3 +195,3 @@ }; | ||
static Type() { return 'mongo' } | ||
type() { return Type() } | ||
type() { return MongoStatsFactory.Type() } | ||
@@ -262,3 +225,2 @@ | ||
counters: elem.counters, | ||
topology: elem.topology, | ||
opts: elem.opts, | ||
@@ -265,0 +227,0 @@ paused: elem.paused || false |
@@ -1,4 +0,6 @@ | ||
var _ = require('lodash'); | ||
var async = require('async'); | ||
var _ = require ('lodash'); | ||
var async = require ('async'); | ||
var Stats = require ('../Stats'); | ||
var RedisConn = require('../utils/RedisConn'); | ||
@@ -14,11 +16,9 @@ | ||
* - opts (queue creation opts) in keuss:stats:<ns>:<name>:opts -> string/json | ||
* - topology in keuss:stats:<ns>:<name>:topology -> string/json | ||
*/ | ||
class RedisStats { | ||
class RedisStats extends Stats { | ||
constructor(ns, name, factory, opts) { | ||
this._ns = ns; | ||
this._name = name; | ||
super (ns, name, factory); | ||
this._id = 'keuss:stats:' + ns + ':' + name; | ||
this._opts = opts || {}; | ||
this._factory = factory; | ||
this._rediscl = factory._rediscl; | ||
@@ -33,15 +33,2 @@ this._cache = {}; | ||
type() { | ||
return this._factory.type(); | ||
} | ||
ns () { | ||
return this._ns; | ||
} | ||
name () { | ||
return this._name; | ||
} | ||
values(cb) { | ||
@@ -154,25 +141,2 @@ this._rediscl.hgetall (this._id, (err, v) => { | ||
topology (tplg, cb) { | ||
if (!cb) { | ||
// get | ||
cb = tplg; | ||
this._rediscl.hget (this._id, 'topology', (err, res) => { | ||
if (err) return cb(err); | ||
if (!res) return cb(null, {}); | ||
try { | ||
if (res) res = JSON.parse(res); | ||
cb (null, res); | ||
} | ||
catch (e) { | ||
cb(e); | ||
} | ||
}); | ||
} | ||
else { | ||
// set | ||
this._rediscl.hset (this._id, 'topology', JSON.stringify (tplg), cb); | ||
} | ||
} | ||
clear(cb) { | ||
@@ -183,4 +147,3 @@ this._cancelFlush(); | ||
var tasks = [ | ||
cb => this._rediscl.hdel (this._id, 'opts', cb), | ||
cb => this._rediscl.hdel (this._id, 'topology', cb) | ||
cb => this._rediscl.hdel (this._id, 'opts', cb) | ||
]; | ||
@@ -216,3 +179,3 @@ | ||
static Type() { return 'redis' } | ||
type() { return Type() } | ||
type() { return RedisStatsFactory.Type() } | ||
@@ -252,11 +215,5 @@ | ||
} | ||
else if (k == 'topology') { | ||
ret[k] = JSON.parse (v[k]); | ||
} | ||
else if (k == 'opts') { | ||
ret[k] = JSON.parse (v[k]); | ||
} | ||
else if (k == 'opts') { | ||
ret[k] = JSON.parse (v[k]); | ||
} | ||
else if (k == 'paused') { | ||
@@ -263,0 +220,0 @@ ret[k] = (v[k] == 'true' ? true : false); |
@@ -5,2 +5,7 @@ | ||
var LocalSignal = require ('../signal/local'); | ||
var MemStats = require ('../stats/mem'); | ||
const MongoClient = require ('mongodb').MongoClient; | ||
var factory = null; | ||
@@ -16,3 +21,7 @@ | ||
before (function (done) { | ||
var opts = {url: 'mongodb://localhost/keuss_test_bucket_at_least_once'}; | ||
var opts = { | ||
url: 'mongodb://localhost/keuss_test_bucket_at_least_once', | ||
signaller: { provider: LocalSignal}, | ||
stats: {provider: MemStats} | ||
}; | ||
@@ -29,3 +38,7 @@ MQ (opts, function (err, fct) { | ||
cb => setTimeout (cb, 1000), | ||
cb => factory.close (cb) | ||
cb => factory.close (cb), | ||
cb => MongoClient.connect ('mongodb://localhost/keuss_test_bucket_at_least_once', (err, cl) => { | ||
if (err) return done (err); | ||
cl.db().dropDatabase (() => cl.close (cb)) | ||
}) | ||
], done)); | ||
@@ -32,0 +45,0 @@ |
@@ -5,2 +5,7 @@ | ||
var LocalSignal = require ('../signal/local'); | ||
var MemStats = require ('../stats/mem'); | ||
const MongoClient = require ('mongodb').MongoClient; | ||
var factory = null; | ||
@@ -16,3 +21,7 @@ | ||
before (function (done) { | ||
var opts = {url: 'mongodb://localhost/keuss_test_bucket_at_most_once'}; | ||
var opts = { | ||
url: 'mongodb://localhost/keuss_test_bucket_at_most_once', | ||
signaller: { provider: LocalSignal}, | ||
stats: {provider: MemStats} | ||
}; | ||
@@ -29,3 +38,7 @@ MQ (opts, function (err, fct) { | ||
cb => setTimeout (cb, 1000), | ||
cb => factory.close (cb) | ||
cb => factory.close (cb), | ||
cb => MongoClient.connect ('mongodb://localhost/keuss_test_bucket_at_most_once', (err, cl) => { | ||
if (err) return done (err); | ||
cl.db().dropDatabase (() => cl.close (cb)) | ||
}) | ||
], done)); | ||
@@ -32,0 +45,0 @@ |
@@ -5,7 +5,8 @@ var async = require ('async'); | ||
var LocalSignal = require ('../signal/local'); | ||
var MemStats = require ('../stats/mem'); | ||
const MongoClient = require ('mongodb').MongoClient; | ||
function stats (q, cb) { | ||
@@ -58,2 +59,4 @@ async.series ({ | ||
url: 'mongodb://localhost/keuss_test_backends_deadletter', | ||
signaller: { provider: LocalSignal}, | ||
stats: {provider: MemStats}, | ||
deadletter: { | ||
@@ -60,0 +63,0 @@ } |
@@ -5,2 +5,5 @@ var async = require ('async'); | ||
var LocalSignal = require ('../signal/local'); | ||
var MemStats = require ('../stats/mem'); | ||
var MongoClient = require ('mongodb').MongoClient; | ||
@@ -22,2 +25,4 @@ | ||
url: 'mongodb://localhost/keuss_test_backends_rcr', | ||
signaller: { provider: LocalSignal}, | ||
stats: {provider: MemStats} | ||
}; | ||
@@ -42,3 +47,3 @@ | ||
it('queue is created empty and ok', done => { | ||
var q = factory.queue('test_queue'); | ||
var q = factory.queue('test_queue_1'); | ||
should.equal(q.nextMatureDate(), null); | ||
@@ -63,3 +68,3 @@ | ||
it('sequential push & pops with no delay, go as expected', function (done) { | ||
var q = factory.queue('test_queue'); | ||
var q = factory.queue('test_queue_2'); | ||
@@ -148,3 +153,3 @@ async.series([ | ||
it('sequential push & pops with delays, go as expected', function (done) { | ||
var q = factory.queue('test_queue'); | ||
var q = factory.queue('test_queue_3'); | ||
@@ -260,3 +265,3 @@ async.series([ | ||
it('timed-out pops work as expected', function (done) { | ||
var q = factory.queue('test_queue'); | ||
var q = factory.queue('test_queue_4'); | ||
@@ -389,3 +394,3 @@ async.series([ | ||
it('pop cancellation works as expected', function (done) { | ||
var q = factory.queue('test_queue'); | ||
var q = factory.queue('test_queue_5'); | ||
@@ -467,3 +472,3 @@ async.series([ | ||
it('simultaneous timed out pops on delayed items go in the expected order', function (done) { | ||
var q = factory.queue('test_queue'); | ||
var q = factory.queue('test_queue_6'); | ||
@@ -575,3 +580,3 @@ var hrTime = process.hrtime() | ||
it('should do raw reserve & commit as expected', function (done) { | ||
var q = factory.queue('test_queue'); | ||
var q = factory.queue('test_queue_7'); | ||
var id = null; | ||
@@ -657,3 +662,3 @@ | ||
it('should do raw reserve & rollback as expected', function (done) { | ||
var q = factory.queue('test_queue'); | ||
var q = factory.queue('test_queue_8'); | ||
var id = null; | ||
@@ -792,3 +797,3 @@ | ||
it('should do get.reserve & ok as expected', function (done) { | ||
var q = factory.queue('test_queue'); | ||
var q = factory.queue('test_queue_9'); | ||
var id = null; | ||
@@ -866,3 +871,3 @@ | ||
var q = factory.queue('test_queue'); | ||
var q = factory.queue('test_queue_10'); | ||
@@ -894,3 +899,3 @@ async.series([ | ||
var q = factory.queue('test_queue'); | ||
var q = factory.queue('test_queue_11'); | ||
var state = {}; | ||
@@ -940,3 +945,3 @@ | ||
it('should manage rollback on unknown id as expected', function (done) { | ||
var q = factory.queue('test_queue'); | ||
var q = factory.queue('test_queue_12'); | ||
@@ -958,3 +963,3 @@ async.series([ | ||
it('should rollback ok using full object', done => { | ||
var q = factory.queue('test_queue'); | ||
var q = factory.queue('test_queue_13'); | ||
var state = {}; | ||
@@ -961,0 +966,0 @@ |
@@ -35,6 +35,8 @@ | ||
signaller: { | ||
provider: signal_item.signal | ||
provider: signal_item.signal, | ||
opts: {url: 'mongodb://localhost/keuss_test_pause_signal'} | ||
}, | ||
stats: { | ||
provider: stats_item.stats | ||
provider: stats_item.stats, | ||
opts: {url: 'mongodb://localhost/keuss_test_pause_stats'} | ||
} | ||
@@ -57,2 +59,10 @@ }; | ||
cl.db().dropDatabase (() => cl.close (cb)) | ||
}), | ||
cb => MongoClient.connect ('mongodb://localhost/keuss_test_pause_stats', (err, cl) => { | ||
if (err) return done (err); | ||
cl.db().dropDatabase (() => cl.close (cb)) | ||
}), | ||
cb => MongoClient.connect ('mongodb://localhost/keuss_test_pause_signal', (err, cl) => { | ||
if (err) return done (err); | ||
cl.db().dropDatabase (() => cl.close (cb)) | ||
}) | ||
@@ -59,0 +69,0 @@ ], done); |
@@ -5,2 +5,5 @@ const async = require ('async'); | ||
var LocalSignal = require ('../signal/local'); | ||
var MemStats = require ('../stats/mem'); | ||
const MongoClient = require('mongodb').MongoClient; | ||
@@ -32,3 +35,5 @@ | ||
url: 'mongodb://localhost/__test_pipeline_choicelink__', | ||
opts: { useUnifiedTopology: true } | ||
opts: { useUnifiedTopology: true }, | ||
signaller: { provider: LocalSignal}, | ||
stats: {provider: MemStats} | ||
}; | ||
@@ -46,2 +51,6 @@ | ||
cb => factory.close (cb), | ||
cb => MongoClient.connect ('mongodb://localhost/__test_pipeline_choicelink__', (err, cl) => { | ||
if (err) return done (err); | ||
cl.db().dropDatabase (() => cl.close (cb)) | ||
}) | ||
], done)); | ||
@@ -92,3 +101,3 @@ | ||
done (); | ||
setTimeout (done, 250); | ||
}, 100); | ||
@@ -160,6 +169,12 @@ } | ||
const client = new MongoClient('mongodb://localhost/__test_pipeline_choicelink__'); | ||
client.connect(err => { | ||
client.db().collection ('pl2').deleteMany ({}, () => done ()); | ||
}); | ||
sk1.stop (); | ||
sk2.stop (); | ||
sk3.stop (); | ||
setTimeout (() => { | ||
const client = new MongoClient('mongodb://localhost/__test_pipeline_choicelink__'); | ||
client.connect(err => { | ||
client.db().collection ('pl2').deleteMany ({}, () => done ()); | ||
}); | ||
}, 250); | ||
}); | ||
@@ -210,6 +225,12 @@ | ||
const client = new MongoClient('mongodb://localhost/__test_pipeline_choicelink__'); | ||
client.connect(err => { | ||
client.db().collection ('pl3').deleteMany ({}, () => done ()); | ||
}); | ||
sk1.stop (); | ||
sk2.stop (); | ||
sk3.stop (); | ||
setTimeout (() => { | ||
const client = new MongoClient('mongodb://localhost/__test_pipeline_choicelink__'); | ||
client.connect(err => { | ||
client.db().collection ('pl3').deleteMany ({}, () => done ()); | ||
}); | ||
}, 250); | ||
}); | ||
@@ -216,0 +237,0 @@ |
@@ -5,4 +5,9 @@ | ||
var LocalSignal = require ('../signal/local'); | ||
var MemStats = require ('../stats/mem'); | ||
var PDL = require ('../Pipeline/DirectLink'); | ||
const MongoClient = require ('mongodb').MongoClient; | ||
var factory = null; | ||
@@ -17,3 +22,8 @@ | ||
before (done => { | ||
var opts = {}; | ||
var opts = { | ||
url: 'mongodb://localhost/__test_pipeline_directlink__', | ||
opts: { useUnifiedTopology: true }, | ||
signaller: { provider: LocalSignal}, | ||
stats: {provider: MemStats} | ||
}; | ||
@@ -29,3 +39,7 @@ MQ (opts, (err, fct) => { | ||
cb => setTimeout (cb, 1000), | ||
cb => factory.close (cb) | ||
cb => factory.close (cb), | ||
cb => MongoClient.connect ('mongodb://localhost/__test_pipeline_directlink__', (err, cl) => { | ||
if (err) return done (err); | ||
cl.db().dropDatabase (() => cl.close (cb)) | ||
}) | ||
], done)); | ||
@@ -63,3 +77,3 @@ | ||
done (); | ||
setTimeout (done, 250); | ||
}); | ||
@@ -104,3 +118,3 @@ | ||
done (); | ||
setTimeout (done, 250); | ||
}); | ||
@@ -142,3 +156,3 @@ | ||
done (); | ||
setTimeout (done, 250); | ||
}); | ||
@@ -215,3 +229,3 @@ | ||
done (); | ||
setTimeout (done, 250); | ||
}); | ||
@@ -264,3 +278,3 @@ | ||
res.should.eql ([0,0,0]); | ||
done (); | ||
setTimeout (done, 250); | ||
}); | ||
@@ -313,3 +327,3 @@ }, 500); | ||
res.should.eql ([0,0,0]); | ||
done (); | ||
setTimeout (done, 250); | ||
}); | ||
@@ -362,3 +376,3 @@ }, 500); | ||
res.should.eql ([0,0,0]); | ||
done (); | ||
setTimeout (done, 250); | ||
}); | ||
@@ -371,4 +385,3 @@ }, 500); | ||
}); | ||
}); |
@@ -5,5 +5,10 @@ | ||
var LocalSignal = require ('../signal/local'); | ||
var MemStats = require ('../stats/mem'); | ||
var PDL = require ('../Pipeline/DirectLink'); | ||
var PS = require ('../Pipeline/Sink'); | ||
const MongoClient = require ('mongodb').MongoClient; | ||
var factory = null; | ||
@@ -18,3 +23,8 @@ | ||
before (done => { | ||
var opts = {}; | ||
var opts = { | ||
url: 'mongodb://localhost/__test_pipeline_sink__', | ||
opts: { useUnifiedTopology: true }, | ||
signaller: { provider: LocalSignal}, | ||
stats: {provider: MemStats} | ||
}; | ||
@@ -30,3 +40,7 @@ MQ (opts, (err, fct) => { | ||
cb => setTimeout (cb, 1000), | ||
cb => factory.close (cb) | ||
cb => factory.close (cb), | ||
cb => MongoClient.connect ('mongodb://localhost/__test_pipeline_sink__', (err, cl) => { | ||
if (err) return done (err); | ||
cl.db().dropDatabase (() => cl.close (cb)) | ||
}) | ||
], done)); | ||
@@ -33,0 +47,0 @@ |
@@ -12,2 +12,4 @@ | ||
const MongoClient = require ('mongodb').MongoClient; | ||
var ns = 'some-class'; | ||
@@ -17,10 +19,8 @@ var name = 'test-stats'; | ||
var tests = { | ||
_.forEach ({ | ||
Mem: Mem, | ||
Redis: Redis, | ||
Mongo: Mongo | ||
}; | ||
}, (CL, CLName) => { | ||
_.forEach (tests, (CL, CLName) => { | ||
describe (CLName + ' stats provider', function () { | ||
@@ -34,2 +34,6 @@ | ||
cb => setTimeout (cb, 1000), | ||
cb => MongoClient.connect ('mongodb://localhost/keuss_stats', (err, cl) => { | ||
if (err) return done (err); | ||
cl.db().dropDatabase (() => cl.close (cb)) | ||
}) | ||
], done)); | ||
@@ -157,24 +161,3 @@ | ||
it ('manages topology ok', done => { | ||
CL ((err, ftry) => { | ||
if (err) return done(err); | ||
var mem = ftry.stats (ns, name); | ||
var topology = {a:1, b: {t:'yy', tt: 99}}; | ||
async.series([ | ||
cb => mem.clear (cb), | ||
cb => mem.topology (topology, cb), | ||
cb => setTimeout (() => mem.topology ((err, tplg) => { | ||
tplg.should.eql (topology); | ||
cb(); | ||
}), 200), | ||
cb => mem.clear (cb), | ||
cb => setTimeout (cb, 200) | ||
], (err, results) => { | ||
ftry.close(); | ||
done (err); | ||
}); | ||
}); | ||
}); | ||
it ('manages pause/resume ok', done => { | ||
@@ -218,4 +201,2 @@ CL ((err, ftry) => { | ||
var opts2 = {s:7, at: 'yy--j'}; | ||
var topology1 = {a:1, b: {t:'yy', tt: 99}}; | ||
var topology2 = {a:17, b: {t:'yyuyyrtyurt', tt: 77777}, cc: 7}; | ||
@@ -225,4 +206,2 @@ async.series([ | ||
cb => mem2.clear (cb), | ||
cb => mem1.topology (topology1, cb), | ||
cb => mem2.topology (topology2, cb), | ||
cb => mem1.opts (opts1, cb), | ||
@@ -242,4 +221,4 @@ cb => mem2.opts (opts2, cb), | ||
res.should.eql ({ | ||
'test-stats': { name: 'test-stats', ns: 'some-class', topology: topology1, opts: opts1, counters: {v1: 8, v2: 6 }, paused: false }, | ||
'test-stats-2': { name: 'test-stats-2', ns: 'some-class', topology: topology2, opts: opts2, counters: {v1: 4, v3: 45}, paused: false } | ||
'test-stats': { name: 'test-stats', ns: 'some-class', opts: opts1, counters: {v1: 8, v2: 6 }, paused: false }, | ||
'test-stats-2': { name: 'test-stats-2', ns: 'some-class', opts: opts2, counters: {v1: 4, v3: 45}, paused: false } | ||
}); | ||
@@ -246,0 +225,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Debug access
Supply chain riskUses debug, reflection and dynamic code execution features.
Found 1 instance in 1 package
Dynamic require
Supply chain riskDynamic require can indicate the package is performing dangerous or unsafe dynamic code execution.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
1657927
118
9687
571
2
+ Addedioredis@4.17.3(transitive)
+ Addedmitt@2.1.0(transitive)
+ Addeduuid@8.2.0(transitive)
- Removedioredis@4.16.3(transitive)
- Removedmitt@1.2.0(transitive)
- Removeduuid@8.0.0(transitive)
Updated@nodebb/mubsub@~1.6.2
Updatedasync-lock@~1.2.4
Updatedioredis@~4.17.3
Updatedlodash@~4.17.19
Updatedmitt@~2.1.0
Updatedmongodb@~3.5.9
Updateduuid@~8.2.0