New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

nomoque

Package Overview
Dependencies
Maintainers
1
Versions
7
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nomoque - npm Package Compare versions

Comparing version 0.1.1 to 0.1.2

39

index.js

@@ -5,5 +5,6 @@ /**

var common = require('./lib/common');
var connect = require('mongodb-async').connect;
var MongodbAsync = require('mongodb-async');
var connect = MongodbAsync.connect;
var AsyncDb = MongodbAsync.AsyncDb;
var Queue = require('./lib/queue');
var Worker = require('./lib/worker');

@@ -27,26 +28,12 @@ /**

*/
exports.createQueue = function(options) {
return new Queue(getDbObj(options));
exports.createQueue = function (options) {
var db;
if (options instanceof AsyncDb) {
db = options;
} else {
options = common.setDefaultOptions(options);
var server = connect(options.host, options.port, {poolSize: options.poolSize});
db = server.db(options.db);
}
return new Queue(db);
};
/**
* Connect to mongodb, create and return an instance of `Worker`
*
* @param options
* @api public
*/
exports.createWorker = function(options) {
return new Worker(getDbObj(options));
};
/**
* Create a mongodb-async instance according to options
* @param options
* @return {Object}
*/
function getDbObj(options) {
options = common.setDefaultOptions(options);
var server = connect(options.host, options.port, {poolSize: options.poolSize});
var db = server.db(options.db);
return db;
}

@@ -12,6 +12,15 @@ /**

var Queue = module.exports = function Queue(db) {
EventEmitter.call(this);
common.ensureIndexes(db);
this._tasks = {};
this._topics = [];
this.delay = 1;
this._state = 'stopped';
this._maxWorkers = 2;
this._currentWorkers = 0;
this.db = db;
this.queColl = db.collection('queues');
this.configColl = db.collection('configs')
this.resultColl = db.collection('results');
this.errColl = db.collection('errors');
this.configColl = db.collection('configs');
};

@@ -30,3 +39,3 @@

*/
Queue.prototype.push = function(topic, message) {
Queue.prototype.push = function (topic, message) {
var self = this;

@@ -43,3 +52,3 @@

self.queColl.insert(msg, {safe: true}).then(
function(doc) {
function (doc) {
if (doc) {

@@ -51,3 +60,3 @@ self.emit('queued', Array.isArray(doc) ? doc[0] : doc);

nextQueueId(this.configColl).and(saveMsg).fail(function(err) {
nextQueueId(this.configColl).and(saveMsg).fail(function (err) {
self.emit('fault', err);

@@ -57,3 +66,190 @@ });

Queue.prototype.close = function() {
/**
* Register a named task for specific topic
*
* @param {String} topic Topic of the queue
* @param {String} name Name of the task
* @param {Function} fn
*/
Queue.prototype.process = function (topic, name, fn) {
var taskHash = {};
if (typeof name === 'object') {
taskHash = name;
} else {
taskHash[name] = fn;
}
if (this._topics.indexOf(topic) === -1) {
this._topics.push(topic);
}
if (!this._tasks[topic]) {
this._tasks[topic] = [];
}
var self = this;
Object.keys(taskHash).forEach(function (taskName) {
self._tasks[topic].push({name: taskName, fn: taskHash[taskName]});
});
};
/**
* Move message from one collection to another
*
* @param message
* @param fromColl
* @param toColl
* @returns {*}
* @private
*/
Queue.prototype._mvMessages = function (message, fromColl, toColl) {
// update don't like `_id`
var id = message._id;
delete message['_id'];
// upsert the message into `toColl`
return toColl.update({_id: id}, {$set: message}, {safe: true, upsert: true})
.and(function (defer) {
// remove the old one from `from`
fromColl.remove({_id: id}, {safe: true})
.then(function () {
message._id = id;
defer.next(message);
}).fail(defer.error);
}).fail(this._onFault(this, true));
};
Queue.prototype._shift = function () {
if (this._state === 'stopped' || this._currentWorkers++ >= this._maxWorkers) {
return;
}
var self = this;
// get messages we are interested
this.queColl.findAndModify({topic: {$in: self._topics}, state: common.STATE_NEW},
// use `nature` order
[],
// modify the message status
{$set: {shifted: new Date, state: common.STATE_SHIFTED}},
// return updated message
{remove: false, 'new': true, upsert: false}).then(function (message) {
if (!message) {
tryNext();
return;
}
// handle shifted message
// move the message from `queues` to collection `results`
self._mvMessages(message, self.queColl, self.resultColl).then(function (message) {
// perform tasks
self._process(message);
});
}).fail(tryNext);
function tryNext() {
// if nothing, wait for `delay` and try shift again
self._currentWorkers--;
setTimeout(function () {
self._shift();
}, self.delay * 1000);
}
};
Queue.prototype._process = function (message) {
var tasks = this._tasks[message.topic];
var self = this;
var count = 0;
var partial = false;
var hasResult = false;
var results = {};
function checkFinish() {
if (++count === tasks.length) {
// update state and results
var updateDoc = {
finished: new Date,
state: partial ? common.STATE_PARTIAL : common.STATE_FINISHED,
results: hasResult ? results : undefined
};
self.resultColl
.update({_id: message._id}, {$set: updateDoc}, {safe: true, upsert: false})
.then(function () {
self.emit('finished', message.topic, results);
// shift next
self._currentWorkers--;
self._shift();
}).fail(self._onFault(self, true));
}
}
// run each tasks
tasks.forEach(function (task) {
var log = {
topic: message.topic,
mid: message._id,
name: task.name,
started: new Date,
state: common.STATE_INPROGRESS
};
try {
task.fn.call(message, message.message, function (taskErr, result) {
var ts = new Date;
log.finished = ts;
log.date = common.getDateString(ts);
if (taskErr) {
partial = true;
// record error
log.state = common.STATE_ERROR;
log.error = taskErr;
self.errColl.insert(log, {safe: true}).fail(self._onFault(self));
self.emit('error', log.error, log);
} else {
log.state = common.STATE_FINISHED;
// record result if any
if (result != undefined) {
hasResult = true;
results[task.name] = result;
}
}
// update the log
var msgLog = {
name: log.name,
started: log.started,
finished: log.finished,
state: log.state
};
self.resultColl
.update({_id: message._id}, {'$push': {logs: msgLog}}, {safe: true, upsert: false})
.then(checkFinish).fail(self._onFault(self));
});
} catch (e) {
log.state = common.STATE_ERROR;
log.error = e.stack || e;
self.errColl.insert(log, {safe: true}).fail(self._onFault(self));
self.emit('error', log.error, log);
}
});
};
Queue.prototype._onFault = function (self, workerFinished) {
return function onFault(err) {
if (err) {
self.emit('fault', err.stack || err);
}
if (workerFinished) {
self._currentWorkers--;
}
};
};
Queue.prototype.start = function (workers) {
this._state = 'running';
if (workers) {
this._maxWorkers = workers;
}
for (var i = 0; i < this._maxWorkers; i++) {
this._shift();
}
};
Queue.prototype.stop = function () {
this._state = 'stopped';
};
Queue.prototype.close = function () {
this.stop();
this.db.close();

@@ -60,0 +256,0 @@ };

{
"name" : "nomoque",
"version" : "0.1.1",
"version" : "0.1.2",
"description" : "A task queue powered by nodejs and mongodb",
"keywords" : ["task", "queue", "mongodb"],
"homepage" : "https://github.com/zir/nomoque",
"author" : "Senmiao Liu <zir.echo@gmail.com>",
"homepage" : "https://github.com/lsm/nomoque",
"author" : "Senmiao Liu <senmiao.liu@gmail.com>",
"main" : "./index",
"dependencies" : {
"mongodb-async": "~0.1.2"
"mongodb-async": "0.3.3"
},
"engines" : {"node" : ">= 0.4.8"}
"engines" : {"node" : ">= 0.8.0"}
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc