pull-stream
Advanced tools
Comparing version 2.28.3 to 2.28.4
{ | ||
"name": "pull-stream", | ||
"description": "minimal pull stream", | ||
"version": "2.28.3", | ||
"version": "2.28.4", | ||
"homepage": "https://github.com/dominictarr/pull-stream", | ||
@@ -6,0 +6,0 @@ "repository": { |
@@ -56,2 +56,71 @@ var pull = require('../') | ||
test('flatten stream of broken streams', function (t) { | ||
var _err = new Error('I am broken'), sosEnded | ||
pull( | ||
pull.values([ | ||
pull.Source(function read(abort, cb) { | ||
cb(_err) | ||
}) | ||
], function(err) { | ||
sosEnded = err; | ||
}), | ||
pull.flatten(), | ||
pull.onEnd(function (err) { | ||
t.equal(err, _err) | ||
process.nextTick(function() { | ||
t.equal(sosEnded, null, 'should abort stream of streams') | ||
t.end() | ||
}) | ||
}) | ||
) | ||
}) | ||
test('abort flatten', function (t) { | ||
var sosEnded, s1Ended, s2Ended | ||
var read = pull( | ||
pull.values([ | ||
pull.values([1,2], function(err) {s1Ended = err}), | ||
pull.values([3,4], function(err) {s2Ended = err}), | ||
], function(err) { | ||
sosEnded = err; | ||
}), | ||
pull.flatten() | ||
) | ||
read(null, function(err, data) { | ||
t.notOk(err) | ||
t.equal(data,1) | ||
read(true, function(err, data) { | ||
t.equal(err, true) | ||
process.nextTick(function() { | ||
t.equal(sosEnded, null, 'should abort stream of streams') | ||
t.equal(s1Ended, null, 'should abort current nested stream') | ||
t.equal(s2Ended, undefined, 'should not abort queued nested stream') | ||
t.end() | ||
}) | ||
}) | ||
}) | ||
}) | ||
test('abort flatten before 1st read', function (t) { | ||
var sosEnded, s1Ended | ||
var read = pull( | ||
pull.values([ | ||
pull.values([1,2], function(err) {s1Ended = err}) | ||
], function(err) { | ||
sosEnded = err; | ||
}), | ||
pull.flatten() | ||
) | ||
read(true, function(err, data) { | ||
t.equal(err, true) | ||
t.notOk(data) | ||
process.nextTick(function() { | ||
t.equal(sosEnded, null, 'should abort stream of streams') | ||
t.equal(s1Ended, undefined, 'should abort current nested stream') | ||
t.end() | ||
}) | ||
}) | ||
}) | ||
@@ -215,10 +215,20 @@ var u = require('pull-core') | ||
return function (abort, cb) { | ||
if (abort) return read(abort, cb) | ||
if(_read) nextChunk() | ||
else nextStream() | ||
if (abort) { | ||
_read ? _read(abort, function(err) { | ||
read(err || abort, cb) | ||
}) : read(abort, cb) | ||
} | ||
else if(_read) nextChunk() | ||
else nextStream() | ||
function nextChunk () { | ||
_read(null, function (end, data) { | ||
if(end) nextStream() | ||
else cb(null, data) | ||
_read(null, function (err, data) { | ||
if (err === true) nextStream() | ||
else if (err) { | ||
read(true, function(abortErr) { | ||
// TODO: what do we do with the abortErr? | ||
cb(err) | ||
}) | ||
} | ||
else cb(null, data) | ||
}) | ||
@@ -225,0 +235,0 @@ } |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
56670
1627