Comparing version 0.1.5 to 0.1.6
18
index.js
@@ -37,3 +37,2 @@ /** | ||
var db = server.db(options.dbName); | ||
ensureIndexes(db); | ||
return new Queue(db, options); | ||
@@ -54,18 +53,1 @@ }; | ||
} | ||
function ensureIndexes(db) { | ||
db.collection('queues').ensureIndex([ | ||
['date', 1], | ||
['topic', 1], | ||
['state', 1] | ||
]); | ||
// index for `results` | ||
db.collection('results').ensureIndex([ | ||
['date', 1], | ||
['topic', 1], | ||
['state', 1], | ||
['created', -1], | ||
['shifted', -1], | ||
['finished', -1] | ||
]); | ||
} |
113
lib/queue.js
@@ -27,5 +27,10 @@ /** | ||
this._topics = []; | ||
this._pausedTasks = []; | ||
this.delay = 1; | ||
this._state = 'stopped'; | ||
this._maxWorkers = 2; | ||
if (options.maxWorkers > 0) { | ||
this._maxWorkers = options.maxWorkers; | ||
} else { | ||
this._maxWorkers = 2; | ||
} | ||
this._currentWorkers = 0; | ||
@@ -46,2 +51,4 @@ this.db = db; | ||
this.configColl = db.collection(configColl); | ||
this.ensureIndexes(); | ||
}; | ||
@@ -54,2 +61,19 @@ | ||
Queue.prototype.ensureIndexes = function () { | ||
this.queColl.ensureIndex([ | ||
['date', 1], | ||
['topic', 1], | ||
['state', 1] | ||
]); | ||
// index for `results` | ||
this.resultColl.ensureIndex([ | ||
['date', 1], | ||
['topic', 1], | ||
['state', 1], | ||
['created', -1], | ||
['shifted', -1], | ||
['finished', -1] | ||
]); | ||
}; | ||
/** | ||
@@ -66,3 +90,3 @@ * Push a message to a topic queue | ||
var qid = nextIdDoc.currentId; | ||
var date = new Date; | ||
var date = new Date(); | ||
var msg = {_id: qid, topic: topic, message: message, | ||
@@ -89,4 +113,4 @@ state: QueueState.STATE_NEW, | ||
* | ||
* @param {String} topic Topic of the queue | ||
* @param {String} name Name of the task | ||
* @param topic {String} Topic of the queue | ||
* @param name {String} Name of the task | ||
* @param {Function} fn | ||
@@ -125,3 +149,3 @@ */ | ||
var id = message._id; | ||
delete message['_id']; | ||
delete message._id; | ||
// upsert the message into `toColl` | ||
@@ -134,2 +158,3 @@ return toColl.update({_id: id}, {$set: message}, {safe: true, upsert: true}) | ||
message._id = id; | ||
debug('Message "%s" "%s" moved from "%s" to "%s".', message.topic, id, fromColl.name, toColl.name); | ||
defer.next(message); | ||
@@ -150,3 +175,3 @@ }).fail(defer.error); | ||
// modify the message status | ||
{$set: {shifted: new Date, state: QueueState.STATE_SHIFTED}}, | ||
{$set: {shifted: new Date(), state: QueueState.STATE_SHIFTED}}, | ||
// return updated message | ||
@@ -158,2 +183,3 @@ {remove: false, 'new': true, upsert: false}).then(function (message) { | ||
} | ||
debug('Message "%s" shifted from queue.', message._id); | ||
// handle shifted message | ||
@@ -184,2 +210,3 @@ // move the message from `queues` to collection `results` | ||
}; | ||
debug('Processing task "%s" "%s" "%s".', log.topic, log._id, log.name); | ||
try { | ||
@@ -191,5 +218,6 @@ task.fn.call(message, message.message, function (taskErr, result) { | ||
log.state = QueueState.STATE_ERROR; | ||
log.error = taskErr; | ||
log.error = taskErr.stack || taskErr.message || taskErr; | ||
errors[task.name] = taskErr; | ||
self._log(log, message, done); | ||
debug('Task "%s" "%s" "%s" error: %s.', log.topic, log._id, log.name, log.error); | ||
} else { | ||
@@ -203,2 +231,3 @@ log.state = QueueState.STATE_FINISHED; | ||
self._log(log, message, done); | ||
debug('Task "%s" "%s" "%s" finished: %s.', log.topic, log._id, log.name, log.result || '(no result)'); | ||
} | ||
@@ -211,2 +240,3 @@ }); | ||
self._log(log, message, done); | ||
debug('Task "%s" "%s" "%s" exception: %s.', log.topic, log._id, log.name, log.error); | ||
} | ||
@@ -216,3 +246,4 @@ }; | ||
Queue.prototype._process = function (message) { | ||
var tasks = this._tasks[message.topic]; | ||
var topic = message.topic; | ||
var tasks = this._tasks[topic]; | ||
var self = this; | ||
@@ -222,3 +253,3 @@ var count = 0; | ||
var errors = {}; | ||
debug('Processing message %s for topic %s.', message._id, topic); | ||
function checkFinish() { | ||
@@ -231,3 +262,3 @@ if (++count === tasks.length) { | ||
.then(function () { | ||
self._emitOrLog('finished', [message.topic, errors, results]); | ||
self._emitOrLog('finished', [topic, errors, results]); | ||
// shift next | ||
@@ -238,3 +269,3 @@ self._currentWorkers--; | ||
} else { | ||
self._emitOrLog('finished', [message.topic, errors, results]); | ||
self._emitOrLog('finished', [topic, errors, results]); | ||
// shift next | ||
@@ -247,2 +278,10 @@ self._currentWorkers--; | ||
// filter paused tasks | ||
if (this._pausedTasks.length > 0) { | ||
tasks = tasks.filter(function (task) { | ||
return self._pausedTasks.indexOf(topic + '-' + task.name) === -1; | ||
}); | ||
} | ||
// run each tasks | ||
@@ -273,3 +312,3 @@ tasks.forEach(function (task) { | ||
var deferred = this.resultColl | ||
.update({_id: log._id}, {$set:setDoc, $push: pushDoc}, {upsert: true, safe: true}) | ||
.update({_id: log._id}, {$set: setDoc, $push: pushDoc}, {upsert: true, safe: true}) | ||
.fail(this._onFault(this)); | ||
@@ -303,3 +342,3 @@ if (log.error) { | ||
} else { | ||
debug("Event %s: %s", event, JSON.stringify(args)); | ||
debug('Event "%s": %s', event, JSON.stringify(args)); | ||
} | ||
@@ -309,2 +348,3 @@ }; | ||
Queue.prototype.start = function (workers) { | ||
debug('Start processing queue.'); | ||
this._state = 'running'; | ||
@@ -320,2 +360,3 @@ if (workers) { | ||
Queue.prototype.stop = function () { | ||
debug('Stop processing queue.'); | ||
this._state = 'stopped'; | ||
@@ -329,2 +370,46 @@ }; | ||
/** | ||
* Pause processing a topic or task under that topic. | ||
* | ||
* @param topic {String} Topic that you want to pause processing. | ||
* @param [task] {String} Specific task for the topic you want to pause. | ||
* @public | ||
*/ | ||
Queue.prototype.pause = function (topic, task) { | ||
if (task) { | ||
topic += '-' + task; | ||
if (-1 === this._pausedTasks.indexOf(topic)) { | ||
this._pausedTasks.push(topic); | ||
} | ||
} else { | ||
this._topics = this._topics.filter(function (t) { | ||
return topic !== t; | ||
}); | ||
} | ||
return this; | ||
}; | ||
/** | ||
* Resume processing a topic or task under that topic. | ||
* | ||
* @param topic {String} Topic that you want to resume processing. | ||
* @param [task] {String} Specific task for the topic you want to resume. | ||
* @public | ||
*/ | ||
Queue.prototype.resume = function (topic, task) { | ||
if (task) { | ||
topic += '-' + task; | ||
this._pausedTasks = this._pausedTasks.filter(function (task) { | ||
return task !== topic; | ||
}); | ||
} else { | ||
if (-1 === this._topics.indexOf(topic)) { | ||
this._topics.push(topic); | ||
} | ||
} | ||
return this; | ||
}; | ||
Queue.QueueState = QueueState; | ||
@@ -334,2 +419,2 @@ | ||
return coll.findAndModify({_id: 'nextQueueId'}, [], {$inc: {currentId: 1}}, {'new': true, upsert: 'true'}); | ||
} | ||
} |
{ | ||
"name": "nomoque", | ||
"version": "0.1.5", | ||
"version": "0.1.6", | ||
"description": "Database Centric Distributed Task Queue", | ||
@@ -5,0 +5,0 @@ "keywords": ["task", "queue", "database", "distributed", "persistent", "mongodb"], |
73282
477