Socket
Socket
Sign inDemoInstall

fastq

Package Overview
Dependencies
Maintainers
1
Versions
25
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

fastq - npm Package Compare versions

Comparing version 1.2.0 to 1.3.0

8

package.json
{
"name": "fastq",
"version": "1.2.0",
"version": "1.3.0",
"description": "Fast, in memory work queue",

@@ -16,3 +16,3 @@ "main": "queue.js",

"type": "git",
"url": "git+https://github.com/mcollina/fastqueue.git"
"url": "git+https://github.com/mcollina/fastq.git"
},

@@ -28,5 +28,5 @@ "keywords": [

"bugs": {
"url": "https://github.com/mcollina/fastqueue/issues"
"url": "https://github.com/mcollina/fastq/issues"
},
"homepage": "https://github.com/mcollina/fastqueue#readme",
"homepage": "https://github.com/mcollina/fastq#readme",
"devDependencies": {

@@ -33,0 +33,0 @@ "async": "^1.2.1",

@@ -5,5 +5,5 @@ 'use strict'

function fastqueue (context, worker, limit) {
function fastqueue (context, worker, concurrency) {
if (typeof context === 'function') {
limit = worker
concurrency = worker
worker = context

@@ -16,4 +16,18 @@ context = null

var queueTail = null
var _running = 0
var self = {
push: push
push: push,
drain: noop,
saturated: noop,
pause: pause,
paused: false,
concurrency: concurrency,
running: running,
resume: resume,
idle: idle,
length: length,
unshift: unshift,
empty: noop,
kill: kill
}

@@ -23,2 +37,35 @@

function running () {
return _running
}
function pause () {
self.paused = true
}
function length () {
var current = queueHead
var counter = 0
while (current) {
current = current.next
counter++
}
return counter
}
function resume () {
if (!self.paused) return
self.paused = false
for (var i = 0; i < self.concurrency; i++) {
_running++
release()
}
}
function idle () {
return _running === 0
}
function push (value, done) {

@@ -32,3 +79,3 @@ var current = cache.get()

if (limit === 0) {
if (_running === self.concurrency || self.paused) {
if (queueTail) {

@@ -40,5 +87,6 @@ queueTail.next = current

queueTail = current
self.saturated()
}
} else {
limit--
_running++
worker.call(context, current.value, current.worked)

@@ -48,16 +96,54 @@ }

function unshift (value, done) {
var current = cache.get()
current.context = context
current.release = release
current.value = value
current.callback = done
if (_running === self.concurrency || self.paused) {
if (queueHead) {
current.next = queueHead
queueHead = current
} else {
queueHead = current
queueTail = current
self.saturated()
}
} else {
_running++
worker.call(context, current.value, current.worked)
}
}
function release (holder) {
cache.release(holder)
if (holder) {
cache.release(holder)
}
var next = queueHead
if (next) {
if (queueTail === queueHead) {
queueTail = null
if (!self.paused) {
if (queueTail === queueHead) {
queueTail = null
}
queueHead = next.next
next.next = null
worker.call(context, next.value, next.worked)
if (queueTail === null) {
self.empty()
}
} else {
_running--
}
queueHead = next.next
next.next = null
worker.call(context, next.value, next.worked)
} else {
limit++
} else if (--_running === 0) {
self.drain()
}
}
function kill () {
queueHead = null
queueTail = null
self.drain = noop
}
}

@@ -64,0 +150,0 @@

# fastq&nbsp;&nbsp;[![build status](https://secure.travis-ci.org/mcollina/fastq.png)](http://travis-ci.org/mcollina/fastq)
Fast, in memory work queue.
Fast, in memory work queue. `fastq` is API compatible with
[`async.queue`](https://github.com/caolan/async#queueworker-concurrency)
Benchmarks (1 million tasks):
* setImmedidate: 1715ms
* fastq: 1824ms
* async.queue: 6158ms
* setImmedidate: 1313ms
* fastq: 1462ms
* async.queue: 3989ms
Obtained on node 0.12.3, on a HP Spectre 360 (the Build 2015 edition).
Obtained on node 4.2.2, on a MacBook Pro 2014 (i7, 16GB of RAM).

@@ -19,2 +20,6 @@ If you need zero-overhead series function call, check out

* <a href="#install">Installation</a>
* <a href="#basic">Basic Example</a>
* <a href="#api">API</a>
* <a href="#licence">Licence &amp; copyright</a>

@@ -62,4 +67,111 @@ ## Install

## API
* <a href="#fastqueue"><code>fastqueue()</code></a>
* <a href="#push"><code>queue#<b>push()</b></code></a>
* <a href="#unshift"><code>queue#<b>unshift()</b></code></a>
* <a href="#pause"><code>queue#<b>pause()</b></code></a>
* <a href="#resume"><code>queue#<b>resume()</b></code></a>
* <a href="#idle"><code>queue#<b>idle()</b></code></a>
* <a href="#length"><code>queue#<b>length()</b></code></a>
* <a href="#kill"><code>queue#<b>kill()</b></code></a>
* <a href="#concurrency"><code>queue#<b>concurrency</b></code></a>
* <a href="#drain"><code>queue#<b>drain</b></code></a>
* <a href="#empty"><code>queue#<b>empty</b></code></a>
* <a href="#saturated"><code>queue#<b>saturated</b></code></a>
-------------------------------------------------------
<a name="fastqueue"></a>
### fastqueue([that], worker, concurrency)
Creates a new queue.
Arguments:
* `that`, optional context of the `worker` function.
* `worker`, worker function, it would be called with `that` as `this`,
if that is specified.
* `concurrency`, number of concurrent tasks that could be executed in
parallel.
-------------------------------------------------------
<a name="push"></a>
### queue.push(task, done)
Add a task at the end of the queue. `done(err, result)` will be called
when the task was processed.
-------------------------------------------------------
<a name="unshift"></a>
### queue.unshift(task, done)
Add a task at the beginning of the queue. `done(err, result)` will be called
when the task was processed.
-------------------------------------------------------
<a name="pause"></a>
### queue.pause()
Pause the processing of tasks. Currently worked tasks are not
stopped.
-------------------------------------------------------
<a name="resume"></a>
### queue.resume()
Resume the processing of tasks.
-------------------------------------------------------
<a name="idle"></a>
### queue.idle()
Returns `true` if there are tasks being processed or waiting to be processed.
`false` otherwise.
-------------------------------------------------------
<a name="length"></a>
### queue.length()
Returns the number of tasks waiting to be processed (in the queue).
-------------------------------------------------------
<a name="kill"></a>
### queue.kill()
Removes all tasks waiting to be processed, and reset `drain` to an empty
function.
-------------------------------------------------------
<a name="concurrency"></a>
### queue.concurrency
Property that returns the number of concurrent tasks that could be executed in
parallel. It can be altered at runtime.
-------------------------------------------------------
<a name="drain"></a>
### queue.drain
Function that will be called when the last
item from the queue has been processed by a worker.
It can be altered at runtime.
-------------------------------------------------------
<a name="empty"></a>
### queue.empty
Function that will be called when the last
item from the queue has been assigned to a worker.
It can be altered at runtime.
-------------------------------------------------------
<a name="saturated"></a>
### queue.saturated
Function that will be called when the queue hits the concurrency
limit.
It can be altered at runtime.
## License
ISC

@@ -58,3 +58,2 @@ 'use strict'

function worker (arg, cb) {
console.log('received', arg)
t.equal(arg, toExec[count], 'arg matches')

@@ -107,1 +106,283 @@ count++

})
test('drain', function (t) {
t.plan(4)
var queue = buildQueue(worker, 1)
var worked = false
queue.push(42, function (err, result) {
t.error(err, 'no error')
t.equal(result, true, 'result matches')
})
queue.drain = function () {
t.equal(true, worked, 'drained')
}
function worker (arg, cb) {
t.equal(arg, 42)
worked = true
setImmediate(cb, null, true)
}
})
test('pause && resume', function (t) {
t.plan(7)
var queue = buildQueue(worker, 1)
var worked = false
t.notOk(queue.paused, 'it should not be paused')
queue.pause()
queue.push(42, function (err, result) {
t.error(err, 'no error')
t.equal(result, true, 'result matches')
})
t.notOk(worked, 'it should be paused')
t.ok(queue.paused, 'it should be paused')
queue.resume()
t.notOk(queue.paused, 'it should not be paused')
function worker (arg, cb) {
t.equal(arg, 42)
worked = true
cb(null, true)
}
})
test('altering concurrency', function (t) {
t.plan(7)
var queue = buildQueue(worker, 1)
var count = 0
queue.pause()
queue.push(24, workDone)
queue.push(24, workDone)
queue.concurrency = 2
queue.resume()
t.equal(queue.running(), 2, '2 jobs running')
function workDone (err, result) {
t.error(err, 'no error')
t.equal(result, true, 'result matches')
}
function worker (arg, cb) {
t.equal(0, count, 'works in parallel')
setImmediate(function () {
count++
cb(null, true)
})
}
})
test('idle()', function (t) {
t.plan(12)
var queue = buildQueue(worker, 1)
t.ok(queue.idle(), 'queue is idle')
queue.push(42, function (err, result) {
t.error(err, 'no error')
t.equal(result, true, 'result matches')
t.notOk(queue.idle(), 'queue is not idle')
})
queue.push(42, function (err, result) {
t.error(err, 'no error')
t.equal(result, true, 'result matches')
// it will go idle after executing this function
setImmediate(function () {
t.ok(queue.idle(), 'queue is now idle')
})
})
t.notOk(queue.idle(), 'queue is not idle')
function worker (arg, cb) {
t.notOk(queue.idle(), 'queue is not idle')
t.equal(arg, 42)
setImmediate(cb, null, true)
}
})
test('saturated', function (t) {
t.plan(9)
var queue = buildQueue(worker, 1)
var preworked = 0
var worked = 0
queue.saturated = function () {
t.pass('saturated')
t.equal(preworked, 1, 'started 1 task')
t.equal(worked, 0, 'worked zero task')
}
queue.push(42, done)
queue.push(42, done)
function done (err, result) {
t.error(err, 'no error')
t.equal(result, true, 'result matches')
}
function worker (arg, cb) {
t.equal(arg, 42)
preworked++
setImmediate(function () {
worked++
cb(null, true)
})
}
})
test('length', function (t) {
t.plan(7)
var queue = buildQueue(worker, 1)
t.equal(queue.length(), 0, 'nothing waiting')
queue.push(42, done)
t.equal(queue.length(), 0, 'nothing waiting')
queue.push(42, done)
t.equal(queue.length(), 1, 'one task waiting')
queue.push(42, done)
t.equal(queue.length(), 2, 'two tasks waiting')
function done (err, result) {
t.error(err, 'no error')
}
function worker (arg, cb) {
setImmediate(function () {
cb(null, true)
})
}
})
test('unshift', function (t) {
t.plan(8)
var queue = buildQueue(worker, 1)
var expected = [1, 2, 3, 4]
queue.push(1, done)
queue.push(4, done)
queue.unshift(3, done)
queue.unshift(2, done)
function done (err, result) {
t.error(err, 'no error')
}
function worker (arg, cb) {
t.equal(expected.shift(), arg, 'tasks come in order')
setImmediate(function () {
cb(null, true)
})
}
})
test('unshift && empty', function (t) {
t.plan(2)
var queue = buildQueue(worker, 1)
var completed = false
queue.pause()
queue.empty = function () {
t.notOk(completed, 'the task has not completed yet')
}
queue.unshift(1, done)
queue.resume()
function done (err, result) {
completed = true
t.error(err, 'no error')
}
function worker (arg, cb) {
setImmediate(function () {
cb(null, true)
})
}
})
test('push && empty', function (t) {
t.plan(2)
var queue = buildQueue(worker, 1)
var completed = false
queue.pause()
queue.empty = function () {
t.notOk(completed, 'the task has not completed yet')
}
queue.push(1, done)
queue.resume()
function done (err, result) {
completed = true
t.error(err, 'no error')
}
function worker (arg, cb) {
setImmediate(function () {
cb(null, true)
})
}
})
test('kill', function (t) {
t.plan(5)
var queue = buildQueue(worker, 1)
var expected = [1]
var predrain = queue.drain
queue.drain = function drain () {
t.fail('drain should never be called')
}
queue.push(1, done)
queue.push(4, done)
queue.unshift(3, done)
queue.unshift(2, done)
queue.kill()
function done (err, result) {
t.error(err, 'no error')
setImmediate(function () {
t.equal(queue.length(), 0, 'no queued tasks')
t.equal(queue.running(), 0, 'no running tasks')
t.equal(queue.drain, predrain, 'drain is back to default')
})
}
function worker (arg, cb) {
t.equal(expected.shift(), arg, 'tasks come in order')
setImmediate(function () {
cb(null, true)
})
}
})

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc