Comparing version 0.1.1 to 0.1.2
39
index.js
@@ -5,5 +5,6 @@ /** | ||
var common = require('./lib/common'); | ||
var connect = require('mongodb-async').connect; | ||
var MongodbAsync = require('mongodb-async'); | ||
var connect = MongodbAsync.connect; | ||
var AsyncDb = MongodbAsync.AsyncDb; | ||
var Queue = require('./lib/queue'); | ||
var Worker = require('./lib/worker'); | ||
@@ -27,26 +28,12 @@ /** | ||
*/ | ||
exports.createQueue = function(options) { | ||
return new Queue(getDbObj(options)); | ||
exports.createQueue = function (options) { | ||
var db; | ||
if (options instanceof AsyncDb) { | ||
db = options; | ||
} else { | ||
options = common.setDefaultOptions(options); | ||
var server = connect(options.host, options.port, {poolSize: options.poolSize}); | ||
db = server.db(options.db); | ||
} | ||
return new Queue(db); | ||
}; | ||
/** | ||
* Connect to mongodb, create and return an instance of `Worker` | ||
* | ||
* @param options | ||
* @api public | ||
*/ | ||
exports.createWorker = function(options) { | ||
return new Worker(getDbObj(options)); | ||
}; | ||
/** | ||
* Create a mongodb-async instance according to options | ||
* @param options | ||
* @return {Object} | ||
*/ | ||
function getDbObj(options) { | ||
options = common.setDefaultOptions(options); | ||
var server = connect(options.host, options.port, {poolSize: options.poolSize}); | ||
var db = server.db(options.db); | ||
return db; | ||
} |
206
lib/queue.js
@@ -12,6 +12,15 @@ /** | ||
var Queue = module.exports = function Queue(db) { | ||
EventEmitter.call(this); | ||
common.ensureIndexes(db); | ||
this._tasks = {}; | ||
this._topics = []; | ||
this.delay = 1; | ||
this._state = 'stopped'; | ||
this._maxWorkers = 2; | ||
this._currentWorkers = 0; | ||
this.db = db; | ||
this.queColl = db.collection('queues'); | ||
this.configColl = db.collection('configs') | ||
this.resultColl = db.collection('results'); | ||
this.errColl = db.collection('errors'); | ||
this.configColl = db.collection('configs'); | ||
}; | ||
@@ -30,3 +39,3 @@ | ||
*/ | ||
Queue.prototype.push = function(topic, message) { | ||
Queue.prototype.push = function (topic, message) { | ||
var self = this; | ||
@@ -43,3 +52,3 @@ | ||
self.queColl.insert(msg, {safe: true}).then( | ||
function(doc) { | ||
function (doc) { | ||
if (doc) { | ||
@@ -51,3 +60,3 @@ self.emit('queued', Array.isArray(doc) ? doc[0] : doc); | ||
nextQueueId(this.configColl).and(saveMsg).fail(function(err) { | ||
nextQueueId(this.configColl).and(saveMsg).fail(function (err) { | ||
self.emit('fault', err); | ||
@@ -57,3 +66,190 @@ }); | ||
Queue.prototype.close = function() { | ||
/** | ||
* Register a named task for specific topic | ||
* | ||
* @param {String} topic Topic of the queue | ||
* @param {String} name Name of the task | ||
* @param {Function} fn | ||
*/ | ||
Queue.prototype.process = function (topic, name, fn) { | ||
var taskHash = {}; | ||
if (typeof name === 'object') { | ||
taskHash = name; | ||
} else { | ||
taskHash[name] = fn; | ||
} | ||
if (this._topics.indexOf(topic) === -1) { | ||
this._topics.push(topic); | ||
} | ||
if (!this._tasks[topic]) { | ||
this._tasks[topic] = []; | ||
} | ||
var self = this; | ||
Object.keys(taskHash).forEach(function (taskName) { | ||
self._tasks[topic].push({name: taskName, fn: taskHash[taskName]}); | ||
}); | ||
}; | ||
/** | ||
* Move message from one collection to another | ||
* | ||
* @param message | ||
* @param fromColl | ||
* @param toColl | ||
* @returns {*} | ||
* @private | ||
*/ | ||
Queue.prototype._mvMessages = function (message, fromColl, toColl) { | ||
// update don't like `_id` | ||
var id = message._id; | ||
delete message['_id']; | ||
// upsert the message into `toColl` | ||
return toColl.update({_id: id}, {$set: message}, {safe: true, upsert: true}) | ||
.and(function (defer) { | ||
// remove the old one from `from` | ||
fromColl.remove({_id: id}, {safe: true}) | ||
.then(function () { | ||
message._id = id; | ||
defer.next(message); | ||
}).fail(defer.error); | ||
}).fail(this._onFault(this, true)); | ||
}; | ||
Queue.prototype._shift = function () { | ||
if (this._state === 'stopped' || this._currentWorkers++ >= this._maxWorkers) { | ||
return; | ||
} | ||
var self = this; | ||
// get messages we are interested | ||
this.queColl.findAndModify({topic: {$in: self._topics}, state: common.STATE_NEW}, | ||
// use `nature` order | ||
[], | ||
// modify the message status | ||
{$set: {shifted: new Date, state: common.STATE_SHIFTED}}, | ||
// return updated message | ||
{remove: false, 'new': true, upsert: false}).then(function (message) { | ||
if (!message) { | ||
tryNext(); | ||
return; | ||
} | ||
// handle shifted message | ||
// move the message from `queues` to collection `results` | ||
self._mvMessages(message, self.queColl, self.resultColl).then(function (message) { | ||
// perform tasks | ||
self._process(message); | ||
}); | ||
}).fail(tryNext); | ||
function tryNext() { | ||
// if nothing, wait for `delay` and try shift again | ||
self._currentWorkers--; | ||
setTimeout(function () { | ||
self._shift(); | ||
}, self.delay * 1000); | ||
} | ||
}; | ||
Queue.prototype._process = function (message) { | ||
var tasks = this._tasks[message.topic]; | ||
var self = this; | ||
var count = 0; | ||
var partial = false; | ||
var hasResult = false; | ||
var results = {}; | ||
function checkFinish() { | ||
if (++count === tasks.length) { | ||
// update state and results | ||
var updateDoc = { | ||
finished: new Date, | ||
state: partial ? common.STATE_PARTIAL : common.STATE_FINISHED, | ||
results: hasResult ? results : undefined | ||
}; | ||
self.resultColl | ||
.update({_id: message._id}, {$set: updateDoc}, {safe: true, upsert: false}) | ||
.then(function () { | ||
self.emit('finished', message.topic, results); | ||
// shift next | ||
self._currentWorkers--; | ||
self._shift(); | ||
}).fail(self._onFault(self, true)); | ||
} | ||
} | ||
// run each tasks | ||
tasks.forEach(function (task) { | ||
var log = { | ||
topic: message.topic, | ||
mid: message._id, | ||
name: task.name, | ||
started: new Date, | ||
state: common.STATE_INPROGRESS | ||
}; | ||
try { | ||
task.fn.call(message, message.message, function (taskErr, result) { | ||
var ts = new Date; | ||
log.finished = ts; | ||
log.date = common.getDateString(ts); | ||
if (taskErr) { | ||
partial = true; | ||
// record error | ||
log.state = common.STATE_ERROR; | ||
log.error = taskErr; | ||
self.errColl.insert(log, {safe: true}).fail(self._onFault(self)); | ||
self.emit('error', log.error, log); | ||
} else { | ||
log.state = common.STATE_FINISHED; | ||
// record result if any | ||
if (result != undefined) { | ||
hasResult = true; | ||
results[task.name] = result; | ||
} | ||
} | ||
// update the log | ||
var msgLog = { | ||
name: log.name, | ||
started: log.started, | ||
finished: log.finished, | ||
state: log.state | ||
}; | ||
self.resultColl | ||
.update({_id: message._id}, {'$push': {logs: msgLog}}, {safe: true, upsert: false}) | ||
.then(checkFinish).fail(self._onFault(self)); | ||
}); | ||
} catch (e) { | ||
log.state = common.STATE_ERROR; | ||
log.error = e.stack || e; | ||
self.errColl.insert(log, {safe: true}).fail(self._onFault(self)); | ||
self.emit('error', log.error, log); | ||
} | ||
}); | ||
}; | ||
Queue.prototype._onFault = function (self, workerFinished) { | ||
return function onFault(err) { | ||
if (err) { | ||
self.emit('fault', err.stack || err); | ||
} | ||
if (workerFinished) { | ||
self._currentWorkers--; | ||
} | ||
}; | ||
}; | ||
Queue.prototype.start = function (workers) { | ||
this._state = 'running'; | ||
if (workers) { | ||
this._maxWorkers = workers; | ||
} | ||
for (var i = 0; i < this._maxWorkers; i++) { | ||
this._shift(); | ||
} | ||
}; | ||
Queue.prototype.stop = function () { | ||
this._state = 'stopped'; | ||
}; | ||
Queue.prototype.close = function () { | ||
this.stop(); | ||
this.db.close(); | ||
@@ -60,0 +256,0 @@ }; |
{ | ||
"name" : "nomoque", | ||
"version" : "0.1.1", | ||
"version" : "0.1.2", | ||
"description" : "A task queue powered by nodejs and mongodb", | ||
"keywords" : ["task", "queue", "mongodb"], | ||
"homepage" : "https://github.com/zir/nomoque", | ||
"author" : "Senmiao Liu <zir.echo@gmail.com>", | ||
"homepage" : "https://github.com/lsm/nomoque", | ||
"author" : "Senmiao Liu <senmiao.liu@gmail.com>", | ||
"main" : "./index", | ||
"dependencies" : { | ||
"mongodb-async": "~0.1.2" | ||
"mongodb-async": "0.3.3" | ||
}, | ||
"engines" : {"node" : ">= 0.4.8"} | ||
"engines" : {"node" : ">= 0.8.0"} | ||
} |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
No website
QualityPackage does not have a website.
Found 1 instance in 1 package
1
11331
5
378
1
+ Addedbson@0.1.8(transitive)
+ Addedgenji@0.7.1(transitive)
+ Addedmongodb@1.2.14(transitive)
+ Addedmongodb-async@0.3.3(transitive)
- Removed@mongodb-js/saslprep@1.1.9(transitive)
- Removed@types/webidl-conversions@7.0.3(transitive)
- Removed@types/whatwg-url@11.0.5(transitive)
- Removedbson@6.10.1(transitive)
- Removedgenji@0.2.3(transitive)
- Removedmemory-pager@1.5.0(transitive)
- Removedmongodb@6.12.0(transitive)
- Removedmongodb-async@0.1.2(transitive)
- Removedmongodb-connection-string-url@3.0.2(transitive)
- Removedpunycode@2.3.1(transitive)
- Removedsparse-bitfield@3.0.3(transitive)
- Removedtr46@5.0.0(transitive)
- Removedwebidl-conversions@7.0.0(transitive)
- Removedwhatwg-url@14.1.0(transitive)
Updatedmongodb-async@0.3.3