Qyu
qyu is a general-purpose asynchronous job queue for Node.js. It is flexible and easy to use, always accepting jobs and running them as fast as the concurrency settings dictate.
Qyu was meant for:
- large-scale web scrapers
- throttling external API calls
- restraining database bandwidth usage in long-running database background operations
- a lot more...
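const Qyu = require('qyu');

(async () => {
    const q = new Qyu({concurrency: 3});
    // Basic: queue a job
    q(myAsyncFunction);
    // Queue a job with options, passing arguments to the function
    q(myAsyncFunction, {priority: 2}, arg1, arg2);
    // Await the job's result
    let result = await q(myAsyncFunction);
    // Jobs can still be added at any later time
    setTimeout(() => {
        for (let i=0; i<10; i++) {
            q(myAsyncFunction);
        }
    }, 2000);
    // Resolves once no jobs are running or queued
    await q.whenEmpty();
})();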
Features
- Always on and ready to receive additional tasks
- Concurrency setting
- Queue capacity
- Task priority
- Task timeout
- Pause/resume
- Supports streaming (in object mode) for memory-efficient data processing
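To illustrate what a task timeout means in practice, here is a minimal sketch in plain JavaScript. It is not Qyu's implementation: `withTimeout` is a hypothetical helper, and how Qyu itself reports a timed-out job is not covered here. The idea is simply to race the job against a timer.

```javascript
// Hypothetical helper, not part of Qyu: rejects if `job` takes longer than `ms`.
async function withTimeout(job, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Job timed out after ${ms} ms`)), ms);
  });
  try {
    // Whichever settles first decides the outcome.
    return await Promise.race([job(), timeout]);
  } finally {
    // Always clear the timer so it cannot fire after the job has settled.
    clearTimeout(timer);
  }
}

// Usage: a job that needs 50 ms but is only allowed 10 ms.
const slowJob = () => new Promise(resolve => setTimeout(() => resolve('done'), 50));
withTimeout(slowJob, 10).catch(err => console.log(err.message));
```

Note that this sketch only stops waiting for the job; the job's own work is not cancelled.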
API
instance(fn [, options [, ...args]])
(alias: instance#add)
Queues function fn on instance with optional options and args. All args are passed as arguments to fn when it is called.
Returns: a promise tied to the job's resolution or rejection value, settled once the job has been picked from the queue and run.
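const q = new Qyu({concurrency: 5});
// Default options:
q(job1);
// Custom options:
q(job2, {priority: 2, timeout: 1000*10});
// Awaiting the job's result (inside an async function):
try {
    const result = await q(job3);
    console.log("Job 3's result:", result);
} catch (err) {
    console.log('Job 3 errored:', err);
}
// No options (null), with arguments passed to the job function:
q((arg1, arg2) => {
    // Job body; receives arg1 and arg2
}, null, arg1, arg2);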
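For readers curious how a returned promise can be tied to a job that may only start later, the following is a minimal sketch of a concurrency-limited queue. It is an illustration only, not Qyu's actual implementation; `createQueue` and `enqueue` are hypothetical names, and options such as priority and timeout are left out.

```javascript
// Minimal sketch of a concurrency-limited queue; NOT Qyu's actual implementation.
function createQueue(concurrency) {
  const pending = []; // jobs waiting for a free slot
  let running = 0;

  function next() {
    if (running >= concurrency || pending.length === 0) return;
    const { fn, args, resolve, reject } = pending.shift();
    running++;
    // Starting from Promise.resolve() also captures synchronous throws from fn.
    Promise.resolve()
      .then(() => fn(...args))
      .then(resolve, reject)
      .finally(() => {
        running--;
        next(); // a slot is free: start the next waiting job
      });
  }

  // The returned promise settles with the job's own result or error.
  return function enqueue(fn, ...args) {
    return new Promise((resolve, reject) => {
      pending.push({ fn, args, resolve, reject });
      next();
    });
  };
}

// Usage: the caller awaits the job's result regardless of when it starts.
const enqueue = createQueue(2);
enqueue(async (a, b) => a + b, 1, 2).then(sum => console.log(sum)); // logs 3
```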
instance(iterator, mapperFn [, options])
(alias: instance#map)
For each iteration of iterator, queues mapperFn on instance, injected with the value and the index from that iteration.
The optional options are applied identically to every job queued by this call.
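const fs = require('fs').promises; // promise-based fs API, so unlink can be awaited
const q = new Qyu({concurrency: 3});
const files = ['/path/to/file1.png', '/path/to/file2.png', '/path/to/file3.png', '/path/to/file4.png'];
// Deletes the files, at most 3 at a time
q(files, async (file, i) => {
    await fs.unlink(file);
});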
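The "value and index per iteration" behavior described above can be pictured with a small worker-pool sketch. This is a hypothetical stand-in written in plain JavaScript, not Qyu's API or internals: `mapWithLimit` is an assumed name, and it returns the collected results purely for illustration.

```javascript
// Hypothetical helper, not Qyu's API: calls `mapper(value, index)` for every
// item of an iterable while running at most `limit` mapper calls at once.
async function mapWithLimit(iterable, mapper, limit) {
  const iterator = iterable[Symbol.iterator]();
  const results = [];
  let index = 0;

  // Each worker repeatedly pulls the next value until the iterator is exhausted.
  async function worker() {
    for (let step = iterator.next(); !step.done; step = iterator.next()) {
      const i = index++;
      results[i] = await mapper(step.value, i);
    }
  }

  // Start `limit` workers that all drain the same iterator.
  await Promise.all(Array.from({ length: limit }, worker));
  return results;
}

// Usage: works with any iterable, e.g. an array.
mapWithLimit(['a', 'b', 'c'], async (value, i) => `${i}:${value}`, 2)
  .then(out => console.log(out)); // ['0:a', '1:b', '2:c']
```

Because the workers share one iterator, values are pulled lazily, which keeps memory use flat even for long or generated sequences.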
instance#whenEmpty()
Returns: a promise that resolves if/when the instance has no running or queued jobs.
Guaranteed to resolve regardless of whether any jobs resulted in an error.
const q = new Qyu();
await q.whenEmpty();
instance#whenFree()
Returns: a promise that resolves if/when the number of currently running jobs is below the concurrency limit.
Guaranteed to resolve regardless of whether any jobs resulted in an error.
const q = new Qyu();
await q.whenFree();
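The "guaranteed to resolve" behavior of whenEmpty() and whenFree() can be pictured with a small counter-based sketch. It is an assumption-laden illustration, not Qyu's internals: `createTracker`, `run` and `whenIdle` are hypothetical names. The key point is that the counter is updated in a `finally` step, so failed jobs release waiters just like successful ones.

```javascript
// Hypothetical tracker, not Qyu's internals: counts running jobs and lets
// callers wait until none are left.
function createTracker() {
  let running = 0;
  let waiters = []; // resolve callbacks of pending whenIdle() calls

  function settle() {
    running--;
    if (running === 0) {
      waiters.forEach(resolve => resolve());
      waiters = [];
    }
  }

  return {
    // Runs fn and updates the counter whether fn succeeds or fails.
    run(fn) {
      running++;
      return Promise.resolve().then(fn).finally(settle);
    },
    // Resolves immediately if idle, otherwise once the last job settles.
    whenIdle() {
      return running === 0
        ? Promise.resolve()
        : new Promise(resolve => waiters.push(resolve));
    },
  };
}

// Usage: whenIdle() resolves even though the only job fails.
const tracker = createTracker();
tracker.run(async () => { throw new Error('failed job'); }).catch(() => {});
tracker.whenIdle().then(() => console.log('idle'));
```

A whenFree-style check would follow the same pattern, comparing the counter against the concurrency limit instead of zero.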
instance#pause()
Pauses the instance's operation, so it stops picking more jobs from the queue.
Jobs already running when instance.pause() was called keep running until finished.
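const q = new Qyu({concurrency: 2});
// job1 and job2 start running; job3 waits in the queue
q(job1); q(job2); q(job3);
await q.pause();
// Still accepted, but like job3 it stays queued while paused
q(job4);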
instance#resume()
Resumes the instance's operation after a previous call to instance.pause().
An instance is in "resumed" mode by default when instantiated.
const q = new Qyu();
q.pause();
q.resume();
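The pause/resume semantics described above (running jobs finish, queued jobs wait) can be sketched with a tiny one-at-a-time queue. This is a hypothetical illustration in plain JavaScript, not Qyu's implementation; `createPausableQueue` and its methods are assumed names.

```javascript
// Hypothetical sketch, not Qyu's implementation: runs jobs one at a time and
// stops picking new ones while paused.
function createPausableQueue() {
  const pending = [];
  let paused = false;
  let busy = false;

  async function drain() {
    if (busy) return; // a drain loop is already active
    busy = true;
    // A job that has started is never interrupted; pausing only prevents
    // the loop from picking the next one.
    while (!paused && pending.length > 0) {
      const job = pending.shift();
      try {
        await job();
      } catch (err) {
        // A real queue would report this to the caller.
      }
    }
    busy = false;
  }

  return {
    add(job) { pending.push(job); drain(); },
    pause() { paused = true; },
    resume() { paused = false; drain(); },
    size() { return pending.length; },
  };
}

// Usage: the second job stays queued until resume() is called.
const pq = createPausableQueue();
pq.add(async () => console.log('first'));
pq.add(async () => console.log('second'));
pq.pause();
setTimeout(() => pq.resume(), 20);
```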
instance#empty()
Immediately removes all queued jobs from the instance's queue.
Jobs already running when instance.empty() was called keep running until finished.
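const q = new Qyu({concurrency: 1});
// job1 starts running; job2 waits in the queue
q(job1); q(job2);
// Removes the still-queued job2; job1 keeps running until finished
q.empty();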
Examples
Web Scraper:
const
    Qyu = require('qyu'),
    axios = require('axios'),
    cheerio = require('cheerio');

(async () => {
    const siteUrl = 'http://www.store-to-crawl.com/products';
    const q = new Qyu({concurrency: 3});

    // Queue one job per page; at most 3 pages are fetched at a time
    for (let i=1; i<=10; i++) {
        q(async () => {
            let resp = await axios(siteUrl+'?page='+i);
            let $ = cheerio.load(resp.data);
            let products = [];
            $('.product-list .product').each((i, elem) => {
                let $elem = $(elem);
                let title = $elem.find('.title').text();
                let price = $elem.find('.price').text();
                products.push({ title, price });
            });
            // Do something with `products`...
        });
    }

    await q.whenEmpty();
})();