pullstream
Advanced tools
Comparing version 0.4.1 to 1.0.0
{ | ||
"name": "pullstream", | ||
"version": "0.4.1", | ||
"version": "1.0.0", | ||
"description": "A stream you can pull data from.", | ||
"main": "pullstream.js", | ||
"scripts": { | ||
"test": "nodeunit test" | ||
"test": "tap test" | ||
}, | ||
"license": "MIT", | ||
"bugs": { | ||
"url": "https://github.com/nearinfinity/node-pullstream/issues" | ||
"url": "https://github.com/EvanOxfeld/node-pullstream/issues" | ||
}, | ||
"repository": { | ||
"type": "git", | ||
"url": "https://github.com/nearinfinity/node-pullstream.git" | ||
"url": "https://github.com/EvanOxfeld/node-pullstream.git" | ||
}, | ||
@@ -22,12 +22,14 @@ "keywords": [ | ||
"devDependencies": { | ||
"nodeunit": ">= 0.9.0 < 1", | ||
"stream-buffers": ">= 0.2.6 < 1", | ||
"async": ">= 0.9.0 <" | ||
"async": ">= 0.9.0 <", | ||
"tap": "^1.4.1" | ||
}, | ||
"dependencies": { | ||
"over": ">= 0.0.5 < 1", | ||
"readable-stream": "~1.0.31", | ||
"setimmediate": ">= 1.0.2 < 2", | ||
"readable-stream": "^2.0.2", | ||
"slice-stream": ">= 1.0.0 < 2" | ||
}, | ||
"engines": { | ||
"node": ">= 0.10.0" | ||
} | ||
} |
@@ -5,3 +5,2 @@ 'use strict'; | ||
require("setimmediate"); | ||
var inherits = require("util").inherits; | ||
@@ -8,0 +7,0 @@ var PassThrough = require('readable-stream/passthrough'); |
'use strict'; | ||
var nodeunit = require('nodeunit'); | ||
var tap = require('tap'); | ||
var fs = require("fs"); | ||
var path = require("path"); | ||
var streamBuffers = require("stream-buffers"); | ||
var async = require('async') | ||
var async = require('async'); | ||
var PullStream = require('../'); | ||
module.exports = { | ||
"source sending 1-byte at a time": function (t) { | ||
t.expect(3); | ||
var ps = new PullStream({ lowWaterMark : 0 }); | ||
ps.on('finish', function () { | ||
sourceStream.destroy(); | ||
}); | ||
tap.test("source sending 1-byte at a time", function (t) { | ||
t.plan(3); | ||
var ps = new PullStream({ lowWaterMark: 0 }); | ||
ps.on('finish', function () { | ||
sourceStream.destroy(); | ||
}); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 0, | ||
chunkSize: 1 | ||
}); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 0, | ||
chunkSize: 1 | ||
}); | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
ps.pull('Hello'.length, function (err, data) { | ||
if (err) { | ||
return t.done(err); | ||
} | ||
t.equal('Hello', data.toString()); | ||
ps.pull('Hello'.length, function (err, data) { | ||
if (err) { | ||
return t.end(err); | ||
} | ||
t.equal('Hello', data.toString()); | ||
var writableStream = new streamBuffers.WritableStreamBuffer({ | ||
initialSize: 100 | ||
}); | ||
writableStream.on('close', function () { | ||
var str = writableStream.getContentsAsString('utf8'); | ||
t.equal(' World', str); | ||
var writableStream = new streamBuffers.WritableStreamBuffer({ | ||
initialSize: 100 | ||
}); | ||
writableStream.on('close', function () { | ||
var str = writableStream.getContentsAsString('utf8'); | ||
t.equal(' World', str); | ||
ps.pull(function (err, data) { | ||
if (err) { | ||
return t.done(err); | ||
} | ||
t.equal('!', data.toString()); | ||
return t.done(); | ||
}); | ||
ps.pull(function (err, data) { | ||
if (err) { | ||
return t.end(err); | ||
} | ||
t.equal('!', data.toString()); | ||
return t.end(); | ||
}); | ||
ps.pipe(' World'.length, writableStream); | ||
}); | ||
}, | ||
"source sending twelve bytes at once": function (t) { | ||
t.expect(3); | ||
var ps = new PullStream({ lowWaterMark : 0 }); | ||
ps.on('finish', function () { | ||
sourceStream.destroy(); | ||
}); | ||
ps.pipe(' World'.length, writableStream); | ||
}); | ||
}); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 0, | ||
chunkSize: 1000 | ||
}); | ||
tap.test("source sending twelve bytes at once", function (t) { | ||
t.plan(3); | ||
var ps = new PullStream({ lowWaterMark: 0 }); | ||
ps.on('finish', function () { | ||
sourceStream.destroy(); | ||
}); | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 0, | ||
chunkSize: 1000 | ||
}); | ||
ps.pull('Hello'.length, function (err, data) { | ||
if (err) { | ||
return t.done(err); | ||
} | ||
t.equal('Hello', data.toString()); | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
var writableStream = new streamBuffers.WritableStreamBuffer({ | ||
initialSize: 100 | ||
}); | ||
writableStream.on('close', function () { | ||
var str = writableStream.getContentsAsString('utf8'); | ||
t.equal(' World', str); | ||
ps.pull(function (err, data) { | ||
if (err) { | ||
return t.done(err); | ||
} | ||
t.equal('!', data.toString()); | ||
return t.done(); | ||
}); | ||
}); | ||
ps.pipe(' World'.length, writableStream); | ||
}); | ||
}, | ||
"source sending 512 bytes at once": function (t) { | ||
t.expect(512 / 4); | ||
var ps = new PullStream({ lowWaterMark : 0 }); | ||
ps.on('finish', function() { | ||
sourceStream.destroy(); | ||
}); | ||
var values = []; | ||
for (var i = 0; i < 512; i+=4) { | ||
values.push(i + 1000); | ||
ps.pull('Hello'.length, function (err, data) { | ||
if (err) { | ||
return t.end(err); | ||
} | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 0, | ||
chunkSize: 1000 | ||
}); | ||
t.equal('Hello', data.toString()); | ||
sourceStream.pipe(ps); | ||
values.forEach(function(val) { | ||
sourceStream.put(val); | ||
var writableStream = new streamBuffers.WritableStreamBuffer({ | ||
initialSize: 100 | ||
}); | ||
writableStream.on('close', function () { | ||
var str = writableStream.getContentsAsString('utf8'); | ||
t.equal(' World', str); | ||
async.forEachSeries(values, function (val, callback) { | ||
ps.pull(4, function (err, data) { | ||
ps.pull(function (err, data) { | ||
if (err) { | ||
return callback(err); | ||
return t.end(err); | ||
} | ||
t.equal(val, data.toString()); | ||
return callback(null); | ||
t.equal('!', data.toString()); | ||
return t.end(); | ||
}); | ||
}, function (err) { | ||
t.done(err); | ||
}); | ||
}, | ||
"two length pulls": function (t) { | ||
t.expect(2); | ||
var ps = new PullStream({ lowWaterMark : 0 }); | ||
ps.on('finish', function () { | ||
sourceStream.destroy(); | ||
}); | ||
ps.pipe(' World'.length, writableStream); | ||
}); | ||
}); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 0, | ||
chunkSize: 1000 | ||
}); | ||
tap.test("source sending 512 bytes at once", function (t) { | ||
t.plan(512 / 4); | ||
var ps = new PullStream({ lowWaterMark: 0 }); | ||
ps.on('finish', function() { | ||
sourceStream.destroy(); | ||
}); | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
var values = []; | ||
for (var i = 0; i < 512; i += 4) { | ||
values.push(String(i + 1000)); | ||
} | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 0, | ||
chunkSize: 1000 | ||
}); | ||
ps.pull('Hello'.length, function (err, data) { | ||
sourceStream.pipe(ps); | ||
values.forEach(function(val) { | ||
sourceStream.put(val); | ||
}); | ||
async.forEachSeries(values, function (val, callback) { | ||
ps.pull(4, function (err, data) { | ||
if (err) { | ||
return t.done(err); | ||
return callback(err); | ||
} | ||
t.equal('Hello', data.toString()); | ||
ps.pull(' World!'.length, function (err, data) { | ||
if (err) { | ||
return t.done(err); | ||
} | ||
t.equal(' World!', data.toString()); | ||
return t.done(); | ||
}); | ||
t.equal(val, data.toString()); | ||
return callback(null); | ||
}); | ||
}, | ||
}, function (err) { | ||
t.end(err); | ||
}); | ||
}); | ||
"pulling zero bytes returns empty data": function (t) { | ||
t.expect(1); | ||
var ps = new PullStream({ lowWaterMark : 0 }); | ||
tap.test("two length pulls", function (t) { | ||
t.plan(2); | ||
var ps = new PullStream({ lowWaterMark: 0 }); | ||
ps.on('finish', function () { | ||
sourceStream.destroy(); | ||
}); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
chunkSize: 1000 | ||
}); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 0, | ||
chunkSize: 1000 | ||
}); | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
ps.pull(0, function (err, data) { | ||
ps.pull('Hello'.length, function (err, data) { | ||
if (err) { | ||
return t.end(err); | ||
} | ||
t.equal('Hello', data.toString()); | ||
ps.pull(' World!'.length, function (err, data) { | ||
if (err) { | ||
return t.done(err); | ||
return t.end(err); | ||
} | ||
t.equal(0, data.length, "data is empty"); | ||
sourceStream.destroy(); | ||
return t.done(); | ||
t.equal(' World!', data.toString()); | ||
return t.end(); | ||
}); | ||
}, | ||
}); | ||
}); | ||
"read from file": function (t) { | ||
t.expect(2); | ||
var ps = new PullStream({ lowWaterMark : 0 }); | ||
tap.test("pulling zero bytes returns empty data", function (t) { | ||
t.plan(1); | ||
var ps = new PullStream({ lowWaterMark: 0 }); | ||
var sourceStream = fs.createReadStream(path.join(__dirname, 'testFile.txt')); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
chunkSize: 1000 | ||
}); | ||
sourceStream.pipe(ps); | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
ps.pull('Hello'.length, function (err, data) { | ||
ps.pull(0, function (err, data) { | ||
if (err) { | ||
return t.end(err); | ||
} | ||
t.equal(0, data.length, "data is empty"); | ||
sourceStream.destroy(); | ||
return t.end(); | ||
}); | ||
}); | ||
tap.test("read from file", function (t) { | ||
t.plan(2); | ||
var ps = new PullStream({ lowWaterMark: 0 }); | ||
var sourceStream = fs.createReadStream(path.join(__dirname, 'testFile.txt')); | ||
sourceStream.pipe(ps); | ||
ps.pull('Hello'.length, function (err, data) { | ||
if (err) { | ||
return t.end(err); | ||
} | ||
t.equal('Hello', data.toString()); | ||
ps.pull(' World!'.length, function (err, data) { | ||
if (err) { | ||
return t.done(err); | ||
return t.end(err); | ||
} | ||
t.equal('Hello', data.toString()); | ||
ps.pull(' World!'.length, function (err, data) { | ||
if (err) { | ||
return t.done(err); | ||
} | ||
t.equal(' World!', data.toString()); | ||
return t.done(); | ||
}); | ||
t.equal(' World!', data.toString()); | ||
return t.end(); | ||
}); | ||
}, | ||
}); | ||
}); | ||
"read past end of stream": function (t) { | ||
t.expect(2); | ||
var ps = new PullStream({ lowWaterMark : 0 }); | ||
ps.on('finish', function () { | ||
sourceStream.destroy(); | ||
}); | ||
tap.test("read past end of stream", function (t) { | ||
t.plan(2); | ||
var ps = new PullStream({ lowWaterMark: 0 }); | ||
ps.on('finish', function () { | ||
sourceStream.destroy(); | ||
}); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 1, | ||
chunkSize: 1000 | ||
}); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 1, | ||
chunkSize: 1000 | ||
}); | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
ps.pull('Hello World!'.length, function (err, data) { | ||
ps.pull('Hello World!'.length, function (err, data) { | ||
if (err) { | ||
return t.end(err); | ||
} | ||
t.equal('Hello World!', data.toString()); | ||
ps.pull(1, function (err) { | ||
if (err) { | ||
return t.done(err); | ||
t.ok(err, 'should get an error'); | ||
} | ||
t.equal('Hello World!', data.toString()); | ||
ps.pull(1, function (err, data) { | ||
if (err) { | ||
t.ok(err, 'should get an error'); | ||
} | ||
t.done(); | ||
}); | ||
t.end(); | ||
}); | ||
}, | ||
}); | ||
}); | ||
"pipe with no length": function (t) { | ||
t.expect(2); | ||
var ps = new PullStream({ lowWaterMark : 0 }); | ||
ps.on('end', function () { | ||
t.ok(true, "pullstream should end"); | ||
}); | ||
tap.test("pipe with no length", function (t) { | ||
t.plan(2); | ||
var ps = new PullStream({ lowWaterMark: 0 }); | ||
ps.on('end', function () { | ||
t.ok(true, "pullstream should end"); | ||
}); | ||
var writableStream = new streamBuffers.WritableStreamBuffer({ | ||
initialSize: 100 | ||
}); | ||
writableStream.on('close', function () { | ||
var str = writableStream.getContentsAsString('utf8'); | ||
t.equal('Hello World!', str); | ||
t.done(); | ||
}); | ||
var writableStream = new streamBuffers.WritableStreamBuffer({ | ||
initialSize: 100 | ||
}); | ||
writableStream.on('close', function () { | ||
var str = writableStream.getContentsAsString('utf8'); | ||
t.equal('Hello World!', str); | ||
t.end(); | ||
}); | ||
ps.pipe(writableStream); | ||
ps.pipe(writableStream); | ||
process.nextTick(function () { | ||
ps.write(new Buffer('Hello', 'utf8')); | ||
ps.write(new Buffer(' World', 'utf8')); | ||
process.nextTick(function () { | ||
ps.write(new Buffer('!', 'utf8')); | ||
ps.end(); | ||
}); | ||
setImmediate(function () { | ||
ps.write(new Buffer('Hello', 'utf8')); | ||
ps.write(new Buffer(' World', 'utf8')); | ||
setImmediate(function () { | ||
ps.write(new Buffer('!', 'utf8')); | ||
ps.end(); | ||
}); | ||
}, | ||
}); | ||
}); | ||
"throw on calling write() after end": function (t) { | ||
t.expect(1); | ||
var ps = new PullStream({ lowWaterMark : 0 }); | ||
ps.end(); | ||
tap.test("emit error on calling write() after end", function (t) { | ||
t.plan(2); | ||
try { | ||
ps.write(new Buffer('hello', 'utf8')); | ||
t.fail("should throw error"); | ||
} catch (ex) { | ||
t.ok(ex); | ||
} | ||
var ps = new PullStream({ lowWaterMark: 0 }); | ||
ps.end(); | ||
t.done(); | ||
}, | ||
ps.on('error', function (err) { | ||
t.ok(err); | ||
}); | ||
"pipe more bytes than the pullstream buffer size": function (t) { | ||
t.expect(1); | ||
var ps = new PullStream(); | ||
ps.on('end', function() { | ||
sourceStream.destroy(); | ||
setImmediate(function () { | ||
ps.write(new Buffer('hello', 'utf8'), function (err) { | ||
t.ok(err); | ||
t.end(); | ||
}); | ||
}); | ||
}); | ||
var aVals = "", bVals = ""; | ||
for (var i = 0; i < 20 * 1000; i++) { | ||
aVals += 'a'; | ||
} | ||
for (var i = 0; i < 180 * 1000; i++) { | ||
bVals += 'b'; | ||
} | ||
var combined = aVals + bVals; | ||
tap.test("pipe more bytes than the pullstream buffer size", function (t) { | ||
t.plan(1); | ||
var ps = new PullStream(); | ||
ps.on('end', function() { | ||
sourceStream.destroy(); | ||
}); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 0, | ||
chunkSize: 40 * 1024 | ||
}); | ||
var aVals = "", bVals = ""; | ||
for (var i = 0; i < 20 * 1000; i++) { | ||
aVals += 'a'; | ||
} | ||
for (var j = 0; j < 180 * 1000; j++) { | ||
bVals += 'b'; | ||
} | ||
var combined = aVals + bVals; | ||
sourceStream.pipe(ps); | ||
sourceStream.put(aVals); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 0, | ||
chunkSize: 40 * 1024 | ||
}); | ||
var writableStream = new streamBuffers.WritableStreamBuffer({ | ||
initialSize: 200 * 1000 | ||
}); | ||
writableStream.on('close', function () { | ||
var str = writableStream.getContentsAsString('utf8'); | ||
t.equal(combined, str); | ||
t.done(); | ||
}); | ||
sourceStream.pipe(ps); | ||
sourceStream.put(aVals); | ||
ps.once('drain', function () { | ||
ps.pipe(200 * 1000, writableStream); | ||
process.nextTick(sourceStream.put.bind(null, bVals)); | ||
}); | ||
}, | ||
var writableStream = new streamBuffers.WritableStreamBuffer({ | ||
initialSize: 200 * 1000 | ||
}); | ||
writableStream.on('close', function () { | ||
var str = writableStream.getContentsAsString('utf8'); | ||
t.equal(combined, str); | ||
t.end(); | ||
}); | ||
"mix asynchronous pull with synchronous pullUpTo - exact number of bytes returned": function (t) { | ||
t.expect(2); | ||
var ps = new PullStream(); | ||
ps.once('drain', function () { | ||
ps.pipe(200 * 1000, writableStream); | ||
setImmediate(sourceStream.put.bind(null, bVals)); | ||
}); | ||
}); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 0, | ||
chunkSize: 1000 | ||
}); | ||
tap.test("mix asynchronous pull with synchronous pullUpTo - exact number of bytes returned", function (t) { | ||
t.plan(2); | ||
var ps = new PullStream(); | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 0, | ||
chunkSize: 1000 | ||
}); | ||
ps.pull('Hello'.length, function (err, data) { | ||
if (err) { | ||
return t.done(err); | ||
} | ||
t.equal('Hello', data.toString()); | ||
var data = ps.pullUpTo(" World!".length); | ||
t.equal(" World!", data.toString()); | ||
sourceStream.destroy(); | ||
t.done(); | ||
}); | ||
}, | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
"mix asynchronous pull with pullUpTo - fewer bytes returned than requested": function (t) { | ||
t.expect(2); | ||
var ps = new PullStream(); | ||
ps.pull('Hello'.length, function (err, data) { | ||
if (err) { | ||
return t.end(err); | ||
} | ||
t.equal('Hello', data.toString()); | ||
var data2 = ps.pullUpTo(" World!".length); | ||
t.equal(" World!", data2.toString()); | ||
sourceStream.destroy(); | ||
t.end(); | ||
}); | ||
}); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 0, | ||
chunkSize: 1000 | ||
}); | ||
tap.test("mix asynchronous pull with pullUpTo - fewer bytes returned than requested", function (t) { | ||
t.plan(2); | ||
var ps = new PullStream(); | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 0, | ||
chunkSize: 1000 | ||
}); | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
ps.pull('Hello'.length, function (err, data) { | ||
if (err) { | ||
return t.done(err); | ||
} | ||
t.equal('Hello', data.toString()); | ||
var data = ps.pullUpTo(1000); | ||
t.equal(" World!", data.toString()); | ||
sourceStream.destroy(); | ||
t.done(); | ||
}); | ||
}, | ||
"retrieve all currently remaining bytes": function (t) { | ||
t.expect(2); | ||
var ps = new PullStream(); | ||
ps.pull('Hello'.length, function (err, data) { | ||
if (err) { | ||
return t.end(err); | ||
} | ||
t.equal('Hello', data.toString()); | ||
var data2 = ps.pullUpTo(1000); | ||
t.equal(" World!", data2.toString()); | ||
sourceStream.destroy(); | ||
t.end(); | ||
}); | ||
}); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 0, | ||
chunkSize: 1000 | ||
}); | ||
tap.test("retrieve all currently remaining bytes", function (t) { | ||
t.plan(2); | ||
var ps = new PullStream(); | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ | ||
frequency: 0, | ||
chunkSize: 1000 | ||
}); | ||
ps.pull('Hello'.length, function (err, data) { | ||
if (err) { | ||
return t.done(err); | ||
} | ||
t.equal('Hello', data.toString()); | ||
var data = ps.pullUpTo(); | ||
t.equal(" World!", data.toString()); | ||
sourceStream.destroy(); | ||
t.done(); | ||
}); | ||
}, | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
ps.pull('Hello'.length, function (err, data) { | ||
if (err) { | ||
return t.end(err); | ||
} | ||
t.equal('Hello', data.toString()); | ||
var data2 = ps.pullUpTo(); | ||
t.equal(" World!", data2.toString()); | ||
sourceStream.destroy(); | ||
t.end(); | ||
}); | ||
}); | ||
// TODO: node PassThrough stream doesn't handle unshift the same way anymore. | ||
// "prepend": function (t) { | ||
// t.expect(1); | ||
// tap.test("prepend", function (t) { | ||
// t.plan(1); | ||
// var ps = new PullStream(); | ||
@@ -402,30 +404,30 @@ // | ||
// if (err) { | ||
// return t.done(err); | ||
// return t.end(err); | ||
// } | ||
// t.equal('Hello World!', data.toString()); | ||
// sourceStream.destroy(); | ||
// t.done(); | ||
// t.end(); | ||
// }); | ||
// }, | ||
// }); | ||
"drain": function (t) { | ||
t.expect(1); | ||
var ps = new PullStream(); | ||
tap.test("drain", function (t) { | ||
t.plan(2); | ||
var ps = new PullStream(); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer(); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer(); | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
sourceStream.pipe(ps); | ||
sourceStream.put("Hello World!"); | ||
ps.drain('Hello '.length, function (err) { | ||
if (err) { | ||
return t.done(err); | ||
} | ||
ps.pull('World!'.length, function (err, data) { | ||
t.equal('World!', data.toString()); | ||
sourceStream.destroy(); | ||
t.done(); | ||
}); | ||
ps.drain('Hello '.length, function (err) { | ||
if (err) { | ||
return t.end(err); | ||
} | ||
ps.pull('World!'.length, function (err, data) { | ||
t.error(err); | ||
t.equal('World!', data.toString()); | ||
sourceStream.destroy(); | ||
t.end(); | ||
}); | ||
} | ||
}; | ||
}); | ||
}); |
Sorry, the diff of this file is not supported yet
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
3
1
23977
10
+ Addedisarray@1.0.0(transitive)
+ Addedprocess-nextick-args@2.0.1(transitive)
+ Addedreadable-stream@2.3.8(transitive)
+ Addedsafe-buffer@5.1.2(transitive)
+ Addedstring_decoder@1.1.1(transitive)
+ Addedutil-deprecate@1.0.2(transitive)
- Removedsetimmediate@>= 1.0.2 < 2
- Removedsetimmediate@1.0.5(transitive)
Updatedreadable-stream@^2.0.2