Comparing version 0.4.7 to 0.5.0
{ | ||
"name": "qyu", | ||
"version": "0.4.7", | ||
"description": "A general-purpose asynchronous job queue for Node.js", | ||
"keywords": [ | ||
"queue", | ||
"job", | ||
"promise", | ||
"async", | ||
"throttle", | ||
"async pool" | ||
], | ||
"author": "Dor Shtaif <1dorshtief@gmail.com>", | ||
"license": "ISC", | ||
"engines": { | ||
"node": ">=7.6.0" | ||
}, | ||
"repository": { | ||
"type": "git", | ||
"url": "https://github.com/shtaif/qyu" | ||
}, | ||
"main": "index.js", | ||
"scripts": { | ||
"dev": "jest --watchAll --verbose false" | ||
}, | ||
"dependencies": {}, | ||
"devDependencies": { | ||
"jest": "^23.5.0" | ||
} | ||
"name": "qyu", | ||
"version": "0.5.0", | ||
"description": "A general-purpose asynchronous job queue for Node.js", | ||
"keywords": [ | ||
"queue", | ||
"job", | ||
"promise", | ||
"async", | ||
"throttle", | ||
"async pool" | ||
], | ||
"author": "Dor Shtaif <dorshtaif@gmail.com>", | ||
"license": "ISC", | ||
"repository": { | ||
"type": "git", | ||
"url": "https://github.com/shtaif/qyu" | ||
}, | ||
"main": "./src/index.js", | ||
"engines": { | ||
"node": ">=7.6.0" | ||
}, | ||
"scripts": { | ||
"dev": "mocha --watch", | ||
"test": "mocha" | ||
}, | ||
"files": [ | ||
"**/*.js" | ||
], | ||
"dependencies": { | ||
"eslint": "^7.25.0", | ||
"mocha": "^9.2.0" | ||
}, | ||
"devDependencies": { | ||
"chai": "^4.3.6", | ||
"chai-as-promised": "^7.1.1", | ||
"chai-subset": "^1.6.0", | ||
"eslint-config-prettier": "^8.3.0", | ||
"eslint-config-standard": "^16.0.3", | ||
"eslint-plugin-standard": "^5.0.0", | ||
"prettier": "^2.5.1", | ||
"sinon": "^13.0.1", | ||
"standard-version": "^9.5.0" | ||
} | ||
} |
@@ -10,4 +10,4 @@ # Qyu | ||
```javascript | ||
const Qyu = require('qyu'); | ||
```js | ||
const { Qyu } = require('qyu'); | ||
@@ -17,11 +17,16 @@ (async () => { | ||
async function performRequest(){ // Note that async functions always return a promise. Same could be accomplished with any "normal" function that returns a promise | ||
const {data} = await axios('https://www.example.com'); | ||
//.... | ||
} | ||
// Basic: | ||
q(myAsyncFunction); | ||
q(performRequest); // q expects a function that returns a promise | ||
// Extra options: | ||
q(myAsyncFunction, {priority: 2}, arg1, arg2 /*, ...*/)); | ||
q(performRequest, {priority: 2}, arg1, arg2 /*, ...*/); | ||
// Returns promise (resolving or rejecting when job is eventually picked from queue | ||
// and run with the same value it resolved or rejected with): | ||
let result = await q(myAsyncFunction); | ||
let result = await q(performRequest); | ||
@@ -33,3 +38,3 @@ // No matter if more jobs come around later! | ||
for (let i=0; i<10; i++) { | ||
q(myAsyncFunction); | ||
q(performRequest); | ||
} | ||
@@ -40,3 +45,2 @@ }, 2000); | ||
})(); | ||
``` | ||
@@ -53,3 +57,2 @@ | ||
- Pause/resume | ||
- Supports streaming (in object mode) for memory-efficient data processing | ||
@@ -161,3 +164,3 @@ | ||
*(alias: instance#map)* | ||
For each iteration of `iterator`, queues `mapperFn` on instance, injected with the value and the index from that iteration. | ||
For each iteration of `iterator`(an array for example), queues `mapperFn` on instance, injected with the value and the index from that iteration. | ||
Optional `options` will be supplied the same for all job queuings included in this call. | ||
@@ -171,2 +174,4 @@ ```javascript | ||
}); | ||
await q.whenEmpty()//Will be resolved when no queued jobs are left. | ||
``` | ||
@@ -244,5 +249,5 @@ | ||
Web Scraper: | ||
```javascript | ||
```js | ||
const | ||
Qyu = require('qyu'), | ||
{ Qyu } = require('qyu'), | ||
axios = require('axios'), | ||
@@ -249,0 +254,0 @@ cheerio = require('cheerio'); |
module.exports = class Deferred { | ||
constructor() { | ||
this.resolve = null; | ||
this.reject = null; | ||
this.promise = new Promise((resolve, reject) => { | ||
this.resolve = resolve; | ||
this.reject = reject; | ||
}); | ||
} | ||
constructor() { | ||
this.resolve = null; | ||
this.reject = null; | ||
this.promise = new Promise((resolve, reject) => { | ||
this.resolve = resolve; | ||
this.reject = reject; | ||
}); | ||
} | ||
}; |
module.exports = class QyuError extends Error { | ||
constructor(code, message) { | ||
super(message); | ||
this.code = code; | ||
Error.captureStackTrace(this, this.constructor); | ||
} | ||
constructor(code, message) { | ||
super(message); | ||
this.code = code; | ||
Error.captureStackTrace(this, this.constructor); | ||
} | ||
}; |
const QyuError = require('./qyu-error'); | ||
const { expect } = require('chai'); | ||
it('should extend the built-in Error', () => { | ||
let err = new QyuError; | ||
expect(err instanceof Error).toBe(true); | ||
const err = new QyuError(); | ||
expect(err).to.be.instanceOf(Error); | ||
}); | ||
it('should have a "code" property equal to the 1st constructor parameter passed to it', () => { | ||
let err = new QyuError('CODE'); | ||
expect(err.code).toBe('CODE'); | ||
const err = new QyuError('CODE'); | ||
expect(err.code).to.equal('CODE'); | ||
}); | ||
it('should have a "message" property equal to the 2nd constructor parameter passed to it', () => { | ||
let err = new QyuError('...', 'MESSAGE'); | ||
expect(err.message).toBe('MESSAGE'); | ||
const err = new QyuError('...', 'MESSAGE'); | ||
expect(err.message).to.equal('MESSAGE'); | ||
}); |
456
src/qyu.js
@@ -1,299 +0,263 @@ | ||
const | ||
stream = require('stream'), | ||
QyuError = require('./qyu-error'), | ||
Deferred = require('./deferred'); | ||
const QyuError = require('./qyu-error'); | ||
const Deferred = require('./deferred'); | ||
const noop = v => v; | ||
// To avoid "Unhandled promise rejections": | ||
const guardUnhandledPromiseRejections = jobObject => { | ||
return jobObject.deferred.promise.catch(noop); | ||
return jobObject.deferred.promise.catch(noop); | ||
}; | ||
const makeQyuProxy = q => { | ||
return new Proxy( | ||
function() { | ||
if (arguments[0][Symbol.iterator] instanceof Function) { | ||
return q.map(arguments[0], arguments[1], arguments[2]); | ||
} else { | ||
return q.add(...arguments); | ||
} | ||
}, | ||
{ | ||
get: (target, prop, receiver) => { | ||
return q[prop]; | ||
}, | ||
set: (obj, prop, value) => { | ||
q[prop] = value; | ||
return true; | ||
} | ||
} | ||
); | ||
return new Proxy( | ||
function () { | ||
if (arguments[0][Symbol.iterator] instanceof Function) { | ||
return q.map(arguments[0], arguments[1], arguments[2]); | ||
} else { | ||
return q.add(...arguments); | ||
} | ||
}, | ||
{ | ||
get: (target, prop, receiver) => { | ||
return q[prop]; | ||
}, | ||
set: (obj, prop, value) => { | ||
q[prop] = value; | ||
return true; | ||
}, | ||
} | ||
); | ||
}; | ||
class Qyu { | ||
constructor(opts={}, jobFn=null, jobOpts={}) { | ||
this.getRampUpPromise = null; | ||
this.jobQueue = []; | ||
this.jobChannels = []; | ||
this.isAtMaxConcurrency = false; | ||
this.isRunningJobChannels = false; | ||
this.isPaused = false; | ||
this.whenEmptyDeferred = new Deferred; | ||
this.whenEmptyDeferred.resolve(); | ||
this.whenFreeDeferred = new Deferred; | ||
this.whenFreeDeferred.resolve(); | ||
this.opts = { | ||
concurrency: 1, | ||
capacity: Infinity, | ||
rampUpTime: 0, | ||
...opts | ||
}; | ||
constructor(opts = {}, jobFn = null, jobOpts = {}) { | ||
this.getRampUpPromise = null; | ||
this.jobQueue = []; | ||
this.jobChannels = []; | ||
this.isAtMaxConcurrency = false; | ||
this.isRunningJobChannels = false; | ||
this.isPaused = false; | ||
this.whenEmptyDeferred = new Deferred(); | ||
this.whenEmptyDeferred.resolve(); | ||
this.whenFreeDeferred = new Deferred(); | ||
this.whenFreeDeferred.resolve(); | ||
this.opts = { | ||
concurrency: 1, | ||
capacity: Infinity, | ||
rampUpTime: 0, | ||
...opts, | ||
}; | ||
if (jobFn) { // TODO: Add this feature in docs... | ||
this.enqueue(jobFn, jobOpts); | ||
} | ||
return makeQyuProxy(this); | ||
if (jobFn) { | ||
// TODO: Add this feature in docs... | ||
this.enqueue(jobFn, jobOpts); | ||
} | ||
return makeQyuProxy(this); | ||
} | ||
set(newOpts) { | ||
let oldOpts = this.opts; | ||
this.opts = { ...this.opts, ...newOpts }; | ||
set(newOpts) { | ||
let oldOpts = this.opts; | ||
this.opts = { ...this.opts, ...newOpts }; | ||
if (newOpts.concurrency && newOpts.concurrency > oldOpts.concurrency) { | ||
this.runJobChannels(); | ||
} | ||
if (newOpts.concurrency && newOpts.concurrency > oldOpts.concurrency) { | ||
this.runJobChannels(); | ||
} | ||
if (newOpts.capacity) { | ||
while (this.jobQueue.length > newOpts.capacity) { | ||
this.jobQueue.pop().deferred.reject(new QyuError('ERR_CAPACITY_FULL', "Can't queue job, queue is at max capacity")); | ||
} | ||
} | ||
if (newOpts.capacity) { | ||
while (this.jobQueue.length > newOpts.capacity) { | ||
this.jobQueue | ||
.pop() | ||
.deferred.reject( | ||
new QyuError( | ||
'ERR_CAPACITY_FULL', | ||
"Can't queue job, queue is at max capacity" | ||
) | ||
); | ||
} | ||
} | ||
} | ||
async runJobChannel() { | ||
let job; | ||
while ( | ||
!this.isPaused && | ||
this.jobChannels.length <= this.opts.concurrency && | ||
(job = this.jobQueue.shift()) | ||
) { | ||
if (job.timeoutId) { | ||
clearTimeout(job.timeoutId); | ||
} | ||
try { | ||
let result = await job.fn.apply(this, job.opts.args); | ||
job.deferred.resolve(result); | ||
} | ||
catch (err) { | ||
job.deferred.reject(err); | ||
guardUnhandledPromiseRejections(job); | ||
} | ||
} | ||
async runJobChannel() { | ||
let job; | ||
while ( | ||
!this.isPaused && | ||
this.jobChannels.length <= this.opts.concurrency && | ||
(job = this.jobQueue.shift()) | ||
) { | ||
if (job.timeoutId) { | ||
clearTimeout(job.timeoutId); | ||
} | ||
try { | ||
let result = await job.fn.apply(this, job.opts.args); | ||
job.deferred.resolve(result); | ||
} catch (err) { | ||
job.deferred.reject(err); | ||
guardUnhandledPromiseRejections(job); | ||
} | ||
} | ||
} | ||
async runJobChannels() { | ||
if (!this.isRunningJobChannels) { | ||
this.isRunningJobChannels = true; | ||
async runJobChannels() { | ||
if (!this.isRunningJobChannels) { | ||
this.isRunningJobChannels = true; | ||
while ( | ||
this.jobQueue.length && | ||
!this.isPaused && | ||
this.jobChannels.length < this.opts.concurrency | ||
) { | ||
(async () => { | ||
// TODO: Add additional condition here: "&& !this.jobQueue.length" for when pause() is engaged while there are still jobs in the jobQueue | ||
if (!this.jobChannels.length) { | ||
this.whenEmptyDeferred = new Deferred(); | ||
} | ||
while ( | ||
this.jobQueue.length && | ||
!this.isPaused && | ||
this.jobChannels.length < this.opts.concurrency | ||
) { | ||
(async () => { | ||
// TODO: Add additional condition here: "&& !this.jobQueue.length" for when pause() is engaged while there are still jobs in the jobQueue | ||
if (!this.jobChannels.length) { | ||
this.whenEmptyDeferred = new Deferred; | ||
} | ||
if (this.jobChannels.length === this.opts.concurrency - 1) { | ||
this.whenFreeDeferred = new Deferred(); | ||
} | ||
if (this.jobChannels.length === this.opts.concurrency - 1) { | ||
this.whenFreeDeferred = new Deferred; | ||
} | ||
let promise = this.runJobChannel(); | ||
this.jobChannels.push(promise); | ||
await promise; | ||
this.jobChannels.splice(this.jobChannels.indexOf(promise), 1); | ||
let promise = this.runJobChannel(); | ||
this.jobChannels.push(promise); | ||
await promise; | ||
this.jobChannels.splice(this.jobChannels.indexOf(promise), 1); | ||
if (this.jobChannels.length === this.opts.concurrency - 1) { | ||
this.whenFreeDeferred.resolve(); | ||
} | ||
if (this.jobChannels.length === this.opts.concurrency - 1) { | ||
this.whenFreeDeferred.resolve(); | ||
} | ||
// TODO: Add additional condition here: "&& !this.jobQueue.length" for when pause() is engaged while there are still jobs in the jobQueue | ||
if (!this.jobChannels.length && !this.isPaused) { | ||
this.whenEmptyDeferred.resolve(); | ||
} | ||
})(); | ||
// TODO: Add additional condition here: "&& !this.jobQueue.length" for when pause() is engaged while there are still jobs in the jobQueue | ||
if (!this.jobChannels.length && !this.isPaused) { | ||
this.whenEmptyDeferred.resolve(); | ||
} | ||
})(); | ||
if (this.opts.rampUpTime && this.jobChannels.length) { | ||
await new Promise(resolve => | ||
setTimeout(resolve, this.opts.rampUpTime) | ||
); | ||
} | ||
} | ||
if (this.opts.rampUpTime && this.jobChannels.length) { | ||
await new Promise(resolve => setTimeout(resolve, this.opts.rampUpTime)); | ||
} | ||
} | ||
this.isRunningJobChannels = false; | ||
} | ||
this.isRunningJobChannels = false; | ||
} | ||
} | ||
enqueue(fn, opts = {}) { | ||
let job = { | ||
fn: fn, | ||
opts: { | ||
timeout: null, | ||
priority: 0, | ||
args: null, | ||
...opts, | ||
}, | ||
deferred: new Deferred(), | ||
timeoutId: null, | ||
}; | ||
enqueue(fn, opts={}) { | ||
let job = { | ||
fn: fn, | ||
opts: { | ||
timeout: null, | ||
priority: 0, | ||
args: null, | ||
...opts | ||
}, | ||
deferred: new Deferred, | ||
timeoutId: null | ||
}; | ||
if (this.jobQueue.length === this.opts.capacity) { | ||
job.deferred.reject(new QyuError('ERR_CAPACITY_FULL', "Can't queue job, queue is at max capacity")); | ||
guardUnhandledPromiseRejections(job); | ||
return job.deferred.promise; | ||
} | ||
if (opts.timeout) { | ||
job.timeoutId = setTimeout(() => { | ||
this.dequeue(job.deferred.promise); | ||
job.timeoutId = null; | ||
job.deferred.reject(new QyuError('ERR_JOB_TIMEOUT', "Job cancelled due to timeout")); | ||
guardUnhandledPromiseRejections(job); | ||
}, opts.timeout); | ||
} | ||
let i = 0; | ||
while ( | ||
i < this.jobQueue.length && job.opts.priority <= this.jobQueue[i].opts.priority | ||
) { ++i }; | ||
this.jobQueue.splice(i, 0, job); | ||
this.runJobChannels(); | ||
return job.deferred.promise; | ||
if (this.jobQueue.length === this.opts.capacity) { | ||
job.deferred.reject( | ||
new QyuError( | ||
'ERR_CAPACITY_FULL', | ||
"Can't queue job, queue is at max capacity" | ||
) | ||
); | ||
guardUnhandledPromiseRejections(job); | ||
return job.deferred.promise; | ||
} | ||
dequeue(promise) { | ||
for (let i=0; i<this.jobQueue.length; ++i) { | ||
if (this.jobQueue[i].deferred.promise === promise) { | ||
let splice = this.jobQueue.splice(i, 1); | ||
return splice[0]; | ||
} | ||
} | ||
return false; | ||
if (opts.timeout) { | ||
job.timeoutId = setTimeout(() => { | ||
this.dequeue(job.deferred.promise); | ||
job.timeoutId = null; | ||
job.deferred.reject( | ||
new QyuError('ERR_JOB_TIMEOUT', 'Job cancelled due to timeout') | ||
); | ||
guardUnhandledPromiseRejections(job); | ||
}, opts.timeout); | ||
} | ||
add() { | ||
let fn = arguments[0]; | ||
let opts = arguments[1] instanceof Object ? arguments[1] : {args: null}; | ||
if (arguments.length > 2) { | ||
opts.args = Array.prototype.slice.call(arguments, 2); | ||
} | ||
return this.enqueue(fn, opts); | ||
let i = 0; | ||
while ( | ||
i < this.jobQueue.length && | ||
job.opts.priority <= this.jobQueue[i].opts.priority | ||
) { | ||
++i; | ||
} | ||
this.jobQueue.splice(i, 0, job); | ||
this.runJobChannels(); | ||
map(iterator, fn, opts) { | ||
let counter = 0; | ||
let promises = []; | ||
for (let item of iterator) { | ||
promises.push( | ||
this.add(fn, opts, item, counter++) | ||
); | ||
} | ||
return Promise.all(promises); | ||
} | ||
return job.deferred.promise; | ||
} | ||
pause() { | ||
if (this.isPaused) { | ||
return; | ||
} | ||
this.isPaused = true; | ||
if (!this.jobQueue.length && !this.jobChannels.length) { | ||
this.whenEmptyDeferred = new Deferred; | ||
} | ||
// TODO: return a promise that will resolve when current jobs that were already running will finish. Perhaps: return this.whenEmpty(); | ||
return Promise.all(this.jobChannels); | ||
dequeue(promise) { | ||
for (let i = 0; i < this.jobQueue.length; ++i) { | ||
if (this.jobQueue[i].deferred.promise === promise) { | ||
let splice = this.jobQueue.splice(i, 1); | ||
return splice[0]; | ||
} | ||
} | ||
return false; | ||
} | ||
resume() { | ||
if (!this.isPaused) { | ||
return; | ||
} | ||
this.isPaused = false; | ||
if (!this.jobQueue.length && !this.jobChannels.length) { | ||
this.whenEmptyDeferred.resolve(); | ||
} | ||
this.runJobChannels(); | ||
add() { | ||
let fn = arguments[0]; | ||
let opts = arguments[1] instanceof Object ? arguments[1] : { args: null }; | ||
if (arguments.length > 2) { | ||
opts.args = Array.prototype.slice.call(arguments, 2); | ||
} | ||
return this.enqueue(fn, opts); | ||
} | ||
empty() { | ||
for (let job of this.jobQueue.splice(0)) { | ||
job.deferred.reject(new QyuError('ERR_JOB_DEQUEUED', "Job was dequeued out of the queue")); | ||
guardUnhandledPromiseRejections(job); | ||
} | ||
return Promise.all(this.jobChannels); | ||
map(iterator, fn, opts) { | ||
let counter = 0; | ||
let promises = []; | ||
for (let item of iterator) { | ||
promises.push(this.add(fn, opts, item, counter++)); | ||
} | ||
return Promise.all(promises); | ||
} | ||
whenEmpty() { | ||
return this.whenEmptyDeferred.promise; | ||
pause() { | ||
if (this.isPaused) { | ||
return; | ||
} | ||
this.isPaused = true; | ||
if (!this.jobQueue.length && !this.jobChannels.length) { | ||
this.whenEmptyDeferred = new Deferred(); | ||
} | ||
// TODO: return a promise that will resolve when current jobs that were already running will finish. Perhaps: return this.whenEmpty(); | ||
return Promise.all(this.jobChannels); | ||
} | ||
whenFree() { | ||
return this.whenFreeDeferred.promise; | ||
resume() { | ||
if (!this.isPaused) { | ||
return; | ||
} | ||
this.isPaused = false; | ||
if (!this.jobQueue.length && !this.jobChannels.length) { | ||
this.whenEmptyDeferred.resolve(); | ||
} | ||
this.runJobChannels(); | ||
} | ||
writeStream(chunkObjTransformer=v=>v) { | ||
let thisQueue = this; | ||
return new stream.Writable({ | ||
objectMode: true, | ||
highWaterMark: 0, | ||
write(obj, encoding, cb) { | ||
thisQueue.add(chunkObjTransformer, obj); | ||
thisQueue.whenFree().then(cb); | ||
}, | ||
final(cb) { | ||
thisQueue.whenEmpty().then(cb); | ||
} | ||
}); | ||
empty() { | ||
for (let job of this.jobQueue.splice(0)) { | ||
job.deferred.reject( | ||
new QyuError('ERR_JOB_DEQUEUED', 'Job was dequeued out of the queue') | ||
); | ||
guardUnhandledPromiseRejections(job); | ||
} | ||
return Promise.all(this.jobChannels); | ||
} | ||
whenEmpty() { | ||
return this.whenEmptyDeferred.promise; | ||
} | ||
transformStream(chunkObjTransformer=v=>v) { | ||
let thisQueue = this; | ||
return new stream.Transform({ | ||
objectMode: true, | ||
writableHighWaterMark: 0, | ||
readableHighWaterMark: thisQueue.opts.capacity, | ||
transform(obj, encoding, cb) { | ||
let job = () => chunkObjTransformer(obj); | ||
let jobResolved = jobResult => this.push(jobResult); | ||
thisQueue.enqueue(job).then(jobResolved, jobResolved); | ||
// thisQueue.add(chunkObjTransformer, obj); | ||
thisQueue.whenFree().then(cb); | ||
}, | ||
flush(cb) { | ||
thisQueue.whenFree().then(cb); | ||
} | ||
}); | ||
} | ||
whenFree() { | ||
return this.whenFreeDeferred.promise; | ||
} | ||
} | ||
@@ -300,0 +264,0 @@ |
@@ -1,422 +0,324 @@ | ||
const { Qyu, QyuError } = require('../index'); | ||
const { expect } = require('chai'); | ||
const sinon = require('sinon'); | ||
const { Qyu, QyuError } = require('.'); | ||
const { mockAsyncFn, delay, noop } = require('./testUtils'); | ||
describe('When A Qyu instance is invoked as a function', () => { | ||
it('with a function as the first arg - should internally call the `add` method with the job and options and injecting any additional args passed into it', () => { | ||
const q = new Qyu(); | ||
const jobOpts = {}; | ||
q.add = sinon.spy(); // `sinon.spy(q, 'add') doesn't work because `q` is a Proxy object | ||
q(noop, jobOpts, 'a', 'b', 'c', 'd'); | ||
expect(q.add.firstCall.args).to.deep.equal([ | ||
noop, | ||
jobOpts, | ||
'a', | ||
'b', | ||
'c', | ||
'd', | ||
]); | ||
}); | ||
const delay = async (time=1000) => { | ||
await new Promise(resolve => setTimeout(resolve, time)); | ||
}; | ||
it('with an array as the first arg - should internally call the `map` method with the array, function and options args passed into it', () => { | ||
const q = new Qyu(); | ||
const arr = [1, 2, 3]; | ||
const jobOpts = {}; | ||
q.map = sinon.spy(); // `sinon.spy(q, 'map') doesn't work because `q` is a Proxy object | ||
q(arr, noop, jobOpts); | ||
expect(q.map.firstCall.args).to.deep.equal([arr, noop, jobOpts]); | ||
}); | ||
}); | ||
const mockAsync = async (result=true, time=25) => { | ||
await delay(time); | ||
return result; | ||
}; | ||
describe('`add` method', () => { | ||
it('calls the added functions immediately if currently running jobs are below the concurrency limit', () => { | ||
const q = new Qyu({ concurrency: 2 }); | ||
const job1 = sinon.spy(mockAsyncFn); | ||
const job2 = sinon.spy(mockAsyncFn); | ||
q.add(job1); | ||
q.add(job2); | ||
expect(job1.calledOnce).to.be.true; | ||
expect(job2.calledOnce).to.be.true; | ||
}); | ||
const noop = val => val; | ||
it('will not call added functions immediately if currently running jobs are at the concurrency limit', () => { | ||
const q = new Qyu({ concurrency: 1 }); | ||
const job = sinon.spy(); | ||
q.add(mockAsyncFn); | ||
q.add(job); | ||
expect(job.notCalled).to.be.true; | ||
}); | ||
const getPromiseStatus = async input => { | ||
let wasInputArray = input instanceof Array; | ||
it('will not call added functions if they exceed the capacity limit', () => { | ||
const q = new Qyu({ concurrency: 1, capacity: 1 }); | ||
const job1 = sinon.spy(mockAsyncFn); | ||
const job2 = sinon.spy(mockAsyncFn); | ||
q.add(job1); | ||
q.add(job2); | ||
expect(job1.calledOnce).to.be.true; | ||
expect(job2.notCalled).to.be.true; | ||
}); | ||
if (!wasInputArray) { | ||
input = [input]; | ||
} | ||
it('will inject every 3rd and up additional arguments supplied to it to the job function itself', () => { | ||
const q = new Qyu(); | ||
const job = sinon.spy(); | ||
q.add(job, {}, 'a', 'b', 'c', 'd'); | ||
expect(job.calledOnce).to.be.true; | ||
expect(job.firstCall.args).to.deep.equal(['a', 'b', 'c', 'd']); | ||
}); | ||
let statuses = [...input].fill('pending'); | ||
input.forEach(async (promise, i) => { | ||
try { | ||
await promise; | ||
statuses[i] = 'resolved'; | ||
} catch (err) { | ||
statuses[i] = 'rejected'; | ||
} | ||
// TODO: This test sometimes seems to experience some timing glitches that makes it fail; refactor it to be more reliable | ||
it('will delay in starting the next job queued, regardless of concurrency setting, by the specified amount of time if `rampUpTime` is more than zero', async () => { | ||
const rampUpTime = 100; | ||
const q = new Qyu({ | ||
concurrency: 3, | ||
rampUpTime, | ||
}); | ||
const job1 = sinon.spy(() => mockAsyncFn(undefined, 250)); | ||
const job2 = sinon.spy(() => mockAsyncFn(undefined, 250)); | ||
const job3 = sinon.spy(() => mockAsyncFn(undefined, 250)); | ||
q.add(job1); | ||
q.add(job2); | ||
q.add(job3); | ||
expect(job1.calledOnce).to.be.true; | ||
expect(job2.calledOnce).to.be.false; | ||
expect(job3.calledOnce).to.be.false; | ||
await delay(rampUpTime + 20); | ||
expect(job2.calledOnce).to.be.true; | ||
expect(job3.calledOnce).to.be.false; | ||
await delay(rampUpTime + 20); | ||
expect(job3.calledOnce).to.be.true; | ||
}); | ||
await delay(0); | ||
return wasInputArray ? statuses : statuses[0]; | ||
}; | ||
describe('When A Qyu instance is invoked as a function', () => { | ||
it("with a function as the first arg - should internally call the `add` method with the job and options and injecting any addiontional args passed into it", () => { | ||
const q = new Qyu(); | ||
const jobOpts = {}; | ||
const spy = jest.spyOn(q, 'add'); | ||
q(noop, jobOpts, 'a', 'b', 'c', 'd'); | ||
expect(spy).toHaveBeenCalledWith(noop, jobOpts, 'a', 'b', 'c', 'd'); | ||
describe('should return a', () => { | ||
it('promise', () => { | ||
const q = new Qyu({}); | ||
const promise = q.add(mockAsyncFn); | ||
expect(promise instanceof Promise).to.be.true; | ||
expect(promise).to.be.instanceof(Promise); | ||
}); | ||
it("with an array as the first arg - should internally call the `map` method with the array, function and options args passed into it", () => { | ||
const q = new Qyu(); | ||
const arr = [1, 2, 3]; | ||
const jobOpts = {}; | ||
const spy = jest.spyOn(q, 'map'); | ||
q(arr, noop, jobOpts); | ||
expect(spy).toHaveBeenCalledWith(arr, noop, jobOpts); | ||
}); | ||
}); | ||
it('rejects immediately with a QyuError of code "ERR_CAPACITY_FULL" if instance capacity is full', async () => { | ||
const q = new Qyu({ capacity: 1 }); | ||
describe('`add` method', () => { | ||
it('calls the added functions immediately if currently running jobs are below the concurrency limit', () => { | ||
let q = new Qyu({concurrency: 2}); | ||
q.add(mockAsyncFn); | ||
q.add(mockAsyncFn); // this queuing and the one above it fill the queue length up to 1 (the earlier was called immediately, and the current is then put in queue) | ||
const promise = q.add(mockAsyncFn); // this is expected to reject since the current length of queue should be 1 at that point, which equals to the max capacity of 1 | ||
let job1 = jest.fn(mockAsync); | ||
let job2 = jest.fn(mockAsync); | ||
q.add(job1); | ||
q.add(job2); | ||
expect(job1).toHaveBeenCalled(); | ||
expect(job2).toHaveBeenCalled(); | ||
const err = await expect(promise).to.be.rejected; | ||
expect(err).to.be.instanceof(QyuError); | ||
expect(err.code).to.equal('ERR_CAPACITY_FULL'); | ||
}); | ||
it('will not call added functions immediately if currently running jobs are at the concurrency limit', () => { | ||
let q = new Qyu({concurrency: 1}); | ||
let job = jest.fn(); | ||
it('that resolves only after the actual job is resolved', async () => { | ||
const q = new Qyu({}); | ||
let done = false; | ||
q.add(mockAsync); | ||
q.add(job); | ||
const promise = q.add(async () => { | ||
await mockAsyncFn(); | ||
done = true; | ||
}); | ||
expect(job).not.toHaveBeenCalled(); | ||
await expect(promise).to.be.fulfilled; | ||
expect(done).to.be.true; | ||
}); | ||
it('will not call added functions if they exceed the capacity limit', () => { | ||
let q = new Qyu({concurrency: 1, capacity: 1}); | ||
let job1 = jest.fn(mockAsync); | ||
let job2 = jest.fn(mockAsync); | ||
it('and rejects only after the actual job is rejected', async () => { | ||
const q = new Qyu({}); | ||
let done = false; | ||
q.add(job1); | ||
q.add(job2); | ||
const promise = q.add(async () => { | ||
await mockAsyncFn(); | ||
done = true; | ||
throw new Error(); | ||
}); | ||
expect(job1).toHaveBeenCalled(); | ||
expect(job2).not.toHaveBeenCalled(); | ||
await expect(promise).to.be.rejected; | ||
expect(done).to.be.true; | ||
}); | ||
it('will inject every 3rd and up additional arguments supplied to it to the job function itself', () => { | ||
let q = new Qyu(); | ||
let job = jest.fn(); | ||
q.add(job, {}, 'a', 'b' ,'c', 'd'); | ||
expect(job).toHaveBeenCalledWith('a', 'b', 'c', 'd'); | ||
it('with the value the job resolved with', async () => { | ||
const q = new Qyu({}); | ||
const value = await q.add(() => mockAsyncFn('THE_VALUE')); | ||
expect(value).to.equal('THE_VALUE'); | ||
}); | ||
// TODO: This test sometimes seems to experience some timing glitches that makes it fail; refactor it to be more reliable | ||
it('will delay in starting the next job queued, regardless of concurrency setting, by the specified amount of time if `rampUpTime` is more than zero', async () => { | ||
let rampUpTime = 100; | ||
let q = new Qyu({ concurrency: 3, rampUpTime }); | ||
let started = 0; | ||
q.add(async () => { | ||
started++; | ||
await mockAsync(true, 250); | ||
}); | ||
q.add(async () => { | ||
started++; | ||
await mockAsync(true, 250); | ||
}); | ||
q.add(async () => { | ||
started++; | ||
await mockAsync(true, 250); | ||
}); | ||
await Promise.all([ | ||
(() => { | ||
expect(started).toBe(1); | ||
})(), | ||
(async () => { | ||
await delay(rampUpTime + 20); | ||
expect(started).toBe(2); | ||
})(), | ||
(async () => { | ||
await delay(rampUpTime * 2 + 20); | ||
expect(started).toBe(3); | ||
})() | ||
]); | ||
it('or with the value the job rejected with', async () => { | ||
const q = new Qyu({}); | ||
const promise = q.add(async () => { | ||
throw await mockAsyncFn('THE_VALUE'); | ||
}); | ||
await expect(promise).to.eventually.be.rejected.and.equal('THE_VALUE'); | ||
}); | ||
describe('should return a', () => { | ||
it('promise', () => { | ||
let q = new Qyu({}); | ||
let promise = q.add(() => mockAsync()); | ||
expect(promise instanceof Promise).toBe(true); | ||
}); | ||
it('rejects immediately with a QyuError of code "ERR_CAPACITY_FULL" if instance capacity is full', async () => { | ||
let q = new Qyu({capacity: 1}); | ||
q.add(mockAsync); | ||
q.add(mockAsync); // this queuing and the one above it fill the queue length up to 1 (the earlier was called immediately, and the current is then put in queue) | ||
let promise = q.add(() => {}); // this is expected to reject since the current length of queue should be 1 at that point, which equals to the max capacity of 1 | ||
expect(await getPromiseStatus(promise)).toBe('rejected'); | ||
try { await promise; } | ||
catch (err) { | ||
expect(err instanceof QyuError).toBe(true); | ||
expect(err.code).toBe('ERR_CAPACITY_FULL'); | ||
} | ||
}); | ||
it('that resolves only after the actual job is resolved', async () => { | ||
let done = false; | ||
let q = new Qyu({}); | ||
await q.add(async () => { | ||
await mockAsync(); | ||
done = true; | ||
}); | ||
expect(done).toBe(true); | ||
}); | ||
it('and rejects only after the actual job is rejected', async () => { | ||
let done = false; | ||
let q = new Qyu({}); | ||
try { | ||
await q.add(async () => { | ||
await mockAsync(); | ||
done = true; | ||
throw new Error; | ||
}); | ||
} catch (err) { | ||
expect(done).toBe(true); | ||
} | ||
}); | ||
it('with the value the job resolved with', async () => { | ||
let q = new Qyu({}); | ||
let value = await q.add(() => mockAsync('THE_VALUE')); | ||
expect(value).toBe('THE_VALUE'); | ||
}); | ||
it('or with the value the job rejected with', async () => { | ||
let q = new Qyu({}); | ||
let value; | ||
try { | ||
await q.add(async () => { | ||
throw await mockAsync('THE_VALUE'); | ||
}); | ||
} catch (thrown) { | ||
value = thrown; | ||
} | ||
expect(value).toBe('THE_VALUE'); | ||
}); | ||
}); | ||
}); | ||
}); | ||
describe('`map` method', () => { | ||
it("invokes the function in the second argument for each item in the first argument array with two arguments in itself: the item and it's index", () => { | ||
let q = new Qyu({concurrency: 3}); | ||
let items = ['A', 'B', 'C']; | ||
let fn = jest.fn(); | ||
q.map(items, fn); | ||
expect(fn).toHaveBeenCalledTimes(3); | ||
expect(fn).toHaveBeenCalledWith('C', 2); | ||
}); | ||
it("invokes the function in the second argument for each item in the first argument array with two arguments in itself: the item and it's index", () => { | ||
const q = new Qyu({ concurrency: 3 }); | ||
const items = ['A', 'B', 'C']; | ||
const fn = sinon.spy(); | ||
q.map(items, fn); | ||
expect(fn.args).to.deep.equal([ | ||
['A', 0], | ||
['B', 1], | ||
['C', 2], | ||
]); | ||
}); | ||
}); | ||
describe('`whenEmpty` method', () => { | ||
it('should return a promise', () => { | ||
let q = new Qyu({}); | ||
expect(q.whenEmpty() instanceof Promise).toBe(true); | ||
}); | ||
it('should return a promise', () => { | ||
const q = new Qyu({}); | ||
expect(q.whenEmpty()).to.be.instanceOf(Promise); | ||
}); | ||
it('that resolves once a Qyu instance has no running or queued jobs, regardless if some jobs ended up rejecting', async () => { | ||
let concurrency = 2; | ||
let numToRun = 3; | ||
let startedCount = 0; | ||
let finishedCount = 0; | ||
it('that resolves once a Qyu instance has no running or queued jobs, regardless if some jobs ended up fulfilled or rejected', async () => { | ||
const q = new Qyu({ concurrency: 2 }); | ||
let finishedCount = 0; | ||
let q = new Qyu({ concurrency }); | ||
q.add(async () => { | ||
await mockAsyncFn(); | ||
finishedCount++; | ||
}); | ||
q.add(async () => { | ||
await mockAsyncFn(); | ||
finishedCount++; | ||
throw new Error(); | ||
}); | ||
q.add(async () => { | ||
await mockAsyncFn(); | ||
finishedCount++; | ||
}); | ||
for (let i=0; i<numToRun; ++i) { | ||
q.add(async () => { | ||
startedCount++; | ||
await mockAsync(); | ||
finishedCount++; | ||
if (i % 2 === 1) { // If `i` is odd | ||
throw new Error('some_simulated_failure_error'); | ||
} | ||
}); | ||
} | ||
await q.whenEmpty(); | ||
await q.whenEmpty(); | ||
expect(startedCount).toBe(numToRun); | ||
expect(finishedCount).toBe(numToRun); | ||
}); | ||
expect(finishedCount).to.equal(3); | ||
}); | ||
}); | ||
describe('`empty` method', () => { | ||
it('should reject all queued jobs with a QyuError of code "ERR_JOB_DEQUEUED" and not call them', async () => { | ||
let q = new Qyu({concurrency: 1}); | ||
let fn = jest.fn(mockAsync); | ||
it('should reject all queued jobs with a QyuError of code "ERR_JOB_DEQUEUED" and not call them', async () => { | ||
const q = new Qyu({ concurrency: 1 }); | ||
const fn = sinon.spy(mockAsyncFn); | ||
q.add(mockAsync); | ||
let p1 = q.add(fn); | ||
let p2 = q.add(fn); | ||
const [, prom1, prom2] = [q.add(fn), q.add(fn), q.add(fn)]; | ||
q.empty(); | ||
q.empty(); | ||
expect(await getPromiseStatus([p1, p2])).toMatchObject(['rejected', 'rejected']); | ||
await expect(prom1) | ||
.to.be.eventually.rejected.and.be.instanceOf(QyuError) | ||
.and.containSubset({ code: 'ERR_JOB_DEQUEUED' }); | ||
try { await p1; } catch (err) { | ||
expect(err instanceof QyuError).toBe(true); | ||
expect(err.code).toBe('ERR_JOB_DEQUEUED'); | ||
} | ||
await expect(prom2) | ||
.to.be.eventually.rejected.and.be.instanceOf(QyuError) | ||
.and.containSubset({ code: 'ERR_JOB_DEQUEUED' }); | ||
try { await p2; } catch (err) { | ||
expect(err instanceof QyuError).toBe(true); | ||
expect(err.code).toBe('ERR_JOB_DEQUEUED'); | ||
} | ||
expect(fn.calledOnce).to.be.true; | ||
}); | ||
expect(fn).not.toHaveBeenCalled(); | ||
}); | ||
it('should return a promise that resolves once all active jobs at the time of calling are done', async () => { | ||
let q = new Qyu({concurrency: 2}); | ||
let p1 = new Promise(resolve => { | ||
q.add(async () => { | ||
await mockAsync(); | ||
resolve(); | ||
}); | ||
}); | ||
let p2 = new Promise(resolve => { | ||
q.add(async () => { | ||
await mockAsync(); | ||
resolve(); | ||
}); | ||
}); | ||
let p3 = new Promise(resolve => { | ||
q.add(async () => { | ||
await mockAsync(); | ||
resolve(); | ||
}); | ||
}); | ||
await q.empty(); | ||
expect(await getPromiseStatus([p1, p2])).toMatchObject([ | ||
'resolved', | ||
'resolved' | ||
]); | ||
}); | ||
it('should return a promise that resolves once all active jobs at the time of calling are done', async () => { | ||
const q = new Qyu({ concurrency: 2 }); | ||
let jobsDoneCount = 0; | ||
const job = async () => { | ||
await mockAsyncFn(); | ||
jobsDoneCount++; | ||
}; | ||
q.add(job); | ||
q.add(job); | ||
q.add(job); | ||
await q.empty(); | ||
expect(jobsDoneCount).to.equal(2); | ||
}); | ||
}); | ||
describe('`whenFree` method', () => { | ||
it('should return a promise', () => { | ||
let q = new Qyu({}); | ||
expect(q.whenFree() instanceof Promise).toBe(true); | ||
}); | ||
it('should return a promise', () => { | ||
const q = new Qyu({}); | ||
expect(q.whenFree()).to.be.instanceOf(Promise); | ||
}); | ||
it('that resolves once currently running jobs get below the concurrency limit', async () => { | ||
let concurrency = 2; | ||
let numToRun = 3; | ||
let startedCount = 0; | ||
let finishedCount = 0; | ||
let q = new Qyu({ concurrency }); | ||
for (let i=0; i<numToRun; ++i) { | ||
q.add(async () => { | ||
startedCount++; | ||
await mockAsync(); | ||
finishedCount++; | ||
}); | ||
} | ||
await q.whenFree(); | ||
expect(startedCount).toBe(numToRun); | ||
expect(finishedCount).toBe(2); | ||
}); | ||
it('that resolves once number of currently running jobs get below the concurrency limit', async () => { | ||
let startedCount = 0; | ||
let finishedCount = 0; | ||
const q = new Qyu({ concurrency: 2 }); | ||
const job = async () => { | ||
startedCount++; | ||
await mockAsyncFn(); | ||
finishedCount++; | ||
}; | ||
for (let i = 0; i < 3; ++i) { | ||
q.add(job); | ||
} | ||
await q.whenFree(); | ||
expect(startedCount).to.equal(3); | ||
expect(finishedCount).to.equal(2); | ||
}); | ||
}); | ||
describe('The `timeout` option, when adding a task', () => { | ||
it('should cancel a queued job if waits in queue more than the specified time', async () => { | ||
let q = new Qyu({ concurrency: 1 }); | ||
let fn = jest.fn(); | ||
it('should cancel a queued job if waits in queue more than the specified time', async () => { | ||
const q = new Qyu({ concurrency: 1 }); | ||
const fn = sinon.spy(); | ||
const promise = q.add(() => mockAsyncFn(undefined, 100)); | ||
q.add(fn, { timeout: 50 }); | ||
await promise; | ||
expect(fn.notCalled).to.be.true; | ||
}); | ||
let promise = new Promise(resolve => { | ||
q.add(async () => { | ||
await mockAsync(true, 100); | ||
resolve(); | ||
}); | ||
}); | ||
it('if waits in queue more than the specified time, should make the promise of a job queueing reject with a QyuError of code "ERR_JOB_TIMEOUT"', async () => { | ||
const q = new Qyu({ concurrency: 1 }); | ||
q.add(fn, {timeout: 50}); | ||
await promise; | ||
await delay(0); | ||
expect(fn).not.toHaveBeenCalled(); | ||
const promise = q.add(() => mockAsyncFn(undefined, 100)); | ||
const promiseWithTimeout = q.add(() => mockAsyncFn(undefined, 0), { | ||
timeout: 50, | ||
}); | ||
it('if waits in queue more than the specified time, should make the promise of a job queueing reject with a QyuError of code "ERR_JOB_TIMEOUT"', async () => { | ||
let q = new Qyu({ concurrency: 1 }); | ||
q.add(() => mockAsync(true, 100)); | ||
try { | ||
await q.add(() => {}, {timeout: 50}); | ||
} | ||
catch (err) { | ||
expect(err instanceof QyuError).toBe(true); | ||
expect(err.code).toBe('ERR_JOB_TIMEOUT'); | ||
return; | ||
} | ||
throw new Error('Expected job to reject (due to timeout), instead it resolved'); | ||
}); | ||
await expect(promise).to.eventually.be.fulfilled; | ||
await expect(promiseWithTimeout) | ||
.to.eventually.be.rejected.and.be.instanceOf(QyuError) | ||
.and.containSubset({ code: 'ERR_JOB_TIMEOUT' }); | ||
}); | ||
}); | ||
describe('The `priority` option, when adding a task', () => { | ||
it('will default to 0', async () => { | ||
let q = new Qyu({ concurrency: 1 }); | ||
let actualOrder = []; | ||
let push = value => actualOrder.push(value); | ||
it('will default to 0', async () => { | ||
const q = new Qyu({ concurrency: 1 }); | ||
const actualOrder = []; | ||
const push = value => actualOrder.push(value); | ||
q.add(mockAsyncFn); // To increase the activity up to the max concurrency... | ||
await Promise.all([ | ||
q.add(() => push('a'), { priority: -1 }), | ||
q.add(() => push('b'), { priority: 1 }), | ||
q.add(() => push('c'), {}), | ||
]); | ||
expect(actualOrder).to.deep.equal(['b', 'c', 'a']); | ||
}); | ||
q.add(mockAsync); // To raise activity to max concurrency... | ||
it('will queue jobs with the same priority by the order they were added', async () => { | ||
const q = new Qyu({ concurrency: 1 }); | ||
const actualOrder = []; | ||
const push = value => actualOrder.push(value); | ||
q.add(mockAsyncFn); // To increase the activity up to the max concurrency... | ||
await Promise.all([ | ||
q.add(() => push('a'), { priority: 0 }), | ||
q.add(() => push('b'), { priority: 0 }), | ||
q.add(() => push('c'), { priority: 0 }), | ||
q.add(() => push('d'), { priority: 0 }), | ||
]); | ||
expect(actualOrder).to.deep.equal(['a', 'b', 'c', 'd']); | ||
}); | ||
await Promise.all([ | ||
new Promise(resolve => q.add(() => { push('a'); resolve(); }, {priority: -1})), | ||
new Promise(resolve => q.add(() => { push('b'); resolve(); }, {priority: 1})), | ||
new Promise(resolve => q.add(() => { push('c'); resolve(); }, {})) | ||
]); | ||
expect(actualOrder).toMatchObject(['b', 'c', 'a']); | ||
}); | ||
it('will queue jobs with the same priority by the order they were added', async () => { | ||
let q = new Qyu({ concurrency: 1 }); | ||
let actualOrder = []; | ||
let push = value => actualOrder.push(value); | ||
q.add(mockAsync); // To raise activity to max concurrency... | ||
await Promise.all([ | ||
new Promise(resolve => q.add(() => { push('a'); resolve(); }, {priority: 0})), | ||
new Promise(resolve => q.add(() => { push('b'); resolve(); }, {priority: 0})), | ||
new Promise(resolve => q.add(() => { push('c'); resolve(); }, {priority: 0})), | ||
new Promise(resolve => q.add(() => { push('d'); resolve(); }, {priority: 0})) | ||
]); | ||
expect(actualOrder).toMatchObject(['a', 'b', 'c' ,'d']); | ||
}); | ||
it('if currently running jobs are at the concurrency limit, queue a job AFTER jobs with more or equal priority, and BEFORE other jobs that have less priority if any', async () => { | ||
let q = new Qyu({ concurrency: 1 }); | ||
let actualOrder = []; | ||
let push = value => actualOrder.push(value); | ||
q.add(mockAsync); // To raise activity to max concurrency... | ||
await Promise.all([ | ||
new Promise(resolve => q.add(() => { push('b'); resolve(); }, {priority: 2})), | ||
new Promise(resolve => q.add(() => { push('a'); resolve(); }, {priority: 3})), | ||
new Promise(resolve => q.add(() => { push('d'); resolve(); }, {priority: 1})), | ||
new Promise(resolve => q.add(() => { push('c'); resolve(); }, {priority: 2})) | ||
]); | ||
expect(actualOrder).toMatchObject(['a', 'b', 'c' ,'d']); | ||
}); | ||
it('if currently running jobs are at the concurrency limit, queue a job AFTER jobs with more or equal priority, and BEFORE other jobs that have less priority if any', async () => { | ||
const q = new Qyu({ concurrency: 1 }); | ||
const actualOrder = []; | ||
const push = value => actualOrder.push(value); | ||
q.add(mockAsyncFn); // To increase the activity up to the max concurrency... | ||
await Promise.all([ | ||
q.add(() => push('b'), { priority: 2 }), | ||
q.add(() => push('a'), { priority: 3 }), | ||
q.add(() => push('d'), { priority: 1 }), | ||
q.add(() => push('c'), { priority: 2 }), | ||
]); | ||
expect(actualOrder).to.deep.equal(['a', 'b', 'c', 'd']); | ||
}); | ||
}); |
@@ -1,127 +0,95 @@ | ||
const { Qyu, QyuError } = require('../index'); | ||
const { expect } = require('chai'); | ||
const sinon = require('sinon'); | ||
const { Qyu } = require('../src'); | ||
const { getPromiseStatus, mockAsyncFn } = require('../src/testUtils'); | ||
const delay = async (time=1000) => { | ||
await new Promise(resolve => setTimeout(resolve, time)); | ||
}; | ||
const mockAsync = async (result=true, time=25) => { | ||
await delay(time); | ||
return result; | ||
}; | ||
const noop = val => val; | ||
const getPromiseStatus = async input => { | ||
let wasInputArray = input instanceof Array; | ||
if (!wasInputArray) { | ||
input = [input]; | ||
} | ||
let statuses = [...input].fill('pending'); | ||
input.forEach(async (promise, i) => { | ||
try { | ||
await promise; | ||
statuses[i] = 'resolved'; | ||
} catch (err) { | ||
statuses[i] = 'rejected'; | ||
} | ||
}); | ||
await delay(0); | ||
return wasInputArray ? statuses : statuses[0]; | ||
}; | ||
describe('The `pause`/`resume` methods', () => { | ||
it('`pause` method should return a promise that is immediately resolved if no active jobs were at the time of calling `pause`', async () => { | ||
let q = new Qyu; | ||
let promise = q.pause(); | ||
expect(await getPromiseStatus(promise)).toBe('resolved'); | ||
}); | ||
it('`pause` method should return a promise that is immediately resolved if no active jobs were at the time of calling `pause`', async () => { | ||
const q = new Qyu(); | ||
const promise = q.pause(); | ||
expect(await getPromiseStatus(promise)).to.equal('resolved'); | ||
}); | ||
it('`pause` method should return a promise that resolves when all active jobs are done', async () => { | ||
let q = new Qyu; | ||
let fn1 = jest.fn(mockAsync); | ||
let fn2 = jest.fn(mockAsync); | ||
it('`pause` method should return a promise that resolves when all active jobs are done', async () => { | ||
const q = new Qyu(); | ||
const fn1 = sinon.spy(mockAsyncFn); | ||
const fn2 = sinon.spy(mockAsyncFn); | ||
let p1 = q.add(fn1); | ||
let p2 = q.add(fn2); | ||
const p1 = q.add(fn1); | ||
const p2 = q.add(fn2); | ||
await q.pause(); | ||
await q.pause(); | ||
expect(fn1).toHaveBeenCalled(); | ||
expect(fn2).not.toHaveBeenCalled(); | ||
expect(await getPromiseStatus([p1, p2])).toMatchObject(['resolved', 'pending']); | ||
}); | ||
expect(fn1.called).to.be.true; | ||
expect(fn2.notCalled).to.be.true; | ||
expect(await getPromiseStatus([p1, p2])).to.deep.equal([ | ||
'resolved', | ||
'pending', | ||
]); | ||
}); | ||
it('`whenEmpty` should return a pending promise if instance is paused when idle', async () => { | ||
let q = new Qyu; | ||
q.pause(); | ||
expect(await getPromiseStatus(q.whenEmpty())).toBe('pending'); | ||
}); | ||
it('`whenEmpty` should return a pending promise if instance is paused when idle', async () => { | ||
const q = new Qyu(); | ||
q.pause(); | ||
expect(await getPromiseStatus(q.whenEmpty())).to.equal('pending'); | ||
}); | ||
it('`whenEmpty` should return a pending promise if instance is paused when it has active jobs / jobs in queue', async () => { | ||
let q = new Qyu({concurrency: 1}); | ||
q.add(mockAsync); | ||
q.add(mockAsync); | ||
q.pause(); | ||
expect(await getPromiseStatus(q.whenEmpty())).toBe('pending'); | ||
}); | ||
it('`whenEmpty` should return a pending promise if instance is paused when it has active jobs / jobs in queue', async () => { | ||
const q = new Qyu({ concurrency: 1 }); | ||
q.add(mockAsyncFn); | ||
q.add(mockAsyncFn); | ||
q.pause(); | ||
expect(await getPromiseStatus(q.whenEmpty())).to.equal('pending'); | ||
}); | ||
it('`whenEmpty` should return a resolved promise if instance is paused when idle, then immediately resumed', async () => { | ||
let q = new Qyu; | ||
q.pause(); | ||
q.resume(); | ||
expect(await getPromiseStatus(q.whenEmpty())).toBe('resolved'); | ||
}); | ||
it('`whenEmpty` should return a resolved promise if instance is paused when idle, then immediately resumed', async () => { | ||
const q = new Qyu(); | ||
q.pause(); | ||
q.resume(); | ||
expect(await getPromiseStatus(q.whenEmpty())).to.equal('resolved'); | ||
}); | ||
}); | ||
describe('The `set` method', () => { | ||
describe('if passed a new concurrency value', () => { | ||
it('if new value is bigger than previous, will immediately call more jobs as much as the difference from previous value', async () => { | ||
let q = new Qyu({concurrency: 1}); | ||
let fn1 = jest.fn(mockAsync); | ||
let fn2 = jest.fn(mockAsync); | ||
let fn3 = jest.fn(mockAsync); | ||
let fn4 = jest.fn(mockAsync); | ||
q.add(fn1); | ||
q.add(fn2); | ||
q.add(fn3); | ||
q.add(fn4); | ||
q.set({concurrency: 3}); | ||
expect(fn1).toHaveBeenCalled(); | ||
expect(fn2).toHaveBeenCalled(); | ||
expect(fn3).toHaveBeenCalled(); | ||
expect(fn4).not.toHaveBeenCalled(); | ||
}); | ||
describe('if passed a new concurrency value', () => { | ||
it('if new value is bigger than previous, will immediately call more jobs as much as the difference from previous value', async () => { | ||
const q = new Qyu({ concurrency: 1 }); | ||
const fn1 = sinon.spy(mockAsyncFn); | ||
const fn2 = sinon.spy(mockAsyncFn); | ||
const fn3 = sinon.spy(mockAsyncFn); | ||
const fn4 = sinon.spy(mockAsyncFn); | ||
q.add(fn1); | ||
q.add(fn2); | ||
q.add(fn3); | ||
q.add(fn4); | ||
q.set({ concurrency: 3 }); | ||
expect(fn1.called).to.be.true; | ||
expect(fn2.called).to.be.true; | ||
expect(fn3.called).to.be.true; | ||
expect(fn4.notCalled).to.be.true; | ||
}); | ||
}); | ||
describe('if passed a new capacity value', () => { | ||
it('if new value is lower than previous, will immediately reject the last jobs in queue as much as the difference from previous value', async () => { | ||
let q = new Qyu({concurrency: 1, capacity: 4}); | ||
let fn1 = jest.fn(mockAsync); | ||
let fn2 = jest.fn(mockAsync); | ||
let fn3 = jest.fn(mockAsync); | ||
let fn4 = jest.fn(mockAsync); | ||
let fn5 = jest.fn(mockAsync); | ||
let p1 = q.add(fn1); | ||
let p2 = q.add(fn2); | ||
let p3 = q.add(fn3); | ||
let p4 = q.add(fn4); | ||
let p5 = q.add(fn5); | ||
q.set({capacity: 2}); | ||
expect(await getPromiseStatus([p2, p3, p4, p5])).toMatchObject(['pending', 'pending', 'rejected', 'rejected']); | ||
}); | ||
describe('if passed a new capacity value', () => { | ||
it('if new value is lower than previous, will immediately reject the last jobs in queue as much as the difference from previous value', async () => { | ||
const q = new Qyu({ concurrency: 1, capacity: 4 }); | ||
const fn1 = sinon.spy(mockAsyncFn); | ||
const fn2 = sinon.spy(mockAsyncFn); | ||
const fn3 = sinon.spy(mockAsyncFn); | ||
const fn4 = sinon.spy(mockAsyncFn); | ||
const fn5 = sinon.spy(mockAsyncFn); | ||
const p1 = q.add(fn1); | ||
const p2 = q.add(fn2); | ||
const p3 = q.add(fn3); | ||
const p4 = q.add(fn4); | ||
const p5 = q.add(fn5); | ||
q.set({ capacity: 2 }); | ||
expect(await getPromiseStatus([p2, p3, p4, p5])).to.deep.equal([ | ||
'pending', | ||
'pending', | ||
'rejected', | ||
'rejected', | ||
]); | ||
}); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Mixed license
License(Experimental) Package contains multiple licenses.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
13
272
35941
2
9
1
670
1
+ Addedeslint@^7.25.0
+ Addedmocha@^9.2.0
+ Added@babel/code-frame@7.12.11(transitive)
+ Added@babel/helper-validator-identifier@7.25.9(transitive)
+ Added@babel/highlight@7.25.9(transitive)
+ Added@eslint/eslintrc@0.4.3(transitive)
+ Added@humanwhocodes/config-array@0.5.0(transitive)
+ Added@humanwhocodes/object-schema@1.2.1(transitive)
+ Added@ungap/promise-all-settled@1.1.2(transitive)
+ Addedacorn@7.4.1(transitive)
+ Addedacorn-jsx@5.3.2(transitive)
+ Addedajv@6.12.68.17.1(transitive)
+ Addedansi-colors@4.1.14.1.3(transitive)
+ Addedansi-regex@5.0.1(transitive)
+ Addedansi-styles@3.2.14.3.0(transitive)
+ Addedanymatch@3.1.3(transitive)
+ Addedargparse@1.0.102.0.1(transitive)
+ Addedastral-regex@2.0.0(transitive)
+ Addedbalanced-match@1.0.2(transitive)
+ Addedbinary-extensions@2.3.0(transitive)
+ Addedbrace-expansion@1.1.11(transitive)
+ Addedbraces@3.0.3(transitive)
+ Addedbrowser-stdout@1.3.1(transitive)
+ Addedcallsites@3.1.0(transitive)
+ Addedcamelcase@6.3.0(transitive)
+ Addedchalk@2.4.24.1.2(transitive)
+ Addedchokidar@3.5.3(transitive)
+ Addedcliui@7.0.4(transitive)
+ Addedcolor-convert@1.9.32.0.1(transitive)
+ Addedcolor-name@1.1.31.1.4(transitive)
+ Addedconcat-map@0.0.1(transitive)
+ Addedcross-spawn@7.0.5(transitive)
+ Addeddebug@4.3.34.3.7(transitive)
+ Addeddecamelize@4.0.0(transitive)
+ Addeddeep-is@0.1.4(transitive)
+ Addeddiff@5.0.0(transitive)
+ Addeddoctrine@3.0.0(transitive)
+ Addedemoji-regex@8.0.0(transitive)
+ Addedenquirer@2.4.1(transitive)
+ Addedescalade@3.2.0(transitive)
+ Addedescape-string-regexp@1.0.54.0.0(transitive)
+ Addedeslint@7.32.0(transitive)
+ Addedeslint-scope@5.1.1(transitive)
+ Addedeslint-utils@2.1.0(transitive)
+ Addedeslint-visitor-keys@1.3.02.1.0(transitive)
+ Addedespree@7.3.1(transitive)
+ Addedesprima@4.0.1(transitive)
+ Addedesquery@1.6.0(transitive)
+ Addedesrecurse@4.3.0(transitive)
+ Addedestraverse@4.3.05.3.0(transitive)
+ Addedesutils@2.0.3(transitive)
+ Addedfast-deep-equal@3.1.3(transitive)
+ Addedfast-json-stable-stringify@2.1.0(transitive)
+ Addedfast-levenshtein@2.0.6(transitive)
+ Addedfast-uri@3.0.3(transitive)
+ Addedfile-entry-cache@6.0.1(transitive)
+ Addedfill-range@7.1.1(transitive)
+ Addedfind-up@5.0.0(transitive)
+ Addedflat@5.0.2(transitive)
+ Addedflat-cache@3.2.0(transitive)
+ Addedflatted@3.3.1(transitive)
+ Addedfs.realpath@1.0.0(transitive)
+ Addedfsevents@2.3.3(transitive)
+ Addedfunctional-red-black-tree@1.0.1(transitive)
+ Addedget-caller-file@2.0.5(transitive)
+ Addedglob@7.2.0(transitive)
+ Addedglob-parent@5.1.2(transitive)
+ Addedglobals@13.24.0(transitive)
+ Addedgrowl@1.10.5(transitive)
+ Addedhas-flag@3.0.04.0.0(transitive)
+ Addedhe@1.2.0(transitive)
+ Addedignore@4.0.6(transitive)
+ Addedimport-fresh@3.3.0(transitive)
+ Addedimurmurhash@0.1.4(transitive)
+ Addedinflight@1.0.6(transitive)
+ Addedinherits@2.0.4(transitive)
+ Addedis-binary-path@2.1.0(transitive)
+ Addedis-extglob@2.1.1(transitive)
+ Addedis-fullwidth-code-point@3.0.0(transitive)
+ Addedis-glob@4.0.3(transitive)
+ Addedis-number@7.0.0(transitive)
+ Addedis-plain-obj@2.1.0(transitive)
+ Addedis-unicode-supported@0.1.0(transitive)
+ Addedisexe@2.0.0(transitive)
+ Addedjs-tokens@4.0.0(transitive)
+ Addedjs-yaml@3.14.14.1.0(transitive)
+ Addedjson-buffer@3.0.1(transitive)
+ Addedjson-schema-traverse@0.4.11.0.0(transitive)
+ Addedjson-stable-stringify-without-jsonify@1.0.1(transitive)
+ Addedkeyv@4.5.4(transitive)
+ Addedlevn@0.4.1(transitive)
+ Addedlocate-path@6.0.0(transitive)
+ Addedlodash.merge@4.6.2(transitive)
+ Addedlodash.truncate@4.4.2(transitive)
+ Addedlog-symbols@4.1.0(transitive)
+ Addedminimatch@3.1.24.2.1(transitive)
+ Addedmocha@9.2.2(transitive)
+ Addedms@2.1.22.1.3(transitive)
+ Addednanoid@3.3.1(transitive)
+ Addednatural-compare@1.4.0(transitive)
+ Addednormalize-path@3.0.0(transitive)
+ Addedonce@1.4.0(transitive)
+ Addedoptionator@0.9.4(transitive)
+ Addedp-limit@3.1.0(transitive)
+ Addedp-locate@5.0.0(transitive)
+ Addedparent-module@1.0.1(transitive)
+ Addedpath-exists@4.0.0(transitive)
+ Addedpath-is-absolute@1.0.1(transitive)
+ Addedpath-key@3.1.1(transitive)
+ Addedpicocolors@1.1.1(transitive)
+ Addedpicomatch@2.3.1(transitive)
+ Addedprelude-ls@1.2.1(transitive)
+ Addedprogress@2.0.3(transitive)
+ Addedpunycode@2.3.1(transitive)
+ Addedrandombytes@2.1.0(transitive)
+ Addedreaddirp@3.6.0(transitive)
+ Addedregexpp@3.2.0(transitive)
+ Addedrequire-directory@2.1.1(transitive)
+ Addedrequire-from-string@2.0.2(transitive)
+ Addedresolve-from@4.0.0(transitive)
+ Addedrimraf@3.0.2(transitive)
+ Addedsafe-buffer@5.2.1(transitive)
+ Addedsemver@7.6.3(transitive)
+ Addedserialize-javascript@6.0.0(transitive)
+ Addedshebang-command@2.0.0(transitive)
+ Addedshebang-regex@3.0.0(transitive)
+ Addedslice-ansi@4.0.0(transitive)
+ Addedsprintf-js@1.0.3(transitive)
+ Addedstring-width@4.2.3(transitive)
+ Addedstrip-ansi@6.0.1(transitive)
+ Addedstrip-json-comments@3.1.1(transitive)
+ Addedsupports-color@5.5.07.2.08.1.1(transitive)
+ Addedtable@6.8.2(transitive)
+ Addedtext-table@0.2.0(transitive)
+ Addedto-regex-range@5.0.1(transitive)
+ Addedtype-check@0.4.0(transitive)
+ Addedtype-fest@0.20.2(transitive)
+ Addeduri-js@4.4.1(transitive)
+ Addedv8-compile-cache@2.4.0(transitive)
+ Addedwhich@2.0.2(transitive)
+ Addedword-wrap@1.2.5(transitive)
+ Addedworkerpool@6.2.0(transitive)
+ Addedwrap-ansi@7.0.0(transitive)
+ Addedwrappy@1.0.2(transitive)
+ Addedy18n@5.0.8(transitive)
+ Addedyargs@16.2.0(transitive)
+ Addedyargs-parser@20.2.4(transitive)
+ Addedyargs-unparser@2.0.0(transitive)
+ Addedyocto-queue@0.1.0(transitive)