Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

hive-dev

Package Overview
Dependencies
Maintainers
3
Versions
17
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

hive-dev - npm Package Compare versions

Comparing version 1.0.7 to 1.0.8

example/throttle.js

3

example/myPlugin.js

@@ -61,6 +61,7 @@ /**

}
}
},
};

@@ -104,3 +104,6 @@ /**

_this.queue = new HiveQueue(_this.config, _this.runner);
_this.queue.init(started);
// clean any old workers that have been marked as orphaned for more than 5 mins
_this.queue.init(function() {
_this.queue.cleanOldWorkers(3600, started);
});
}],

@@ -163,5 +166,4 @@

createUserTasks: ['setupRedis', 'setupIPC', 'setupTasks', 'openLanes', function createUserTasks(created) {
var _this = this;
async.each(_this.tasks, function (task, doneTask) {
Hive.task.create(task, doneTask);
_this.task.create(task, doneTask);
}, created)

@@ -168,0 +170,0 @@ }]

@@ -35,3 +35,3 @@ /**

// if ioredis and sentinels array is populate then drop the host and port params.
var redis = _.assign(require('./defaults/redis')(), userConfig.redis || {});
var redis = _.merge(require('./defaults/redis')(), userConfig.redis || {});

@@ -52,6 +52,6 @@ // drop port and host from defaults if sentinels array provided

redis: redis,
tasks: _.assign(require('./defaults/tasks')(), userConfig.tasks || {}),
general: _.assign(require('./defaults/general')(), userConfig.general || {}),
logs: _.assign(require('./defaults/logs')(), userConfig.logs || {}),
stats: _.assign(require('./defaults/stats')(), userConfig.stats || {})
tasks: _.merge(require('./defaults/tasks')(), userConfig.tasks || {}),
general: _.merge(require('./defaults/general')(), userConfig.general || {}),
logs: _.merge(require('./defaults/logs')(), userConfig.logs || {}),
stats: _.merge(require('./defaults/stats')(), userConfig.stats || {})
};

@@ -58,0 +58,0 @@

@@ -9,2 +9,3 @@ var runPlugin = function (pluginReference, type, func, queue, job, args, callback) {

var pluginName = pluginReference;
var pluginOptions;
if (typeof pluginReference === 'function') {

@@ -15,7 +16,8 @@ pluginName = new pluginReference().name;

if (self.runner[func]['pluginOptions'] != null && self.runner[func]['pluginOptions'][pluginName] != null) {
var pluginOptions = self.runner[func]['pluginOptions'][pluginName]
pluginOptions = self.runner[func]['pluginOptions'][pluginName]
} else {
var pluginOptions = {};
pluginOptions = {};
}
var plugin = null
var plugin = null;
if (typeof pluginReference === 'string') {

@@ -33,3 +35,4 @@ var pluginConstructor = require(__dirname + "/plugins/" + pluginReference + ".js")[pluginReference];

} else {
plugin[type](function (err, toRun) {
plugin[type](function (err, toRun, task) {
callback(err, toRun);

@@ -36,0 +39,0 @@ });

@@ -5,5 +5,5 @@ var util = require('util');

var queue = function(config, runner, callback){
var queue = function (config, runner, callback) {
var _this = this;
if(typeof runner == 'function' && callback === undefined){
if (typeof runner == 'function' && callback === undefined) {
callback = runner;

@@ -16,20 +16,24 @@ runner = {};

_this.runPlugin = pluginRunner.runPlugin;
_this.runPlugin = pluginRunner.runPlugin;
_this.runPlugins = pluginRunner.runPlugins;
_this.connection = new connection(config.connection);
_this.connection.connect(function(err){
if(typeof callback === 'function'){ callback(err); }
_this.connection.connect(function (err) {
if (typeof callback === 'function') {
callback(err);
}
});
};
queue.prototype.end = function(callback){
queue.prototype.end = function (callback) {
var _this = this;
_this.connection.disconnect();
process.nextTick(function(){
if(typeof callback === 'function'){ callback(); }
process.nextTick(function () {
if (typeof callback === 'function') {
callback();
}
});
};
queue.prototype.encode = function(q, func, args){
queue.prototype.encode = function (q, func, args) {
return JSON.stringify({

@@ -42,8 +46,8 @@ runner: func,

queue.prototype.enqueue = function(q, func, args, callback){
queue.prototype.enqueue = function (q, func, args, callback) {
var _this = this;
if(arguments.length === 3 && typeof args === 'function'){
if (arguments.length === 3 && typeof args === 'function') {
callback = args;
args = [];
}else if(arguments.length < 3){
} else if (arguments.length < 3) {
args = [];

@@ -53,11 +57,17 @@ }

var job = _this.runner[func];
_this.runPlugins('before_enqueue', func, q, job, args, function(err, toRun){
if(toRun === false){
if(typeof callback === 'function'){ callback(err, toRun); }
}else{
_this.connection.ensureConnected(callback, function(){
_this.connection.redis.sadd(_this.connection.key('queues'), q, function(){
_this.connection.redis.rpush(_this.connection.key('queue', q), _this.encode(q, func, args), function(){
_this.runPlugins('after_enqueue', func, q, job, args, function(){
if(typeof callback === 'function'){ callback(err, toRun); }
_this.runPlugins('before_enqueue', func, q, job, args, function (err, toRun, newArgs) {
if (toRun === false) {
if (typeof callback === 'function') {
callback(err, toRun);
}
} else {
_this.connection.ensureConnected(callback, function () {
_this.connection.redis.sadd(_this.connection.key('queues'), q, function () {
if (newArgs)
newArgs = arrayify(newArgs);
_this.connection.redis.rpush(_this.connection.key('queue', q), _this.encode(q, func, newArgs || args), function () {
_this.runPlugins('after_enqueue', func, q, job, args, function () {
if (typeof callback === 'function') {
callback(err, toRun);
}
});

@@ -71,22 +81,24 @@ });

queue.prototype.enqueueAt = function(timestamp, q, func, args, callback){
queue.prototype.enqueueAt = function (timestamp, q, func, args, callback) {
// Don't run plugins here, they should be run by scheduler at the enqueue step
var _this = this;
if(arguments.length === 4 && typeof args === 'function'){
if (arguments.length === 4 && typeof args === 'function') {
callback = args;
args = [];
}else if(arguments.length < 4){
} else if (arguments.length < 4) {
args = [];
}
args = arrayify(args);
_this.connection.ensureConnected(callback, function(){
_this.connection.ensureConnected(callback, function () {
var item = _this.encode(q, func, args);
var rTimestamp = Math.round(timestamp / 1000); // assume timestamp is in ms
// enqueue the encoded job into a list per timestmp to be popped and workered later
_this.connection.redis.rpush(_this.connection.key("delayed:" + rTimestamp), item, function(){
_this.connection.redis.rpush(_this.connection.key("delayed:" + rTimestamp), item, function () {
// save the job + args into a set so that it can be checked by plugins
_this.connection.redis.sadd(_this.connection.key("timestamps:" + item), _this.connection.key("delayed:" + rTimestamp), function(){
_this.connection.redis.sadd(_this.connection.key("timestamps:" + item), _this.connection.key("delayed:" + rTimestamp), function () {
// and the timestamp in question to a zset to the scheduler will know which timestamps have data to work
_this.connection.redis.zadd(_this.connection.key('delayed_queue_schedule'), rTimestamp, rTimestamp, function(){
if(typeof callback === 'function'){ callback(); }
_this.connection.redis.zadd(_this.connection.key('delayed_queue_schedule'), rTimestamp, rTimestamp, function () {
if (typeof callback === 'function') {
callback();
}
});

@@ -98,8 +110,8 @@ });

queue.prototype.enqueueIn = function(time, q, func, args, callback){
queue.prototype.enqueueIn = function (time, q, func, args, callback) {
var _this = this;
if(arguments.length === 4 && typeof args === 'function'){
if (arguments.length === 4 && typeof args === 'function') {
callback = args;
args = [];
}else if(arguments.length < 4){
} else if (arguments.length < 4) {
args = [];

@@ -109,11 +121,13 @@ }

var timestamp = (new Date().getTime()) + time;
_this.enqueueAt(timestamp, q, func, args, function(){
if(typeof callback === 'function'){ callback(); }
_this.enqueueAt(timestamp, q, func, args, function () {
if (typeof callback === 'function') {
callback();
}
});
};
queue.prototype.queues = function(callback){
queue.prototype.queues = function (callback) {
var _this = this;
_this.connection.ensureConnected(callback, function(){
_this.connection.redis.smembers(_this.connection.key('queues'), function(err, queues){
_this.connection.ensureConnected(callback, function () {
_this.connection.redis.smembers(_this.connection.key('queues'), function (err, queues) {
callback(err, queues);

@@ -124,7 +138,7 @@ });

queue.prototype.delQueue = function(q, callback){
queue.prototype.delQueue = function (q, callback) {
var _this = this;
_this.connection.redis.del(_this.connection.key('queue', q), function(err){
if(err) return callback(err)
_this.connection.redis.srem(_this.connection.key('queues'), q, function(err){
_this.connection.redis.del(_this.connection.key('queue', q), function (err) {
if (err) return callback(err)
_this.connection.redis.srem(_this.connection.key('queues'), q, function (err) {
callback(err);

@@ -135,6 +149,6 @@ })

queue.prototype.length = function(q, callback){
queue.prototype.length = function (q, callback) {
var _this = this;
_this.connection.ensureConnected(callback, function(){
_this.connection.redis.llen(_this.connection.key('queue', q), function(err, length){
_this.connection.ensureConnected(callback, function () {
_this.connection.redis.llen(_this.connection.key('queue', q), function (err, length) {
callback(err, length);

@@ -145,9 +159,9 @@ });

queue.prototype.del = function(q, func, args, count, callback){
queue.prototype.del = function (q, func, args, count, callback) {
var _this = this;
if(typeof count === 'function' && callback === undefined){
if (typeof count === 'function' && callback === undefined) {
callback = count;
count = 0;
}else if(arguments.length === 3){
if(typeof args === 'function'){
} else if (arguments.length === 3) {
if (typeof args === 'function') {
callback = args;

@@ -157,3 +171,3 @@ args = [];

count = 0;
}else if(arguments.length < 3){
} else if (arguments.length < 3) {
args = [];

@@ -163,5 +177,7 @@ count = 0;

args = arrayify(args);
_this.connection.ensureConnected(callback, function(){
_this.connection.redis.lrem(_this.connection.key('queue', q), count, _this.encode(q, func, args), function(err, count){
if(typeof callback === 'function'){ callback(err, count); }
_this.connection.ensureConnected(callback, function () {
_this.connection.redis.lrem(_this.connection.key('queue', q), count, _this.encode(q, func, args), function (err, count) {
if (typeof callback === 'function') {
callback(err, count);
}
});

@@ -171,8 +187,8 @@ });

queue.prototype.delDelayed = function(q, func, args, callback){
queue.prototype.delDelayed = function (q, func, args, callback) {
var _this = this;
if(arguments.length === 3 && typeof args === 'function'){
if (arguments.length === 3 && typeof args === 'function') {
callback = args;
args = [];
}else if(arguments.length < 3){
} else if (arguments.length < 3) {
args = [];

@@ -182,16 +198,22 @@ }

var search = _this.encode(q, func, args);
_this.connection.ensureConnected(callback, function(){
var timestamps = _this.connection.redis.smembers(_this.connection.key("timestamps:" + search), function(err, members){
if(members.length === 0 ){ if(typeof callback === 'function'){ callback(err, []); } }
else{
_this.connection.ensureConnected(callback, function () {
var timestamps = _this.connection.redis.smembers(_this.connection.key("timestamps:" + search), function (err, members) {
if (members.length === 0) {
if (typeof callback === 'function') {
callback(err, []);
}
}
else {
var started = 0;
var timestamps = [];
members.forEach(function(key){
members.forEach(function (key) {
started++;
_this.connection.redis.lrem(key, 0, search, function(){
_this.connection.redis.srem(_this.connection.key("timestamps:" + search), key, function(){
_this.connection.redis.lrem(key, 0, search, function () {
_this.connection.redis.srem(_this.connection.key("timestamps:" + search), key, function () {
timestamps.push(key.split(":")[key.split(":").length - 1]);
started--;
if(started === 0){
if(typeof callback === 'function'){ callback(err, timestamps); }
if (started === 0) {
if (typeof callback === 'function') {
callback(err, timestamps);
}
}

@@ -206,8 +228,8 @@ });

queue.prototype.scheduledAt = function(q, func, args, callback){
queue.prototype.scheduledAt = function (q, func, args, callback) {
var _this = this;
if(arguments.length === 3 && typeof args === 'function'){
if (arguments.length === 3 && typeof args === 'function') {
callback = args;
args = [];
}else if(arguments.length < 3){
} else if (arguments.length < 3) {
args = [];

@@ -217,11 +239,13 @@ }

var search = _this.encode(q, func, args);
_this.connection.ensureConnected(callback, function(){
_this.connection.redis.smembers(_this.connection.key("timestamps:" + search), function(err, members){
_this.connection.ensureConnected(callback, function () {
_this.connection.redis.smembers(_this.connection.key("timestamps:" + search), function (err, members) {
var timestamps = [];
if(members !== null){
members.forEach(function(key){
if (members !== null) {
members.forEach(function (key) {
timestamps.push(key.split(":")[key.split(":").length - 1]);
});
}
if(typeof callback === 'function'){ callback(err, timestamps); }
if (typeof callback === 'function') {
callback(err, timestamps);
}
});

@@ -231,7 +255,7 @@ });

queue.prototype.timestamps = function(callback){
queue.prototype.timestamps = function (callback) {
var _this = this;
var results = [];
_this.connection.redis.keys(_this.connection.key("delayed:*"), function(err, timestamps){
timestamps.forEach(function(timestamp){
_this.connection.redis.keys(_this.connection.key("delayed:*"), function (err, timestamps) {
timestamps.forEach(function (timestamp) {
var parts = timestamp.split(":");

@@ -245,9 +269,9 @@ results.push(parseInt(parts[(parts.length - 1)]) * 1000);

queue.prototype.delayedAt = function(timestamp, callback){
queue.prototype.delayedAt = function (timestamp, callback) {
var _this = this;
var rTimestamp = Math.round(timestamp / 1000); // assume timestamp is in ms
var tasks = [];
_this.connection.redis.lrange(_this.connection.key("delayed:" + rTimestamp), 0, -1, function(err, items){
items.forEach(function(i){
tasks.push( JSON.parse(i) );
_this.connection.redis.lrange(_this.connection.key("delayed:" + rTimestamp), 0, -1, function (err, items) {
items.forEach(function (i) {
tasks.push(JSON.parse(i));
});

@@ -258,7 +282,7 @@ callback(err, tasks, rTimestamp);

queue.prototype.queued = function(q, start, stop, callback){
queue.prototype.queued = function (q, start, stop, callback) {
var _this = this;
_this.connection.ensureConnected(callback, function(){
_this.connection.redis.lrange(_this.connection.key('queue', q), start, stop, function(err, items){
var tasks = items.map(function(i){
_this.connection.ensureConnected(callback, function () {
_this.connection.redis.lrange(_this.connection.key('queue', q), start, stop, function (err, items) {
var tasks = items.map(function (i) {
return JSON.parse(i)

@@ -271,16 +295,18 @@ });

queue.prototype.allDelayed = function(callback){
queue.prototype.allDelayed = function (callback) {
var _this = this;
var started = 0;
var results = {};
_this.timestamps(function(err, timestamps){
if(timestamps.length === 0){
_this.timestamps(function (err, timestamps) {
if (timestamps.length === 0) {
callback(err, {});
}else{
timestamps.forEach(function(timestamp){
} else {
timestamps.forEach(function (timestamp) {
started++;
_this.delayedAt(timestamp, function(err, tasks, rTimestamp){
_this.delayedAt(timestamp, function (err, tasks, rTimestamp) {
results[(rTimestamp * 1000)] = tasks;
started--;
if(started === 0){ callback(err, results); }
if (started === 0) {
callback(err, results);
}
});

@@ -292,19 +318,19 @@ });

queue.prototype.workers = function(callback){
queue.prototype.workers = function (callback) {
var _this = this;
var workers = {};
_this.connection.redis.smembers(_this.connection.key('workers'), function(err, results){
if(results){
results.forEach(function(r){
_this.connection.redis.smembers(_this.connection.key('workers'), function (err, results) {
if (results) {
results.forEach(function (r) {
var parts = r.split(':');
var name, queues;
if(parts.length === 1){
if (parts.length === 1) {
name = parts[0];
workers[name] = null;
}
else if(parts.length === 2){
else if (parts.length === 2) {
name = parts[0];
queues = parts[1];
workers[name] = queues;
}else{
} else {
name = parts.shift() + ":" + parts.shift();

@@ -316,34 +342,38 @@ queues = parts.join(':');

}
if(typeof callback === 'function'){ callback(err, workers); }
if (typeof callback === 'function') {
callback(err, workers);
}
});
};
queue.prototype.workingOn = function(workerName, queues, callback){
queue.prototype.workingOn = function (workerName, queues, callback) {
var _this = this;
var fullWorkerName = workerName + ':' + queues;
_this.connection.redis.get(_this.connection.key('worker', fullWorkerName), function(err, data){
if(typeof callback === 'function'){ callback(err, data); }
_this.connection.redis.get(_this.connection.key('worker', fullWorkerName), function (err, data) {
if (typeof callback === 'function') {
callback(err, data);
}
});
};
queue.prototype.allWorkingOn = function(callback){
queue.prototype.allWorkingOn = function (callback) {
var _this = this;
var results = {};
var counter = 0;
_this.workers(function(err, workers){
if(err && typeof callback === 'function'){
_this.workers(function (err, workers) {
if (err && typeof callback === 'function') {
callback(err, results);
}else if(!workers || hashLength(workers) === 0){
} else if (!workers || hashLength(workers) === 0) {
callback(null, results);
}else{
for(var w in workers){
} else {
for (var w in workers) {
counter++;
results[w] = 'started';
_this.workingOn(w, workers[w], function(err, data){
_this.workingOn(w, workers[w], function (err, data) {
counter--;
if(data){
if (data) {
data = JSON.parse(data);
results[data.worker] = data;
}
if(counter === 0 && typeof callback === 'function'){
if (counter === 0 && typeof callback === 'function') {
callback(err, results);

@@ -357,13 +387,19 @@ }

queue.prototype.forceCleanWorker = function(workerName, callback){
queue.prototype.forceCleanWorker = function (workerName, callback) {
var _this = this;
_this.workers(function(err, workers){
_this.workers(function (err, workers) {
var queues = workers[workerName];
var errorPayload;
if(err){ callback(err); }
else if(!queues){ callback(new Error('worker not round')); }
else{
_this.workingOn(workerName, queues, function(err, workingOn){
if(err){ callback(err); }
else if(workingOn){
if (err) {
callback(err);
}
else if (!queues) {
callback(new Error('worker not round'));
}
else {
_this.workingOn(workerName, queues, function (err, workingOn) {
if (err) {
callback(err);
}
else if (workingOn) {
workingOn = JSON.parse(workingOn);

@@ -389,3 +425,3 @@ errorPayload = {

_this.connection.redis.srem(_this.connection.key('workers'), workerName + ':' + queues)
], function(err, data){
], function (err, data) {
callback(err, errorPayload);

@@ -399,28 +435,36 @@ });

queue.prototype.cleanOldWorkers = function(age, callback){
/**
*
* @param age
* @param callback
*/
queue.prototype.cleanOldWorkers = function (age, callback) {
// note: this method will remove the data created by a "stuck" worker and move the payload to the error queue
// however, it will not actually remove any processes which may be running. A job *may* be running that you have removed
var _this = this;
console.dir(this)
var results = {};
_this.allWorkingOn(function(err, data){
if(err && typeof callback === 'function'){
_this.allWorkingOn(function (err, data) {
if (err && typeof callback === 'function') {
callback(err);
}else if((!data || hashLength(data) && typeof callback === 'function' ) === 0){
} else if ((!data || hashLength(data) && typeof callback === 'function' ) === 0) {
callback(null, results);
}else{
} else {
var started = 0;
for(var workerName in data){
for (var workerName in data) {
started++;
if(Date.now() - Date.parse(data[workerName].run_at) > age){
_this.forceCleanWorker(workerName, function(error, errorPayload){
if(errorPayload && errorPayload.worker ){ results[errorPayload.worker] = errorPayload; }
if (Date.now() - Date.parse(data[workerName].run_at) > age) {
_this.forceCleanWorker(workerName, function (error, errorPayload) {
if (errorPayload && errorPayload.worker) {
results[errorPayload.worker] = errorPayload;
}
started--;
if(started === 0 && typeof callback === 'function'){
if (started === 0 && typeof callback === 'function') {
callback(null, results);
}
});
}else{
process.nextTick(function(){
} else {
process.nextTick(function () {
started--;
if(started === 0 && typeof callback === 'function'){
if (started === 0 && typeof callback === 'function') {
callback(null, results);

@@ -435,5 +479,5 @@ }

queue.prototype.failedCount = function(callback){
queue.prototype.failedCount = function (callback) {
var _this = this;
_this.connection.redis.llen(_this.connection.key('failed'), function(err, length){
_this.connection.redis.llen(_this.connection.key('failed'), function (err, length) {
callback(err, length);

@@ -443,7 +487,9 @@ });

queue.prototype.failed = function(start, stop, callback){
queue.prototype.failed = function (start, stop, callback) {
var _this = this;
var results = [];
_this.connection.redis.lrange(_this.connection.key('failed'), start, stop, function(err, data){
data.forEach(function(d){ results.push( JSON.parse(d) ); });
_this.connection.redis.lrange(_this.connection.key('failed'), start, stop, function (err, data) {
data.forEach(function (d) {
results.push(JSON.parse(d));
});
callback(err, results);

@@ -453,3 +499,3 @@ });

queue.prototype.removeFailed = function(failedJob, callback){
queue.prototype.removeFailed = function (failedJob, callback) {
var _this = this;

@@ -459,7 +505,11 @@ _this.connection.redis.lrem(_this.connection.key('failed'), 1, JSON.stringify(failedJob), callback);

queue.prototype.retryAndRemoveFailed = function(failedJob, callback){
queue.prototype.retryAndRemoveFailed = function (failedJob, callback) {
var _this = this;
_this.removeFailed(failedJob, function(err, countFailed){
if(err){return callback(err, failedJob); }
if(countFailed < 1 ){return callback(new Error('This job is not in failed queue'), failedJob); }
_this.removeFailed(failedJob, function (err, countFailed) {
if (err) {
return callback(err, failedJob);
}
if (countFailed < 1) {
return callback(new Error('This job is not in failed queue'), failedJob);
}
_this.enqueue(failedJob.queue, failedJob.payload.runner, failedJob.payload.args, callback);

@@ -473,6 +523,6 @@ });

var arrayify = function(o){
if(Array.isArray(o)){
var arrayify = function (o) {
if (Array.isArray(o)) {
return o;
}else{
} else {
return [o];

@@ -482,6 +532,8 @@ }

var hashLength = function(obj) {
var hashLength = function (obj) {
var size = 0, key;
for(key in obj){
if(obj.hasOwnProperty(key)){ size++; }
for (key in obj) {
if (obj.hasOwnProperty(key)) {
size++;
}
}

@@ -488,0 +540,0 @@ return size;

@@ -120,3 +120,3 @@ /**

task = _.assign(this.hive.config.tasks.defaultTaskOptions, task);
task = _.merge(this.hive.config.tasks.defaultTaskOptions, task);

@@ -123,0 +123,0 @@ }

{
"name": "hive-dev",
"version": "1.0.7",
"version": "1.0.8",
"description": "An intelligent Redis powered, job, worker and queue library with advanced options and plugin support.",

@@ -5,0 +5,0 @@ "main": "index.js",

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc