Comparing version 0.1.6 to 0.1.7
106
lib/queue.js
@@ -13,7 +13,7 @@ /** | ||
var QueueState = { | ||
STATE_ERROR: -1, | ||
STATE_NEW: 0, | ||
STATE_SHIFTED: 1, | ||
STATE_PARTIAL: 2, | ||
STATE_FINISHED: 3 | ||
STATE_FINISHED: 3, | ||
STATE_ERROR: 4 | ||
}; | ||
@@ -26,6 +26,9 @@ | ||
EventEmitter.call(this); | ||
options = options || {}; | ||
this._tasks = {}; | ||
this._topics = []; | ||
this._pausedTasks = []; | ||
this.delay = 1; | ||
this.delay = options.delay || 1; | ||
this.serial = options.serial || false; | ||
this.resultExpire = options.resultExpire || 14; | ||
this._state = 'stopped'; | ||
@@ -63,3 +66,2 @@ if (options.maxWorkers > 0) { | ||
this.queColl.ensureIndex([ | ||
['date', 1], | ||
['topic', 1], | ||
@@ -70,18 +72,19 @@ ['state', 1] | ||
this.resultColl.ensureIndex([ | ||
['date', 1], | ||
['topic', 1], | ||
['state', 1], | ||
['created', -1], | ||
['shifted', -1], | ||
['finished', -1] | ||
['shifted', -1] | ||
]); | ||
this.resultColl.ensureIndex({finished: 1}, {expireAfterSeconds: 24 * 3600 * this.resultExpire}); | ||
}; | ||
/** | ||
* Push a message to a topic queue | ||
* Push a message to a topic queue with optional options. | ||
* | ||
* @param {String} topic | ||
* @param {Object} message | ||
* @param topic {String} | ||
* @param message {Object} | ||
* @param [options] {Object} | ||
*/ | ||
Queue.prototype.push = function (topic, message) { | ||
Queue.prototype.push = function (topic, message, options) { | ||
var self = this; | ||
@@ -97,2 +100,7 @@ | ||
}; | ||
if (options) { | ||
if (!isNaN(options.priority)) { | ||
msg.state = options.priority > 0 ? options.priority * -1 : options.priority; | ||
} | ||
} | ||
self.queColl.insert(msg, {safe: true}).then(function (doc) { | ||
@@ -163,2 +171,7 @@ if (doc) { | ||
Queue.prototype._getTopic = function () { | ||
var idx = getRandomInt(0, this._topics.length - 1); | ||
return this._topics[idx]; | ||
}; | ||
Queue.prototype._shift = function () { | ||
@@ -170,5 +183,5 @@ if (this._state === 'stopped' || this._currentWorkers++ >= this._maxWorkers) { | ||
// get messages we are interested | ||
this.queColl.findAndModify({topic: {$in: self._topics}, state: QueueState.STATE_NEW}, | ||
// use `nature` order | ||
[], | ||
this.queColl.findAndModify({topic: self._getTopic(), state: {$lte: QueueState.STATE_NEW}}, | ||
// order by priority | ||
[['state', 1]], | ||
// modify the message status | ||
@@ -218,3 +231,3 @@ {$set: {shifted: new Date(), state: QueueState.STATE_SHIFTED}}, | ||
self._log(log, message, done); | ||
debug('Task "%s" "%s" "%s" error: %s.', log.topic, log._id, log.name, log.error); | ||
debug('Task "%s" "%s" "%s" error: %s.', log.topic, log._id, log.name, JSON.stringify(log.error)); | ||
} else { | ||
@@ -250,18 +263,19 @@ log.state = QueueState.STATE_FINISHED; | ||
if (++count === tasks.length) { | ||
if (Object.keys(errors).length > 0) { | ||
// errors found | ||
self.resultColl | ||
.update({_id: message._id}, {$set: {state: QueueState.STATE_PARTIAL}}, {safe: true}) | ||
.then(function () { | ||
self._emitOrLog('finished', [topic, errors, results]); | ||
// shift next | ||
self._currentWorkers--; | ||
self._shift(); | ||
}); | ||
var errorCount = Object.keys(errors).length; | ||
var setDoc = { | ||
state: QueueState.STATE_FINISHED | ||
}; | ||
if (errorCount > 0) { | ||
setDoc.state = errorCount === count ? QueueState.STATE_ERROR : QueueState.STATE_PARTIAL; | ||
} else { | ||
self._emitOrLog('finished', [topic, errors, results]); | ||
// shift next | ||
self._currentWorkers--; | ||
self._shift(); | ||
setDoc.finished = new Date(); | ||
} | ||
self.resultColl | ||
.update({_id: message._id}, {$set: setDoc}, {safe: true}) | ||
.then(function () { | ||
self._emitOrLog('finished', [topic, errors, results]); | ||
// shift next | ||
self._currentWorkers--; | ||
self._shift(); | ||
}); | ||
} | ||
@@ -277,7 +291,21 @@ } | ||
// run each tasks | ||
tasks.forEach(function (task) { | ||
self._runTask(task, message, errors, results, checkFinish); | ||
}); | ||
if (this.serial) { | ||
var len = tasks.length; | ||
var _runTask = function () { | ||
if (count < len) { | ||
self._runTask(tasks[count], message, errors, results, function () { | ||
checkFinish(); | ||
_runTask(); | ||
}); | ||
} else { | ||
checkFinish(); | ||
} | ||
}; | ||
_runTask(); | ||
} else { | ||
// run each tasks in parallel | ||
tasks.forEach(function (task) { | ||
self._runTask(task, message, errors, results, checkFinish); | ||
}); | ||
} | ||
}; | ||
@@ -287,6 +315,2 @@ | ||
var logName = 'logs.' + log.name; | ||
var setDoc = { | ||
topic: log.topic, | ||
state: log.state | ||
}; | ||
var pushDoc = {}; | ||
@@ -305,3 +329,3 @@ pushDoc[logName] = { | ||
var deferred = this.resultColl | ||
.update({_id: log._id}, {$set: setDoc, $push: pushDoc}, {upsert: true, safe: true}) | ||
.update({_id: log._id}, {$push: pushDoc}, {upsert: true, safe: true}) | ||
.fail(this._onFault(this)); | ||
@@ -409,1 +433,5 @@ if (log.error) { | ||
} | ||
function getRandomInt(min, max) { | ||
return Math.floor(Math.random() * (max - min + 1) + min); | ||
} |
{ | ||
"name": "nomoque", | ||
"version": "0.1.6", | ||
"version": "0.1.7", | ||
"description": "Database Centric Distributed Task Queue", | ||
"keywords": ["task", "queue", "database", "distributed", "persistent", "mongodb"], | ||
"keywords": ["task", "queue", "persistent", "priority", "database", "distributed", "mongodb"], | ||
"homepage": "https://github.com/lsm/nomoque", | ||
@@ -23,2 +23,2 @@ "author": "Senmiao Liu <senmiao.liu@gmail.com>", | ||
} | ||
} | ||
} |
@@ -9,2 +9,3 @@ var assert = require('assert'); | ||
describe('Queue', function () { | ||
this.timeout(10000); | ||
@@ -68,2 +69,3 @@ it('should create an instance of Queue', function () { | ||
worker.process('topicA', 'add', function (message, callback) { | ||
worker.resume('topicB'); | ||
assert.equal(1, message.a); | ||
@@ -74,2 +76,4 @@ assert.equal(2, message.b); | ||
}); | ||
worker.pause('topicB'); | ||
worker.start(); | ||
@@ -76,0 +80,0 @@ }); |
74328
506