Socket
Socket
Sign inDemoInstall

bufferstreams

Package Overview
Dependencies
Maintainers
1
Versions
12
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bufferstreams - npm Package Compare versions

Comparing version 0.0.2 to 1.0.1

12

package.json
{
"name": "bufferstreams",
"version": "0.0.2",
"version": "1.0.1",
"description": "Abstract streams to deal with the whole buffered contents.",

@@ -25,7 +25,7 @@ "homepage": "https://github.com/nfroidure/BufferStreams",

"devDependencies": {
"mocha": "1.x.x",
"event-stream": "3.x.x",
"coveralls": "~2.11.2",
"istanbul": "~0.3.5",
"mocha": "2.x.x",
"mocha-lcov-reporter": "0.0.1",
"coveralls": "~2.8.0",
"istanbul": "~0.2.6"
"streamtest": "^1.1.0"
},

@@ -49,4 +49,4 @@ "author": {

"dependencies": {
"readable-stream": "^1.0.26-2"
"readable-stream": "^1.0.33"
}
}
# BufferStreams
[![NPM version](https://badge.fury.io/js/bufferstreams.png)](https://npmjs.org/package/bufferstreams) [![Build Status](https://travis-ci.org/nfroidure/BufferStreams.png?branch=master)](https://travis-ci.org/nfroidure/BufferStreams) [![Dependency Status](https://david-dm.org/nfroidure/bufferstreams.png)](https://david-dm.org/nfroidure/bufferstreams) [![devDependency Status](https://david-dm.org/nfroidure/bufferstreams/dev-status.png)](https://david-dm.org/nfroidure/bufferstreams#info=devDependencies) [![Coverage Status](https://coveralls.io/repos/nfroidure/BufferStreams/badge.png?branch=master)](https://coveralls.io/r/nfroidure/BufferStreams?branch=master)
[![NPM version](https://badge.fury.io/js/bufferstreams.png)](https://npmjs.org/package/bufferstreams) [![Build Status](https://travis-ci.org/nfroidure/BufferStreams.png?branch=master)](https://travis-ci.org/nfroidure/BufferStreams) [![Dependency Status](https://david-dm.org/nfroidure/bufferstreams.png)](https://david-dm.org/nfroidure/bufferstreams) [![devDependency Status](https://david-dm.org/nfroidure/bufferstreams/dev-status.png)](https://david-dm.org/nfroidure/bufferstreams#info=devDependencies) [![Coverage Status](https://coveralls.io/repos/nfroidure/BufferStreams/badge.png?branch=master)](https://coveralls.io/r/nfroidure/BufferStreams?branch=master) [![Code Climate](https://codeclimate.com/github/nfroidure/BufferStreams.png)](https://codeclimate.com/github/nfroidure/BufferStreams)

@@ -42,4 +42,10 @@ BufferStreams abstracts streams to allow you to deal with their whole content in

Note that you can use BufferStream with the objectMode option. In this case, the
given buffer will be an array containing the streamed objects:
```js
new BufferStreams({objectMode: true}, myCallback);
```
## Contributing
Feel free to pull your code if you agree with publishing it under the MIT license.

@@ -8,11 +8,21 @@ var PassThrough = require('readable-stream').PassThrough;

// Constructor
function BufferStream(cb) {
function BufferStream(options, cb) {
// Ensure new were used
if (!(this instanceof BufferStream)) {
return new BufferStream(cb);
return new BufferStream(options, cb);
}
// Cast args
if(options instanceof Function) {
cb = options;
options = {};
}
options = options || {};
if(!(cb instanceof Function)) {
throw new Error('The given callback must be a function.');
}
this.__objectMode = options.objectMode;
// Parent constructor
PassThrough.call(this);
PassThrough.call(this, options);

@@ -23,3 +33,3 @@ // Keep a reference to the callback

// Internal buffer
this._buf = Buffer('');
this._buf = options.objectMode ? [] : Buffer('');
}

@@ -29,3 +39,7 @@

this._buf = Buffer.concat([this._buf, chunk], this._buf.length + chunk.length);
if(this.__objectMode) {
this._buf.push(chunk);
} else {
this._buf = Buffer.concat([this._buf, chunk], this._buf.length + chunk.length);
}

@@ -37,7 +51,13 @@ done();

BufferStream.prototype._flush = function(done) {
var _that = this;
var _this = this;
this._cb(null, this._buf, function(err, buf) {
if (buf && buf.length) {
_that.push(buf);
if(buf && buf.length) {
if(_this.__objectMode) {
buf.forEach(function(chunk) {
_this.push(chunk);
});
} else {
_this.push(buf);
}
}

@@ -44,0 +64,0 @@ done();

var assert = require('assert')
, es = require('event-stream')
, StreamTest = require('streamtest')
, BufferStream = require('../src')

@@ -8,6 +8,11 @@ ;

function syncBufferPrefixer(headerText) {
return new BufferStream(function(err, buf, cb) {
return new BufferStream({
objectMode: headerText instanceof Object
}, function(err, buf, cb) {
assert.equal(err, null);
if(null === buf) {
cb(null, Buffer(headerText));
} else if(buf instanceof Array) {
buf.unshift(headerText);
cb(null, buf);
} else {

@@ -19,3 +24,5 @@ cb(null, Buffer.concat([Buffer(headerText), buf]));

function asyncBufferPrefixer(headerText) {
return new BufferStream(function(err, buf, cb) {
return new BufferStream({
objectMode: headerText instanceof Object
}, function(err, buf, cb) {
assert.equal(err, null);

@@ -26,2 +33,7 @@ if(null === buf) {

}, 0);
} else if(buf instanceof Array) {
setTimeout(function() {
buff.push(headerText);
cb(null, buf);
}, 0);
} else {

@@ -36,80 +48,220 @@ setTimeout(function() {

// Tests
describe('Abstract buffers', function() {
describe('synchonously', function() {
describe('bufferstreams', function() {
it('should work with one pipe', function(done) {
es.readArray(['te', 'st'])
.pipe(syncBufferPrefixer('plop'))
.pipe(es.wait(function(err, data) {
assert.equal(err, null);
assert.equal(data, 'ploptest');
done();
}));
it('should fail when callback is not a function', function() {
assert.throws(function() {
new BufferStream();
});
});
it('should work when returning a null buffer', function(done) {
es.readArray(['te', 'st'])
.pipe(new BufferStream(function(err, buf, cb){
cb(null, null);
}))
.pipe(es.wait(function(err, data) {
assert.equal(err, null);
assert.equal(data, '');
done();
}));
});
// Iterating through versions
StreamTest.versions.forEach(function(version) {
it('should work with multiple pipes', function(done) {
es.readArray(['te', 'st'])
.pipe(syncBufferPrefixer('plop'))
.pipe(syncBufferPrefixer('plip'))
.pipe(syncBufferPrefixer('plap'))
.pipe(es.wait(function(err, data) {
assert.equal(err, null);
assert.equal(data, 'plapplipploptest');
done();
}));
});
describe('for ' + version + ' streams', function() {
});
describe('in buffer mode', function() {
describe('asynchonously', function() {
describe('synchonously', function() {
it('should work with one pipe', function(done) {
es.readArray(['te', 'st'])
.pipe(asyncBufferPrefixer('plop'))
.pipe(es.wait(function(err, data) {
assert.equal(err, null);
assert.equal(data, 'ploptest');
done();
}));
});
it('should work with one pipe', function(done) {
StreamTest[version].fromChunks(['te', 'st'])
.pipe(syncBufferPrefixer('plop'))
.pipe(StreamTest[version].toText(function(err, data) {
if(err) {
return done(err);
}
assert.equal(data, 'ploptest');
done();
}));
});
it('should work when returning a null buffer', function(done) {
es.readArray(['te', 'st'])
.pipe(BufferStream(function(err, buf, cb){
cb(null, null);
}))
.pipe(es.wait(function(err, data) {
assert.equal(err, null);
assert.equal(data, '');
done();
}));
});
it('should work when returning a null buffer', function(done) {
StreamTest[version].fromChunks(['te', 'st'])
.pipe(new BufferStream(function(err, buf, cb){
cb(null, null);
}))
.pipe(StreamTest[version].toText(function(err, data) {
if(err) {
return done(err);
}
assert.equal(data, '');
done();
}));
});
it('should work with multiple pipes', function(done) {
es.readArray(['te', 'st'])
.pipe(asyncBufferPrefixer('plop'))
.pipe(asyncBufferPrefixer('plip'))
.pipe(asyncBufferPrefixer('plap'))
.pipe(es.wait(function(err, data) {
assert.equal(err, null);
assert.equal(data, 'plapplipploptest');
done();
}));
it('should work with multiple pipes', function(done) {
StreamTest[version].fromChunks(['te', 'st'])
.pipe(syncBufferPrefixer('plop'))
.pipe(syncBufferPrefixer('plip'))
.pipe(syncBufferPrefixer('plap'))
.pipe(StreamTest[version].toText(function(err, data) {
if(err) {
return done(err);
}
assert.equal(data, 'plapplipploptest');
done();
}));
});
});
describe('asynchonously', function() {
it('should work with one pipe', function(done) {
StreamTest[version].fromChunks(['te', 'st'])
.pipe(asyncBufferPrefixer('plop'))
.pipe(StreamTest[version].toText(function(err, data) {
if(err) {
return done(err);
}
assert.equal(data, 'ploptest');
done();
}));
});
it('should work when returning a null buffer', function(done) {
StreamTest[version].fromChunks(['te', 'st'])
.pipe(BufferStream(function(err, buf, cb){
cb(null, null);
}))
.pipe(StreamTest[version].toText(function(err, data) {
if(err) {
return done(err);
}
assert.equal(data, '');
done();
}));
});
it('should work with multiple pipes', function(done) {
StreamTest[version].fromChunks(['te', 'st'])
.pipe(asyncBufferPrefixer('plop'))
.pipe(asyncBufferPrefixer('plip'))
.pipe(asyncBufferPrefixer('plap'))
.pipe(StreamTest[version].toText(function(err, data) {
if(err) {
return done(err);
}
assert.equal(data, 'plapplipploptest');
done();
}));
});
});
});
describe('in object mode', function() {
var object1 = {txt: 'te'};
var object2 = {txt: 'st'};
var object3 = {txt: 'e'};
var object4 = {txt: 'd'};
var object5 = {txt: 'u'};
var object6 = {txt: 'ni'};
var object7 = {txt: 't'};
describe('synchonously', function() {
it('should work with one pipe', function(done) {
StreamTest[version].fromObjects([object1, object2])
.pipe(syncBufferPrefixer(object4))
.pipe(StreamTest[version].toObjects(function(err, objs) {
if(err) {
return done(err);
}
assert.deepEqual(objs, [object4, object1, object2]);
done();
}));
});
it('should work when returning a null buffer', function(done) {
StreamTest[version].fromObjects([object1, object2])
.pipe(new BufferStream({
objectMode: true
}, function(err, buf, cb){
cb(null, null);
}))
.pipe(StreamTest[version].toObjects(function(err, objs) {
if(err) {
return done(err);
}
assert.equal(objs.length, 0);
done();
}));
});
it('should work with multiple pipes', function(done) {
StreamTest[version].fromObjects([object1, object2])
.pipe(syncBufferPrefixer(object4))
.pipe(syncBufferPrefixer(object5))
.pipe(syncBufferPrefixer(object6))
.pipe(StreamTest[version].toObjects(function(err, objs) {
if(err) {
return done(err);
}
assert.deepEqual(objs, [object6, object5, object4, object1, object2]);
done();
}));
});
});
describe('asynchonously', function() {
it('should work with one pipe', function(done) {
StreamTest[version].fromObjects([object1, object2])
.pipe(syncBufferPrefixer(object4))
.pipe(StreamTest[version].toObjects(function(err, objs) {
if(err) {
return done(err);
}
assert.deepEqual(objs, [object4, object1, object2]);
done();
}));
});
it('should work when returning a null buffer', function(done) {
StreamTest[version].fromObjects([object1, object2])
.pipe(BufferStream({
objectMode: true
}, function(err, buf, cb){
cb(null, null);
}))
.pipe(StreamTest[version].toObjects(function(err, objs) {
if(err) {
return done(err);
}
assert.equal(objs.length, 0);
done();
}));
});
it('should work with multiple pipes', function(done) {
StreamTest[version].fromObjects([object1, object2])
.pipe(syncBufferPrefixer(object4))
.pipe(syncBufferPrefixer(object5))
.pipe(syncBufferPrefixer(object6))
.pipe(StreamTest[version].toObjects(function(err, objs) {
if(err) {
return done(err);
}
assert.deepEqual(objs, [object6, object5, object4, object1, object2]);
done();
}));
});
});
});
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc