concurrent-queue
Advanced tools
Comparing version 7.0.0 to 7.0.1
188
index.js
@@ -1,9 +0,10 @@ | ||
var assert = require('assert'), | ||
assign = require('object-assign'), | ||
onerr = require('on-error'), | ||
eventuate = require('eventuate'), | ||
once = require('once'), | ||
Promise = require('promise-polyfill'), | ||
errors = require('./errors'), | ||
setImmediate = require('timers').setImmediate | ||
var assert = require('assert'), | ||
assign = require('object-assign'), | ||
onerr = require('on-error'), | ||
eventuate = require('eventuate'), | ||
once = require('once'), | ||
Promise = require('promise-polyfill'), | ||
after = require('afterward'), | ||
setImmediate = require('timers').setImmediate, | ||
MaxSizeExceededError = require('./errors').MaxSizeExceededError | ||
@@ -19,82 +20,50 @@ module.exports = function () { | ||
function cq (item, cb) { | ||
cb = typeof cb === 'function' ? cb : function () {} | ||
if (pending.length >= maxSize) { | ||
var err = new MaxSizeExceededError('unable to queue item') | ||
cb(err) | ||
cq.rejected.produce({ item: item, err: err }) | ||
return Promise.reject(err) | ||
} | ||
drained = false | ||
setImmediate(drain) | ||
return new Promise(function (resolve, reject) { | ||
var done = new Promise(function (resolve, reject) { | ||
if (pending.length >= maxSize) { | ||
var err = new MaxSizeExceededError('unable to queue item') | ||
reject(err) | ||
return cq.rejected.produce({ item: item, err: err }) | ||
} | ||
drained = false | ||
setImmediate(drain) | ||
pending.push({ | ||
item: item, | ||
resolve: function (value) { | ||
cb.apply(undefined, [null].concat(Array.prototype.slice.call(arguments, 0))) | ||
resolve.apply(undefined, arguments) | ||
cq.processingEnded.produce({ item: item }) | ||
}, | ||
reject: function (err) { | ||
cb(err) | ||
reject(err) | ||
cq.processingEnded.produce({ err: err, item: item }) | ||
} | ||
item : item, | ||
resolve: onResolve, | ||
reject : onReject | ||
}) | ||
cq.enqueued.produce({ item: item }) | ||
function onResolve (value) { | ||
resolve(value) | ||
cq.processingEnded.produce({ item: item }) | ||
} | ||
function onReject (err) { | ||
reject(err) | ||
cq.processingEnded.produce({ err: err, item: item }) | ||
} | ||
}) | ||
return after(done, cb) | ||
} | ||
Object.defineProperties(cq, { | ||
size: { enumerable: true, get: function () { | ||
return pending.length | ||
}}, | ||
isDrained: { enumerable: true, get: function () { | ||
return drained | ||
}}, | ||
pending: { enumerable: true, get: function () { | ||
return pending.map(function (task) { | ||
return task.item | ||
}) | ||
}}, | ||
processing: { enumerable: true, get: function () { | ||
return processing.map(function (task) { | ||
return task.item | ||
}) | ||
}}, | ||
concurrency: { enumerable: true, get: function () { | ||
return concurrency | ||
}, set: function (value) { | ||
if (typeof value !== 'number') throw new TypeError('concurrency must be a number') | ||
concurrency = value | ||
}}, | ||
maxSize: { enumerable: true, get: function () { | ||
return maxSize | ||
}, set: function (value) { | ||
if (typeof value !== 'number') throw new TypeError('maxSize must be a number') | ||
maxSize = value | ||
}}, | ||
processor: { get: function () { | ||
return processor | ||
}}, | ||
limit: { value: function (limits) { | ||
limits = assign({ concurrency: Infinity, maxSize: Infinity }, limits) | ||
assert(typeof limits.maxSize === 'number', 'maxSize must be a number') | ||
assert(typeof limits.concurrency === 'number', 'concurrency must be a number') | ||
maxSize = limits.maxSize | ||
concurrency = limits.concurrency | ||
return cq | ||
}}, | ||
process: { value: function (func) { | ||
assert(typeof func === 'function', 'process requires a processor function') | ||
assert(!processor, 'queue processor already defined') | ||
processor = func | ||
setImmediate(drain) | ||
return cq | ||
}}, | ||
enqueued: { value: eventuate() }, | ||
rejected: { value: eventuate() }, | ||
size : { get: getSize, enumerable: true }, | ||
isDrained : { get: getIsDrained, enumerable: true }, | ||
pending : { get: getPending, enumerable: true }, | ||
processing : { get: getProcessing, enumerable: true }, | ||
concurrency : { get: getConcurrency, set: setConcurrency, enumerable: true }, | ||
maxSize : { get: getMaxSize, set: setMaxSize, enumerable: true }, | ||
processor : { get: getProcessor }, | ||
limit : { value: limit }, | ||
process : { value: process }, | ||
enqueued : { value: eventuate() }, | ||
rejected : { value: eventuate() }, | ||
processingStarted: { value: eventuate() }, | ||
processingEnded: { value: eventuate() }, | ||
drained: { value: eventuate() } | ||
processingEnded : { value: eventuate() }, | ||
drained : { value: eventuate() } | ||
}) | ||
return cq | ||
function drain () { | ||
@@ -133,5 +102,60 @@ if (!drained && pending.length === 0 && processing.length === 0) { | ||
return cq | ||
function getSize () { | ||
return pending.length | ||
} | ||
function getIsDrained () { | ||
return drained | ||
} | ||
function getPending () { | ||
return pending.map(function (task) { | ||
return task.item | ||
}) | ||
} | ||
function getProcessing () { | ||
return processing.map(function (task) { | ||
return task.item | ||
}) | ||
} | ||
function getConcurrency () { | ||
return concurrency | ||
} | ||
function setConcurrency (value) { | ||
if (typeof value !== 'number') throw new TypeError('concurrency must be a number') | ||
concurrency = value | ||
} | ||
function getMaxSize () { | ||
return maxSize | ||
} | ||
function setMaxSize (value) { | ||
if (typeof value !== 'number') throw new TypeError('maxSize must be a number') | ||
maxSize = value | ||
} | ||
function getProcessor () { | ||
return processor | ||
} | ||
function limit (limits) { | ||
limits = assign({ concurrency: Infinity, maxSize: Infinity }, limits) | ||
assert(typeof limits.maxSize === 'number', 'maxSize must be a number') | ||
assert(typeof limits.concurrency === 'number', 'concurrency must be a number') | ||
maxSize = limits.maxSize | ||
concurrency = limits.concurrency | ||
return cq | ||
} | ||
function process (func) { | ||
assert(typeof func === 'function', 'process requires a processor function') | ||
assert(!processor, 'queue processor already defined') | ||
processor = func | ||
setImmediate(drain) | ||
return cq | ||
} | ||
} | ||
var MaxSizeExceededError = errors.MaxSizeExceededError |
{ | ||
"name": "concurrent-queue", | ||
"version": "7.0.0", | ||
"version": "7.0.1", | ||
"description": "Fifo queue with concurrency control", | ||
@@ -30,3 +30,3 @@ "main": "index.js", | ||
"devDependencies": { | ||
"@jasonpincin/standard": "~5.0.0-7", | ||
"@jasonpincin/standard": "~5.0.0-8", | ||
"istanbul": "~0.3.17", | ||
@@ -41,2 +41,3 @@ "opn": "~1.0.2", | ||
"dependencies": { | ||
"afterward": "~2.0.0", | ||
"define-error": "~1.0.0", | ||
@@ -43,0 +44,0 @@ "eventuate": "~4.0.0", |
@@ -131,6 +131,13 @@ # concurrent-queue | ||
### var MaxSizeExceededError = require('concurrent-queue/errors').MaxSizeExceededError | ||
### errors | ||
The constructor for a MaxSizeExceededError. This is supplied to the callback and/or promise rejection when an item cannot be queued due to `queue.maxSize` constraints. Example: | ||
```javascript | ||
var errors = require('concurrent-queue/errors') | ||
var MaxSizeExceededError = errors.MaxSizeExceededError | ||
``` | ||
#### MaxSizeExceededError | ||
Constructor for errors representing the `queue.maxSize` constraint being exceeded. This is supplied to the callback and/or promise rejection when an item cannot be queued due to `queue.maxSize` constraints. Example: | ||
```javascript | ||
@@ -137,0 +144,0 @@ var cq = require('concurrent-queue'), |
29374
476
188
7
+ Addedafterward@~2.0.0
+ Addedafterward@2.0.0(transitive)