Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

qyu

Package Overview
Dependencies
Maintainers
1
Versions
21
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

qyu - npm Package Compare versions

Comparing version 0.4.7 to 0.5.0

CHANGELOG.md

71

package.json
{
"name": "qyu",
"version": "0.4.7",
"description": "A general-purpose asynchronous job queue for Node.js",
"keywords": [
"queue",
"job",
"promise",
"async",
"throttle",
"async pool"
],
"author": "Dor Shtaif <1dorshtief@gmail.com>",
"license": "ISC",
"engines": {
"node": ">=7.6.0"
},
"repository": {
"type": "git",
"url": "https://github.com/shtaif/qyu"
},
"main": "index.js",
"scripts": {
"dev": "jest --watchAll --verbose false"
},
"dependencies": {},
"devDependencies": {
"jest": "^23.5.0"
}
"name": "qyu",
"version": "0.5.0",
"description": "A general-purpose asynchronous job queue for Node.js",
"keywords": [
"queue",
"job",
"promise",
"async",
"throttle",
"async pool"
],
"author": "Dor Shtaif <dorshtaif@gmail.com>",
"license": "ISC",
"repository": {
"type": "git",
"url": "https://github.com/shtaif/qyu"
},
"main": "./src/index.js",
"engines": {
"node": ">=7.6.0"
},
"scripts": {
"dev": "mocha --watch",
"test": "mocha"
},
"files": [
"**/*.js"
],
"dependencies": {
"eslint": "^7.25.0",
"mocha": "^9.2.0"
},
"devDependencies": {
"chai": "^4.3.6",
"chai-as-promised": "^7.1.1",
"chai-subset": "^1.6.0",
"eslint-config-prettier": "^8.3.0",
"eslint-config-standard": "^16.0.3",
"eslint-plugin-standard": "^5.0.0",
"prettier": "^2.5.1",
"sinon": "^13.0.1",
"standard-version": "^9.5.0"
}
}

@@ -10,4 +10,4 @@ # Qyu

```javascript
const Qyu = require('qyu');
```js
const { Qyu } = require('qyu');

