Socket
Socket
Sign inDemoInstall

pullstream

Package Overview
Dependencies
10
Maintainers
2
Versions
14
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.4.1 to 1.0.0

18

package.json
{
"name": "pullstream",
"version": "0.4.1",
"version": "1.0.0",
"description": "A stream you can pull data from.",
"main": "pullstream.js",
"scripts": {
"test": "nodeunit test"
"test": "tap test"
},
"license": "MIT",
"bugs": {
"url": "https://github.com/nearinfinity/node-pullstream/issues"
"url": "https://github.com/EvanOxfeld/node-pullstream/issues"
},
"repository": {
"type": "git",
"url": "https://github.com/nearinfinity/node-pullstream.git"
"url": "https://github.com/EvanOxfeld/node-pullstream.git"
},

@@ -22,12 +22,14 @@ "keywords": [

"devDependencies": {
"nodeunit": ">= 0.9.0 < 1",
"stream-buffers": ">= 0.2.6 < 1",
"async": ">= 0.9.0 <"
"async": ">= 0.9.0 <",
"tap": "^1.4.1"
},
"dependencies": {
"over": ">= 0.0.5 < 1",
"readable-stream": "~1.0.31",
"setimmediate": ">= 1.0.2 < 2",
"readable-stream": "^2.0.2",
"slice-stream": ">= 1.0.0 < 2"
},
"engines": {
"node": ">= 0.10.0"
}
}

@@ -5,3 +5,2 @@ 'use strict';

require("setimmediate");
var inherits = require("util").inherits;

@@ -8,0 +7,0 @@ var PassThrough = require('readable-stream/passthrough');

'use strict';
var nodeunit = require('nodeunit');
var tap = require('tap');
var fs = require("fs");
var path = require("path");
var streamBuffers = require("stream-buffers");
var async = require('async')
var async = require('async');
var PullStream = require('../');
module.exports = {
"source sending 1-byte at a time": function (t) {
t.expect(3);
var ps = new PullStream({ lowWaterMark : 0 });
ps.on('finish', function () {
sourceStream.destroy();
});
tap.test("source sending 1-byte at a time", function (t) {
t.plan(3);
var ps = new PullStream({ lowWaterMark: 0 });
ps.on('finish', function () {
sourceStream.destroy();
});
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1
});
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1
});
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.done(err);
}
t.equal('Hello', data.toString());
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.end(err);
}
t.equal('Hello', data.toString());
var writableStream = new streamBuffers.WritableStreamBuffer({
initialSize: 100
});
writableStream.on('close', function () {
var str = writableStream.getContentsAsString('utf8');
t.equal(' World', str);
var writableStream = new streamBuffers.WritableStreamBuffer({
initialSize: 100
});
writableStream.on('close', function () {
var str = writableStream.getContentsAsString('utf8');
t.equal(' World', str);
ps.pull(function (err, data) {
if (err) {
return t.done(err);
}
t.equal('!', data.toString());
return t.done();
});
ps.pull(function (err, data) {
if (err) {
return t.end(err);
}
t.equal('!', data.toString());
return t.end();
});
ps.pipe(' World'.length, writableStream);
});
},
"source sending twelve bytes at once": function (t) {
t.expect(3);
var ps = new PullStream({ lowWaterMark : 0 });
ps.on('finish', function () {
sourceStream.destroy();
});
ps.pipe(' World'.length, writableStream);
});
});
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
tap.test("source sending twelve bytes at once", function (t) {
t.plan(3);
var ps = new PullStream({ lowWaterMark: 0 });
ps.on('finish', function () {
sourceStream.destroy();
});
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.done(err);
}
t.equal('Hello', data.toString());
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
var writableStream = new streamBuffers.WritableStreamBuffer({
initialSize: 100
});
writableStream.on('close', function () {
var str = writableStream.getContentsAsString('utf8');
t.equal(' World', str);
ps.pull(function (err, data) {
if (err) {
return t.done(err);
}
t.equal('!', data.toString());
return t.done();
});
});
ps.pipe(' World'.length, writableStream);
});
},
"source sending 512 bytes at once": function (t) {
t.expect(512 / 4);
var ps = new PullStream({ lowWaterMark : 0 });
ps.on('finish', function() {
sourceStream.destroy();
});
var values = [];
for (var i = 0; i < 512; i+=4) {
values.push(i + 1000);
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.end(err);
}
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
t.equal('Hello', data.toString());
sourceStream.pipe(ps);
values.forEach(function(val) {
sourceStream.put(val);
var writableStream = new streamBuffers.WritableStreamBuffer({
initialSize: 100
});
writableStream.on('close', function () {
var str = writableStream.getContentsAsString('utf8');
t.equal(' World', str);
async.forEachSeries(values, function (val, callback) {
ps.pull(4, function (err, data) {
ps.pull(function (err, data) {
if (err) {
return callback(err);
return t.end(err);
}
t.equal(val, data.toString());
return callback(null);
t.equal('!', data.toString());
return t.end();
});
}, function (err) {
t.done(err);
});
},
"two length pulls": function (t) {
t.expect(2);
var ps = new PullStream({ lowWaterMark : 0 });
ps.on('finish', function () {
sourceStream.destroy();
});
ps.pipe(' World'.length, writableStream);
});
});
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
tap.test("source sending 512 bytes at once", function (t) {
t.plan(512 / 4);
var ps = new PullStream({ lowWaterMark: 0 });
ps.on('finish', function() {
sourceStream.destroy();
});
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
var values = [];
for (var i = 0; i < 512; i += 4) {
values.push(String(i + 1000));
}
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
ps.pull('Hello'.length, function (err, data) {
sourceStream.pipe(ps);
values.forEach(function(val) {
sourceStream.put(val);
});
async.forEachSeries(values, function (val, callback) {
ps.pull(4, function (err, data) {
if (err) {
return t.done(err);
return callback(err);
}
t.equal('Hello', data.toString());
ps.pull(' World!'.length, function (err, data) {
if (err) {
return t.done(err);
}
t.equal(' World!', data.toString());
return t.done();
});
t.equal(val, data.toString());
return callback(null);
});
},
}, function (err) {
t.end(err);
});
});
"pulling zero bytes returns empty data": function (t) {
t.expect(1);
var ps = new PullStream({ lowWaterMark : 0 });
tap.test("two length pulls", function (t) {
t.plan(2);
var ps = new PullStream({ lowWaterMark: 0 });
ps.on('finish', function () {
sourceStream.destroy();
});
var sourceStream = new streamBuffers.ReadableStreamBuffer({
chunkSize: 1000
});
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
ps.pull(0, function (err, data) {
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.end(err);
}
t.equal('Hello', data.toString());
ps.pull(' World!'.length, function (err, data) {
if (err) {
return t.done(err);
return t.end(err);
}
t.equal(0, data.length, "data is empty");
sourceStream.destroy();
return t.done();
t.equal(' World!', data.toString());
return t.end();
});
},
});
});
"read from file": function (t) {
t.expect(2);
var ps = new PullStream({ lowWaterMark : 0 });
tap.test("pulling zero bytes returns empty data", function (t) {
t.plan(1);
var ps = new PullStream({ lowWaterMark: 0 });
var sourceStream = fs.createReadStream(path.join(__dirname, 'testFile.txt'));
var sourceStream = new streamBuffers.ReadableStreamBuffer({
chunkSize: 1000
});
sourceStream.pipe(ps);
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
ps.pull('Hello'.length, function (err, data) {
ps.pull(0, function (err, data) {
if (err) {
return t.end(err);
}
t.equal(0, data.length, "data is empty");
sourceStream.destroy();
return t.end();
});
});
tap.test("read from file", function (t) {
t.plan(2);
var ps = new PullStream({ lowWaterMark: 0 });
var sourceStream = fs.createReadStream(path.join(__dirname, 'testFile.txt'));
sourceStream.pipe(ps);
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.end(err);
}
t.equal('Hello', data.toString());
ps.pull(' World!'.length, function (err, data) {
if (err) {
return t.done(err);
return t.end(err);
}
t.equal('Hello', data.toString());
ps.pull(' World!'.length, function (err, data) {
if (err) {
return t.done(err);
}
t.equal(' World!', data.toString());
return t.done();
});
t.equal(' World!', data.toString());
return t.end();
});
},
});
});
"read past end of stream": function (t) {
t.expect(2);
var ps = new PullStream({ lowWaterMark : 0 });
ps.on('finish', function () {
sourceStream.destroy();
});
tap.test("read past end of stream", function (t) {
t.plan(2);
var ps = new PullStream({ lowWaterMark: 0 });
ps.on('finish', function () {
sourceStream.destroy();
});
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 1,
chunkSize: 1000
});
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 1,
chunkSize: 1000
});
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
ps.pull('Hello World!'.length, function (err, data) {
ps.pull('Hello World!'.length, function (err, data) {
if (err) {
return t.end(err);
}
t.equal('Hello World!', data.toString());
ps.pull(1, function (err) {
if (err) {
return t.done(err);
t.ok(err, 'should get an error');
}
t.equal('Hello World!', data.toString());
ps.pull(1, function (err, data) {
if (err) {
t.ok(err, 'should get an error');
}
t.done();
});
t.end();
});
},
});
});
"pipe with no length": function (t) {
t.expect(2);
var ps = new PullStream({ lowWaterMark : 0 });
ps.on('end', function () {
t.ok(true, "pullstream should end");
});
tap.test("pipe with no length", function (t) {
t.plan(2);
var ps = new PullStream({ lowWaterMark: 0 });
ps.on('end', function () {
t.ok(true, "pullstream should end");
});
var writableStream = new streamBuffers.WritableStreamBuffer({
initialSize: 100
});
writableStream.on('close', function () {
var str = writableStream.getContentsAsString('utf8');
t.equal('Hello World!', str);
t.done();
});
var writableStream = new streamBuffers.WritableStreamBuffer({
initialSize: 100
});
writableStream.on('close', function () {
var str = writableStream.getContentsAsString('utf8');
t.equal('Hello World!', str);
t.end();
});
ps.pipe(writableStream);
ps.pipe(writableStream);
process.nextTick(function () {
ps.write(new Buffer('Hello', 'utf8'));
ps.write(new Buffer(' World', 'utf8'));
process.nextTick(function () {
ps.write(new Buffer('!', 'utf8'));
ps.end();
});
setImmediate(function () {
ps.write(new Buffer('Hello', 'utf8'));
ps.write(new Buffer(' World', 'utf8'));
setImmediate(function () {
ps.write(new Buffer('!', 'utf8'));
ps.end();
});
},
});
});
"throw on calling write() after end": function (t) {
t.expect(1);
var ps = new PullStream({ lowWaterMark : 0 });
ps.end();
tap.test("emit error on calling write() after end", function (t) {
t.plan(2);
try {
ps.write(new Buffer('hello', 'utf8'));
t.fail("should throw error");
} catch (ex) {
t.ok(ex);
}
var ps = new PullStream({ lowWaterMark: 0 });
ps.end();
t.done();
},
ps.on('error', function (err) {
t.ok(err);
});
"pipe more bytes than the pullstream buffer size": function (t) {
t.expect(1);
var ps = new PullStream();
ps.on('end', function() {
sourceStream.destroy();
setImmediate(function () {
ps.write(new Buffer('hello', 'utf8'), function (err) {
t.ok(err);
t.end();
});
});
});
var aVals = "", bVals = "";
for (var i = 0; i < 20 * 1000; i++) {
aVals += 'a';
}
for (var i = 0; i < 180 * 1000; i++) {
bVals += 'b';
}
var combined = aVals + bVals;
tap.test("pipe more bytes than the pullstream buffer size", function (t) {
t.plan(1);
var ps = new PullStream();
ps.on('end', function() {
sourceStream.destroy();
});
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 40 * 1024
});
var aVals = "", bVals = "";
for (var i = 0; i < 20 * 1000; i++) {
aVals += 'a';
}
for (var j = 0; j < 180 * 1000; j++) {
bVals += 'b';
}
var combined = aVals + bVals;
sourceStream.pipe(ps);
sourceStream.put(aVals);
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 40 * 1024
});
var writableStream = new streamBuffers.WritableStreamBuffer({
initialSize: 200 * 1000
});
writableStream.on('close', function () {
var str = writableStream.getContentsAsString('utf8');
t.equal(combined, str);
t.done();
});
sourceStream.pipe(ps);
sourceStream.put(aVals);
ps.once('drain', function () {
ps.pipe(200 * 1000, writableStream);
process.nextTick(sourceStream.put.bind(null, bVals));
});
},
var writableStream = new streamBuffers.WritableStreamBuffer({
initialSize: 200 * 1000
});
writableStream.on('close', function () {
var str = writableStream.getContentsAsString('utf8');
t.equal(combined, str);
t.end();
});
"mix asynchronous pull with synchronous pullUpTo - exact number of bytes returned": function (t) {
t.expect(2);
var ps = new PullStream();
ps.once('drain', function () {
ps.pipe(200 * 1000, writableStream);
setImmediate(sourceStream.put.bind(null, bVals));
});
});
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
tap.test("mix asynchronous pull with synchronous pullUpTo - exact number of bytes returned", function (t) {
t.plan(2);
var ps = new PullStream();
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.done(err);
}
t.equal('Hello', data.toString());
var data = ps.pullUpTo(" World!".length);
t.equal(" World!", data.toString());
sourceStream.destroy();
t.done();
});
},
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
"mix asynchronous pull with pullUpTo - fewer bytes returned than requested": function (t) {
t.expect(2);
var ps = new PullStream();
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.end(err);
}
t.equal('Hello', data.toString());
var data2 = ps.pullUpTo(" World!".length);
t.equal(" World!", data2.toString());
sourceStream.destroy();
t.end();
});
});
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
tap.test("mix asynchronous pull with pullUpTo - fewer bytes returned than requested", function (t) {
t.plan(2);
var ps = new PullStream();
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.done(err);
}
t.equal('Hello', data.toString());
var data = ps.pullUpTo(1000);
t.equal(" World!", data.toString());
sourceStream.destroy();
t.done();
});
},
"retrieve all currently remaining bytes": function (t) {
t.expect(2);
var ps = new PullStream();
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.end(err);
}
t.equal('Hello', data.toString());
var data2 = ps.pullUpTo(1000);
t.equal(" World!", data2.toString());
sourceStream.destroy();
t.end();
});
});
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
tap.test("retrieve all currently remaining bytes", function (t) {
t.plan(2);
var ps = new PullStream();
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.done(err);
}
t.equal('Hello', data.toString());
var data = ps.pullUpTo();
t.equal(" World!", data.toString());
sourceStream.destroy();
t.done();
});
},
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.end(err);
}
t.equal('Hello', data.toString());
var data2 = ps.pullUpTo();
t.equal(" World!", data2.toString());
sourceStream.destroy();
t.end();
});
});
// TODO: node PassThrough stream doesn't handle unshift the same way anymore.
// "prepend": function (t) {
// t.expect(1);
// tap.test("prepend", function (t) {
// t.plan(1);
// var ps = new PullStream();

@@ -402,30 +404,30 @@ //

// if (err) {
// return t.done(err);
// return t.end(err);
// }
// t.equal('Hello World!', data.toString());
// sourceStream.destroy();
// t.done();
// t.end();
// });
// },
// });
"drain": function (t) {
t.expect(1);
var ps = new PullStream();
tap.test("drain", function (t) {
t.plan(2);
var ps = new PullStream();
var sourceStream = new streamBuffers.ReadableStreamBuffer();
var sourceStream = new streamBuffers.ReadableStreamBuffer();
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
ps.drain('Hello '.length, function (err) {
if (err) {
return t.done(err);
}
ps.pull('World!'.length, function (err, data) {
t.equal('World!', data.toString());
sourceStream.destroy();
t.done();
});
ps.drain('Hello '.length, function (err) {
if (err) {
return t.end(err);
}
ps.pull('World!'.length, function (err, data) {
t.error(err);
t.equal('World!', data.toString());
sourceStream.destroy();
t.end();
});
}
};
});
});

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc