async-deco
Advanced tools
Comparing version 5.0.4 to 5.1.0
{ | ||
"name": "async-deco", | ||
"version": "5.0.4", | ||
"version": "5.1.0", | ||
"description": "A collection of decorators for adding features to asynchronous functions (callback or promise based).", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -283,3 +283,3 @@ async-deco | ||
----- | ||
Limit the concurrency of a function. | ||
Limit the concurrency of a function. Every function call that excedees the limit will be queued. If the queue size is reached the function call will return an error. | ||
```js | ||
@@ -292,7 +292,11 @@ var limitDecorator = require('async-deco/callback/limit'); | ||
You can initialise the decorator with 1 argument: | ||
* number of parallel execution [mandatory] | ||
* a getKey function [optional]: it runs against the original arguments and returns the key used for creating different queues of execution. If it is missing there will be only one execution queue. | ||
* number of parallel execution [default to 1]. It can also be an object: {max: number, queueSize: number}. | ||
* "limit" will be the number of parallel execution | ||
* "queueSize" is the size of the queue (default to Infinity). If the queue reaches this size any further function call will return an error without calling the original function | ||
* a getKey function [optional]: it runs against the original arguments and returns the key used for creating different queues of execution. If it is missing there will be only one execution queue. If it returns null or undefined, the limit will be ignored. | ||
It logs "limit" when a function gets queued with { number: number of function queued, key: cache key } | ||
logger('limit-drop', { queueSize: queues[cacheKey].length, parallel: executionNumbers[cacheKey], key: cacheKey }); | ||
It logs "limit-queue" when a function gets queued or "limit-drop" when a function gets rejected (queue full). It'll also log these data: { queueSize: number of function queued, key: cache key, parallel: number of functions currently running } | ||
Dedupe | ||
@@ -312,4 +316,4 @@ ------ | ||
It logs "dedupe" whenever is calling more than one callback with the same results. | ||
{len: number of function call saved, key: cache key} | ||
It logs "dedupe-queue" when a function is queued waiting for the result from another function. | ||
{key: cache key} | ||
@@ -316,0 +320,0 @@ Utilities |
@@ -19,5 +19,2 @@ var defaultLogger = require('../utils/default-logger'); | ||
var len = cacheKey in callback_queues ? callback_queues[cacheKey].length : 0; | ||
if (len > 1) { | ||
logger('dedupe', {len: len, key: cacheKey}); | ||
} | ||
for (var i = 0; i < len; i++) { | ||
@@ -29,3 +26,3 @@ callback_queues[cacheKey][i](err, dep); | ||
if (cacheKey === null) { | ||
if (cacheKey == null) { | ||
func.apply(context, args); | ||
@@ -44,2 +41,3 @@ } | ||
else { | ||
logger('dedupe-queue', {key: cacheKey}); | ||
callback_queues[cacheKey].push(cb); | ||
@@ -46,0 +44,0 @@ } |
require('setimmediate'); | ||
var LimitError = require('../errors/limit-error'); | ||
var defaultLogger = require('../utils/default-logger'); | ||
@@ -7,3 +8,8 @@ var keyGetter = require('memoize-cache/key-getter'); | ||
getKey = keyGetter(getKey || function () { return '_default'; }); | ||
max = max || 1; | ||
var queueSize = Infinity; | ||
if (typeof max === 'object') { | ||
queueSize = max.queueSize; | ||
max = max.limit; | ||
} | ||
return wrapper(function (func) { | ||
@@ -25,3 +31,2 @@ var executionNumbers = {}; | ||
if (executionNumbers[cacheKey] >= max) { | ||
logger('limit', { number: executionNumbers[cacheKey], key: cacheKey }); | ||
continue; | ||
@@ -37,2 +42,5 @@ } | ||
if (cacheKey == null ) { | ||
return func.apply(context, args); | ||
} | ||
@@ -54,6 +62,19 @@ if (!(cacheKey in executionNumbers)) { | ||
queues[cacheKey].push(function () { | ||
if (executionNumbers[cacheKey] < max) { | ||
executionNumbers[cacheKey]++; | ||
func.apply(context, args); | ||
}); | ||
runQueues(); | ||
} | ||
else if (executionNumbers[cacheKey] >= max) { | ||
if (queues[cacheKey].length >= queueSize) { | ||
logger('limit-drop', { queueSize: queues[cacheKey].length, parallel: executionNumbers[cacheKey], key: cacheKey }); | ||
cb(new LimitError('Max queue size reached (' + queueSize + ')')); | ||
} | ||
else { | ||
logger('limit-queue', { queueSize:queues[cacheKey].length, parallel: executionNumbers[cacheKey], key: cacheKey }); | ||
queues[cacheKey].push(function () { | ||
func.apply(context, args); | ||
}); | ||
} | ||
} | ||
}; | ||
@@ -60,0 +81,0 @@ }); |
var assert = require('chai').assert; | ||
var limitDecorator = require('../../callback/limit'); | ||
var LimitError = require('../../errors/limit-error'); | ||
@@ -112,3 +113,67 @@ function timePassedFrom() { | ||
}); | ||
}); | ||
describe('hard limit (callback)', function () { | ||
var limitToOne, limitToTwo, limitToThree; | ||
beforeEach(function () { | ||
limitToOne = limitDecorator({limit: 1, queueSize: 0}); | ||
limitToTwo = limitDecorator({limit: 1, queueSize: 1}); | ||
}); | ||
it('must limit to one function call', function (done) { | ||
var assertTimePassed = timePassedFrom(); | ||
var f = limitToOne(function (a, next) { | ||
setTimeout(function () { | ||
next(undefined, a); | ||
}, a); | ||
}); | ||
var c = 0; | ||
var getResult = function (err, dep) { | ||
c++; | ||
if (c === 1) { // the second function returns an error immediately | ||
assert.instanceOf(err, LimitError); | ||
assert.isUndefined(dep); | ||
} else if (c == 2) { // the first function returns the result | ||
assertTimePassed(40); | ||
assert.equal(dep, 40); | ||
done(); | ||
} | ||
}; | ||
f(40, getResult); | ||
f(20, getResult); | ||
}); | ||
it('must limit to 2 function call', function (done) { | ||
var assertTimePassed = timePassedFrom(); | ||
var f = limitToTwo(function (a, next) { | ||
setTimeout(function () { | ||
next(undefined, a); | ||
}, a); | ||
}); | ||
var c = 0; | ||
var getResult = function (err, dep) { | ||
c++; | ||
if (c === 1) { // the thirs function returns an error immediately | ||
assert.instanceOf(err, LimitError); | ||
assert.isUndefined(dep); | ||
} else if (c == 2) { | ||
assertTimePassed(40); | ||
assert.equal(dep, 40); | ||
} else if (c == 3) { | ||
assertTimePassed(60); | ||
assert.equal(dep, 20); | ||
done(); | ||
} | ||
}; | ||
f(40, getResult); | ||
f(20, getResult); | ||
f(60, getResult); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
101350
2716
518