Socket
Socket
Sign inDemoInstall

accum

Package Overview
Dependencies
2
Maintainers
1
Versions
8
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.0.6 to 0.1.1

35

lib/accum.js

@@ -5,5 +5,5 @@ 'use strict';

function accum(options, cb) {
function accum(options, listenerFn) {
if (arguments.length === 1) { // options is optional
cb = options;
listenerFn = options;
options = {};

@@ -13,3 +13,3 @@ }

options.encoding = options.encoding || 'utf8';
if (typeof cb !== 'function') throw new Error('accum requires a cb function');
if (typeof listenerFn !== 'function') throw new Error('accum requires a listenerFn function');
if (options.type &&

@@ -39,5 +39,5 @@ (options.type !== 'buffer' && options.type !== 'string' && options.type !== 'array')) {

var endConcatFns = {
buffer: function (arrData, cb) { cb(null, Buffer.concat(arrData.map(ensureBuffer))); },
string: function (arrData, cb) { cb(null, arrData.map(ensureString).join('')); },
array: function (arrData, cb) { cb(null, arrData); }
buffer: function (arrData, listenerFn) { listenerFn(Buffer.concat(arrData.map(ensureBuffer))); },
string: function (arrData, listenerFn) { listenerFn(arrData.map(ensureString).join('')); },
array: function (arrData, listenerFn) { listenerFn(arrData); }
};

@@ -54,3 +54,3 @@

var fn = endConcatFns[options.type];
fn(arrData, cb);
fn(arrData, listenerFn);
this.queueEnd();

@@ -60,26 +60,15 @@ }

function errored(err) {
cb(err);
stream.destroy();
}
stream.on('error', errored);
var origDestroy = stream.destroy;
stream.destroy = function () {
stream.removeListener('error', errored);
origDestroy.call(stream);
};
return stream;
}
function buffer(cb) { return accum({ type: 'buffer'}, cb); }
function string(optEncoding, cb) {
function buffer(listenerFn) { return accum({ type: 'buffer'}, listenerFn); }
function string(optEncoding, listenerFn) {
if (arguments.length === 1) { // optEncoding not provided, shift
cb = optEncoding;
listenerFn = optEncoding;
optEncoding = null;
}
optEncoding = optEncoding || 'utf8';
return accum({ type: 'string', encoding: optEncoding }, cb);
return accum({ type: 'string', encoding: optEncoding }, listenerFn);
}
function array(cb) { return accum({ type: 'array' }, cb); }
function array(listenerFn) { return accum({ type: 'array' }, listenerFn); }

@@ -86,0 +75,0 @@ accum.buffer = buffer;

{
"name": "accum",
"description": "accum - Simple pass-through stream (RW) which accumulates or collects the data from a stream. Pipe your stream into this to get all the data as buffer, string, or raw array.",
"version": "0.0.6",
"version": "0.1.1",
"author": "Jeff Barczewski <jeff.barczewski@gmail.com>",

@@ -6,0 +6,0 @@ "repository": { "type": "git", "url": "http://github.com/jeffbski/accum.git" },

@@ -18,3 +18,3 @@ # accum

- The default automatic method - `accum(cb)` constructs a pass-through stream which checks if the first chunk is a Buffer and if so returns a concatenated Buffer of all the data, otherwise if it is a string then returns a concatenated string, otherwise returns a raw array. The `cb` signature is `function(err, alldata)`. The `cb` is called after all the data is received just prior to the `end` event being emitted.
- The default automatic method - `accum(listenerFn)` constructs a pass-through stream which checks if the first chunk is a Buffer and if so returns a concatenated Buffer of all the data, otherwise if it is a string then returns a concatenated string, otherwise returns a raw array. The `listenerFn` signature is `function(alldata)`. The `listenerFn` is called after all the data is received just prior to the `end` event being emitted.

@@ -24,4 +24,3 @@ ```javascript

rstream
.pipe(accum(function (err, alldata) {
if (err) return yourHandleErrFn(err); // handle the error
.pipe(accum(function (alldata) {
// use the accumulated data - alldata will be Buffer, string, or []

@@ -33,3 +32,3 @@ }));

- `accum.buffer(cb)` - constructs a pass-through stream which converts everything into a Buffer, concatenates, and calls the `cb` with the buffer. The `cb` signature is `function(err, buffer)`. The `cb` is called after all the data is received just prior to the `end` event being emitted.
- `accum.buffer(listenerFn)` - constructs a pass-through stream which converts everything into a Buffer, concatenates, and calls the `listenerFn` with the buffer. The `listenerFn` signature is `function(buffer)`. The `listenerFn` is called after all the data is received just prior to the `end` event being emitted.

@@ -39,4 +38,3 @@ ```javascript

rstream
.pipe(accum.buffer(function (err, buffer) {
if (err) return yourHandleErrFn(err); // handle the error
.pipe(accum.buffer(function (buffer) {
// use the accumulated data - buffer which is a Buffer

@@ -46,3 +44,3 @@ }));

- `accum.string([optEncoding], cb)` - constructs a pass-through stream which concatenates everything into a string. Buffer data is converted to string using the optional encoding `optEncoding` which defaults to 'utf8'. Other data is simply converted using `.toString()`. The `cb` signature is `function(err, string)`. The `cb` is called after all the data is received just prior to the `end` event being emitted.
- `accum.string([optEncoding], listenerFn)` - constructs a pass-through stream which concatenates everything into a string. Buffer data is converted to string using the optional encoding `optEncoding` which defaults to 'utf8'. Other data is simply converted using `.toString()`. The `listenerFn` signature is `function(string)`. The `listenerFn` is called after all the data is received just prior to the `end` event being emitted.

@@ -52,4 +50,3 @@ ```javascript

rstream
.pipe(accum.string('utf8', function (err, string) {
if (err) return yourHandleErrFn(err); // handle the error
.pipe(accum.string('utf8', function (string) {
// use the accumulated data - string which is a utf8 string

@@ -59,3 +56,3 @@ }));

- `accum.array(cb)` - constructs a pass-through stream which concatenates everything into an array without any conversion, which the `cb` receives the accumulated data on end. The `cb` signature is `function(err, arr)`. The `cb` is called after all the data is received just prior to the `end` event being emitted.
- `accum.array(listenerFn)` - constructs a pass-through stream which concatenates everything into an array without any conversion, which the `listenerFn` receives the accumulated data on end. The `listenerFn` signature is `function(arr)`. The `listenerFn` is called after all the data is received just prior to the `end` event being emitted.

@@ -65,4 +62,3 @@ ```javascript

rstream
.pipe(accum.array(function (err, array) {
if (err) return yourHandleErrFn(err); // handle the error
.pipe(accum.array(function (array) {
// use the accumulated data - array which is a raw unconverted array of data chunks

@@ -72,2 +68,16 @@ }));

### Error handling
Node.js stream.pipe does not forward errors and neither do many pass-through stream implementations so the recommended way to catch errors is either to attach error handlers at each stream or to use domains.
```javascript
var d = domain.create();
d.on('error', handleAllErrors);
d.run(function() {
rstream.pipe(accum(function (alldata) {
// use alldata
});
});
```
## Goals

@@ -74,0 +84,0 @@

@@ -6,2 +6,3 @@ /*global suite:false test:false */

var crypto = require('crypto');
var domain = require('domain');
var accum = require('..'); // require('accum');

@@ -14,7 +15,6 @@ var passStream = require('pass-stream');

test('accum.array(cb) with string data, results with raw array of chunks to cb before end', function (done) {
test('accum.array(listenerFn) with string data, results with raw array of chunks to listenerFn before end', function (done) {
var stream = passStream();
stream
.pipe(accum.array(function (err, alldata) {
if (err) return done(err);
.pipe(accum.array(function (alldata) {
t.deepEqual(alldata, ['abc', 'def', 'ghi']);

@@ -30,8 +30,7 @@ done();

test('accum.array(cb) with Buffer data, results with raw array of chunks to cb before end', function (done) {
test('accum.array(listenerFn) with Buffer data, results with raw array of chunks to listenerFn before end', function (done) {
var DATA = new Buffer('abcdefghi');
var stream = passStream();
stream
.pipe(accum.array(function (err, alldata) {
if (err) return done(err);
.pipe(accum.array(function (alldata) {
t.deepEqual(alldata, [new Buffer('abc'), new Buffer('def'), new Buffer('ghi')]);

@@ -47,7 +46,6 @@ done();

test('accum.array(cb) with number data, results raw array to cb before end', function (done) {
test('accum.array(listenerFn) with number data, results raw array to listenerFn before end', function (done) {
var stream = passStream();
stream
.pipe(accum.array(function (err, alldata) {
if (err) return done(err);
.pipe(accum.array(function (alldata) {
t.ok(Array.isArray(alldata));

@@ -64,3 +62,3 @@ t.deepEqual(alldata, [1, 2, 3]);

test('accum.array(cb) with various types of data, results with concatenated raw array to cb before end', function (done) {
test('accum.array(listenerFn) with various types of data, results with concatenated raw array to listenerFn before end', function (done) {
var DATA = [

@@ -80,4 +78,3 @@ 1,

stream
.pipe(accum.array(function (err, alldata) {
if (err) return done(err);
.pipe(accum.array(function (alldata) {
t.ok(Array.isArray(alldata));

@@ -95,35 +92,22 @@ t.deepEqual(alldata, DATA);

test('accum.array(cb) with err, calls cb with err', function (done) {
var stream = accum.array(function (err, alldata) {
t.equal(err.message, 'my error');
done();
});
process.nextTick(function () {
stream.emit('error', new Error('my error'));
});
test('accum.array(listenerFn) with throws to domain', function (done) {
var d = domain.create();
d.on('error', function (err) {
t.equal(err.message, 'my error');
done();
})
.run(function () {
var stream = accum.array(function (alldata) { });
process.nextTick(function () {
stream.emit('error', new Error('my error'));
});
});
});
// currently pipe does not forward error but I have put in
// pull request to fix node.js. Also pause-stream will have
// to be modified as well.
// test('accum.array(cb) with err piped, calls cb with err', function (done) {
// var stream = passStream();
// stream
// .pipe(accum.array(function (err, alldata) {
// t.equal(err.message, 'my error');
// done();
// }));
// process.nextTick(function () {
// stream.emit('error', new Error('my error'));
// });
// });
test('missing cb throws error', function () {
test('missing listenerFn throws error', function () {
function throwsErr() {
var stream = accum.array();
}
t.throws(throwsErr, /accum requires a cb function/);
t.throws(throwsErr, /accum requires a listenerFn function/);
});

@@ -6,2 +6,3 @@ /*global suite:false test:false */

var crypto = require('crypto');
var domain = require('domain');
var accum = require('..'); // require('accum');

@@ -14,8 +15,7 @@ var passStream = require('pass-stream');

test('accum.buffer(cb) with string data, results with concatenated Buffer to cb before end', function (done) {
test('accum.buffer(listenerFn) with string data, results with concatenated Buffer to listenerFn before end', function (done) {
var DATA = new Buffer('abcdefghi');
var stream = passStream();
stream
.pipe(accum.buffer(function (err, alldata) {
if (err) return done(err);
.pipe(accum.buffer(function (alldata) {
t.ok(Buffer.isBuffer(alldata));

@@ -35,8 +35,7 @@ t.equal(alldata.length, DATA.length);

test('accum.buffer(cb) with Buffer data, results with concatenated Buffer to cb before end', function (done) {
test('accum.buffer(listenerFn) with Buffer data, results with concatenated Buffer to listenerFn before end', function (done) {
var DATA = new Buffer('abcdefghi');
var stream = passStream();
stream
.pipe(accum.buffer(function (err, alldata) {
if (err) return done(err);
.pipe(accum.buffer(function (alldata) {
t.ok(Buffer.isBuffer(alldata));

@@ -56,35 +55,22 @@ t.equal(alldata.length, DATA.length);

test('accum.buffer(cb) with err, calls cb with err', function (done) {
var stream = accum.buffer(function (err, alldata) {
t.equal(err.message, 'my error');
done();
});
process.nextTick(function () {
stream.emit('error', new Error('my error'));
});
test('accum.buffer(listenerFn) with throws to domain', function (done) {
var d = domain.create();
d.on('error', function (err) {
t.equal(err.message, 'my error');
done();
})
.run(function () {
var stream = accum.buffer(function (alldata) { });
process.nextTick(function () {
stream.emit('error', new Error('my error'));
});
});
});
// currently pipe does not forward error but I have put in
// pull request to fix node.js. Also pause-stream will have
// to be modified as well.
// test('accum.buffer(cb) with err piped, calls cb with err', function (done) {
// var stream = passStream();
// stream
// .pipe(accum.buffer(function (err, alldata) {
// t.equal(err.message, 'my error');
// done();
// }));
// process.nextTick(function () {
// stream.emit('error', new Error('my error'));
// });
// });
test('accum.buffer() missing cb throws error', function () {
test('accum.buffer() missing listenerFn throws error', function () {
function throwsErr() {
var stream = accum.buffer();
}
t.throws(throwsErr, /accum requires a cb function/);
t.throws(throwsErr, /accum requires a listenerFn function/);
});

@@ -6,2 +6,3 @@ /*global suite:false test:false */

var crypto = require('crypto');
var domain = require('domain');
var accum = require('..'); // require('accum');

@@ -14,7 +15,6 @@ var passStream = require('pass-stream');

test('accum(cb) with string data, results with concatenated string to cb before end', function (done) {
test('accum(listenerFn) with string data, results with concatenated string to listenerFn before end', function (done) {
var stream = passStream();
stream
.pipe(accum(function (err, alldata) {
if (err) return done(err);
.pipe(accum(function (alldata) {
t.equal(alldata, 'abcdefghi');

@@ -30,8 +30,7 @@ done();

test('accum(cb) with Buffer data, results with concatenated Buffer to cb before end', function (done) {
test('accum(listenerFn) with Buffer data, results with concatenated Buffer to listenerFn before end', function (done) {
var DATA = new Buffer('abcdefghi');
var stream = passStream();
stream
.pipe(accum(function (err, alldata) {
if (err) return done(err);
.pipe(accum(function (alldata) {
t.ok(Buffer.isBuffer(alldata));

@@ -51,7 +50,6 @@ t.equal(alldata.length, DATA.length);

test('accum(cb) with number data, results with concatenated raw array to cb before end', function (done) {
test('accum(listenerFn) with number data, results with concatenated raw array to listenerFn before end', function (done) {
var stream = passStream();
stream
.pipe(accum(function (err, alldata) {
if (err) return done(err);
.pipe(accum(function (alldata) {
t.ok(Array.isArray(alldata));

@@ -68,3 +66,3 @@ t.deepEqual(alldata, [1, 2, 3]);

test('accum(cb) with various types of data, results with concatenated raw array to cb before end', function (done) {
test('accum(listenerFn) with various types of data, results with concatenated raw array to listenerFn before end', function (done) {
var DATA = [

@@ -84,4 +82,3 @@ 1,

stream
.pipe(accum(function (err, alldata) {
if (err) return done(err);
.pipe(accum(function (alldata) {
t.ok(Array.isArray(alldata));

@@ -100,34 +97,23 @@ t.deepEqual(alldata, DATA);

test('accum(cb) with err, calls cb with err', function (done) {
var stream = accum(function (err, alldata) {
t.equal(err.message, 'my error');
done();
});
process.nextTick(function () {
stream.emit('error', new Error('my error'));
});
test('accum(listenerFn) with throws to domain', function (done) {
var d = domain.create();
d.on('error', function (err) {
t.equal(err.message, 'my error');
done();
})
.run(function () {
var stream = accum(function (alldata) { });
process.nextTick(function () {
stream.emit('error', new Error('my error'));
});
});
});
// currently pipe does not forward error but I have put in
// pull request to fix node.js. Also pause-stream will have
// to be modified as well.
// test('accum(cb) with err piped, calls cb with err', function (done) {
// var stream = passStream();
// stream
// .pipe(accum(function (err, alldata) {
// t.equal(err.message, 'my error');
// done();
// }));
// process.nextTick(function () {
// stream.emit('error', new Error('my error'));
// });
// });
test('missing cb throws error', function () {
test('missing listenerFn throws error', function () {
function throwsErr() {
var stream = accum();
}
t.throws(throwsErr, /accum requires a cb function/);
t.throws(throwsErr, /accum requires a listenerFn function/);
});

@@ -16,3 +16,3 @@ /*global suite:false test:false */

var result;
var astream = accum(function (err, alldata) {
var astream = accum(function (alldata) {
result = alldata;

@@ -48,3 +48,3 @@ });

var result;
var astream = accum(function (err, alldata) {
var astream = accum(function (alldata) {
result = alldata;

@@ -85,3 +85,3 @@ });

var result;
var astream = accum.buffer(function (err, alldata) {
var astream = accum.buffer(function (alldata) {
result = alldata;

@@ -119,3 +119,3 @@ });

var result;
var astream = accum.buffer(function (err, alldata) {
var astream = accum.buffer(function (alldata) {
result = alldata;

@@ -155,3 +155,3 @@ });

var result;
var astream = accum.string(function (err, alldata) {
var astream = accum.string(function (alldata) {
result = alldata;

@@ -192,3 +192,3 @@ });

var result;
var astream = accum.array(function (err, alldata) {
var astream = accum.array(function (alldata) {
result = alldata;

@@ -195,0 +195,0 @@ });

@@ -6,2 +6,3 @@ /*global suite:false test:false */

var crypto = require('crypto');
var domain = require('domain');
var accum = require('..'); // require('accum');

@@ -14,8 +15,7 @@ var passStream = require('pass-stream');

test('accum.string(cb) with string data, results with concatenated string to cb before end', function (done) {
test('accum.string(listenerFn) with string data, results with concatenated string to listenerFn before end', function (done) {
var DATA = 'abcdefghi';
var stream = passStream();
stream
.pipe(accum.string(function (err, alldata) {
if (err) return done(err);
.pipe(accum.string(function (alldata) {
t.typeOf(alldata, 'string');

@@ -33,8 +33,7 @@ t.equal(alldata.length, DATA.length);

test('accum.string(cb) with Buffer data, results with concatenated string to cb before end', function (done) {
test('accum.string(listenerFn) with Buffer data, results with concatenated string to listenerFn before end', function (done) {
var DATA = 'abcdefghi';
var stream = passStream();
stream
.pipe(accum.string(function (err, alldata) {
if (err) return done(err);
.pipe(accum.string(function (alldata) {
t.typeOf(alldata, 'string');

@@ -52,8 +51,7 @@ t.equal(alldata.length, DATA.length);

test('accum.string("utf8", cb) with Buffer data, results with concatenated string to cb before end', function (done) {
test('accum.string("utf8", listenerFn) with Buffer data, results with concatenated string to listenerFn before end', function (done) {
var DATA = 'abcdefghi';
var stream = passStream();
stream
.pipe(accum.string('utf8', function (err, alldata) {
if (err) return done(err);
.pipe(accum.string('utf8', function (alldata) {
t.typeOf(alldata, 'string');

@@ -71,35 +69,22 @@ t.equal(alldata.length, DATA.length);

test('accum.string(cb) with err, calls cb with err', function (done) {
var stream = accum.string(function (err, alldata) {
t.equal(err.message, 'my error');
done();
});
process.nextTick(function () {
stream.emit('error', new Error('my error'));
});
test('accum.string(listenerFn) with throws to domain', function (done) {
var d = domain.create();
d.on('error', function (err) {
t.equal(err.message, 'my error');
done();
})
.run(function () {
var stream = accum.string(function (alldata) { });
process.nextTick(function () {
stream.emit('error', new Error('my error'));
});
});
});
// currently pipe does not forward error but I have put in
// pull request to fix node.js. Also pause-stream will have
// to be modified as well.
// test('accum.string(cb) with err piped, calls cb with err', function (done) {
// var stream = passStream();
// stream
// .pipe(accum.string(function (err, alldata) {
// t.equal(err.message, 'my error');
// done();
// }));
// process.nextTick(function () {
// stream.emit('error', new Error('my error'));
// });
// });
test('accum.string() missing cb throws error', function () {
test('accum.string() missing listenerFn throws error', function () {
function throwsErr() {
var stream = accum.string();
}
t.throws(throwsErr, /accum requires a cb function/);
t.throws(throwsErr, /accum requires a listenerFn function/);
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc