readable-stream
Advanced tools
Comparing version 0.0.4 to 0.1.0
@@ -1,63 +0,1 @@ | ||
// Copyright Joyent, Inc. and other Node contributors. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a | ||
// copy of this software and associated documentation files (the | ||
// "Software"), to deal in the Software without restriction, including | ||
// without limitation the rights to use, copy, modify, merge, publish, | ||
// distribute, sublicense, and/or sell copies of the Software, and to permit | ||
// persons to whom the Software is furnished to do so, subject to the | ||
// following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included | ||
// in all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | ||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | ||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN | ||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, | ||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | ||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
// a duplex stream is just a stream that is both readable and writable. | ||
// Since JS doesn't have multiple prototypal inheritance, this class | ||
// prototypally inherits from Readable, and then parasitically from | ||
// Writable. | ||
module.exports = Duplex; | ||
var util = require('util'); | ||
var Readable = require('./readable.js'); | ||
var Writable = require('./writable.js'); | ||
util.inherits(Duplex, Readable); | ||
Object.keys(Writable.prototype).forEach(function(method) { | ||
if (!Duplex.prototype[method]) | ||
Duplex.prototype[method] = Writable.prototype[method]; | ||
}); | ||
function Duplex(options) { | ||
if (!(this instanceof Duplex)) | ||
return new Duplex(options); | ||
Readable.call(this, options); | ||
Writable.call(this, options); | ||
this.allowHalfOpen = true; | ||
if (options && options.allowHalfOpen === false) | ||
this.allowHalfOpen = false; | ||
this.once('end', onend); | ||
} | ||
// the no-half-open enforcer | ||
function onend() { | ||
// if we allow half-open state, or if the writable side ended, | ||
// then we're ok. | ||
if (this.allowHalfOpen || this._writableState.ended) | ||
return; | ||
// no more data can be written. | ||
// But allow more writes to happen in this tick. | ||
process.nextTick(this.end.bind(this)); | ||
} | ||
module.exports = require("./lib/_stream_duplex.js") |
1697
fs.js
@@ -1,58 +0,1431 @@ | ||
'use strict'; | ||
// Copyright Joyent, Inc. and other Node contributors. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a | ||
// copy of this software and associated documentation files (the | ||
// "Software"), to deal in the Software without restriction, including | ||
// without limitation the rights to use, copy, modify, merge, publish, | ||
// distribute, sublicense, and/or sell copies of the Software, and to permit | ||
// persons to whom the Software is furnished to do so, subject to the | ||
// following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included | ||
// in all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | ||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | ||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN | ||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, | ||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | ||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
module.exports = FSReadable; | ||
// Maintainers, keep in mind that octal literals are not allowed | ||
// in strict mode. Use the decimal value and add a comment with | ||
// the octal value. Example: | ||
// | ||
// var mode = 438; /* mode=0666 */ | ||
// This uses the existing bindings in Node's FS module to | ||
// implement a read-method style readable stream. | ||
var Readable = require('./readable.js'); | ||
var util = require('util'); | ||
var fs = require('fs'); | ||
var StringDecoder = require('string_decoder').StringDecoder; | ||
var assert = require('assert'); | ||
var pathModule = require('path'); | ||
// a very basic memory pool. this optimization helps revent lots | ||
// of allocations when there are many fs readable streams happening | ||
// concurrently. | ||
var binding = process.binding('fs'); | ||
var constants = process.binding('constants'); | ||
var fs = exports; | ||
var Stream = require('stream').Stream; | ||
var EventEmitter = require('events').EventEmitter; | ||
var Readable = require('./lib/_stream_readable.js'); | ||
var Writable = require('./lib/_stream_writable.js'); | ||
var kMinPoolSpace = 128; | ||
var kPoolSize = 40 * 1024; | ||
var O_APPEND = constants.O_APPEND || 0; | ||
var O_CREAT = constants.O_CREAT || 0; | ||
var O_DIRECTORY = constants.O_DIRECTORY || 0; | ||
var O_EXCL = constants.O_EXCL || 0; | ||
var O_NOCTTY = constants.O_NOCTTY || 0; | ||
var O_NOFOLLOW = constants.O_NOFOLLOW || 0; | ||
var O_RDONLY = constants.O_RDONLY || 0; | ||
var O_RDWR = constants.O_RDWR || 0; | ||
var O_SYMLINK = constants.O_SYMLINK || 0; | ||
var O_SYNC = constants.O_SYNC || 0; | ||
var O_TRUNC = constants.O_TRUNC || 0; | ||
var O_WRONLY = constants.O_WRONLY || 0; | ||
var isWindows = process.platform === 'win32'; | ||
var DEBUG = process.env.NODE_DEBUG && /fs/.test(process.env.NODE_DEBUG); | ||
function rethrow() { | ||
// Only enable in debug mode. A backtrace uses ~1000 bytes of heap space and | ||
// is fairly slow to generate. | ||
if (DEBUG) { | ||
var backtrace = new Error; | ||
return function(err) { | ||
if (err) { | ||
backtrace.message = err.message; | ||
err = backtrace; | ||
throw err; | ||
} | ||
}; | ||
} | ||
return function(err) { | ||
if (err) { | ||
throw err; // Forgot a callback but don't know where? Use NODE_DEBUG=fs | ||
} | ||
}; | ||
} | ||
function maybeCallback(cb) { | ||
return typeof cb === 'function' ? cb : rethrow(); | ||
} | ||
// Ensure that callbacks run in the global context. Only use this function | ||
// for callbacks that are passed to the binding layer, callbacks that are | ||
// invoked from JS already run in the proper scope. | ||
function makeCallback(cb) { | ||
if (typeof cb !== 'function') { | ||
return rethrow(); | ||
} | ||
return function() { | ||
return cb.apply(null, arguments); | ||
}; | ||
} | ||
function assertEncoding(encoding) { | ||
if (encoding && !Buffer.isEncoding(encoding)) { | ||
throw new Error('Unknown encoding: ' + encoding); | ||
} | ||
} | ||
function nullCheck(path, callback) { | ||
if (('' + path).indexOf('\u0000') !== -1) { | ||
var er = new Error('Path must be a string without null bytes.'); | ||
if (!callback) | ||
throw er; | ||
process.nextTick(function() { | ||
callback(er); | ||
}); | ||
return false; | ||
} | ||
return true; | ||
} | ||
fs.Stats = binding.Stats; | ||
fs.Stats.prototype._checkModeProperty = function(property) { | ||
return ((this.mode & constants.S_IFMT) === property); | ||
}; | ||
fs.Stats.prototype.isDirectory = function() { | ||
return this._checkModeProperty(constants.S_IFDIR); | ||
}; | ||
fs.Stats.prototype.isFile = function() { | ||
return this._checkModeProperty(constants.S_IFREG); | ||
}; | ||
fs.Stats.prototype.isBlockDevice = function() { | ||
return this._checkModeProperty(constants.S_IFBLK); | ||
}; | ||
fs.Stats.prototype.isCharacterDevice = function() { | ||
return this._checkModeProperty(constants.S_IFCHR); | ||
}; | ||
fs.Stats.prototype.isSymbolicLink = function() { | ||
return this._checkModeProperty(constants.S_IFLNK); | ||
}; | ||
fs.Stats.prototype.isFIFO = function() { | ||
return this._checkModeProperty(constants.S_IFIFO); | ||
}; | ||
fs.Stats.prototype.isSocket = function() { | ||
return this._checkModeProperty(constants.S_IFSOCK); | ||
}; | ||
fs.exists = function(path, callback) { | ||
if (!nullCheck(path, cb)) return; | ||
binding.stat(pathModule._makeLong(path), cb); | ||
function cb(err, stats) { | ||
if (callback) callback(err ? false : true); | ||
} | ||
}; | ||
fs.existsSync = function(path) { | ||
try { | ||
nullCheck(path); | ||
binding.stat(pathModule._makeLong(path)); | ||
return true; | ||
} catch (e) { | ||
return false; | ||
} | ||
}; | ||
fs.readFile = function(path, encoding_) { | ||
var encoding = typeof(encoding_) === 'string' ? encoding_ : null; | ||
var callback = maybeCallback(arguments[arguments.length - 1]); | ||
assertEncoding(encoding); | ||
// first, stat the file, so we know the size. | ||
var size; | ||
var buffer; // single buffer with file data | ||
var buffers; // list for when size is unknown | ||
var pos = 0; | ||
var fd; | ||
fs.open(path, constants.O_RDONLY, 438 /*=0666*/, function(er, fd_) { | ||
if (er) return callback(er); | ||
fd = fd_; | ||
fs.fstat(fd, function(er, st) { | ||
if (er) return callback(er); | ||
size = st.size; | ||
if (size === 0) { | ||
// the kernel lies about many files. | ||
// Go ahead and try to read some bytes. | ||
buffers = []; | ||
return read(); | ||
} | ||
buffer = new Buffer(size); | ||
read(); | ||
}); | ||
}); | ||
function read() { | ||
if (size === 0) { | ||
buffer = new Buffer(8192); | ||
fs.read(fd, buffer, 0, 8192, -1, afterRead); | ||
} else { | ||
fs.read(fd, buffer, pos, size - pos, -1, afterRead); | ||
} | ||
} | ||
function afterRead(er, bytesRead) { | ||
if (er) { | ||
return fs.close(fd, function(er2) { | ||
return callback(er); | ||
}); | ||
} | ||
if (bytesRead === 0) { | ||
return close(); | ||
} | ||
pos += bytesRead; | ||
if (size !== 0) { | ||
if (pos === size) close(); | ||
else read(); | ||
} else { | ||
// unknown size, just read until we don't get bytes. | ||
buffers.push(buffer.slice(0, bytesRead)); | ||
read(); | ||
} | ||
} | ||
function close() { | ||
fs.close(fd, function(er) { | ||
if (size === 0) { | ||
// collected the data into the buffers list. | ||
buffer = Buffer.concat(buffers, pos); | ||
} else if (pos < size) { | ||
buffer = buffer.slice(0, pos); | ||
} | ||
if (encoding) buffer = buffer.toString(encoding); | ||
return callback(er, buffer); | ||
}); | ||
} | ||
}; | ||
fs.readFileSync = function(path, encoding) { | ||
assertEncoding(encoding); | ||
var fd = fs.openSync(path, constants.O_RDONLY, 438 /*=0666*/); | ||
var size; | ||
var threw = true; | ||
try { | ||
size = fs.fstatSync(fd).size; | ||
threw = false; | ||
} finally { | ||
if (threw) fs.closeSync(fd); | ||
} | ||
var pos = 0; | ||
var buffer; // single buffer with file data | ||
var buffers; // list for when size is unknown | ||
if (size === 0) { | ||
buffers = []; | ||
} else { | ||
buffer = new Buffer(size); | ||
} | ||
var done = false; | ||
while (!done) { | ||
var threw = true; | ||
try { | ||
if (size !== 0) { | ||
var bytesRead = fs.readSync(fd, buffer, pos, size - pos); | ||
} else { | ||
// the kernel lies about many files. | ||
// Go ahead and try to read some bytes. | ||
buffer = new Buffer(8192); | ||
var bytesRead = fs.readSync(fd, buffer, 0, 8192); | ||
if (bytesRead) { | ||
buffers.push(buffer.slice(0, bytesRead)); | ||
} | ||
} | ||
threw = false; | ||
} finally { | ||
if (threw) fs.closeSync(fd); | ||
} | ||
pos += bytesRead; | ||
done = (bytesRead === 0) || (size !== 0 && pos >= size); | ||
} | ||
fs.closeSync(fd); | ||
if (size === 0) { | ||
// data was collected into the buffers list. | ||
buffer = Buffer.concat(buffers, pos); | ||
} else if (pos < size) { | ||
buffer = buffer.slice(0, pos); | ||
} | ||
if (encoding) buffer = buffer.toString(encoding); | ||
return buffer; | ||
}; | ||
// Used by binding.open and friends | ||
function stringToFlags(flag) { | ||
// Only mess with strings | ||
if (typeof flag !== 'string') { | ||
return flag; | ||
} | ||
// O_EXCL is mandated by POSIX, Windows supports it too. | ||
// Let's add a check anyway, just in case. | ||
if (!O_EXCL && ~flag.indexOf('x')) { | ||
throw errnoException('ENOSYS', 'fs.open(O_EXCL)'); | ||
} | ||
switch (flag) { | ||
case 'r' : return O_RDONLY; | ||
case 'rs' : return O_RDONLY | O_SYNC; | ||
case 'r+' : return O_RDWR; | ||
case 'rs+' : return O_RDWR | O_SYNC; | ||
case 'w' : return O_TRUNC | O_CREAT | O_WRONLY; | ||
case 'wx' : // fall through | ||
case 'xw' : return O_TRUNC | O_CREAT | O_WRONLY | O_EXCL; | ||
case 'w+' : return O_TRUNC | O_CREAT | O_RDWR; | ||
case 'wx+': // fall through | ||
case 'xw+': return O_TRUNC | O_CREAT | O_RDWR | O_EXCL; | ||
case 'a' : return O_APPEND | O_CREAT | O_WRONLY; | ||
case 'ax' : // fall through | ||
case 'xa' : return O_APPEND | O_CREAT | O_WRONLY | O_EXCL; | ||
case 'a+' : return O_APPEND | O_CREAT | O_RDWR; | ||
case 'ax+': // fall through | ||
case 'xa+': return O_APPEND | O_CREAT | O_RDWR | O_EXCL; | ||
} | ||
throw new Error('Unknown file open flag: ' + flag); | ||
} | ||
// exported but hidden, only used by test/simple/test-fs-open-flags.js | ||
Object.defineProperty(exports, '_stringToFlags', { | ||
enumerable: false, | ||
value: stringToFlags | ||
}); | ||
// Yes, the follow could be easily DRYed up but I provide the explicit | ||
// list to make the arguments clear. | ||
fs.close = function(fd, callback) { | ||
binding.close(fd, makeCallback(callback)); | ||
}; | ||
fs.closeSync = function(fd) { | ||
return binding.close(fd); | ||
}; | ||
function modeNum(m, def) { | ||
switch (typeof m) { | ||
case 'number': return m; | ||
case 'string': return parseInt(m, 8); | ||
default: | ||
if (def) { | ||
return modeNum(def); | ||
} else { | ||
return undefined; | ||
} | ||
} | ||
} | ||
fs.open = function(path, flags, mode, callback) { | ||
callback = makeCallback(arguments[arguments.length - 1]); | ||
mode = modeNum(mode, 438 /*=0666*/); | ||
if (!nullCheck(path, callback)) return; | ||
binding.open(pathModule._makeLong(path), | ||
stringToFlags(flags), | ||
mode, | ||
callback); | ||
}; | ||
fs.openSync = function(path, flags, mode) { | ||
mode = modeNum(mode, 438 /*=0666*/); | ||
nullCheck(path); | ||
return binding.open(pathModule._makeLong(path), stringToFlags(flags), mode); | ||
}; | ||
fs.read = function(fd, buffer, offset, length, position, callback) { | ||
if (!Buffer.isBuffer(buffer)) { | ||
// legacy string interface (fd, length, position, encoding, callback) | ||
var cb = arguments[4], | ||
encoding = arguments[3]; | ||
assertEncoding(encoding); | ||
position = arguments[2]; | ||
length = arguments[1]; | ||
buffer = new Buffer(length); | ||
offset = 0; | ||
callback = function(err, bytesRead) { | ||
if (!cb) return; | ||
var str = (bytesRead > 0) ? buffer.toString(encoding, 0, bytesRead) : ''; | ||
(cb)(err, str, bytesRead); | ||
}; | ||
} | ||
function wrapper(err, bytesRead) { | ||
// Retain a reference to buffer so that it can't be GC'ed too soon. | ||
callback && callback(err, bytesRead || 0, buffer); | ||
} | ||
binding.read(fd, buffer, offset, length, position, wrapper); | ||
}; | ||
fs.readSync = function(fd, buffer, offset, length, position) { | ||
var legacy = false; | ||
if (!Buffer.isBuffer(buffer)) { | ||
// legacy string interface (fd, length, position, encoding, callback) | ||
legacy = true; | ||
var encoding = arguments[3]; | ||
assertEncoding(encoding); | ||
position = arguments[2]; | ||
length = arguments[1]; | ||
buffer = new Buffer(length); | ||
offset = 0; | ||
} | ||
var r = binding.read(fd, buffer, offset, length, position); | ||
if (!legacy) { | ||
return r; | ||
} | ||
var str = (r > 0) ? buffer.toString(encoding, 0, r) : ''; | ||
return [str, r]; | ||
}; | ||
fs.write = function(fd, buffer, offset, length, position, callback) { | ||
if (!Buffer.isBuffer(buffer)) { | ||
// legacy string interface (fd, data, position, encoding, callback) | ||
callback = arguments[4]; | ||
position = arguments[2]; | ||
assertEncoding(arguments[3]); | ||
buffer = new Buffer('' + arguments[1], arguments[3]); | ||
offset = 0; | ||
length = buffer.length; | ||
} | ||
if (!length) { | ||
if (typeof callback == 'function') { | ||
process.nextTick(function() { | ||
callback(undefined, 0); | ||
}); | ||
} | ||
return; | ||
} | ||
callback = maybeCallback(callback); | ||
function wrapper(err, written) { | ||
// Retain a reference to buffer so that it can't be GC'ed too soon. | ||
callback(err, written || 0, buffer); | ||
} | ||
binding.write(fd, buffer, offset, length, position, wrapper); | ||
}; | ||
fs.writeSync = function(fd, buffer, offset, length, position) { | ||
if (!Buffer.isBuffer(buffer)) { | ||
// legacy string interface (fd, data, position, encoding) | ||
position = arguments[2]; | ||
assertEncoding(arguments[3]); | ||
buffer = new Buffer('' + arguments[1], arguments[3]); | ||
offset = 0; | ||
length = buffer.length; | ||
} | ||
if (!length) return 0; | ||
return binding.write(fd, buffer, offset, length, position); | ||
}; | ||
fs.rename = function(oldPath, newPath, callback) { | ||
callback = makeCallback(callback); | ||
if (!nullCheck(oldPath, callback)) return; | ||
if (!nullCheck(newPath, callback)) return; | ||
binding.rename(pathModule._makeLong(oldPath), | ||
pathModule._makeLong(newPath), | ||
callback); | ||
}; | ||
fs.renameSync = function(oldPath, newPath) { | ||
nullCheck(oldPath); | ||
nullCheck(newPath); | ||
return binding.rename(pathModule._makeLong(oldPath), | ||
pathModule._makeLong(newPath)); | ||
}; | ||
fs.truncate = function(path, len, callback) { | ||
if (typeof path === 'number') { | ||
// legacy | ||
return fs.ftruncate(path, len, callback); | ||
} | ||
if (typeof len === 'function') { | ||
callback = len; | ||
len = 0; | ||
} else if (typeof len === 'undefined') { | ||
len = 0; | ||
} | ||
callback = maybeCallback(callback); | ||
fs.open(path, 'w', function(er, fd) { | ||
if (er) return callback(er); | ||
binding.ftruncate(fd, len, function(er) { | ||
fs.close(fd, function(er2) { | ||
callback(er || er2); | ||
}); | ||
}); | ||
}); | ||
}; | ||
fs.truncateSync = function(path, len) { | ||
if (typeof path === 'number') { | ||
// legacy | ||
return fs.ftruncateSync(path, len); | ||
} | ||
if (typeof len === 'undefined') { | ||
len = 0; | ||
} | ||
// allow error to be thrown, but still close fd. | ||
var fd = fs.openSync(path, 'w'); | ||
try { | ||
var ret = fs.ftruncateSync(fd, len); | ||
} finally { | ||
fs.closeSync(fd); | ||
} | ||
return ret; | ||
}; | ||
fs.ftruncate = function(fd, len, callback) { | ||
if (typeof len === 'function') { | ||
callback = len; | ||
len = 0; | ||
} else if (typeof len === 'undefined') { | ||
len = 0; | ||
} | ||
binding.ftruncate(fd, len, makeCallback(callback)); | ||
}; | ||
fs.ftruncateSync = function(fd, len) { | ||
if (typeof len === 'undefined') { | ||
len = 0; | ||
} | ||
return binding.ftruncate(fd, len); | ||
}; | ||
fs.rmdir = function(path, callback) { | ||
callback = makeCallback(callback); | ||
if (!nullCheck(path, callback)) return; | ||
binding.rmdir(pathModule._makeLong(path), callback); | ||
}; | ||
fs.rmdirSync = function(path) { | ||
nullCheck(path); | ||
return binding.rmdir(pathModule._makeLong(path)); | ||
}; | ||
fs.fdatasync = function(fd, callback) { | ||
binding.fdatasync(fd, makeCallback(callback)); | ||
}; | ||
fs.fdatasyncSync = function(fd) { | ||
return binding.fdatasync(fd); | ||
}; | ||
fs.fsync = function(fd, callback) { | ||
binding.fsync(fd, makeCallback(callback)); | ||
}; | ||
fs.fsyncSync = function(fd) { | ||
return binding.fsync(fd); | ||
}; | ||
fs.mkdir = function(path, mode, callback) { | ||
if (typeof mode === 'function') callback = mode; | ||
callback = makeCallback(callback); | ||
if (!nullCheck(path, callback)) return; | ||
binding.mkdir(pathModule._makeLong(path), | ||
modeNum(mode, 511 /*=0777*/), | ||
callback); | ||
}; | ||
fs.mkdirSync = function(path, mode) { | ||
nullCheck(path); | ||
return binding.mkdir(pathModule._makeLong(path), | ||
modeNum(mode, 511 /*=0777*/)); | ||
}; | ||
fs.sendfile = function(outFd, inFd, inOffset, length, callback) { | ||
binding.sendfile(outFd, inFd, inOffset, length, makeCallback(callback)); | ||
}; | ||
fs.sendfileSync = function(outFd, inFd, inOffset, length) { | ||
return binding.sendfile(outFd, inFd, inOffset, length); | ||
}; | ||
fs.readdir = function(path, callback) { | ||
callback = makeCallback(callback); | ||
if (!nullCheck(path, callback)) return; | ||
binding.readdir(pathModule._makeLong(path), callback); | ||
}; | ||
fs.readdirSync = function(path) { | ||
nullCheck(path); | ||
return binding.readdir(pathModule._makeLong(path)); | ||
}; | ||
fs.fstat = function(fd, callback) { | ||
binding.fstat(fd, makeCallback(callback)); | ||
}; | ||
fs.lstat = function(path, callback) { | ||
callback = makeCallback(callback); | ||
if (!nullCheck(path, callback)) return; | ||
binding.lstat(pathModule._makeLong(path), callback); | ||
}; | ||
fs.stat = function(path, callback) { | ||
callback = makeCallback(callback); | ||
if (!nullCheck(path, callback)) return; | ||
binding.stat(pathModule._makeLong(path), callback); | ||
}; | ||
fs.fstatSync = function(fd) { | ||
return binding.fstat(fd); | ||
}; | ||
fs.lstatSync = function(path) { | ||
nullCheck(path); | ||
return binding.lstat(pathModule._makeLong(path)); | ||
}; | ||
fs.statSync = function(path) { | ||
nullCheck(path); | ||
return binding.stat(pathModule._makeLong(path)); | ||
}; | ||
fs.readlink = function(path, callback) { | ||
callback = makeCallback(callback); | ||
if (!nullCheck(path, callback)) return; | ||
binding.readlink(pathModule._makeLong(path), callback); | ||
}; | ||
fs.readlinkSync = function(path) { | ||
nullCheck(path); | ||
return binding.readlink(pathModule._makeLong(path)); | ||
}; | ||
function preprocessSymlinkDestination(path, type) { | ||
if (!isWindows) { | ||
// No preprocessing is needed on Unix. | ||
return path; | ||
} else if (type === 'junction') { | ||
// Junctions paths need to be absolute and \\?\-prefixed. | ||
return pathModule._makeLong(path); | ||
} else { | ||
// Windows symlinks don't tolerate forward slashes. | ||
return ('' + path).replace(/\//g, '\\'); | ||
} | ||
} | ||
fs.symlink = function(destination, path, type_, callback) { | ||
var type = (typeof type_ === 'string' ? type_ : null); | ||
var callback = makeCallback(arguments[arguments.length - 1]); | ||
if (!nullCheck(destination, callback)) return; | ||
if (!nullCheck(path, callback)) return; | ||
binding.symlink(preprocessSymlinkDestination(destination, type), | ||
pathModule._makeLong(path), | ||
type, | ||
callback); | ||
}; | ||
fs.symlinkSync = function(destination, path, type) { | ||
type = (typeof type === 'string' ? type : null); | ||
nullCheck(destination); | ||
nullCheck(path); | ||
return binding.symlink(preprocessSymlinkDestination(destination, type), | ||
pathModule._makeLong(path), | ||
type); | ||
}; | ||
fs.link = function(srcpath, dstpath, callback) { | ||
callback = makeCallback(callback); | ||
if (!nullCheck(srcpath, callback)) return; | ||
if (!nullCheck(dstpath, callback)) return; | ||
binding.link(pathModule._makeLong(srcpath), | ||
pathModule._makeLong(dstpath), | ||
callback); | ||
}; | ||
fs.linkSync = function(srcpath, dstpath) { | ||
nullCheck(srcpath); | ||
nullCheck(dstpath); | ||
return binding.link(pathModule._makeLong(srcpath), | ||
pathModule._makeLong(dstpath)); | ||
}; | ||
fs.unlink = function(path, callback) { | ||
callback = makeCallback(callback); | ||
if (!nullCheck(path, callback)) return; | ||
binding.unlink(pathModule._makeLong(path), callback); | ||
}; | ||
fs.unlinkSync = function(path) { | ||
nullCheck(path); | ||
return binding.unlink(pathModule._makeLong(path)); | ||
}; | ||
fs.fchmod = function(fd, mode, callback) { | ||
binding.fchmod(fd, modeNum(mode), makeCallback(callback)); | ||
}; | ||
fs.fchmodSync = function(fd, mode) { | ||
return binding.fchmod(fd, modeNum(mode)); | ||
}; | ||
if (constants.hasOwnProperty('O_SYMLINK')) { | ||
fs.lchmod = function(path, mode, callback) { | ||
callback = maybeCallback(callback); | ||
fs.open(path, constants.O_WRONLY | constants.O_SYMLINK, function(err, fd) { | ||
if (err) { | ||
callback(err); | ||
return; | ||
} | ||
// prefer to return the chmod error, if one occurs, | ||
// but still try to close, and report closing errors if they occur. | ||
fs.fchmod(fd, mode, function(err) { | ||
fs.close(fd, function(err2) { | ||
callback(err || err2); | ||
}); | ||
}); | ||
}); | ||
}; | ||
fs.lchmodSync = function(path, mode) { | ||
var fd = fs.openSync(path, constants.O_WRONLY | constants.O_SYMLINK); | ||
// prefer to return the chmod error, if one occurs, | ||
// but still try to close, and report closing errors if they occur. | ||
var err, err2; | ||
try { | ||
var ret = fs.fchmodSync(fd, mode); | ||
} catch (er) { | ||
err = er; | ||
} | ||
try { | ||
fs.closeSync(fd); | ||
} catch (er) { | ||
err2 = er; | ||
} | ||
if (err || err2) throw (err || err2); | ||
return ret; | ||
}; | ||
} | ||
fs.chmod = function(path, mode, callback) { | ||
callback = makeCallback(callback); | ||
if (!nullCheck(path, callback)) return; | ||
binding.chmod(pathModule._makeLong(path), | ||
modeNum(mode), | ||
callback); | ||
}; | ||
fs.chmodSync = function(path, mode) { | ||
nullCheck(path); | ||
return binding.chmod(pathModule._makeLong(path), modeNum(mode)); | ||
}; | ||
if (constants.hasOwnProperty('O_SYMLINK')) { | ||
fs.lchown = function(path, uid, gid, callback) { | ||
callback = maybeCallback(callback); | ||
fs.open(path, constants.O_WRONLY | constants.O_SYMLINK, function(err, fd) { | ||
if (err) { | ||
callback(err); | ||
return; | ||
} | ||
fs.fchown(fd, uid, gid, callback); | ||
}); | ||
}; | ||
fs.lchownSync = function(path, uid, gid) { | ||
var fd = fs.openSync(path, constants.O_WRONLY | constants.O_SYMLINK); | ||
return fs.fchownSync(fd, uid, gid); | ||
}; | ||
} | ||
fs.fchown = function(fd, uid, gid, callback) { | ||
binding.fchown(fd, uid, gid, makeCallback(callback)); | ||
}; | ||
fs.fchownSync = function(fd, uid, gid) { | ||
return binding.fchown(fd, uid, gid); | ||
}; | ||
fs.chown = function(path, uid, gid, callback) { | ||
callback = makeCallback(callback); | ||
if (!nullCheck(path, callback)) return; | ||
binding.chown(pathModule._makeLong(path), uid, gid, callback); | ||
}; | ||
fs.chownSync = function(path, uid, gid) { | ||
nullCheck(path); | ||
return binding.chown(pathModule._makeLong(path), uid, gid); | ||
}; | ||
// converts Date or number to a fractional UNIX timestamp | ||
function toUnixTimestamp(time) { | ||
if (typeof time == 'number') { | ||
return time; | ||
} | ||
if (time instanceof Date) { | ||
// convert to 123.456 UNIX timestamp | ||
return time.getTime() / 1000; | ||
} | ||
throw new Error('Cannot parse time: ' + time); | ||
} | ||
// exported for unit tests, not for public consumption | ||
fs._toUnixTimestamp = toUnixTimestamp; | ||
fs.utimes = function(path, atime, mtime, callback) { | ||
callback = makeCallback(callback); | ||
if (!nullCheck(path, callback)) return; | ||
binding.utimes(pathModule._makeLong(path), | ||
toUnixTimestamp(atime), | ||
toUnixTimestamp(mtime), | ||
callback); | ||
}; | ||
fs.utimesSync = function(path, atime, mtime) { | ||
nullCheck(path); | ||
atime = toUnixTimestamp(atime); | ||
mtime = toUnixTimestamp(mtime); | ||
binding.utimes(pathModule._makeLong(path), atime, mtime); | ||
}; | ||
fs.futimes = function(fd, atime, mtime, callback) { | ||
atime = toUnixTimestamp(atime); | ||
mtime = toUnixTimestamp(mtime); | ||
binding.futimes(fd, atime, mtime, makeCallback(callback)); | ||
}; | ||
fs.futimesSync = function(fd, atime, mtime) { | ||
atime = toUnixTimestamp(atime); | ||
mtime = toUnixTimestamp(mtime); | ||
binding.futimes(fd, atime, mtime); | ||
}; | ||
function writeAll(fd, buffer, offset, length, position, callback) { | ||
callback = maybeCallback(arguments[arguments.length - 1]); | ||
// write(fd, buffer, offset, length, position, callback) | ||
fs.write(fd, buffer, offset, length, position, function(writeErr, written) { | ||
if (writeErr) { | ||
fs.close(fd, function() { | ||
if (callback) callback(writeErr); | ||
}); | ||
} else { | ||
if (written === length) { | ||
fs.close(fd, callback); | ||
} else { | ||
offset += written; | ||
length -= written; | ||
position += written; | ||
writeAll(fd, buffer, offset, length, position, callback); | ||
} | ||
} | ||
}); | ||
} | ||
fs.writeFile = function(path, data, encoding_, callback) { | ||
var encoding = (typeof(encoding_) == 'string' ? encoding_ : 'utf8'); | ||
assertEncoding(encoding); | ||
callback = maybeCallback(arguments[arguments.length - 1]); | ||
fs.open(path, 'w', 438 /*=0666*/, function(openErr, fd) { | ||
if (openErr) { | ||
if (callback) callback(openErr); | ||
} else { | ||
var buffer = Buffer.isBuffer(data) ? data : new Buffer('' + data, | ||
encoding); | ||
writeAll(fd, buffer, 0, buffer.length, 0, callback); | ||
} | ||
}); | ||
}; | ||
fs.writeFileSync = function(path, data, encoding) { | ||
assertEncoding(encoding); | ||
var fd = fs.openSync(path, 'w'); | ||
if (!Buffer.isBuffer(data)) { | ||
data = new Buffer('' + data, encoding || 'utf8'); | ||
} | ||
var written = 0; | ||
var length = data.length; | ||
try { | ||
while (written < length) { | ||
written += fs.writeSync(fd, data, written, length - written, written); | ||
} | ||
} finally { | ||
fs.closeSync(fd); | ||
} | ||
}; | ||
fs.appendFile = function(path, data, encoding_, callback) { | ||
var encoding = (typeof(encoding_) == 'string' ? encoding_ : 'utf8'); | ||
assertEncoding(encoding); | ||
callback = maybeCallback(arguments[arguments.length - 1]); | ||
fs.open(path, 'a', 438 /*=0666*/, function(err, fd) { | ||
if (err) return callback(err); | ||
var buffer = Buffer.isBuffer(data) ? data : new Buffer('' + data, encoding); | ||
writeAll(fd, buffer, 0, buffer.length, null, callback); | ||
}); | ||
}; | ||
fs.appendFileSync = function(path, data, encoding) { | ||
assertEncoding(encoding); | ||
var fd = fs.openSync(path, 'a'); | ||
if (!Buffer.isBuffer(data)) { | ||
data = new Buffer('' + data, encoding || 'utf8'); | ||
} | ||
var written = 0; | ||
var position = null; | ||
var length = data.length; | ||
try { | ||
while (written < length) { | ||
written += fs.writeSync(fd, data, written, length - written, position); | ||
position += written; // XXX not safe with multiple concurrent writers? | ||
} | ||
} finally { | ||
fs.closeSync(fd); | ||
} | ||
}; | ||
function errnoException(errorno, syscall) { | ||
// TODO make this more compatible with ErrnoException from src/node.cc | ||
// Once all of Node is using this function the ErrnoException from | ||
// src/node.cc should be removed. | ||
var e = new Error(syscall + ' ' + errorno); | ||
e.errno = e.code = errorno; | ||
e.syscall = syscall; | ||
return e; | ||
} | ||
function FSWatcher() { | ||
EventEmitter.call(this); | ||
var self = this; | ||
var FSEvent = process.binding('fs_event_wrap').FSEvent; | ||
this._handle = new FSEvent(); | ||
this._handle.owner = this; | ||
this._handle.onchange = function(status, event, filename) { | ||
if (status) { | ||
self._handle.close(); | ||
self.emit('error', errnoException(errno, 'watch')); | ||
} else { | ||
self.emit('change', event, filename); | ||
} | ||
}; | ||
} | ||
util.inherits(FSWatcher, EventEmitter); | ||
FSWatcher.prototype.start = function(filename, persistent) { | ||
nullCheck(filename); | ||
var r = this._handle.start(pathModule._makeLong(filename), persistent); | ||
if (r) { | ||
this._handle.close(); | ||
throw errnoException(errno, 'watch'); | ||
} | ||
}; | ||
FSWatcher.prototype.close = function() { | ||
this._handle.close(); | ||
}; | ||
fs.watch = function(filename) { | ||
nullCheck(filename); | ||
var watcher; | ||
var options; | ||
var listener; | ||
if ('object' == typeof arguments[1]) { | ||
options = arguments[1]; | ||
listener = arguments[2]; | ||
} else { | ||
options = {}; | ||
listener = arguments[1]; | ||
} | ||
if (options.persistent === undefined) options.persistent = true; | ||
watcher = new FSWatcher(); | ||
watcher.start(filename, options.persistent); | ||
if (listener) { | ||
watcher.addListener('change', listener); | ||
} | ||
return watcher; | ||
}; | ||
// Stat Change Watchers | ||
function StatWatcher() { | ||
EventEmitter.call(this); | ||
var self = this; | ||
this._handle = new binding.StatWatcher(); | ||
// uv_fs_poll is a little more powerful than ev_stat but we curb it for | ||
// the sake of backwards compatibility | ||
var oldStatus = -1; | ||
this._handle.onchange = function(current, previous, newStatus) { | ||
if (oldStatus === -1 && | ||
newStatus === -1 && | ||
current.nlink === previous.nlink) return; | ||
oldStatus = newStatus; | ||
self.emit('change', current, previous); | ||
}; | ||
this._handle.onstop = function() { | ||
self.emit('stop'); | ||
}; | ||
} | ||
util.inherits(StatWatcher, EventEmitter); | ||
StatWatcher.prototype.start = function(filename, persistent, interval) { | ||
nullCheck(filename); | ||
this._handle.start(pathModule._makeLong(filename), persistent, interval); | ||
}; | ||
StatWatcher.prototype.stop = function() { | ||
this._handle.stop(); | ||
}; | ||
var statWatchers = {}; | ||
function inStatWatchers(filename) { | ||
return Object.prototype.hasOwnProperty.call(statWatchers, filename) && | ||
statWatchers[filename]; | ||
} | ||
fs.watchFile = function(filename) { | ||
nullCheck(filename); | ||
var stat; | ||
var listener; | ||
var options = { | ||
// Poll interval in milliseconds. 5007 is what libev used to use. It's | ||
// a little on the slow side but let's stick with it for now to keep | ||
// behavioral changes to a minimum. | ||
interval: 5007, | ||
persistent: true | ||
}; | ||
if ('object' == typeof arguments[1]) { | ||
options = util._extend(options, arguments[1]); | ||
listener = arguments[2]; | ||
} else { | ||
listener = arguments[1]; | ||
} | ||
if (!listener) { | ||
throw new Error('watchFile requires a listener function'); | ||
} | ||
if (inStatWatchers(filename)) { | ||
stat = statWatchers[filename]; | ||
} else { | ||
stat = statWatchers[filename] = new StatWatcher(); | ||
stat.start(filename, options.persistent, options.interval); | ||
} | ||
stat.addListener('change', listener); | ||
return stat; | ||
}; | ||
fs.unwatchFile = function(filename, listener) { | ||
nullCheck(filename); | ||
if (!inStatWatchers(filename)) return; | ||
var stat = statWatchers[filename]; | ||
if (typeof listener === 'function') { | ||
stat.removeListener('change', listener); | ||
} else { | ||
stat.removeAllListeners('change'); | ||
} | ||
if (stat.listeners('change').length === 0) { | ||
stat.stop(); | ||
statWatchers[filename] = undefined; | ||
} | ||
}; | ||
// Realpath | ||
// Not using realpath(2) because it's bad. | ||
// See: http://insanecoding.blogspot.com/2007/11/pathmax-simply-isnt.html | ||
var normalize = pathModule.normalize; | ||
// Regexp that finds the next partion of a (partial) path | ||
// result is [base_with_slash, base], e.g. ['somedir/', 'somedir'] | ||
if (isWindows) { | ||
var nextPartRe = /(.*?)(?:[\/\\]+|$)/g; | ||
} else { | ||
var nextPartRe = /(.*?)(?:[\/]+|$)/g; | ||
} | ||
// Regex to find the device root, including trailing slash. E.g. 'c:\\'. | ||
if (isWindows) { | ||
var splitRootRe = /^(?:[a-zA-Z]:|[\\\/]{2}[^\\\/]+[\\\/][^\\\/]+)?[\\\/]*/; | ||
} else { | ||
var splitRootRe = /^[\/]*/; | ||
} | ||
fs.realpathSync = function realpathSync(p, cache) { | ||
// make p is absolute | ||
p = pathModule.resolve(p); | ||
if (cache && Object.prototype.hasOwnProperty.call(cache, p)) { | ||
return cache[p]; | ||
} | ||
var original = p, | ||
seenLinks = {}, | ||
knownHard = {}; | ||
// current character position in p | ||
var pos; | ||
// the partial path so far, including a trailing slash if any | ||
var current; | ||
// the partial path without a trailing slash (except when pointing at a root) | ||
var base; | ||
// the partial path scanned in the previous round, with slash | ||
var previous; | ||
start(); | ||
function start() { | ||
// Skip over roots | ||
var m = splitRootRe.exec(p); | ||
pos = m[0].length; | ||
current = m[0]; | ||
base = m[0]; | ||
previous = ''; | ||
// On windows, check that the root exists. On unix there is no need. | ||
if (isWindows && !knownHard[base]) { | ||
fs.lstatSync(base); | ||
knownHard[base] = true; | ||
} | ||
} | ||
// walk down the path, swapping out linked pathparts for their real | ||
// values | ||
// NB: p.length changes. | ||
while (pos < p.length) { | ||
// find the next part | ||
nextPartRe.lastIndex = pos; | ||
var result = nextPartRe.exec(p); | ||
previous = current; | ||
current += result[0]; | ||
base = previous + result[1]; | ||
pos = nextPartRe.lastIndex; | ||
// continue if not a symlink | ||
if (knownHard[base] || (cache && cache[base] === base)) { | ||
continue; | ||
} | ||
var resolvedLink; | ||
if (cache && Object.prototype.hasOwnProperty.call(cache, base)) { | ||
// some known symbolic link. no need to stat again. | ||
resolvedLink = cache[base]; | ||
} else { | ||
var stat = fs.lstatSync(base); | ||
if (!stat.isSymbolicLink()) { | ||
knownHard[base] = true; | ||
if (cache) cache[base] = base; | ||
continue; | ||
} | ||
// read the link if it wasn't read before | ||
// dev/ino always return 0 on windows, so skip the check. | ||
var linkTarget = null; | ||
if (!isWindows) { | ||
var id = stat.dev.toString(32) + ':' + stat.ino.toString(32); | ||
if (seenLinks.hasOwnProperty(id)) { | ||
linkTarget = seenLinks[id]; | ||
} | ||
} | ||
if (linkTarget === null) { | ||
fs.statSync(base); | ||
linkTarget = fs.readlinkSync(base); | ||
} | ||
resolvedLink = pathModule.resolve(previous, linkTarget); | ||
// track this, if given a cache. | ||
if (cache) cache[base] = resolvedLink; | ||
if (!isWindows) seenLinks[id] = linkTarget; | ||
} | ||
// resolve the link, then start over | ||
p = pathModule.resolve(resolvedLink, p.slice(pos)); | ||
start(); | ||
} | ||
if (cache) cache[original] = p; | ||
return p; | ||
}; | ||
fs.realpath = function realpath(p, cache, cb) { | ||
if (typeof cb !== 'function') { | ||
cb = maybeCallback(cache); | ||
cache = null; | ||
} | ||
// make p is absolute | ||
p = pathModule.resolve(p); | ||
if (cache && Object.prototype.hasOwnProperty.call(cache, p)) { | ||
return process.nextTick(cb.bind(null, null, cache[p])); | ||
} | ||
var original = p, | ||
seenLinks = {}, | ||
knownHard = {}; | ||
// current character position in p | ||
var pos; | ||
// the partial path so far, including a trailing slash if any | ||
var current; | ||
// the partial path without a trailing slash (except when pointing at a root) | ||
var base; | ||
// the partial path scanned in the previous round, with slash | ||
var previous; | ||
start(); | ||
function start() { | ||
// Skip over roots | ||
var m = splitRootRe.exec(p); | ||
pos = m[0].length; | ||
current = m[0]; | ||
base = m[0]; | ||
previous = ''; | ||
// On windows, check that the root exists. On unix there is no need. | ||
if (isWindows && !knownHard[base]) { | ||
fs.lstat(base, function(err) { | ||
if (err) return cb(err); | ||
knownHard[base] = true; | ||
LOOP(); | ||
}); | ||
} else { | ||
process.nextTick(LOOP); | ||
} | ||
} | ||
// walk down the path, swapping out linked pathparts for their real | ||
// values | ||
function LOOP() { | ||
// stop if scanned past end of path | ||
if (pos >= p.length) { | ||
if (cache) cache[original] = p; | ||
return cb(null, p); | ||
} | ||
// find the next part | ||
nextPartRe.lastIndex = pos; | ||
var result = nextPartRe.exec(p); | ||
previous = current; | ||
current += result[0]; | ||
base = previous + result[1]; | ||
pos = nextPartRe.lastIndex; | ||
// continue if not a symlink | ||
if (knownHard[base] || (cache && cache[base] === base)) { | ||
return process.nextTick(LOOP); | ||
} | ||
if (cache && Object.prototype.hasOwnProperty.call(cache, base)) { | ||
// known symbolic link. no need to stat again. | ||
return gotResolvedLink(cache[base]); | ||
} | ||
return fs.lstat(base, gotStat); | ||
} | ||
function gotStat(err, stat) { | ||
if (err) return cb(err); | ||
// if not a symlink, skip to the next path part | ||
if (!stat.isSymbolicLink()) { | ||
knownHard[base] = true; | ||
if (cache) cache[base] = base; | ||
return process.nextTick(LOOP); | ||
} | ||
// stat & read the link if not read before | ||
// call gotTarget as soon as the link target is known | ||
// dev/ino always return 0 on windows, so skip the check. | ||
if (!isWindows) { | ||
var id = stat.dev.toString(32) + ':' + stat.ino.toString(32); | ||
if (seenLinks.hasOwnProperty(id)) { | ||
return gotTarget(null, seenLinks[id], base); | ||
} | ||
} | ||
fs.stat(base, function(err) { | ||
if (err) return cb(err); | ||
fs.readlink(base, function(err, target) { | ||
if (!isWindows) seenLinks[id] = target; | ||
gotTarget(err, target); | ||
}); | ||
}); | ||
} | ||
function gotTarget(err, target, base) { | ||
if (err) return cb(err); | ||
var resolvedLink = pathModule.resolve(previous, target); | ||
if (cache) cache[base] = resolvedLink; | ||
gotResolvedLink(resolvedLink); | ||
} | ||
function gotResolvedLink(resolvedLink) { | ||
// resolve the link, then start over | ||
p = pathModule.resolve(resolvedLink, p.slice(pos)); | ||
start(); | ||
} | ||
}; | ||
var pool; | ||
var minPoolSpace = 128; | ||
var poolSize = 40 * 1024; | ||
function allocNewPool() { | ||
pool = new Buffer(poolSize); | ||
pool = new Buffer(kPoolSize); | ||
pool.used = 0; | ||
} | ||
util.inherits(FSReadable, Readable); | ||
function FSReadable(path, options) { | ||
if (!options) | ||
options = {}; | ||
fs.createReadStream = function(path, options) { | ||
return new ReadStream(path, options); | ||
}; | ||
util.inherits(ReadStream, Readable); | ||
fs.ReadStream = ReadStream; | ||
function ReadStream(path, options) { | ||
if (!(this instanceof ReadStream)) | ||
return new ReadStream(path, options); | ||
// a little bit bigger buffer and water marks by default | ||
options = util._extend({ | ||
bufferSize: 64 * 1024, | ||
lowWaterMark: 16 * 1024, | ||
highWaterMark: 64 * 1024 | ||
}, options || {}); | ||
Readable.call(this, options); | ||
var state = this._readableState; | ||
this.path = path; | ||
this.flags = options.flags || 'r'; | ||
this.mode = options.mode || 438; //=0666 | ||
this.fd = options.fd || null; | ||
this.fd = options.hasOwnProperty('fd') ? options.fd : null; | ||
this.flags = options.hasOwnProperty('flags') ? options.flags : 'r'; | ||
this.mode = options.hasOwnProperty('mode') ? options.mode : 438; /*=0666*/ | ||
// a little bit bigger buffer and watermark by default | ||
state.bufferSize = options.bufferSize || 64 * 1024; | ||
state.lowWaterMark = options.lowWaterMark || 16 * 1024; | ||
this.start = options.hasOwnProperty('start') ? options.start : undefined; | ||
this.end = options.hasOwnProperty('start') ? options.end : undefined; | ||
this.pos = undefined; | ||
if (this.encoding) { | ||
this._decoder = new StringDecoder(this.encoding); | ||
} | ||
if (this.start !== undefined) { | ||
if ('number' !== typeof this.start) { | ||
throw TypeError('start must be a Number'); | ||
} | ||
if (this.end === undefined) { | ||
this.end = Infinity; | ||
} else if ('number' !== typeof this.end) { | ||
throw TypeError('end must be a Number'); | ||
} | ||
var typeofStart = typeof this.start; | ||
if (typeofStart !== 'undefined') { | ||
if (typeofStart !== 'number') | ||
throw new TypeError('start must be a Number'); | ||
if (this.start > this.end) { | ||
throw new Error('start must be <= end'); | ||
} | ||
var typeofEnd = typeof this.end; | ||
if (typeofEnd === 'undefined') | ||
this.end = Infinity; | ||
else if (typeofEnd !== 'number') | ||
throw new TypeError('end must be a Number'); | ||
this.pos = this.start; | ||
@@ -63,22 +1436,31 @@ } | ||
this.open(); | ||
this.on('end', function() { | ||
this.destroy(); | ||
}); | ||
} | ||
FSReadable.prototype.open = function() { | ||
fs.FileReadStream = fs.ReadStream; // support the legacy name | ||
ReadStream.prototype.open = function() { | ||
var self = this; | ||
fs.open(this.path, this.flags, this.mode, function(er, fd) { | ||
if (er) { | ||
this.destroy(); | ||
this.emit('error', er); | ||
self.destroy(); | ||
self.emit('error', er); | ||
return; | ||
} | ||
this.fd = fd; | ||
this.emit('open', fd); | ||
}.bind(this)); | ||
self.fd = fd; | ||
self.emit('open', fd); | ||
// start the flow of data. | ||
self.read(); | ||
}); | ||
}; | ||
FSReadable.prototype._read = function(n, cb) { | ||
if (this.fd === null) { | ||
this.once('open', this._read.bind(this, n, cb)); | ||
return; | ||
} | ||
ReadStream.prototype._read = function(n, cb) { | ||
if (typeof this.fd !== 'number') | ||
return this.once('open', function() { | ||
this._read(n, cb); | ||
}); | ||
@@ -88,3 +1470,3 @@ if (this.destroyed) | ||
if (!pool || pool.length - pool.used < minPoolSpace) { | ||
if (!pool || pool.length - pool.used < kMinPoolSpace) { | ||
// discard the old pool. Can't add to the free list because | ||
@@ -96,2 +1478,5 @@ // users might have refernces to slices on it. | ||
// Grab another reference to the pool in the case that while we're | ||
// in the thread pool another read() finishes up the pool, and | ||
// allocates a new one. | ||
var thisPool = pool; | ||
@@ -104,13 +1489,19 @@ var toRead = Math.min(pool.length - pool.used, n); | ||
// already read everything we were supposed to read! | ||
// treat as EOF. | ||
if (toRead <= 0) | ||
return cb(); | ||
if (toRead <= 0) { | ||
this.emit('readable'); | ||
return; | ||
} | ||
// the actual read. | ||
var self = this; | ||
fs.read(this.fd, pool, pool.used, toRead, this.pos, onread); | ||
fs.read(this.fd, pool, pool.used, toRead, this.pos, onread.bind(this)); | ||
// move the pool positions, and internal position for reading. | ||
if (this.pos !== undefined) | ||
this.pos += toRead; | ||
pool.used += toRead; | ||
function onread(er, bytesRead) { | ||
if (er) { | ||
this.destroy(); | ||
self.destroy(); | ||
return cb(er); | ||
@@ -127,23 +1518,193 @@ } | ||
FSReadable.prototype.close = function(cb) { | ||
ReadStream.prototype.destroy = function() { | ||
if (this.destroyed) | ||
return; | ||
this.destroyed = true; | ||
if ('number' === typeof this.fd) | ||
this.close(); | ||
}; | ||
ReadStream.prototype.close = function(cb) { | ||
if (cb) | ||
this.once('close', cb); | ||
if (this.closed || this.fd === null) { | ||
if (this.fd === null) | ||
this.once('open', this.destroy); | ||
if (this.closed || 'number' !== typeof this.fd) { | ||
if ('number' !== typeof this.fd) | ||
this.once('open', close); | ||
return process.nextTick(this.emit.bind(this, 'close')); | ||
} | ||
this.closed = true; | ||
var self = this; | ||
close(); | ||
fs.close(this.fd, function(er) { | ||
if (er) | ||
function close() { | ||
fs.close(self.fd, function(er) { | ||
if (er) | ||
self.emit('error', er); | ||
else | ||
self.emit('close'); | ||
}); | ||
} | ||
}; | ||
fs.createWriteStream = function(path, options) { | ||
return new WriteStream(path, options); | ||
}; | ||
util.inherits(WriteStream, Writable); | ||
fs.WriteStream = WriteStream; | ||
function WriteStream(path, options) { | ||
if (!(this instanceof WriteStream)) | ||
return new WriteStream(path, options); | ||
// a little bit bigger buffer and water marks by default | ||
options = util._extend({ | ||
bufferSize: 64 * 1024, | ||
lowWaterMark: 16 * 1024, | ||
highWaterMark: 64 * 1024 | ||
}, options || {}); | ||
Writable.call(this, options); | ||
this.path = path; | ||
this.fd = null; | ||
this.fd = options.hasOwnProperty('fd') ? options.fd : null; | ||
this.flags = options.hasOwnProperty('flags') ? options.flags : 'w'; | ||
this.mode = options.hasOwnProperty('mode') ? options.mode : 438; /*=0666*/ | ||
this.start = options.hasOwnProperty('start') ? options.start : undefined; | ||
this.pos = undefined; | ||
this.bytesWritten = 0; | ||
if (this.start !== undefined) { | ||
if ('number' !== typeof this.start) { | ||
throw TypeError('start must be a Number'); | ||
} | ||
if (this.start < 0) { | ||
throw new Error('start must be >= zero'); | ||
} | ||
this.pos = this.start; | ||
} | ||
if ('number' !== typeof this.fd) | ||
this.open(); | ||
// dispose on finish. | ||
this.once('finish', this.close); | ||
} | ||
fs.FileWriteStream = fs.WriteStream; // support the legacy name | ||
WriteStream.prototype.open = function() { | ||
fs.open(this.path, this.flags, this.mode, function(er, fd) { | ||
if (er) { | ||
this.destroy(); | ||
this.emit('error', er); | ||
else | ||
this.emit('close'); | ||
return; | ||
} | ||
this.fd = fd; | ||
this.emit('open', fd); | ||
}.bind(this)); | ||
}; | ||
FSReadable.prototype.destroy = function() { | ||
this.destroyed = true; | ||
fs.close(this.fd, function() {}); | ||
WriteStream.prototype._write = function(data, cb) { | ||
if (!Buffer.isBuffer(data)) | ||
return this.emit('error', new Error('Invalid data')); | ||
if (typeof this.fd !== 'number') | ||
return this.once('open', this._write.bind(this, data, cb)); | ||
fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) { | ||
if (er) { | ||
this.destroy(); | ||
return cb(er); | ||
} | ||
this.bytesWritten += bytes; | ||
cb(); | ||
}.bind(this)); | ||
if (this.pos !== undefined) | ||
this.pos += data.length; | ||
}; | ||
WriteStream.prototype.destroy = ReadStream.prototype.destroy; | ||
WriteStream.prototype.close = ReadStream.prototype.close; | ||
// There is no shutdown() for files. | ||
WriteStream.prototype.destroySoon = WriteStream.prototype.end; | ||
// SyncWriteStream is internal. DO NOT USE. | ||
// Temporary hack for process.stdout and process.stderr when piped to files. | ||
function SyncWriteStream(fd) { | ||
Stream.call(this); | ||
this.fd = fd; | ||
this.writable = true; | ||
this.readable = false; | ||
} | ||
util.inherits(SyncWriteStream, Stream); | ||
// Export | ||
fs.SyncWriteStream = SyncWriteStream; | ||
SyncWriteStream.prototype.write = function(data, arg1, arg2) { | ||
var encoding, cb; | ||
// parse arguments | ||
if (arg1) { | ||
if (typeof arg1 === 'string') { | ||
encoding = arg1; | ||
cb = arg2; | ||
} else if (typeof arg1 === 'function') { | ||
cb = arg1; | ||
} else { | ||
throw new Error('bad arg'); | ||
} | ||
} | ||
assertEncoding(encoding); | ||
// Change strings to buffers. SLOW | ||
if (typeof data == 'string') { | ||
data = new Buffer(data, encoding); | ||
} | ||
fs.writeSync(this.fd, data, 0, data.length); | ||
if (cb) { | ||
process.nextTick(cb); | ||
} | ||
return true; | ||
}; | ||
SyncWriteStream.prototype.end = function(data, arg1, arg2) { | ||
if (data) { | ||
this.write(data, arg1, arg2); | ||
} | ||
this.destroy(); | ||
}; | ||
SyncWriteStream.prototype.destroy = function() { | ||
fs.closeSync(this.fd); | ||
this.fd = null; | ||
this.emit('close'); | ||
return true; | ||
}; | ||
SyncWriteStream.prototype.destroySoon = SyncWriteStream.prototype.destroy; |
@@ -29,4 +29,4 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
var util = require('util'); | ||
var Readable = require('_stream_readable'); | ||
var Writable = require('_stream_writable'); | ||
var Readable = require('./_stream_readable'); | ||
var Writable = require('./_stream_writable'); | ||
@@ -33,0 +33,0 @@ util.inherits(Duplex, Readable); |
@@ -28,3 +28,3 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
var Transform = require('_stream_transform'); | ||
var Transform = require('./_stream_transform'); | ||
var util = require('util'); | ||
@@ -31,0 +31,0 @@ util.inherits(PassThrough, Transform); |
@@ -35,5 +35,2 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
// cast to an int | ||
this.bufferSize = ~~this.bufferSize; | ||
// the argument passed to this._read(n,cb) | ||
@@ -66,3 +63,4 @@ this.bufferSize = options.hasOwnProperty('bufferSize') ? | ||
this.length = 0; | ||
this.pipes = []; | ||
this.pipes = null; | ||
this.pipesCount = 0; | ||
this.flowing = false; | ||
@@ -73,3 +71,5 @@ this.ended = false; | ||
this.sync = false; | ||
this.onread = onread.bind(stream); | ||
this.onread = function(er, data) { | ||
onread(stream, er, data); | ||
}; | ||
@@ -81,2 +81,10 @@ // whenever we return null, then we set a flag to say | ||
// when piping, we only care about 'readable' events that happen | ||
// after read()ing all the bytes and not getting any pushback. | ||
this.ranOut = false; | ||
// the number of writers that are awaiting a drain event in .pipe()s | ||
this.awaitDrain = 0; | ||
this.pipeChunkSize = null; | ||
this.decoder = null; | ||
@@ -95,2 +103,6 @@ if (options.encoding) { | ||
this._readableState = new ReadableState(options, this); | ||
// legacy | ||
this.readable = true; | ||
Stream.apply(this); | ||
@@ -111,3 +123,3 @@ } | ||
if (isNaN(n)) | ||
if (isNaN(n) || n === null) | ||
return state.length; | ||
@@ -175,7 +187,2 @@ | ||
// if we currently have *nothing*, then always try to get *something* | ||
// no matter what the high water mark says. | ||
if (state.length === 0) | ||
doRead = true; | ||
// however, if we've ended, then there's no point, and if we're already | ||
@@ -187,5 +194,7 @@ // reading, then it's unnecessary. | ||
if (doRead) { | ||
var sync = true; | ||
state.reading = true; | ||
state.sync = true; | ||
// if the length is currently zero, then we *need* a readable event. | ||
if (state.length === 0) | ||
state.needReadable = true; | ||
// call internal read method | ||
@@ -215,7 +224,12 @@ this._read(state.bufferSize, state.onread); | ||
// If we have nothing in the buffer, then we want to know | ||
// as soon as we *do* get something into the buffer. | ||
if (state.length === 0 && !state.ended) | ||
state.needReadable = true; | ||
return ret; | ||
}; | ||
function onread(er, chunk) { | ||
var state = this._readableState; | ||
function onread(stream, er, chunk) { | ||
var state = stream._readableState; | ||
var sync = state.sync; | ||
@@ -225,3 +239,3 @@ | ||
if (er) | ||
return this.emit('error', er); | ||
return stream.emit('error', er); | ||
@@ -232,3 +246,3 @@ if (!chunk || !chunk.length) { | ||
if (state.decoder) { | ||
chunk = state.decoder.end(); | ||
chunk = state.decoder.end && state.decoder.end(); | ||
if (chunk && chunk.length) { | ||
@@ -246,6 +260,6 @@ state.buffer.push(chunk); | ||
state.emittedReadable = true; | ||
this.emit('readable'); | ||
stream.emit('readable'); | ||
} | ||
} else | ||
endReadable(this); | ||
endReadable(stream); | ||
} | ||
@@ -266,8 +280,8 @@ return; | ||
// and we haven't ended, then don't bother telling the user | ||
// that it's time to read more data. Otherwise, that'll | ||
// probably kick off another stream.read(), which can trigger | ||
// that it's time to read more data. Otherwise, emitting 'readable' | ||
// probably will trigger another stream.read(), which can trigger | ||
// another _read(n,cb) before this one returns! | ||
if (state.length <= state.lowWaterMark) { | ||
state.reading = true; | ||
this._read(state.bufferSize, state.onread); | ||
stream._read(state.bufferSize, state.onread); | ||
return; | ||
@@ -280,3 +294,3 @@ } | ||
state.emittedReadable = true; | ||
this.emit('readable'); | ||
stream.emit('readable'); | ||
} | ||
@@ -291,3 +305,5 @@ } | ||
Readable.prototype._read = function(n, cb) { | ||
process.nextTick(cb.bind(this, new Error('not implemented'))); | ||
process.nextTick(function() { | ||
cb(new Error('not implemented')); | ||
}); | ||
}; | ||
@@ -298,6 +314,16 @@ | ||
var state = this._readableState; | ||
if (!pipeOpts) | ||
pipeOpts = {}; | ||
state.pipes.push(dest); | ||
switch (state.pipesCount) { | ||
case 0: | ||
state.pipes = dest; | ||
break; | ||
case 1: | ||
state.pipes = [state.pipes, dest]; | ||
break; | ||
default: | ||
state.pipes.push(dest); | ||
break; | ||
} | ||
state.pipesCount += 1; | ||
if ((!pipeOpts || pipeOpts.end !== false) && | ||
@@ -313,2 +339,5 @@ dest !== process.stdout && | ||
if (pipeOpts && pipeOpts.chunkSize) | ||
state.pipeChunkSize = pipeOpts.chunkSize; | ||
function onend() { | ||
@@ -318,8 +347,55 @@ dest.end(); | ||
// when the dest drains, it reduces the awaitDrain counter | ||
// on the source. This would be more elegant with a .once() | ||
// handler in flow(), but adding and removing repeatedly is | ||
// too slow. | ||
var ondrain = pipeOnDrain(src); | ||
dest.on('drain', ondrain); | ||
dest.on('unpipe', function(readable) { | ||
if (readable === src) | ||
dest.removeListener('drain', ondrain); | ||
// if the reader is waiting for a drain event from this | ||
// specific writer, then it would cause it to never start | ||
// flowing again. | ||
// So, if this is awaiting a drain, then we just call it now. | ||
// If we don't know, then assume that we are waiting for one. | ||
if (!dest._writableState || dest._writableState.needDrain) | ||
ondrain(); | ||
}); | ||
// if the dest has an error, then stop piping into it. | ||
// however, don't suppress the throwing behavior for this. | ||
dest.once('error', function(er) { | ||
unpipe(); | ||
if (dest.listeners('error').length === 0) | ||
dest.emit('error', er); | ||
}); | ||
// if the dest emits close, then presumably there's no point writing | ||
// to it any more. | ||
dest.on('close', unpipe); | ||
dest.on('finish', function() { | ||
dest.removeListener('close', unpipe); | ||
}); | ||
function unpipe() { | ||
src.unpipe(dest); | ||
} | ||
// tell the dest that it's being piped to | ||
dest.emit('pipe', src); | ||
// start the flow. | ||
// start the flow if it hasn't been started already. | ||
if (!state.flowing) { | ||
// the handler that waits for readable events after all | ||
// the data gets sucked out in flow. | ||
// This would be easier to follow with a .once() handler | ||
// in flow(), but that is too slow. | ||
this.on('readable', pipeOnReadable); | ||
state.flowing = true; | ||
process.nextTick(flow.bind(null, src, pipeOpts)); | ||
process.nextTick(function() { | ||
flow(src); | ||
}); | ||
} | ||
@@ -330,26 +406,36 @@ | ||
function flow(src, pipeOpts) { | ||
function pipeOnDrain(src) { | ||
return function() { | ||
var dest = this; | ||
var state = src._readableState; | ||
state.awaitDrain--; | ||
if (state.awaitDrain === 0) | ||
flow(src); | ||
}; | ||
} | ||
function flow(src) { | ||
var state = src._readableState; | ||
var chunk; | ||
var needDrain = 0; | ||
state.awaitDrain = 0; | ||
function ondrain() { | ||
needDrain--; | ||
if (needDrain === 0) | ||
flow(src, pipeOpts); | ||
function write(dest, i, list) { | ||
var written = dest.write(chunk); | ||
if (false === written) { | ||
state.awaitDrain++; | ||
} | ||
} | ||
while (state.pipes.length && | ||
null !== (chunk = src.read(pipeOpts.chunkSize))) { | ||
state.pipes.forEach(function(dest, i, list) { | ||
var written = dest.write(chunk); | ||
if (false === written) { | ||
needDrain++; | ||
dest.once('drain', ondrain); | ||
} | ||
}); | ||
while (state.pipesCount && | ||
null !== (chunk = src.read(state.pipeChunkSize))) { | ||
if (state.pipesCount === 1) | ||
write(state.pipes, 0, null); | ||
else | ||
state.pipes.forEach(write); | ||
src.emit('data', chunk); | ||
// if anyone needs a drain, then we have to wait for that. | ||
if (needDrain > 0) | ||
if (state.awaitDrain > 0) | ||
return; | ||
@@ -362,3 +448,3 @@ } | ||
// NB: This is a pretty rare edge case. | ||
if (state.pipes.length === 0) { | ||
if (state.pipesCount === 0) { | ||
state.flowing = false; | ||
@@ -374,20 +460,65 @@ | ||
// on the next readable event, start it over again. | ||
src.once('readable', flow.bind(null, src, pipeOpts)); | ||
state.ranOut = true; | ||
} | ||
function pipeOnReadable() { | ||
if (this._readableState.ranOut) { | ||
this._readableState.ranOut = false; | ||
flow(this); | ||
} | ||
} | ||
Readable.prototype.unpipe = function(dest) { | ||
var state = this._readableState; | ||
if (!dest) { | ||
// remove all of them. | ||
state.pipes.forEach(function(dest, i, list) { | ||
// if we're not piping anywhere, then do nothing. | ||
if (state.pipesCount === 0) | ||
return this; | ||
// just one destination. most common case. | ||
if (state.pipesCount === 1) { | ||
// passed in one, but it's not the right one. | ||
if (dest && dest !== state.pipes) | ||
return this; | ||
if (!dest) | ||
dest = state.pipes; | ||
// got a match. | ||
state.pipes = null; | ||
state.pipesCount = 0; | ||
this.removeListener('readable', pipeOnReadable); | ||
if (dest) | ||
dest.emit('unpipe', this); | ||
}, this); | ||
state.pipes.length = 0; | ||
} else { | ||
var i = state.pipes.indexOf(dest); | ||
if (i !== -1) { | ||
dest.emit('unpipe', this); | ||
state.pipes.splice(i, 1); | ||
} | ||
return this; | ||
} | ||
// slow case. multiple pipe destinations. | ||
if (!dest) { | ||
// remove all. | ||
var dests = state.pipes; | ||
var len = state.pipesCount; | ||
state.pipes = null; | ||
state.pipesCount = 0; | ||
this.removeListener('readable', pipeOnReadable); | ||
for (var i = 0; i < len; i++) | ||
dests[i].emit('unpipe', this); | ||
return this; | ||
} | ||
// try to find the right one. | ||
var i = state.pipes.indexOf(dest); | ||
if (i === -1) | ||
return this; | ||
state.pipes.splice(i, 1); | ||
state.pipesCount -= 1; | ||
if (state.pipesCount === 1) | ||
state.pipes = state.pipes[0]; | ||
dest.emit('unpipe', this); | ||
return this; | ||
@@ -413,11 +544,12 @@ }; | ||
emitDataEvents(this); | ||
return this.resume(); | ||
this.read(0); | ||
this.emit('resume'); | ||
}; | ||
Readable.prototype.pause = function() { | ||
emitDataEvents(this); | ||
return this.pause(); | ||
emitDataEvents(this, true); | ||
this.emit('pause'); | ||
}; | ||
function emitDataEvents(stream) { | ||
function emitDataEvents(stream, startPaused) { | ||
var state = stream._readableState; | ||
@@ -430,3 +562,3 @@ | ||
var paused = false; | ||
var paused = startPaused || false; | ||
var readable = false; | ||
@@ -441,2 +573,3 @@ | ||
readable = true; | ||
var c; | ||
@@ -454,2 +587,3 @@ while (!paused && (null !== (c = stream.read()))) | ||
paused = true; | ||
this.emit('pause'); | ||
}; | ||
@@ -460,9 +594,12 @@ | ||
if (readable) | ||
stream.emit('readable'); | ||
process.nextTick(function() { | ||
stream.emit('readable'); | ||
}); | ||
else | ||
this.read(0); | ||
this.emit('resume'); | ||
}; | ||
// now make it start, just in case it hadn't already. | ||
process.nextTick(function() { | ||
stream.emit('readable'); | ||
}); | ||
stream.emit('readable'); | ||
} | ||
@@ -477,6 +614,7 @@ | ||
var self = this; | ||
stream.on('end', function() { | ||
state.ended = true; | ||
if (state.decoder) { | ||
var chunk = state.decoder.end(); | ||
var chunk = state.decoder.end && state.decoder.end(); | ||
if (chunk && chunk.length) { | ||
@@ -489,6 +627,6 @@ state.buffer.push(chunk); | ||
if (state.length > 0) | ||
this.emit('readable'); | ||
self.emit('readable'); | ||
else | ||
endReadable(this); | ||
}.bind(this)); | ||
endReadable(self); | ||
}); | ||
@@ -503,3 +641,3 @@ stream.on('data', function(chunk) { | ||
state.length += chunk.length; | ||
this.emit('readable'); | ||
self.emit('readable'); | ||
@@ -511,3 +649,3 @@ // if not consumed, then pause the stream. | ||
} | ||
}.bind(this)); | ||
}); | ||
@@ -528,4 +666,4 @@ // proxy all the other methods. | ||
events.forEach(function(ev) { | ||
stream.on(ev, this.emit.bind(this, ev)); | ||
}.bind(this)); | ||
stream.on(ev, self.emit.bind(self, ev)); | ||
}); | ||
@@ -554,2 +692,5 @@ // consume some bytes. if not all is consumed, then | ||
if (state.length === 0 && !state.ended) | ||
state.needReadable = true; | ||
if (state.length <= state.lowWaterMark && paused) { | ||
@@ -639,3 +780,6 @@ stream.resume(); | ||
state.endEmitted = true; | ||
process.nextTick(stream.emit.bind(stream, 'end')); | ||
process.nextTick(function() { | ||
stream.readable = false; | ||
stream.emit('end'); | ||
}); | ||
} |
@@ -69,10 +69,13 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
var Duplex = require('_stream_duplex'); | ||
var Duplex = require('./_stream_duplex'); | ||
var util = require('util'); | ||
util.inherits(Transform, Duplex); | ||
function TransformState() { | ||
function TransformState(stream) { | ||
this.buffer = []; | ||
this.transforming = false; | ||
this.pendingReadCb = null; | ||
this.output = function(chunk) { | ||
stream._output(chunk); | ||
}; | ||
} | ||
@@ -87,6 +90,6 @@ | ||
// bind output so that it can be passed around as a regular function. | ||
this._output = this._output.bind(this); | ||
var stream = this; | ||
// the queue of _write chunks that are pending being transformed | ||
this._transformState = new TransformState(); | ||
var ts = this._transformState = new TransformState(stream); | ||
@@ -96,5 +99,7 @@ // when the writable side finishes, then flush out anything remaining. | ||
if ('function' === typeof this._flush) | ||
this._flush(this._output, done.bind(this)); | ||
this._flush(ts.output, function(er) { | ||
done(stream, er); | ||
}); | ||
else | ||
done.call(this); | ||
done(stream); | ||
}); | ||
@@ -165,10 +170,9 @@ } | ||
var writecb = req[1]; | ||
var output = this._output; | ||
ts.transforming = true; | ||
this._transform(chunk, output, function(er, data) { | ||
this._transform(chunk, ts.output, function(er, data) { | ||
ts.transforming = false; | ||
if (data) | ||
output(data); | ||
ts.output(data); | ||
writecb(er); | ||
}.bind(this)); | ||
}); | ||
}; | ||
@@ -192,8 +196,8 @@ | ||
// otherwise, it's up to us to fill the rs buffer. | ||
var state = this._readableState; | ||
var len = state.length; | ||
state.buffer.push(chunk); | ||
state.length += chunk.length; | ||
if (state.needReadable) { | ||
state.needReadable = false; | ||
var rs = this._readableState; | ||
var len = rs.length; | ||
rs.buffer.push(chunk); | ||
rs.length += chunk.length; | ||
if (rs.needReadable) { | ||
rs.needReadable = false; | ||
this.emit('readable'); | ||
@@ -203,11 +207,11 @@ } | ||
function done(er) { | ||
function done(stream, er) { | ||
if (er) | ||
return this.emit('error', er); | ||
return stream.emit('error', er); | ||
// if there's nothing in the write buffer, then that means | ||
// that nothing more will ever be provided | ||
var ws = this._writableState; | ||
var rs = this._readableState; | ||
var ts = this._transformState; | ||
var ws = stream._writableState; | ||
var rs = stream._readableState; | ||
var ts = stream._transformState; | ||
@@ -230,5 +234,5 @@ if (ws.length) | ||
if (rs.length && rs.needReadable) | ||
this.emit('readable'); | ||
stream.emit('readable'); | ||
else if (rs.length === 0) | ||
this.emit('end'); | ||
stream.emit('end'); | ||
} |
@@ -32,2 +32,3 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
var Stream = require('stream'); | ||
var Duplex = require('./_stream_duplex'); | ||
@@ -86,3 +87,5 @@ util.inherits(Writable, Stream); | ||
// the callback that's passed to _write(chunk,cb) | ||
this.onwrite = onwrite.bind(stream); | ||
this.onwrite = function(er) { | ||
onwrite(stream, er); | ||
}; | ||
@@ -101,3 +104,3 @@ // the callback that the user supplies to write(chunk,encoding,cb) | ||
// instanceof Writable, they're instanceof Readable. | ||
if (!(this instanceof Writable) && !(this instanceof Stream.Duplex)) | ||
if (!(this instanceof Writable) && !(this instanceof Duplex)) | ||
return new Writable(options); | ||
@@ -161,4 +164,4 @@ | ||
function onwrite(er) { | ||
var state = this._writableState; | ||
function onwrite(stream, er) { | ||
var state = stream._writableState; | ||
var sync = state.sync; | ||
@@ -175,7 +178,9 @@ var cb = state.writecb; | ||
if (sync) | ||
process.nextTick(cb.bind(null, er)); | ||
process.nextTick(function() { | ||
cb(er); | ||
}); | ||
else | ||
cb(er); | ||
} else | ||
this.emit('error', er); | ||
stream.emit('error', er); | ||
return; | ||
@@ -194,6 +199,7 @@ } | ||
if (state.length === 0 && (state.ended || state.ending)) { | ||
if (state.length === 0 && (state.ended || state.ending) && | ||
!state.finished && !state.finishing) { | ||
// emit 'finish' at the very end. | ||
state.finishing = true; | ||
this.emit('finish'); | ||
stream.emit('finish'); | ||
state.finished = true; | ||
@@ -218,3 +224,3 @@ return; | ||
state.writing = true; | ||
this._write(chunk, state.onwrite); | ||
stream._write(chunk, state.onwrite); | ||
} | ||
@@ -230,4 +236,4 @@ | ||
state.needDrain = false; | ||
this.emit('drain'); | ||
}.bind(this)); | ||
stream.emit('drain'); | ||
}); | ||
} | ||
@@ -237,3 +243,5 @@ } | ||
Writable.prototype._write = function(chunk, cb) { | ||
process.nextTick(cb.bind(this, new Error('not implemented'))); | ||
process.nextTick(function() { | ||
cb(new Error('not implemented')); | ||
}); | ||
}; | ||
@@ -251,3 +259,3 @@ | ||
this.write(chunk, encoding); | ||
else if (state.length === 0) { | ||
else if (state.length === 0 && !state.finishing && !state.finished) { | ||
state.finishing = true; | ||
@@ -254,0 +262,0 @@ this.emit('finish'); |
{ | ||
"name": "readable-stream", | ||
"version": "0.0.4", | ||
"version": "0.1.0", | ||
"description": "An exploration of a new kind of readable streams for Node.js", | ||
@@ -5,0 +5,0 @@ "main": "readable.js", |
@@ -1,41 +0,1 @@ | ||
// Copyright Joyent, Inc. and other Node contributors. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a | ||
// copy of this software and associated documentation files (the | ||
// "Software"), to deal in the Software without restriction, including | ||
// without limitation the rights to use, copy, modify, merge, publish, | ||
// distribute, sublicense, and/or sell copies of the Software, and to permit | ||
// persons to whom the Software is furnished to do so, subject to the | ||
// following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included | ||
// in all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | ||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | ||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN | ||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, | ||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | ||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
// a passthrough stream. | ||
// basically just the most minimal sort of Transform stream. | ||
// Every written chunk gets output as-is. | ||
module.exports = PassThrough; | ||
var Transform = require('./transform.js'); | ||
var util = require('util'); | ||
util.inherits(PassThrough, Transform); | ||
function PassThrough(options) { | ||
if (!(this instanceof PassThrough)) | ||
return new PassThrough(options); | ||
Transform.call(this, options); | ||
} | ||
PassThrough.prototype._transform = function(chunk, output, cb) { | ||
cb(null, chunk); | ||
}; | ||
module.exports = require("./lib/_stream_passthrough.js") |
744
readable.js
@@ -1,743 +0,1 @@ | ||
// Copyright Joyent, Inc. and other Node contributors. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a | ||
// copy of this software and associated documentation files (the | ||
// "Software"), to deal in the Software without restriction, including | ||
// without limitation the rights to use, copy, modify, merge, publish, | ||
// distribute, sublicense, and/or sell copies of the Software, and to permit | ||
// persons to whom the Software is furnished to do so, subject to the | ||
// following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included | ||
// in all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | ||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | ||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN | ||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, | ||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | ||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
module.exports = Readable; | ||
Readable.ReadableState = ReadableState; | ||
var Stream = require('stream'); | ||
var util = require('util'); | ||
var assert = require('assert'); | ||
var StringDecoder; | ||
util.inherits(Readable, Stream); | ||
function ReadableState(options, stream) { | ||
options = options || {}; | ||
// cast to an int | ||
this.bufferSize = ~~this.bufferSize; | ||
// the argument passed to this._read(n,cb) | ||
this.bufferSize = options.hasOwnProperty('bufferSize') ? | ||
options.bufferSize : 16 * 1024; | ||
// the point at which it stops calling _read() to fill the buffer | ||
this.highWaterMark = options.hasOwnProperty('highWaterMark') ? | ||
options.highWaterMark : 16 * 1024; | ||
// the minimum number of bytes to buffer before emitting 'readable' | ||
// default to pushing everything out as fast as possible. | ||
this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ? | ||
options.lowWaterMark : 0; | ||
// cast to ints. | ||
assert(typeof this.bufferSize === 'number'); | ||
assert(typeof this.lowWaterMark === 'number'); | ||
assert(typeof this.highWaterMark === 'number'); | ||
this.bufferSize = ~~this.bufferSize; | ||
this.lowWaterMark = ~~this.lowWaterMark; | ||
this.highWaterMark = ~~this.highWaterMark; | ||
assert(this.bufferSize >= 0); | ||
assert(this.lowWaterMark >= 0); | ||
assert(this.highWaterMark >= this.lowWaterMark, | ||
this.highWaterMark + '>=' + this.lowWaterMark); | ||
this.buffer = []; | ||
this.length = 0; | ||
this.pipes = null; | ||
this.pipesCount = 0; | ||
this.flowing = false; | ||
this.ended = false; | ||
this.endEmitted = false; | ||
this.reading = false; | ||
this.sync = false; | ||
this.onread = function(er, data) { | ||
onread(stream, er, data); | ||
}; | ||
// whenever we return null, then we set a flag to say | ||
// that we're awaiting a 'readable' event emission. | ||
this.needReadable = false; | ||
this.emittedReadable = false; | ||
// when piping, we only care about 'readable' events that happen | ||
// after read()ing all the bytes and not getting any pushback. | ||
this.ranOut = false; | ||
// the number of writers that are awaiting a drain event in .pipe()s | ||
this.awaitDrain = 0; | ||
this.pipeChunkSize = null; | ||
this.decoder = null; | ||
if (options.encoding) { | ||
if (!StringDecoder) | ||
StringDecoder = require('string_decoder').StringDecoder; | ||
this.decoder = new StringDecoder(options.encoding); | ||
} | ||
} | ||
function Readable(options) { | ||
if (!(this instanceof Readable)) | ||
return new Readable(options); | ||
this._readableState = new ReadableState(options, this); | ||
// legacy | ||
this.readable = true; | ||
Stream.apply(this); | ||
} | ||
// backwards compatibility. | ||
Readable.prototype.setEncoding = function(enc) { | ||
if (!StringDecoder) | ||
StringDecoder = require('string_decoder').StringDecoder; | ||
this._readableState.decoder = new StringDecoder(enc); | ||
}; | ||
function howMuchToRead(n, state) { | ||
if (state.length === 0 && state.ended) | ||
return 0; | ||
if (isNaN(n) || n === null) | ||
return state.length; | ||
if (n <= 0) | ||
return 0; | ||
// don't have that much. return null, unless we've ended. | ||
if (n > state.length) { | ||
if (!state.ended) { | ||
state.needReadable = true; | ||
return 0; | ||
} else | ||
return state.length; | ||
} | ||
return n; | ||
} | ||
// you can override either this method, or _read(n, cb) below. | ||
Readable.prototype.read = function(n) { | ||
var state = this._readableState; | ||
var nOrig = n; | ||
if (typeof n !== 'number' || n > 0) | ||
state.emittedReadable = false; | ||
n = howMuchToRead(n, state); | ||
// if we've ended, and we're now clear, then finish it up. | ||
if (n === 0 && state.ended) { | ||
endReadable(this); | ||
return null; | ||
} | ||
// All the actual chunk generation logic needs to be | ||
// *below* the call to _read. The reason is that in certain | ||
// synthetic stream cases, such as passthrough streams, _read | ||
// may be a completely synchronous operation which may change | ||
// the state of the read buffer, providing enough data when | ||
// before there was *not* enough. | ||
// | ||
// So, the steps are: | ||
// 1. Figure out what the state of things will be after we do | ||
// a read from the buffer. | ||
// | ||
// 2. If that resulting state will trigger a _read, then call _read. | ||
// Note that this may be asynchronous, or synchronous. Yes, it is | ||
// deeply ugly to write APIs this way, but that still doesn't mean | ||
// that the Readable class should behave improperly, as streams are | ||
// designed to be sync/async agnostic. | ||
// Take note if the _read call is sync or async (ie, if the read call | ||
// has returned yet), so that we know whether or not it's safe to emit | ||
// 'readable' etc. | ||
// | ||
// 3. Actually pull the requested chunks out of the buffer and return. | ||
// if we need a readable event, then we need to do some reading. | ||
var doRead = state.needReadable; | ||
// if we currently have less than the highWaterMark, then also read some | ||
if (state.length - n <= state.highWaterMark) | ||
doRead = true; | ||
// if we currently have *nothing*, then always try to get *something* | ||
// no matter what the high water mark says. | ||
if (state.length === 0) | ||
doRead = true; | ||
// however, if we've ended, then there's no point, and if we're already | ||
// reading, then it's unnecessary. | ||
if (state.ended || state.reading) | ||
doRead = false; | ||
if (doRead) { | ||
var sync = true; | ||
state.reading = true; | ||
state.sync = true; | ||
// call internal read method | ||
this._read(state.bufferSize, state.onread); | ||
state.sync = false; | ||
} | ||
// If _read called its callback synchronously, then `reading` | ||
// will be false, and we need to re-evaluate how much data we | ||
// can return to the user. | ||
if (doRead && !state.reading) | ||
n = howMuchToRead(nOrig, state); | ||
var ret; | ||
if (n > 0) | ||
ret = fromList(n, state.buffer, state.length, !!state.decoder); | ||
else | ||
ret = null; | ||
if (ret === null || ret.length === 0) { | ||
state.needReadable = true; | ||
n = 0; | ||
} | ||
state.length -= n; | ||
return ret; | ||
}; | ||
function onread(stream, er, chunk) { | ||
var state = stream._readableState; | ||
var sync = state.sync; | ||
state.reading = false; | ||
if (er) | ||
return stream.emit('error', er); | ||
if (!chunk || !chunk.length) { | ||
// eof | ||
state.ended = true; | ||
if (state.decoder) { | ||
chunk = state.decoder.end(); | ||
if (chunk && chunk.length) { | ||
state.buffer.push(chunk); | ||
state.length += chunk.length; | ||
} | ||
} | ||
// if we've ended and we have some data left, then emit | ||
// 'readable' now to make sure it gets picked up. | ||
if (!sync) { | ||
if (state.length > 0) { | ||
state.needReadable = false; | ||
if (!state.emittedReadable) { | ||
state.emittedReadable = true; | ||
stream.emit('readable'); | ||
} | ||
} else | ||
endReadable(stream); | ||
} | ||
return; | ||
} | ||
if (state.decoder) | ||
chunk = state.decoder.write(chunk); | ||
// update the buffer info. | ||
if (chunk) { | ||
state.length += chunk.length; | ||
state.buffer.push(chunk); | ||
} | ||
// if we haven't gotten enough to pass the lowWaterMark, | ||
// and we haven't ended, then don't bother telling the user | ||
// that it's time to read more data. Otherwise, emitting 'readable' | ||
// probably will trigger another stream.read(), which can trigger | ||
// another _read(n,cb) before this one returns! | ||
if (state.length <= state.lowWaterMark) { | ||
state.reading = true; | ||
stream._read(state.bufferSize, state.onread); | ||
return; | ||
} | ||
if (state.needReadable && !sync) { | ||
state.needReadable = false; | ||
if (!state.emittedReadable) { | ||
state.emittedReadable = true; | ||
stream.emit('readable'); | ||
} | ||
} | ||
} | ||
// abstract method. to be overridden in specific implementation classes. | ||
// call cb(er, data) where data is <= n in length. | ||
// for virtual (non-string, non-buffer) streams, "length" is somewhat | ||
// arbitrary, and perhaps not very meaningful. | ||
Readable.prototype._read = function(n, cb) { | ||
process.nextTick(function() { | ||
cb(new Error('not implemented')); | ||
}); | ||
}; | ||
Readable.prototype.pipe = function(dest, pipeOpts) { | ||
var src = this; | ||
var state = this._readableState; | ||
switch (state.pipesCount) { | ||
case 0: | ||
state.pipes = dest; | ||
break; | ||
case 1: | ||
state.pipes = [ state.pipes, dest ]; | ||
break; | ||
default: | ||
state.pipes.push(dest); | ||
break; | ||
} | ||
state.pipesCount += 1; | ||
if ((!pipeOpts || pipeOpts.end !== false) && | ||
dest !== process.stdout && | ||
dest !== process.stderr) { | ||
src.once('end', onend); | ||
dest.on('unpipe', function(readable) { | ||
if (readable === src) | ||
src.removeListener('end', onend); | ||
}); | ||
} | ||
if (pipeOpts && pipeOpts.chunkSize) | ||
state.pipeChunkSize = pipeOpts.chunkSize; | ||
function onend() { | ||
dest.end(); | ||
} | ||
// when the dest drains, it reduces the awaitDrain counter | ||
// on the source. This would be more elegant with a .once() | ||
// handler in flow(), but adding and removing repeatedly is | ||
// too slow. | ||
var ondrain = pipeOnDrain(src); | ||
dest.on('drain', ondrain); | ||
dest.on('unpipe', function(readable) { | ||
if (readable === src) | ||
dest.removeListener('drain', ondrain); | ||
// if the reader is waiting for a drain event from this | ||
// specific writer, then it would cause it to never start | ||
// flowing again. | ||
// So, if this is awaiting a drain, then we just call it now. | ||
// If we don't know, then assume that we are waiting for one. | ||
if (!dest._writableState || dest._writableState.needDrain) | ||
ondrain(); | ||
}); | ||
// if the dest has an error, then stop piping into it. | ||
// however, don't suppress the throwing behavior for this. | ||
dest.once('error', function(er) { | ||
unpipe(); | ||
if (dest.listeners('error').length === 0) | ||
dest.emit('error', er); | ||
}); | ||
// if the dest emits close, then presumably there's no point writing | ||
// to it any more. | ||
dest.on('close', unpipe); | ||
dest.on('finish', function() { | ||
dest.removeListener('close', unpipe); | ||
}); | ||
function unpipe() { | ||
src.unpipe(dest); | ||
} | ||
// tell the dest that it's being piped to | ||
dest.emit('pipe', src); | ||
// start the flow if it hasn't been started already. | ||
if (!state.flowing) { | ||
// the handler that waits for readable events after all | ||
// the data gets sucked out in flow. | ||
// This would be easier to follow with a .once() handler | ||
// in flow(), but that is too slow. | ||
this.on('readable', pipeOnReadable); | ||
state.flowing = true; | ||
process.nextTick(function() { | ||
flow(src); | ||
}); | ||
} | ||
return dest; | ||
}; | ||
function pipeOnDrain(src) { | ||
return function() { | ||
var dest = this; | ||
var state = src._readableState; | ||
state.awaitDrain --; | ||
if (state.awaitDrain === 0) | ||
flow(src); | ||
}; | ||
} | ||
function flow(src) { | ||
var state = src._readableState; | ||
var chunk; | ||
state.awaitDrain = 0; | ||
function write(dest, i, list) { | ||
var written = dest.write(chunk); | ||
if (false === written) { | ||
state.awaitDrain++; | ||
} | ||
} | ||
while (state.pipesCount && | ||
null !== (chunk = src.read(state.pipeChunkSize))) { | ||
if (state.pipesCount === 1) | ||
write(state.pipes, 0, null); | ||
else | ||
state.pipes.forEach(write); | ||
src.emit('data', chunk); | ||
// if anyone needs a drain, then we have to wait for that. | ||
if (state.awaitDrain > 0) | ||
return; | ||
} | ||
// if every destination was unpiped, either before entering this | ||
// function, or in the while loop, then stop flowing. | ||
// | ||
// NB: This is a pretty rare edge case. | ||
if (state.pipesCount === 0) { | ||
state.flowing = false; | ||
// if there were data event listeners added, then switch to old mode. | ||
if (src.listeners('data').length) | ||
emitDataEvents(src); | ||
return; | ||
} | ||
// at this point, no one needed a drain, so we just ran out of data | ||
// on the next readable event, start it over again. | ||
state.ranOut = true; | ||
} | ||
function pipeOnReadable() { | ||
if (this._readableState.ranOut) { | ||
this._readableState.ranOut = false; | ||
flow(this); | ||
} | ||
} | ||
Readable.prototype.unpipe = function(dest) { | ||
var state = this._readableState; | ||
// if we're not piping anywhere, then do nothing. | ||
if (state.pipesCount === 0) | ||
return this; | ||
// just one destination. most common case. | ||
if (state.pipesCount === 1) { | ||
// passed in one, but it's not the right one. | ||
if (dest && dest !== state.pipes) | ||
return this; | ||
if (!dest) | ||
dest = state.pipes; | ||
// got a match. | ||
state.pipes = null; | ||
state.pipesCount = 0; | ||
this.removeListener('readable', pipeOnReadable); | ||
if (dest) | ||
dest.emit('unpipe', this); | ||
return this; | ||
} | ||
// slow case. multiple pipe destinations. | ||
if (!dest) { | ||
// remove all. | ||
var dests = state.pipes; | ||
var len = state.pipesCount; | ||
state.pipes = null; | ||
state.pipesCount = 0; | ||
this.removeListener('readable', pipeOnReadable); | ||
for (var i = 0; i < len; i++) | ||
dests[i].emit('unpipe', this); | ||
return this; | ||
} | ||
// try to find the right one. | ||
var i = state.pipes.indexOf(dest); | ||
if (i === -1) | ||
return this; | ||
state.pipes.splice(i, 1); | ||
state.pipesCount -= 1; | ||
if (state.pipesCount === 1) | ||
state.pipes = state.pipes[0]; | ||
dest.emit('unpipe', this); | ||
return this; | ||
}; | ||
// kludge for on('data', fn) consumers. Sad. | ||
// This is *not* part of the new readable stream interface. | ||
// It is an ugly unfortunate mess of history. | ||
Readable.prototype.on = function(ev, fn) { | ||
// https://github.com/isaacs/readable-stream/issues/16 | ||
// if we're already flowing, then no need to set up data events. | ||
if (ev === 'data' && !this._readableState.flowing) | ||
emitDataEvents(this); | ||
return Stream.prototype.on.call(this, ev, fn); | ||
}; | ||
Readable.prototype.addListener = Readable.prototype.on; | ||
// pause() and resume() are remnants of the legacy readable stream API | ||
// If the user uses them, then switch into old mode. | ||
Readable.prototype.resume = function() { | ||
emitDataEvents(this); | ||
return this.resume(); | ||
}; | ||
Readable.prototype.pause = function() { | ||
emitDataEvents(this); | ||
return this.pause(); | ||
}; | ||
function emitDataEvents(stream) { | ||
var state = stream._readableState; | ||
if (state.flowing) { | ||
// https://github.com/isaacs/readable-stream/issues/16 | ||
throw new Error('Cannot switch to old mode now.'); | ||
} | ||
var paused = false; | ||
var readable = false; | ||
// convert to an old-style stream. | ||
stream.readable = true; | ||
stream.pipe = Stream.prototype.pipe; | ||
stream.on = stream.addEventListener = Stream.prototype.on; | ||
stream.on('readable', function() { | ||
readable = true; | ||
var c; | ||
while (!paused && (null !== (c = stream.read()))) | ||
stream.emit('data', c); | ||
if (c === null) { | ||
readable = false; | ||
stream._readableState.needReadable = true; | ||
} | ||
}); | ||
stream.pause = function() { | ||
paused = true; | ||
}; | ||
stream.resume = function() { | ||
paused = false; | ||
if (readable) | ||
stream.emit('readable'); | ||
}; | ||
// now make it start, just in case it hadn't already. | ||
process.nextTick(function() { | ||
stream.emit('readable'); | ||
}); | ||
} | ||
// wrap an old-style stream as the async data source. | ||
// This is *not* part of the readable stream interface. | ||
// It is an ugly unfortunate mess of history. | ||
Readable.prototype.wrap = function(stream) { | ||
var state = this._readableState; | ||
var paused = false; | ||
var self = this; | ||
stream.on('end', function() { | ||
state.ended = true; | ||
if (state.decoder) { | ||
var chunk = state.decoder.end(); | ||
if (chunk && chunk.length) { | ||
state.buffer.push(chunk); | ||
state.length += chunk.length; | ||
} | ||
} | ||
if (state.length > 0) | ||
self.emit('readable'); | ||
else | ||
endReadable(self); | ||
}); | ||
stream.on('data', function(chunk) { | ||
if (state.decoder) | ||
chunk = state.decoder.write(chunk); | ||
if (!chunk || !chunk.length) | ||
return; | ||
state.buffer.push(chunk); | ||
state.length += chunk.length; | ||
self.emit('readable'); | ||
// if not consumed, then pause the stream. | ||
if (state.length > state.lowWaterMark && !paused) { | ||
paused = true; | ||
stream.pause(); | ||
} | ||
}); | ||
// proxy all the other methods. | ||
// important when wrapping filters and duplexes. | ||
for (var i in stream) { | ||
if (typeof stream[i] === 'function' && | ||
typeof this[i] === 'undefined') { | ||
this[i] = function(method) { return function() { | ||
return stream[method].apply(stream, arguments); | ||
}}(i); | ||
} | ||
} | ||
// proxy certain important events. | ||
var events = ['error', 'close', 'destroy', 'pause', 'resume']; | ||
events.forEach(function(ev) { | ||
stream.on(ev, self.emit.bind(self, ev)); | ||
}); | ||
// consume some bytes. if not all is consumed, then | ||
// pause the underlying stream. | ||
this.read = function(n) { | ||
if (state.length === 0) { | ||
state.needReadable = true; | ||
return null; | ||
} | ||
if (isNaN(n) || n <= 0) | ||
n = state.length; | ||
if (n > state.length) { | ||
if (!state.ended) { | ||
state.needReadable = true; | ||
return null; | ||
} else | ||
n = state.length; | ||
} | ||
var ret = fromList(n, state.buffer, state.length, !!state.decoder); | ||
state.length -= n; | ||
if (state.length <= state.lowWaterMark && paused) { | ||
stream.resume(); | ||
paused = false; | ||
} | ||
if (state.length === 0 && state.ended) | ||
endReadable(this); | ||
return ret; | ||
}; | ||
}; | ||
// exposed for testing purposes only. | ||
Readable._fromList = fromList; | ||
// Pluck off n bytes from an array of buffers. | ||
// Length is the combined lengths of all the buffers in the list. | ||
function fromList(n, list, length, stringMode) { | ||
var ret; | ||
// nothing in the list, definitely empty. | ||
if (list.length === 0) { | ||
return null; | ||
} | ||
if (length === 0) | ||
ret = null; | ||
else if (!n || n >= length) { | ||
// read it all, truncate the array. | ||
if (stringMode) | ||
ret = list.join(''); | ||
else | ||
ret = Buffer.concat(list, length); | ||
list.length = 0; | ||
} else { | ||
// read just some of it. | ||
if (n < list[0].length) { | ||
// just take a part of the first list item. | ||
// slice is the same for buffers and strings. | ||
var buf = list[0]; | ||
ret = buf.slice(0, n); | ||
list[0] = buf.slice(n); | ||
} else if (n === list[0].length) { | ||
// first list is a perfect match | ||
ret = list.shift(); | ||
} else { | ||
// complex case. | ||
// we have enough to cover it, but it spans past the first buffer. | ||
if (stringMode) | ||
ret = ''; | ||
else | ||
ret = new Buffer(n); | ||
var c = 0; | ||
for (var i = 0, l = list.length; i < l && c < n; i++) { | ||
var buf = list[0]; | ||
var cpy = Math.min(n - c, buf.length); | ||
if (stringMode) | ||
ret += buf.slice(0, cpy); | ||
else | ||
buf.copy(ret, c, 0, cpy); | ||
if (cpy < buf.length) | ||
list[0] = buf.slice(cpy); | ||
else | ||
list.shift(); | ||
c += cpy; | ||
} | ||
} | ||
} | ||
return ret; | ||
} | ||
function endReadable(stream) { | ||
var state = stream._readableState; | ||
if (state.endEmitted) | ||
return; | ||
state.ended = true; | ||
state.endEmitted = true; | ||
process.nextTick(function() { | ||
stream.emit('end'); | ||
}); | ||
} | ||
module.exports = require("./lib/_stream_readable.js") |
@@ -84,6 +84,6 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
setInterval, | ||
setImmediate, | ||
global.setImmediate, | ||
clearTimeout, | ||
clearInterval, | ||
clearImmediate, | ||
global.clearImmediate, | ||
console, | ||
@@ -90,0 +90,0 @@ Buffer, |
@@ -24,3 +24,3 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
var common = require('../common.js'); | ||
var R = require('../../readable'); | ||
var R = require('../../lib/_stream_readable.js'); | ||
var assert = require('assert'); | ||
@@ -27,0 +27,0 @@ |
@@ -27,3 +27,3 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
var fs = require('fs'); | ||
var fs = require('../../fs'); | ||
var FSReadable = fs.ReadStream; | ||
@@ -30,0 +30,0 @@ |
232
transform.js
@@ -1,231 +0,1 @@ | ||
// Copyright Joyent, Inc. and other Node contributors. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a | ||
// copy of this software and associated documentation files (the | ||
// "Software"), to deal in the Software without restriction, including | ||
// without limitation the rights to use, copy, modify, merge, publish, | ||
// distribute, sublicense, and/or sell copies of the Software, and to permit | ||
// persons to whom the Software is furnished to do so, subject to the | ||
// following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included | ||
// in all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | ||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | ||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN | ||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, | ||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | ||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
// a transform stream is a readable/writable stream where you do | ||
// something with the data. Sometimes it's called a "filter", | ||
// but that's not a great name for it, since that implies a thing where | ||
// some bits pass through, and others are simply ignored. (That would | ||
// be a valid example of a transform, of course.) | ||
// | ||
// While the output is causally related to the input, it's not a | ||
// necessarily symmetric or synchronous transformation. For example, | ||
// a zlib stream might take multiple plain-text writes(), and then | ||
// emit a single compressed chunk some time in the future. | ||
// | ||
// Here's how this works: | ||
// | ||
// The Transform stream has all the aspects of the readable and writable | ||
// stream classes. When you write(chunk), that calls _write(chunk,cb) | ||
// internally, and returns false if there's a lot of pending writes | ||
// buffered up. When you call read(), that calls _read(n,cb) until | ||
// there's enough pending readable data buffered up. | ||
// | ||
// In a transform stream, the written data is placed in a buffer. When | ||
// _read(n,cb) is called, it transforms the queued up data, calling the | ||
// buffered _write cb's as it consumes chunks. If consuming a single | ||
// written chunk would result in multiple output chunks, then the first | ||
// outputted bit calls the readcb, and subsequent chunks just go into | ||
// the read buffer, and will cause it to emit 'readable' if necessary. | ||
// | ||
// This way, back-pressure is actually determined by the reading side, | ||
// since _read has to be called to start processing a new chunk. However, | ||
// a pathological inflate type of transform can cause excessive buffering | ||
// here. For example, imagine a stream where every byte of input is | ||
// interpreted as an integer from 0-255, and then results in that many | ||
// bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in | ||
// 1kb of data being output. In this case, you could write a very small | ||
// amount of input, and end up with a very large amount of output. In | ||
// such a pathological inflating mechanism, there'd be no way to tell | ||
// the system to stop doing the transform. A single 4MB write could | ||
// cause the system to run out of memory. | ||
// | ||
// However, even in such a pathological case, only a single written chunk | ||
// would be consumed, and then the rest would wait (un-transformed) until | ||
// the results of the previous transformed chunk were consumed. Because | ||
// the transform happens on-demand, it will only transform as much as is | ||
// necessary to fill the readable buffer to the specified lowWaterMark. | ||
module.exports = Transform; | ||
var Duplex = require('./duplex.js'); | ||
var util = require('util'); | ||
util.inherits(Transform, Duplex); | ||
function TransformState(stream) { | ||
this.buffer = []; | ||
this.transforming = false; | ||
this.pendingReadCb = null; | ||
this.output = function(chunk) { | ||
stream._output(chunk); | ||
}; | ||
} | ||
function Transform(options) { | ||
if (!(this instanceof Transform)) | ||
return new Transform(options); | ||
Duplex.call(this, options); | ||
// bind output so that it can be passed around as a regular function. | ||
var stream = this; | ||
// the queue of _write chunks that are pending being transformed | ||
var ts = this._transformState = new TransformState(stream); | ||
// when the writable side finishes, then flush out anything remaining. | ||
this.once('finish', function() { | ||
if ('function' === typeof this._flush) | ||
this._flush(ts.output, function(er) { | ||
done(stream, er); | ||
}); | ||
else | ||
done(stream); | ||
}); | ||
} | ||
// This is the part where you do stuff! | ||
// override this function in implementation classes. | ||
// 'chunk' is an input chunk. | ||
// | ||
// Call `output(newChunk)` to pass along transformed output | ||
// to the readable side. You may call 'output' zero or more times. | ||
// | ||
// Call `cb(err)` when you are done with this chunk. If you pass | ||
// an error, then that'll put the hurt on the whole operation. If you | ||
// never call cb(), then you'll never get another chunk. | ||
Transform.prototype._transform = function(chunk, output, cb) { | ||
throw new Error('not implemented'); | ||
}; | ||
Transform.prototype._write = function(chunk, cb) { | ||
var ts = this._transformState; | ||
var rs = this._readableState; | ||
ts.buffer.push([chunk, cb]); | ||
// no need for auto-pull if already in the midst of one. | ||
if (ts.transforming) | ||
return; | ||
// now we have something to transform, if we were waiting for it. | ||
// kick off a _read to pull it in. | ||
if (ts.pendingReadCb) { | ||
var readcb = ts.pendingReadCb; | ||
ts.pendingReadCb = null; | ||
this._read(0, readcb); | ||
} | ||
// if we weren't waiting for it, but nothing is queued up, then | ||
// still kick off a transform, just so it's there when the user asks. | ||
var doRead = rs.needReadable || rs.length <= rs.highWaterMark; | ||
if (doRead && !rs.reading) { | ||
var ret = this.read(0); | ||
if (ret !== null) | ||
return cb(new Error('invalid stream transform state')); | ||
} | ||
}; | ||
Transform.prototype._read = function(n, readcb) { | ||
var ws = this._writableState; | ||
var rs = this._readableState; | ||
var ts = this._transformState; | ||
if (ts.pendingReadCb) | ||
throw new Error('_read while _read already in progress'); | ||
ts.pendingReadCb = readcb; | ||
// if there's nothing pending, then we just wait. | ||
// if we're already transforming, then also just hold on a sec. | ||
// we've already stashed the readcb, so we can come back later | ||
// when we have something to transform | ||
if (ts.buffer.length === 0 || ts.transforming) | ||
return; | ||
// go ahead and transform that thing, now that someone wants it | ||
var req = ts.buffer.shift(); | ||
var chunk = req[0]; | ||
var writecb = req[1]; | ||
ts.transforming = true; | ||
this._transform(chunk, ts.output, function(er, data) { | ||
ts.transforming = false; | ||
if (data) | ||
ts.output(data); | ||
writecb(er); | ||
}); | ||
}; | ||
Transform.prototype._output = function(chunk) { | ||
if (!chunk || !chunk.length) | ||
return; | ||
// if we've got a pending readcb, then just call that, | ||
// and let Readable take care of it. If not, then we fill | ||
// the readable buffer ourselves, and emit whatever's needed. | ||
var ts = this._transformState; | ||
var readcb = ts.pendingReadCb; | ||
if (readcb) { | ||
ts.pendingReadCb = null; | ||
readcb(null, chunk); | ||
return; | ||
} | ||
// otherwise, it's up to us to fill the rs buffer. | ||
var rs = this._readableState; | ||
var len = rs.length; | ||
rs.buffer.push(chunk); | ||
rs.length += chunk.length; | ||
if (rs.needReadable) { | ||
rs.needReadable = false; | ||
this.emit('readable'); | ||
} | ||
}; | ||
function done(stream, er) { | ||
if (er) | ||
return stream.emit('error', er); | ||
// if there's nothing in the write buffer, then that means | ||
// that nothing more will ever be provided | ||
var ws = stream._writableState; | ||
var rs = stream._readableState; | ||
var ts = stream._transformState; | ||
if (ws.length) | ||
throw new Error('calling transform done when ws.length != 0'); | ||
if (ts.transforming) | ||
throw new Error('calling transform done when still transforming'); | ||
// if we were waiting on a read, let them know that it isn't coming. | ||
var readcb = ts.pendingReadCb; | ||
if (readcb) | ||
return readcb(); | ||
rs.ended = true; | ||
// we may have gotten a 'null' read before, and since there is | ||
// no more data coming from the writable side, we need to emit | ||
// now so that the consumer knows to pick up the tail bits. | ||
if (rs.length && rs.needReadable) | ||
stream.emit('readable'); | ||
else if (rs.length === 0) | ||
stream.emit('end'); | ||
} | ||
module.exports = require("./lib/_stream_transform.js") |
71
zlib.js
@@ -22,3 +22,3 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
var Transform = require('./transform.js'); | ||
var Transform = require('./lib/_stream_transform.js'); | ||
@@ -165,3 +165,5 @@ var binding = process.binding('zlib'); | ||
function onEnd() { | ||
callback(null, Buffer.concat(buffers, nread)); | ||
var buf = Buffer.concat(buffers, nread); | ||
buffers = []; | ||
callback(null, buf); | ||
} | ||
@@ -299,2 +301,5 @@ } | ||
this._offset = 0; | ||
this._closed = false; | ||
this.once('end', this.close); | ||
} | ||
@@ -309,9 +314,55 @@ | ||
Zlib.prototype._flush = function(output, callback) { | ||
this._transform(null, output, callback); | ||
var rs = this._readableState; | ||
var self = this; | ||
this._transform(null, output, function(er) { | ||
if (er) | ||
return callback(er); | ||
// now a weird thing happens... it could be that you called flush | ||
// but everything had already actually been consumed, but it wasn't | ||
// enough to get over the Readable class's lowWaterMark. | ||
// In that case, we emit 'readable' now to make sure it's consumed. | ||
if (rs.length && | ||
rs.length < rs.lowWaterMark && | ||
!rs.ended && | ||
rs.needReadable) | ||
self.emit('readable'); | ||
callback(); | ||
}); | ||
}; | ||
Zlib.prototype.flush = function(callback) { | ||
this._flush(this._output, callback || function() {}); | ||
var ws = this._writableState; | ||
var ts = this._transformState; | ||
if (ws.writing) { | ||
ws.needDrain = true; | ||
var self = this; | ||
this.once('drain', function() { | ||
self._flush(ts.output, callback); | ||
}); | ||
return; | ||
} | ||
this._flush(ts.output, callback || function() {}); | ||
}; | ||
Zlib.prototype.close = function(callback) { | ||
if (callback) | ||
process.nextTick(callback); | ||
if (this._closed) | ||
return; | ||
this._closed = true; | ||
this._binding.close(); | ||
var self = this; | ||
process.nextTick(function() { | ||
self.emit('close'); | ||
}); | ||
}; | ||
Zlib.prototype._transform = function(chunk, output, cb) { | ||
@@ -323,9 +374,13 @@ var flushFlag; | ||
if (chunk !== null && !Buffer.isBuffer(chunk)) | ||
return cb(new Error('invalid input')); | ||
// If it's the last chunk, or a final flush, we use the Z_FINISH flush flag. | ||
// If it's explicitly flushing at some other time, then we use Z_FLUSH. | ||
// Otherwise, use Z_NO_FLUSH for maximum compression goodness. | ||
// If it's explicitly flushing at some other time, then we use | ||
// Z_FULL_FLUSH. Otherwise, use Z_NO_FLUSH for maximum compression | ||
// goodness. | ||
if (last) | ||
flushFlag = binding.Z_FINISH; | ||
else if (chunk === null) | ||
flushFlag = binding.Z_FLUSH; | ||
flushFlag = binding.Z_FULL_FLUSH; | ||
else | ||
@@ -379,3 +434,3 @@ flushFlag = binding.Z_NO_FLUSH; | ||
var newReq = self._binding.write(self._flush, | ||
var newReq = self._binding.write(flushFlag, | ||
chunk, | ||
@@ -382,0 +437,0 @@ inOff, |
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
1
166692
27
4547