Socket
Socket
Sign inDemoInstall

concurrent-queue

Package Overview
Dependencies
10
Maintainers
1
Versions
18
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 7.0.0 to 7.0.1

188

index.js

@@ -1,9 +0,10 @@

var assert = require('assert'),
assign = require('object-assign'),
onerr = require('on-error'),
eventuate = require('eventuate'),
once = require('once'),
Promise = require('promise-polyfill'),
errors = require('./errors'),
setImmediate = require('timers').setImmediate
var assert = require('assert'),
assign = require('object-assign'),
onerr = require('on-error'),
eventuate = require('eventuate'),
once = require('once'),
Promise = require('promise-polyfill'),
after = require('afterward'),
setImmediate = require('timers').setImmediate,
MaxSizeExceededError = require('./errors').MaxSizeExceededError

@@ -19,82 +20,50 @@ module.exports = function () {

function cq (item, cb) {
cb = typeof cb === 'function' ? cb : function () {}
if (pending.length >= maxSize) {
var err = new MaxSizeExceededError('unable to queue item')
cb(err)
cq.rejected.produce({ item: item, err: err })
return Promise.reject(err)
}
drained = false
setImmediate(drain)
return new Promise(function (resolve, reject) {
var done = new Promise(function (resolve, reject) {
if (pending.length >= maxSize) {
var err = new MaxSizeExceededError('unable to queue item')
reject(err)
return cq.rejected.produce({ item: item, err: err })
}
drained = false
setImmediate(drain)
pending.push({
item: item,
resolve: function (value) {
cb.apply(undefined, [null].concat(Array.prototype.slice.call(arguments, 0)))
resolve.apply(undefined, arguments)
cq.processingEnded.produce({ item: item })
},
reject: function (err) {
cb(err)
reject(err)
cq.processingEnded.produce({ err: err, item: item })
}
item : item,
resolve: onResolve,
reject : onReject
})
cq.enqueued.produce({ item: item })
function onResolve (value) {
resolve(value)
cq.processingEnded.produce({ item: item })
}
function onReject (err) {
reject(err)
cq.processingEnded.produce({ err: err, item: item })
}
})
return after(done, cb)
}
Object.defineProperties(cq, {
size: { enumerable: true, get: function () {
return pending.length
}},
isDrained: { enumerable: true, get: function () {
return drained
}},
pending: { enumerable: true, get: function () {
return pending.map(function (task) {
return task.item
})
}},
processing: { enumerable: true, get: function () {
return processing.map(function (task) {
return task.item
})
}},
concurrency: { enumerable: true, get: function () {
return concurrency
}, set: function (value) {
if (typeof value !== 'number') throw new TypeError('concurrency must be a number')
concurrency = value
}},
maxSize: { enumerable: true, get: function () {
return maxSize
}, set: function (value) {
if (typeof value !== 'number') throw new TypeError('maxSize must be a number')
maxSize = value
}},
processor: { get: function () {
return processor
}},
limit: { value: function (limits) {
limits = assign({ concurrency: Infinity, maxSize: Infinity }, limits)
assert(typeof limits.maxSize === 'number', 'maxSize must be a number')
assert(typeof limits.concurrency === 'number', 'concurrency must be a number')
maxSize = limits.maxSize
concurrency = limits.concurrency
return cq
}},
process: { value: function (func) {
assert(typeof func === 'function', 'process requires a processor function')
assert(!processor, 'queue processor already defined')
processor = func
setImmediate(drain)
return cq
}},
enqueued: { value: eventuate() },
rejected: { value: eventuate() },
size : { get: getSize, enumerable: true },
isDrained : { get: getIsDrained, enumerable: true },
pending : { get: getPending, enumerable: true },
processing : { get: getProcessing, enumerable: true },
concurrency : { get: getConcurrency, set: setConcurrency, enumerable: true },
maxSize : { get: getMaxSize, set: setMaxSize, enumerable: true },
processor : { get: getProcessor },
limit : { value: limit },
process : { value: process },
enqueued : { value: eventuate() },
rejected : { value: eventuate() },
processingStarted: { value: eventuate() },
processingEnded: { value: eventuate() },
drained: { value: eventuate() }
processingEnded : { value: eventuate() },
drained : { value: eventuate() }
})
return cq
function drain () {

@@ -133,5 +102,60 @@ if (!drained && pending.length === 0 && processing.length === 0) {

return cq
function getSize () {
return pending.length
}
function getIsDrained () {
return drained
}
function getPending () {
return pending.map(function (task) {
return task.item
})
}
function getProcessing () {
return processing.map(function (task) {
return task.item
})
}
function getConcurrency () {
return concurrency
}
function setConcurrency (value) {
if (typeof value !== 'number') throw new TypeError('concurrency must be a number')
concurrency = value
}
function getMaxSize () {
return maxSize
}
function setMaxSize (value) {
if (typeof value !== 'number') throw new TypeError('maxSize must be a number')
maxSize = value
}
function getProcessor () {
return processor
}
function limit (limits) {
limits = assign({ concurrency: Infinity, maxSize: Infinity }, limits)
assert(typeof limits.maxSize === 'number', 'maxSize must be a number')
assert(typeof limits.concurrency === 'number', 'concurrency must be a number')
maxSize = limits.maxSize
concurrency = limits.concurrency
return cq
}
function process (func) {
assert(typeof func === 'function', 'process requires a processor function')
assert(!processor, 'queue processor already defined')
processor = func
setImmediate(drain)
return cq
}
}
var MaxSizeExceededError = errors.MaxSizeExceededError
{
"name": "concurrent-queue",
"version": "7.0.0",
"version": "7.0.1",
"description": "Fifo queue with concurrency control",

@@ -30,3 +30,3 @@ "main": "index.js",

"devDependencies": {
"@jasonpincin/standard": "~5.0.0-7",
"@jasonpincin/standard": "~5.0.0-8",
"istanbul": "~0.3.17",

@@ -41,2 +41,3 @@ "opn": "~1.0.2",

"dependencies": {
"afterward": "~2.0.0",
"define-error": "~1.0.0",

@@ -43,0 +44,0 @@ "eventuate": "~4.0.0",

@@ -131,6 +131,13 @@ # concurrent-queue

### var MaxSizeExceededError = require('concurrent-queue/errors').MaxSizeExceededError
### errors
The constructor for a MaxSizeExceededError. This is supplied to the callback and/or promise rejection when an item cannot be queued due to `queue.maxSize` constraints. Example:
```javascript
var errors = require('concurrent-queue/errors')
var MaxSizeExceededError = errors.MaxSizeExceededError
```
#### MaxSizeExceededError
Constructor for errors representing the `queue.maxSize` constraint being exceeded. This is supplied to the callback and/or promise rejection when an item cannot be queued due to `queue.maxSize` constraints. Example:
```javascript

@@ -137,0 +144,0 @@ var cq = require('concurrent-queue'),

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc