qyu - npm Package Compare versions

Comparing version 0.4.4 to 0.4.5

package.json
{
"name": "qyu",
"version": "0.4.4",
"version": "0.4.5",
"description": "A general-purpose asynchronous job queue for Node.js",

@@ -5,0 +5,0 @@ "keywords": [

README.md

@@ -26,3 +26,3 @@ # Qyu

// Doesn't matter if more jobs come around later,
// No matter if more jobs come around later!
// Qyu will queue them as necessary and optimally manage them all

@@ -52,2 +52,74 @@ // for you based on your concurrency setting

# Instance Config
Defaults:
```javascript
new Qyu({
concurrency: 1,
capacity: Infinity,
rampUpTime: 0
});
```
#### concurrency:
Determines the maximum number of jobs allowed to be run concurrently.
*(default: 1)*
```javascript
const q = new Qyu({concurrency: 2}); // Max 2 jobs can run concurrently
q(job1); // Runs immediately
q(job2); // Also runs immediately
q(job3); // Will be queued until either job1 or job2 completes, keeping no more than 2 jobs running at a time
```
#### capacity:
Limits the length of the job queue; once the queue is full, any additional queuing is immediately rejected with a `QyuError` whose code is `"ERR_CAPACITY_FULL"`.
*(default: Infinity)*
```javascript
const q = new Qyu({capacity: 5});
// Queuing a batch of 6 jobs; since the 6th one exceeds the max capacity, its returned promise is immediately rejected
for (let i=0; i<6; i++) {
q(job)
.then(result => console.log(`Job ${i} complete!`, result))
.catch(err => console.error(`job ${i} error`, err)); // err is a QyuError with code: "ERR_CAPACITY_FULL"
}
```
#### rampUpTime:
If set to a non-zero number of milliseconds, delays the start of each additional concurrent job by that amount as the instance ramps up towards its maximum configured concurrency.
*(default: 0)*
```javascript
const q = new Qyu({
rampUpTime: 1000,
concurrency: 3
});
// Let's say each of these jobs is some long-running task:
q(job1);
q(job2);
q(job3);
q(job4);
// All 4 jobs are queued at the same time, but:
// job1 starts immediately, job2 starts after 1000ms, job3 after 2000ms;
// job4 exceeds the max concurrency of 3, so it will wait until one of the previous jobs finishes before it starts.
```
# Queuing options
Defaults:
```javascript
q(job, {
priority: 0,
timeout: null
});
```
#### priority:
Determines the order in which queued jobs will run.
Can be any number, positive or negative, integer or float.
The greater the priority value, the earlier the job will run relative to other queued jobs.
Jobs queued with identical priority run in the order they were added.
*(default: 0)*
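For illustration, a minimal sketch of how `priority` affects execution order (the job names are placeholders, and concurrency is kept at 1 so queued jobs can't start right away):
```javascript
const q = new Qyu({concurrency: 1});
q(longRunningJob); // Occupies the single concurrency slot
q(jobA, {priority: 1});
q(jobB, {priority: 5});
q(jobC); // priority defaults to 0
// Once longRunningJob finishes, the queued jobs run in priority order: jobB, jobA, jobC
```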
#### timeout:
If set to a non-zero number of milliseconds, a job that has waited in the queue for that long without starting is dequeued.
Additionally, when a queued job reaches its timeout, the promise returned from its queuing rejects with a `QyuError` whose code is `"ERR_JOB_TIMEOUT"`.
*(default: null)*
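As a minimal sketch (job names are again placeholders), a timed-out queuing might look like this:
```javascript
const q = new Qyu({concurrency: 1});
q(longRunningJob); // Occupies the single concurrency slot
q(quickJob, {timeout: 3000})
  // If quickJob is still waiting in the queue after 3000ms it is dequeued, and
  // err is a QyuError with code: "ERR_JOB_TIMEOUT"
  .catch(err => console.error(err));
```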
# API

@@ -54,0 +126,0 @@

index.js

@@ -38,6 +38,6 @@ const

class Qyu {
constructor(opts={}, job=null, jobOpts={}) {
constructor(opts={}, jobFn=null, jobOpts={}) {
this.getRampUpPromise = null;
this.jobQueue = [];
this.activeCount = 0;
this.jobChannels = [];
this.isAtMaxConcurrency = false;

@@ -57,4 +57,4 @@ this.isRunningJobChannels = false;

if (job) { // TODO: Add this feature in docs...
this.enqueue(job, jobOpts);
if (jobFn) { // TODO: Add this feature in docs...
this.enqueue(jobFn, jobOpts);
}
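The TODO above notes this isn't documented yet, but judging from the new signature a job function can apparently be queued straight from the constructor; a hypothetical sketch (someJobFn is a placeholder):
```javascript
// Hypothetical usage based on the constructor signature above (not yet documented):
const q = new Qyu({concurrency: 2}, someJobFn, {priority: 1});
// ...which appears to be shorthand for:
// const q = new Qyu({concurrency: 2});
// q.add(someJobFn, {priority: 1});
```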

@@ -75,44 +75,20 @@

async runJobChannel() {
// TODO: Add additional condition here: "&& !this.jobQueue.length" for when pause() is engaged while there are still jobs in the jobQueue
if (!this.activeCount) {
this.whenEmptyDeferred = new Deferred;
}
++this.activeCount;
this.isAtMaxConcurrency = this.activeCount === this.opts.concurrency;
if (this.isAtMaxConcurrency) {
this.whenFreeDeferred = new Deferred;
}
let current;
let job;
while (
!this.isPaused &&
this.activeCount <= this.opts.concurrency &&
(current = this.jobQueue.shift())
this.jobChannels.length <= this.opts.concurrency &&
(job = this.jobQueue.shift())
) {
if (current.timeoutId) {
clearTimeout(current.timeoutId);
if (job.timeoutId) {
clearTimeout(job.timeoutId);
}
try {
let result = await current.job.apply(this, current.opts.args);
current.deferred.resolve(result);
let result = await job.fn.apply(this, job.opts.args);
job.deferred.resolve(result);
}
catch (err) {
current.deferred.reject(err);
guardUnhandledPromiseRejections(current);
job.deferred.reject(err);
guardUnhandledPromiseRejections(job);
}
}
if (this.isAtMaxConcurrency) {
this.whenFreeDeferred.resolve();
}
--this.activeCount;
// TODO: Add additional condition here: "&& !this.jobQueue.length" for when pause() is engaged while there are still jobs in the jobQueue
if (!this.activeCount) {
this.whenEmptyDeferred.resolve();
}
}

@@ -124,5 +100,33 @@

while (this.jobQueue.length && this.activeCount < this.opts.concurrency) {
this.runJobChannel();
if (this.opts.rampUpTime && this.activeCount) {
while (
this.jobQueue.length &&
!this.isPaused &&
this.jobChannels.length < this.opts.concurrency
) {
(async () => {
// TODO: Add additional condition here: "&& !this.jobQueue.length" for when pause() is engaged while there are still jobs in the jobQueue
if (!this.jobChannels.length) {
this.whenEmptyDeferred = new Deferred;
}
if (this.jobChannels.length === this.opts.concurrency - 1) {
this.whenFreeDeferred = new Deferred;
}
let promise = this.runJobChannel();
this.jobChannels.push(promise);
await promise;
this.jobChannels.splice(this.jobChannels.indexOf(promise), 1);
if (this.jobChannels.length === this.opts.concurrency - 1) {
this.whenFreeDeferred.resolve();
}
// TODO: Add additional condition here: "&& !this.jobQueue.length" for when pause() is engaged while there are still jobs in the jobQueue
if (!this.jobChannels.length && !this.isPaused) {
this.whenEmptyDeferred.resolve();
}
})();
if (this.opts.rampUpTime && this.jobChannels.length) {
await new Promise(resolve => setTimeout(resolve, this.opts.rampUpTime));

@@ -136,5 +140,5 @@ }

enqueue(inputJob, opts={}) {
let jobObject = {
job: inputJob,
enqueue(fn, opts={}) {
let job = {
fn: fn,
opts: {

@@ -151,16 +155,16 @@ timeout: null,

if (this.jobQueue.length === this.opts.capacity) {
jobObject.deferred.reject(
job.deferred.reject(
new QyuError('ERR_CAPACITY_FULL', "Can't queue job, queue is at max capacity")
);
guardUnhandledPromiseRejections(jobObject);
guardUnhandledPromiseRejections(job);
}
if (opts.timeout) {
jobObject.timeoutId = setTimeout(() => {
this.dequeue(jobObject.deferred.promise);
jobObject.timeoutId = null;
jobObject.deferred.reject(
job.timeoutId = setTimeout(() => {
this.dequeue(job.deferred.promise);
job.timeoutId = null;
job.deferred.reject(
new QyuError('ERR_JOB_TIMEOUT', "Job cancelled due to timeout")
);
guardUnhandledPromiseRejections(jobObject);
guardUnhandledPromiseRejections(job);
}, opts.timeout);

@@ -171,11 +175,9 @@ }

while (
i < this.jobQueue.length && opts.priority <= this.jobQueue[i].opts.priority
i < this.jobQueue.length && job.opts.priority <= this.jobQueue[i].opts.priority
) { ++i };
this.jobQueue.splice(i, 0, jobObject);
this.jobQueue.splice(i, 0, job);
if (!this.isPaused) {
this.runJobChannels();
}
this.runJobChannels();
return jobObject.deferred.promise;
return job.deferred.promise;
}

@@ -194,13 +196,8 @@

add() {
let job = arguments[0];
let opts;
if (arguments[1] instanceof Object) {
opts = arguments[1];
} else {
opts = {args: null};
}
let fn = arguments[0];
let opts = arguments[1] instanceof Object ? arguments[1] : {args: null};
if (arguments.length > 2) {
opts.args = Array.prototype.slice.call(arguments, 2);
}
return this.enqueue(job, opts);
return this.enqueue(fn, opts);
}
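Judging from the rewritten add() above, any arguments passed after the options object are collected into opts.args and later applied to the job function; an illustrative sketch (fetchUser is a placeholder):
```javascript
// Illustrative only; fetchUser is a placeholder async function
q.add((id, fields) => fetchUser(id, fields), {priority: 1}, 42, ['name', 'email']);
// add() gathers the trailing arguments into opts.args,
// so the job is eventually invoked as fn(42, ['name', 'email'])
```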

@@ -220,8 +217,21 @@

pause() {
if (this.isPaused) {
return;
}
this.isPaused = true;
if (!this.jobQueue.length && !this.jobChannels.length) {
this.whenEmptyDeferred = new Deferred;
}
// TODO: return a promise that will resolve when current jobs that were already running will finish. Perhaps: return this.whenEmpty();
return Promise.all(this.jobChannels);
}
resume() {
if (!this.isPaused) {
return;
}
this.isPaused = false;
if (!this.jobQueue.length && !this.jobChannels.length) {
this.whenEmptyDeferred.resolve();
}
this.runJobChannels();

@@ -228,0 +238,0 @@ }
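A note on the reworked pause() above: it now returns Promise.all(this.jobChannels), so awaiting it should resolve once the jobs already in flight have settled, while anything still queued waits for resume(). A rough sketch (job1/job2 are placeholders, run inside an async function):
```javascript
// Rough sketch based on the new pause()/resume() code above
const q = new Qyu({concurrency: 2});
q(job1);
q(job2);
await q.pause(); // Resolves once the currently running jobs settle; queued jobs stay queued
q.resume();      // Picks the queue back up
```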

tests/qyu.test.js

@@ -233,3 +233,3 @@ const { Qyu, QyuError } = require('../index');

q.add(async () => {
await mockAsync(true, 1000);
await mockAsync(true, 100);
resolve();

@@ -239,3 +239,3 @@ });

q.add(fn, {timeout: 100});
q.add(fn, {timeout: 50});

@@ -251,6 +251,6 @@ await promise;

q.add(() => mockAsync(true, 1000));
q.add(() => mockAsync(true, 100));
try {
await q.add(() => {}, {timeout: 100});
await q.add(() => {}, {timeout: 50});
}

@@ -268,2 +268,35 @@ catch (err) {

describe('The `priority` option, when adding a task', () => {
it('will default to 0', async () => {
let q = new Qyu({ concurrency: 1 });
let actualOrder = [];
let push = value => actualOrder.push(value);
q.add(mockAsync); // To raise activity to max concurrency...
await Promise.all([
new Promise(resolve => q.add(() => { push('a'); resolve(); }, {priority: -1})),
new Promise(resolve => q.add(() => { push('b'); resolve(); }, {priority: 1})),
new Promise(resolve => q.add(() => { push('c'); resolve(); }, {}))
]);
expect(actualOrder).toMatchObject(['b', 'c', 'a']);
});
it('will queue jobs with the same priority by the order they were added', async () => {
let q = new Qyu({ concurrency: 1 });
let actualOrder = [];
let push = value => actualOrder.push(value);
q.add(mockAsync); // To raise activity to max concurrency...
await Promise.all([
new Promise(resolve => q.add(() => { push('a'); resolve(); }, {priority: 0})),
new Promise(resolve => q.add(() => { push('b'); resolve(); }, {priority: 0})),
new Promise(resolve => q.add(() => { push('c'); resolve(); }, {priority: 0})),
new Promise(resolve => q.add(() => { push('d'); resolve(); }, {priority: 0}))
]);
expect(actualOrder).toMatchObject(['a', 'b', 'c' ,'d']);
});
it('if currently running jobs are at the concurrency limit, queue a job AFTER jobs with more or equal priority, and BEFORE other jobs that have less priority if any', async () => {

@@ -270,0 +303,0 @@ let q = new Qyu({ concurrency: 1 });
