Socket
Socket
Sign inDemoInstall

better-queue

Package Overview
Dependencies
Maintainers
1
Versions
59
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

better-queue - npm Package Compare versions

Comparing version 1.0.0 to 1.0.1

.travis.yml

61

lib/queue.js

@@ -1,2 +0,1 @@

var async = require('async');
var uuid = require('node-uuid');

@@ -36,3 +35,3 @@ var util = require('util');

self.processDelay = opts.processDelay || 0;
self.processTimeout = opts.processTimeout || Infinity;
self.maxTimeout = opts.maxTimeout || Infinity;
self.idleTimeout = opts.idleTimeout || 0;

@@ -113,4 +112,4 @@ self.maxRetries = opts.maxRetries || 0;

ticket
.on('done', function (result) { cb(null, result) })
.on('fail', function (err) { cb(err) })
.on('finish', function (result) { cb(null, result) })
.on('failed', function (err) { cb(err) })
}

@@ -123,2 +122,3 @@

var taskId = task.id || uuid.v4();
ticket.id = taskId;
ticket.accept();

@@ -224,2 +224,9 @@ self._queueTask(taskId, task, ticket);

if (self.concurrent - self._running > 1) {
// Continue processing until saturated
setImmediate(function () {
self._processNext();
})
}
// Fetch next batch

@@ -243,3 +250,3 @@ self._getNextBatch(function (err, batch) {

if (ticket) {
ticket.started(batch[taskId].total);
ticket.started();
tickets[taskId] = ticket;

@@ -251,7 +258,2 @@ delete self._tickets[taskId];

self._startBatch(batch, tickets);
// Continue processing until saturated
setImmediate(function () {
self._processNext();
})
});

@@ -263,3 +265,2 @@ }

var taskIds = Object.keys(batch);
var timeout = null;

