parallel-stream
Advanced tools
Comparing version 1.0.0 to 1.1.0
@@ -60,2 +60,3 @@ var stream = require('stream'); | ||
callback(); | ||
if (writable._writableState.buffer.length === 0) writable.emit('empty'); | ||
}; | ||
@@ -66,2 +67,8 @@ | ||
writable.end = function(chunk, enc, callback) { | ||
if (writable._writableState.buffer.length) { | ||
return writable.once('empty', function() { | ||
writable.end(chunk, enc, callback); | ||
}); | ||
} | ||
if (writable.pending) { | ||
@@ -68,0 +75,0 @@ return writable.once('free', function() { |
{ | ||
"name": "parallel-stream", | ||
"version": "1.0.0", | ||
"version": "1.1.0", | ||
"description": "Concurrent transform stream", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -18,8 +18,15 @@ var stream = require('stream'); | ||
worker.data.push(Number(data.toString())); | ||
setTimeout(function() { | ||
if (worker.err) callback(worker.err); | ||
function work(data) { | ||
if (worker.err) { | ||
console.log('sending work error'); | ||
callback(worker.err); | ||
} | ||
if (err) callback(err); | ||
else callback(null, data); | ||
worker.running--; | ||
}, pauseTime, data); | ||
} | ||
if (pauseTime) setTimeout(work, pauseTime, data); | ||
else work(data); | ||
}; | ||
@@ -201,2 +208,23 @@ worker.running = 0; | ||
}); | ||
test('[writable: ' + concurrency + '] synchronous writer', function(assert) { | ||
var work = timedWorker(0); | ||
var read = timedReadable(20) | ||
.on('end', function() { | ||
work.err = new Error('post .end() failure'); | ||
}); | ||
setTimeout(function() { | ||
read.stop = true; | ||
}, 2000); | ||
read.pipe(parallel.writable(work, options)) | ||
.on('error', function(err) { | ||
assert.ifError(err, 'should not error'); | ||
}) | ||
.on('finish', function() { | ||
assert.pass('finish event should fire'); | ||
assert.end(); | ||
}); | ||
}); | ||
} | ||
@@ -203,0 +231,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
21033
487
0