Socket
Socket
Sign inDemoInstall

bufferstreams

Package Overview
Dependencies
Maintainers
1
Versions
12
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bufferstreams - npm Package Compare versions

Comparing version 1.0.2 to 1.1.0

.eslintrc

12

package.json
{
"name": "bufferstreams",
"version": "1.0.2",
"version": "1.1.0",
"description": "Abstract streams to deal with the whole buffered contents.",

@@ -11,3 +11,5 @@ "homepage": "https://github.com/nfroidure/BufferStreams",

"cover": "istanbul cover --report html _mocha -- tests/*.mocha.js -t 5000",
"trinity": "npm-check-updates -u && npm install && npm test && git commit package.json -m \"Dependencies update\" && git push"
"trinity": "npm-check-updates -u && npm install && npm test && git commit package.json -m \"Dependencies update\" && git push",
"lint": "eslint **/*.s",
"cli": "env NPM_RUN_CLI=1"
},

@@ -26,9 +28,11 @@ "repository": {

"dependencies": {
"readable-stream": "^2.0.0"
"readable-stream": "^2.0.2"
},
"devDependencies": {
"istanbul": "^0.3.15",
"eslint": "^1.0.0",
"istanbul": "^0.3.17",
"istanbul-coveralls": "^1.0.3",
"mocha": "^2.2.5",
"mocha-lcov-reporter": "^0.0.2",
"sf-lint": "^1.0.2",
"streamtest": "^1.2.1"

@@ -35,0 +39,0 @@ },

@@ -10,10 +10,13 @@ # BufferStreams

BufferStreams abstracts streams to allow you to deal with their whole content in
a single buffer when it becomes necessary (by example: a legacy library that
`bufferstreams` abstracts streams to allow you to deal with their whole content
in a single buffer when it becomes necessary (by example: a legacy library that
do not support streams).
It is not a good practice, just some glue. Using BufferStreams means:
It is not a good practice, just some glue. Using `bufferstreams` means:
* there is no library dealing with streams for your needs
* you filled an issue to the wrapped library to support streams
`bufferstreams` can also be used to control the whole stream content in a single
point of a streaming pipeline for testing purposes.
## Usage

@@ -26,6 +29,7 @@ Install the [npm module](https://npmjs.org/package/bufferstreams):

```js
var BufferStreams = require('bufferstreams');
var fs = require('fs');
var bufferstreams = require('bufferstreams');
Fs.createReadStream('input.txt')
.pipe(new BufferStreams(function(err, buf, cb) {
fs.createReadStream('input.txt')
.pipe(new bufferstreams(function(err, buf, cb) {

@@ -46,7 +50,7 @@ // err will be filled with an error if the piped in stream emits one.

}))
.pipe(Fs.createWriteStream('output.txt'));
.pipe(fs.createWriteStream('output.txt'));
```
Note that you can use BufferStream with the objectMode option. In this case, the
given buffer will be an array containing the streamed objects:
Note that you can use `bufferstreams` with the objectMode option. In this case,
the given buffer will be an array containing the streamed objects:
```js

@@ -56,4 +60,31 @@ new BufferStreams({objectMode: true}, myCallback);

## API
### Stream : BufferStreams([options], callback)
#### options
##### options.objectMode
Type: `Boolean`
Default value: `false`
Use if piped in streams are in object mode. In this case, an array of the
buffered will be transmitted to the `callback` function.
##### options.*
`bufferstreams` inherits of Stream.Duplex, the options are passed to the
parent constructor so you can use it's options too.
##### callback(err, buf, cb)
Type: `Function`, required.
A function to handle the buffered content.
## Stats
[![NPM](https://nodei.co/npm/bufferstreams.png?downloads=true&stars=true)](https://nodei.co/npm/bufferstreams/)
[![NPM](https://nodei.co/npm-dl/bufferstreams.png)](https://nodei.co/npm/bufferstreams/)
## Contributing
Feel free to pull your code if you agree with publishing it under the MIT license.

@@ -1,9 +0,13 @@

var PassThrough = require('readable-stream').PassThrough;
'use strict';
var Duplex = require('readable-stream').Duplex;
var util = require('util');
// Inherit of PassThrough stream
util.inherits(BufferStream, PassThrough);
// Inherit of Duplex stream
util.inherits(BufferStream, Duplex);
// Constructor
function BufferStream(options, cb) {
var _this = this;
// Ensure new were used

@@ -26,3 +30,3 @@ if (!(this instanceof BufferStream)) {

// Parent constructor
PassThrough.call(this, options);
Duplex.call(this, options);

@@ -32,33 +36,62 @@ // Keep a reference to the callback

// Add a finished flag
this._bufferStreamFinished = false;
// Internal buffer
this._buf = options.objectMode ? [] : Buffer('');
}
this._bufferStreamBuffer = [];
BufferStream.prototype._transform = function(chunk, encoding, done) {
// Internal logic
function _bufferStreamCallbackWrapper(err) {
var buffer = options.objectMode ?
_this._bufferStreamBuffer :
Buffer.concat(_this._bufferStreamBuffer);
if(this.__objectMode) {
this._buf.push(chunk);
} else {
this._buf = Buffer.concat([this._buf, chunk], this._buf.length + chunk.length);
err = err || null;
_this._cb(
err,
buffer,
function(err2, buf) {
setImmediate(function() {
_this.removeListener('error', _bufferStreamError);
if(err2) {
_this.emit('error', err2);
}
_this._bufferStreamBuffer = options.objectMode ? buf || [] : [buf];
_this._bufferStreamFinished = true;
_this._read();
});
}
);
}
function _bufferStreamError(err) {
if(_this._bufferStreamFinished) {
return;
}
_bufferStreamCallbackWrapper(err);
}
this.once('finish', _bufferStreamCallbackWrapper);
this.on('error', _bufferStreamError);
}
BufferStream.prototype._write = function _bufferStreamWrite(chunk, encoding, done) {
this._bufferStreamBuffer.push(chunk);
done();
};
BufferStream.prototype._flush = function(done) {
BufferStream.prototype._read = function _bufferStreamRead(n) {
var _this = this;
this._cb(null, this._buf, function(err, buf) {
if(buf && buf.length) {
if(_this.__objectMode) {
buf.forEach(function(chunk) {
_this.push(chunk);
});
} else {
_this.push(buf);
if(_this._bufferStreamFinished) {
while(_this._bufferStreamBuffer.length) {
if(!_this.push(_this._bufferStreamBuffer.shift())) {
break;
}
}
done();
});
if(0 === _this._bufferStreamBuffer.length) {
_this.push(null);
}
}

@@ -65,0 +98,0 @@ };

@@ -1,14 +0,15 @@

var assert = require('assert')
, StreamTest = require('streamtest')
, BufferStream = require('../src')
;
'use strict';
var assert = require('assert');
var StreamTest = require('streamtest');
var BufferStream = require('../src');
// Helpers
function syncBufferPrefixer(headerText) {
return new BufferStream({
objectMode: headerText instanceof Object
objectMode: headerText instanceof Object,
}, function(err, buf, cb) {
assert.equal(err, null);
if(null === buf) {
cb(null, Buffer(headerText));
cb(null, new Buffer(headerText));
} else if(buf instanceof Array) {

@@ -18,3 +19,3 @@ buf.unshift(headerText);

} else {
cb(null, Buffer.concat([Buffer(headerText), buf]));
cb(null, Buffer.concat([new Buffer(headerText), buf]));
}

@@ -25,3 +26,3 @@ });

return new BufferStream({
objectMode: headerText instanceof Object
objectMode: headerText instanceof Object,
}, function(err, buf, cb) {

@@ -31,7 +32,7 @@ assert.equal(err, null);

setTimeout(function() {
cb(null, Buffer(headerText));
cb(null, new Buffer(headerText));
}, 0);
} else if(buf instanceof Array) {
setTimeout(function() {
buff.push(headerText);
buf.push(headerText);
cb(null, buf);

@@ -41,3 +42,3 @@ }, 0);

setTimeout(function() {
cb(null, Buffer.concat([Buffer(headerText), buf]));
cb(null, Buffer.concat([new Buffer(headerText), buf]));
}, 0);

@@ -79,6 +80,9 @@ }

it('should work when returning a null buffer', function(done) {
StreamTest[version].fromChunks(['te', 'st'])
.pipe(new BufferStream(function(err, buf, cb){
cb(null, null);
.pipe(new BufferStream(function(err, buf, cb) {
if(err) {
return done(err);
}
cb(null, null);
}))

@@ -125,6 +129,9 @@ .pipe(StreamTest[version].toText(function(err, data) {

it('should work when returning a null buffer', function(done) {
StreamTest[version].fromChunks(['te', 'st'])
.pipe(BufferStream(function(err, buf, cb){
cb(null, null);
.pipe(new BufferStream(function(err, buf, cb) {
if(err) {
return done(err);
}
cb(null, null);
}))

@@ -144,3 +151,2 @@ .pipe(StreamTest[version].toText(function(err, data) {

.pipe(asyncBufferPrefixer('plip'))
.pipe(asyncBufferPrefixer('plap'))

@@ -156,2 +162,49 @@ .pipe(StreamTest[version].toText(function(err, data) {

it('should report stream errors', function(done) {
var bufferStream = new BufferStream({
objectMode: true,
}, function(err, objs, cb) {
assert.equal(err.message, 'Aouch!');
cb(null, []);
});
StreamTest[version].fromErroredChunks(new Error('Aouch!'), [
'ou', 'de', 'la', 'li',
]).on('error', function(err) {
bufferStream.emit('error', err);
})
.pipe(bufferStream)
.pipe(StreamTest[version].toText(function(err, text) {
if(err) {
return done(err);
}
assert.deepEqual(text, '');
done();
}));
});
it('should emit callback errors', function(done) {
var caughtError = null;
StreamTest[version].fromChunks([
'ou', 'de', 'la', 'li',
])
.pipe(new BufferStream(function(err, objs, cb) {
if(err) {
return done(err);
}
cb(new Error('Aouch!'), '');
})).on('error', function(err) {
caughtError = err;
})
.pipe(StreamTest[version].toText(function(err, text) {
if(err) {
return done(err);
}
assert.equal(caughtError.message, 'Aouch!');
assert.equal(text, '');
done();
}));
});
});

@@ -162,9 +215,9 @@

describe('in object mode', function() {
var object1 = {txt: 'te'};
var object2 = {txt: 'st'};
var object3 = {txt: 'e'};
var object4 = {txt: 'd'};
var object5 = {txt: 'u'};
var object6 = {txt: 'ni'};
var object7 = {txt: 't'};
var object1 = { txt: 'te' };
var object2 = { txt: 'st' };
var object3 = { txt: 'e' };
var object4 = { txt: 'd' };
var object5 = { txt: 'u' };
var object6 = { txt: 'ni' };
var object7 = { txt: 't' };

@@ -185,9 +238,12 @@ describe('synchonously', function() {

it('should work when returning a null buffer', function(done) {
it('should work when returning an empty array', function(done) {
StreamTest[version].fromObjects([object1, object2])
.pipe(new BufferStream({
objectMode: true
}, function(err, buf, cb){
cb(null, null);
objectMode: true,
}, function(err, buf, cb) {
if(err) {
return done(err);
}
cb(null, []);
}))

@@ -233,7 +289,29 @@ .pipe(StreamTest[version].toObjects(function(err, objs) {

it('should work when returning a null buffer', function(done) {
it('should work when returning an empty array', function(done) {
StreamTest[version].fromObjects([object1, object2])
.pipe(BufferStream({
objectMode: true
}, function(err, buf, cb){
.pipe(new BufferStream({
objectMode: true,
}, function(err, objs, cb) {
if(err) {
return done(err);
}
cb(null, []);
}))
.pipe(StreamTest[version].toObjects(function(err, objs) {
if(err) {
return done(err);
}
assert.equal(objs.length, 0);
done();
}));
});
it('should work when returning legacy null', function(done) {
StreamTest[version].fromObjects([object1, object2])
.pipe(new BufferStream({
objectMode: true,
}, function(err, objs, cb) {
if(err) {
return done(err);
}
cb(null, null);

@@ -264,2 +342,51 @@ }))

it('should report stream errors', function(done) {
var bufferStream = new BufferStream({
objectMode: true,
}, function(err, objs, cb) {
assert.equal(err.message, 'Aouch!');
cb(null, []);
});
StreamTest[version].fromErroredObjects(new Error('Aouch!'), [
object1, object2, object3, object4, object5, object6, object7,
]).on('error', function(err) {
bufferStream.emit('error', err);
})
.pipe(bufferStream)
.pipe(StreamTest[version].toObjects(function(err, objs) {
if(err) {
return done(err);
}
assert.deepEqual(objs, []);
done();
}));
});
it('should emit callback errors', function(done) {
var caughtError = null;
StreamTest[version].fromObjects([
object1, object2, object3, object4, object5, object6, object7,
])
.pipe(new BufferStream({
objectMode: true,
}, function(err, objs, cb) {
if(err) {
return done(err);
}
cb(new Error('Aouch!'), []);
})).on('error', function(err) {
caughtError = err;
})
.pipe(StreamTest[version].toObjects(function(err, objs) {
if(err) {
return done(err);
}
assert.equal(caughtError.message, 'Aouch!');
assert.deepEqual(objs, []);
done();
}));
});
});

@@ -274,3 +401,1 @@

});

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc