Async-Limiter
A module for limiting concurrent asynchronous actions in flight. Forked from queue.
This module exports a class Limiter
that implements some of the Array
API.
Pass async functions (ones that accept a callback or return a promise) to an instance's additive array methods.
Motivation
Certain functions, like zlib
, have undesirable behavior when
run at infinite concurrency.
In this case, it is actually faster, and takes far less memory, to limit concurrency.
This module should do the absolute minimum work necessary to queue up functions. PRs are welcome that would
make this module faster or lighter, but new functionality is not desired.
Style should confirm to nodejs/node style.
Example
var Limiter = require('async-limiter');
var t = new Limiter({ concurrency: 2 });
var results = [];
t.push(function(cb) {
results.push('two');
cb();
});
t.push(
function(cb) {
results.push('four');
cb();
},
function(cb) {
results.push('five');
cb();
}
);
t.unshift(function(cb) {
results.push('one');
cb();
});
t.splice(2, 0, function(cb) {
results.push('three');
cb();
});
t.onDone(function() {
console.log('all done:', results);
});
Zlib Example
const zlib = require('zlib');
const Limiter = require('async-limiter');
const message = { some: 'data' };
const payload = new Buffer(JSON.stringify(message));
const t = new Limiter({ concurrency: 5 });
function deflate(payload, cb) {
t.push(function(done) {
zlib.deflate(payload, function(err, buffer) {
done();
cb(err, buffer);
});
});
}
console.time('deflate');
for (let i = 0; i < 30000; ++i) {
deflate(payload, function(err, buffer) {});
}
t.onDone(function() {
console.timeEnd('deflate');
});
Install
npm install async-limiter
Test
npm test
API
var t = new Limiter([opts])
Constructor. opts
may contain inital values for:
Instance methods
t.onDone(fn)
fn
will be called once and only once, when the queue is empty.
If the queue is empty on the next tick, onDone()
will be called.
Instance methods mixed in from Array
Mozilla has docs on how these methods work here.
t.push(element1, ..., elementN)
t.unshift(element1, ..., elementN)
t.splice(index , howMany[, element1[, ...[, elementN]]])
On the next tick, job processing will start.
Properties
t.concurrency
Max number of jobs the queue should process concurrently, defaults to Infinity
.
t.length
Jobs pending + jobs to process (readonly).