Comparing version 0.4.6 to 0.4.7
{ | ||
"name": "qyu", | ||
"version": "0.4.6", | ||
"version": "0.4.7", | ||
"description": "A general-purpose asynchronous job queue for Node.js", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
@@ -113,3 +113,3 @@ # Qyu | ||
Can be any positive/negative integer/float number. | ||
The greater the priority value, the ealier it will be called relative to other jobs. | ||
The greater the priority value, the earlier it will be called relative to other jobs. | ||
Queuings having identical priority will be put one after another in the same order in which they were passed to the instance. | ||
@@ -210,3 +210,3 @@ *(default: 0)* | ||
#### instance#empty() | ||
Immediately empties the instance's entire queue from all queued jobs. | ||
Immediately empties the instance's queue from all queued jobs, rejecting the promises returned from their queuings with a `"ERR_JOB_DEQUEUED"` type of `QyuError`. | ||
Jobs currently running at the time `instance.empty()` was called keep running until finished. | ||
@@ -216,5 +216,22 @@ ```javascript | ||
q(job1); q(job2); | ||
q.empty(); // Because the concurrency was set to "1", job1 is already running in this point, but job2 will be dequeued and never run. | ||
await q.empty(); // Because the concurrency was set to "1", job1 is already running in this point, but job2 will be dequeued and never run. | ||
// The above "await" will resolve once job1 finally finishes... | ||
``` | ||
#### instance#set(`config`) | ||
Update a living instance's config options in real time (`concurrency`, `capacity` and `rampUpTime`). | ||
Note these (expected) side effects: | ||
- If new `concurrency` is specified and is greater than previous setting, new jobs will immediately be drawn and called from queue as much as the difference from previously set `concurrency` up to the number of jobs that were held in queue at the time. | ||
- if new `concurrency` is specified and is lower then previous setting, will postpone calling additional jobs until enough active jobs finish and make the actual concurrency degrade to the new setting by itself. | ||
- if new `capacity` is specified and is lower than previous setting, will reject the last jobs in queue with a `"ERR_CAPACITY_FULL"` type of `QyuError` as much as the difference from the previously set `capacity`. | ||
```javascript | ||
const q = new Qyu({concurrency: 1, capacity: 3}); | ||
q(job1); q(job2); q(job3); | ||
// Up until now, only job 1 was actually called due to the `concurrency` of 1. | ||
q.set({concurrency: 2, capacity: 2}); | ||
// At this point, job2 will be called as well due to `concurrency` being increased to 2, but also having the `capacity` decreased by 1 causes job3 to immediately dequeue and reject in order to not exceed the updated capacity. | ||
``` | ||
# Examples | ||
@@ -225,5 +242,5 @@ | ||
const | ||
Qyu = require('qyu'), | ||
axios = require('axios'), | ||
cheerio = require('cheerio'); | ||
Qyu = require('qyu'), | ||
axios = require('axios'), | ||
cheerio = require('cheerio'); | ||
@@ -230,0 +247,0 @@ (async () => { |
@@ -19,3 +19,3 @@ const | ||
function() { | ||
if (arguments[0] instanceof Array) { | ||
if (arguments[0][Symbol.iterator] instanceof Function) { | ||
return q.map(arguments[0], arguments[1], arguments[2]); | ||
@@ -64,2 +64,3 @@ } else { | ||
set(newOpts) { | ||
@@ -72,4 +73,11 @@ let oldOpts = this.opts; | ||
} | ||
if (newOpts.capacity) { | ||
while (this.jobQueue.length > newOpts.capacity) { | ||
this.jobQueue.pop().deferred.reject(new QyuError('ERR_CAPACITY_FULL', "Can't queue job, queue is at max capacity")); | ||
} | ||
} | ||
} | ||
async runJobChannel() { | ||
@@ -96,2 +104,3 @@ let job; | ||
async runJobChannels() { | ||
@@ -140,2 +149,3 @@ if (!this.isRunningJobChannels) { | ||
enqueue(fn, opts={}) { | ||
@@ -155,6 +165,5 @@ let job = { | ||
if (this.jobQueue.length === this.opts.capacity) { | ||
job.deferred.reject( | ||
new QyuError('ERR_CAPACITY_FULL', "Can't queue job, queue is at max capacity") | ||
); | ||
job.deferred.reject(new QyuError('ERR_CAPACITY_FULL', "Can't queue job, queue is at max capacity")); | ||
guardUnhandledPromiseRejections(job); | ||
return job.deferred.promise; | ||
} | ||
@@ -166,5 +175,3 @@ | ||
job.timeoutId = null; | ||
job.deferred.reject( | ||
new QyuError('ERR_JOB_TIMEOUT', "Job cancelled due to timeout") | ||
); | ||
job.deferred.reject(new QyuError('ERR_JOB_TIMEOUT', "Job cancelled due to timeout")); | ||
guardUnhandledPromiseRejections(job); | ||
@@ -185,2 +192,3 @@ }, opts.timeout); | ||
dequeue(promise) { | ||
@@ -196,2 +204,3 @@ for (let i=0; i<this.jobQueue.length; ++i) { | ||
add() { | ||
@@ -206,2 +215,3 @@ let fn = arguments[0]; | ||
map(iterator, fn, opts) { | ||
@@ -218,2 +228,3 @@ let counter = 0; | ||
pause() { | ||
@@ -231,2 +242,3 @@ if (this.isPaused) { | ||
resume() { | ||
@@ -243,6 +255,12 @@ if (!this.isPaused) { | ||
empty() { | ||
this.jobQueue.splice(0); | ||
for (let job of this.jobQueue.splice(0)) { | ||
job.deferred.reject(new QyuError('ERR_JOB_DEQUEUED', "Job was dequeued out of the queue")); | ||
guardUnhandledPromiseRejections(job); | ||
} | ||
return Promise.all(this.jobChannels); | ||
} | ||
whenEmpty() { | ||
@@ -252,2 +270,3 @@ return this.whenEmptyDeferred.promise; | ||
whenFree() { | ||
@@ -257,2 +276,3 @@ return this.whenFreeDeferred.promise; | ||
writeStream(chunkObjTransformer=v=>v) { | ||
@@ -273,2 +293,3 @@ let thisQueue = this; | ||
transformStream(chunkObjTransformer=v=>v) { | ||
@@ -275,0 +296,0 @@ let thisQueue = this; |
@@ -15,3 +15,26 @@ const { Qyu, QyuError } = require('../index'); | ||
const getPromiseStatus = async input => { | ||
let wasInputArray = input instanceof Array; | ||
if (!wasInputArray) { | ||
input = [input]; | ||
} | ||
let statuses = [...input].fill('pending'); | ||
input.forEach(async (promise, i) => { | ||
try { | ||
await promise; | ||
statuses[i] = 'resolved'; | ||
} catch (err) { | ||
statuses[i] = 'rejected'; | ||
} | ||
}); | ||
await delay(0); | ||
return wasInputArray ? statuses : statuses[0]; | ||
}; | ||
describe('When A Qyu instance is invoked as a function', () => { | ||
@@ -60,2 +83,14 @@ it("with a function as the first arg - should internally call the `add` method with the job and options and injecting any addiontional args passed into it", () => { | ||
it('will not call added functions if they exceed the capacity limit', () => { | ||
let q = new Qyu({concurrency: 1, capacity: 1}); | ||
let job1 = jest.fn(mockAsync); | ||
let job2 = jest.fn(mockAsync); | ||
q.add(job1); | ||
q.add(job2); | ||
expect(job1).toHaveBeenCalled(); | ||
expect(job2).not.toHaveBeenCalled(); | ||
}); | ||
it('will inject every 3rd and up additional arguments supplied to it to the job function itself', () => { | ||
@@ -109,2 +144,18 @@ let q = new Qyu(); | ||
it('rejects immediately with a QyuError of code "ERR_CAPACITY_FULL" if instance capacity is full', async () => { | ||
let q = new Qyu({capacity: 1}); | ||
q.add(mockAsync); | ||
q.add(mockAsync); // this queuing and the one above it fill the queue length up to 1 (the earlier was called immediately, and the current is then put in queue) | ||
let promise = q.add(() => {}); // this is expected to reject since the current length of queue should be 1 at that point, which equals to the max capacity of 1 | ||
expect(await getPromiseStatus(promise)).toBe('rejected'); | ||
try { await promise; } | ||
catch (err) { | ||
expect(err instanceof QyuError).toBe(true); | ||
expect(err.code).toBe('ERR_CAPACITY_FULL'); | ||
} | ||
}); | ||
it('that resolves only after the actual job is resolved', async () => { | ||
@@ -200,2 +251,59 @@ let done = false; | ||
describe('`empty` method', () => { | ||
it('should reject all queued jobs with a QyuError of code "ERR_JOB_DEQUEUED" and not call them', async () => { | ||
let q = new Qyu({concurrency: 1}); | ||
let fn = jest.fn(mockAsync); | ||
q.add(mockAsync); | ||
let p1 = q.add(fn); | ||
let p2 = q.add(fn); | ||
q.empty(); | ||
expect(await getPromiseStatus([p1, p2])).toMatchObject(['rejected', 'rejected']); | ||
try { await p1; } catch (err) { | ||
expect(err instanceof QyuError).toBe(true); | ||
expect(err.code).toBe('ERR_JOB_DEQUEUED'); | ||
} | ||
try { await p2; } catch (err) { | ||
expect(err instanceof QyuError).toBe(true); | ||
expect(err.code).toBe('ERR_JOB_DEQUEUED'); | ||
} | ||
expect(fn).not.toHaveBeenCalled(); | ||
}); | ||
it('should return a promise that resolves once all active jobs at the time of calling are done', async () => { | ||
let q = new Qyu({concurrency: 2}); | ||
let p1 = new Promise(resolve => { | ||
q.add(async () => { | ||
await mockAsync(); | ||
resolve(); | ||
}); | ||
}); | ||
let p2 = new Promise(resolve => { | ||
q.add(async () => { | ||
await mockAsync(); | ||
resolve(); | ||
}); | ||
}); | ||
let p3 = new Promise(resolve => { | ||
q.add(async () => { | ||
await mockAsync(); | ||
resolve(); | ||
}); | ||
}); | ||
await q.empty(); | ||
expect(await getPromiseStatus([p1, p2])).toMatchObject([ | ||
'resolved', | ||
'resolved' | ||
]); | ||
}); | ||
}); | ||
describe('`whenFree` method', () => { | ||
@@ -202,0 +310,0 @@ it('should return a promise', () => { |
@@ -84,3 +84,3 @@ const { Qyu, QyuError } = require('../index'); | ||
describe('The `set` method', () => { | ||
describe('if called with new concurrency value', () => { | ||
describe('if passed a new concurrency value', () => { | ||
it('if new value is bigger than previous, will immediately call more jobs as much as the difference from previous value', async () => { | ||
@@ -106,2 +106,24 @@ let q = new Qyu({concurrency: 1}); | ||
}); | ||
describe('if passed a new capacity value', () => { | ||
it('if new value is lower than previous, will immediately reject the last jobs in queue as much as the difference from previous value', async () => { | ||
let q = new Qyu({concurrency: 1, capacity: 4}); | ||
let fn1 = jest.fn(mockAsync); | ||
let fn2 = jest.fn(mockAsync); | ||
let fn3 = jest.fn(mockAsync); | ||
let fn4 = jest.fn(mockAsync); | ||
let fn5 = jest.fn(mockAsync); | ||
let p1 = q.add(fn1); | ||
let p2 = q.add(fn2); | ||
let p3 = q.add(fn3); | ||
let p4 = q.add(fn4); | ||
let p5 = q.add(fn5); | ||
q.set({capacity: 2}); | ||
expect(await getPromiseStatus([p2, p3, p4, p5])).toMatchObject(['pending', 'pending', 'rejected', 'rejected']); | ||
}); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
39127
718
267