Comparing version 0.0.6 to 0.1.1
@@ -5,5 +5,5 @@ 'use strict'; | ||
function accum(options, cb) { | ||
function accum(options, listenerFn) { | ||
if (arguments.length === 1) { // options is optional | ||
cb = options; | ||
listenerFn = options; | ||
options = {}; | ||
@@ -13,3 +13,3 @@ } | ||
options.encoding = options.encoding || 'utf8'; | ||
if (typeof cb !== 'function') throw new Error('accum requires a cb function'); | ||
if (typeof listenerFn !== 'function') throw new Error('accum requires a listenerFn function'); | ||
if (options.type && | ||
@@ -39,5 +39,5 @@ (options.type !== 'buffer' && options.type !== 'string' && options.type !== 'array')) { | ||
var endConcatFns = { | ||
buffer: function (arrData, cb) { cb(null, Buffer.concat(arrData.map(ensureBuffer))); }, | ||
string: function (arrData, cb) { cb(null, arrData.map(ensureString).join('')); }, | ||
array: function (arrData, cb) { cb(null, arrData); } | ||
buffer: function (arrData, listenerFn) { listenerFn(Buffer.concat(arrData.map(ensureBuffer))); }, | ||
string: function (arrData, listenerFn) { listenerFn(arrData.map(ensureString).join('')); }, | ||
array: function (arrData, listenerFn) { listenerFn(arrData); } | ||
}; | ||
@@ -54,3 +54,3 @@ | ||
var fn = endConcatFns[options.type]; | ||
fn(arrData, cb); | ||
fn(arrData, listenerFn); | ||
this.queueEnd(); | ||
@@ -60,26 +60,15 @@ } | ||
function errored(err) { | ||
cb(err); | ||
stream.destroy(); | ||
} | ||
stream.on('error', errored); | ||
var origDestroy = stream.destroy; | ||
stream.destroy = function () { | ||
stream.removeListener('error', errored); | ||
origDestroy.call(stream); | ||
}; | ||
return stream; | ||
} | ||
function buffer(cb) { return accum({ type: 'buffer'}, cb); } | ||
function string(optEncoding, cb) { | ||
function buffer(listenerFn) { return accum({ type: 'buffer'}, listenerFn); } | ||
function string(optEncoding, listenerFn) { | ||
if (arguments.length === 1) { // optEncoding not provided, shift | ||
cb = optEncoding; | ||
listenerFn = optEncoding; | ||
optEncoding = null; | ||
} | ||
optEncoding = optEncoding || 'utf8'; | ||
return accum({ type: 'string', encoding: optEncoding }, cb); | ||
return accum({ type: 'string', encoding: optEncoding }, listenerFn); | ||
} | ||
function array(cb) { return accum({ type: 'array' }, cb); } | ||
function array(listenerFn) { return accum({ type: 'array' }, listenerFn); } | ||
@@ -86,0 +75,0 @@ accum.buffer = buffer; |
{ | ||
"name": "accum", | ||
"description": "accum - Simple pass-through stream (RW) which accumulates or collects the data from a stream. Pipe your stream into this to get all the data as buffer, string, or raw array.", | ||
"version": "0.0.6", | ||
"version": "0.1.1", | ||
"author": "Jeff Barczewski <jeff.barczewski@gmail.com>", | ||
@@ -6,0 +6,0 @@ "repository": { "type": "git", "url": "http://github.com/jeffbski/accum.git" }, |
@@ -18,3 +18,3 @@ # accum | ||
- The default automatic method - `accum(cb)` constructs a pass-through stream which checks if the first chunk is a Buffer and if so returns a concatenated Buffer of all the data, otherwise if it is a string then returns a concatenated string, otherwise returns a raw array. The `cb` signature is `function(err, alldata)`. The `cb` is called after all the data is received just prior to the `end` event being emitted. | ||
- The default automatic method - `accum(listenerFn)` constructs a pass-through stream which checks if the first chunk is a Buffer and if so returns a concatenated Buffer of all the data, otherwise if it is a string then returns a concatenated string, otherwise returns a raw array. The `listenerFn` signature is `function(alldata)`. The `listenerFn` is called after all the data is received just prior to the `end` event being emitted. | ||
@@ -24,4 +24,3 @@ ```javascript | ||
rstream | ||
.pipe(accum(function (err, alldata) { | ||
if (err) return yourHandleErrFn(err); // handle the error | ||
.pipe(accum(function (alldata) { | ||
// use the accumulated data - alldata will be Buffer, string, or [] | ||
@@ -33,3 +32,3 @@ })); | ||
- `accum.buffer(cb)` - constructs a pass-through stream which converts everything into a Buffer, concatenates, and calls the `cb` with the buffer. The `cb` signature is `function(err, buffer)`. The `cb` is called after all the data is received just prior to the `end` event being emitted. | ||
- `accum.buffer(listenerFn)` - constructs a pass-through stream which converts everything into a Buffer, concatenates, and calls the `listenerFn` with the buffer. The `listenerFn` signature is `function(buffer)`. The `listenerFn` is called after all the data is received just prior to the `end` event being emitted. | ||
@@ -39,4 +38,3 @@ ```javascript | ||
rstream | ||
.pipe(accum.buffer(function (err, buffer) { | ||
if (err) return yourHandleErrFn(err); // handle the error | ||
.pipe(accum.buffer(function (buffer) { | ||
// use the accumulated data - buffer which is a Buffer | ||
@@ -46,3 +44,3 @@ })); | ||
- `accum.string([optEncoding], cb)` - constructs a pass-through stream which concatenates everything into a string. Buffer data is converted to string using the optional encoding `optEncoding` which defaults to 'utf8'. Other data is simply converted using `.toString()`. The `cb` signature is `function(err, string)`. The `cb` is called after all the data is received just prior to the `end` event being emitted. | ||
- `accum.string([optEncoding], listenerFn)` - constructs a pass-through stream which concatenates everything into a string. Buffer data is converted to string using the optional encoding `optEncoding` which defaults to 'utf8'. Other data is simply converted using `.toString()`. The `listenerFn` signature is `function(string)`. The `listenerFn` is called after all the data is received just prior to the `end` event being emitted. | ||
@@ -52,4 +50,3 @@ ```javascript | ||
rstream | ||
.pipe(accum.string('utf8', function (err, string) { | ||
if (err) return yourHandleErrFn(err); // handle the error | ||
.pipe(accum.string('utf8', function (string) { | ||
// use the accumulated data - string which is a utf8 string | ||
@@ -59,3 +56,3 @@ })); | ||
- `accum.array(cb)` - constructs a pass-through stream which concatenates everything into an array without any conversion, which the `cb` receives the accumulated data on end. The `cb` signature is `function(err, arr)`. The `cb` is called after all the data is received just prior to the `end` event being emitted. | ||
- `accum.array(listenerFn)` - constructs a pass-through stream which concatenates everything into an array without any conversion, which the `listenerFn` receives the accumulated data on end. The `listenerFn` signature is `function(arr)`. The `listenerFn` is called after all the data is received just prior to the `end` event being emitted. | ||
@@ -65,4 +62,3 @@ ```javascript | ||
rstream | ||
.pipe(accum.array(function (err, array) { | ||
if (err) return yourHandleErrFn(err); // handle the error | ||
.pipe(accum.array(function (array) { | ||
// use the accumulated data - array which is a raw unconverted array of data chunks | ||
@@ -72,2 +68,16 @@ })); | ||
### Error handling | ||
Node.js stream.pipe does not forward errors and neither do many pass-through stream implementations so the recommended way to catch errors is either to attach error handlers at each stream or to use domains. | ||
```javascript | ||
var d = domain.create(); | ||
d.on('error', handleAllErrors); | ||
d.run(function() { | ||
rstream.pipe(accum(function (alldata) { | ||
// use alldata | ||
}); | ||
}); | ||
``` | ||
## Goals | ||
@@ -74,0 +84,0 @@ |
@@ -6,2 +6,3 @@ /*global suite:false test:false */ | ||
var crypto = require('crypto'); | ||
var domain = require('domain'); | ||
var accum = require('..'); // require('accum'); | ||
@@ -14,7 +15,6 @@ var passStream = require('pass-stream'); | ||
test('accum.array(cb) with string data, results with raw array of chunks to cb before end', function (done) { | ||
test('accum.array(listenerFn) with string data, results with raw array of chunks to listenerFn before end', function (done) { | ||
var stream = passStream(); | ||
stream | ||
.pipe(accum.array(function (err, alldata) { | ||
if (err) return done(err); | ||
.pipe(accum.array(function (alldata) { | ||
t.deepEqual(alldata, ['abc', 'def', 'ghi']); | ||
@@ -30,8 +30,7 @@ done(); | ||
test('accum.array(cb) with Buffer data, results with raw array of chunks to cb before end', function (done) { | ||
test('accum.array(listenerFn) with Buffer data, results with raw array of chunks to listenerFn before end', function (done) { | ||
var DATA = new Buffer('abcdefghi'); | ||
var stream = passStream(); | ||
stream | ||
.pipe(accum.array(function (err, alldata) { | ||
if (err) return done(err); | ||
.pipe(accum.array(function (alldata) { | ||
t.deepEqual(alldata, [new Buffer('abc'), new Buffer('def'), new Buffer('ghi')]); | ||
@@ -47,7 +46,6 @@ done(); | ||
test('accum.array(cb) with number data, results raw array to cb before end', function (done) { | ||
test('accum.array(listenerFn) with number data, results raw array to listenerFn before end', function (done) { | ||
var stream = passStream(); | ||
stream | ||
.pipe(accum.array(function (err, alldata) { | ||
if (err) return done(err); | ||
.pipe(accum.array(function (alldata) { | ||
t.ok(Array.isArray(alldata)); | ||
@@ -64,3 +62,3 @@ t.deepEqual(alldata, [1, 2, 3]); | ||
test('accum.array(cb) with various types of data, results with concatenated raw array to cb before end', function (done) { | ||
test('accum.array(listenerFn) with various types of data, results with concatenated raw array to listenerFn before end', function (done) { | ||
var DATA = [ | ||
@@ -80,4 +78,3 @@ 1, | ||
stream | ||
.pipe(accum.array(function (err, alldata) { | ||
if (err) return done(err); | ||
.pipe(accum.array(function (alldata) { | ||
t.ok(Array.isArray(alldata)); | ||
@@ -95,35 +92,22 @@ t.deepEqual(alldata, DATA); | ||
test('accum.array(cb) with err, calls cb with err', function (done) { | ||
var stream = accum.array(function (err, alldata) { | ||
t.equal(err.message, 'my error'); | ||
done(); | ||
}); | ||
process.nextTick(function () { | ||
stream.emit('error', new Error('my error')); | ||
}); | ||
test('accum.array(listenerFn) with throws to domain', function (done) { | ||
var d = domain.create(); | ||
d.on('error', function (err) { | ||
t.equal(err.message, 'my error'); | ||
done(); | ||
}) | ||
.run(function () { | ||
var stream = accum.array(function (alldata) { }); | ||
process.nextTick(function () { | ||
stream.emit('error', new Error('my error')); | ||
}); | ||
}); | ||
}); | ||
// currently pipe does not forward error but I have put in | ||
// pull request to fix node.js. Also pause-stream will have | ||
// to be modified as well. | ||
// test('accum.array(cb) with err piped, calls cb with err', function (done) { | ||
// var stream = passStream(); | ||
// stream | ||
// .pipe(accum.array(function (err, alldata) { | ||
// t.equal(err.message, 'my error'); | ||
// done(); | ||
// })); | ||
// process.nextTick(function () { | ||
// stream.emit('error', new Error('my error')); | ||
// }); | ||
// }); | ||
test('missing cb throws error', function () { | ||
test('missing listenerFn throws error', function () { | ||
function throwsErr() { | ||
var stream = accum.array(); | ||
} | ||
t.throws(throwsErr, /accum requires a cb function/); | ||
t.throws(throwsErr, /accum requires a listenerFn function/); | ||
}); | ||
@@ -6,2 +6,3 @@ /*global suite:false test:false */ | ||
var crypto = require('crypto'); | ||
var domain = require('domain'); | ||
var accum = require('..'); // require('accum'); | ||
@@ -14,8 +15,7 @@ var passStream = require('pass-stream'); | ||
test('accum.buffer(cb) with string data, results with concatenated Buffer to cb before end', function (done) { | ||
test('accum.buffer(listenerFn) with string data, results with concatenated Buffer to listenerFn before end', function (done) { | ||
var DATA = new Buffer('abcdefghi'); | ||
var stream = passStream(); | ||
stream | ||
.pipe(accum.buffer(function (err, alldata) { | ||
if (err) return done(err); | ||
.pipe(accum.buffer(function (alldata) { | ||
t.ok(Buffer.isBuffer(alldata)); | ||
@@ -35,8 +35,7 @@ t.equal(alldata.length, DATA.length); | ||
test('accum.buffer(cb) with Buffer data, results with concatenated Buffer to cb before end', function (done) { | ||
test('accum.buffer(listenerFn) with Buffer data, results with concatenated Buffer to listenerFn before end', function (done) { | ||
var DATA = new Buffer('abcdefghi'); | ||
var stream = passStream(); | ||
stream | ||
.pipe(accum.buffer(function (err, alldata) { | ||
if (err) return done(err); | ||
.pipe(accum.buffer(function (alldata) { | ||
t.ok(Buffer.isBuffer(alldata)); | ||
@@ -56,35 +55,22 @@ t.equal(alldata.length, DATA.length); | ||
test('accum.buffer(cb) with err, calls cb with err', function (done) { | ||
var stream = accum.buffer(function (err, alldata) { | ||
t.equal(err.message, 'my error'); | ||
done(); | ||
}); | ||
process.nextTick(function () { | ||
stream.emit('error', new Error('my error')); | ||
}); | ||
test('accum.buffer(listenerFn) with throws to domain', function (done) { | ||
var d = domain.create(); | ||
d.on('error', function (err) { | ||
t.equal(err.message, 'my error'); | ||
done(); | ||
}) | ||
.run(function () { | ||
var stream = accum.buffer(function (alldata) { }); | ||
process.nextTick(function () { | ||
stream.emit('error', new Error('my error')); | ||
}); | ||
}); | ||
}); | ||
// currently pipe does not forward error but I have put in | ||
// pull request to fix node.js. Also pause-stream will have | ||
// to be modified as well. | ||
// test('accum.buffer(cb) with err piped, calls cb with err', function (done) { | ||
// var stream = passStream(); | ||
// stream | ||
// .pipe(accum.buffer(function (err, alldata) { | ||
// t.equal(err.message, 'my error'); | ||
// done(); | ||
// })); | ||
// process.nextTick(function () { | ||
// stream.emit('error', new Error('my error')); | ||
// }); | ||
// }); | ||
test('accum.buffer() missing cb throws error', function () { | ||
test('accum.buffer() missing listenerFn throws error', function () { | ||
function throwsErr() { | ||
var stream = accum.buffer(); | ||
} | ||
t.throws(throwsErr, /accum requires a cb function/); | ||
t.throws(throwsErr, /accum requires a listenerFn function/); | ||
}); | ||
@@ -6,2 +6,3 @@ /*global suite:false test:false */ | ||
var crypto = require('crypto'); | ||
var domain = require('domain'); | ||
var accum = require('..'); // require('accum'); | ||
@@ -14,7 +15,6 @@ var passStream = require('pass-stream'); | ||
test('accum(cb) with string data, results with concatenated string to cb before end', function (done) { | ||
test('accum(listenerFn) with string data, results with concatenated string to listenerFn before end', function (done) { | ||
var stream = passStream(); | ||
stream | ||
.pipe(accum(function (err, alldata) { | ||
if (err) return done(err); | ||
.pipe(accum(function (alldata) { | ||
t.equal(alldata, 'abcdefghi'); | ||
@@ -30,8 +30,7 @@ done(); | ||
test('accum(cb) with Buffer data, results with concatenated Buffer to cb before end', function (done) { | ||
test('accum(listenerFn) with Buffer data, results with concatenated Buffer to listenerFn before end', function (done) { | ||
var DATA = new Buffer('abcdefghi'); | ||
var stream = passStream(); | ||
stream | ||
.pipe(accum(function (err, alldata) { | ||
if (err) return done(err); | ||
.pipe(accum(function (alldata) { | ||
t.ok(Buffer.isBuffer(alldata)); | ||
@@ -51,7 +50,6 @@ t.equal(alldata.length, DATA.length); | ||
test('accum(cb) with number data, results with concatenated raw array to cb before end', function (done) { | ||
test('accum(listenerFn) with number data, results with concatenated raw array to listenerFn before end', function (done) { | ||
var stream = passStream(); | ||
stream | ||
.pipe(accum(function (err, alldata) { | ||
if (err) return done(err); | ||
.pipe(accum(function (alldata) { | ||
t.ok(Array.isArray(alldata)); | ||
@@ -68,3 +66,3 @@ t.deepEqual(alldata, [1, 2, 3]); | ||
test('accum(cb) with various types of data, results with concatenated raw array to cb before end', function (done) { | ||
test('accum(listenerFn) with various types of data, results with concatenated raw array to listenerFn before end', function (done) { | ||
var DATA = [ | ||
@@ -84,4 +82,3 @@ 1, | ||
stream | ||
.pipe(accum(function (err, alldata) { | ||
if (err) return done(err); | ||
.pipe(accum(function (alldata) { | ||
t.ok(Array.isArray(alldata)); | ||
@@ -100,34 +97,23 @@ t.deepEqual(alldata, DATA); | ||
test('accum(cb) with err, calls cb with err', function (done) { | ||
var stream = accum(function (err, alldata) { | ||
t.equal(err.message, 'my error'); | ||
done(); | ||
}); | ||
process.nextTick(function () { | ||
stream.emit('error', new Error('my error')); | ||
}); | ||
test('accum(listenerFn) with throws to domain', function (done) { | ||
var d = domain.create(); | ||
d.on('error', function (err) { | ||
t.equal(err.message, 'my error'); | ||
done(); | ||
}) | ||
.run(function () { | ||
var stream = accum(function (alldata) { }); | ||
process.nextTick(function () { | ||
stream.emit('error', new Error('my error')); | ||
}); | ||
}); | ||
}); | ||
// currently pipe does not forward error but I have put in | ||
// pull request to fix node.js. Also pause-stream will have | ||
// to be modified as well. | ||
// test('accum(cb) with err piped, calls cb with err', function (done) { | ||
// var stream = passStream(); | ||
// stream | ||
// .pipe(accum(function (err, alldata) { | ||
// t.equal(err.message, 'my error'); | ||
// done(); | ||
// })); | ||
// process.nextTick(function () { | ||
// stream.emit('error', new Error('my error')); | ||
// }); | ||
// }); | ||
test('missing cb throws error', function () { | ||
test('missing listenerFn throws error', function () { | ||
function throwsErr() { | ||
var stream = accum(); | ||
} | ||
t.throws(throwsErr, /accum requires a cb function/); | ||
t.throws(throwsErr, /accum requires a listenerFn function/); | ||
}); | ||
@@ -16,3 +16,3 @@ /*global suite:false test:false */ | ||
var result; | ||
var astream = accum(function (err, alldata) { | ||
var astream = accum(function (alldata) { | ||
result = alldata; | ||
@@ -48,3 +48,3 @@ }); | ||
var result; | ||
var astream = accum(function (err, alldata) { | ||
var astream = accum(function (alldata) { | ||
result = alldata; | ||
@@ -85,3 +85,3 @@ }); | ||
var result; | ||
var astream = accum.buffer(function (err, alldata) { | ||
var astream = accum.buffer(function (alldata) { | ||
result = alldata; | ||
@@ -119,3 +119,3 @@ }); | ||
var result; | ||
var astream = accum.buffer(function (err, alldata) { | ||
var astream = accum.buffer(function (alldata) { | ||
result = alldata; | ||
@@ -155,3 +155,3 @@ }); | ||
var result; | ||
var astream = accum.string(function (err, alldata) { | ||
var astream = accum.string(function (alldata) { | ||
result = alldata; | ||
@@ -192,3 +192,3 @@ }); | ||
var result; | ||
var astream = accum.array(function (err, alldata) { | ||
var astream = accum.array(function (alldata) { | ||
result = alldata; | ||
@@ -195,0 +195,0 @@ }); |
@@ -6,2 +6,3 @@ /*global suite:false test:false */ | ||
var crypto = require('crypto'); | ||
var domain = require('domain'); | ||
var accum = require('..'); // require('accum'); | ||
@@ -14,8 +15,7 @@ var passStream = require('pass-stream'); | ||
test('accum.string(cb) with string data, results with concatenated string to cb before end', function (done) { | ||
test('accum.string(listenerFn) with string data, results with concatenated string to listenerFn before end', function (done) { | ||
var DATA = 'abcdefghi'; | ||
var stream = passStream(); | ||
stream | ||
.pipe(accum.string(function (err, alldata) { | ||
if (err) return done(err); | ||
.pipe(accum.string(function (alldata) { | ||
t.typeOf(alldata, 'string'); | ||
@@ -33,8 +33,7 @@ t.equal(alldata.length, DATA.length); | ||
test('accum.string(cb) with Buffer data, results with concatenated string to cb before end', function (done) { | ||
test('accum.string(listenerFn) with Buffer data, results with concatenated string to listenerFn before end', function (done) { | ||
var DATA = 'abcdefghi'; | ||
var stream = passStream(); | ||
stream | ||
.pipe(accum.string(function (err, alldata) { | ||
if (err) return done(err); | ||
.pipe(accum.string(function (alldata) { | ||
t.typeOf(alldata, 'string'); | ||
@@ -52,8 +51,7 @@ t.equal(alldata.length, DATA.length); | ||
test('accum.string("utf8", cb) with Buffer data, results with concatenated string to cb before end', function (done) { | ||
test('accum.string("utf8", listenerFn) with Buffer data, results with concatenated string to listenerFn before end', function (done) { | ||
var DATA = 'abcdefghi'; | ||
var stream = passStream(); | ||
stream | ||
.pipe(accum.string('utf8', function (err, alldata) { | ||
if (err) return done(err); | ||
.pipe(accum.string('utf8', function (alldata) { | ||
t.typeOf(alldata, 'string'); | ||
@@ -71,35 +69,22 @@ t.equal(alldata.length, DATA.length); | ||
test('accum.string(cb) with err, calls cb with err', function (done) { | ||
var stream = accum.string(function (err, alldata) { | ||
t.equal(err.message, 'my error'); | ||
done(); | ||
}); | ||
process.nextTick(function () { | ||
stream.emit('error', new Error('my error')); | ||
}); | ||
test('accum.string(listenerFn) with throws to domain', function (done) { | ||
var d = domain.create(); | ||
d.on('error', function (err) { | ||
t.equal(err.message, 'my error'); | ||
done(); | ||
}) | ||
.run(function () { | ||
var stream = accum.string(function (alldata) { }); | ||
process.nextTick(function () { | ||
stream.emit('error', new Error('my error')); | ||
}); | ||
}); | ||
}); | ||
// currently pipe does not forward error but I have put in | ||
// pull request to fix node.js. Also pause-stream will have | ||
// to be modified as well. | ||
// test('accum.string(cb) with err piped, calls cb with err', function (done) { | ||
// var stream = passStream(); | ||
// stream | ||
// .pipe(accum.string(function (err, alldata) { | ||
// t.equal(err.message, 'my error'); | ||
// done(); | ||
// })); | ||
// process.nextTick(function () { | ||
// stream.emit('error', new Error('my error')); | ||
// }); | ||
// }); | ||
test('accum.string() missing cb throws error', function () { | ||
test('accum.string() missing listenerFn throws error', function () { | ||
function throwsErr() { | ||
var stream = accum.string(); | ||
} | ||
t.throws(throwsErr, /accum requires a cb function/); | ||
t.throws(throwsErr, /accum requires a listenerFn function/); | ||
}); | ||
103
26564
581