stream-to-pull-stream
Advanced tools
Comparing version 1.7.1 to 1.7.2
19
index.js
@@ -160,2 +160,6 @@ var pull = require('pull-stream/pull') | ||
}) | ||
stream.on('close', function () { | ||
ended = true | ||
drain() | ||
}) | ||
stream.on('error', function (err) { | ||
@@ -168,6 +172,9 @@ ended = err | ||
if(abort) { | ||
stream.once('close', function () { | ||
function onAbort () { | ||
while(cbs.length) cbs.shift()(abort) | ||
cb(abort) | ||
}) | ||
} | ||
//if the stream happens to have already ended, then we don't need to abort. | ||
if(ended) return onAbort() | ||
stream.once('close', onAbort) | ||
destroy(stream) | ||
@@ -225,1 +232,9 @@ } | ||
{ | ||
"name": "stream-to-pull-stream", | ||
"description": "convert a stream1 or streams2 stream into a pull-stream", | ||
"version": "1.7.1", | ||
"version": "1.7.2", | ||
"homepage": "https://github.com/dominictarr/stream-to-pull-stream", | ||
@@ -6,0 +6,0 @@ "repository": { |
@@ -6,4 +6,5 @@ var pull = require('pull-stream') | ||
var Cat = require('pull-cat') | ||
var tape = require('tape') | ||
require('tape')('abort', function (t) { | ||
tape('abort', function (t) { | ||
@@ -38,4 +39,3 @@ t.plan(2) | ||
require('tape')('abort hang', function (t) { | ||
tape('abort hang', function (t) { | ||
var ts = through(), aborted = false, c = 0, _read, ended, closed | ||
@@ -77,2 +77,49 @@ ts.on('close', function () { | ||
tape('abort a stream that has already ended', function (t) { | ||
var ts = through() | ||
var n = 0 | ||
pull( | ||
toPull.source(ts), | ||
//like pull.take(4), but abort async. | ||
function (read) { | ||
return function (abort, cb) { | ||
console.log(n) | ||
if(n++ < 4) read(null, cb) | ||
else { | ||
//this would be quite a badly behaved node stream | ||
//but it can be difficult to make a node stream that behaves well. | ||
ts.end() | ||
setTimeout(function () { | ||
read(true, cb) | ||
}, 10) | ||
} | ||
} | ||
}, | ||
pull.collect(function (err, ary) { | ||
if(err) throw err | ||
t.deepEqual(ary, [1,2,3,4]) | ||
t.end() | ||
}) | ||
) | ||
ts.queue(1) | ||
ts.queue(2) | ||
ts.queue(3) | ||
ts.queue(4) | ||
}) | ||
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
15923
547