Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

queue

Package Overview
Dependencies
Maintainers
1
Versions
38
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

queue - npm Package Compare versions

Comparing version 0.0.2 to 1.0.0

73

example/example.js

@@ -8,36 +8,69 @@ #!/usr/bin/env node

var Queue = require('../queue');
var Queue = require("../queue");
var q = new Queue({
timeout: 100,
concurrency: 100
});
var results = [];
var q = new Queue();
// advance handler
q.on("advance", function () {
console.log("The queue is about to advance");
// listen for events
q.on('processed', function(job) {
console.log('job finished processing:', job.toString().replace(/\n/g, ''));
});
// drain handler
q.on("drain", function () {
console.log("All done:", results);
q.on('drain', function() {
console.log('all done:', results);
});
// add individual functions
q.push(function (cb) {
results.push("one");
// add jobs using the familiar Array API
q.push(function(cb) {
results.push('two');
cb();
}, function (err, jobQueue) {
console.log("This is a job specific callback");
});
// add arrays of functions
q.push([
function (cb) {
results.push("two");
q.push(
function(cb) {
results.push('four');
cb();
},
function (cb) {
results[2] = "three";
function(cb) {
results.push('five');
cb();
}
]);
);
q.unshift(function(cb) {
results.push('one');
cb();
});
q.splice(2, 0, function(cb) {
results.push('three');
cb();
});
// use the timeout feature to deal with jobs that
// take too long or forget to execute a callback
q.on('timeout', function(job, next) {
console.log('job timed out:', job.toString().replace(/\n/g, ''));
next();
});
q.push(function(cb) {
setTimeout(function() {
console.log('slow job finished');
cb();
}, 200);
});
q.push(function(cb) {
console.log('forgot to execute callback');
});
{
"name": "queue",
"version": "0.0.2",
"version": "1.0.0",
"description": "An async job queue with adjustable concurrency",

@@ -5,0 +5,0 @@ "main": "queue.js",

@@ -6,60 +6,68 @@ /*

module.exports = Queue;
var util = require("util");
var EventEmitter = require("events").EventEmitter;
var EventEmitter = require('events').EventEmitter;
function Queue (concurrency) {
this.concurrency = concurrency || 1;
this.active = [];
function Queue(options) {
options = options || {};
this.concurrency = options.concurrency || 1;
this.timeout = options.timeout || 0;
this.pending = 0;
this.jobs = [];
}
util.inherits(Queue, EventEmitter);
Queue.prototype = new EventEmitter;
Queue.prototype.push = function (job, cb) {
var self = this;
if (this.jobs.length === 0) {
process.nextTick(function () {
self.run();
});
Queue.prototype.__defineGetter__('length', function() {
return this.pending + this.jobs.length;
});
// expose selected array methods
[ 'pop', 'shift', 'slice', 'reverse', 'indexOf', 'lastIndexOf' ].forEach(function(method) {
Queue.prototype[method] = function() {
return Array.prototype[method].apply(this.jobs, arguments);
}
if (job instanceof Array) {
for (var i in job) {
var j = job[i];
var c = null;
if (cb instanceof Array) {
c = cb[i];
}
self.push(j, c);
}
} else {
this.jobs.push([job, cb]);
});
// additive Array methods should auto-advance the queue
[ 'push', 'unshift', 'splice' ].forEach(function(method) {
Queue.prototype[method] = function() {
process.nextTick(this.process.bind(this));
return Array.prototype[method].apply(this.jobs, arguments);
}
}
});
Queue.prototype.run = function () {
if (this.jobs.length > 0 && this.active.length < this.concurrency) {
Queue.prototype.process = function() {
if (this.jobs.length > 0 && this.pending < this.concurrency) {
this.pending++;
var job = this.jobs.shift();
this.active.push(job);
this.run();
var self = this;
var cb = job[1];
job = job[0];
job(function (err) {
if (cb) cb(err, self);
self.emit("advance", err, self);
if (self.jobs.length === 0 && self.active.length === 1) {
self.active = [];
self.emit("drain", self);
} else {
self.active.shift();
self.run();
var once = true;
var timeoutId = null;
var didTimeout = false;
var next = function() {
if (once) {
once = false;
self.pending--;
if (timeoutId !== null) clearTimeout(timeoutId);
if (didTimeout === false) self.emit('processed', job);
if (self.pending === 0 && self.jobs.length === 0) {
self.emit('drain', self);
} else {
self.process();
}
}
});
}
if (this.timeout) {
timeoutId = setTimeout(function() {
didTimeout = true;
self.emit('timeout', job, next);
}, this.timeout);
}
job(next);
this.process();
}
}
Queue.prototype.empty = function (job) {
this.jobs = [];
}
module.exports = Queue;

@@ -12,51 +12,98 @@ ```

## Why
[async](https://github.com/caolan/async#queue)'s queue expects you to have one worker and many jobs. This queue simply expects a list of async functions, which is a bit more flexible - otherwise it's the same idea.
Wanted something more flexible than [async](https://github.com/caolan/async#queue)'s queue.
## How
The module exports a class named ```Queue```. Pass the desired concurrency to the constructor, or change it later via the ```concurrency``` property. Pass async functions (ones that accept a callback) to an instance's ```push()``` method. Processing begins automatically on ```process.nextTick()```.
The module exports a class named `Queue` that implements most of the Array api. Pass async functions (ones that accept a callback) to an instance's `push()` method. Processing begins automatically on `process.nextTick()`.
## Install
```npm install queue```
`npm install queue`
## Properties
* ```concurrency``` maximum number of jobs that the queue should process concurrently - the default is 1
* `concurrency` maximum number of jobs that the queue should process concurrently - the default is 1
* `timeout` milliseconds to wait for a job to execute its callback
## Methods
* ```push(job, cb)``` add a job (and optional callback) to the queue
* ```empty()``` remove any remaining jobs in the queue
* ```run()``` force run the queue immediately
* `push(job)` add a job to the queue
## Events
* ```"advance"``` fires after any job finishes
* ```"drain"``` fires when the queue finishes processing all its jobs
* `'processed'` when jobs finish
* `'timeout'` when `queue.timeout` milliseconds have elapsed and a job has not executed its callback
* `'drain'` when the queue finishes processing all its jobs
## Usage
```javascript
var Queue = require("../queue");
var Queue = require('queue');
var q = new Queue({
timeout: 100,
concurrency: 100
});
var results = [];
var q = new Queue();
// add a drain handler
q.on("drain", function () {
console.log("All done:", results);
// listen for events
q.on('processed', function(job) {
console.log('job finished processing:', job.toString().replace(/\n/g, ''));
});
// add individual functions
q.push(function (cb) {
results.push("one");
q.on('drain', function() {
console.log('all done:', results);
});
// add jobs using familiar Array api
q.push(function(cb) {
results.push('two');
cb();
});
// add arrays of functions
q.push([
function (cb) {
results.push("two");
q.push(
function(cb) {
results.push('four');
cb();
},
function (cb) {
results[2] = "three";
function(cb) {
results.push('five');
cb();
}
]);
);
q.unshift(function(cb) {
results.push('one');
cb();
});
q.splice(2, 0, function(cb) {
results.push('three');
cb();
});
// use the timeout feature to deal with jobs that
// take too long or forget to execute a callback
q.on('timeout', function(job, next) {
console.log('job timed out:', job.toString().replace(/\n/g, ''));
next();
})
q.push(function(cb) {
setTimeout(function() {
console.log('slow job finished');
cb();
}, 200);
});
q.push(function(cb) {
console.log('forgot to execute callback');
});
```
## Note
Version 1.0 introduces api changes and is NOT backwards compatible with 0.0.2
## License
[WTFPL](http://www.wtfpl.net/txt/copying/)

@@ -8,10 +8,9 @@ #!/usr/bin/env node

var assert = require('assert');
var Queue = require('../queue');
var assert = require("assert");
var Queue = require("../queue");
var answers = [];
var q = new Queue(100);
q.on("drain", function () {
var solutions = ["one", "two", "three"];
var q = new Queue({ concurrency: 100 });
q.on('drain', function() {
var solutions = [ 'one', 'two', 'three' ];
for (var i in answers) {

@@ -22,8 +21,8 @@ var answer = answers[i];

}
console.log("It works! ✔");
console.log('It works! ✔');
});
q.push(function (cb) {
setTimeout(function () {
answers.push("one");
q.push(function(cb) {
setTimeout(function() {
answers.push('one');
cb();

@@ -33,14 +32,14 @@ }, 1);

q.push(function (cb) {
setTimeout(function () {
answers.push("three");
q.push(function(cb) {
setTimeout(function() {
answers.push('three');
cb();
}, 3);
}, 6);
});
q.push(function (cb) {
setTimeout(function () {
answers.push("two");
q.push(function(cb) {
setTimeout(function() {
answers.push('two');
cb();
}, 2);
}, 3);
});

@@ -8,10 +8,9 @@ #!/usr/bin/env node

var assert = require('assert');
var Queue = require('../queue');
var assert = require("assert");
var Queue = require("../queue");
var answers = [];
var q = new Queue();
q.on("drain", function () {
var solutions = ["one", "two", "three"];
q.on('drain', function() {
var solutions = [ 'one', 'two', 'three' ];
for (var i in answers) {

@@ -22,18 +21,18 @@ var answer = answers[i];

}
console.log("It works! ✔");
console.log('It works! ✔');
});
q.push(function (cb) {
answers.push("one");
q.push(function(cb) {
answers.push('one');
cb();
});
q.push(function (cb) {
answers.push("two");
q.push(function(cb) {
answers.push('two');
cb();
});
q.push(function (cb) {
answers.push("three");
q.push(function(cb) {
answers.push('three');
cb();
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc