@goodware/task-queue: A lightweight task queue
Requirements
NodeJS 8+
Installation
npm i --save @goodware/task-queue
Overview
This lightweight, battle-tested, single-dependency (joi) task queue limits the number of tasks (synchronous or asynchronous) that execute concurrently. The purpose of limiting task execution is to control resource usage such as memory and database connections.
Although several packages address this use case, this is apparently the only library that can queue tasks post-instantiation without using generators. The API is, most of all, easy to learn and use.
Creation
A task-queue object is instantiated by providing a configuration object to the constructor. The configuration object currently has one required and one optional property:
| Name | Description |
| --- | --- |
| size | The size of the queue |
| workers | The number of tasks that can execute simultaneously |
size can be provided without workers, and workers can be provided without size.
Usage
Functions are queued via the asynchronous method push(task). This method accepts a function named task and returns a Promise that resolves to an object when the task function is called (not when task returns). task is called only when a worker is available. task does not need to return a Promise, but if it does, that Promise can be acquired via the promise property of the object that push() resolves to.
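The contract described above (push() resolves when task is called, and the resolved object exposes the task's own Promise) can be sketched with a tiny stand-in class. MiniQueue and demoMiniQueue are hypothetical names invented for this illustration; this is not the library's implementation, only a minimal model of the behavior the paragraph describes.

```javascript
// Hypothetical stand-in for illustration only; NOT the @goodware/task-queue source.
class MiniQueue {
  constructor(workers) {
    this.workers = workers; // maximum number of tasks running at once
    this.running = 0; // tasks currently executing
    this.waiting = []; // start callbacks for tasks that have no worker yet
  }

  // Returns a Promise that resolves to { promise } once `task` has been called.
  push(task) {
    return new Promise((resolve) => {
      const start = () => {
        this.running += 1;
        // Calls task() right away; a synchronous throw becomes a rejection.
        const promise = new Promise((done) => done(task()));
        const release = () => {
          this.running -= 1;
          const next = this.waiting.shift();
          if (next) next();
        };
        promise.then(release, release); // free the worker on success or failure
        resolve({ promise });
      };
      if (this.running < this.workers) start();
      else this.waiting.push(start);
    });
  }
}

// Records the order of events for a single-worker queue.
async function demoMiniQueue() {
  const events = [];
  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
  const queue = new MiniQueue(1);

  const first = await queue.push(async () => {
    events.push('first started');
    await sleep(20);
    events.push('first finished');
  });
  events.push('first push resolved'); // the first task is still running here

  const second = await queue.push(() => {
    events.push('second started');
    return 'done';
  });
  events.push('second push resolved');

  const value = await second.promise; // the task's own result
  await first.promise;
  return { events, value };
}

demoMiniQueue().then(({ events }) => console.log(events.join('\n')));
```

In this sketch the first push() resolves while the first task is still sleeping, and the second push() resolves only after the first task has finished and freed the single worker.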
Brief Example
- Create a queue that runs at most 10 tasks at a time:
const queue = new (require('@goodware/task-queue'))({ size: 10 });
- Wait for the provided function to be invoked:
await queue.push(() => {...})
- Wait for the provided function to finish:
await (await queue.push(() => {...})).promise;
Code Sample
This example runs at most two tasks at a time. The tasks log their messages in the order 2, 1, 4, 3.
const queue = new (require('@goodware/task-queue'))({ size: 2 });

// await is only valid inside an async function in CommonJS modules
async function main() {
  // Tasks 1 and 2 start immediately
  await queue.push(
    () =>
      new Promise((resolve) =>
        setTimeout(() => {
          console.log(`Task 1 ${Date.now()}`);
          resolve();
        }, 400)
      )
  );
  await queue.push(
    () =>
      new Promise((resolve) =>
        setTimeout(() => {
          console.log(`Task 2 ${Date.now()}`);
          resolve();
        }, 300)
      )
  );
  // Tasks 3 and 4 each wait for a worker to become available
  await queue.push(
    () =>
      new Promise((resolve) =>
        setTimeout(() => {
          console.log(`Task 3 ${Date.now()}`);
          resolve();
        }, 200)
      )
  );
  const ret = await queue.push(
    () =>
      new Promise((resolve) =>
        setTimeout(() => {
          console.log(`Task 4 ${Date.now()}`);
          resolve();
        }, 100)
      )
  );
  // Wait for task 4 to finish, then stop the queue
  await ret.promise;
  await queue.stop();
}

main().catch(console.error);
Minimizing Memory Usage
push() returns a new Promise each time it is called, thus consuming memory. If you cannot otherwise control how often push() is called, it may be necessary to stop calling push() while the queue is full.
For example, consider the following constraints:
- Up to 10 workers can execute at the same time
- When 10 workers are running, up to 50 tasks can call push() and immediately continue their work. Subsequent callers will wait until a worker has finished.
Although it appears that resources are properly constrained in this scenario, if push() is called, say, 1,000 times a second, and the workers take longer than 1 second each, the process will likely run out of memory. One solution to this scenario is backpressure.
No form of backpressure is a silver bullet. External systems must handle errors and retry.
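To put rough numbers on the scenario above, the arithmetic can be sketched as follows. The function name unfinishedAfter and the 60-second window are assumptions chosen for illustration, and the estimate is a best case: it treats every task as taking exactly 1 second, whereas the scenario says they take longer.

```javascript
// Hypothetical helper: estimates how many pushed tasks are still unfinished,
// assuming a constant arrival rate and a fixed duration per task.
function unfinishedAfter(seconds, pushesPerSecond, workers, taskSeconds) {
  const arrived = pushesPerSecond * seconds;
  // Each worker completes at most one task per taskSeconds.
  const completedAtMost = Math.floor(seconds / taskSeconds) * workers;
  return Math.max(0, arrived - completedAtMost);
}

// 1,000 pushes per second, 10 workers, 1 second per task, for one minute:
// 60,000 arrive and at most 600 complete.
console.log(unfinishedAfter(60, 1000, 10, 1)); // 59400
```

Every one of those unfinished calls holds at least one Promise in memory, which is why the backlog has to be limited at the point where push() is called.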
Code Sample
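const queue = new (require('@goodware/task-queue'))({ size: 50, workers: 10 });

// Simulates 200 ms of asynchronous work
async function doWork() {
  const me = Date.now();
  console.log(`${me} begin`);
  await new Promise((resolve) => setTimeout(resolve, 200));
  console.log(`${me} end`);
}

// await is only valid inside an async function in CommonJS modules
async function main() {
  for (let i = 1; i <= 100; ++i) {
    if (queue.full) {
      // Backpressure: pause instead of calling push(); this iteration queues nothing
      console.log('full');
      await new Promise((resolve) => setTimeout(resolve, 50));
    } else {
      await queue.push(doWork);
      console.log('queued');
    }
  }
  await queue.stop();
}

main().catch(console.error);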
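The loop above pauses when the queue is full. Another form of backpressure, in line with the note that external systems must handle errors and retry, is to reject the request immediately. The sketch below relies only on the full property and the push() method shown in this README. pushOrReject, the QUEUE_FULL error code, and fakeQueue are hypothetical names; fakeQueue is a stand-in object so the example runs without the library installed.

```javascript
// Hypothetical helper: fail fast instead of waiting when the queue is full.
async function pushOrReject(queue, task) {
  if (queue.full) {
    const error = new Error('Queue is full; retry later');
    error.code = 'QUEUE_FULL'; // assumed code name, for illustration only
    throw error;
  }
  return queue.push(task);
}

// Stand-in for a real queue: runs the task at once and never fills up on its own.
const fakeQueue = {
  full: false,
  async push(task) {
    return { promise: Promise.resolve(task()) };
  },
};

async function demoShedding() {
  const accepted = await pushOrReject(fakeQueue, () => 'ok');
  const value = await accepted.promise;

  fakeQueue.full = true; // simulate a full queue
  let rejectedCode = null;
  try {
    await pushOrReject(fakeQueue, () => 'never runs');
  } catch (error) {
    rejectedCode = error.code; // the caller decides whether and when to retry
  }
  return { value, rejectedCode };
}

demoShedding().then((result) => console.log(result));
```

Failing fast keeps memory bounded without holding callers in a polling loop, at the cost of pushing retry logic onto the caller.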