skipper-disk
Advanced tools
Comparing version 0.2.6 to 0.3.1
337
index.js
@@ -5,6 +5,8 @@ /** | ||
var Writable = require('stream').Writable; | ||
var WritableStream = require('stream').Writable; | ||
var TransformStream = require('stream').Transform; | ||
var fsx = require('fs-extra'); | ||
var path = require('path'); | ||
var _ = require('lodash'); | ||
var UUIDGenerator = require('node-uuid'); | ||
@@ -16,2 +18,3 @@ | ||
* | ||
* @type {Function} | ||
* @param {Object} options | ||
@@ -21,24 +24,26 @@ * @return {Object} | ||
module.exports = function DiskStore (options) { | ||
module.exports = function DiskStore(options) { | ||
options = options || {}; | ||
var log = options.log || function _noOpLog() {}; | ||
var adapter = { | ||
rm: function (filepath, cb){ | ||
return fsx.unlink(filepath, function(err) { | ||
rm: function(fd, cb) { | ||
return fsx.unlink(fd, function(err) { | ||
// Ignore "doesn't exist" errors | ||
if (err && err.code !== 'ENOENT') { return cb(err); } | ||
else return cb(); | ||
if (err && (typeof err !== 'object' || err.code !== 'ENOENT')) { | ||
return cb(err); | ||
} else return cb(); | ||
}); | ||
}, | ||
ls: function (dirpath, cb) { | ||
ls: function(dirpath, cb) { | ||
return fsx.readdir(dirpath, cb); | ||
}, | ||
read: function (filepath, cb) { | ||
read: function(fd, cb) { | ||
if (cb) { | ||
return fsx.readFile(filepath, cb); | ||
return fsx.readFile(fd, cb); | ||
} else { | ||
return fsx.createReadStream(fd); | ||
} | ||
else { | ||
return fsx.createReadStream(filepath); | ||
} | ||
}, | ||
@@ -53,2 +58,13 @@ | ||
/** | ||
@@ -64,20 +80,28 @@ * A simple receiver for Skipper that writes Upstreams to | ||
*/ | ||
function DiskReceiver (options) { | ||
function DiskReceiver(options) { | ||
options = options || {}; | ||
// Normalize `saveAs()` option: | ||
// options.saveAs() <==> options.rename() <==> options.getFilename() <==> options.getFileName() | ||
options.saveAs = options.saveAs || options.rename; | ||
options.saveAs = options.saveAs || options.getFileName; | ||
options.saveAs = options.saveAs || options.getFilename; | ||
_.defaults(options, { | ||
// By default, create new files on disk | ||
// using their uploaded filenames. | ||
// (no overwrite-checking is performed!!) | ||
saveAs: function (__newFile) { | ||
return __newFile.filename; | ||
// The default `saveAs` implements a unique filename by combining: | ||
// • a generated UUID (like "4d5f444-38b4-4dc3-b9c3-74cb7fbbc932") | ||
// • the uploaded file's original extension (like ".jpg") | ||
saveAs: function(__newFile, cb) { | ||
return cb(null, UUIDGenerator.v4() + path.extname(__newFile.filename)); | ||
}, | ||
// Bind a progress event handler, e.g.: | ||
// function (milestone) { | ||
// milestone.id; | ||
// milestone.name; | ||
// milestone.written; | ||
// milestone.total; | ||
// milestone.percent; | ||
// }, | ||
onProgress: undefined, | ||
// Upload limit (in bytes) | ||
// defaults to ~15MB | ||
maxBytes: 15000000, | ||
// By default, upload files to `./.tmp/uploads` (relative to cwd) | ||
@@ -87,6 +111,19 @@ dirname: '.tmp/uploads' | ||
var receiver__ = Writable({ | ||
objectMode: true | ||
}); | ||
var receiver__ = WritableStream({ objectMode: true }); | ||
// if onProgress handler was provided, bind an event automatically: | ||
if (_.isFunction(options.onProgress)) { | ||
receiver__.on('progress', options.onProgress); | ||
} | ||
// Track the progress of all file uploads that pass through this receiver | ||
// through one or more attached Upstream(s). | ||
receiver__._files = []; | ||
// Keep track of the number total bytes written so that maxBytes can | ||
// be enforced. | ||
var totalBytesWritten = 0; | ||
// This `_write` method is invoked each time a new file is received | ||
@@ -97,53 +134,72 @@ // from the Readable stream (Upstream) which is pumping filestreams | ||
// Determine location where file should be written: | ||
// ------------------------------------------------------- | ||
var filePath, dirPath, filename; | ||
if (options.id) { | ||
// If `options.id` was specified, use it directly as the path. | ||
filePath = options.id; | ||
dirPath = path.dirname(filePath); | ||
filename = path.basename(filePath); | ||
} | ||
else { | ||
// Otherwise, use the more sophisiticated options: | ||
// ------------------------------------------------------- | ||
// ------------------------------------------------------- | ||
// | ||
// Determine the file descriptor-- the unique identifier. | ||
// Often represents the location where file should be written. | ||
var fd; | ||
var dirPath; | ||
if (options.dirname) { | ||
dirPath = path.resolve(options.dirname); | ||
filename = options.filename || options.saveAs(__newFile); | ||
filePath = path.join(dirPath, filename); | ||
} | ||
// ------------------------------------------------------- | ||
else dirPath = process.cwd(); | ||
// Run `saveAs` to get the desired name for the file | ||
options.saveAs(__newFile, function (err, filename){ | ||
if (err) return done(err); | ||
// Garbage-collect the bytes that were already written for this file. | ||
// (called when a read or write error occurs) | ||
function gc(err) { | ||
// console.log('************** Garbage collecting file `' + __newFile.filename + '` located @ ' + filePath + '...'); | ||
adapter.rm(filePath, function (gcErr) { | ||
if (gcErr) return done([err].concat([gcErr])); | ||
else return done(); | ||
}); | ||
} | ||
// Ensure necessary parent directories exist: | ||
fsx.mkdirs(dirPath, function (mkdirsErr) { | ||
// If we get an error here, it's probably because the Node | ||
// user doesn't have write permissions at the designated | ||
// path. | ||
if (mkdirsErr) { | ||
return done(mkdirsErr); | ||
if (options.fd) { | ||
fd = path.resolve(options.fd); | ||
} | ||
else fd = path.join(dirPath, filename); | ||
var outs = fsx.createWriteStream(filePath, encoding); | ||
__newFile.on('error', function (err) { | ||
// console.log('***** READ error on file ' + __newFile.filename, '::', err); | ||
// Attach fd as metadata to the file stream for use back in skipper core | ||
__newFile._skipperFD = fd; | ||
// | ||
// ------------------------------------------------------- | ||
// ------------------------------------------------------- | ||
// ------------------------------------------------------- | ||
// Ensure necessary parent directories exist: | ||
fsx.mkdirs(dirPath, function(mkdirsErr) { | ||
// If we get an error here, it's probably because the Node | ||
// user doesn't have write permissions at the designated | ||
// path. | ||
if (mkdirsErr) { | ||
return done(mkdirsErr); | ||
} | ||
// Error reading from the file stream | ||
__newFile.on('error', function(err) { | ||
log('***** READ error on file ' + __newFile.filename, '::', err); | ||
}); | ||
// Create a new write stream to write to disk | ||
var outs__ = fsx.createWriteStream(fd, encoding); | ||
// When the file is done writing, call the callback | ||
outs__.on('finish', function successfullyWroteFile() { | ||
log('finished file: ' + __newFile.filename); | ||
done(); | ||
}); | ||
outs__.on('E_EXCEEDS_UPLOAD_LIMIT', function (err) { | ||
done(err); | ||
}); | ||
var __progress__ = buildProgressStream(options, __newFile, receiver__, outs__); | ||
// Finally pipe the progress THROUGH the progress stream | ||
// and out to disk. | ||
__newFile | ||
.pipe(__progress__) | ||
.pipe(outs__); | ||
}); | ||
outs.on('error', function failedToWriteFile(err) { | ||
// console.log('Error on output stream- garbage collecting unfinished uploads...'); | ||
gc(err); | ||
}); | ||
outs.on('finish', function successfullyWroteFile() { | ||
done(); | ||
}); | ||
__newFile.pipe(outs); | ||
}); | ||
}; | ||
@@ -158,1 +214,146 @@ | ||
function buildProgressStream (options, __newFile, receiver__, outs__) { | ||
var log = options.log || function noOpLog(){}; | ||
// Generate a progress stream and unique id for this file | ||
// then pipe the bytes down to the outs___ stream | ||
// We will pipe the incoming file stream to this, which will | ||
var localID = _.uniqueId(); | ||
var guessedTotal = 0; | ||
var writtenSoFar = 0; | ||
var __progress__ = new TransformStream(); | ||
__progress__._transform = function(chunk, enctype, next) { | ||
// Update the guessedTotal to make % estimate | ||
// more accurate: | ||
guessedTotal += chunk.length; | ||
writtenSoFar += chunk.length; | ||
// Do the actual "writing", which in our case will pipe | ||
// the bytes to the outs___ stream that writes to disk | ||
this.push(chunk); | ||
// Emit an event that will calculate our total upload | ||
// progress and determine whether we're within quota | ||
this.emit('progress', { | ||
id: localID, | ||
fd: __newFile._skipperFD, | ||
name: __newFile.name, | ||
written: writtenSoFar, | ||
total: guessedTotal, | ||
percent: (writtenSoFar / guessedTotal) * 100 | 0 | ||
}); | ||
next(); | ||
}; | ||
// This event is fired when a single file stream emits a progress event. | ||
// Each time we receive a file, we must recalculate the TOTAL progress | ||
// for the aggregate file upload. | ||
// | ||
// events emitted look like: | ||
/* | ||
{ | ||
percentage: 9.05, | ||
transferred: 949624, | ||
length: 10485760, | ||
remaining: 9536136, | ||
eta: 10, | ||
runtime: 0, | ||
delta: 295396, | ||
speed: 949624 | ||
} | ||
*/ | ||
__progress__.on('progress', function singleFileProgress(milestone) { | ||
// Lookup or create new object to track file progress | ||
var currentFileProgress = _.find(receiver__._files, { | ||
id: localID | ||
}); | ||
if (currentFileProgress) { | ||
currentFileProgress.written = milestone.written; | ||
currentFileProgress.total = milestone.total; | ||
currentFileProgress.percent = milestone.percent; | ||
currentFileProgress.stream = __newFile; | ||
} else { | ||
currentFileProgress = { | ||
id: localID, | ||
fd: __newFile._skipperFD, | ||
name: __newFile.filename, | ||
written: milestone.written, | ||
total: milestone.total, | ||
percent: milestone.percent, | ||
stream: __newFile | ||
}; | ||
receiver__._files.push(currentFileProgress); | ||
} | ||
//////////////////////////////////////////////////////////////// | ||
// Recalculate `totalBytesWritten` so far for this receiver instance | ||
// (across ALL OF ITS FILES) | ||
// using the sum of all bytes written to each file in `receiver__._files` | ||
totalBytesWritten = _.reduce(receiver__._files, function(memo, status) { | ||
memo += status.written; | ||
return memo; | ||
}, 0); | ||
log(currentFileProgress.percent, '::', currentFileProgress.written, '/', currentFileProgress.total, ' (file #' + currentFileProgress.id + ' :: ' + /*'update#'+counter*/ '' + ')'); //receiver__._files.length+' files)'); | ||
// Emit an event on the receiver. Someone using Skipper may listen for this to show | ||
// a progress bar, for example. | ||
receiver__.emit('progress', currentFileProgress); | ||
// and then enforce its `maxBytes`. | ||
if (options.maxBytes && totalBytesWritten >= options.maxBytes) { | ||
var err = new Error(); | ||
err.code = 'E_EXCEEDS_UPLOAD_LIMIT'; | ||
err.name = 'Upload Error'; | ||
err.maxBytes = options.maxBytes; | ||
err.written = totalBytesWritten; | ||
err.message = 'Upload limit of ' + err.maxBytes + ' bytes exceeded (' + err.written + ' bytes written)'; | ||
// Stop listening for progress events | ||
__progress__.removeAllListeners('progress'); | ||
// Unpipe the progress stream, which feeds the disk stream, so we don't keep dumping to disk | ||
__progress__.unpipe(); | ||
// Clean up any files we've already written | ||
(function gc(err) { | ||
// Garbage-collects the bytes that were already written for this file. | ||
// (called when a read or write error occurs) | ||
log('************** Garbage collecting file `' + __newFile.filename + '` located @ ' + fd + '...'); | ||
adapter.rm(fd, function(gcErr) { | ||
if (gcErr) return outs__.emit('E_EXCEEDS_UPLOAD_LIMIT',[err].concat([gcErr])); | ||
return outs__.emit('E_EXCEEDS_UPLOAD_LIMIT',err); | ||
}); | ||
})(err); | ||
return; | ||
// Don't do this--it releases the underlying pipes, which confuses node when it's in the middle | ||
// of a write operation. | ||
// outs__.emit('error', err); | ||
// | ||
// | ||
} | ||
}); | ||
return __progress__; | ||
} | ||
{ | ||
"name": "skipper-disk", | ||
"version": "0.2.6", | ||
"version": "0.3.1", | ||
"description": "Receive Skipper's file uploads on your local filesystem", | ||
@@ -34,5 +34,6 @@ "main": "index.js", | ||
"dependencies": { | ||
"fs-extra": "~0.8.1", | ||
"lodash": "~2.4.1", | ||
"fs-extra": "~0.8.1" | ||
"node-uuid": "~1.4.1" | ||
} | ||
} |
@@ -66,3 +66,3 @@ # [<img title="skipper-disk - Local disk adapter for Skipper" src="http://i.imgur.com/P6gptnI.png" width="200px" alt="skipper emblem - face of a ship's captain"/>](https://github.com/balderdashy/skipper-disk) Disk Blob Adapter | ||
| `dirname` | ((string)) | The path to the directory on disk where file uploads should be streamed. May be specified as an absolute path (e.g. `/Users/mikermcneil/foo`) or a relative path from the current working directory. Defaults to `".tmp/uploads/"` | ||
| `saveAs()` | ((function)) | An optional function that can be used to define the logic for naming files. For example: <br/> `function (file) {return Math.random()+file.name;} });` <br/> By default, the filename of the uploaded file is used, including the extension (e.g. `"Screen Shot 2014-05-06 at 4.44.02 PM.jpg"`. If a file already exists at `dirname` with the same name, it will be overridden. | | ||
| `saveAs()` | ((function)) | An optional function that can be used to define the logic for naming files (with callback optional). For example: <br/> `function (file) {return Math.random()+file.name;}` or `function (filename,cb) {foo.asyncall(function(err,result){ options.filename = result[0]; cb(null)}); }`<br/> By default, Skipper-disk generate a random-Number for filename on your disk (e.g. 24d5f444-38b4-4dc3-b9c3-74cb7fbbc932.jpg) - that is given as "id" in upload()-callback. <br/> | | ||
@@ -69,0 +69,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
18162
262
3
1
+ Addednode-uuid@~1.4.1
+ Addednode-uuid@1.4.8(transitive)