@@ -271,8 +272,9 @@ var worker = new Worker({

})
if (self.processTimeout < Infinity) {
if (self.maxTimeout < Infinity) {
timeout = setTimeout(function () {
worker.failed('task_timeout');
}, self.processTimeout);
}, self.maxTimeout);
}
worker.on('task_failed', function (taskId, msg) {
worker.on('task_failed', function (id, msg) {
var taskId = taskIds[id];
self._retries[taskId] = self._retries[taskId] || 0;

@@ -293,3 +295,4 @@ self._retries[taskId]++;

})
worker.on('task_finish', function (taskId, result) {
worker.on('task_finish', function (id, result) {
var taskId = taskIds[id];
if (tickets[taskId]) {

@@ -301,9 +304,19 @@ tickets[taskId].finish(result);

})
worker.on('task_progress', function (taskId, completed) {
worker.on('task_progress', function (id, completed, total) {
var taskId = taskIds[id];
if (tickets[taskId]) {
tickets[taskId].progress(completed);
tickets[taskId].progress(completed, total);
delete tickets[taskId];
}
self.emit('task_progress', taskId, completed);
self.emit('task_progress', taskId, completed, total);
})
worker.on('progress', function (progress) {
self.emit('progress', progress);
})
worker.on('finish', function (result) {
self.emit('finish', result);
})
worker.on('failed', function (err) {
self.emit('failed', err);
})
worker.on('end', function () {

@@ -317,5 +330,11 @@ self._running--;

});
setTimeout(function () {
self._processNext();
}, self.idleTimeout);
if (self.idleTimeout) {
setTimeout(function () {
self._processNext();
}, self.idleTimeout);
} else {
setImmediate(function () {
self._processNext();
})
}
})

@@ -322,0 +341,0 @@

@@ -34,4 +34,4 @@

Ticket.prototype.started = function (total) {
this.eta.count = total || 1;
Ticket.prototype.started = function () {
this.eta.count = 1;
this.eta.start();

@@ -46,3 +46,3 @@ this.isStarted = true;

this.status = 'failed';
this.emit('fail', msg);
this.emit('failed', msg);
}

@@ -55,3 +55,3 @@

this.result = result;
this.emit('done', this.result);
this.emit('finish', this.result);
}

@@ -67,4 +67,5 @@

Ticket.prototype.progress = function (current) {
Ticket.prototype.progress = function (current, total) {
this.eta.done = current;
this.eta.count = total;
this.emit('progress', {

@@ -71,0 +72,0 @@ current: this.eta.done,

@@ -42,10 +42,9 @@

// Setup
self._taskIds.forEach(function (taskId) {
self._waiting[taskId] = true;
self.progress.tasks[taskId] = {
self._taskIds.forEach(function (taskId, id) {
self._waiting[id] = true;
self.progress.tasks[id] = {
pct: 0,
complete: 0,
total: self.batch[taskId].total || 1,
total: 1,
}
self.progress.total += self.batch[taskId].total || 1;
})

@@ -64,5 +63,5 @@ }

self.status = 'in-progress';
var tasks = self.batch;
var tasks = self._taskIds.map(function (taskId) { return self.batch[taskId] });
if (self.single) {
tasks = self.batch[self._taskIds[0]]
tasks = tasks[0]
}

@@ -111,3 +110,3 @@ self._process = self.fn.call(self, tasks, function (err, result) {

Worker.prototype.failed = function (taskId, msg) {
Worker.prototype.failed = function (id, msg) {
var self = this;

@@ -117,17 +116,19 @@ if (!self.active) return;

// Apply to remaining
msg = taskId;
Object.keys(self._waiting).forEach(function (taskId) {
self.failed(taskId, msg);
msg = id;
Object.keys(self._waiting).forEach(function (id) {
if (!self._waiting[id]) return;
self.failed(id, msg);
})
self.emit('failed', msg);
self.end();
} else {
// Apply to taskId
delete self._waiting[taskId];
} else if (self._waiting[id]) {
// Apply to id
self._waiting[id] = false;
self.counts.failed++;
self.counts.completed++;
self.emit('task_failed', taskId, msg);
self.emit('task_failed', id, msg);
}
}
Worker.prototype.finish = function (taskId, result) {
Worker.prototype.finish = function (id, result) {
var self = this;

@@ -137,36 +138,42 @@ if (!self.active) return;

// Apply to remaining
result = taskId;
Object.keys(self._waiting).forEach(function (taskId) {
self.finish(taskId, result);
result = id;
Object.keys(self._waiting).forEach(function (id) {
if (!self._waiting[id]) return;
self.finish(id, result);
})
self.emit('finish', result);
self.end();
} else {
// Apply to taskId
delete self._waiting[taskId];
} else if (self._waiting[id]) {
// Apply to id
self._waiting[id] = false;
self.counts.finished++;
self.counts.completed++;
self.emit('task_finish', taskId, result);
self.emit('task_finish', id, result);
}
}
Worker.prototype.progress = function (taskId, completed) {
Worker.prototype.progress = function (id, completed, total) {
var self = this;
if (!self.active) return;
if (completed === undefined) {
if (total === undefined) {
// Apply to remaining
completed = taskId;
Object.keys(self._waiting).forEach(function (taskId) {
self.progress(taskId, completed*self.progress.tasks[taskId].total);
completed = id;
total = completed;
Object.keys(self._waiting).forEach(function (id) {
if (!self._waiting[id]) return;
self.progress(id, completed, total);
})
} else {
// Apply to taskId
self.progress.complete = 0;
self.progress.tasks[taskId].complete = completed;
self.progress.tasks[taskId].pct = Math.max(0, Math.min(1, completed/self.progress.tasks[taskId].total));
self._taskIds.forEach(function (taskId) {
self.progress.complete += self.progress.tasks[taskId].pct;
self._taskIds.forEach(function (taskId, id) {
self.progress.complete += self.progress.tasks[id].pct;
})
self._eta.done = self.progress.complete;
self.progress.eta = self._eta.format('{{etah}}')
self.emit('task_progress', taskId, completed);
self.emit('progress', self.progress);
} else if (self._waiting[id] && total !== 0) {
// Apply to id
self.progress.tasks[id].complete = completed;
self.progress.tasks[id].total = total;
self.progress.tasks[id].pct = Math.max(0, Math.min(1, completed/total));
self.emit('task_progress', id, completed, total);
}

@@ -173,0 +180,0 @@ }

{
"name": "better-queue",
"version": "1.0.0",
"version": "1.0.1",
"description": "Better Queue for NodeJS",

@@ -40,3 +40,2 @@ "main": "lib/queue.js",

"dependencies": {
"async": "^1.5.2",
"node-eta": "^0.9.0",

@@ -43,0 +42,0 @@ "node-uuid": "^1.4.7"

@@ -1,5 +0,337 @@

# better-queue
Better Queue for NodeJS
# Better Queue - Powerful flow control
[![npm package](https://nodei.co/npm/better-queue.png?downloads=true&downloadRank=true&stars=true)](https://nodei.co/npm/better-queue/)
Queues can be quite hard. There's a lot of cases to consider. Luckily, better-queue handles all of these cases!
[![Build status](https://img.shields.io/travis/leanderlee/better-queue.svg?style=flat-square)](https://travis-ci.org/leanderlee/better-queue)
[![Dependency Status](https://img.shields.io/david/leanderlee/better-queue.svg?style=flat-square)](https://david-dm.org/leanderlee/better-queue)
[![Known Vulnerabilities](https://snyk.io/test/npm/better-queue/badge.svg?style=flat-square)](https://snyk.io/test/npm/better-queue)
[![Gitter](https://img.shields.io/badge/gitter-join_chat-blue.svg?style=flat-square)](https://gitter.im/leanderlee/better-queue?utm_source=badge)
## Super simple to use
Better Queue is designed to be simple to set up but still let you do complex things.
```js
var Queue = require('better-queue');
var q = new Queue(function (n, cb) {
cb(null, n+1);
})
q.push(1)
q.push(2)
q.push(3)
```
## Table of contents
- [Setting Up](#setting-up-the-queue)
- [Queuing](#queuing)
- [Task Management](#task-management)
- [Timing](#timing)
- [Control Flow](#control-flow)
- [Storage](#storage)
- [Full Documentation](#full-documentation)
---
You will be able to combine any (and all) of these options
for your queue!
## Setting up the queue
You can control how many tasks happen at the same time.
```js
var q = new Queue(fn, { concurrent: 3 })
```
Now the queue will allow 3 tasks running at the same time. (By
default, we handle tasks one at a time.)
You can also turn the queue into a stack by turning on `filo`.
```js
var q = new Queue(fn, { filo: true })
```
Now items you push on will be handled first.
## Queuing
It's very easy to push tasks into the queue.
```js
var q = new Queue(fn);
q.push(1);
q.push({ x: 1, y: 2 });
q.push("hello");
```
You can also include a callback as a second parameter to the push
function, which would be called when that task is done. For example:
```js
var q = new Queue(fn);
q.push(1, function (err, result) {
// Results from the task!
});
```
You can also listen to events on the results of the `push` call.
```js
var q = new Queue(fn);
q.push(1)
.on('done', function (result) {
// Task succeeded with {result}!
})
.on('fail', function (err) {
// Task failed!
})
```
Alternatively, you can subscribe to the queue's events.
```js
var q = new Queue(fn);
q.on('task_finish', function (taskId, result) {
// taskId = 1, result: 3
// taskId = 2, result: 5
})
q.on('task_failed', function (taskId, err) {
// Handle error
})
q.on('empty', function (){})
q.on('drain', function (){})
q.push({ id: 1, a: 1, b: 2 });
q.push({ id: 2, a: 2, b: 3 });
```
`empty` event fires when all of the tasks have been pulled off of
the queue (there may still be tasks running!)
`drain` event fires when there are no more tasks on the queue _and_
when no more tasks are running.
[back to top](#table-of-contents)
---
## Task Management
#### Batch Processing
Tasks can be identified by `task.id`. If it isn't defined,
a unique ID is automatically assigned. One thing you can do
with Task ID is merge tasks with the same ID.
```js
var counter = new Queue(function (task, cb) {
console.log("I have %d %ss.", task.count, task.id);
cb();
}, {
merge: function (oldTask, newTask, cb) {
oldTask.count += newTask.count;
cb(null, oldTask);
}
})
counter.push({ id: 'apple', count: 2 });
counter.push({ id: 'apple', count: 1 });
counter.push({ id: 'orange', count: 1 });
counter.push({ id: 'orange', count: 1 });
// Prints out:
// I have 3 apples.
// I have 2 oranges.
```
Your processing function can also be modified to handle multiple
tasks at the same time. For example:
```js
var ages = new Queue(function (batch, cb) {
// Batch 1:
// [ { id: 'steve', age: 21 },
// { id: 'john', age: 34 },
// { id: 'joe', age: 18 } ]
// Batch 2:
// [ { id: 'mary', age: 23 } ]
cb();
}, { batchSize: 3 })
ages.push({ id: 'steve', age: 21 });
ages.push({ id: 'john', age: 34 });
ages.push({ id: 'joe', age: 18 });
ages.push({ id: 'mary', age: 23 });
```
Note how the queue will only handle at most 3 items at a time.
Below is another example of a batched call with numbers.
```js
var ages = new Queue(function (batch, cb) {
// batch = [1,2,3]
cb();
}, { batchSize: 3 })
ages.push(1);
ages.push(2);
ages.push(3);
```
#### Filtering, Validation and Priority
You can also format (and filter) the input that arrives from a push
before it gets processed by the queue by passing in a `filter`
function.
```js
var greeter = new Queue(function (name, cb) {
console.log("Hello, %s!", name)
cb();
}, {
filter: function (input, cb) {
if (input === 'Bob') {
return cb('not_allowed');
}
return cb(null, input.toUpperCase())
}
});
greeter.push('anna'); // Prints 'Hello, ANNA!'
```
This can be particularly useful if your queue needs to do some pre-processing,
input validation, database lookup, etc. before you load it onto the queue.
You can also define a priority function to control which tasks get
processed first.
```js
var greeter = new Queue(function (name, cb) {
console.log("Greetings, %s.", name);
cb();
}, {
priority: function (name, cb) {
if (name === "Steve") return cb(null, 10);
if (name === "Mary") return cb(null, 5);
if (name === "Joe") return cb(null, 5);
cb(null, 1);
}
})
greeter.push("Steve");
greeter.push("John");
greeter.push("Joe");
greeter.push("Mary");
// Prints out:
// Greetings, Steve.
// Greetings, Joe.
// Greetings, Mary.
// Greetings, John.
```
If `filo` is set to `true` in the example above, then Joe and Mary
would swap order.
#### Retry
You can set tasks to retry `maxRetries` times if they fail. By default,
tasks will fail (and will not retry.)
```js
var q = new Queue(fn, { maxRetries: 10 })
```
#### Progress/Finish/Fail Specific Tasks
The process function will be run in a context of a `Worker` object, which
gives you access to functions to help report on the status of specific tasks.
The example below illustrates how you can use these functions:
```js
var uploader = new Queue(function (file, cb) {
this.progress()
});
uploader.on('task_progress', function (taskId, progress) {})
uploader.push('/some/file.jpg')
.on('progress', function (err, progress) {
// progress.eta - human readable string estimating time remaining
// progress.pct - % complete (out of 100)
// progress.current - # completed so far
// progress.total - # for completion
})
```
[back to top](#table-of-contents)
---
## Timing
You can configure the queue to have a `maxTimeout`.
```js
var q = new Queue(function (name, cb) {
someLongTask(function () {
cb();
})
}, { maxTimeout: 2000 })
```
After 2 seconds, the process will throw an error instead of waiting for the
callback to finish.
You can also delay the queue before it starts its processing. This is the
behaviour of a timed cargo.
```js
var q = new Queue(function (batch, cb) {
// Batch [1,2] will process after 2s.
cb();
}, { batchSize: 5, processDelay: 2000 })
q.push(1);
setTimeout(function () {
q.push(2);
}, 1000)
```
You can also set `idleTimeout`, which will delay processing between tasks.
```js
var q = new Queue(function (task, cb) {
cb(); // Will wait 1 second before taking the next task
}, { idleTimeout: 1000 })
q.push(1);
q.push(2);
```
[back to top](#table-of-contents)
---
## Control Flow
There are even more options to control
- cancel, pause, resume
- cancelIfRunning
[back to top](#table-of-contents)
---
## Storage
- store
[back to top](#table-of-contents)
---
## Full Documentation
var Queue = require('./lib/queue')
var q = new Queue(function (task, cb) {
console.log('working' + task);
setTimeout(function () {
console.log('finished' + task);
cb(null, { message: 'done' + task });
}, task*1000);
}, { concurrent: 2, idleTimeout: 1000 })
console.log('queued 1')
q.push(1);
console.log('queued 2')
q.push(2);
console.log('queued 3')
q.push(3);
console.log('queued 4')
q.push(4);
// var q = new Queue(function (task, cb) {
// console.log('working' + task);
// setTimeout(function () {
// console.log('finished' + task);
// cb(null, { message: 'done' + task });
// }, task*1000);
// }, { concurrent: 2, idleTimeout: 1000 })
// console.log('queued 1')
// q.push(1);
// console.log('queued 2')
// q.push(2);
// console.log('queued 3')
// q.push(3);
// console.log('queued 4')
// q.push(4);
// var greeter = new Queue(function (name, cb) {
// console.log("Hello, %s!", name)
// cb();
// }, {
// filter: function (input, cb) {
// if (input === 'Bob') {
// return cb('not_allowed');
// }
// return cb(null, input.toUpperCase())
// }
// });
// greeter.push('anna'); // Prints 'Hello, ANNA!'
// var counter = new Queue(function (task, cb) {
// console.log("I have %d %ss.", task.count, task.id);
// cb();
// }, {
// merge: function (oldTask, newTask, cb) {
// oldTask.count += newTask.count;
// cb(null, oldTask);
// }
// })
// counter.push({ id: 'apple', count: 2 });
// counter.push({ id: 'apple', count: 1 });
// counter.push({ id: 'orange', count: 1 });
// counter.push({ id: 'orange', count: 1 });
// var greeter = new Queue(function (name, cb) {
// console.log("Greetings, %s.", name);
// cb();
// }, {
// priority: function (name, cb) {
// if (name === "Steve") return cb(null, 10);
// if (name === "Mary") return cb(null, 5);
// if (name === "Joe") return cb(null, 5);
// cb(null, 1);
// }
// })
// greeter.push("Steve");
// greeter.push("John");
// greeter.push("Joe");
// greeter.push("Mary");
// var q = new Queue(function (batch, cb) {
// console.log(batch.length)
// cb();
// }, { batchSize: 10, processDelay: 2000 })
// q.push(1);
// q.push(2);
// setTimeout(function () {
// q.push(3);
// q.push(4);
// q.push(5);
// }, 1000)
// var q = new Queue(function (task, cb) {
// console.log("Finished %s.", task)
// cb();
// }, { idleTimeout: 1000 })
// q.push("task1");
// q.push("task2");
var ages = new Queue(function (batch, cb) {
// Batch:
// {
//
// }
console.log(batch);
cb();
}, { batchSize: 3 })
ages.push({ id: 'steve', age: 21 });
ages.push({ id: 'john', age: 34 });
ages.push({ id: 'joe', age: 18 });
ages.push({ id: 'mary', age: 23 });

@@ -91,3 +91,3 @@ var assert = require('assert');

it('should max timeout', function (done) {
var q = new Queue(function (tasks, cb) {}, { processTimeout: 1 })
var q = new Queue(function (tasks, cb) {}, { maxTimeout: 1 })
q.on('task_failed', function (taskId, msg) {

@@ -94,0 +94,0 @@ assert.equal(msg, 'task_timeout');

@@ -37,3 +37,3 @@ var assert = require('assert');

assert.ok(!t.isFinished, 'ticket is not finished');
t.once('done', function (result) {
t.once('finish', function (result) {
assert.deepEqual(result, { x: 1 });

@@ -48,3 +48,3 @@ assert.ok(t.isFinished, 'ticket is finished');

assert.ok(!t.isFailed, 'ticket not failed');
t.once('fail', function (err) {
t.once('failed', function (err) {
assert.equal(err, 'some_error');

@@ -58,6 +58,5 @@ assert.ok(t.isFailed, 'ticket failed');

it('should progress and emit', function (done) {
t.started(2);
t.started();
t.once('progress', function (progress) {
assert.equal(progress.pct, 50);
assert.equal(progress.current, 1);
assert.equal(typeof progress.eta, 'string');

@@ -70,5 +69,5 @@ t.once('progress', function (progress) {

});
t.progress(2);
t.progress(2, 2);
});
t.progress(1);
t.progress(1, 2);
})

@@ -75,0 +74,0 @@

@@ -46,3 +46,3 @@ var assert = require('assert');

assert.ok(!t2.isFinished, 'ticket 2 is not finished');
t2.once('done', function (result) {
t2.once('finish', function (result) {
assert.deepEqual(result, { x: 1 });

@@ -59,3 +59,3 @@ assert.ok(t2.isFinished, 'ticket 2 is finished');

var called = 0;
t1.once('fail', function (err) {
t1.once('failed', function (err) {
assert.equal(err, 'some_error');

@@ -66,3 +66,3 @@ assert.ok(t1.isFailed, 'ticket 1 failed');

})
t2.once('fail', function (err) {
t2.once('failed', function (err) {
assert.equal(err, 'some_error');

@@ -77,3 +77,2 @@ assert.ok(t2.isFailed, 'ticket 2 failed');

it('should progress and emit', function (done) {
ts.started(2);
t1.once('progress', function (progress) {

@@ -89,5 +88,5 @@ assert.equal(progress.pct, 50);

});
ts.progress(2);
ts.progress(2, 2);
});
ts.progress(1);
ts.progress(1, 2);
})

@@ -94,0 +93,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc