Qyu
qyu is a general-purpose asynchronous job queue for Node.js. It is flexible and easy to use, always accepting jobs and running them as fast as the concurrency settings dictate.
Qyu was meant for:
- large-scale web scrapers
- throttling external API calls
- restraining database bandwidth usage in long-running database background operations
- a lot more...
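const Qyu = require('qyu');

(async () => {
  const q = new Qyu({concurrency: 3});

  // Queue a job (myAsyncFunction stands for any async function)
  q(myAsyncFunction);

  // Queue a job with options and arguments (arg1, arg2 are placeholders passed to the job)
  q(myAsyncFunction, {priority: 2}, arg1, arg2);

  // Every queuing returns a promise tied to that job's own result
  let result = await q(myAsyncFunction);

  // The instance stays ready to accept more jobs at any time
  setTimeout(() => {
    for (let i=0; i<10; i++) {
      q(myAsyncFunction);
    }
  }, 2000);

  // Resolves once the instance has no running or queued jobs
  await q.whenEmpty();
})();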
Features
- Always on and ready to receive additional tasks
- Concurrency setting
- Queue capacity
- Task priority
- Task timeout
- Pause/resume
- Supports streaming (in object mode) for memory-efficient data processing
Instance Config
Defaults:
new Qyu({
concurrency: 1,
capacity: Infinity,
rampUpTime: 0
});
concurrency:
Determines the maximum number of jobs allowed to be run concurrently.
(default: 1)
const q = new Qyu({concurrency: 2});

q(job1); // Runs right away
q(job2); // Also runs right away
q(job3); // Waits in the queue until job1 or job2 finishes
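To make the concurrency limit concrete, here is a minimal sketch of a concurrency-limited job queue in plain JavaScript. It only illustrates the idea and is not Qyu's implementation; createLimiter is a hypothetical name.

```javascript
// Minimal concurrency-limited job queue (illustrative sketch, not Qyu's source).
// createLimiter is a hypothetical helper name.
function createLimiter(concurrency) {
  const queue = []; // jobs waiting for a free slot
  let running = 0;  // jobs currently executing

  function next() {
    // Start queued jobs for as long as there is spare capacity
    while (running < concurrency && queue.length > 0) {
      const { fn, resolve, reject } = queue.shift();
      running++;
      Promise.resolve()
        .then(fn)                 // run the job (sync throws become rejections)
        .then(resolve, reject)    // settle the caller's promise with the job's outcome
        .finally(() => {
          running--;              // free the slot and pick the next job
          next();
        });
    }
  }

  // Queue a job; the returned promise settles with the job's result or error
  return function add(fn) {
    return new Promise((resolve, reject) => {
      queue.push({ fn, resolve, reject });
      next();
    });
  };
}
```

With createLimiter(2), the first two jobs start right away and a third waits until one of them settles, mirroring the q(job1); q(job2); q(job3); example above.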
capacity:
Sets a limit on the length of the job queue; any job queued while the queue is full is immediately rejected with a QyuError of type "ERR_CAPACITY_FULL".
(default: Infinity)
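const q = new Qyu({capacity: 5});

// Queue more jobs than the capacity allows; any queuing made while the queue is full rejects with ERR_CAPACITY_FULL
for (let i=0; i<6; i++) {
  q(job)
    .then(result => console.log(`Job ${i} complete!`, result))
    .catch(err => console.error(`Job ${i} error`, err));
}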
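The capacity check can be illustrated with a small bounded queue that rejects new entries once it is full. BoundedQueue and the plain Error carrying a code property are hypothetical stand-ins, not Qyu's QyuError. In this sketch only waiting entries count toward the limit; whether Qyu also counts running jobs is not stated here.

```javascript
// Illustrative bounded queue: rejects immediately when full (hypothetical, not Qyu's code).
class BoundedQueue {
  constructor(capacity) {
    this.capacity = capacity;
    this.items = []; // waiting entries only
  }

  // Returns a promise that resolves when the item is dequeued,
  // or rejects right away if the queue is already at capacity.
  enqueue(item) {
    if (this.items.length >= this.capacity) {
      const err = new Error('Queue capacity is full');
      err.code = 'ERR_CAPACITY_FULL'; // mirrors the error type named above
      return Promise.reject(err);
    }
    return new Promise(resolve => {
      this.items.push({ item, resolve });
    });
  }

  // Removes the oldest entry and settles its promise
  dequeue() {
    const entry = this.items.shift();
    if (!entry) return undefined;
    entry.resolve(entry.item);
    return entry.item;
  }
}
```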
rampUpTime:
If set to a non-zero number, delays the start of each additional concurrent job execution, one by one, as the instance ramps up toward its configured maximum concurrency.
The value is in milliseconds.
(default: 0)
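const q = new Qyu({
  rampUpTime: 1000,
  concurrency: 3
});

q(job1); // Starts right away
q(job2); // Starts after 1 second
q(job3); // Starts after 2 seconds
q(job4); // Waits until one of the running jobs finishes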
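Assuming each additional concurrent job starts rampUpTime milliseconds after the previous one (one reading of the description above, not verified against Qyu's source), the start delays of the first concurrency jobs form a simple arithmetic sequence. rampUpSchedule is a hypothetical helper.

```javascript
// Start delay (ms) of the i-th concurrent job under the assumed ramp-up rule:
// job i (0-based) starts i * rampUpTime ms after the first one.
// rampUpSchedule is hypothetical and only models the behavior described above.
function rampUpSchedule(concurrency, rampUpTime) {
  return Array.from({ length: concurrency }, (_, i) => i * rampUpTime);
}

console.log(rampUpSchedule(3, 1000)); // [ 0, 1000, 2000 ]
```

Jobs beyond the concurrency limit (job4 above) are not covered by this schedule; they wait for a running job to finish.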
Queuing options
Defaults:
q(job, {
priority: 0,
timeout: null
});
priority:
Determines the order in which queued jobs run.
Can be any positive or negative number, integer or float.
The greater the priority value, the earlier the job is called relative to other jobs.
Jobs with identical priority run in the same order in which they were queued.
(default: 0)
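The ordering rule (higher priority first, first-in-first-out among equal priorities) can be sketched as a sorted insert into an array. insertByPriority is a hypothetical helper, not part of Qyu's API; a heap would also work, but it would need an explicit sequence number to keep equal priorities in order.

```javascript
// Stable priority insert (illustrative; insertByPriority is hypothetical).
// The job goes in front of the first queued job with a strictly lower priority,
// so jobs with equal priority keep their original FIFO order.
function insertByPriority(queue, job) {
  let i = queue.findIndex(queued => queued.priority < job.priority);
  if (i === -1) i = queue.length;
  queue.splice(i, 0, job);
  return queue;
}

const queue = [];
insertByPriority(queue, { name: 'a', priority: 0 });
insertByPriority(queue, { name: 'b', priority: 2 });
insertByPriority(queue, { name: 'c', priority: 0 });
insertByPriority(queue, { name: 'd', priority: -1.5 });
insertByPriority(queue, { name: 'e', priority: 2 });
console.log(queue.map(j => j.name).join(',')); // b,e,a,c,d
```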
timeout:
If set to a non-zero number, jobs that wait in the queue for that long (in milliseconds) without starting are dequeued.
Additionally, when a queued job reaches its timeout, the promise returned from its queuing rejects with a QyuError of type "ERR_JOB_TIMEOUT".
(default: null)
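A queue-wait timeout can be sketched by attaching a timer to each waiting entry and clearing it when the entry is taken for execution. WaitQueue and its error object are hypothetical; the code string only mirrors the ERR_JOB_TIMEOUT name mentioned above.

```javascript
// Illustrative queue-wait timeout (hypothetical WaitQueue, not Qyu's code).
class WaitQueue {
  constructor() {
    this.entries = [];
  }

  // timeout: ms an entry may wait before being dropped; falsy means no timeout
  add(value, timeout) {
    return new Promise((resolve, reject) => {
      const entry = { value, resolve, timer: null };
      if (timeout) {
        entry.timer = setTimeout(() => {
          // Dequeue the entry that waited too long and reject its promise
          this.entries = this.entries.filter(e => e !== entry);
          const err = new Error('Job timed out while waiting in queue');
          err.code = 'ERR_JOB_TIMEOUT';
          reject(err);
        }, timeout);
      }
      this.entries.push(entry);
    });
  }

  // Takes the oldest entry for execution; once taken it can no longer time out
  take() {
    const entry = this.entries.shift();
    if (!entry) return undefined;
    clearTimeout(entry.timer);
    entry.resolve(entry.value);
    return entry.value;
  }
}
```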
API
instance(fn [, options [, ...args]])
(alias: instance#add)
Queues function fn on the instance with optional options and args.
All args are passed to fn as arguments when it is called.
Returns: a promise tied to the job's resolution or rejection value, which settles once the job is picked from the queue and run.
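const q = new Qyu({concurrency: 5});

// Queue a job with default options
q(job1);

// Queue a job with priority 2 and a 10-second queue timeout
q(job2, {priority: 2, timeout: 1000*10});

// Await a job's result (inside an async function)
try {
  const result = await q(job3);
  console.log("Job 3's result:", result);
} catch (err) {
  console.log('Job 3 errored:', err);
}

// Pass arguments to a job; arg1 and arg2 are injected when it is called
q((arg1, arg2) => {
  // Do something with arg1 and arg2...
}, null, arg1, arg2);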
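The fn [, options [, ...args]] call shape and the returned promise can be illustrated with a deferred: the promise is created at queuing time and settled later, when the job actually runs with its injected arguments. makeJob is a hypothetical helper, not Qyu's internals.

```javascript
// Illustrative deferred job (makeJob is hypothetical).
// The promise exists as soon as the job is created, but only settles when run() is called.
function makeJob(fn, options, ...args) {
  let resolve, reject;
  const promise = new Promise((res, rej) => { resolve = res; reject = rej; });
  const run = () =>
    Promise.resolve()
      .then(() => fn(...args)) // inject the queued arguments
      .then(resolve, reject);  // tie the caller's promise to the job's outcome
  return { options: options || {}, promise, run };
}

const job = makeJob((a, b) => a + b, null, 2, 3);
job.promise.then(sum => console.log('sum:', sum));
job.run(); // later, when a worker picks the job; logs "sum: 5"
```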
instance(iterator, mapperFn [, options])
(alias: instance#map)
For each iteration of iterator, queues mapperFn on the instance, called with that iteration's value and index.
The optional options are applied identically to every job queued by this call.
const fs = require('fs').promises;

const q = new Qyu({concurrency: 3});
const files = ['/path/to/file1.png', '/path/to/file2.png', '/path/to/file3.png', '/path/to/file4.png'];

// Delete every file, at most 3 at a time
q(files, async (file, i) => {
  await fs.unlink(file);
});
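Mapping an iterable through a concurrency-limited queue can be sketched with a small worker pool in which several async workers pull items from one shared iterator. mapWithConcurrency is hypothetical, and collecting results into an array is an assumption of this sketch; the text above does not say what Qyu's map call returns.

```javascript
// Illustrative worker-pool map over any iterable (mapWithConcurrency is hypothetical).
// At most `concurrency` mapper calls are in flight; each gets (value, index).
async function mapWithConcurrency(iterable, mapperFn, concurrency) {
  const iterator = iterable[Symbol.iterator]();
  const results = [];
  let index = 0;

  async function worker() {
    for (;;) {
      const { value, done } = iterator.next(); // workers share one iterator
      if (done) return;
      const i = index++; // claim this item's index before awaiting
      results[i] = await mapperFn(value, i);
    }
  }

  await Promise.all(Array.from({ length: concurrency }, () => worker()));
  return results;
}
```

Because items are pulled lazily, this also works with generators, which never need to be fully materialized in memory.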
instance#whenEmpty()
Returns: a promise that resolves if/when the instance has no running or queued jobs.
Guaranteed to resolve, regardless of whether any of the jobs failed.
const q = new Qyu();
await q.whenEmpty();
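One way to implement a whenEmpty-style promise is to count pending jobs and keep a list of waiters that are resolved once the count reaches zero. IdleTracker is a hypothetical name; it treats rejected jobs as finished, which matches the guarantee described above.

```javascript
// Illustrative "when empty" signal (IdleTracker is hypothetical).
class IdleTracker {
  constructor() {
    this.pending = 0;
    this.waiters = [];
  }

  // Counts a job's promise as pending until it settles, successfully or not
  track(promise) {
    this.pending++;
    const done = () => {
      this.pending--;
      if (this.pending === 0) {
        this.waiters.splice(0).forEach(resolve => resolve());
      }
    };
    promise.then(done, done); // failures also count as finished work
    return promise;
  }

  whenEmpty() {
    if (this.pending === 0) return Promise.resolve();
    return new Promise(resolve => this.waiters.push(resolve));
  }
}
```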
instance#whenFree()
Returns: a promise that resolves if/when the number of currently running jobs is below the concurrency limit.
Guaranteed to resolve, regardless of whether any of the jobs failed.
const q = new Qyu();
await q.whenFree();
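whenFree is useful for backpressure: a producer can wait for spare capacity before starting more work. The sketch below shows that pattern with a hypothetical SlotCounter class and runAll helper; it assumes a single producer, since every waiter is woken whenever any slot frees up.

```javascript
// Illustrative "when free" signal for backpressure (SlotCounter is hypothetical).
class SlotCounter {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.running = 0;
    this.waiters = [];
  }

  run(fn) {
    this.running++;
    const release = () => {
      this.running--;
      // A slot opened up: wake everyone waiting for free capacity
      this.waiters.splice(0).forEach(resolve => resolve());
    };
    const p = Promise.resolve().then(fn);
    p.then(release, release);
    return p;
  }

  whenFree() {
    if (this.running < this.concurrency) return Promise.resolve();
    return new Promise(resolve => this.waiters.push(resolve));
  }
}

// Single producer: start tasks one by one without exceeding the limit
async function runAll(counter, tasks) {
  const started = [];
  for (const task of tasks) {
    await counter.whenFree(); // wait for spare capacity before starting more work
    started.push(counter.run(task));
  }
  return Promise.all(started);
}
```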
instance#pause()
Pauses the instance's operation, so it stops picking further jobs from the queue.
Jobs already running when instance.pause() is called keep running until they finish.
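const q = new Qyu({concurrency: 2});
q(job1); q(job2); q(job3);

// job1 and job2 keep running until they finish; job3 stays in the queue
await q.pause();

q(job4); // Queued, but not picked until q.resume() is called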
instance#resume()
Resumes the instance's operation after a previous call to instance.pause().
An instance is in "resumed" mode by default when instantiated.
const q = new Qyu();
q.pause();
q.resume();
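Pause and resume can be sketched as a flag that is checked only when picking the next job, so a job already in progress is never interrupted. PausableQueue is hypothetical and runs one job at a time to stay short.

```javascript
// Illustrative pause/resume (PausableQueue is hypothetical, concurrency fixed at 1).
class PausableQueue {
  constructor() {
    this.queue = [];
    this.paused = false;
    this.running = false;
  }

  add(fn) {
    return new Promise((resolve, reject) => {
      this.queue.push({ fn, resolve, reject });
      this._next();
    });
  }

  pause() {
    this.paused = true; // the job in progress, if any, is left to complete
  }

  resume() {
    this.paused = false;
    this._next(); // continue picking jobs from the queue
  }

  _next() {
    if (this.paused || this.running || this.queue.length === 0) return;
    const { fn, resolve, reject } = this.queue.shift();
    this.running = true;
    Promise.resolve()
      .then(fn)
      .then(resolve, reject)
      .finally(() => {
        this.running = false;
        this._next();
      });
  }
}
```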
instance#empty()
Immediately removes all queued jobs from the instance's queue.
Jobs already running when instance.empty() is called keep running until they finish.
const q = new Qyu({concurrency: 1});
q(job1); q(job2);

// Removes queued jobs; a job that is already running keeps running
q.empty();
Examples
Web Scraper:
const
  Qyu = require('qyu'),
  axios = require('axios'),
  cheerio = require('cheerio');

(async () => {
  const siteUrl = 'http://www.store-to-crawl.com/products';
  const q = new Qyu({concurrency: 3});
  const allProducts = [];

  // Queue one scraping job per listing page; at most 3 pages are fetched at a time
  for (let i=1; i<=10; i++) {
    q(async () => {
      const resp = await axios(siteUrl+'?page='+i);
      const $ = cheerio.load(resp.data);
      // Collect the title and price of every product on this page
      $('.product-list .product').each((idx, elem) => {
        const $elem = $(elem);
        const title = $elem.find('.title').text();
        const price = $elem.find('.price').text();
        allProducts.push({ title, price });
      });
    }).catch(err => console.error(`Page ${i} failed:`, err));
  }

  await q.whenEmpty();
  console.log(`Scraped ${allProducts.length} products`);
})();