Comparing version 1.0.7 to 1.0.8
@@ -61,6 +61,7 @@ /** | ||
} | ||
} | ||
}, | ||
}; | ||
@@ -104,3 +104,6 @@ /** | ||
_this.queue = new HiveQueue(_this.config, _this.runner); | ||
_this.queue.init(started); | ||
// clean any old workers that have been marked as orphaned for more than 5 mins | ||
_this.queue.init(function() { | ||
_this.queue.cleanOldWorkers(3600, started); | ||
}); | ||
}], | ||
@@ -163,5 +166,4 @@ | ||
createUserTasks: ['setupRedis', 'setupIPC', 'setupTasks', 'openLanes', function createUserTasks(created) { | ||
var _this = this; | ||
async.each(_this.tasks, function (task, doneTask) { | ||
Hive.task.create(task, doneTask); | ||
_this.task.create(task, doneTask); | ||
}, created) | ||
@@ -168,0 +170,0 @@ }] |
@@ -35,3 +35,3 @@ /** | ||
// if ioredis and sentinels array is populate then drop the host and port params. | ||
var redis = _.assign(require('./defaults/redis')(), userConfig.redis || {}); | ||
var redis = _.merge(require('./defaults/redis')(), userConfig.redis || {}); | ||
@@ -52,6 +52,6 @@ // drop port and host from defaults if sentinels array provided | ||
redis: redis, | ||
tasks: _.assign(require('./defaults/tasks')(), userConfig.tasks || {}), | ||
general: _.assign(require('./defaults/general')(), userConfig.general || {}), | ||
logs: _.assign(require('./defaults/logs')(), userConfig.logs || {}), | ||
stats: _.assign(require('./defaults/stats')(), userConfig.stats || {}) | ||
tasks: _.merge(require('./defaults/tasks')(), userConfig.tasks || {}), | ||
general: _.merge(require('./defaults/general')(), userConfig.general || {}), | ||
logs: _.merge(require('./defaults/logs')(), userConfig.logs || {}), | ||
stats: _.merge(require('./defaults/stats')(), userConfig.stats || {}) | ||
}; | ||
@@ -58,0 +58,0 @@ |
@@ -9,2 +9,3 @@ var runPlugin = function (pluginReference, type, func, queue, job, args, callback) { | ||
var pluginName = pluginReference; | ||
var pluginOptions; | ||
if (typeof pluginReference === 'function') { | ||
@@ -15,7 +16,8 @@ pluginName = new pluginReference().name; | ||
if (self.runner[func]['pluginOptions'] != null && self.runner[func]['pluginOptions'][pluginName] != null) { | ||
var pluginOptions = self.runner[func]['pluginOptions'][pluginName] | ||
pluginOptions = self.runner[func]['pluginOptions'][pluginName] | ||
} else { | ||
var pluginOptions = {}; | ||
pluginOptions = {}; | ||
} | ||
var plugin = null | ||
var plugin = null; | ||
if (typeof pluginReference === 'string') { | ||
@@ -33,3 +35,4 @@ var pluginConstructor = require(__dirname + "/plugins/" + pluginReference + ".js")[pluginReference]; | ||
} else { | ||
plugin[type](function (err, toRun) { | ||
plugin[type](function (err, toRun, task) { | ||
callback(err, toRun); | ||
@@ -36,0 +39,0 @@ }); |
@@ -5,5 +5,5 @@ var util = require('util'); | ||
var queue = function(config, runner, callback){ | ||
var queue = function (config, runner, callback) { | ||
var _this = this; | ||
if(typeof runner == 'function' && callback === undefined){ | ||
if (typeof runner == 'function' && callback === undefined) { | ||
callback = runner; | ||
@@ -16,20 +16,24 @@ runner = {}; | ||
_this.runPlugin = pluginRunner.runPlugin; | ||
_this.runPlugin = pluginRunner.runPlugin; | ||
_this.runPlugins = pluginRunner.runPlugins; | ||
_this.connection = new connection(config.connection); | ||
_this.connection.connect(function(err){ | ||
if(typeof callback === 'function'){ callback(err); } | ||
_this.connection.connect(function (err) { | ||
if (typeof callback === 'function') { | ||
callback(err); | ||
} | ||
}); | ||
}; | ||
queue.prototype.end = function(callback){ | ||
queue.prototype.end = function (callback) { | ||
var _this = this; | ||
_this.connection.disconnect(); | ||
process.nextTick(function(){ | ||
if(typeof callback === 'function'){ callback(); } | ||
process.nextTick(function () { | ||
if (typeof callback === 'function') { | ||
callback(); | ||
} | ||
}); | ||
}; | ||
queue.prototype.encode = function(q, func, args){ | ||
queue.prototype.encode = function (q, func, args) { | ||
return JSON.stringify({ | ||
@@ -42,8 +46,8 @@ runner: func, | ||
queue.prototype.enqueue = function(q, func, args, callback){ | ||
queue.prototype.enqueue = function (q, func, args, callback) { | ||
var _this = this; | ||
if(arguments.length === 3 && typeof args === 'function'){ | ||
if (arguments.length === 3 && typeof args === 'function') { | ||
callback = args; | ||
args = []; | ||
}else if(arguments.length < 3){ | ||
} else if (arguments.length < 3) { | ||
args = []; | ||
@@ -53,11 +57,17 @@ } | ||
var job = _this.runner[func]; | ||
_this.runPlugins('before_enqueue', func, q, job, args, function(err, toRun){ | ||
if(toRun === false){ | ||
if(typeof callback === 'function'){ callback(err, toRun); } | ||
}else{ | ||
_this.connection.ensureConnected(callback, function(){ | ||
_this.connection.redis.sadd(_this.connection.key('queues'), q, function(){ | ||
_this.connection.redis.rpush(_this.connection.key('queue', q), _this.encode(q, func, args), function(){ | ||
_this.runPlugins('after_enqueue', func, q, job, args, function(){ | ||
if(typeof callback === 'function'){ callback(err, toRun); } | ||
_this.runPlugins('before_enqueue', func, q, job, args, function (err, toRun, newArgs) { | ||
if (toRun === false) { | ||
if (typeof callback === 'function') { | ||
callback(err, toRun); | ||
} | ||
} else { | ||
_this.connection.ensureConnected(callback, function () { | ||
_this.connection.redis.sadd(_this.connection.key('queues'), q, function () { | ||
if (newArgs) | ||
newArgs = arrayify(newArgs); | ||
_this.connection.redis.rpush(_this.connection.key('queue', q), _this.encode(q, func, newArgs || args), function () { | ||
_this.runPlugins('after_enqueue', func, q, job, args, function () { | ||
if (typeof callback === 'function') { | ||
callback(err, toRun); | ||
} | ||
}); | ||
@@ -71,22 +81,24 @@ }); | ||
queue.prototype.enqueueAt = function(timestamp, q, func, args, callback){ | ||
queue.prototype.enqueueAt = function (timestamp, q, func, args, callback) { | ||
// Don't run plugins here, they should be run by scheduler at the enqueue step | ||
var _this = this; | ||
if(arguments.length === 4 && typeof args === 'function'){ | ||
if (arguments.length === 4 && typeof args === 'function') { | ||
callback = args; | ||
args = []; | ||
}else if(arguments.length < 4){ | ||
} else if (arguments.length < 4) { | ||
args = []; | ||
} | ||
args = arrayify(args); | ||
_this.connection.ensureConnected(callback, function(){ | ||
_this.connection.ensureConnected(callback, function () { | ||
var item = _this.encode(q, func, args); | ||
var rTimestamp = Math.round(timestamp / 1000); // assume timestamp is in ms | ||
// enqueue the encoded job into a list per timestmp to be popped and workered later | ||
_this.connection.redis.rpush(_this.connection.key("delayed:" + rTimestamp), item, function(){ | ||
_this.connection.redis.rpush(_this.connection.key("delayed:" + rTimestamp), item, function () { | ||
// save the job + args into a set so that it can be checked by plugins | ||
_this.connection.redis.sadd(_this.connection.key("timestamps:" + item), _this.connection.key("delayed:" + rTimestamp), function(){ | ||
_this.connection.redis.sadd(_this.connection.key("timestamps:" + item), _this.connection.key("delayed:" + rTimestamp), function () { | ||
// and the timestamp in question to a zset to the scheduler will know which timestamps have data to work | ||
_this.connection.redis.zadd(_this.connection.key('delayed_queue_schedule'), rTimestamp, rTimestamp, function(){ | ||
if(typeof callback === 'function'){ callback(); } | ||
_this.connection.redis.zadd(_this.connection.key('delayed_queue_schedule'), rTimestamp, rTimestamp, function () { | ||
if (typeof callback === 'function') { | ||
callback(); | ||
} | ||
}); | ||
@@ -98,8 +110,8 @@ }); | ||
queue.prototype.enqueueIn = function(time, q, func, args, callback){ | ||
queue.prototype.enqueueIn = function (time, q, func, args, callback) { | ||
var _this = this; | ||
if(arguments.length === 4 && typeof args === 'function'){ | ||
if (arguments.length === 4 && typeof args === 'function') { | ||
callback = args; | ||
args = []; | ||
}else if(arguments.length < 4){ | ||
} else if (arguments.length < 4) { | ||
args = []; | ||
@@ -109,11 +121,13 @@ } | ||
var timestamp = (new Date().getTime()) + time; | ||
_this.enqueueAt(timestamp, q, func, args, function(){ | ||
if(typeof callback === 'function'){ callback(); } | ||
_this.enqueueAt(timestamp, q, func, args, function () { | ||
if (typeof callback === 'function') { | ||
callback(); | ||
} | ||
}); | ||
}; | ||
queue.prototype.queues = function(callback){ | ||
queue.prototype.queues = function (callback) { | ||
var _this = this; | ||
_this.connection.ensureConnected(callback, function(){ | ||
_this.connection.redis.smembers(_this.connection.key('queues'), function(err, queues){ | ||
_this.connection.ensureConnected(callback, function () { | ||
_this.connection.redis.smembers(_this.connection.key('queues'), function (err, queues) { | ||
callback(err, queues); | ||
@@ -124,7 +138,7 @@ }); | ||
queue.prototype.delQueue = function(q, callback){ | ||
queue.prototype.delQueue = function (q, callback) { | ||
var _this = this; | ||
_this.connection.redis.del(_this.connection.key('queue', q), function(err){ | ||
if(err) return callback(err) | ||
_this.connection.redis.srem(_this.connection.key('queues'), q, function(err){ | ||
_this.connection.redis.del(_this.connection.key('queue', q), function (err) { | ||
if (err) return callback(err) | ||
_this.connection.redis.srem(_this.connection.key('queues'), q, function (err) { | ||
callback(err); | ||
@@ -135,6 +149,6 @@ }) | ||
queue.prototype.length = function(q, callback){ | ||
queue.prototype.length = function (q, callback) { | ||
var _this = this; | ||
_this.connection.ensureConnected(callback, function(){ | ||
_this.connection.redis.llen(_this.connection.key('queue', q), function(err, length){ | ||
_this.connection.ensureConnected(callback, function () { | ||
_this.connection.redis.llen(_this.connection.key('queue', q), function (err, length) { | ||
callback(err, length); | ||
@@ -145,9 +159,9 @@ }); | ||
queue.prototype.del = function(q, func, args, count, callback){ | ||
queue.prototype.del = function (q, func, args, count, callback) { | ||
var _this = this; | ||
if(typeof count === 'function' && callback === undefined){ | ||
if (typeof count === 'function' && callback === undefined) { | ||
callback = count; | ||
count = 0; | ||
}else if(arguments.length === 3){ | ||
if(typeof args === 'function'){ | ||
} else if (arguments.length === 3) { | ||
if (typeof args === 'function') { | ||
callback = args; | ||
@@ -157,3 +171,3 @@ args = []; | ||
count = 0; | ||
}else if(arguments.length < 3){ | ||
} else if (arguments.length < 3) { | ||
args = []; | ||
@@ -163,5 +177,7 @@ count = 0; | ||
args = arrayify(args); | ||
_this.connection.ensureConnected(callback, function(){ | ||
_this.connection.redis.lrem(_this.connection.key('queue', q), count, _this.encode(q, func, args), function(err, count){ | ||
if(typeof callback === 'function'){ callback(err, count); } | ||
_this.connection.ensureConnected(callback, function () { | ||
_this.connection.redis.lrem(_this.connection.key('queue', q), count, _this.encode(q, func, args), function (err, count) { | ||
if (typeof callback === 'function') { | ||
callback(err, count); | ||
} | ||
}); | ||
@@ -171,8 +187,8 @@ }); | ||
queue.prototype.delDelayed = function(q, func, args, callback){ | ||
queue.prototype.delDelayed = function (q, func, args, callback) { | ||
var _this = this; | ||
if(arguments.length === 3 && typeof args === 'function'){ | ||
if (arguments.length === 3 && typeof args === 'function') { | ||
callback = args; | ||
args = []; | ||
}else if(arguments.length < 3){ | ||
} else if (arguments.length < 3) { | ||
args = []; | ||
@@ -182,16 +198,22 @@ } | ||
var search = _this.encode(q, func, args); | ||
_this.connection.ensureConnected(callback, function(){ | ||
var timestamps = _this.connection.redis.smembers(_this.connection.key("timestamps:" + search), function(err, members){ | ||
if(members.length === 0 ){ if(typeof callback === 'function'){ callback(err, []); } } | ||
else{ | ||
_this.connection.ensureConnected(callback, function () { | ||
var timestamps = _this.connection.redis.smembers(_this.connection.key("timestamps:" + search), function (err, members) { | ||
if (members.length === 0) { | ||
if (typeof callback === 'function') { | ||
callback(err, []); | ||
} | ||
} | ||
else { | ||
var started = 0; | ||
var timestamps = []; | ||
members.forEach(function(key){ | ||
members.forEach(function (key) { | ||
started++; | ||
_this.connection.redis.lrem(key, 0, search, function(){ | ||
_this.connection.redis.srem(_this.connection.key("timestamps:" + search), key, function(){ | ||
_this.connection.redis.lrem(key, 0, search, function () { | ||
_this.connection.redis.srem(_this.connection.key("timestamps:" + search), key, function () { | ||
timestamps.push(key.split(":")[key.split(":").length - 1]); | ||
started--; | ||
if(started === 0){ | ||
if(typeof callback === 'function'){ callback(err, timestamps); } | ||
if (started === 0) { | ||
if (typeof callback === 'function') { | ||
callback(err, timestamps); | ||
} | ||
} | ||
@@ -206,8 +228,8 @@ }); | ||
queue.prototype.scheduledAt = function(q, func, args, callback){ | ||
queue.prototype.scheduledAt = function (q, func, args, callback) { | ||
var _this = this; | ||
if(arguments.length === 3 && typeof args === 'function'){ | ||
if (arguments.length === 3 && typeof args === 'function') { | ||
callback = args; | ||
args = []; | ||
}else if(arguments.length < 3){ | ||
} else if (arguments.length < 3) { | ||
args = []; | ||
@@ -217,11 +239,13 @@ } | ||
var search = _this.encode(q, func, args); | ||
_this.connection.ensureConnected(callback, function(){ | ||
_this.connection.redis.smembers(_this.connection.key("timestamps:" + search), function(err, members){ | ||
_this.connection.ensureConnected(callback, function () { | ||
_this.connection.redis.smembers(_this.connection.key("timestamps:" + search), function (err, members) { | ||
var timestamps = []; | ||
if(members !== null){ | ||
members.forEach(function(key){ | ||
if (members !== null) { | ||
members.forEach(function (key) { | ||
timestamps.push(key.split(":")[key.split(":").length - 1]); | ||
}); | ||
} | ||
if(typeof callback === 'function'){ callback(err, timestamps); } | ||
if (typeof callback === 'function') { | ||
callback(err, timestamps); | ||
} | ||
}); | ||
@@ -231,7 +255,7 @@ }); | ||
queue.prototype.timestamps = function(callback){ | ||
queue.prototype.timestamps = function (callback) { | ||
var _this = this; | ||
var results = []; | ||
_this.connection.redis.keys(_this.connection.key("delayed:*"), function(err, timestamps){ | ||
timestamps.forEach(function(timestamp){ | ||
_this.connection.redis.keys(_this.connection.key("delayed:*"), function (err, timestamps) { | ||
timestamps.forEach(function (timestamp) { | ||
var parts = timestamp.split(":"); | ||
@@ -245,9 +269,9 @@ results.push(parseInt(parts[(parts.length - 1)]) * 1000); | ||
queue.prototype.delayedAt = function(timestamp, callback){ | ||
queue.prototype.delayedAt = function (timestamp, callback) { | ||
var _this = this; | ||
var rTimestamp = Math.round(timestamp / 1000); // assume timestamp is in ms | ||
var tasks = []; | ||
_this.connection.redis.lrange(_this.connection.key("delayed:" + rTimestamp), 0, -1, function(err, items){ | ||
items.forEach(function(i){ | ||
tasks.push( JSON.parse(i) ); | ||
_this.connection.redis.lrange(_this.connection.key("delayed:" + rTimestamp), 0, -1, function (err, items) { | ||
items.forEach(function (i) { | ||
tasks.push(JSON.parse(i)); | ||
}); | ||
@@ -258,7 +282,7 @@ callback(err, tasks, rTimestamp); | ||
queue.prototype.queued = function(q, start, stop, callback){ | ||
queue.prototype.queued = function (q, start, stop, callback) { | ||
var _this = this; | ||
_this.connection.ensureConnected(callback, function(){ | ||
_this.connection.redis.lrange(_this.connection.key('queue', q), start, stop, function(err, items){ | ||
var tasks = items.map(function(i){ | ||
_this.connection.ensureConnected(callback, function () { | ||
_this.connection.redis.lrange(_this.connection.key('queue', q), start, stop, function (err, items) { | ||
var tasks = items.map(function (i) { | ||
return JSON.parse(i) | ||
@@ -271,16 +295,18 @@ }); | ||
queue.prototype.allDelayed = function(callback){ | ||
queue.prototype.allDelayed = function (callback) { | ||
var _this = this; | ||
var started = 0; | ||
var results = {}; | ||
_this.timestamps(function(err, timestamps){ | ||
if(timestamps.length === 0){ | ||
_this.timestamps(function (err, timestamps) { | ||
if (timestamps.length === 0) { | ||
callback(err, {}); | ||
}else{ | ||
timestamps.forEach(function(timestamp){ | ||
} else { | ||
timestamps.forEach(function (timestamp) { | ||
started++; | ||
_this.delayedAt(timestamp, function(err, tasks, rTimestamp){ | ||
_this.delayedAt(timestamp, function (err, tasks, rTimestamp) { | ||
results[(rTimestamp * 1000)] = tasks; | ||
started--; | ||
if(started === 0){ callback(err, results); } | ||
if (started === 0) { | ||
callback(err, results); | ||
} | ||
}); | ||
@@ -292,19 +318,19 @@ }); | ||
queue.prototype.workers = function(callback){ | ||
queue.prototype.workers = function (callback) { | ||
var _this = this; | ||
var workers = {}; | ||
_this.connection.redis.smembers(_this.connection.key('workers'), function(err, results){ | ||
if(results){ | ||
results.forEach(function(r){ | ||
_this.connection.redis.smembers(_this.connection.key('workers'), function (err, results) { | ||
if (results) { | ||
results.forEach(function (r) { | ||
var parts = r.split(':'); | ||
var name, queues; | ||
if(parts.length === 1){ | ||
if (parts.length === 1) { | ||
name = parts[0]; | ||
workers[name] = null; | ||
} | ||
else if(parts.length === 2){ | ||
else if (parts.length === 2) { | ||
name = parts[0]; | ||
queues = parts[1]; | ||
workers[name] = queues; | ||
}else{ | ||
} else { | ||
name = parts.shift() + ":" + parts.shift(); | ||
@@ -316,34 +342,38 @@ queues = parts.join(':'); | ||
} | ||
if(typeof callback === 'function'){ callback(err, workers); } | ||
if (typeof callback === 'function') { | ||
callback(err, workers); | ||
} | ||
}); | ||
}; | ||
queue.prototype.workingOn = function(workerName, queues, callback){ | ||
queue.prototype.workingOn = function (workerName, queues, callback) { | ||
var _this = this; | ||
var fullWorkerName = workerName + ':' + queues; | ||
_this.connection.redis.get(_this.connection.key('worker', fullWorkerName), function(err, data){ | ||
if(typeof callback === 'function'){ callback(err, data); } | ||
_this.connection.redis.get(_this.connection.key('worker', fullWorkerName), function (err, data) { | ||
if (typeof callback === 'function') { | ||
callback(err, data); | ||
} | ||
}); | ||
}; | ||
queue.prototype.allWorkingOn = function(callback){ | ||
queue.prototype.allWorkingOn = function (callback) { | ||
var _this = this; | ||
var results = {}; | ||
var counter = 0; | ||
_this.workers(function(err, workers){ | ||
if(err && typeof callback === 'function'){ | ||
_this.workers(function (err, workers) { | ||
if (err && typeof callback === 'function') { | ||
callback(err, results); | ||
}else if(!workers || hashLength(workers) === 0){ | ||
} else if (!workers || hashLength(workers) === 0) { | ||
callback(null, results); | ||
}else{ | ||
for(var w in workers){ | ||
} else { | ||
for (var w in workers) { | ||
counter++; | ||
results[w] = 'started'; | ||
_this.workingOn(w, workers[w], function(err, data){ | ||
_this.workingOn(w, workers[w], function (err, data) { | ||
counter--; | ||
if(data){ | ||
if (data) { | ||
data = JSON.parse(data); | ||
results[data.worker] = data; | ||
} | ||
if(counter === 0 && typeof callback === 'function'){ | ||
if (counter === 0 && typeof callback === 'function') { | ||
callback(err, results); | ||
@@ -357,13 +387,19 @@ } | ||
queue.prototype.forceCleanWorker = function(workerName, callback){ | ||
queue.prototype.forceCleanWorker = function (workerName, callback) { | ||
var _this = this; | ||
_this.workers(function(err, workers){ | ||
_this.workers(function (err, workers) { | ||
var queues = workers[workerName]; | ||
var errorPayload; | ||
if(err){ callback(err); } | ||
else if(!queues){ callback(new Error('worker not round')); } | ||
else{ | ||
_this.workingOn(workerName, queues, function(err, workingOn){ | ||
if(err){ callback(err); } | ||
else if(workingOn){ | ||
if (err) { | ||
callback(err); | ||
} | ||
else if (!queues) { | ||
callback(new Error('worker not round')); | ||
} | ||
else { | ||
_this.workingOn(workerName, queues, function (err, workingOn) { | ||
if (err) { | ||
callback(err); | ||
} | ||
else if (workingOn) { | ||
workingOn = JSON.parse(workingOn); | ||
@@ -389,3 +425,3 @@ errorPayload = { | ||
_this.connection.redis.srem(_this.connection.key('workers'), workerName + ':' + queues) | ||
], function(err, data){ | ||
], function (err, data) { | ||
callback(err, errorPayload); | ||
@@ -399,28 +435,36 @@ }); | ||
queue.prototype.cleanOldWorkers = function(age, callback){ | ||
/** | ||
* | ||
* @param age | ||
* @param callback | ||
*/ | ||
queue.prototype.cleanOldWorkers = function (age, callback) { | ||
// note: this method will remove the data created by a "stuck" worker and move the payload to the error queue | ||
// however, it will not actually remove any processes which may be running. A job *may* be running that you have removed | ||
var _this = this; | ||
console.dir(this) | ||
var results = {}; | ||
_this.allWorkingOn(function(err, data){ | ||
if(err && typeof callback === 'function'){ | ||
_this.allWorkingOn(function (err, data) { | ||
if (err && typeof callback === 'function') { | ||
callback(err); | ||
}else if((!data || hashLength(data) && typeof callback === 'function' ) === 0){ | ||
} else if ((!data || hashLength(data) && typeof callback === 'function' ) === 0) { | ||
callback(null, results); | ||
}else{ | ||
} else { | ||
var started = 0; | ||
for(var workerName in data){ | ||
for (var workerName in data) { | ||
started++; | ||
if(Date.now() - Date.parse(data[workerName].run_at) > age){ | ||
_this.forceCleanWorker(workerName, function(error, errorPayload){ | ||
if(errorPayload && errorPayload.worker ){ results[errorPayload.worker] = errorPayload; } | ||
if (Date.now() - Date.parse(data[workerName].run_at) > age) { | ||
_this.forceCleanWorker(workerName, function (error, errorPayload) { | ||
if (errorPayload && errorPayload.worker) { | ||
results[errorPayload.worker] = errorPayload; | ||
} | ||
started--; | ||
if(started === 0 && typeof callback === 'function'){ | ||
if (started === 0 && typeof callback === 'function') { | ||
callback(null, results); | ||
} | ||
}); | ||
}else{ | ||
process.nextTick(function(){ | ||
} else { | ||
process.nextTick(function () { | ||
started--; | ||
if(started === 0 && typeof callback === 'function'){ | ||
if (started === 0 && typeof callback === 'function') { | ||
callback(null, results); | ||
@@ -435,5 +479,5 @@ } | ||
queue.prototype.failedCount = function(callback){ | ||
queue.prototype.failedCount = function (callback) { | ||
var _this = this; | ||
_this.connection.redis.llen(_this.connection.key('failed'), function(err, length){ | ||
_this.connection.redis.llen(_this.connection.key('failed'), function (err, length) { | ||
callback(err, length); | ||
@@ -443,7 +487,9 @@ }); | ||
queue.prototype.failed = function(start, stop, callback){ | ||
queue.prototype.failed = function (start, stop, callback) { | ||
var _this = this; | ||
var results = []; | ||
_this.connection.redis.lrange(_this.connection.key('failed'), start, stop, function(err, data){ | ||
data.forEach(function(d){ results.push( JSON.parse(d) ); }); | ||
_this.connection.redis.lrange(_this.connection.key('failed'), start, stop, function (err, data) { | ||
data.forEach(function (d) { | ||
results.push(JSON.parse(d)); | ||
}); | ||
callback(err, results); | ||
@@ -453,3 +499,3 @@ }); | ||
queue.prototype.removeFailed = function(failedJob, callback){ | ||
queue.prototype.removeFailed = function (failedJob, callback) { | ||
var _this = this; | ||
@@ -459,7 +505,11 @@ _this.connection.redis.lrem(_this.connection.key('failed'), 1, JSON.stringify(failedJob), callback); | ||
queue.prototype.retryAndRemoveFailed = function(failedJob, callback){ | ||
queue.prototype.retryAndRemoveFailed = function (failedJob, callback) { | ||
var _this = this; | ||
_this.removeFailed(failedJob, function(err, countFailed){ | ||
if(err){return callback(err, failedJob); } | ||
if(countFailed < 1 ){return callback(new Error('This job is not in failed queue'), failedJob); } | ||
_this.removeFailed(failedJob, function (err, countFailed) { | ||
if (err) { | ||
return callback(err, failedJob); | ||
} | ||
if (countFailed < 1) { | ||
return callback(new Error('This job is not in failed queue'), failedJob); | ||
} | ||
_this.enqueue(failedJob.queue, failedJob.payload.runner, failedJob.payload.args, callback); | ||
@@ -473,6 +523,6 @@ }); | ||
var arrayify = function(o){ | ||
if(Array.isArray(o)){ | ||
var arrayify = function (o) { | ||
if (Array.isArray(o)) { | ||
return o; | ||
}else{ | ||
} else { | ||
return [o]; | ||
@@ -482,6 +532,8 @@ } | ||
var hashLength = function(obj) { | ||
var hashLength = function (obj) { | ||
var size = 0, key; | ||
for(key in obj){ | ||
if(obj.hasOwnProperty(key)){ size++; } | ||
for (key in obj) { | ||
if (obj.hasOwnProperty(key)) { | ||
size++; | ||
} | ||
} | ||
@@ -488,0 +540,0 @@ return size; |
@@ -120,3 +120,3 @@ /** | ||
task = _.assign(this.hive.config.tasks.defaultTaskOptions, task); | ||
task = _.merge(this.hive.config.tasks.defaultTaskOptions, task); | ||
@@ -123,0 +123,0 @@ } |
{ | ||
"name": "hive-dev", | ||
"version": "1.0.7", | ||
"version": "1.0.8", | ||
"description": "An intelligent Redis powered, job, worker and queue library with advanced options and plugin support.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
165866
47
3119