bufferedstream
Advanced tools
Comparing version
@@ -83,8 +83,6 @@ var util = require('util'); | ||
* Prevents this stream from emitting data events until resume is called. | ||
* This does not prevent writes to this stream. | ||
* Note: This does not prevent writes to this stream. | ||
*/ | ||
BufferedStream.prototype.pause = function () { | ||
if (!this.paused) { | ||
this.paused = true; | ||
} | ||
this.paused = true; | ||
}; | ||
@@ -96,6 +94,4 @@ | ||
BufferedStream.prototype.resume = function () { | ||
if (this.paused) { | ||
this.paused = false; | ||
flushOnNextTick(this); | ||
} | ||
if (this.paused) flushOnNextTick(this); | ||
this.paused = false; | ||
}; | ||
@@ -109,14 +105,7 @@ | ||
BufferedStream.prototype.write = function (chunk, encoding) { | ||
if (!this.writable) { | ||
throw new Error('Stream is not writable'); | ||
} | ||
if (!this.writable) throw new Error('Stream is not writable'); | ||
if (this.ended) throw new Error('Stream is already ended'); | ||
if (this.ended) { | ||
throw new Error('Stream is already ended'); | ||
} | ||
if (typeof chunk === 'string') chunk = new Buffer(chunk, encoding); | ||
if (typeof chunk === 'string') { | ||
chunk = new Buffer(chunk, encoding); | ||
} | ||
this._buffer.push(chunk); | ||
@@ -142,5 +131,3 @@ this.size += chunk.length; | ||
BufferedStream.prototype.end = function (chunk, encoding) { | ||
if (this.ended) { | ||
throw new Error('Stream is already ended'); | ||
} | ||
if (this.ended) throw new Error('Stream is already ended'); | ||
@@ -150,4 +137,4 @@ if (chunk != null) this.write(chunk, encoding); | ||
// Trigger the flush cycle one last time to emit any data that was written | ||
// before we called end. | ||
// Trigger the flush cycle one last time to emit any data that | ||
// was written before end was called. | ||
flushOnNextTick(this); | ||
@@ -157,20 +144,19 @@ }; | ||
function flushOnNextTick(stream) { | ||
if (!stream._flushing) { | ||
process.nextTick(function tick() { | ||
if (stream.paused) { | ||
stream._flushing = false; | ||
return; | ||
} | ||
if (stream._flushing) return; | ||
stream._flushing = true; | ||
flush(stream); | ||
process.nextTick(function tick() { | ||
if (stream.paused) { | ||
stream._flushing = false; | ||
return; | ||
} | ||
if (stream.empty) { | ||
stream._flushing = false; | ||
} else { | ||
process.nextTick(tick); | ||
} | ||
}); | ||
flush(stream); | ||
stream._flushing = true; | ||
} | ||
if (stream.empty) { | ||
stream._flushing = false; | ||
} else { | ||
process.nextTick(tick); | ||
} | ||
}); | ||
} | ||
@@ -192,15 +178,10 @@ | ||
// If the stream was paused in some data event handler, break. | ||
if (stream.paused) { | ||
break; | ||
} | ||
// If the stream was paused in a data event handler, break. | ||
if (stream.paused) break; | ||
} | ||
if (stream.ended) { | ||
end(stream); | ||
return; | ||
} | ||
// If the stream was full at one point but isn't now, emit "drain". | ||
if (stream._wasFull && !stream.full) { | ||
if (stream.ended && !stream.paused) { | ||
stream._buffer = null; | ||
stream.emit('end'); | ||
} else if (stream._wasFull && !stream.full) { | ||
stream._wasFull = false; | ||
@@ -210,6 +191,1 @@ stream.emit('drain'); | ||
} | ||
function end(stream) { | ||
stream.emit('end'); | ||
stream._buffer = null; | ||
} |
@@ -5,3 +5,3 @@ { | ||
"description": "A base stream class for node that reliably buffers until next tick", | ||
"version": "1.5.0", | ||
"version": "1.5.1", | ||
"repository": { | ||
@@ -8,0 +8,0 @@ "type": "git", |
@@ -78,4 +78,4 @@ var assert = require('assert'); | ||
done(); | ||
}, 1); | ||
}, 1); | ||
}, 5); | ||
}, 0); | ||
}); | ||
@@ -82,0 +82,0 @@ }); |
Sorry, the diff of this file is not supported yet
16938
-0.8%396
-5.04%