New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

qyu

Package Overview
Dependencies
Maintainers
1
Versions
26
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

qyu - npm Package Compare versions

Comparing version 0.1.2 to 0.1.6

test/qyu.test.js

9

package.json
{
"name": "qyu",
"version": "0.1.2",
"version": "0.1.6",
"description": "A general-purpose asynchronous job queue for Node.js",

@@ -20,11 +20,14 @@ "keywords": [

"type": "git",
"url": "https://github.com/shtaif/funky-queue"
"url": "https://github.com/shtaif/qyu"
},
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
"dev": "jest --watchAll --verbose false"
},
"dependencies": {
"bluebird": "^3.5.1"
},
"devDependencies": {
"jest": "^23.5.0"
}
}

@@ -1,1 +0,86 @@

Undergoing development...
# Qyu
### Usage:
```javascript
const Qyu = require('qyu');
(async () => {
let q = new Qyu({concurrency: 3});
// Basic:
q.add(myAsyncFunction);
// Extra options:
q.add({priority: 7}, myAsyncFunction, arg1, arg2/*...*/));
// Doesn't matter if more jobs come around later,
// Qyu will queue them as necessary and optimally manage them all
// for you based on your concurrency setting
// as necessary:
setTimeout(() => {
for (let i=0; i<10; i++) {
q.add(myAsyncFunction);
}
}, 2000);
await q.whenEmpty(); // When all tasks finished and queue is empty...
})();
```
`qyu` is a general-purpose asynchronous job queue for Node.js. It's flexible and easy to use, and always accepting jobs and running them as fast as the concurrency settings dictates.
Qyu was meant for:
- large-scale web scrapers
- throttling external API calls
- restraining database bandwidth usage in long-running database background operations
- a lot more...
# Features
- Always on and ready to receive additional tasks
- Concurrency setting
- Queue capacity
- Task priority
- Task timeout
- Pause/resume
- Supports streaming (in object mode) for memory-efficient data processing
# API
...
# Scraper Example:
```javascript
const
Qyu = require('qyu'),
axios = require('axios'),
cheerio = require('cheerio');
(async () => {
let siteUrl = 'http://www.store-to-crawl.com/products';
let q = new Qyu({concurrency: 3});
for (let i=1; i<=10; i++) {
q.add(async () => {
let { data: html } = await axios(siteUrl+'?page='+i);
let $ = cheerio.load(html);
let products = []
$('.prod-list .product').each((i, elem) => {
let $elem = $(elem);
let title = $elem.find('.title').text();
let price = $elem.find('.price').text();
products.push({ title, price });
});
// Do something with products...
});
}
await q.whenEmpty();
// All done!
})();
```

@@ -8,3 +8,11 @@ const

const noop = v => v;
// To avoid "Unhandled promise rejections":
const guardUnhandledPromiseRejections = jobObject => {
jobObject.deferred.promise.catch(noop);
};
class Qyu {

@@ -22,2 +30,7 @@ constructor(opts={}, job=null, jobOpts={}) {

this.whenFreeDeferred.resolve();
this.opts = {
concurrency: 1,
capacity: Infinity,
rampUpTime: 0,
};
this.set(opts);

@@ -31,16 +44,3 @@

set(newOpts) {
newOpts = Object.assign({
concurrency: 1,
capacity: Infinity,
rampUpTime: 0,
}, newOpts);
this.opts = Object.assign(this.opts || {}, newOpts);
if (this.opts.rampUpTime) {
this.getRampUpPromise = () => Promise.delay(this.opts.rampUpTime);
} else {
var rampUp = Promise.resolve();
this.getRampUpPromise = () => rampUp;
}
Object.assign(this.opts, newOpts);
}

@@ -68,2 +68,3 @@

current.deferred.reject(err);
guardUnhandledPromiseRejections(current);
}

@@ -91,3 +92,5 @@ }

this.runJobChannel();
await this.getRampUpPromise();
if (this.opts.rampUpTime) {
await new Promise(resolve => setTimeout(resolve, this.opts.rampUpTime));
}
}

@@ -106,3 +109,3 @@ this.isRunningJobChannels = false;

var jobObjects = [];
let jobObjects = [];
for (let inputJob of (inputJobs instanceof Function? [inputJobs] : inputJobs)) {

@@ -117,3 +120,3 @@ jobObjects.push({

var freeSlots = this.opts.capacity - this.jobObjects.length;
let freeSlots = this.opts.capacity - this.jobObjects.length;

@@ -124,5 +127,9 @@ for (let i=freeSlots; i<jobObjects.length; ++i) {

);
guardUnhandledPromiseRejections(jobObjects[i]);
}
for (var i = 0; i < this.jobObjects.length && opts.priority < this.jobObjects[i].opts.priority; ++i) {}
let i = 0;
while (
i < this.jobObjects.length && opts.priority <= this.jobObjects[i].opts.priority
) { ++i };
this.jobObjects.splice(i, 0, ...jobObjects.slice(0, freeSlots));

@@ -132,9 +139,10 @@

for (let jobObject of jobObjects) {
jobObject.timeoutId = setTimeout(jobObject => {
jobObject.timeoutId = setTimeout(() => {
this.dequeue(jobObject.deferred.promise);
jobObject.timeoutId = null;
jobObject.deferred.reject(
new QyuError('ERR_JOB_TIMEOUT', "Job cancelled due to timeout")
);
this.timeoutId = null;
}, opts.timeout, jobObject);
guardUnhandledPromiseRejections(jobObject);
}, opts.timeout);
}

@@ -141,0 +149,0 @@ }

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc