Socket
Socket
Sign inDemoInstall

stream-worker

Package Overview
Dependencies
Maintainers
1
Versions
8
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

stream-worker - npm Package Compare versions

Comparing version 0.0.0 to 0.0.1

.npmignore

17

package.json
{
"name": "stream-worker",
"version": "0.0.0",
"version": "0.0.1",
"description": "Execute an async function per stream data event, pausing the stream when a concurrency limit is saturated",
"main": "stream-worker.js",
"scripts": {
"test": "mocha"
"test": "grunt test"
},

@@ -20,3 +20,14 @@ "repository": {

"author": "Good Eggs <eng@goodeggs.com> (http://bites.goodeggs.com)",
"license": "BSD"
"license": "BSD",
"devDependencies": {
"expect.js": "~0.2.0",
"sinon": "~1.7.3",
"sinon-expect": "~0.2.0",
"grunt-contrib-jshint": "~0.6.3",
"grunt-simple-mocha": "~0.4.0",
"matchdep": "~0.1.2"
},
"engines": {
"node": "~0.10.0"
}
}

@@ -1,8 +0,32 @@

stream-worker
stream-worker [![build status](https://secure.travis-ci.org/goodeggs/stream-worker.png)](http://travis-ci.org/goodeggs/stream-worker)
=============
Execute an async function per stream data event, pausing the stream when a concurrency limit is saturated
Execute an async function per [stream](http://nodejs.org/api/stream.html) data event, pausing the stream when a concurrency limit is saturated. Inspired by [async.queue](https://github.com/caolan/async#queue), optimized for streams.
The Basics
----------
```
npm install stream-worker
```
then
```js
var streamWorker = require('stream-worker');
streamWorker(stream, 10,
function(data, done) {
/* ... do some work with data ... */
done(err);
},
function(err) {
/* ... the stream is exhauseted and all workers are finished ... */
}
);
```
Signature
---------
streamWorker(**stream**, **concurrencyLimit**, **work**, **done**)

32

stream-worker.js

@@ -6,4 +6,3 @@ module.exports = function(stream, concurrency, worker, cb) {

closed = false,
firstError = null,
startNextTask;
firstError = null;

@@ -26,17 +25,18 @@ function errorHandler (err) {

function finishTask (err) {
running -= 1;
errorHandler(err);
if (tasks.length) {
startNextTask();
} else {
completeIfDone();
stream.resume();
}
}
function startNextTask () {
var data = tasks.shift();
if (data == null) {
return completeIfDone();
completeIfDone();
}
function finishTask (err) {
running -= 1;
errorHandler(err);
if (!tasks.length) {
stream.resume();
}
startNextTask();
}
running += 1;

@@ -59,5 +59,3 @@ try {

stream.on('error', function(err) {
errorHandler(err);
});
stream.on('error', errorHandler);

@@ -68,2 +66,2 @@ stream.on('end', function() {

});
}
};
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc