Socket
Socket
Sign inDemoInstall

stream-worker

Package Overview
Dependencies
Maintainers
2
Versions
8
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

stream-worker - npm Package Compare versions

Comparing version 0.0.1 to 1.0.0

8

package.json
{
"name": "stream-worker",
"version": "0.0.1",
"version": "1.0.0",
"description": "Execute an async function per stream data event, pausing the stream when a concurrency limit is saturated",

@@ -31,3 +31,9 @@ "main": "stream-worker.js",

"node": "~0.10.0"
},
"dependencies": {
"bluebird": "^3.1.1"
},
"publishConfig": {
"registry": "https://registry.npmjs.org/"
}
}

@@ -6,3 +6,5 @@ stream-worker [![build status](https://secure.travis-ci.org/goodeggs/stream-worker.png)](http://travis-ci.org/goodeggs/stream-worker)

Supports promises and callbacks.
The Basics

@@ -15,14 +17,34 @@ ----------

then
Requiring:
```js
var streamWorker = require('stream-worker');
```
streamWorker(stream, 10,
function(data, done) {
/* ... do some work with data ... */
Promise style:
```js
streamWorker(stream, 10, function(data) {
/* ... do some work with data ... */
return Promise.resolve();
})
.then(function() {
/* ... the stream is exhausted and all workers are finished ... */
}, function(err) {
/* ... there was an error processing the stream ... */
})
```
Callback style:
```js
streamWorker(stream, 10,
function(data, done) {
/* ... do some work with data ... */
done(err);
},
function(err) {
/* ... the stream is exhauseted and all workers are finished ... */
},
function(err) {
/* ... the stream is exhauseted and all workers are finished ... */
}

@@ -29,0 +51,0 @@ );

111

stream-worker.js

@@ -0,3 +1,4 @@

var Promise = require('bluebird');
module.exports = function(stream, concurrency, worker, cb) {
var tasks = [],

@@ -8,57 +9,77 @@ running = 0,

function errorHandler (err) {
if (err != null) {
if (firstError == null) {
firstError = err;
}
}
if (worker.length > 1) { // worker returns a callback
worker = Promise.promisify(worker);
}
function completeIfDone () {
if (closed && tasks.length === 0 && running <= 0) {
if (typeof cb === "function") {
cb(firstError);
return Promise.try(function() {
var resolve, reject;
streamPromise = new Promise(function(__resolve, __reject) {
resolve = __resolve;
reject = __reject;
});
function errorHandler (err) {
if (err != null) {
if (firstError == null) {
firstError = err;
}
}
}
}
function finishTask (err) {
running -= 1;
errorHandler(err);
if (tasks.length) {
startNextTask();
} else {
completeIfDone();
stream.resume();
function finishTask (err) {
running -= 1;
errorHandler(err);
if (tasks.length) {
startNextTask();
} else {
completeIfDone();
stream.resume();
}
}
}
function startNextTask () {
var data = tasks.shift();
if (data == null) {
completeIfDone();
function startNextTask () {
var data = tasks.shift();
if (data == null) {
completeIfDone();
}
running += 1;
try {
Promise.resolve(worker(data))
.then(function (finishedWith) {
finishTask();
})
.error(function (err) {
finishTask(err);
});
} catch (e) {
finishTask(e);
}
}
running += 1;
try {
worker(data, finishTask);
} catch (e) {
finishTask(e);
}
}
stream.on('data', function(data) {
tasks.push(data);
if (running < concurrency) {
startNextTask();
} else {
stream.pause();
function completeIfDone () {
if (!closed || tasks.length > 0 || running > 0) return;
if (firstError) return reject(firstError);
return resolve();
}
});
stream.on('error', errorHandler);
stream.on('data', function(data) {
tasks.push(data);
if (running < concurrency) {
startNextTask();
} else {
stream.pause();
}
});
stream.on('end', function() {
closed = true;
completeIfDone();
});
};
stream.on('error', errorHandler);
stream.on('end', function() {
closed = true;
completeIfDone();
});
return streamPromise;
})
.asCallback(cb);
};

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc