Comparing version 0.0.2 to 1.0.0
@@ -8,36 +8,69 @@ #!/usr/bin/env node | ||
var Queue = require('../queue'); | ||
var Queue = require("../queue"); | ||
var q = new Queue({ | ||
timeout: 100, | ||
concurrency: 100 | ||
}); | ||
var results = []; | ||
var q = new Queue(); | ||
// advance handler | ||
q.on("advance", function () { | ||
console.log("The queue is about to advance"); | ||
// listen for events | ||
q.on('processed', function(job) { | ||
console.log('job finished processing:', job.toString().replace(/\n/g, '')); | ||
}); | ||
// drain handler | ||
q.on("drain", function () { | ||
console.log("All done:", results); | ||
q.on('drain', function() { | ||
console.log('all done:', results); | ||
}); | ||
// add individual functions | ||
q.push(function (cb) { | ||
results.push("one"); | ||
// add jobs using the familiar Array API | ||
q.push(function(cb) { | ||
results.push('two'); | ||
cb(); | ||
}, function (err, jobQueue) { | ||
console.log("This is a job specific callback"); | ||
}); | ||
// add arrays of functions | ||
q.push([ | ||
function (cb) { | ||
results.push("two"); | ||
q.push( | ||
function(cb) { | ||
results.push('four'); | ||
cb(); | ||
}, | ||
function (cb) { | ||
results[2] = "three"; | ||
function(cb) { | ||
results.push('five'); | ||
cb(); | ||
} | ||
]); | ||
); | ||
q.unshift(function(cb) { | ||
results.push('one'); | ||
cb(); | ||
}); | ||
q.splice(2, 0, function(cb) { | ||
results.push('three'); | ||
cb(); | ||
}); | ||
// use the timeout feature to deal with jobs that | ||
// take too long or forget to execute a callback | ||
q.on('timeout', function(job, next) { | ||
console.log('job timed out:', job.toString().replace(/\n/g, '')); | ||
next(); | ||
}); | ||
q.push(function(cb) { | ||
setTimeout(function() { | ||
console.log('slow job finished'); | ||
cb(); | ||
}, 200); | ||
}); | ||
q.push(function(cb) { | ||
console.log('forgot to execute callback'); | ||
}); |
{ | ||
"name": "queue", | ||
"version": "0.0.2", | ||
"version": "1.0.0", | ||
"description": "An async job queue with adjustable concurrency", | ||
@@ -5,0 +5,0 @@ "main": "queue.js", |
100
queue.js
@@ -6,60 +6,68 @@ /* | ||
module.exports = Queue; | ||
var util = require("util"); | ||
var EventEmitter = require("events").EventEmitter; | ||
var EventEmitter = require('events').EventEmitter; | ||
function Queue (concurrency) { | ||
this.concurrency = concurrency || 1; | ||
this.active = []; | ||
function Queue(options) { | ||
options = options || {}; | ||
this.concurrency = options.concurrency || 1; | ||
this.timeout = options.timeout || 0; | ||
this.pending = 0; | ||
this.jobs = []; | ||
} | ||
util.inherits(Queue, EventEmitter); | ||
Queue.prototype = new EventEmitter; | ||
Queue.prototype.push = function (job, cb) { | ||
var self = this; | ||
if (this.jobs.length === 0) { | ||
process.nextTick(function () { | ||
self.run(); | ||
}); | ||
Queue.prototype.__defineGetter__('length', function() { | ||
return this.pending + this.jobs.length; | ||
}); | ||
// expose selected array methods | ||
[ 'pop', 'shift', 'slice', 'reverse', 'indexOf', 'lastIndexOf' ].forEach(function(method) { | ||
Queue.prototype[method] = function() { | ||
return Array.prototype[method].apply(this.jobs, arguments); | ||
} | ||
if (job instanceof Array) { | ||
for (var i in job) { | ||
var j = job[i]; | ||
var c = null; | ||
if (cb instanceof Array) { | ||
c = cb[i]; | ||
} | ||
self.push(j, c); | ||
} | ||
} else { | ||
this.jobs.push([job, cb]); | ||
}); | ||
// additive Array methods should auto-advance the queue | ||
[ 'push', 'unshift', 'splice' ].forEach(function(method) { | ||
Queue.prototype[method] = function() { | ||
process.nextTick(this.process.bind(this)); | ||
return Array.prototype[method].apply(this.jobs, arguments); | ||
} | ||
} | ||
}); | ||
Queue.prototype.run = function () { | ||
if (this.jobs.length > 0 && this.active.length < this.concurrency) { | ||
Queue.prototype.process = function() { | ||
if (this.jobs.length > 0 && this.pending < this.concurrency) { | ||
this.pending++; | ||
var job = this.jobs.shift(); | ||
this.active.push(job); | ||
this.run(); | ||
var self = this; | ||
var cb = job[1]; | ||
job = job[0]; | ||
job(function (err) { | ||
if (cb) cb(err, self); | ||
self.emit("advance", err, self); | ||
if (self.jobs.length === 0 && self.active.length === 1) { | ||
self.active = []; | ||
self.emit("drain", self); | ||
} else { | ||
self.active.shift(); | ||
self.run(); | ||
var once = true; | ||
var timeoutId = null; | ||
var didTimeout = false; | ||
var next = function() { | ||
if (once) { | ||
once = false; | ||
self.pending--; | ||
if (timeoutId !== null) clearTimeout(timeoutId); | ||
if (didTimeout === false) self.emit('processed', job); | ||
if (self.pending === 0 && self.jobs.length === 0) { | ||
self.emit('drain', self); | ||
} else { | ||
self.process(); | ||
} | ||
} | ||
}); | ||
} | ||
if (this.timeout) { | ||
timeoutId = setTimeout(function() { | ||
didTimeout = true; | ||
self.emit('timeout', job, next); | ||
}, this.timeout); | ||
} | ||
job(next); | ||
this.process(); | ||
} | ||
} | ||
Queue.prototype.empty = function (job) { | ||
this.jobs = []; | ||
} | ||
module.exports = Queue; |
@@ -12,51 +12,98 @@ ``` | ||
## Why | ||
[async](https://github.com/caolan/async#queue)'s queue expects you to have one worker and many jobs. This queue simply expects a list of async functions, which is a bit more flexible - otherwise it's the same idea. | ||
Wanted something more flexible than [async](https://github.com/caolan/async#queue)'s queue. | ||
## How | ||
The module exports a class named ```Queue```. Pass the desired concurrency to the constructor, or change it later via the ```concurrency``` property. Pass async functions (ones that accept a callback) to an instance's ```push()``` method. Processing begins automatically on ```process.nextTick()```. | ||
The module exports a class named `Queue` that implements most of the Array api. Pass async functions (ones that accept a callback) to an instance's `push()` method. Processing begins automatically on `process.nextTick()`. | ||
## Install | ||
```npm install queue``` | ||
`npm install queue` | ||
## Properties | ||
* ```concurrency``` maximum number of jobs that the queue should process concurrently - the default is 1 | ||
* `concurrency` maximum number of jobs that the queue should process concurrently - the default is 1 | ||
* `timeout` milliseconds to wait for a job to execute its callback | ||
## Methods | ||
* ```push(job, cb)``` add a job (and optional callback) to the queue | ||
* ```empty()``` remove any remaining jobs in the queue | ||
* ```run()``` force run the queue immediately | ||
* `push(job)` add a job to the queue | ||
## Events | ||
* ```"advance"``` fires after any job finishes | ||
* ```"drain"``` fires when the queue finishes processing all its jobs | ||
* `'processed'` when jobs finish | ||
* `'timeout'` when `queue.timeout` milliseconds have elapsed and a job has not executed its callback | ||
* `'drain'` when the queue finishes processing all its jobs | ||
## Usage | ||
```javascript | ||
var Queue = require("../queue"); | ||
var Queue = require('queue'); | ||
var q = new Queue({ | ||
timeout: 100, | ||
concurrency: 100 | ||
}); | ||
var results = []; | ||
var q = new Queue(); | ||
// add a drain handler | ||
q.on("drain", function () { | ||
console.log("All done:", results); | ||
// listen for events | ||
q.on('processed', function(job) { | ||
console.log('job finished processing:', job.toString().replace(/\n/g, '')); | ||
}); | ||
// add individual functions | ||
q.push(function (cb) { | ||
results.push("one"); | ||
q.on('drain', function() { | ||
console.log('all done:', results); | ||
}); | ||
// add jobs using familiar Array api | ||
q.push(function(cb) { | ||
results.push('two'); | ||
cb(); | ||
}); | ||
// add arrays of functions | ||
q.push([ | ||
function (cb) { | ||
results.push("two"); | ||
q.push( | ||
function(cb) { | ||
results.push('four'); | ||
cb(); | ||
}, | ||
function (cb) { | ||
results[2] = "three"; | ||
function(cb) { | ||
results.push('five'); | ||
cb(); | ||
} | ||
]); | ||
); | ||
q.unshift(function(cb) { | ||
results.push('one'); | ||
cb(); | ||
}); | ||
q.splice(2, 0, function(cb) { | ||
results.push('three'); | ||
cb(); | ||
}); | ||
// use the timeout feature to deal with jobs that | ||
// take too long or forget to execute a callback | ||
q.on('timeout', function(job, next) { | ||
console.log('job timed out:', job.toString().replace(/\n/g, '')); | ||
next(); | ||
}) | ||
q.push(function(cb) { | ||
setTimeout(function() { | ||
console.log('slow job finished'); | ||
cb(); | ||
}, 200); | ||
}); | ||
q.push(function(cb) { | ||
console.log('forgot to execute callback'); | ||
}); | ||
``` | ||
## Note | ||
Version 1.0 introduces api changes and is NOT backwards compatible with 0.0.2 | ||
## License | ||
[WTFPL](http://www.wtfpl.net/txt/copying/) |
@@ -8,10 +8,9 @@ #!/usr/bin/env node | ||
var assert = require('assert'); | ||
var Queue = require('../queue'); | ||
var assert = require("assert"); | ||
var Queue = require("../queue"); | ||
var answers = []; | ||
var q = new Queue(100); | ||
q.on("drain", function () { | ||
var solutions = ["one", "two", "three"]; | ||
var q = new Queue({ concurrency: 100 }); | ||
q.on('drain', function() { | ||
var solutions = [ 'one', 'two', 'three' ]; | ||
for (var i in answers) { | ||
@@ -22,8 +21,8 @@ var answer = answers[i]; | ||
} | ||
console.log("It works! ✔"); | ||
console.log('It works! ✔'); | ||
}); | ||
q.push(function (cb) { | ||
setTimeout(function () { | ||
answers.push("one"); | ||
q.push(function(cb) { | ||
setTimeout(function() { | ||
answers.push('one'); | ||
cb(); | ||
@@ -33,14 +32,14 @@ }, 1); | ||
q.push(function (cb) { | ||
setTimeout(function () { | ||
answers.push("three"); | ||
q.push(function(cb) { | ||
setTimeout(function() { | ||
answers.push('three'); | ||
cb(); | ||
}, 3); | ||
}, 6); | ||
}); | ||
q.push(function (cb) { | ||
setTimeout(function () { | ||
answers.push("two"); | ||
q.push(function(cb) { | ||
setTimeout(function() { | ||
answers.push('two'); | ||
cb(); | ||
}, 2); | ||
}, 3); | ||
}); |
@@ -8,10 +8,9 @@ #!/usr/bin/env node | ||
var assert = require('assert'); | ||
var Queue = require('../queue'); | ||
var assert = require("assert"); | ||
var Queue = require("../queue"); | ||
var answers = []; | ||
var q = new Queue(); | ||
q.on("drain", function () { | ||
var solutions = ["one", "two", "three"]; | ||
q.on('drain', function() { | ||
var solutions = [ 'one', 'two', 'three' ]; | ||
for (var i in answers) { | ||
@@ -22,18 +21,18 @@ var answer = answers[i]; | ||
} | ||
console.log("It works! ✔"); | ||
console.log('It works! ✔'); | ||
}); | ||
q.push(function (cb) { | ||
answers.push("one"); | ||
q.push(function(cb) { | ||
answers.push('one'); | ||
cb(); | ||
}); | ||
q.push(function (cb) { | ||
answers.push("two"); | ||
q.push(function(cb) { | ||
answers.push('two'); | ||
cb(); | ||
}); | ||
q.push(function (cb) { | ||
answers.push("three"); | ||
q.push(function(cb) { | ||
answers.push('three'); | ||
cb(); | ||
}); |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
6902
183
1
109
1