stream-worker
Advanced tools
Comparing version 0.0.1 to 1.0.0
{ | ||
"name": "stream-worker", | ||
"version": "0.0.1", | ||
"version": "1.0.0", | ||
"description": "Execute an async function per stream data event, pausing the stream when a concurrency limit is saturated", | ||
@@ -31,3 +31,9 @@ "main": "stream-worker.js", | ||
"node": "~0.10.0" | ||
}, | ||
"dependencies": { | ||
"bluebird": "^3.1.1" | ||
}, | ||
"publishConfig": { | ||
"registry": "https://registry.npmjs.org/" | ||
} | ||
} |
@@ -6,3 +6,5 @@ stream-worker [![build status](https://secure.travis-ci.org/goodeggs/stream-worker.png)](http://travis-ci.org/goodeggs/stream-worker) | ||
Supports promises and callbacks. | ||
The Basics | ||
@@ -15,14 +17,34 @@ ---------- | ||
then | ||
Requiring: | ||
```js | ||
var streamWorker = require('stream-worker'); | ||
``` | ||
streamWorker(stream, 10, | ||
function(data, done) { | ||
/* ... do some work with data ... */ | ||
Promise style: | ||
```js | ||
streamWorker(stream, 10, function(data) { | ||
/* ... do some work with data ... */ | ||
return Promise.resolve(); | ||
}) | ||
.then(function() { | ||
/* ... the stream is exhausted and all workers are finished ... */ | ||
}, function(err) { | ||
/* ... there was an error processing the stream ... */ | ||
}) | ||
``` | ||
Callback style: | ||
```js | ||
streamWorker(stream, 10, | ||
function(data, done) { | ||
/* ... do some work with data ... */ | ||
done(err); | ||
}, | ||
function(err) { | ||
/* ... the stream is exhauseted and all workers are finished ... */ | ||
}, | ||
function(err) { | ||
/* ... the stream is exhauseted and all workers are finished ... */ | ||
} | ||
@@ -29,0 +51,0 @@ ); |
@@ -0,3 +1,4 @@ | ||
var Promise = require('bluebird'); | ||
module.exports = function(stream, concurrency, worker, cb) { | ||
var tasks = [], | ||
@@ -8,57 +9,77 @@ running = 0, | ||
function errorHandler (err) { | ||
if (err != null) { | ||
if (firstError == null) { | ||
firstError = err; | ||
} | ||
} | ||
if (worker.length > 1) { // worker returns a callback | ||
worker = Promise.promisify(worker); | ||
} | ||
function completeIfDone () { | ||
if (closed && tasks.length === 0 && running <= 0) { | ||
if (typeof cb === "function") { | ||
cb(firstError); | ||
return Promise.try(function() { | ||
var resolve, reject; | ||
streamPromise = new Promise(function(__resolve, __reject) { | ||
resolve = __resolve; | ||
reject = __reject; | ||
}); | ||
function errorHandler (err) { | ||
if (err != null) { | ||
if (firstError == null) { | ||
firstError = err; | ||
} | ||
} | ||
} | ||
} | ||
function finishTask (err) { | ||
running -= 1; | ||
errorHandler(err); | ||
if (tasks.length) { | ||
startNextTask(); | ||
} else { | ||
completeIfDone(); | ||
stream.resume(); | ||
function finishTask (err) { | ||
running -= 1; | ||
errorHandler(err); | ||
if (tasks.length) { | ||
startNextTask(); | ||
} else { | ||
completeIfDone(); | ||
stream.resume(); | ||
} | ||
} | ||
} | ||
function startNextTask () { | ||
var data = tasks.shift(); | ||
if (data == null) { | ||
completeIfDone(); | ||
function startNextTask () { | ||
var data = tasks.shift(); | ||
if (data == null) { | ||
completeIfDone(); | ||
} | ||
running += 1; | ||
try { | ||
Promise.resolve(worker(data)) | ||
.then(function (finishedWith) { | ||
finishTask(); | ||
}) | ||
.error(function (err) { | ||
finishTask(err); | ||
}); | ||
} catch (e) { | ||
finishTask(e); | ||
} | ||
} | ||
running += 1; | ||
try { | ||
worker(data, finishTask); | ||
} catch (e) { | ||
finishTask(e); | ||
} | ||
} | ||
stream.on('data', function(data) { | ||
tasks.push(data); | ||
if (running < concurrency) { | ||
startNextTask(); | ||
} else { | ||
stream.pause(); | ||
function completeIfDone () { | ||
if (!closed || tasks.length > 0 || running > 0) return; | ||
if (firstError) return reject(firstError); | ||
return resolve(); | ||
} | ||
}); | ||
stream.on('error', errorHandler); | ||
stream.on('data', function(data) { | ||
tasks.push(data); | ||
if (running < concurrency) { | ||
startNextTask(); | ||
} else { | ||
stream.pause(); | ||
} | ||
}); | ||
stream.on('end', function() { | ||
closed = true; | ||
completeIfDone(); | ||
}); | ||
}; | ||
stream.on('error', errorHandler); | ||
stream.on('end', function() { | ||
closed = true; | ||
completeIfDone(); | ||
}); | ||
return streamPromise; | ||
}) | ||
.asCallback(cb); | ||
}; |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
11088
72
1
55
1
+ Addedbluebird@^3.1.1
+ Addedbluebird@3.7.2(transitive)