Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

qyu

Package Overview
Dependencies
Maintainers
1
Versions
21
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

qyu - npm Package Compare versions

Comparing version 0.4.6 to 0.4.7

2

package.json
{
"name": "qyu",
"version": "0.4.6",
"version": "0.4.7",
"description": "A general-purpose asynchronous job queue for Node.js",

@@ -5,0 +5,0 @@ "keywords": [

@@ -113,3 +113,3 @@ # Qyu

Can be any positive/negative integer/float number.
The greater the priority value, the ealier it will be called relative to other jobs.
The greater the priority value, the earlier it will be called relative to other jobs.
Queuings having identical priority will be put one after another in the same order in which they were passed to the instance.

@@ -210,3 +210,3 @@ *(default: 0)*

#### instance#empty()
Immediately empties the instance's entire queue from all queued jobs.
Immediately empties the instance's queue from all queued jobs, rejecting the promises returned from their queuings with a `"ERR_JOB_DEQUEUED"` type of `QyuError`.
Jobs currently running at the time `instance.empty()` was called keep running until finished.

@@ -216,5 +216,22 @@ ```javascript

q(job1); q(job2);
q.empty(); // Because the concurrency was set to "1", job1 is already running in this point, but job2 will be dequeued and never run.
await q.empty(); // Because the concurrency was set to "1", job1 is already running in this point, but job2 will be dequeued and never run.
// The above "await" will resolve once job1 finally finishes...
```
#### instance#set(`config`)
Update a living instance's config options in real time (`concurrency`, `capacity` and `rampUpTime`).
Note these (expected) side effects:
- If new `concurrency` is specified and is greater than previous setting, new jobs will immediately be drawn and called from queue as much as the difference from previously set `concurrency` up to the number of jobs that were held in queue at the time.
- if new `concurrency` is specified and is lower then previous setting, will postpone calling additional jobs until enough active jobs finish and make the actual concurrency degrade to the new setting by itself.
- if new `capacity` is specified and is lower than previous setting, will reject the last jobs in queue with a `"ERR_CAPACITY_FULL"` type of `QyuError` as much as the difference from the previously set `capacity`.
```javascript
const q = new Qyu({concurrency: 1, capacity: 3});
q(job1); q(job2); q(job3);
// Up until now, only job 1 was actually called due to the `concurrency` of 1.
q.set({concurrency: 2, capacity: 2});
// At this point, job2 will be called as well due to `concurrency` being increased to 2, but also having the `capacity` decreased by 1 causes job3 to immediately dequeue and reject in order to not exceed the updated capacity.
```
# Examples

@@ -225,5 +242,5 @@

const
Qyu = require('qyu'),
axios = require('axios'),
cheerio = require('cheerio');
Qyu = require('qyu'),
axios = require('axios'),
cheerio = require('cheerio');

@@ -230,0 +247,0 @@ (async () => {

@@ -19,3 +19,3 @@ const

function() {
if (arguments[0] instanceof Array) {
if (arguments[0][Symbol.iterator] instanceof Function) {
return q.map(arguments[0], arguments[1], arguments[2]);

@@ -64,2 +64,3 @@ } else {

set(newOpts) {

@@ -72,4 +73,11 @@ let oldOpts = this.opts;

}
if (newOpts.capacity) {
while (this.jobQueue.length > newOpts.capacity) {
this.jobQueue.pop().deferred.reject(new QyuError('ERR_CAPACITY_FULL', "Can't queue job, queue is at max capacity"));
}
}
}
async runJobChannel() {

@@ -96,2 +104,3 @@ let job;

async runJobChannels() {

@@ -140,2 +149,3 @@ if (!this.isRunningJobChannels) {

enqueue(fn, opts={}) {

@@ -155,6 +165,5 @@ let job = {

if (this.jobQueue.length === this.opts.capacity) {
job.deferred.reject(
new QyuError('ERR_CAPACITY_FULL', "Can't queue job, queue is at max capacity")
);
job.deferred.reject(new QyuError('ERR_CAPACITY_FULL', "Can't queue job, queue is at max capacity"));
guardUnhandledPromiseRejections(job);
return job.deferred.promise;
}

@@ -166,5 +175,3 @@

job.timeoutId = null;
job.deferred.reject(
new QyuError('ERR_JOB_TIMEOUT', "Job cancelled due to timeout")
);
job.deferred.reject(new QyuError('ERR_JOB_TIMEOUT', "Job cancelled due to timeout"));
guardUnhandledPromiseRejections(job);

@@ -185,2 +192,3 @@ }, opts.timeout);

dequeue(promise) {

@@ -196,2 +204,3 @@ for (let i=0; i<this.jobQueue.length; ++i) {

add() {

@@ -206,2 +215,3 @@ let fn = arguments[0];

map(iterator, fn, opts) {

@@ -218,2 +228,3 @@ let counter = 0;

pause() {

@@ -231,2 +242,3 @@ if (this.isPaused) {

resume() {

@@ -243,6 +255,12 @@ if (!this.isPaused) {

empty() {
this.jobQueue.splice(0);
for (let job of this.jobQueue.splice(0)) {
job.deferred.reject(new QyuError('ERR_JOB_DEQUEUED', "Job was dequeued out of the queue"));
guardUnhandledPromiseRejections(job);
}
return Promise.all(this.jobChannels);
}
whenEmpty() {

@@ -252,2 +270,3 @@ return this.whenEmptyDeferred.promise;

whenFree() {

@@ -257,2 +276,3 @@ return this.whenFreeDeferred.promise;

writeStream(chunkObjTransformer=v=>v) {

@@ -273,2 +293,3 @@ let thisQueue = this;

transformStream(chunkObjTransformer=v=>v) {

@@ -275,0 +296,0 @@ let thisQueue = this;

@@ -15,3 +15,26 @@ const { Qyu, QyuError } = require('../index');

const getPromiseStatus = async input => {
let wasInputArray = input instanceof Array;
if (!wasInputArray) {
input = [input];
}
let statuses = [...input].fill('pending');
input.forEach(async (promise, i) => {
try {
await promise;
statuses[i] = 'resolved';
} catch (err) {
statuses[i] = 'rejected';
}
});
await delay(0);
return wasInputArray ? statuses : statuses[0];
};
describe('When A Qyu instance is invoked as a function', () => {

@@ -60,2 +83,14 @@ it("with a function as the first arg - should internally call the `add` method with the job and options and injecting any addiontional args passed into it", () => {

it('will not call added functions if they exceed the capacity limit', () => {
let q = new Qyu({concurrency: 1, capacity: 1});
let job1 = jest.fn(mockAsync);
let job2 = jest.fn(mockAsync);
q.add(job1);
q.add(job2);
expect(job1).toHaveBeenCalled();
expect(job2).not.toHaveBeenCalled();
});
it('will inject every 3rd and up additional arguments supplied to it to the job function itself', () => {

@@ -109,2 +144,18 @@ let q = new Qyu();

it('rejects immediately with a QyuError of code "ERR_CAPACITY_FULL" if instance capacity is full', async () => {
let q = new Qyu({capacity: 1});
q.add(mockAsync);
q.add(mockAsync); // this queuing and the one above it fill the queue length up to 1 (the earlier was called immediately, and the current is then put in queue)
let promise = q.add(() => {}); // this is expected to reject since the current length of queue should be 1 at that point, which equals to the max capacity of 1
expect(await getPromiseStatus(promise)).toBe('rejected');
try { await promise; }
catch (err) {
expect(err instanceof QyuError).toBe(true);
expect(err.code).toBe('ERR_CAPACITY_FULL');
}
});
it('that resolves only after the actual job is resolved', async () => {

@@ -200,2 +251,59 @@ let done = false;

describe('`empty` method', () => {
it('should reject all queued jobs with a QyuError of code "ERR_JOB_DEQUEUED" and not call them', async () => {
let q = new Qyu({concurrency: 1});
let fn = jest.fn(mockAsync);
q.add(mockAsync);
let p1 = q.add(fn);
let p2 = q.add(fn);
q.empty();
expect(await getPromiseStatus([p1, p2])).toMatchObject(['rejected', 'rejected']);
try { await p1; } catch (err) {
expect(err instanceof QyuError).toBe(true);
expect(err.code).toBe('ERR_JOB_DEQUEUED');
}
try { await p2; } catch (err) {
expect(err instanceof QyuError).toBe(true);
expect(err.code).toBe('ERR_JOB_DEQUEUED');
}
expect(fn).not.toHaveBeenCalled();
});
it('should return a promise that resolves once all active jobs at the time of calling are done', async () => {
let q = new Qyu({concurrency: 2});
let p1 = new Promise(resolve => {
q.add(async () => {
await mockAsync();
resolve();
});
});
let p2 = new Promise(resolve => {
q.add(async () => {
await mockAsync();
resolve();
});
});
let p3 = new Promise(resolve => {
q.add(async () => {
await mockAsync();
resolve();
});
});
await q.empty();
expect(await getPromiseStatus([p1, p2])).toMatchObject([
'resolved',
'resolved'
]);
});
});
describe('`whenFree` method', () => {

@@ -202,0 +310,0 @@ it('should return a promise', () => {

@@ -84,3 +84,3 @@ const { Qyu, QyuError } = require('../index');

describe('The `set` method', () => {
describe('if called with new concurrency value', () => {
describe('if passed a new concurrency value', () => {
it('if new value is bigger than previous, will immediately call more jobs as much as the difference from previous value', async () => {

@@ -106,2 +106,24 @@ let q = new Qyu({concurrency: 1});

});
describe('if passed a new capacity value', () => {
it('if new value is lower than previous, will immediately reject the last jobs in queue as much as the difference from previous value', async () => {
let q = new Qyu({concurrency: 1, capacity: 4});
let fn1 = jest.fn(mockAsync);
let fn2 = jest.fn(mockAsync);
let fn3 = jest.fn(mockAsync);
let fn4 = jest.fn(mockAsync);
let fn5 = jest.fn(mockAsync);
let p1 = q.add(fn1);
let p2 = q.add(fn2);
let p3 = q.add(fn3);
let p4 = q.add(fn4);
let p5 = q.add(fn5);
q.set({capacity: 2});
expect(await getPromiseStatus([p2, p3, p4, p5])).toMatchObject(['pending', 'pending', 'rejected', 'rejected']);
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc