readable-stream
Advanced tools
Comparing version 0.2.0 to 0.3.0
@@ -46,2 +46,8 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
if (options && options.readable === false) | ||
this.readable = false; | ||
if (options && options.writable === false) | ||
this.writable = false; | ||
this.allowHalfOpen = true; | ||
@@ -48,0 +54,0 @@ if (options && options.allowHalfOpen === false) |
@@ -51,2 +51,5 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
if (this.lowWaterMark > this.highWaterMark) | ||
throw new Error('lowWaterMark cannot be higher than highWaterMark'); | ||
this.buffer = []; | ||
@@ -70,2 +73,7 @@ this.length = 0; | ||
// object stream flag. Used to make read(n) ignore n and to | ||
// make all the buffer merging and length checks go away | ||
this.objectMode = !!options.objectMode; | ||
// when piping, we only care about 'readable' events that happen | ||
@@ -110,4 +118,9 @@ // after read()ing all the bytes and not getting any pushback. | ||
// more bytes. This is to work around cases where hwm=0 and | ||
// lwm=0, such as the repl. | ||
return rs.length < rs.highWaterMark || rs.length <= rs.lowWaterMark; | ||
// lwm=0, such as the repl. Also, if the push() triggered a | ||
// readable event, and the user called read(largeNumber) such that | ||
// needReadable was set, then we ought to push more, so that another | ||
// 'readable' event will be triggered. | ||
return rs.needReadable || | ||
rs.length < rs.highWaterMark || | ||
rs.length <= rs.lowWaterMark; | ||
}; | ||
@@ -127,2 +140,5 @@ | ||
if (state.objectMode) | ||
return n === 0 ? 0 : 1; | ||
if (isNaN(n) || n === null) | ||
@@ -158,3 +174,4 @@ return state.length; | ||
if (n === 0 && state.ended) { | ||
endReadable(this); | ||
if (state.length === 0) | ||
endReadable(this); | ||
return null; | ||
@@ -216,7 +233,7 @@ } | ||
if (n > 0) | ||
ret = fromList(n, state.buffer, state.length, !!state.decoder); | ||
ret = fromList(n, state); | ||
else | ||
ret = null; | ||
if (ret === null || ret.length === 0) { | ||
if (ret === null || (!state.objectMode && ret.length === 0)) { | ||
state.needReadable = true; | ||
@@ -233,2 +250,8 @@ n = 0; | ||
// If we happened to read() exactly the remaining amount in the | ||
// buffer, and the EOF has been seen at this point, then make sure | ||
// that we emit 'end' on the very next tick. | ||
if (state.ended && !state.endEmitted && state.length === 0) | ||
endReadable(this); | ||
return ret; | ||
@@ -241,2 +264,15 @@ }; | ||
// If we get something that is not a buffer, string, null, or undefined, | ||
// then switch into objectMode. Now stream chunks are all considered | ||
// to be of length=1, and the watermarks determine how many objects to | ||
// keep in the buffer, rather than how many bytes or characters. | ||
if (!Buffer.isBuffer(chunk) && | ||
'string' !== typeof chunk && | ||
chunk !== null && | ||
chunk !== undefined) { | ||
state.objectMode = true; | ||
state.length = state.buffer.length; | ||
state.decoder = null; | ||
} | ||
state.reading = false; | ||
@@ -246,3 +282,3 @@ if (er) | ||
if (!chunk || !chunk.length) { | ||
if (chunk === null || chunk === undefined) { | ||
// eof | ||
@@ -254,20 +290,26 @@ state.ended = true; | ||
state.buffer.push(chunk); | ||
state.length += chunk.length; | ||
state.length += state.objectMode ? 1 : chunk.length; | ||
} | ||
} | ||
// if we've ended and we have some data left, then emit | ||
// 'readable' now to make sure it gets picked up. | ||
if (!sync) { | ||
if (state.length > 0) { | ||
state.needReadable = false; | ||
if (!state.emittedReadable) { | ||
state.emittedReadable = true; | ||
stream.emit('readable'); | ||
} | ||
} else | ||
endReadable(stream); | ||
} | ||
if (state.length > 0) | ||
emitReadable(stream); | ||
else | ||
endReadable(stream); | ||
return; | ||
} | ||
// at this point, if we got a zero-length buffer or string, | ||
// and we're not in object-mode, then there's really no point | ||
// continuing. it means that there is nothing to read right | ||
// now, but as we have not received the EOF-signaling null, | ||
// we're not ended. we've already unset the reading flag, | ||
// so just get out of here. | ||
if (!state.objectMode && | ||
(chunk || typeof chunk === 'string') && | ||
0 === chunk.length) | ||
return; | ||
if (state.decoder) | ||
@@ -277,6 +319,4 @@ chunk = state.decoder.write(chunk); | ||
// update the buffer info. | ||
if (chunk) { | ||
state.length += chunk.length; | ||
state.buffer.push(chunk); | ||
} | ||
state.length += state.objectMode ? 1 : chunk.length; | ||
state.buffer.push(chunk); | ||
@@ -294,11 +334,25 @@ // if we haven't gotten enough to pass the lowWaterMark, | ||
if (state.needReadable && !sync) { | ||
state.needReadable = false; | ||
if (!state.emittedReadable) { | ||
state.emittedReadable = true; | ||
stream.emit('readable'); | ||
} | ||
// Don't emit readable right away in sync mode, because this can trigger | ||
// another read() call => stack overflow. This way, it might trigger | ||
// a nextTick recursion warning, but that's not so bad. | ||
if (state.needReadable) { | ||
if (!sync) | ||
emitReadable(stream); | ||
else | ||
process.nextTick(function() { | ||
emitReadable(stream); | ||
}); | ||
} | ||
} | ||
function emitReadable(stream) { | ||
var state = stream._readableState; | ||
state.needReadable = false; | ||
if (state.emittedReadable) | ||
return; | ||
state.emittedReadable = true; | ||
stream.emit('readable'); | ||
} | ||
// abstract method. to be overridden in specific implementation classes. | ||
@@ -361,3 +415,3 @@ // call cb(er, data) where data is <= n in length. | ||
// cleanup event handlers once the pipe is broken | ||
dest.removeListener('close', unpipe); | ||
dest.removeListener('close', onclose); | ||
dest.removeListener('finish', onfinish); | ||
@@ -388,7 +442,11 @@ dest.removeListener('drain', ondrain); | ||
// if the dest emits close, then presumably there's no point writing | ||
// to it any more. | ||
dest.once('close', unpipe); | ||
// Both close and finish should trigger unpipe, but only once. | ||
function onclose() { | ||
dest.removeListener('finish', onfinish); | ||
unpipe(); | ||
} | ||
dest.once('close', onclose); | ||
function onfinish() { | ||
dest.removeListener('close', unpipe); | ||
dest.removeListener('close', onclose); | ||
unpipe(); | ||
} | ||
@@ -504,2 +562,3 @@ dest.once('finish', onfinish); | ||
this.removeListener('readable', pipeOnReadable); | ||
state.flowing = false; | ||
if (dest) | ||
@@ -519,2 +578,3 @@ dest.emit('unpipe', this); | ||
this.removeListener('readable', pipeOnReadable); | ||
state.flowing = false; | ||
@@ -684,12 +744,17 @@ for (var i = 0; i < len; i++) | ||
// Length is the combined lengths of all the buffers in the list. | ||
function fromList(n, list, length, stringMode) { | ||
function fromList(n, state) { | ||
var list = state.buffer; | ||
var length = state.length; | ||
var stringMode = !!state.decoder; | ||
var objectMode = !!state.objectMode; | ||
var ret; | ||
// nothing in the list, definitely empty. | ||
if (list.length === 0) { | ||
if (list.length === 0) | ||
return null; | ||
} | ||
if (length === 0) | ||
ret = null; | ||
else if (objectMode) | ||
ret = list.shift(); | ||
else if (!n || n >= length) { | ||
@@ -746,2 +811,8 @@ // read it all, truncate the array. | ||
var state = stream._readableState; | ||
// If we get here before consuming all the bytes, then that is a | ||
// bug in node. Should never happen. | ||
if (state.length > 0) | ||
throw new Error('endReadable called on non-empty stream'); | ||
if (state.endEmitted) | ||
@@ -748,0 +819,0 @@ return; |
@@ -73,11 +73,45 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
function TransformState(stream) { | ||
this.buffer = []; | ||
this.transforming = false; | ||
this.pendingReadCb = null; | ||
function TransformState(options, stream) { | ||
var ts = this; | ||
this.output = function(chunk) { | ||
ts.needTransform = false; | ||
stream.push(chunk); | ||
}; | ||
this.afterTransform = function(er, data) { | ||
return afterTransform(stream, er, data); | ||
}; | ||
this.needTransform = false; | ||
this.transforming = false; | ||
this.writecb = null; | ||
this.writechunk = null; | ||
} | ||
function afterTransform(stream, er, data) { | ||
var ts = stream._transformState; | ||
ts.transforming = false; | ||
var cb = ts.writecb; | ||
if (!cb) | ||
return this.emit('error', new Error('no writecb in Transform class')); | ||
ts.writechunk = null; | ||
ts.writecb = null; | ||
if (data !== null && data !== undefined) | ||
ts.output(data); | ||
if (cb) | ||
cb(er); | ||
var rs = stream._readableState; | ||
if (rs.needReadable || rs.length < rs.highWaterMark) { | ||
stream._read(); | ||
} | ||
} | ||
function Transform(options) { | ||
@@ -89,9 +123,10 @@ if (!(this instanceof Transform)) | ||
// bind output so that it can be passed around as a regular function. | ||
var ts = this._transformState = new TransformState(options, this); | ||
// when the writable side finishes, then flush out anything remaining. | ||
var stream = this; | ||
// the queue of _write chunks that are pending being transformed | ||
var ts = this._transformState = new TransformState(stream); | ||
// start out asking for a readable event once data is transformed. | ||
this._readableState.needReadable = true; | ||
// when the writable side finishes, then flush out anything remaining. | ||
this.once('finish', function() { | ||
@@ -123,52 +158,26 @@ if ('function' === typeof this._flush) | ||
var ts = this._transformState; | ||
var rs = this._readableState; | ||
ts.buffer.push([chunk, cb]); | ||
// no need for auto-pull if already in the midst of one. | ||
ts.writecb = cb; | ||
ts.writechunk = chunk; | ||
if (ts.transforming) | ||
return; | ||
// now we have something to transform, if we were waiting for it. | ||
// kick off a _read to pull it in. | ||
if (ts.pendingReadCb) { | ||
var readcb = ts.pendingReadCb; | ||
ts.pendingReadCb = null; | ||
this._read(0, readcb); | ||
} | ||
// if we weren't waiting for it, but nothing is queued up, then | ||
// still kick off a transform, just so it's there when the user asks. | ||
var doRead = rs.needReadable || rs.length <= rs.highWaterMark; | ||
if (doRead && !rs.reading) { | ||
var ret = this.read(0); | ||
if (ret !== null) | ||
return cb(new Error('invalid stream transform state')); | ||
} | ||
var rs = this._readableState; | ||
if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark) | ||
this._read(); | ||
}; | ||
Transform.prototype._read = function(n, readcb) { | ||
var ws = this._writableState; | ||
var rs = this._readableState; | ||
// Doesn't matter what the args are here. | ||
// the output and callback functions passed to _transform do all the work. | ||
// That we got here means that the readable side wants more data. | ||
Transform.prototype._read = function(n, cb) { | ||
var ts = this._transformState; | ||
ts.pendingReadCb = readcb; | ||
// if there's nothing pending, then we just wait. | ||
// if we're already transforming, then also just hold on a sec. | ||
// we've already stashed the readcb, so we can come back later | ||
// when we have something to transform | ||
if (ts.buffer.length === 0 || ts.transforming) | ||
if (ts.writechunk && ts.writecb && !ts.transforming) { | ||
ts.transforming = true; | ||
this._transform(ts.writechunk, ts.output, ts.afterTransform); | ||
return; | ||
} | ||
// go ahead and transform that thing, now that someone wants it | ||
var req = ts.buffer.shift(); | ||
var chunk = req[0]; | ||
var writecb = req[1]; | ||
ts.transforming = true; | ||
this._transform(chunk, ts.output, function(er, data) { | ||
ts.transforming = false; | ||
if (data) | ||
ts.output(data); | ||
writecb(er); | ||
}); | ||
// mark that we need a transform, so that any data that comes in | ||
// will get processed, now that we've asked for it. | ||
ts.needTransform = true; | ||
}; | ||
@@ -175,0 +184,0 @@ |
@@ -49,2 +49,6 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
// object stream flag to indicate whether or not this stream | ||
// contains buffers or objects. | ||
this.objectMode = !!options.objectMode; | ||
// cast to ints. | ||
@@ -54,2 +58,5 @@ this.lowWaterMark = ~~this.lowWaterMark; | ||
if (this.lowWaterMark > this.highWaterMark) | ||
throw new Error('lowWaterMark cannot be higher than highWaterMark'); | ||
this.needDrain = false; | ||
@@ -133,11 +140,25 @@ // at the start of calling end() | ||
var l = chunk.length; | ||
if (false === state.decodeStrings) | ||
chunk = [chunk, encoding || 'utf8']; | ||
else if (typeof chunk === 'string' || encoding) { | ||
chunk = new Buffer(chunk + '', encoding); | ||
l = chunk.length; | ||
// Writing something other than a string or buffer will switch | ||
// the stream into objectMode. | ||
if (!state.objectMode && | ||
typeof chunk !== 'string' && | ||
chunk !== null && | ||
chunk !== undefined && | ||
!Buffer.isBuffer(chunk)) | ||
state.objectMode = true; | ||
var len; | ||
if (state.objectMode) | ||
len = 1; | ||
else { | ||
len = chunk.length; | ||
if (false === state.decodeStrings) | ||
chunk = [chunk, encoding || 'utf8']; | ||
else if (typeof chunk === 'string') { | ||
chunk = new Buffer(chunk, encoding); | ||
len = chunk.length; | ||
} | ||
} | ||
state.length += l; | ||
state.length += len; | ||
@@ -157,3 +178,3 @@ var ret = state.length < state.highWaterMark; | ||
state.sync = true; | ||
state.writelen = l; | ||
state.writelen = len; | ||
state.writecb = cb; | ||
@@ -170,3 +191,3 @@ this._write(chunk, state.onwrite); | ||
var cb = state.writecb; | ||
var l = state.writelen; | ||
var len = state.writelen; | ||
@@ -194,3 +215,3 @@ state.writing = false; | ||
} | ||
state.length -= l; | ||
state.length -= len; | ||
@@ -239,8 +260,10 @@ if (cb) { | ||
if (false === state.decodeStrings) | ||
l = chunk[0].length; | ||
if (state.objectMode) | ||
len = 1; | ||
else if (false === state.decodeStrings) | ||
len = chunk[0].length; | ||
else | ||
l = chunk.length; | ||
len = chunk.length; | ||
state.writelen = l; | ||
state.writelen = len; | ||
state.writecb = cb; | ||
@@ -277,3 +300,3 @@ state.writechunk = chunk; | ||
Writable.prototype.end = function(chunk, encoding) { | ||
Writable.prototype.end = function(chunk, encoding, cb) { | ||
var state = this._writableState; | ||
@@ -285,5 +308,14 @@ | ||
if (typeof chunk === 'function') { | ||
cb = chunk; | ||
chunk = null; | ||
encoding = null; | ||
} else if (typeof encoding === 'function') { | ||
cb = encoding; | ||
encoding = null; | ||
} | ||
state.ending = true; | ||
if (chunk) | ||
this.write(chunk, encoding); | ||
this.write(chunk, encoding, cb); | ||
else if (state.length === 0 && !state.finishing && !state.finished) { | ||
@@ -293,4 +325,8 @@ state.finishing = true; | ||
state.finished = true; | ||
if (cb) process.nextTick(cb); | ||
} else if (cb) { | ||
this.once('finish', cb); | ||
} | ||
state.ended = true; | ||
}; |
{ | ||
"name": "readable-stream", | ||
"version": "0.2.0", | ||
"version": "0.3.0", | ||
"description": "An exploration of a new kind of readable streams for Node.js", | ||
@@ -5,0 +5,0 @@ "main": "readable.js", |
@@ -99,3 +99,6 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
var tests = []; | ||
var count = 0; | ||
function test(name, fn) { | ||
count++; | ||
tests.push([name, fn]); | ||
@@ -115,6 +118,14 @@ } | ||
equal: assert.equal, | ||
end: run | ||
end: function () { | ||
count--; | ||
run(); | ||
} | ||
}); | ||
} | ||
// ensure all tests have run | ||
process.on("exit", function () { | ||
assert.equal(count, 0); | ||
}); | ||
process.nextTick(run); | ||
@@ -323,1 +334,137 @@ | ||
}); | ||
test('back pressure respected', function (t) { | ||
function noop() {} | ||
var r = new R(); | ||
var counter = 0; | ||
r.push(["one"]); | ||
r.push(["two"]); | ||
r.push(["three"]); | ||
r.push(["four"]); | ||
r.push(null); | ||
r._read = noop; | ||
var w1 = new R(); | ||
w1.write = function (chunk) { | ||
assert.equal(chunk[0], "one"); | ||
w1.emit("close"); | ||
process.nextTick(function () { | ||
r.pipe(w2); | ||
r.pipe(w3); | ||
}) | ||
}; | ||
w1.end = noop; | ||
r.pipe(w1); | ||
var expected = ["two", "two", "three", "three", "four", "four"]; | ||
var w2 = new R(); | ||
w2.write = function (chunk) { | ||
assert.equal(chunk[0], expected.shift()); | ||
assert.equal(counter, 0); | ||
counter++; | ||
if (chunk[0] === "four") { | ||
return true; | ||
} | ||
setTimeout(function () { | ||
counter--; | ||
w2.emit("drain"); | ||
}, 10); | ||
return false; | ||
} | ||
w2.end = noop; | ||
var w3 = new R(); | ||
w3.write = function (chunk) { | ||
assert.equal(chunk[0], expected.shift()); | ||
assert.equal(counter, 1); | ||
counter++; | ||
if (chunk[0] === "four") { | ||
return true; | ||
} | ||
setTimeout(function () { | ||
counter--; | ||
w3.emit("drain"); | ||
}, 50); | ||
return false; | ||
}; | ||
w3.end = function () { | ||
assert.equal(counter, 2); | ||
assert.equal(expected.length, 0); | ||
t.end(); | ||
}; | ||
}); | ||
test('read(0) for ended streams', function (t) { | ||
var r = new R(); | ||
var written = false; | ||
var ended = false; | ||
r._read = function () {}; | ||
r.push(new Buffer("foo")); | ||
r.push(null); | ||
var v = r.read(0); | ||
assert.equal(v, null); | ||
var w = new R(); | ||
w.write = function (buffer) { | ||
written = true; | ||
assert.equal(ended, false); | ||
assert.equal(buffer.toString(), "foo") | ||
}; | ||
w.end = function () { | ||
ended = true; | ||
assert.equal(written, true); | ||
t.end(); | ||
}; | ||
r.pipe(w); | ||
}) | ||
test('sync _read ending', function (t) { | ||
var r = new R(); | ||
var called = false; | ||
r._read = function (n, cb) { | ||
cb(null, null); | ||
}; | ||
r.once('end', function () { | ||
called = true; | ||
}) | ||
r.read(); | ||
process.nextTick(function () { | ||
assert.equal(called, true); | ||
t.end(); | ||
}) | ||
}); | ||
assert.throws(function() { | ||
var bad = new R({ | ||
highWaterMark: 10, | ||
lowWaterMark: 1000 | ||
}); | ||
}); | ||
assert.throws(function() { | ||
var W = require('stream').Writable; | ||
var bad = new W({ | ||
highWaterMark: 10, | ||
lowWaterMark: 1000 | ||
}); | ||
}); |
@@ -28,3 +28,6 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
var tests = []; | ||
var count = 0; | ||
function test(name, fn) { | ||
count++; | ||
tests.push([name, fn]); | ||
@@ -44,6 +47,14 @@ } | ||
equal: assert.equal, | ||
end: run | ||
end: function () { | ||
count--; | ||
run(); | ||
} | ||
}); | ||
} | ||
// ensure all tests have run | ||
process.on("exit", function () { | ||
assert.equal(count, 0); | ||
}); | ||
process.nextTick(run); | ||
@@ -62,15 +73,15 @@ | ||
// read more than the first element. | ||
var ret = fromList(6, list, 16); | ||
var ret = fromList(6, { buffer: list, length: 16 }); | ||
t.equal(ret.toString(), 'foogba'); | ||
// read exactly the first element. | ||
ret = fromList(2, list, 10); | ||
ret = fromList(2, { buffer: list, length: 10 }); | ||
t.equal(ret.toString(), 'rk'); | ||
// read less than the first element. | ||
ret = fromList(2, list, 8); | ||
ret = fromList(2, { buffer: list, length: 8 }); | ||
t.equal(ret.toString(), 'ba'); | ||
// read more than we have. | ||
ret = fromList(100, list, 6); | ||
ret = fromList(100, { buffer: list, length: 6 }); | ||
t.equal(ret.toString(), 'zykuel'); | ||
@@ -93,15 +104,15 @@ | ||
// read more than the first element. | ||
var ret = fromList(6, list, 16, true); | ||
var ret = fromList(6, { buffer: list, length: 16, decoder: true }); | ||
t.equal(ret, 'foogba'); | ||
// read exactly the first element. | ||
ret = fromList(2, list, 10, true); | ||
ret = fromList(2, { buffer: list, length: 10, decoder: true }); | ||
t.equal(ret, 'rk'); | ||
// read less than the first element. | ||
ret = fromList(2, list, 8, true); | ||
ret = fromList(2, { buffer: list, length: 8, decoder: true }); | ||
t.equal(ret, 'ba'); | ||
// read more than we have. | ||
ret = fromList(100, list, 6, true); | ||
ret = fromList(100, { buffer: list, length: 6, decoder: true }); | ||
t.equal(ret, 'zykuel'); | ||
@@ -108,0 +119,0 @@ |
@@ -30,3 +30,6 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
var tests = []; | ||
var count = 0; | ||
function test(name, fn) { | ||
count++; | ||
tests.push([name, fn]); | ||
@@ -46,6 +49,14 @@ } | ||
equal: assert.equal, | ||
end: run | ||
end: function () { | ||
count--; | ||
run(); | ||
} | ||
}); | ||
} | ||
// ensure all tests have run | ||
process.on("exit", function () { | ||
assert.equal(count, 0); | ||
}); | ||
process.nextTick(run); | ||
@@ -82,2 +93,4 @@ | ||
console.log("cb(null, ret)", ret) | ||
return cb(null, ret); | ||
@@ -183,2 +196,3 @@ }.bind(this), 1); | ||
tr.on('readable', function flow() { | ||
console.log("readable once") | ||
var chunk; | ||
@@ -190,2 +204,3 @@ while (null !== (chunk = tr.read(13))) | ||
tr.on('end', function() { | ||
console.log("END") | ||
t.same(out, expect); | ||
@@ -192,0 +207,0 @@ t.end(); |
@@ -29,3 +29,6 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
var tests = []; | ||
var count = 0; | ||
function test(name, fn) { | ||
count++; | ||
tests.push([name, fn]); | ||
@@ -45,6 +48,15 @@ } | ||
equal: assert.equal, | ||
end: run | ||
ok: assert, | ||
end: function () { | ||
count--; | ||
run(); | ||
} | ||
}); | ||
} | ||
// ensure all tests have run | ||
process.on("exit", function () { | ||
assert.equal(count, 0); | ||
}); | ||
process.nextTick(run); | ||
@@ -54,2 +66,29 @@ | ||
test('writable side consumption', function(t) { | ||
var tx = new Transform({ | ||
highWaterMark: 10 | ||
}); | ||
var transformed = 0; | ||
tx._transform = function(chunk, output, cb) { | ||
transformed += chunk.length; | ||
output(chunk); | ||
cb(); | ||
}; | ||
for (var i = 1; i <= 10; i++) { | ||
tx.write(new Buffer(i)); | ||
} | ||
tx.end(); | ||
t.equal(tx._readableState.length, 10); | ||
t.equal(transformed, 10); | ||
t.equal(tx._transformState.writechunk.length, 5); | ||
t.same(tx._writableState.buffer.map(function(c) { | ||
return c[0].length; | ||
}), [6, 7, 8, 9, 10]); | ||
t.end(); | ||
}); | ||
test('passthrough', function(t) { | ||
@@ -326,1 +365,81 @@ var pt = new PassThrough(); | ||
}); | ||
test('object transform (json parse)', function(t) { | ||
console.error('json parse stream'); | ||
var jp = new Transform({ objectMode: true }); | ||
jp._transform = function(data, output, cb) { | ||
try { | ||
output(JSON.parse(data)); | ||
cb(); | ||
} catch (er) { | ||
cb(er); | ||
} | ||
}; | ||
// anything except null/undefined is fine. | ||
// those are "magic" in the stream API, because they signal EOF. | ||
var objects = [ | ||
{ foo: 'bar' }, | ||
100, | ||
"string", | ||
{ nested: { things: [ { foo: 'bar' }, 100, "string" ] } } | ||
]; | ||
var ended = false; | ||
jp.on('end', function() { | ||
ended = true; | ||
}); | ||
objects.forEach(function(obj) { | ||
jp.write(JSON.stringify(obj)); | ||
var res = jp.read(); | ||
t.same(res, obj); | ||
}); | ||
jp.end(); | ||
process.nextTick(function() { | ||
t.ok(ended); | ||
t.end(); | ||
}) | ||
}); | ||
test('object transform (json stringify)', function(t) { | ||
console.error('json parse stream'); | ||
var js = new Transform({ objectMode: true }); | ||
js._transform = function(data, output, cb) { | ||
try { | ||
output(JSON.stringify(data)); | ||
cb(); | ||
} catch (er) { | ||
cb(er); | ||
} | ||
}; | ||
// anything except null/undefined is fine. | ||
// those are "magic" in the stream API, because they signal EOF. | ||
var objects = [ | ||
{ foo: 'bar' }, | ||
100, | ||
"string", | ||
{ nested: { things: [ { foo: 'bar' }, 100, "string" ] } } | ||
]; | ||
var ended = false; | ||
js.on('end', function() { | ||
ended = true; | ||
}); | ||
objects.forEach(function(obj) { | ||
js.write(obj); | ||
var res = js.read(); | ||
t.equal(res, JSON.stringify(obj)); | ||
}); | ||
js.end(); | ||
process.nextTick(function() { | ||
t.ok(ended); | ||
t.end(); | ||
}) | ||
}); |
@@ -51,3 +51,6 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
var tests = []; | ||
var count = 0; | ||
function test(name, fn) { | ||
count++; | ||
tests.push([name, fn]); | ||
@@ -59,10 +62,6 @@ } | ||
if (!next) | ||
return console.log('ok'); | ||
return console.error('ok'); | ||
var name = next[0]; | ||
var fn = next[1]; | ||
if (!fn) | ||
return run(); | ||
console.log('# %s', name); | ||
@@ -72,6 +71,14 @@ fn({ | ||
equal: assert.equal, | ||
end: run | ||
end: function () { | ||
count--; | ||
run(); | ||
} | ||
}); | ||
} | ||
// ensure all tests have run | ||
process.on("exit", function () { | ||
assert.equal(count, 0); | ||
}); | ||
process.nextTick(run); | ||
@@ -250,1 +257,41 @@ | ||
}); | ||
test('end callback', function (t) { | ||
var tw = new TestWriter(); | ||
tw.end(function () { | ||
t.end(); | ||
}); | ||
}); | ||
test('end callback with chunk', function (t) { | ||
var tw = new TestWriter(); | ||
tw.end(new Buffer('hello world'), function () { | ||
t.end(); | ||
}); | ||
}); | ||
test('end callback with chunk and encoding', function (t) { | ||
var tw = new TestWriter(); | ||
tw.end('hello world', 'ascii', function () { | ||
t.end(); | ||
}); | ||
}); | ||
test('end callback after .write() call', function (t) { | ||
var tw = new TestWriter(); | ||
tw.write(new Buffer('hello world')); | ||
tw.end(function () { | ||
t.end(); | ||
}); | ||
}); | ||
test('encoding should be ignored for buffers', function(t) { | ||
var tw = new W(); | ||
var hex = '018b5e9a8f6236ffe30e31baf80d2cf6eb'; | ||
tw._write = function(chunk, cb) { | ||
t.equal(chunk.toString('hex'), hex); | ||
t.end(); | ||
}; | ||
var buf = new Buffer(hex, 'hex'); | ||
tw.write(buf, 'binary'); | ||
}); |
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 2 instances in 1 package
2
176619
27
4908