Comparing version
{ | ||
"name": "qjobs", | ||
"version": "1.1.0", | ||
"version": "1.1.1", | ||
"description": "qjobs is a simple and stupid queue job manager for nodejs", | ||
@@ -13,3 +13,3 @@ "main": "qjobs.js", | ||
"engines": { | ||
"node": "*" | ||
"node": ">=0.9" | ||
}, | ||
@@ -16,0 +16,0 @@ "repository": { |
181
qjobs.js
@@ -0,24 +1,39 @@ | ||
var util = require('util'); | ||
var events = require('events').EventEmitter; | ||
var EventEmitter = require('events').EventEmitter; | ||
var qjob = function(options) { | ||
var maxConcurrency = 10; | ||
var jobsRunning = 0; | ||
var jobsDone = 0; | ||
var jobsTotal = 0; | ||
var timeStart; | ||
var jobId = 0; | ||
var jobsList = []; | ||
var paused = false; | ||
var pausedId = null; | ||
var lastPause = 0; | ||
if(false === (this instanceof qjob)) { | ||
return new qjob(options); | ||
} | ||
var interval = null; | ||
var stopAdding = false; | ||
var sleeping = false; | ||
this.maxConcurrency = 10; | ||
this.jobsRunning = 0; | ||
this.jobsDone = 0; | ||
this.jobsTotal = 0; | ||
this.timeStart; | ||
this.jobId = 0; | ||
this.jobsList = []; | ||
this.paused = false; | ||
this.pausedId = null; | ||
this.lastPause = 0; | ||
this.interval = null; | ||
this.stopAdding = false; | ||
this.sleeping = false; | ||
if (options) { | ||
this.maxConcurrency = options.maxConcurrency || this.maxConcurrency; | ||
this.interval = options.interval || this.interval; | ||
} | ||
events.call(this); | ||
}; | ||
util.inherits(qjob, events); | ||
/* | ||
* helper to set max concurrency | ||
*/ | ||
var setConcurrency = function(max) { | ||
maxConcurrency = max; | ||
qjob.prototype.setConcurrency = function(max) { | ||
this.maxConcurrency = max; | ||
} | ||
@@ -29,4 +44,4 @@ | ||
*/ | ||
var setInterval = function(delay) { | ||
interval = delay; | ||
qjob.prototype.setInterval = function(delay) { | ||
this.interval = delay; | ||
} | ||
@@ -37,5 +52,6 @@ | ||
*/ | ||
var add = function(job,args) { | ||
jobsList.push([job,args]); | ||
jobsTotal++; | ||
qjob.prototype.add = function(job,args) { | ||
var self = this; | ||
self.jobsList.push([job,args]); | ||
self.jobsTotal++; | ||
} | ||
@@ -46,12 +62,16 @@ | ||
*/ | ||
var sleepDueToInterval = function() { | ||
if (interval === null) return; | ||
qjob.prototype.sleepDueToInterval = function() { | ||
var self = this; | ||
if (sleeping) { | ||
if (this.interval === null) { | ||
return; | ||
} | ||
if (this.sleeping) { | ||
return true; | ||
} | ||
if (stopAdding) { | ||
if (this.stopAdding) { | ||
if (jobsRunning > 0) { | ||
if (this.jobsRunning > 0) { | ||
//console.log('waiting for '+jobsRunning+' jobs to finish'); | ||
@@ -62,11 +82,11 @@ return true; | ||
//console.log('waiting for '+rafaleDelay+' ms'); | ||
sleeping = true; | ||
this.sleeping = true; | ||
self.emit('sleep'); | ||
setTimeout(function() { | ||
stopAdding = false; | ||
sleeping = false; | ||
this.stopAdding = false; | ||
this.sleeping = false; | ||
self.emit('continu'); | ||
run(); | ||
},interval); | ||
self.run(); | ||
}.bind(self),this.interval); | ||
@@ -76,5 +96,5 @@ return true; | ||
if (jobsRunning + 1 == maxConcurrency) { | ||
if (this.jobsRunning + 1 == this.maxConcurrency) { | ||
//console.log('max concurrent jobs reached'); | ||
stopAdding = true; | ||
this.stopAdding = true; | ||
return true; | ||
@@ -87,11 +107,13 @@ } | ||
*/ | ||
var run = function() { | ||
qjob.prototype.run = function() { | ||
var self = this; | ||
// first launch, let's emit start event | ||
if (jobsDone == 0) { | ||
if (this.jobsDone == 0) { | ||
self.emit('start'); | ||
timeStart = Date.now(); | ||
this.timeStart = Date.now(); | ||
} | ||
if (sleepDueToInterval()) return; | ||
if (self.sleepDueToInterval()) return; | ||
@@ -102,9 +124,9 @@ // while queue is empty and number of job running | ||
while (jobsList.length && jobsRunning < maxConcurrency) { | ||
while (this.jobsList.length && this.jobsRunning < this.maxConcurrency) { | ||
// get the next job and | ||
// remove it from the queue | ||
var job = jobsList.shift(); | ||
var job = self.jobsList.shift(); | ||
// increment number of job running | ||
jobsRunning++; | ||
self.jobsRunning++; | ||
@@ -115,3 +137,3 @@ // fetch args for the job | ||
// add jobId in args | ||
args._jobId = jobId++; | ||
args._jobId = this.jobId++; | ||
@@ -122,8 +144,9 @@ // emit jobStart event | ||
// run the job | ||
job[0](job[1],next.bind(this,args)); | ||
setTimeout(function() { | ||
this.j(this.args,self.next.bind(self,this.args)); | ||
}.bind({j:job[0],args:args}),1); | ||
} | ||
// all jobs done ? emit end event | ||
if (jobsList.length == 0 && jobsRunning == 0) { | ||
if (this.jobsList.length == 0 && this.jobsRunning == 0) { | ||
self.emit('end'); | ||
@@ -137,7 +160,9 @@ } | ||
*/ | ||
var next = function(args) { | ||
qjob.prototype.next = function(args) { | ||
var self = this; | ||
// update counters | ||
jobsRunning--; | ||
jobsDone++; | ||
this.jobsRunning--; | ||
this.jobsDone++; | ||
@@ -149,6 +174,6 @@ // emit 'jobEnd' event | ||
// then do nothing | ||
if (paused) return; | ||
if (this.paused) return; | ||
// else, execute run() function | ||
run(); | ||
self.run(); | ||
} | ||
@@ -162,11 +187,12 @@ | ||
*/ | ||
var pause = function(status) { | ||
paused = status; | ||
if (!paused && pausedId) { | ||
clearInterval(pausedId); | ||
run(); | ||
qjob.prototype.pause = function(status) { | ||
var self = this; | ||
this.paused = status; | ||
if (!this.paused && this.pausedId) { | ||
clearInterval(this.pausedId); | ||
this.run(); | ||
} | ||
if (paused && !pausedId) { | ||
lastPause = Date.now(); | ||
pausedId = setInterval(function() { | ||
if (this.paused && !this.pausedId) { | ||
this.lastPause = Date.now(); | ||
this.pausedId = setInterval(function() { | ||
var since = Date.now() - lastPause; | ||
@@ -178,3 +204,3 @@ self.emit('inPause',since); | ||
var stats = function() { | ||
qjob.prototype.stats = function() { | ||
@@ -184,11 +210,11 @@ var now = Date.now(); | ||
var o = {}; | ||
o._timeStart = timeStart || 'N/A'; | ||
o._timeElapsed = (now - timeStart) || 'N/A'; | ||
o._jobsTotal = jobsTotal; | ||
o._jobsRunning = jobsRunning; | ||
o._jobsDone = jobsDone; | ||
o._progress = Math.floor((jobsDone/jobsTotal)*100); | ||
o._concurrency = maxConcurrency; | ||
o._timeStart = this.timeStart || 'N/A'; | ||
o._timeElapsed = (now - this.timeStart) || 'N/A'; | ||
o._jobsTotal = this.jobsTotal; | ||
o._jobsRunning = this.jobsRunning; | ||
o._jobsDone = this.jobsDone; | ||
o._progress = Math.floor((this.jobsDone/this.jobsTotal)*100); | ||
o._concurrency = this.maxConcurrency; | ||
if (paused) { | ||
if (this.paused) { | ||
o._status = 'Paused'; | ||
@@ -203,3 +229,3 @@ return o; | ||
if (jobsTotal == jobsDone) { | ||
if (this.jobsTotal == this.jobsDone) { | ||
o._status = 'Finished'; | ||
@@ -213,22 +239,3 @@ return o; | ||
var self = new EventEmitter(); | ||
module.exports = function(options) { | ||
maxConcurrency = options.maxConcurrency || maxConcurrency; | ||
interval = options.interval || interval; | ||
self.run = run; | ||
self.add = add; | ||
self.setConcurrency = setConcurrency; | ||
self.setInterval = setInterval; | ||
self.stats = stats; | ||
return self; | ||
}; | ||
// backward compatibility < 1.0.9 | ||
module.exports.run = run; | ||
module.exports.add = add; | ||
module.exports.setConcurrency = setConcurrency; | ||
module.exports.setInterval = setInterval; | ||
module.exports.stats = stats; | ||
module.exports.on = self.on | ||
module.exports = qjob; |
#!/usr/bin/env node | ||
var assert = require('assert'); | ||
var qjob = require('../qjobs'); | ||
// only 2 jobs in the same time | ||
var q = new require('../qjobs'); | ||
var q = new qjob({maxConcurrency:2}); | ||
q.setConcurrency(2); | ||
var testExecutedJobs = 0; | ||
@@ -10,0 +9,0 @@ |
#!/usr/bin/env node | ||
var assert = require('assert'); | ||
var qjob = require('../qjobs'); | ||
var maxConcurrency = 2; | ||
// only 2 jobs in the same time | ||
var q = new require('../qjobs')({maxConcurrency:maxConcurrency}); | ||
var q = new qjob({maxConcurrency:maxConcurrency}); | ||
@@ -50,4 +50,2 @@ var testExecutedJobs = 0; | ||
setTimeout(q.run,1); | ||
var running = q.stats()._jobsRunning; | ||
@@ -61,1 +59,3 @@ | ||
assert.ok(!testEndFired); | ||
q.run(); |
#!/usr/bin/env node | ||
var assert = require('assert'); | ||
var qjob = require('../qjobs'); | ||
@@ -10,3 +11,3 @@ // maximum number of jobs executed in parallels | ||
var q = new require('../qjobs')({ | ||
var q = new qjob({ | ||
maxConcurrency:maxConcurrency, | ||
@@ -13,0 +14,0 @@ interval:interval |
Sorry, the diff of this file is not supported yet
12660
3.65%348
1.46%1
-75%