@@ -17,11 +17,16 @@ (async () => {

async function performRequest(){ // Note that async functions always return a promise. Same could be accomplished with any "normal" function that returns a promise
const {data} = await axios('https://www.example.com');
//....
}
// Basic:
q(myAsyncFunction);
q(performRequest); // q expects a function that returns a promise
// Extra options:
q(myAsyncFunction, {priority: 2}, arg1, arg2 /*, ...*/));
q(performRequest, {priority: 2}, arg1, arg2 /*, ...*/);
// Returns promise (resolving or rejecting when job is eventually picked from queue
// and run with the same value it resolved or rejected with):
let result = await q(myAsyncFunction);
let result = await q(performRequest);

@@ -33,3 +38,3 @@ // No matter if more jobs come around later!

for (let i=0; i<10; i++) {
q(myAsyncFunction);
q(performRequest);
}

@@ -40,3 +45,2 @@ }, 2000);

})();
```

@@ -53,3 +57,2 @@

- Pause/resume
- Supports streaming (in object mode) for memory-efficient data processing

@@ -161,3 +164,3 @@

*(alias: instance#map)*
For each iteration of `iterator`, queues `mapperFn` on instance, injected with the value and the index from that iteration.
For each iteration of `iterator`(an array for example), queues `mapperFn` on instance, injected with the value and the index from that iteration.
Optional `options` will be supplied the same for all job queuings included in this call.

@@ -171,2 +174,4 @@ ```javascript

});
await q.whenEmpty()//Will be resolved when no queued jobs are left.
```

@@ -244,5 +249,5 @@

Web Scraper:
```javascript
```js
const
Qyu = require('qyu'),
{ Qyu } = require('qyu'),
axios = require('axios'),

@@ -249,0 +254,0 @@ cheerio = require('cheerio');

module.exports = class Deferred {
constructor() {
this.resolve = null;
this.reject = null;
this.promise = new Promise((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
}
constructor() {
this.resolve = null;
this.reject = null;
this.promise = new Promise((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
}
};
module.exports = class QyuError extends Error {
constructor(code, message) {
super(message);
this.code = code;
Error.captureStackTrace(this, this.constructor);
}
constructor(code, message) {
super(message);
this.code = code;
Error.captureStackTrace(this, this.constructor);
}
};
const QyuError = require('./qyu-error');
const { expect } = require('chai');
it('should extend the built-in Error', () => {
let err = new QyuError;
expect(err instanceof Error).toBe(true);
const err = new QyuError();
expect(err).to.be.instanceOf(Error);
});
it('should have a "code" property equal to the 1st constructor parameter passed to it', () => {
let err = new QyuError('CODE');
expect(err.code).toBe('CODE');
const err = new QyuError('CODE');
expect(err.code).to.equal('CODE');
});
it('should have a "message" property equal to the 2nd constructor parameter passed to it', () => {
let err = new QyuError('...', 'MESSAGE');
expect(err.message).toBe('MESSAGE');
const err = new QyuError('...', 'MESSAGE');
expect(err.message).to.equal('MESSAGE');
});

@@ -1,299 +0,263 @@

const
stream = require('stream'),
QyuError = require('./qyu-error'),
Deferred = require('./deferred');
const QyuError = require('./qyu-error');
const Deferred = require('./deferred');
const noop = v => v;
// To avoid "Unhandled promise rejections":
const guardUnhandledPromiseRejections = jobObject => {
return jobObject.deferred.promise.catch(noop);
return jobObject.deferred.promise.catch(noop);
};
const makeQyuProxy = q => {
return new Proxy(
function() {
if (arguments[0][Symbol.iterator] instanceof Function) {
return q.map(arguments[0], arguments[1], arguments[2]);
} else {
return q.add(...arguments);
}
},
{
get: (target, prop, receiver) => {
return q[prop];
},
set: (obj, prop, value) => {
q[prop] = value;
return true;
}
}
);
return new Proxy(
function () {
if (arguments[0][Symbol.iterator] instanceof Function) {
return q.map(arguments[0], arguments[1], arguments[2]);
} else {
return q.add(...arguments);
}
},
{
get: (target, prop, receiver) => {
return q[prop];
},
set: (obj, prop, value) => {
q[prop] = value;
return true;
},
}
);
};
class Qyu {
constructor(opts={}, jobFn=null, jobOpts={}) {
this.getRampUpPromise = null;
this.jobQueue = [];
this.jobChannels = [];
this.isAtMaxConcurrency = false;
this.isRunningJobChannels = false;
this.isPaused = false;
this.whenEmptyDeferred = new Deferred;
this.whenEmptyDeferred.resolve();
this.whenFreeDeferred = new Deferred;
this.whenFreeDeferred.resolve();
this.opts = {
concurrency: 1,
capacity: Infinity,
rampUpTime: 0,
...opts
};
constructor(opts = {}, jobFn = null, jobOpts = {}) {
this.getRampUpPromise = null;
this.jobQueue = [];
this.jobChannels = [];
this.isAtMaxConcurrency = false;
this.isRunningJobChannels = false;
this.isPaused = false;
this.whenEmptyDeferred = new Deferred();
this.whenEmptyDeferred.resolve();
this.whenFreeDeferred = new Deferred();
this.whenFreeDeferred.resolve();
this.opts = {
concurrency: 1,
capacity: Infinity,
rampUpTime: 0,
...opts,
};
if (jobFn) { // TODO: Add this feature in docs...
this.enqueue(jobFn, jobOpts);
}
return makeQyuProxy(this);
if (jobFn) {
// TODO: Add this feature in docs...
this.enqueue(jobFn, jobOpts);
}
return makeQyuProxy(this);
}
set(newOpts) {
let oldOpts = this.opts;
this.opts = { ...this.opts, ...newOpts };
set(newOpts) {
let oldOpts = this.opts;
this.opts = { ...this.opts, ...newOpts };
if (newOpts.concurrency && newOpts.concurrency > oldOpts.concurrency) {
this.runJobChannels();
}
if (newOpts.concurrency && newOpts.concurrency > oldOpts.concurrency) {
this.runJobChannels();
}
if (newOpts.capacity) {
while (this.jobQueue.length > newOpts.capacity) {
this.jobQueue.pop().deferred.reject(new QyuError('ERR_CAPACITY_FULL', "Can't queue job, queue is at max capacity"));
}
}
if (newOpts.capacity) {
while (this.jobQueue.length > newOpts.capacity) {
this.jobQueue
.pop()
.deferred.reject(
new QyuError(
'ERR_CAPACITY_FULL',
"Can't queue job, queue is at max capacity"
)
);
}
}
}
async runJobChannel() {
let job;
while (
!this.isPaused &&
this.jobChannels.length <= this.opts.concurrency &&
(job = this.jobQueue.shift())
) {
if (job.timeoutId) {
clearTimeout(job.timeoutId);
}
try {
let result = await job.fn.apply(this, job.opts.args);
job.deferred.resolve(result);
}
catch (err) {
job.deferred.reject(err);
guardUnhandledPromiseRejections(job);
}
}
async runJobChannel() {
let job;
while (
!this.isPaused &&
this.jobChannels.length <= this.opts.concurrency &&
(job = this.jobQueue.shift())
) {
if (job.timeoutId) {
clearTimeout(job.timeoutId);
}
try {
let result = await job.fn.apply(this, job.opts.args);
job.deferred.resolve(result);
} catch (err) {
job.deferred.reject(err);
guardUnhandledPromiseRejections(job);
}
}
}
async runJobChannels() {
if (!this.isRunningJobChannels) {
this.isRunningJobChannels = true;
async runJobChannels() {
if (!this.isRunningJobChannels) {
this.isRunningJobChannels = true;
while (
this.jobQueue.length &&
!this.isPaused &&
this.jobChannels.length < this.opts.concurrency
) {
(async () => {
// TODO: Add additional condition here: "&& !this.jobQueue.length" for when pause() is engaged while there are still jobs in the jobQueue
if (!this.jobChannels.length) {
this.whenEmptyDeferred = new Deferred();
}
while (
this.jobQueue.length &&
!this.isPaused &&
this.jobChannels.length < this.opts.concurrency
) {
(async () => {
// TODO: Add additional condition here: "&& !this.jobQueue.length" for when pause() is engaged while there are still jobs in the jobQueue
if (!this.jobChannels.length) {
this.whenEmptyDeferred = new Deferred;
}
if (this.jobChannels.length === this.opts.concurrency - 1) {
this.whenFreeDeferred = new Deferred();
}
if (this.jobChannels.length === this.opts.concurrency - 1) {
this.whenFreeDeferred = new Deferred;
}
let promise = this.runJobChannel();
this.jobChannels.push(promise);
await promise;
this.jobChannels.splice(this.jobChannels.indexOf(promise), 1);
let promise = this.runJobChannel();
this.jobChannels.push(promise);
await promise;
this.jobChannels.splice(this.jobChannels.indexOf(promise), 1);
if (this.jobChannels.length === this.opts.concurrency - 1) {
this.whenFreeDeferred.resolve();
}
if (this.jobChannels.length === this.opts.concurrency - 1) {
this.whenFreeDeferred.resolve();
}
// TODO: Add additional condition here: "&& !this.jobQueue.length" for when pause() is engaged while there are still jobs in the jobQueue
if (!this.jobChannels.length && !this.isPaused) {
this.whenEmptyDeferred.resolve();
}
})();
// TODO: Add additional condition here: "&& !this.jobQueue.length" for when pause() is engaged while there are still jobs in the jobQueue
if (!this.jobChannels.length && !this.isPaused) {
this.whenEmptyDeferred.resolve();
}
})();
if (this.opts.rampUpTime && this.jobChannels.length) {
await new Promise(resolve =>
setTimeout(resolve, this.opts.rampUpTime)
);
}
}
if (this.opts.rampUpTime && this.jobChannels.length) {
await new Promise(resolve => setTimeout(resolve, this.opts.rampUpTime));
}
}
this.isRunningJobChannels = false;
}
this.isRunningJobChannels = false;
}
}
enqueue(fn, opts = {}) {
let job = {
fn: fn,
opts: {
timeout: null,
priority: 0,
args: null,
...opts,
},
deferred: new Deferred(),
timeoutId: null,
};
enqueue(fn, opts={}) {
let job = {
fn: fn,
opts: {
timeout: null,
priority: 0,
args: null,
...opts
},
deferred: new Deferred,
timeoutId: null
};
if (this.jobQueue.length === this.opts.capacity) {
job.deferred.reject(new QyuError('ERR_CAPACITY_FULL', "Can't queue job, queue is at max capacity"));
guardUnhandledPromiseRejections(job);
return job.deferred.promise;
}
if (opts.timeout) {
job.timeoutId = setTimeout(() => {
this.dequeue(job.deferred.promise);
job.timeoutId = null;
job.deferred.reject(new QyuError('ERR_JOB_TIMEOUT', "Job cancelled due to timeout"));
guardUnhandledPromiseRejections(job);
}, opts.timeout);
}
let i = 0;
while (
i < this.jobQueue.length && job.opts.priority <= this.jobQueue[i].opts.priority
) { ++i };
this.jobQueue.splice(i, 0, job);
this.runJobChannels();
return job.deferred.promise;
if (this.jobQueue.length === this.opts.capacity) {
job.deferred.reject(
new QyuError(
'ERR_CAPACITY_FULL',
"Can't queue job, queue is at max capacity"
)
);
guardUnhandledPromiseRejections(job);
return job.deferred.promise;
}
dequeue(promise) {
for (let i=0; i<this.jobQueue.length; ++i) {
if (this.jobQueue[i].deferred.promise === promise) {
let splice = this.jobQueue.splice(i, 1);
return splice[0];
}
}
return false;
if (opts.timeout) {
job.timeoutId = setTimeout(() => {
this.dequeue(job.deferred.promise);
job.timeoutId = null;
job.deferred.reject(
new QyuError('ERR_JOB_TIMEOUT', 'Job cancelled due to timeout')
);
guardUnhandledPromiseRejections(job);
}, opts.timeout);
}
add() {
let fn = arguments[0];
let opts = arguments[1] instanceof Object ? arguments[1] : {args: null};
if (arguments.length > 2) {
opts.args = Array.prototype.slice.call(arguments, 2);
}
return this.enqueue(fn, opts);
let i = 0;
while (
i < this.jobQueue.length &&
job.opts.priority <= this.jobQueue[i].opts.priority
) {
++i;
}
this.jobQueue.splice(i, 0, job);
this.runJobChannels();
map(iterator, fn, opts) {
let counter = 0;
let promises = [];
for (let item of iterator) {
promises.push(
this.add(fn, opts, item, counter++)
);
}
return Promise.all(promises);
}
return job.deferred.promise;
}
pause() {
if (this.isPaused) {
return;
}
this.isPaused = true;
if (!this.jobQueue.length && !this.jobChannels.length) {
this.whenEmptyDeferred = new Deferred;
}
// TODO: return a promise that will resolve when current jobs that were already running will finish. Perhaps: return this.whenEmpty();
return Promise.all(this.jobChannels);
dequeue(promise) {
for (let i = 0; i < this.jobQueue.length; ++i) {
if (this.jobQueue[i].deferred.promise === promise) {
let splice = this.jobQueue.splice(i, 1);
return splice[0];
}
}
return false;
}
resume() {
if (!this.isPaused) {
return;
}
this.isPaused = false;
if (!this.jobQueue.length && !this.jobChannels.length) {
this.whenEmptyDeferred.resolve();
}
this.runJobChannels();
add() {
let fn = arguments[0];
let opts = arguments[1] instanceof Object ? arguments[1] : { args: null };
if (arguments.length > 2) {
opts.args = Array.prototype.slice.call(arguments, 2);
}
return this.enqueue(fn, opts);
}
empty() {
for (let job of this.jobQueue.splice(0)) {
job.deferred.reject(new QyuError('ERR_JOB_DEQUEUED', "Job was dequeued out of the queue"));
guardUnhandledPromiseRejections(job);
}
return Promise.all(this.jobChannels);
map(iterator, fn, opts) {
let counter = 0;
let promises = [];
for (let item of iterator) {
promises.push(this.add(fn, opts, item, counter++));
}
return Promise.all(promises);
}
whenEmpty() {
return this.whenEmptyDeferred.promise;
pause() {
if (this.isPaused) {
return;
}
this.isPaused = true;
if (!this.jobQueue.length && !this.jobChannels.length) {
this.whenEmptyDeferred = new Deferred();
}
// TODO: return a promise that will resolve when current jobs that were already running will finish. Perhaps: return this.whenEmpty();
return Promise.all(this.jobChannels);
}
whenFree() {
return this.whenFreeDeferred.promise;
resume() {
if (!this.isPaused) {
return;
}
this.isPaused = false;
if (!this.jobQueue.length && !this.jobChannels.length) {
this.whenEmptyDeferred.resolve();
}
this.runJobChannels();
}
writeStream(chunkObjTransformer=v=>v) {
let thisQueue = this;
return new stream.Writable({
objectMode: true,
highWaterMark: 0,
write(obj, encoding, cb) {
thisQueue.add(chunkObjTransformer, obj);
thisQueue.whenFree().then(cb);
},
final(cb) {
thisQueue.whenEmpty().then(cb);
}
});
empty() {
for (let job of this.jobQueue.splice(0)) {
job.deferred.reject(
new QyuError('ERR_JOB_DEQUEUED', 'Job was dequeued out of the queue')
);
guardUnhandledPromiseRejections(job);
}
return Promise.all(this.jobChannels);
}
whenEmpty() {
return this.whenEmptyDeferred.promise;
}
transformStream(chunkObjTransformer=v=>v) {
let thisQueue = this;
return new stream.Transform({
objectMode: true,
writableHighWaterMark: 0,
readableHighWaterMark: thisQueue.opts.capacity,
transform(obj, encoding, cb) {
let job = () => chunkObjTransformer(obj);
let jobResolved = jobResult => this.push(jobResult);
thisQueue.enqueue(job).then(jobResolved, jobResolved);
// thisQueue.add(chunkObjTransformer, obj);
thisQueue.whenFree().then(cb);
},
flush(cb) {
thisQueue.whenFree().then(cb);
}
});
}
whenFree() {
return this.whenFreeDeferred.promise;
}
}

@@ -300,0 +264,0 @@

@@ -1,422 +0,324 @@

const { Qyu, QyuError } = require('../index');
const { expect } = require('chai');
const sinon = require('sinon');
const { Qyu, QyuError } = require('.');
const { mockAsyncFn, delay, noop } = require('./testUtils');
describe('When A Qyu instance is invoked as a function', () => {
it('with a function as the first arg - should internally call the `add` method with the job and options and injecting any additional args passed into it', () => {
const q = new Qyu();
const jobOpts = {};
q.add = sinon.spy(); // `sinon.spy(q, 'add') doesn't work because `q` is a Proxy object
q(noop, jobOpts, 'a', 'b', 'c', 'd');
expect(q.add.firstCall.args).to.deep.equal([
noop,
jobOpts,
'a',
'b',
'c',
'd',
]);
});
const delay = async (time=1000) => {
await new Promise(resolve => setTimeout(resolve, time));
};
it('with an array as the first arg - should internally call the `map` method with the array, function and options args passed into it', () => {
const q = new Qyu();
const arr = [1, 2, 3];
const jobOpts = {};
q.map = sinon.spy(); // `sinon.spy(q, 'map') doesn't work because `q` is a Proxy object
q(arr, noop, jobOpts);
expect(q.map.firstCall.args).to.deep.equal([arr, noop, jobOpts]);
});
});
const mockAsync = async (result=true, time=25) => {
await delay(time);
return result;
};
describe('`add` method', () => {
it('calls the added functions immediately if currently running jobs are below the concurrency limit', () => {
const q = new Qyu({ concurrency: 2 });
const job1 = sinon.spy(mockAsyncFn);
const job2 = sinon.spy(mockAsyncFn);
q.add(job1);
q.add(job2);
expect(job1.calledOnce).to.be.true;
expect(job2.calledOnce).to.be.true;
});
const noop = val => val;
it('will not call added functions immediately if currently running jobs are at the concurrency limit', () => {
const q = new Qyu({ concurrency: 1 });
const job = sinon.spy();
q.add(mockAsyncFn);
q.add(job);
expect(job.notCalled).to.be.true;
});
const getPromiseStatus = async input => {
let wasInputArray = input instanceof Array;
it('will not call added functions if they exceed the capacity limit', () => {
const q = new Qyu({ concurrency: 1, capacity: 1 });
const job1 = sinon.spy(mockAsyncFn);
const job2 = sinon.spy(mockAsyncFn);
q.add(job1);
q.add(job2);
expect(job1.calledOnce).to.be.true;
expect(job2.notCalled).to.be.true;
});
if (!wasInputArray) {
input = [input];
}
it('will inject every 3rd and up additional arguments supplied to it to the job function itself', () => {
const q = new Qyu();
const job = sinon.spy();
q.add(job, {}, 'a', 'b', 'c', 'd');
expect(job.calledOnce).to.be.true;
expect(job.firstCall.args).to.deep.equal(['a', 'b', 'c', 'd']);
});
let statuses = [...input].fill('pending');
input.forEach(async (promise, i) => {
try {
await promise;
statuses[i] = 'resolved';
} catch (err) {
statuses[i] = 'rejected';
}
// TODO: This test sometimes seems to experience some timing glitches that makes it fail; refactor it to be more reliable
it('will delay in starting the next job queued, regardless of concurrency setting, by the specified amount of time if `rampUpTime` is more than zero', async () => {
const rampUpTime = 100;
const q = new Qyu({
concurrency: 3,
rampUpTime,
});
const job1 = sinon.spy(() => mockAsyncFn(undefined, 250));
const job2 = sinon.spy(() => mockAsyncFn(undefined, 250));
const job3 = sinon.spy(() => mockAsyncFn(undefined, 250));
q.add(job1);
q.add(job2);
q.add(job3);
expect(job1.calledOnce).to.be.true;
expect(job2.calledOnce).to.be.false;
expect(job3.calledOnce).to.be.false;
await delay(rampUpTime + 20);
expect(job2.calledOnce).to.be.true;
expect(job3.calledOnce).to.be.false;
await delay(rampUpTime + 20);
expect(job3.calledOnce).to.be.true;
});
await delay(0);
return wasInputArray ? statuses : statuses[0];
};
describe('When A Qyu instance is invoked as a function', () => {
it("with a function as the first arg - should internally call the `add` method with the job and options and injecting any addiontional args passed into it", () => {
const q = new Qyu();
const jobOpts = {};
const spy = jest.spyOn(q, 'add');
q(noop, jobOpts, 'a', 'b', 'c', 'd');
expect(spy).toHaveBeenCalledWith(noop, jobOpts, 'a', 'b', 'c', 'd');
describe('should return a', () => {
it('promise', () => {
const q = new Qyu({});
const promise = q.add(mockAsyncFn);
expect(promise instanceof Promise).to.be.true;
expect(promise).to.be.instanceof(Promise);
});
it("with an array as the first arg - should internally call the `map` method with the array, function and options args passed into it", () => {
const q = new Qyu();
const arr = [1, 2, 3];
const jobOpts = {};
const spy = jest.spyOn(q, 'map');
q(arr, noop, jobOpts);
expect(spy).toHaveBeenCalledWith(arr, noop, jobOpts);
});
});
it('rejects immediately with a QyuError of code "ERR_CAPACITY_FULL" if instance capacity is full', async () => {
const q = new Qyu({ capacity: 1 });
describe('`add` method', () => {
it('calls the added functions immediately if currently running jobs are below the concurrency limit', () => {
let q = new Qyu({concurrency: 2});
q.add(mockAsyncFn);
q.add(mockAsyncFn); // this queuing and the one above it fill the queue length up to 1 (the earlier was called immediately, and the current is then put in queue)
const promise = q.add(mockAsyncFn); // this is expected to reject since the current length of queue should be 1 at that point, which equals to the max capacity of 1
let job1 = jest.fn(mockAsync);
let job2 = jest.fn(mockAsync);
q.add(job1);
q.add(job2);
expect(job1).toHaveBeenCalled();
expect(job2).toHaveBeenCalled();
const err = await expect(promise).to.be.rejected;
expect(err).to.be.instanceof(QyuError);
expect(err.code).to.equal('ERR_CAPACITY_FULL');
});
it('will not call added functions immediately if currently running jobs are at the concurrency limit', () => {
let q = new Qyu({concurrency: 1});
let job = jest.fn();
it('that resolves only after the actual job is resolved', async () => {
const q = new Qyu({});
let done = false;
q.add(mockAsync);
q.add(job);
const promise = q.add(async () => {
await mockAsyncFn();
done = true;
});
expect(job).not.toHaveBeenCalled();
await expect(promise).to.be.fulfilled;
expect(done).to.be.true;
});
it('will not call added functions if they exceed the capacity limit', () => {
let q = new Qyu({concurrency: 1, capacity: 1});
let job1 = jest.fn(mockAsync);
let job2 = jest.fn(mockAsync);
it('and rejects only after the actual job is rejected', async () => {
const q = new Qyu({});
let done = false;
q.add(job1);
q.add(job2);
const promise = q.add(async () => {
await mockAsyncFn();
done = true;
throw new Error();
});
expect(job1).toHaveBeenCalled();
expect(job2).not.toHaveBeenCalled();
await expect(promise).to.be.rejected;
expect(done).to.be.true;
});
it('will inject every 3rd and up additional arguments supplied to it to the job function itself', () => {
let q = new Qyu();
let job = jest.fn();
q.add(job, {}, 'a', 'b' ,'c', 'd');
expect(job).toHaveBeenCalledWith('a', 'b', 'c', 'd');
it('with the value the job resolved with', async () => {
const q = new Qyu({});
const value = await q.add(() => mockAsyncFn('THE_VALUE'));
expect(value).to.equal('THE_VALUE');
});
// TODO: This test sometimes seems to experience some timing glitches that makes it fail; refactor it to be more reliable
it('will delay in starting the next job queued, regardless of concurrency setting, by the specified amount of time if `rampUpTime` is more than zero', async () => {
let rampUpTime = 100;
let q = new Qyu({ concurrency: 3, rampUpTime });
let started = 0;
q.add(async () => {
started++;
await mockAsync(true, 250);
});
q.add(async () => {
started++;
await mockAsync(true, 250);
});
q.add(async () => {
started++;
await mockAsync(true, 250);
});
await Promise.all([
(() => {
expect(started).toBe(1);
})(),
(async () => {
await delay(rampUpTime + 20);
expect(started).toBe(2);
})(),
(async () => {
await delay(rampUpTime * 2 + 20);
expect(started).toBe(3);
})()
]);
it('or with the value the job rejected with', async () => {
const q = new Qyu({});
const promise = q.add(async () => {
throw await mockAsyncFn('THE_VALUE');
});
await expect(promise).to.eventually.be.rejected.and.equal('THE_VALUE');
});
describe('should return a', () => {
it('promise', () => {
let q = new Qyu({});
let promise = q.add(() => mockAsync());
expect(promise instanceof Promise).toBe(true);
});
it('rejects immediately with a QyuError of code "ERR_CAPACITY_FULL" if instance capacity is full', async () => {
let q = new Qyu({capacity: 1});
q.add(mockAsync);
q.add(mockAsync); // this queuing and the one above it fill the queue length up to 1 (the earlier was called immediately, and the current is then put in queue)
let promise = q.add(() => {}); // this is expected to reject since the current length of queue should be 1 at that point, which equals to the max capacity of 1
expect(await getPromiseStatus(promise)).toBe('rejected');
try { await promise; }
catch (err) {
expect(err instanceof QyuError).toBe(true);
expect(err.code).toBe('ERR_CAPACITY_FULL');
}
});
it('that resolves only after the actual job is resolved', async () => {
let done = false;
let q = new Qyu({});
await q.add(async () => {
await mockAsync();
done = true;
});
expect(done).toBe(true);
});
it('and rejects only after the actual job is rejected', async () => {
let done = false;
let q = new Qyu({});
try {
await q.add(async () => {
await mockAsync();
done = true;
throw new Error;
});
} catch (err) {
expect(done).toBe(true);
}
});
it('with the value the job resolved with', async () => {
let q = new Qyu({});
let value = await q.add(() => mockAsync('THE_VALUE'));
expect(value).toBe('THE_VALUE');
});
it('or with the value the job rejected with', async () => {
let q = new Qyu({});
let value;
try {
await q.add(async () => {
throw await mockAsync('THE_VALUE');
});
} catch (thrown) {
value = thrown;
}
expect(value).toBe('THE_VALUE');
});
});
});
});
describe('`map` method', () => {
it("invokes the function in the second argument for each item in the first argument array with two arguments in itself: the item and it's index", () => {
let q = new Qyu({concurrency: 3});
let items = ['A', 'B', 'C'];
let fn = jest.fn();
q.map(items, fn);
expect(fn).toHaveBeenCalledTimes(3);
expect(fn).toHaveBeenCalledWith('C', 2);
});
it("invokes the function in the second argument for each item in the first argument array with two arguments in itself: the item and it's index", () => {
const q = new Qyu({ concurrency: 3 });
const items = ['A', 'B', 'C'];
const fn = sinon.spy();
q.map(items, fn);
expect(fn.args).to.deep.equal([
['A', 0],
['B', 1],
['C', 2],
]);
});
});
describe('`whenEmpty` method', () => {
it('should return a promise', () => {
let q = new Qyu({});
expect(q.whenEmpty() instanceof Promise).toBe(true);
});
it('should return a promise', () => {
const q = new Qyu({});
expect(q.whenEmpty()).to.be.instanceOf(Promise);
});
it('that resolves once a Qyu instance has no running or queued jobs, regardless if some jobs ended up rejecting', async () => {
let concurrency = 2;
let numToRun = 3;
let startedCount = 0;
let finishedCount = 0;
it('that resolves once a Qyu instance has no running or queued jobs, regardless if some jobs ended up fulfilled or rejected', async () => {
const q = new Qyu({ concurrency: 2 });
let finishedCount = 0;
let q = new Qyu({ concurrency });
q.add(async () => {
await mockAsyncFn();
finishedCount++;
});
q.add(async () => {
await mockAsyncFn();
finishedCount++;
throw new Error();
});
q.add(async () => {
await mockAsyncFn();
finishedCount++;
});
for (let i=0; i<numToRun; ++i) {
q.add(async () => {
startedCount++;
await mockAsync();
finishedCount++;
if (i % 2 === 1) { // If `i` is odd
throw new Error('some_simulated_failure_error');
}
});
}
await q.whenEmpty();
await q.whenEmpty();
expect(startedCount).toBe(numToRun);
expect(finishedCount).toBe(numToRun);
});
expect(finishedCount).to.equal(3);
});
});
describe('`empty` method', () => {
it('should reject all queued jobs with a QyuError of code "ERR_JOB_DEQUEUED" and not call them', async () => {
let q = new Qyu({concurrency: 1});
let fn = jest.fn(mockAsync);
it('should reject all queued jobs with a QyuError of code "ERR_JOB_DEQUEUED" and not call them', async () => {
const q = new Qyu({ concurrency: 1 });
const fn = sinon.spy(mockAsyncFn);
q.add(mockAsync);
let p1 = q.add(fn);
let p2 = q.add(fn);
const [, prom1, prom2] = [q.add(fn), q.add(fn), q.add(fn)];
q.empty();
q.empty();
expect(await getPromiseStatus([p1, p2])).toMatchObject(['rejected', 'rejected']);
await expect(prom1)
.to.be.eventually.rejected.and.be.instanceOf(QyuError)
.and.containSubset({ code: 'ERR_JOB_DEQUEUED' });
try { await p1; } catch (err) {
expect(err instanceof QyuError).toBe(true);
expect(err.code).toBe('ERR_JOB_DEQUEUED');
}
await expect(prom2)
.to.be.eventually.rejected.and.be.instanceOf(QyuError)
.and.containSubset({ code: 'ERR_JOB_DEQUEUED' });
try { await p2; } catch (err) {
expect(err instanceof QyuError).toBe(true);
expect(err.code).toBe('ERR_JOB_DEQUEUED');
}
expect(fn.calledOnce).to.be.true;
});
expect(fn).not.toHaveBeenCalled();
});
it('should return a promise that resolves once all active jobs at the time of calling are done', async () => {
let q = new Qyu({concurrency: 2});
let p1 = new Promise(resolve => {
q.add(async () => {
await mockAsync();
resolve();
});
});
let p2 = new Promise(resolve => {
q.add(async () => {
await mockAsync();
resolve();
});
});
let p3 = new Promise(resolve => {
q.add(async () => {
await mockAsync();
resolve();
});
});
await q.empty();
expect(await getPromiseStatus([p1, p2])).toMatchObject([
'resolved',
'resolved'
]);
});
it('should return a promise that resolves once all active jobs at the time of calling are done', async () => {
const q = new Qyu({ concurrency: 2 });
let jobsDoneCount = 0;
const job = async () => {
await mockAsyncFn();
jobsDoneCount++;
};
q.add(job);
q.add(job);
q.add(job);
await q.empty();
expect(jobsDoneCount).to.equal(2);
});
});
describe('`whenFree` method', () => {
it('should return a promise', () => {
let q = new Qyu({});
expect(q.whenFree() instanceof Promise).toBe(true);
});
it('should return a promise', () => {
const q = new Qyu({});
expect(q.whenFree()).to.be.instanceOf(Promise);
});
it('that resolves once currently running jobs get below the concurrency limit', async () => {
let concurrency = 2;
let numToRun = 3;
let startedCount = 0;
let finishedCount = 0;
let q = new Qyu({ concurrency });
for (let i=0; i<numToRun; ++i) {
q.add(async () => {
startedCount++;
await mockAsync();
finishedCount++;
});
}
await q.whenFree();
expect(startedCount).toBe(numToRun);
expect(finishedCount).toBe(2);
});
it('that resolves once number of currently running jobs get below the concurrency limit', async () => {
let startedCount = 0;
let finishedCount = 0;
const q = new Qyu({ concurrency: 2 });
const job = async () => {
startedCount++;
await mockAsyncFn();
finishedCount++;
};
for (let i = 0; i < 3; ++i) {
q.add(job);
}
await q.whenFree();
expect(startedCount).to.equal(3);
expect(finishedCount).to.equal(2);
});
});
describe('The `timeout` option, when adding a task', () => {
it('should cancel a queued job if waits in queue more than the specified time', async () => {
let q = new Qyu({ concurrency: 1 });
let fn = jest.fn();
it('should cancel a queued job if waits in queue more than the specified time', async () => {
const q = new Qyu({ concurrency: 1 });
const fn = sinon.spy();
const promise = q.add(() => mockAsyncFn(undefined, 100));
q.add(fn, { timeout: 50 });
await promise;
expect(fn.notCalled).to.be.true;
});
let promise = new Promise(resolve => {
q.add(async () => {
await mockAsync(true, 100);
resolve();
});
});
it('if waits in queue more than the specified time, should make the promise of a job queueing reject with a QyuError of code "ERR_JOB_TIMEOUT"', async () => {
const q = new Qyu({ concurrency: 1 });
q.add(fn, {timeout: 50});
await promise;
await delay(0);
expect(fn).not.toHaveBeenCalled();
const promise = q.add(() => mockAsyncFn(undefined, 100));
const promiseWithTimeout = q.add(() => mockAsyncFn(undefined, 0), {
timeout: 50,
});
it('if waits in queue more than the specified time, should make the promise of a job queueing reject with a QyuError of code "ERR_JOB_TIMEOUT"', async () => {
let q = new Qyu({ concurrency: 1 });
q.add(() => mockAsync(true, 100));
try {
await q.add(() => {}, {timeout: 50});
}
catch (err) {
expect(err instanceof QyuError).toBe(true);
expect(err.code).toBe('ERR_JOB_TIMEOUT');
return;
}
throw new Error('Expected job to reject (due to timeout), instead it resolved');
});
await expect(promise).to.eventually.be.fulfilled;
await expect(promiseWithTimeout)
.to.eventually.be.rejected.and.be.instanceOf(QyuError)
.and.containSubset({ code: 'ERR_JOB_TIMEOUT' });
});
});
describe('The `priority` option, when adding a task', () => {
it('will default to 0', async () => {
let q = new Qyu({ concurrency: 1 });
let actualOrder = [];
let push = value => actualOrder.push(value);
it('will default to 0', async () => {
const q = new Qyu({ concurrency: 1 });
const actualOrder = [];
const push = value => actualOrder.push(value);
q.add(mockAsyncFn); // To increase the activity up to the max concurrency...
await Promise.all([
q.add(() => push('a'), { priority: -1 }),
q.add(() => push('b'), { priority: 1 }),
q.add(() => push('c'), {}),
]);
expect(actualOrder).to.deep.equal(['b', 'c', 'a']);
});
q.add(mockAsync); // To raise activity to max concurrency...
it('will queue jobs with the same priority by the order they were added', async () => {
const q = new Qyu({ concurrency: 1 });
const actualOrder = [];
const push = value => actualOrder.push(value);
q.add(mockAsyncFn); // To increase the activity up to the max concurrency...
await Promise.all([
q.add(() => push('a'), { priority: 0 }),
q.add(() => push('b'), { priority: 0 }),
q.add(() => push('c'), { priority: 0 }),
q.add(() => push('d'), { priority: 0 }),
]);
expect(actualOrder).to.deep.equal(['a', 'b', 'c', 'd']);
});
await Promise.all([
new Promise(resolve => q.add(() => { push('a'); resolve(); }, {priority: -1})),
new Promise(resolve => q.add(() => { push('b'); resolve(); }, {priority: 1})),
new Promise(resolve => q.add(() => { push('c'); resolve(); }, {}))
]);
expect(actualOrder).toMatchObject(['b', 'c', 'a']);
});
it('will queue jobs with the same priority by the order they were added', async () => {
let q = new Qyu({ concurrency: 1 });
let actualOrder = [];
let push = value => actualOrder.push(value);
q.add(mockAsync); // To raise activity to max concurrency...
await Promise.all([
new Promise(resolve => q.add(() => { push('a'); resolve(); }, {priority: 0})),
new Promise(resolve => q.add(() => { push('b'); resolve(); }, {priority: 0})),
new Promise(resolve => q.add(() => { push('c'); resolve(); }, {priority: 0})),
new Promise(resolve => q.add(() => { push('d'); resolve(); }, {priority: 0}))
]);
expect(actualOrder).toMatchObject(['a', 'b', 'c' ,'d']);
});
it('if currently running jobs are at the concurrency limit, queue a job AFTER jobs with more or equal priority, and BEFORE other jobs that have less priority if any', async () => {
let q = new Qyu({ concurrency: 1 });
let actualOrder = [];
let push = value => actualOrder.push(value);
q.add(mockAsync); // To raise activity to max concurrency...
await Promise.all([
new Promise(resolve => q.add(() => { push('b'); resolve(); }, {priority: 2})),
new Promise(resolve => q.add(() => { push('a'); resolve(); }, {priority: 3})),
new Promise(resolve => q.add(() => { push('d'); resolve(); }, {priority: 1})),
new Promise(resolve => q.add(() => { push('c'); resolve(); }, {priority: 2}))
]);
expect(actualOrder).toMatchObject(['a', 'b', 'c' ,'d']);
});
it('if currently running jobs are at the concurrency limit, queue a job AFTER jobs with more or equal priority, and BEFORE other jobs that have less priority if any', async () => {
const q = new Qyu({ concurrency: 1 });
const actualOrder = [];
const push = value => actualOrder.push(value);
q.add(mockAsyncFn); // To increase the activity up to the max concurrency...
await Promise.all([
q.add(() => push('b'), { priority: 2 }),
q.add(() => push('a'), { priority: 3 }),
q.add(() => push('d'), { priority: 1 }),
q.add(() => push('c'), { priority: 2 }),
]);
expect(actualOrder).to.deep.equal(['a', 'b', 'c', 'd']);
});
});

@@ -1,127 +0,95 @@

const { Qyu, QyuError } = require('../index');
const { expect } = require('chai');
const sinon = require('sinon');
const { Qyu } = require('../src');
const { getPromiseStatus, mockAsyncFn } = require('../src/testUtils');
const delay = async (time=1000) => {
await new Promise(resolve => setTimeout(resolve, time));
};
const mockAsync = async (result=true, time=25) => {
await delay(time);
return result;
};
const noop = val => val;
const getPromiseStatus = async input => {
let wasInputArray = input instanceof Array;
if (!wasInputArray) {
input = [input];
}
let statuses = [...input].fill('pending');
input.forEach(async (promise, i) => {
try {
await promise;
statuses[i] = 'resolved';
} catch (err) {
statuses[i] = 'rejected';
}
});
await delay(0);
return wasInputArray ? statuses : statuses[0];
};
describe('The `pause`/`resume` methods', () => {
it('`pause` method should return a promise that is immediately resolved if no active jobs were at the time of calling `pause`', async () => {
let q = new Qyu;
let promise = q.pause();
expect(await getPromiseStatus(promise)).toBe('resolved');
});
it('`pause` method should return a promise that is immediately resolved if no active jobs were at the time of calling `pause`', async () => {
const q = new Qyu();
const promise = q.pause();
expect(await getPromiseStatus(promise)).to.equal('resolved');
});
it('`pause` method should return a promise that resolves when all active jobs are done', async () => {
let q = new Qyu;
let fn1 = jest.fn(mockAsync);
let fn2 = jest.fn(mockAsync);
it('`pause` method should return a promise that resolves when all active jobs are done', async () => {
const q = new Qyu();
const fn1 = sinon.spy(mockAsyncFn);
const fn2 = sinon.spy(mockAsyncFn);
let p1 = q.add(fn1);
let p2 = q.add(fn2);
const p1 = q.add(fn1);
const p2 = q.add(fn2);
await q.pause();
await q.pause();
expect(fn1).toHaveBeenCalled();
expect(fn2).not.toHaveBeenCalled();
expect(await getPromiseStatus([p1, p2])).toMatchObject(['resolved', 'pending']);
});
expect(fn1.called).to.be.true;
expect(fn2.notCalled).to.be.true;
expect(await getPromiseStatus([p1, p2])).to.deep.equal([
'resolved',
'pending',
]);
});
it('`whenEmpty` should return a pending promise if instance is paused when idle', async () => {
let q = new Qyu;
q.pause();
expect(await getPromiseStatus(q.whenEmpty())).toBe('pending');
});
it('`whenEmpty` should return a pending promise if instance is paused when idle', async () => {
const q = new Qyu();
q.pause();
expect(await getPromiseStatus(q.whenEmpty())).to.equal('pending');
});
it('`whenEmpty` should return a pending promise if instance is paused when it has active jobs / jobs in queue', async () => {
let q = new Qyu({concurrency: 1});
q.add(mockAsync);
q.add(mockAsync);
q.pause();
expect(await getPromiseStatus(q.whenEmpty())).toBe('pending');
});
it('`whenEmpty` should return a pending promise if instance is paused when it has active jobs / jobs in queue', async () => {
const q = new Qyu({ concurrency: 1 });
q.add(mockAsyncFn);
q.add(mockAsyncFn);
q.pause();
expect(await getPromiseStatus(q.whenEmpty())).to.equal('pending');
});
it('`whenEmpty` should return a resolved promise if instance is paused when idle, then immediately resumed', async () => {
let q = new Qyu;
q.pause();
q.resume();
expect(await getPromiseStatus(q.whenEmpty())).toBe('resolved');
});
it('`whenEmpty` should return a resolved promise if instance is paused when idle, then immediately resumed', async () => {
const q = new Qyu();
q.pause();
q.resume();
expect(await getPromiseStatus(q.whenEmpty())).to.equal('resolved');
});
});
describe('The `set` method', () => {
describe('if passed a new concurrency value', () => {
it('if new value is bigger than previous, will immediately call more jobs as much as the difference from previous value', async () => {
let q = new Qyu({concurrency: 1});
let fn1 = jest.fn(mockAsync);
let fn2 = jest.fn(mockAsync);
let fn3 = jest.fn(mockAsync);
let fn4 = jest.fn(mockAsync);
q.add(fn1);
q.add(fn2);
q.add(fn3);
q.add(fn4);
q.set({concurrency: 3});
expect(fn1).toHaveBeenCalled();
expect(fn2).toHaveBeenCalled();
expect(fn3).toHaveBeenCalled();
expect(fn4).not.toHaveBeenCalled();
});
describe('if passed a new concurrency value', () => {
it('if new value is bigger than previous, will immediately call more jobs as much as the difference from previous value', async () => {
const q = new Qyu({ concurrency: 1 });
const fn1 = sinon.spy(mockAsyncFn);
const fn2 = sinon.spy(mockAsyncFn);
const fn3 = sinon.spy(mockAsyncFn);
const fn4 = sinon.spy(mockAsyncFn);
q.add(fn1);
q.add(fn2);
q.add(fn3);
q.add(fn4);
q.set({ concurrency: 3 });
expect(fn1.called).to.be.true;
expect(fn2.called).to.be.true;
expect(fn3.called).to.be.true;
expect(fn4.notCalled).to.be.true;
});
});
describe('if passed a new capacity value', () => {
it('if new value is lower than previous, will immediately reject the last jobs in queue as much as the difference from previous value', async () => {
let q = new Qyu({concurrency: 1, capacity: 4});
let fn1 = jest.fn(mockAsync);
let fn2 = jest.fn(mockAsync);
let fn3 = jest.fn(mockAsync);
let fn4 = jest.fn(mockAsync);
let fn5 = jest.fn(mockAsync);
let p1 = q.add(fn1);
let p2 = q.add(fn2);
let p3 = q.add(fn3);
let p4 = q.add(fn4);
let p5 = q.add(fn5);
q.set({capacity: 2});
expect(await getPromiseStatus([p2, p3, p4, p5])).toMatchObject(['pending', 'pending', 'rejected', 'rejected']);
});
describe('if passed a new capacity value', () => {
it('if new value is lower than previous, will immediately reject the last jobs in queue as much as the difference from previous value', async () => {
const q = new Qyu({ concurrency: 1, capacity: 4 });
const fn1 = sinon.spy(mockAsyncFn);
const fn2 = sinon.spy(mockAsyncFn);
const fn3 = sinon.spy(mockAsyncFn);
const fn4 = sinon.spy(mockAsyncFn);
const fn5 = sinon.spy(mockAsyncFn);
const p1 = q.add(fn1);
const p2 = q.add(fn2);
const p3 = q.add(fn3);
const p4 = q.add(fn4);
const p5 = q.add(fn5);
q.set({ capacity: 2 });
expect(await getPromiseStatus([p2, p3, p4, p5])).to.deep.equal([
'pending',
'pending',
'rejected',
'rejected',
]);
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc