stream-worker
Advanced tools
Comparing version 0.0.0 to 0.0.1
{ | ||
"name": "stream-worker", | ||
"version": "0.0.0", | ||
"version": "0.0.1", | ||
"description": "Execute an async function per stream data event, pausing the stream when a concurrency limit is saturated", | ||
"main": "stream-worker.js", | ||
"scripts": { | ||
"test": "mocha" | ||
"test": "grunt test" | ||
}, | ||
@@ -20,3 +20,14 @@ "repository": { | ||
"author": "Good Eggs <eng@goodeggs.com> (http://bites.goodeggs.com)", | ||
"license": "BSD" | ||
"license": "BSD", | ||
"devDependencies": { | ||
"expect.js": "~0.2.0", | ||
"sinon": "~1.7.3", | ||
"sinon-expect": "~0.2.0", | ||
"grunt-contrib-jshint": "~0.6.3", | ||
"grunt-simple-mocha": "~0.4.0", | ||
"matchdep": "~0.1.2" | ||
}, | ||
"engines": { | ||
"node": "~0.10.0" | ||
} | ||
} |
@@ -1,8 +0,32 @@ | ||
stream-worker | ||
stream-worker [![build status](https://secure.travis-ci.org/goodeggs/stream-worker.png)](http://travis-ci.org/goodeggs/stream-worker) | ||
============= | ||
Execute an async function per stream data event, pausing the stream when a concurrency limit is saturated | ||
Execute an async function per [stream](http://nodejs.org/api/stream.html) data event, pausing the stream when a concurrency limit is saturated. Inspired by [async.queue](https://github.com/caolan/async#queue), optimized for streams. | ||
The Basics | ||
---------- | ||
``` | ||
npm install stream-worker | ||
``` | ||
then | ||
```js | ||
var streamWorker = require('stream-worker'); | ||
streamWorker(stream, 10, | ||
function(data, done) { | ||
/* ... do some work with data ... */ | ||
done(err); | ||
}, | ||
function(err) { | ||
/* ... the stream is exhauseted and all workers are finished ... */ | ||
} | ||
); | ||
``` | ||
Signature | ||
--------- | ||
streamWorker(**stream**, **concurrencyLimit**, **work**, **done**) |
@@ -6,4 +6,3 @@ module.exports = function(stream, concurrency, worker, cb) { | ||
closed = false, | ||
firstError = null, | ||
startNextTask; | ||
firstError = null; | ||
@@ -26,17 +25,18 @@ function errorHandler (err) { | ||
function finishTask (err) { | ||
running -= 1; | ||
errorHandler(err); | ||
if (tasks.length) { | ||
startNextTask(); | ||
} else { | ||
completeIfDone(); | ||
stream.resume(); | ||
} | ||
} | ||
function startNextTask () { | ||
var data = tasks.shift(); | ||
if (data == null) { | ||
return completeIfDone(); | ||
completeIfDone(); | ||
} | ||
function finishTask (err) { | ||
running -= 1; | ||
errorHandler(err); | ||
if (!tasks.length) { | ||
stream.resume(); | ||
} | ||
startNextTask(); | ||
} | ||
running += 1; | ||
@@ -59,5 +59,3 @@ try { | ||
stream.on('error', function(err) { | ||
errorHandler(err); | ||
}); | ||
stream.on('error', errorHandler); | ||
@@ -68,2 +66,2 @@ stream.on('end', function() { | ||
}); | ||
} | ||
}; |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
6037
7
33
6
54