async-deco
Advanced tools
Comparing version 6.2.1 to 6.2.2
{ | ||
"name": "async-deco", | ||
"version": "6.2.1", | ||
"version": "6.2.2", | ||
"description": "A collection of decorators for adding features to asynchronous functions (callback or promise based).", | ||
@@ -34,3 +34,3 @@ "main": "index.js", | ||
"es6-promisify": "^3.0.0", | ||
"little-ds-toolkit": "0.0.2", | ||
"little-ds-toolkit": "0.1.0", | ||
"lodash": "^4.13.1", | ||
@@ -37,0 +37,0 @@ "memoize-cache-utils": "^0.0.2", |
@@ -315,7 +315,4 @@ async-deco | ||
* a getKey function [optional]: it runs against the original arguments and returns the key used for creating different queues of execution. If it is missing there will be only one execution queue. If it returns null or undefined, the limit will be ignored. | ||
* a getPriority function [optional]: it runs against the original arguments and returns a number that represent the priority of this function when queued (less == prioritary). If it is missing there will be used an array and | ||
* a getPriority function [optional]: it runs against the original arguments and returns a number that represent the priority of this function when queued (less == prioritary). | ||
there will be only one execution queue. If it returns null or undefined, the limit will be ignored. | ||
It logs "limit-queue" when a function gets queued or "limit-drop" when a function gets rejected (queue full). It'll also log these data: { queueSize: number of function queued, key: cache key, parallel: number of functions currently running } | ||
@@ -322,0 +319,0 @@ |
@@ -7,2 +7,5 @@ require('setimmediate'); | ||
var queueFactory = TaskQueue.queueFactory; | ||
var TaskQueueOverflowError = TaskQueue.TaskQueueOverflowError; | ||
function limitDecorator(wrapper, max, getKey, getPriority) { | ||
@@ -23,3 +26,3 @@ getKey = keyGetter(getKey || function () { return '_default'; }); | ||
var args = Array.prototype.slice.call(arguments, 0); | ||
var logger = defaultLogger.apply(context); | ||
var logger; | ||
var cb = args[args.length - 1]; | ||
@@ -49,3 +52,3 @@ var cacheKey = getKey.apply(context, args); | ||
executionNumbers[cacheKey] = 0; | ||
queues[cacheKey] = new TaskQueue(getPriority); | ||
queues[cacheKey] = queueFactory(getPriority, queueSize); | ||
} | ||
@@ -67,12 +70,18 @@ | ||
} | ||
else if (executionNumbers[cacheKey] >= max) { | ||
if (queues[cacheKey].size() >= queueSize) { | ||
logger('limit-drop', { queueSize: queues[cacheKey].size(), parallel: executionNumbers[cacheKey], key: cacheKey }); | ||
cb(new LimitError('Max queue size reached (' + queueSize + ')')); | ||
else { | ||
try { | ||
queues[cacheKey].push(func, context, args, cb); | ||
logger = defaultLogger.apply(context); | ||
logger('limit-queue', { queueSize: queueSize, parallel: executionNumbers[cacheKey], key: cacheKey }); | ||
} | ||
else { | ||
logger('limit-queue', { queueSize:queues[cacheKey].size(), parallel: executionNumbers[cacheKey], key: cacheKey }); | ||
queues[cacheKey].push(func, context, args); | ||
catch (e) { | ||
if (e instanceof TaskQueueOverflowError) { | ||
logger = defaultLogger.apply(e.item.context); | ||
logger('limit-drop', { queueSize: queueSize, parallel: executionNumbers[cacheKey], key: cacheKey }); | ||
e.item.cb(new LimitError('Max queue size reached (' + queueSize + ')')); | ||
} | ||
else { | ||
throw e; | ||
} | ||
} | ||
} | ||
@@ -79,0 +88,0 @@ }; |
@@ -138,3 +138,3 @@ var assert = require('chai').assert; | ||
assert.isUndefined(dep); | ||
} else if (c == 2) { // the first function returns the result | ||
} else if (c === 2) { // the first function returns the result | ||
assertTimePassed(40); | ||
@@ -141,0 +141,0 @@ assert.equal(dep, 40); |
var assert = require('chai').assert; | ||
var TaskQueue = require('../../utils/task-queue'); | ||
var queueFactory = TaskQueue.queueFactory; | ||
var TaskQueueOverflowError = TaskQueue.TaskQueueOverflowError; | ||
describe('task queue', function () { | ||
it('must work without priorities', function () { | ||
var q = new TaskQueue(); | ||
var q = queueFactory(); | ||
var sequence = []; | ||
@@ -28,3 +30,3 @@ q.push(function (letter) { | ||
it('must work with priorities', function () { | ||
var q = new TaskQueue(function (letter, priority) { | ||
var q = queueFactory(function (letter, priority) { | ||
return priority; | ||
@@ -56,3 +58,3 @@ }); | ||
it('must work with priorities (2)', function () { | ||
var q = new TaskQueue(function (letter, priority) { | ||
var q = queueFactory(function (letter, priority) { | ||
return priority; | ||
@@ -78,2 +80,60 @@ }); | ||
}); | ||
it('must work with a max queueSize', function () { | ||
var q = queueFactory(undefined, 1); | ||
var sequence = []; | ||
q.push(function (letter) { | ||
sequence.push(letter); | ||
}, null, ['A']); | ||
assert.throws(function () { | ||
q.push(function (letter) { | ||
sequence.push(letter); | ||
}, null, ['B']); | ||
}, TaskQueueOverflowError, 'Queue full'); | ||
assert.equal(q.size(), 1); | ||
assert.throws(function () { | ||
q.push(function (letter) { | ||
sequence.push(letter); | ||
}, null, ['C']); | ||
}, TaskQueueOverflowError, 'Queue full'); | ||
assert.equal(q.size(), 1); | ||
q.shift()(); | ||
assert.equal(q.size(), 0); | ||
q.push(function (letter) { | ||
sequence.push(letter); | ||
}, null, ['D']); | ||
q.shift()(); | ||
assert.equal('AD', sequence.join('')); | ||
}); | ||
it('must work with a max queueSize and priority', function () { | ||
var q = queueFactory(function (letter, priority) { | ||
return priority; | ||
}, 2); | ||
var sequence = []; | ||
q.push(function (letter) { | ||
sequence.push(letter); | ||
}, null, ['A', 10]); | ||
q.push(function (letter) { | ||
sequence.push(letter); | ||
}, null, ['B', 4]); | ||
assert.throws(function () { | ||
q.push(function (letter) { | ||
sequence.push(letter); | ||
}, null, ['C', 5]); | ||
}, TaskQueueOverflowError, 'Queue full'); | ||
assert.equal(q.size(), 2); | ||
q.shift()(); | ||
q.shift()(); | ||
assert.equal('BC', sequence.join('')); | ||
}); | ||
}); |
var Heap = require('little-ds-toolkit/lib/heap'); | ||
var MinMaxHeap = require('little-ds-toolkit/lib/min-max-heap'); | ||
function TaskQueue(getPriority) { | ||
this.getPriority = getPriority; | ||
this.isArray = !getPriority; | ||
this.queue = this.isArray ? [] : new Heap(function (a, b) { | ||
return a.priority - b.priority; | ||
}); | ||
function minPriorityComparator(a, b) { | ||
return a.priority - b.priority; | ||
} | ||
TaskQueue.prototype.push = function (func, context, args) { | ||
function getFunction(item) { | ||
if (!item) return; | ||
return function () { | ||
item.func.apply(item.context, item.args); | ||
}; | ||
} | ||
function TaskQueueOverflowError(message, item) { | ||
this.name = 'TaskQueueOverflowError'; | ||
this.message = message || 'TaskQueueOverflowError'; | ||
this.stack = (new Error()).stack; | ||
this.item = item; | ||
} | ||
TaskQueueOverflowError.prototype = Object.create(Error.prototype); | ||
TaskQueueOverflowError.prototype.constructor = TaskQueueOverflowError; | ||
function Queue(queueSize) { | ||
this.data = []; | ||
this.queueSize = queueSize; | ||
} | ||
Queue.prototype.push = function (func, context, args, cb) { | ||
var item = { | ||
func: func, | ||
context: context, | ||
args: args | ||
args: args, | ||
cb: cb | ||
}; | ||
if (this.isArray) { | ||
return this.queue.push(item); | ||
if (typeof this.queueSize !== 'undefined' && this.data.length === this.queueSize) { | ||
throw new TaskQueueOverflowError('Queue full', item); | ||
} | ||
item.priority = this.getPriority.apply(context, args); | ||
return this.queue.push(item); | ||
this.data.push(item); | ||
}; | ||
TaskQueue.prototype.shift = function () { | ||
var item = this.isArray ? this.queue.shift() : this.queue.pop(); | ||
if (!item) return; | ||
return function () { | ||
item.func.apply(item.context, item.args); | ||
Queue.prototype.shift = function () { | ||
var item = this.data.shift(); | ||
return getFunction(item); | ||
}; | ||
Queue.prototype.size = function () { | ||
return this.data.length; | ||
}; | ||
function PriorityQueue(getPriority) { | ||
this.getPriority = getPriority; | ||
this.heap = new Heap(minPriorityComparator); | ||
} | ||
PriorityQueue.prototype.push = function (func, context, args, cb) { | ||
var item = { | ||
func: func, | ||
context: context, | ||
args: args, | ||
cb: cb | ||
}; | ||
item.priority = this.getPriority.apply(item.context, item.args); | ||
this.heap.push(item); | ||
}; | ||
TaskQueue.prototype.size = function () { | ||
return this.isArray ? this.queue.length : this.queue.size(); | ||
PriorityQueue.prototype.shift = function () { | ||
var item = this.heap.pop(); | ||
return getFunction(item); | ||
}; | ||
module.exports = TaskQueue; | ||
PriorityQueue.prototype.size = function () { | ||
return this.heap.size(); | ||
}; | ||
function MinMaxPriorityQueue(getPriority, queueSize) { | ||
this.getPriority = getPriority; | ||
this.heap = new MinMaxHeap(minPriorityComparator); | ||
this.queueSize = queueSize; | ||
} | ||
MinMaxPriorityQueue.prototype.push = function (func, context, args, cb) { | ||
var last; | ||
var item = { | ||
func: func, | ||
context: context, | ||
args: args, | ||
cb: cb | ||
}; | ||
item.priority = this.getPriority.apply(item.context, item.args); | ||
this.heap.push(item); | ||
if (this.heap.size() > this.queueSize) { | ||
last = this.heap.popMax(); | ||
throw new TaskQueueOverflowError('Queue full', last); | ||
} | ||
}; | ||
MinMaxPriorityQueue.prototype.shift = function () { | ||
var item = this.heap.popMin(); | ||
return getFunction(item); | ||
}; | ||
MinMaxPriorityQueue.prototype.size = function () { | ||
return this.heap.size(); | ||
}; | ||
function queueFactory(getPriority, queueSize) { | ||
if (!getPriority) { | ||
return new Queue(queueSize); | ||
} | ||
if (typeof queueSize === 'number' && queueSize !== Infinity) { | ||
return new MinMaxPriorityQueue(getPriority, queueSize); | ||
} | ||
return new PriorityQueue(getPriority); | ||
} | ||
module.exports = { | ||
MinMaxPriorityQueue: MinMaxPriorityQueue, | ||
PriorityQueue: PriorityQueue, | ||
Queue: Queue, | ||
TaskQueueOverflowError: TaskQueueOverflowError, | ||
queueFactory: queueFactory | ||
}; |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
133164
3562
687
+ Addedlittle-ds-toolkit@0.1.0(transitive)
- Removedlittle-ds-toolkit@0.0.2(transitive)
Updatedlittle-ds-toolkit@0.1.0