through2-concurrent
Advanced tools
Comparing version 0.2.0 to 0.3.0
{ | ||
"name": "through2-concurrent", | ||
"version": "0.2.0", | ||
"version": "0.3.0", | ||
"description": "Like through2 except runs in parallel with limited concurrency", | ||
@@ -5,0 +5,0 @@ "main": "through2-concurrent.js", |
through2-concurrent | ||
=================== | ||
[![NPM](https://nodei.co/npm/through2-concurrent.png?downloads&downloadRank)](https://nodei.co/npm/through2-concurrent/) | ||
A simple way to create a Node.JS Transform stream which processes in | ||
@@ -5,0 +7,0 @@ parallel. You can limit the concurrency (default is 16) and order is |
35
tests.js
@@ -95,3 +95,38 @@ var through2Concurrent = require('./through2-concurrent'); | ||
}); | ||
it('should pass down the stream data added with this.push', function () { | ||
var passingThrough = through2Concurrent.obj( | ||
{maxConcurrency: 1}, | ||
function (chunk, enc, callback) { | ||
this.push({original: chunk}); | ||
callback(); | ||
},function (callback) { | ||
this.push({flushed: true}); | ||
}); | ||
var out = []; | ||
passingThrough.on('data', function (data) { | ||
out.push(data); | ||
}); | ||
passingThrough.write("Hello"); | ||
passingThrough.write("World"); | ||
passingThrough.end(); | ||
expect(out).to.eql([{original: "Hello"}, {original: "World"}, {flushed: true}]); | ||
}); | ||
it('should pass down the stream data added as arguments to the callback', function () { | ||
var passingThrough = through2Concurrent.obj( | ||
{maxConcurrency: 1}, | ||
function (chunk, enc, callback) { | ||
callback(null, {original: chunk}); | ||
}); | ||
var out = []; | ||
passingThrough.on('data', function (data) { | ||
out.push(data); | ||
}); | ||
passingThrough.write("Hello"); | ||
passingThrough.write("World"); | ||
passingThrough.end(); | ||
expect(out).to.eql([{original: "Hello"}, {original: "World"}]); | ||
}); | ||
}); | ||
}); |
@@ -17,2 +17,3 @@ // Like through2 except execute in parallel with a set maximum | ||
function _transform (message, enc, callback) { | ||
var self = this; | ||
var callbackCalled = false; | ||
@@ -29,3 +30,3 @@ concurrent++; | ||
transform(message, enc, function () { | ||
transform.call(this, message, enc, function (err) { | ||
// Ignore multiple calls of the callback (shouldn't ever | ||
@@ -35,2 +36,9 @@ // happen, but just in case) | ||
callbackCalled = true; | ||
if (err) { | ||
self.emit('error', err); | ||
} else if (arguments.length > 1) { | ||
self.push(arguments[1]); | ||
} | ||
concurrent--; | ||
@@ -52,3 +60,3 @@ if (lastCallback) { | ||
if (concurrent === 0) { | ||
flush(callback); | ||
flush.call(this,callback); | ||
} else { | ||
@@ -55,0 +63,0 @@ pendingFlush = flush.bind(this, callback); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
8996
185
71