Qyu
qyu
is a general-purpose asynchronous job queue for the browser and Node.js. It is flexible and easy to use, always accepting jobs and running them as fast as the concurrency settings dictate.
Qyu was meant for:
- large-scale web scrapers
- throttling external API calls
- restraining database bandwidth usage in long-running database background operations
- a lot more...
const { Qyu } = require('qyu');
async function performRequest(){
const { data } = await axios('https://www.example.com');
}
(async () => {
const q = new Qyu({ concurrency: 3 });
q(performRequest);
q({
fn: performRequest,
priority: 2,
});
const result = await q(performRequest);
setTimeout(() => {
for (let i = 0; i < 10; i++) {
q(performRequest);
}
}, 2000);
await q.whenEmpty();
})();
Features
- Always on and ready to receive additional tasks
- Configurable concurrency limit
- Configurable queue capacity limit
- Configurable priority per individual tasks
- Configurable queue timeout duration per individual tasks
- Pause/resume queue execution
- Compatible for browsers as well as Node.js environments
- Written in TypeScript, full type definitions built-in
- Provides both CJS and ESM builds
Instance Config
Defaults:
new Qyu({
concurrency: 1,
capacity: Infinity,
rampUpTime: 0
});
concurrency
:
Determines the maximum number of jobs allowed to be executed concurrently.
(default: 1
)
const q = new Qyu({ concurrency: 2 });
q(job1);
q(job2);
q(job3);
capacity
:
Sets a limit on the job queue length, causing any additional job queuings to be immediately rejected with an instance of QyuError
with a code
property holding "ERR_CAPACITY_FULL"
.
(default: Infinity
)
const q = new Qyu({ capacity: 5 });
for (let i = 0; i < 6; i++) {
q(job)
.then(result => console.log(`Job ${i} complete!`, result))
.catch(err => console.error(`job ${i} error`, err));
}
rampUpTime:
If specified a non-zero number, will delay the concurrency-ramping-up time of additional job executions, one by one, as the instance attempts to reach maximum configured concurrency.
Represents number of milliseconds.
(default: 0
)
const q = new Qyu({
rampUpTime: 1000,
concurrency: 3
});
q(job1);
q(job2);
q(job3);
q(job4);
Queuing options
priority
:
Determines order in which queued jobs will be picked up for execution.
Can be any positive/negative integer/float number.
The greater the priority value, the earlier the job will be called in relation to other queued jobs.
Queued jobs having the same priority (or similarly having no explicit priority
provided) will get scheduled relative to one another by the very order in which they were passed on to the Qyu instance.
(default: 0
)
Example:
const q = new Qyu();
const fn1 = async () => {};
const fn2 = async () => {};
q([
{ fn: fn1 },
{ fn: fn2, priority: 2 },
]);
timeout
:
If is a non-zero positive number (representing milliseconds), the given job would be dequeued and prevented were it still be pending in queue at the time this duration expires.
Additionally, when a queued job reaches its timeout, the promise that was returned when it was initially queued would immediately become rejected with an instance of QyuError
with a code
property holding "ERR_JOB_TIMEOUT"
.
(default: undefined
)
Example:
const q = new Qyu({ concurrency: 1 });
q(async () => {});
q({
fn: async () => {},
timeout: 3000
});
API
instance(jobs
)
(alias: instance#add)
Queues up the given jobs
, which can be either a single "job", or an array of such for batch queuing.
Every job (whether given as singular or as an array) can either be a plain function or a "job object" with the following properties:
Returns:
If given a singular non-array input - returns a promise that fulfills with the resolution or rejection value the job will produce when it eventually gets picked up from the queue and executed. Example:
const q = new Qyu();
const myTaskFn = async () => {};
const result = await q(myTaskFn);
const result = await q({
fn: myTaskFn1,
priority: 1,
timeout: 3000,
});
If given an array input - returns a promise that resolves when each of the given jobs were resolved, with the value of an array containing each job's resolution value, ordered as were the jobs when originally given, or rejects as soon as any of the given jobs happens to reject, reflecting that job's rejection value (very similarly to the native Promise.all
behavior). Example:
const q = new Qyu();
const myTaskFn1 = async () => {};
const myTaskFn2 = async () => {};
const [result1, result2] = await q([myTaskFn1, myTaskFn2]);
const [result1, result2] = await q([
{
fn: myTaskFn1,
priority: 1,
timeout: 3000,
},
{
fn: myTaskFn2,
priority: 2,
timeout: 3000,
},
]);
instance#whenEmpty()
Returns: a promise that resolves if/when an instance has no running or queued jobs.
Guaranteed to resolve, regardless if one or some jobs resulted in error.
const q = new Qyu();
await q.whenEmpty();
instance#whenFree()
Returns: a promise that resolves if/when number of currently running jobs are below the concurrency limit.
Guaranteed to resolve, regardless if one or some jobs resulted in error.
const q = new Qyu();
await q.whenFree();
instance#pause()
Pauses instance's operation, so it effectively stops picking more jobs from queue.
Jobs currently running at time instance.pause()
was called keep running until finished.
const q = new Qyu({concurrency: 2});
q(job1); q(job2); q(job3);
await q.pause();
q(job4);
instance#resume()
Resumes instance's operation after a previous call to instance.pause()
.
An instance is in "resumed" mode by default when instantiated.
const q = new Qyu();
q.pause();
q.resume();
instance#empty()
Immediately empties the instance's queue from all queued jobs, rejecting the promises returned from their queuings with a "ERR_JOB_DEQUEUED"
type of QyuError
.
Jobs currently running at the time instance.empty()
was called keep running until finished.
const q = new Qyu({concurrency: 1});
q(job1); q(job2);
await q.empty();
instance#set(config
)
Update a living instance's config options in real time (concurrency
, capacity
and rampUpTime
).
Note these (expected) side effects:
- If new
concurrency
is specified and is greater than previous setting, new jobs will immediately be drawn and called from queue as much as the difference from previously set concurrency
up to the number of jobs that were held in queue at the time. - if new
concurrency
is specified and is lower then previous setting, will postpone calling additional jobs until enough active jobs finish and make the actual concurrency degrade to the new setting by itself. - if new
capacity
is specified and is lower than previous setting, will reject the last jobs in queue with a "ERR_CAPACITY_FULL"
type of QyuError
as much as the difference from the previously set capacity
.
const q = new Qyu({concurrency: 1, capacity: 3});
q(job1); q(job2); q(job3);
q.set({concurrency: 2, capacity: 2});
Examples
Throttled, concurrent file deletion:
const fs = require('fs/promises');
const { Qyu } = require('qyu');
const q = new Qyu({ concurrency: 3 });
const filesToDelete = [
'/path/to/file1.png',
'/path/to/file2.png',
'/path/to/file3.png'
'/path/to/file4.png'
];
const deletionJobs = filesToDelete.map(path => () => fs.unlink(path));
await q(deletionJobs);
Web Scraper:
const { Qyu } = require('qyu');
const axios = require('axios');
const cheerio = require('cheerio');
(async () => {
const siteUrl = 'http://www.store-to-crawl.com/products';
const q = new Qyu({ concurrency: 3 });
for (let i = 1; i <= 10; i++) {
q(async () => {
const resp = await axios(`${siteUrl}?page=${i}`);
const $ = cheerio.load(resp.data);
const products = [];
$('.product-list .product').each((i, elem) => {
const $elem = $(elem);
const title = $elem.find('.title').text();
const price = $elem.find('.price').text();
products.push({ title, price });
});
});
}
await q.whenEmpty();
})();