bufferstreams
Advanced tools
Comparing version 1.0.2 to 1.1.0
{ | ||
"name": "bufferstreams", | ||
"version": "1.0.2", | ||
"version": "1.1.0", | ||
"description": "Abstract streams to deal with the whole buffered contents.", | ||
@@ -11,3 +11,5 @@ "homepage": "https://github.com/nfroidure/BufferStreams", | ||
"cover": "istanbul cover --report html _mocha -- tests/*.mocha.js -t 5000", | ||
"trinity": "npm-check-updates -u && npm install && npm test && git commit package.json -m \"Dependencies update\" && git push" | ||
"trinity": "npm-check-updates -u && npm install && npm test && git commit package.json -m \"Dependencies update\" && git push", | ||
"lint": "eslint **/*.s", | ||
"cli": "env NPM_RUN_CLI=1" | ||
}, | ||
@@ -26,9 +28,11 @@ "repository": { | ||
"dependencies": { | ||
"readable-stream": "^2.0.0" | ||
"readable-stream": "^2.0.2" | ||
}, | ||
"devDependencies": { | ||
"istanbul": "^0.3.15", | ||
"eslint": "^1.0.0", | ||
"istanbul": "^0.3.17", | ||
"istanbul-coveralls": "^1.0.3", | ||
"mocha": "^2.2.5", | ||
"mocha-lcov-reporter": "^0.0.2", | ||
"sf-lint": "^1.0.2", | ||
"streamtest": "^1.2.1" | ||
@@ -35,0 +39,0 @@ }, |
@@ -10,10 +10,13 @@ # BufferStreams | ||
BufferStreams abstracts streams to allow you to deal with their whole content in | ||
a single buffer when it becomes necessary (by example: a legacy library that | ||
`bufferstreams` abstracts streams to allow you to deal with their whole content | ||
in a single buffer when it becomes necessary (by example: a legacy library that | ||
do not support streams). | ||
It is not a good practice, just some glue. Using BufferStreams means: | ||
It is not a good practice, just some glue. Using `bufferstreams` means: | ||
* there is no library dealing with streams for your needs | ||
* you filled an issue to the wrapped library to support streams | ||
`bufferstreams` can also be used to control the whole stream content in a single | ||
point of a streaming pipeline for testing purposes. | ||
## Usage | ||
@@ -26,6 +29,7 @@ Install the [npm module](https://npmjs.org/package/bufferstreams): | ||
```js | ||
var BufferStreams = require('bufferstreams'); | ||
var fs = require('fs'); | ||
var bufferstreams = require('bufferstreams'); | ||
Fs.createReadStream('input.txt') | ||
.pipe(new BufferStreams(function(err, buf, cb) { | ||
fs.createReadStream('input.txt') | ||
.pipe(new bufferstreams(function(err, buf, cb) { | ||
@@ -46,7 +50,7 @@ // err will be filled with an error if the piped in stream emits one. | ||
})) | ||
.pipe(Fs.createWriteStream('output.txt')); | ||
.pipe(fs.createWriteStream('output.txt')); | ||
``` | ||
Note that you can use BufferStream with the objectMode option. In this case, the | ||
given buffer will be an array containing the streamed objects: | ||
Note that you can use `bufferstreams` with the objectMode option. In this case, | ||
the given buffer will be an array containing the streamed objects: | ||
```js | ||
@@ -56,4 +60,31 @@ new BufferStreams({objectMode: true}, myCallback); | ||
## API | ||
### Stream : BufferStreams([options], callback) | ||
#### options | ||
##### options.objectMode | ||
Type: `Boolean` | ||
Default value: `false` | ||
Use if piped in streams are in object mode. In this case, an array of the | ||
buffered will be transmitted to the `callback` function. | ||
##### options.* | ||
`bufferstreams` inherits of Stream.Duplex, the options are passed to the | ||
parent constructor so you can use it's options too. | ||
##### callback(err, buf, cb) | ||
Type: `Function`, required. | ||
A function to handle the buffered content. | ||
## Stats | ||
[![NPM](https://nodei.co/npm/bufferstreams.png?downloads=true&stars=true)](https://nodei.co/npm/bufferstreams/) | ||
[![NPM](https://nodei.co/npm-dl/bufferstreams.png)](https://nodei.co/npm/bufferstreams/) | ||
## Contributing | ||
Feel free to pull your code if you agree with publishing it under the MIT license. | ||
@@ -1,9 +0,13 @@ | ||
var PassThrough = require('readable-stream').PassThrough; | ||
'use strict'; | ||
var Duplex = require('readable-stream').Duplex; | ||
var util = require('util'); | ||
// Inherit of PassThrough stream | ||
util.inherits(BufferStream, PassThrough); | ||
// Inherit of Duplex stream | ||
util.inherits(BufferStream, Duplex); | ||
// Constructor | ||
function BufferStream(options, cb) { | ||
var _this = this; | ||
// Ensure new were used | ||
@@ -26,3 +30,3 @@ if (!(this instanceof BufferStream)) { | ||
// Parent constructor | ||
PassThrough.call(this, options); | ||
Duplex.call(this, options); | ||
@@ -32,33 +36,62 @@ // Keep a reference to the callback | ||
// Add a finished flag | ||
this._bufferStreamFinished = false; | ||
// Internal buffer | ||
this._buf = options.objectMode ? [] : Buffer(''); | ||
} | ||
this._bufferStreamBuffer = []; | ||
BufferStream.prototype._transform = function(chunk, encoding, done) { | ||
// Internal logic | ||
function _bufferStreamCallbackWrapper(err) { | ||
var buffer = options.objectMode ? | ||
_this._bufferStreamBuffer : | ||
Buffer.concat(_this._bufferStreamBuffer); | ||
if(this.__objectMode) { | ||
this._buf.push(chunk); | ||
} else { | ||
this._buf = Buffer.concat([this._buf, chunk], this._buf.length + chunk.length); | ||
err = err || null; | ||
_this._cb( | ||
err, | ||
buffer, | ||
function(err2, buf) { | ||
setImmediate(function() { | ||
_this.removeListener('error', _bufferStreamError); | ||
if(err2) { | ||
_this.emit('error', err2); | ||
} | ||
_this._bufferStreamBuffer = options.objectMode ? buf || [] : [buf]; | ||
_this._bufferStreamFinished = true; | ||
_this._read(); | ||
}); | ||
} | ||
); | ||
} | ||
function _bufferStreamError(err) { | ||
if(_this._bufferStreamFinished) { | ||
return; | ||
} | ||
_bufferStreamCallbackWrapper(err); | ||
} | ||
this.once('finish', _bufferStreamCallbackWrapper); | ||
this.on('error', _bufferStreamError); | ||
} | ||
BufferStream.prototype._write = function _bufferStreamWrite(chunk, encoding, done) { | ||
this._bufferStreamBuffer.push(chunk); | ||
done(); | ||
}; | ||
BufferStream.prototype._flush = function(done) { | ||
BufferStream.prototype._read = function _bufferStreamRead(n) { | ||
var _this = this; | ||
this._cb(null, this._buf, function(err, buf) { | ||
if(buf && buf.length) { | ||
if(_this.__objectMode) { | ||
buf.forEach(function(chunk) { | ||
_this.push(chunk); | ||
}); | ||
} else { | ||
_this.push(buf); | ||
if(_this._bufferStreamFinished) { | ||
while(_this._bufferStreamBuffer.length) { | ||
if(!_this.push(_this._bufferStreamBuffer.shift())) { | ||
break; | ||
} | ||
} | ||
done(); | ||
}); | ||
if(0 === _this._bufferStreamBuffer.length) { | ||
_this.push(null); | ||
} | ||
} | ||
@@ -65,0 +98,0 @@ }; |
@@ -1,14 +0,15 @@ | ||
var assert = require('assert') | ||
, StreamTest = require('streamtest') | ||
, BufferStream = require('../src') | ||
; | ||
'use strict'; | ||
var assert = require('assert'); | ||
var StreamTest = require('streamtest'); | ||
var BufferStream = require('../src'); | ||
// Helpers | ||
function syncBufferPrefixer(headerText) { | ||
return new BufferStream({ | ||
objectMode: headerText instanceof Object | ||
objectMode: headerText instanceof Object, | ||
}, function(err, buf, cb) { | ||
assert.equal(err, null); | ||
if(null === buf) { | ||
cb(null, Buffer(headerText)); | ||
cb(null, new Buffer(headerText)); | ||
} else if(buf instanceof Array) { | ||
@@ -18,3 +19,3 @@ buf.unshift(headerText); | ||
} else { | ||
cb(null, Buffer.concat([Buffer(headerText), buf])); | ||
cb(null, Buffer.concat([new Buffer(headerText), buf])); | ||
} | ||
@@ -25,3 +26,3 @@ }); | ||
return new BufferStream({ | ||
objectMode: headerText instanceof Object | ||
objectMode: headerText instanceof Object, | ||
}, function(err, buf, cb) { | ||
@@ -31,7 +32,7 @@ assert.equal(err, null); | ||
setTimeout(function() { | ||
cb(null, Buffer(headerText)); | ||
cb(null, new Buffer(headerText)); | ||
}, 0); | ||
} else if(buf instanceof Array) { | ||
setTimeout(function() { | ||
buff.push(headerText); | ||
buf.push(headerText); | ||
cb(null, buf); | ||
@@ -41,3 +42,3 @@ }, 0); | ||
setTimeout(function() { | ||
cb(null, Buffer.concat([Buffer(headerText), buf])); | ||
cb(null, Buffer.concat([new Buffer(headerText), buf])); | ||
}, 0); | ||
@@ -79,6 +80,9 @@ } | ||
it('should work when returning a null buffer', function(done) { | ||
StreamTest[version].fromChunks(['te', 'st']) | ||
.pipe(new BufferStream(function(err, buf, cb){ | ||
cb(null, null); | ||
.pipe(new BufferStream(function(err, buf, cb) { | ||
if(err) { | ||
return done(err); | ||
} | ||
cb(null, null); | ||
})) | ||
@@ -125,6 +129,9 @@ .pipe(StreamTest[version].toText(function(err, data) { | ||
it('should work when returning a null buffer', function(done) { | ||
StreamTest[version].fromChunks(['te', 'st']) | ||
.pipe(BufferStream(function(err, buf, cb){ | ||
cb(null, null); | ||
.pipe(new BufferStream(function(err, buf, cb) { | ||
if(err) { | ||
return done(err); | ||
} | ||
cb(null, null); | ||
})) | ||
@@ -144,3 +151,2 @@ .pipe(StreamTest[version].toText(function(err, data) { | ||
.pipe(asyncBufferPrefixer('plip')) | ||
.pipe(asyncBufferPrefixer('plap')) | ||
@@ -156,2 +162,49 @@ .pipe(StreamTest[version].toText(function(err, data) { | ||
it('should report stream errors', function(done) { | ||
var bufferStream = new BufferStream({ | ||
objectMode: true, | ||
}, function(err, objs, cb) { | ||
assert.equal(err.message, 'Aouch!'); | ||
cb(null, []); | ||
}); | ||
StreamTest[version].fromErroredChunks(new Error('Aouch!'), [ | ||
'ou', 'de', 'la', 'li', | ||
]).on('error', function(err) { | ||
bufferStream.emit('error', err); | ||
}) | ||
.pipe(bufferStream) | ||
.pipe(StreamTest[version].toText(function(err, text) { | ||
if(err) { | ||
return done(err); | ||
} | ||
assert.deepEqual(text, ''); | ||
done(); | ||
})); | ||
}); | ||
it('should emit callback errors', function(done) { | ||
var caughtError = null; | ||
StreamTest[version].fromChunks([ | ||
'ou', 'de', 'la', 'li', | ||
]) | ||
.pipe(new BufferStream(function(err, objs, cb) { | ||
if(err) { | ||
return done(err); | ||
} | ||
cb(new Error('Aouch!'), ''); | ||
})).on('error', function(err) { | ||
caughtError = err; | ||
}) | ||
.pipe(StreamTest[version].toText(function(err, text) { | ||
if(err) { | ||
return done(err); | ||
} | ||
assert.equal(caughtError.message, 'Aouch!'); | ||
assert.equal(text, ''); | ||
done(); | ||
})); | ||
}); | ||
}); | ||
@@ -162,9 +215,9 @@ | ||
describe('in object mode', function() { | ||
var object1 = {txt: 'te'}; | ||
var object2 = {txt: 'st'}; | ||
var object3 = {txt: 'e'}; | ||
var object4 = {txt: 'd'}; | ||
var object5 = {txt: 'u'}; | ||
var object6 = {txt: 'ni'}; | ||
var object7 = {txt: 't'}; | ||
var object1 = { txt: 'te' }; | ||
var object2 = { txt: 'st' }; | ||
var object3 = { txt: 'e' }; | ||
var object4 = { txt: 'd' }; | ||
var object5 = { txt: 'u' }; | ||
var object6 = { txt: 'ni' }; | ||
var object7 = { txt: 't' }; | ||
@@ -185,9 +238,12 @@ describe('synchonously', function() { | ||
it('should work when returning a null buffer', function(done) { | ||
it('should work when returning an empty array', function(done) { | ||
StreamTest[version].fromObjects([object1, object2]) | ||
.pipe(new BufferStream({ | ||
objectMode: true | ||
}, function(err, buf, cb){ | ||
cb(null, null); | ||
objectMode: true, | ||
}, function(err, buf, cb) { | ||
if(err) { | ||
return done(err); | ||
} | ||
cb(null, []); | ||
})) | ||
@@ -233,7 +289,29 @@ .pipe(StreamTest[version].toObjects(function(err, objs) { | ||
it('should work when returning a null buffer', function(done) { | ||
it('should work when returning an empty array', function(done) { | ||
StreamTest[version].fromObjects([object1, object2]) | ||
.pipe(BufferStream({ | ||
objectMode: true | ||
}, function(err, buf, cb){ | ||
.pipe(new BufferStream({ | ||
objectMode: true, | ||
}, function(err, objs, cb) { | ||
if(err) { | ||
return done(err); | ||
} | ||
cb(null, []); | ||
})) | ||
.pipe(StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
} | ||
assert.equal(objs.length, 0); | ||
done(); | ||
})); | ||
}); | ||
it('should work when returning legacy null', function(done) { | ||
StreamTest[version].fromObjects([object1, object2]) | ||
.pipe(new BufferStream({ | ||
objectMode: true, | ||
}, function(err, objs, cb) { | ||
if(err) { | ||
return done(err); | ||
} | ||
cb(null, null); | ||
@@ -264,2 +342,51 @@ })) | ||
it('should report stream errors', function(done) { | ||
var bufferStream = new BufferStream({ | ||
objectMode: true, | ||
}, function(err, objs, cb) { | ||
assert.equal(err.message, 'Aouch!'); | ||
cb(null, []); | ||
}); | ||
StreamTest[version].fromErroredObjects(new Error('Aouch!'), [ | ||
object1, object2, object3, object4, object5, object6, object7, | ||
]).on('error', function(err) { | ||
bufferStream.emit('error', err); | ||
}) | ||
.pipe(bufferStream) | ||
.pipe(StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
} | ||
assert.deepEqual(objs, []); | ||
done(); | ||
})); | ||
}); | ||
it('should emit callback errors', function(done) { | ||
var caughtError = null; | ||
StreamTest[version].fromObjects([ | ||
object1, object2, object3, object4, object5, object6, object7, | ||
]) | ||
.pipe(new BufferStream({ | ||
objectMode: true, | ||
}, function(err, objs, cb) { | ||
if(err) { | ||
return done(err); | ||
} | ||
cb(new Error('Aouch!'), []); | ||
})).on('error', function(err) { | ||
caughtError = err; | ||
}) | ||
.pipe(StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
} | ||
assert.equal(caughtError.message, 'Aouch!'); | ||
assert.deepEqual(objs, []); | ||
done(); | ||
})); | ||
}); | ||
}); | ||
@@ -274,3 +401,1 @@ | ||
}); | ||
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
24997
8
422
87
7
Updatedreadable-stream@^2.0.2