Comparing version 1.0.0 to 1.0.1
122
index.js
@@ -1,79 +0,79 @@ | ||
var once = require('once'); | ||
var eos = require('end-of-stream'); | ||
var fs = require('fs'); // we only need fs to get the ReadStream and WriteStream prototypes | ||
var once = require('once') | ||
var eos = require('end-of-stream') | ||
var fs = require('fs') // we only need fs to get the ReadStream and WriteStream prototypes | ||
var noop = function() {}; | ||
var noop = function () {} | ||
var isFn = function(fn) { | ||
return typeof fn === 'function'; | ||
}; | ||
var isFn = function (fn) { | ||
return typeof fn === 'function' | ||
} | ||
var isFS = function(stream) { | ||
return (stream instanceof (fs.ReadStream || noop) || stream instanceof (fs.WriteStream || noop)) && isFn(stream.close); | ||
}; | ||
var isFS = function (stream) { | ||
return (stream instanceof (fs.ReadStream || noop) || stream instanceof (fs.WriteStream || noop)) && isFn(stream.close) | ||
} | ||
var isRequest = function(stream) { | ||
return stream.setHeader && isFn(stream.abort); | ||
}; | ||
var isRequest = function (stream) { | ||
return stream.setHeader && isFn(stream.abort) | ||
} | ||
var destroyer = function(stream, reading, writing, callback) { | ||
callback = once(callback); | ||
var destroyer = function (stream, reading, writing, callback) { | ||
callback = once(callback) | ||
var closed = false; | ||
stream.on('close', function() { | ||
closed = true; | ||
}); | ||
var closed = false | ||
stream.on('close', function () { | ||
closed = true | ||
}) | ||
eos(stream, {readable:reading, writable:writing}, function(err) { | ||
if (err) return callback(err); | ||
closed = true; | ||
callback(); | ||
}); | ||
eos(stream, {readable: reading, writable: writing}, function (err) { | ||
if (err) return callback(err) | ||
closed = true | ||
callback() | ||
}) | ||
var destroyed = false; | ||
return function(err) { | ||
if (closed) return; | ||
if (destroyed) return; | ||
destroyed = true; | ||
var destroyed = false | ||
return function (err) { | ||
if (closed) return | ||
if (destroyed) return | ||
destroyed = true | ||
if (isFS(stream)) return stream.close(); // use close for fs streams to avoid fd leaks | ||
if (isRequest(stream)) return stream.abort(); // request.destroy just do .end - .abort is what we want | ||
if (isFS(stream)) return stream.close() // use close for fs streams to avoid fd leaks | ||
if (isRequest(stream)) return stream.abort() // request.destroy just do .end - .abort is what we want | ||
if (isFn(stream.destroy)) return stream.destroy(); | ||
if (isFn(stream.destroy)) return stream.destroy() | ||
callback(err || new Error('stream was destroyed')); | ||
}; | ||
}; | ||
callback(err || new Error('stream was destroyed')) | ||
} | ||
} | ||
var call = function(fn) { | ||
fn(); | ||
}; | ||
var call = function (fn) { | ||
fn() | ||
} | ||
var pipe = function(from, to) { | ||
return from.pipe(to); | ||
}; | ||
var pipe = function (from, to) { | ||
return from.pipe(to) | ||
} | ||
var pump = function() { | ||
var streams = Array.prototype.slice.call(arguments); | ||
var callback = isFn(streams[streams.length-1] || noop) && streams.pop() || noop; | ||
var pump = function () { | ||
var streams = Array.prototype.slice.call(arguments) | ||
var callback = isFn(streams[streams.length - 1] || noop) && streams.pop() || noop | ||
if (Array.isArray(streams[0])) streams = streams[0]; | ||
if (streams.length < 2) throw new Error('pump requires two streams per minimum'); | ||
if (Array.isArray(streams[0])) streams = streams[0] | ||
if (streams.length < 2) throw new Error('pump requires two streams per minimum') | ||
var error; | ||
var destroys = streams.map(function(stream, i) { | ||
var reading = i < streams.length-1; | ||
var writing = i > 0; | ||
return destroyer(stream, reading, writing, function(err) { | ||
if (!error) error = err; | ||
if (err) destroys.forEach(call); | ||
if (reading) return; | ||
destroys.forEach(call); | ||
callback(error); | ||
}); | ||
}); | ||
var error | ||
var destroys = streams.map(function (stream, i) { | ||
var reading = i < streams.length - 1 | ||
var writing = i > 0 | ||
return destroyer(stream, reading, writing, function (err) { | ||
if (!error) error = err | ||
if (err) destroys.forEach(call) | ||
if (reading) return | ||
destroys.forEach(call) | ||
callback(error) | ||
}) | ||
}) | ||
return streams.reduce(pipe); | ||
}; | ||
return streams.reduce(pipe) | ||
} | ||
module.exports = pump; | ||
module.exports = pump |
{ | ||
"name": "pump", | ||
"version": "1.0.0", | ||
"version": "1.0.1", | ||
"repository": "git://github.com/mafintosh/pump.git", | ||
@@ -5,0 +5,0 @@ "license": "MIT", |
@@ -5,3 +5,5 @@ # pump | ||
npm install pump | ||
``` | ||
npm install pump | ||
``` | ||
@@ -22,15 +24,15 @@ [![build status](http://img.shields.io/travis/mafintosh/pump.svg?style=flat)](http://travis-ci.org/mafintosh/pump) | ||
``` js | ||
var pump = require('pump'); | ||
var fs = require('fs'); | ||
var pump = require('pump') | ||
var fs = require('fs') | ||
var source = fs.createReadStream('/dev/random'); | ||
var dest = fs.createWriteStream('/dev/null'); | ||
var source = fs.createReadStream('/dev/random') | ||
var dest = fs.createWriteStream('/dev/null') | ||
pump(source, dest, function(err) { | ||
console.log('pipe finished', err); | ||
}); | ||
console.log('pipe finished', err) | ||
}) | ||
setTimeout(function() { | ||
dest.destroy(); // when dest is closed pump will destroy source | ||
}, 1000); | ||
dest.destroy() // when dest is closed pump will destroy source | ||
}, 1000) | ||
``` | ||
@@ -41,7 +43,7 @@ | ||
``` js | ||
var transform = someTransformStream(); | ||
var transform = someTransformStream() | ||
pump(source, transform, anotherTransform, dest, function(err) { | ||
console.log('pipe finished', err); | ||
}); | ||
console.log('pipe finished', err) | ||
}) | ||
``` | ||
@@ -54,1 +56,5 @@ | ||
MIT | ||
## Related | ||
`pump` is part of the [mississippi stream utility collection](https://github.com/maxogden/mississippi) which includes more useful stream modules similar to this one. |
71
test.js
@@ -1,47 +0,46 @@ | ||
var assert = require('assert'); | ||
var pump = require('./index'); | ||
var pump = require('./index') | ||
var rs = require('fs').createReadStream('/dev/random'); | ||
var ws = require('fs').createWriteStream('/dev/null'); | ||
var rs = require('fs').createReadStream('/dev/random') | ||
var ws = require('fs').createWriteStream('/dev/null') | ||
var toHex = function() { | ||
var reverse = new (require('stream').Transform)(); | ||
var toHex = function () { | ||
var reverse = new (require('stream').Transform)() | ||
reverse._transform = function(chunk, enc, callback) { | ||
reverse.push(chunk.toString('hex')); | ||
callback(); | ||
}; | ||
reverse._transform = function (chunk, enc, callback) { | ||
reverse.push(chunk.toString('hex')) | ||
callback() | ||
} | ||
return reverse; | ||
}; | ||
return reverse | ||
} | ||
var wsClosed = false; | ||
var rsClosed = false; | ||
var callbackCalled = false; | ||
var wsClosed = false | ||
var rsClosed = false | ||
var callbackCalled = false | ||
var check = function() { | ||
if (wsClosed && rsClosed && callbackCalled) process.exit(0); | ||
}; | ||
var check = function () { | ||
if (wsClosed && rsClosed && callbackCalled) process.exit(0) | ||
} | ||
ws.on('close', function() { | ||
wsClosed = true; | ||
check(); | ||
}); | ||
ws.on('close', function () { | ||
wsClosed = true | ||
check() | ||
}) | ||
rs.on('close', function() { | ||
rsClosed = true; | ||
check(); | ||
}); | ||
rs.on('close', function () { | ||
rsClosed = true | ||
check() | ||
}) | ||
pump(rs, toHex(), toHex(), toHex(), ws, function(err) { | ||
callbackCalled = true; | ||
check(); | ||
}); | ||
pump(rs, toHex(), toHex(), toHex(), ws, function () { | ||
callbackCalled = true | ||
check() | ||
}) | ||
setTimeout(function() { | ||
rs.destroy(); | ||
}, 1000); | ||
setTimeout(function () { | ||
rs.destroy() | ||
}, 1000) | ||
setTimeout(function() { | ||
throw new Error('timeout'); | ||
}, 5000); | ||
setTimeout(function () { | ||
throw new Error('timeout') | ||
}, 5000) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
5947
96
57