New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

filequeue

Package Overview
Dependencies
Maintainers
1
Versions
8
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

filequeue - npm Package Compare versions

Comparing version 0.4.0 to 0.5.0

lib/stream.js

62

lib/filequeue.js

@@ -7,2 +7,5 @@ // FileQueue#readFile is a drop-in replacement for fs.readFile that prevents too many files from being opened at once.

// Export the constructor function
module.exports = FileQueue;
// Instantiate a new FileQueue. Pass in the maximum number of files to be opened at any one time.

@@ -41,40 +44,18 @@ // By default it attempts to return an already instantiated instance of FileQueue so that your maxfiles is shared across processes

// Export the constructor function
module.exports = FileQueue;
FileQueue.prototype.addToQueue = function(fn, callback) {
// Internally used to add an fs command to the queue
FileQueue.prototype.addToQueue = function() {
var fq = this;
var args = Array.prototype.slice.call(arguments);
// retrieve the original callback
var callback = args.pop();
// add our own callback that adjust the open files and moves the queue
args.push(function() {
// remove this as an open file
fq.openFiles--;
// execute the queue if there are files sitting in it
fq.execQueue();
// call the original callback
callback.apply(this, Array.prototype.slice.call(arguments));
this.queue.push({
fn: fn,
callback: callback
});
// add it to the queue
fq.queue.push(args);
return fq.queue;
return this;
};
// Internally used to execute the next command in the queue
FileQueue.prototype.execQueue = function() {
var fq = this;
// only execute if the queue has any files
if(!fq.queue.length) {
if(!this.queue.length) {
return;

@@ -84,21 +65,26 @@ }

// execute the first file in the queue
var args = fq.queue.shift();
var command = this.queue.shift();
// check that we're not over our limit
if(fq.openFiles < fq.limit) {
if(this.openFiles < this.limit) {
// account for this file being open
fq.openFiles++;
this.openFiles++;
// get the method name from the queue
var method = args.shift();
command.fn(fs, function() {
// do the acual fs method
fs[method].apply(fs, args);
fq.openFiles--;
fq.execQueue();
if(command.callback) {
command.callback.apply(this, [].slice.call(arguments));
}
});
} else {
// we can't execute it yet, so we put it back in the front of the line
fq.queue.unshift(args);
this.queue.unshift(command);
}
};
addFsMethods(FileQueue);
addFsMethods(FileQueue);
// All of the fs methods we want exposed on fq
// This should pass the Grep Test (http://jamie-wong.com/2013/07/12/grep-test/)
var stream = require('./stream');
module.exports = function (FQ) {

@@ -17,8 +19,13 @@

if(options) {
this.addToQueue('readFile', filename, options, callback);
} else {
this.addToQueue('readFile', filename, callback);
}
this.addToQueue(function(fs, cb) {
if(options) {
fs.readFile(filename, options, cb);
return;
}
fs.readFile(filename, cb);
}, callback);
this.execQueue();

@@ -33,4 +40,8 @@ };

this.addToQueue('rename', oldPath, newPath, callback);
this.addToQueue(function(fs, cb) {
fs.rename(oldPath, newPath, cb);
}, callback);
this.execQueue();

@@ -50,8 +61,13 @@ };

if(type) {
this.addToQueue('symlink', srcpath, dstpath, type, callback);
} else {
this.addToQueue('symlink', srcpath, dstpath, callback);
}
this.addToQueue(function(fs, cb) {
if(type) {
fs.symlink(srcpath, dstpath, type, cb);
return;
}
fs.symlink(srcpath, dstpath, cb);
}, callback);
this.execQueue();

@@ -71,8 +87,13 @@ };

if(options) {
this.addToQueue('writeFile', filename, data, options, callback);
} else {
this.addToQueue('writeFile', filename, data, callback);
}
this.addToQueue(function(fs, cb) {
if(options) {
fs.writeFile(filename, data, options, cb);
return;
}
fs.writeFile(filename, data, cb);
}, callback);
this.execQueue();

@@ -87,4 +108,8 @@ };

this.addToQueue('stat', path, callback);
this.addToQueue(function(fs, cb) {
fs.stat(path, cb);
}, callback)
this.execQueue();

@@ -99,4 +124,8 @@ };

this.addToQueue('readdir', path, callback);
this.addToQueue(function(fs, cb) {
fs.readdir(path, cb);
}, callback);
this.execQueue();

@@ -111,4 +140,8 @@ };

this.addToQueue('exists', path, callback);
this.addToQueue(function(fs, cb) {
fs.exists(path, cb);
}, callback);
this.execQueue();

@@ -128,11 +161,64 @@ };

if(mode) {
this.addToQueue('mkdir', path, mode, callback);
} else {
this.addToQueue('mkdir', path, callback);
}
this.addToQueue(function(fs, cb) {
if(mode) {
fs.mkdir(path, mode, cb);
return;
}
fs.mkdir(path, cb);
}, callback);
this.execQueue();
};
/**
* http://nodejs.org/docs/v0.10.15/api/fs.html#fs_class_fs_readstream
*/
FQ.prototype.ReadStream = stream.ReadStream;
/**
* http://nodejs.org/docs/v0.10.15/api/fs.html#fs_fs_createreadstream_path_options
*/
FQ.prototype.createReadStream = function(path, options) {
var readStream = new this.ReadStream(path, options);
this.addToQueue(function(fs, cb) {
readStream.on('close', cb);
readStream.open();
});
this.execQueue();
return readStream;
};
/**
* http://nodejs.org/docs/v0.10.15/api/fs.html#fs_class_fs_writestream
*/
FQ.prototype.WriteStream = stream.WriteStream;
/**
* http://nodejs.org/docs/v0.10.15/api/fs.html#fs_fs_createwritestream_path_options
*/
FQ.prototype.createWriteStream = function(path, options) {
var writeStream = new this.WriteStream(path, options);
this.addToQueue(function(fs, cb) {
writeStream.on('close', cb);
writeStream.open();
});
this.execQueue();
return writeStream;
};
};
{
"name": "filequeue",
"description": "Drop-in Replacement for fs to prevent too many open files",
"version": "0.4.0",
"version": "0.5.0",
"author": {

@@ -18,4 +18,4 @@ "name": "Trey Griffith",

"devDependencies": {
"rewire": "1.1.1",
"mocha": "1.7.4"
"mocha": "1.7.4",
"temp": "0.5.x"
},

@@ -22,0 +22,0 @@ "main": "./index.js",

@@ -5,2 +5,4 @@ Filequeue

As of version 0.4.0, `Filequeue` supports Node 0.10.x, and as of version 0.5.0, it has basic Streams support.
`Filequeue` was born out of my encounter with `Error: EMFILE, too many open files`, which occurs when you try to open too many files at once on your system. Due to Node's asynchronous nature, if you perform a lot of `fs.readFile` or similar operations in quick succession, you can easily hit your system's `maxfiles` limit, usually set to 256 on a dev box.

@@ -48,14 +50,16 @@

#### Use any of the following supported `fs` methods
* [readFile](http://nodejs.org/docs/v0.8.0/api/fs.html#fs_fs_readfile_filename_encoding_callback)
* [writeFile](http://nodejs.org/docs/v0.8.0/api/fs.html#fs_fs_writefile_filename_data_encoding_callback) - Note that Node 0.10.0 introduced a different parameter signature, and is therefore incompatible with the current version of Filequeue (0.3.0)
* [readdir](http://nodejs.org/docs/v0.8.0/api/fs.html#fs_fs_readdir_path_callback)
* [rename](http://nodejs.org/docs/v0.8.0/api/fs.html#fs_fs_rename_oldpath_newpath_callback)
* [symlink](http://nodejs.org/docs/v0.8.0/api/fs.html#fs_fs_symlink_srcpath_dstpath_type_callback)
* [mkdir](http://nodejs.org/docs/v0.8.0/api/fs.html#fs_fs_mkdir_path_mode_callback)
* [stat](http://nodejs.org/docs/v0.8.0/api/fs.html#fs_fs_stat_path_callback)
* [exists](http://nodejs.org/docs/v0.8.0/api/fs.html#fs_fs_exists_path_callback)
* [readFile](http://nodejs.org/docs/v0.10.15/api/fs.html#fs_fs_readfile_filename_options_callback)
* [writeFile](http://nodejs.org/docs/v0.10.15/api/fs.html#fs_fs_writefile_filename_data_options_callback)
* [readdir](http://nodejs.org/docs/v0.10.15/api/fs.html#fs_fs_readdir_path_callback)
* [rename](http://nodejs.org/docs/v0.10.15/api/fs.html#fs_fs_rename_oldpath_newpath_callback)
* [symlink](http://nodejs.org/docs/latest/api/fs.html#fs_fs_symlink_srcpath_dstpath_type_callback)
* [mkdir](http://nodejs.org/docs/v0.10.15/api/fs.html#fs_fs_mkdir_path_mode_callback)
* [stat](http://nodejs.org/docs/v0.10.15/api/fs.html#fs_fs_stat_path_callback)
* [exists](http://nodejs.org/docs/v0.10.15/api/fs.html#fs_fs_exists_path_callback)
* [createReadStream](http://nodejs.org/docs/v0.10.15/api/fs.html#fs_fs_createreadstream_path_options)
* [createWriteStream](http://nodejs.org/docs/v0.10.15/api/fs.html#fs_fs_createwritestream_path_options)
``` javascript
for(var i=0; i<1000; i++) {
fq.readFile('/somefile.txt', 'utf8', function(err, somefile) {
fq.readFile('/somefile.txt', {encoding: 'utf8'}, function(err, somefile) {
console.log("data from somefile.txt without crashing!", somefile);

@@ -62,0 +66,0 @@ });

// Run this with "mocha filequeue.test.js"
var assert = require('assert');
var rewire = require('rewire');
var temp = require('temp');
var fs = require('fs');
var path = require('path');
var FileQueue = rewire('../lib/filequeue');
var FileQueue = require('../lib/filequeue');
// Shim in our controlled version of fs
var fs = require('./fs-shim');
FileQueue.__set__('fs', fs);
var dir = temp.mkdirSync('filequeue-test-');
function makePath(filename) {
return path.join(dir, filename);
}

@@ -34,10 +37,23 @@ describe('FileQueue', function() {

// set `newQueue` to true so we don't get the instance with a limit of 2000
var fq = new FileQueue(true);
describe('readFile', function() {
var fq = new FileQueue(200);
it('should read file contents', function(done) {
it('should read file contents', function(done) {
fq.readFile('my_path', function(err, data) {
assert.equal(data, fs.__internal.filesystem.files['my_path'].data);
done();
var text = 'some random text';
fs.writeFile(makePath('my_path'), text, function(err) {
assert.ifError(err);
fq.readFile(makePath('my_path'), {encoding: 'utf8'}, function(err, data) {
assert.ifError(err);
assert.equal(data, text);
done();
});
});

@@ -47,12 +63,23 @@ });

it('should read many files without crashing', function(done) {
var count = 0;
for(var i=0;i<1000;i++) {
fq.readFile('my_other_path', function(err, data) {
assert.equal(data, fs.__internal.filesystem.files['my_other_path'].data);
if(++count >= 1000) {
done();
}
});
}
var text = 'some other text';
fs.writeFile(makePath('my_other_path'), text, function(err) {
assert.ifError(err);
var count = 0;
for(var i=0;i<1000;i++) {
fq.readFile(makePath('my_other_path'), {encoding: 'utf8'}, function(err, data) {
assert.ifError(err);
assert.equal(data, text);
if(++count >= 1000) {
done();
}
});
}
});
});

@@ -62,20 +89,20 @@ });

describe('rename', function () {
var fq = new FileQueue();
it('should rename a file', function(done) {
// retrieve the contents of the original file
fq.readFile('file-to-rename', {encoding: 'utf8'}, function(err, contents) {
var text = 'this file will be renamed';
fs.writeFile(makePath('file-to-rename'), text, function(err) {
assert.ifError(err);
fq.rename('file-to-rename', 'this_is_a_different_file', function(err) {
fq.rename(makePath('file-to-rename'), makePath('this_is_a_different_file'), function(err) {
assert.ifError(err);
fq.readFile('this_is_a_different_file', {encoding: 'utf8'}, function(err, renamed_contents) {
fs.readFile(makePath('this_is_a_different_file'), {encoding: 'utf8'}, function(err, contents) {
assert.ifError(err);
assert.equal(contents, renamed_contents);
assert.equal(contents, text);

@@ -91,27 +118,24 @@ done();

describe('symlink', function () {
var fq = new FileQueue();
it('should create symlink without optional "type" argument', function(done) {
// grab the contents of the source file
fq.readFile('file-to-point-at', {encoding: 'utf8'}, function(err, contents) {
var text = "this file will be symlinked";
fs.writeFile(makePath('file-to-point-at'), text, function(err) {
assert.ifError(err);
fq.symlink('file-to-point-at', 'symlink1', function(err) {
fq.symlink(makePath('file-to-point-at'), makePath('symlink1'), function(err) {
assert.ifError(err);
// check that file is symlink (should do this with fq.lstat once implemented)
assert.equal(fs.__internal.filesystem.files['symlink1'].type, 'symlink');
fs.lstat(makePath('symlink1'), function(err, stats) {
// check that the contents are identical
fq.readFile('symlink1', {encoding: 'utf8'}, function(err, symlinked_contents) {
assert.ifError(err);
assert.equal(contents, symlinked_contents);
// make sure we created a symlink
assert.ok(stats.isSymbolicLink());
// check that we can't create another symlink with the same name
fq.symlink('another-file-to-point-at', 'symlink1', function(err) {
fq.symlink(makePath('file-to-point-at'), makePath('symlink1'), function(err) {

@@ -125,32 +149,8 @@ assert.notEqual(err, null, 'expected error: path already exists');

});
});
it('should create symlink with optional "type" argument', function(done) {
it('should create symlink with optional "type" argument', function() {
// grab the contents of the source file
fq.readFile('file-to-point-at', {encoding: 'utf8'}, function(err, contents) {
assert.ifError(err);
fq.symlink('file-to-point-at', 'symlink2', 'file', function(err) {
assert.ifError(err);
// check that file is symlink, (should do this with fq.lstat once implemented)
assert.equal(fs.__internal.filesystem.files['symlink2'].type, 'symlink');
// check that the contents are identical
fq.readFile('symlink2', 'utf8', function(err, symlinked_contents) {
assert.ifError(err);
assert.equal(contents, symlinked_contents);
// check that the type of symlink is correct
assert.equal(fs.__internal.filesystem.files['symlink2'].windows_type, 'file');
done();
});
});
});
// only applicable on windows environments, which I don't have a box for
});

@@ -161,13 +161,13 @@ });

var fq = new FileQueue();
it('should write file contents', function(done) {
fq.writeFile('my_path', 'some different data', {encoding: 'utf8'}, function(err) {
var text = 'some different data';
fq.writeFile(makePath('my_path'), text, {encoding: 'utf8'}, function(err) {
assert.ifError(err);
fq.readFile('my_path', {encoding: 'utf8'}, function(err, data) {
fs.readFile(makePath('my_path'), {encoding: 'utf8'}, function(err, data) {
assert.equal(data, 'some different data');
assert.equal(data, text);
done();

@@ -183,7 +183,7 @@ });

fq.writeFile('my_path_'+num, 'some different data '+num, {encoding: 'utf8'}, function(err) {
fq.writeFile(makePath('my_path_'+num), 'some different data '+num, {encoding: 'utf8'}, function(err) {
assert.ifError(err);
fq.readFile('my_path_'+num, {encoding: 'utf8'}, function(err, data) {
fq.readFile(makePath('my_path_'+num), {encoding: 'utf8'}, function(err, data) {

@@ -207,14 +207,17 @@ assert.equal(data, 'some different data '+num);

var fq = new FileQueue();
it('should return a stats object', function(done) {
fq.stat('my_path', function(err, stats) {
fq.stat(makePath('my_path'), function(err, stats) {
assert.ifError(err);
assert.equal(stats.isFile(), fs.__internal.fsPath('my_path').data instanceof Buffer);
assert.equal(stats.isDirectory(), fs.__internal.isDirectory(fs.__internal.fsPath('my_path')));
fs.stat(makePath('my_path'), function(err, realStats) {
done();
assert.ifError(err);
assert.equal(stats.isFile(), realStats.isFile());
assert.equal(stats.isDirectory(), realStats.isDirectory());
done();
});
});

@@ -227,13 +230,16 @@ });

var fq = new FileQueue();
it('should return all filenames', function(done) {
fq.readdir('.', function(err, _files) {
fq.readdir(dir, function(err, files) {
assert.ifError(err);
assert.equal(Object.keys(fs.__internal.filesystem.files).length, _files.length);
fs.readdir(dir, function(err, _files) {
done();
assert.ifError(err);
assert.equal(_files.length, files.length);
done();
});
});

@@ -246,9 +252,7 @@ });

var fq = new FileQueue();
it('should check if a file exists', function(done) {
fq.exists('my_path', function(exists) {
fq.exists(makePath('my_path'), function(exists) {
assert.equal(exists, !!fs.__internal.fsPath('my_path'));
assert.equal(exists, fs.existsSync(makePath('my_path')));

@@ -261,5 +265,5 @@ done();

fq.exists('this_file_doesnt_exist', function(exists) {
fq.exists(makePath('this_file_doesnt_exist'), function(exists) {
assert.equal(exists, !!fs.__internal.fsPath('this_file_doesnt_exist'));
assert.equal(exists, fs.existsSync(makePath('this_file_doesnt_exist')));

@@ -275,13 +279,20 @@ done();

var fq = new FileQueue();
function modeString(stats) {
return '0' + (stats.mode & parseInt('777', 8)).toString(8);
}
// change the default mask so it doesn't affect the ops below
process.umask(0000);
it('should create a new directory with the default mode', function(done) {
var dirname = 'newdir';
fq.mkdir(dirname, function(err) {
fq.mkdir(makePath(dirname), function(err) {
assert.ifError(err);
assert.equal(fs.__internal.isDirectory(fs.__internal.filesystem.files[dirname]), true);
assert.equal(fs.__internal.filesystem.files[dirname].mode, '0777');
var stats = fs.statSync(makePath(dirname));
assert.equal(stats.isDirectory(), true);
assert.equal(modeString(stats), '0777');
done();

@@ -294,8 +305,10 @@ });

var mode = '0666';
fq.mkdir(dirname, mode, function(err) {
fq.mkdir(makePath(dirname), mode, function(err) {
assert.ifError(err);
assert.equal(fs.__internal.isDirectory(fs.__internal.filesystem.files[dirname]), true);
assert.equal(fs.__internal.filesystem.files[dirname].mode, mode);
var stats = fs.statSync(makePath(dirname));
assert.equal(stats.isDirectory(), true);
assert.equal(modeString(stats), mode);
done();

@@ -307,1 +320,160 @@ });

describe('readStream', function() {
it('creates a ReadStream', function() {
var stream = fq.createReadStream(makePath('my_path'), {encoding: 'utf8'});
assert.ok(stream instanceof fs.ReadStream);
});
it('reads data from a ReadStream', function(done) {
var stream = fq.createReadStream(makePath('my_path'), {encoding: 'utf8'});
var my_path_string = '';
stream.on('data', function(data) {
my_path_string += data;
});
stream.on('error', function(err) {
assert.ifError(err);
});
stream.on('close', function() {
assert.equal(my_path_string, fs.readFileSync(makePath('my_path'), {encoding: 'utf8'}));
done();
});
});
it('reads data from lots of streams without crashing', function(done) {
var my_path_string = fs.readFileSync(makePath('my_path'), {encoding: 'utf8'});
var count = 0,
streams = [];
for(var i=0;i<1000;i++) {
(function(i) {
streams.push(fq.createReadStream(makePath('my_path'), {encoding: 'utf8'}));
streams[i].fq_data = '';
streams[i].on('data', function(data) {
streams[i].fq_data += data;
});
streams[i].on('error', function(err) {
assert.ifError(err);
});
streams[i].on('close', function() {
assert.equal(my_path_string, streams[i].fq_data);
if(++count >= 1000) {
done();
}
});
})(i);
}
});
});
describe('writeStream', function() {
it('creates a WriteStream', function() {
var stream = fq.createWriteStream(makePath('my_path'), {encoding: 'utf8'});
assert.ok(stream instanceof fs.WriteStream);
stream.end();
});
it('writes data to a WriteStream', function(done) {
var stream = fq.createWriteStream(makePath('my_writing_path'), {encoding: 'utf8'});
stream.on('error', function(err) {
assert.ifError(err);
});
var my_path_arr = 'this is some data that I want to write'.split(' ');
var i = 0;
function write() {
stream.write(my_path_arr[i], 'utf8', function() {
i++;
if(i < my_path_arr.length) {
write();
} else {
stream.end(function() {
assert.equal(my_path_arr.join(''), fs.readFileSync(makePath('my_writing_path'), {encoding: 'utf8'}));
done();
});
}
});
}
write();
});
it('writes data to lots of streams without crashing', function(done) {
done();
var my_path_arr = 'this is some data that I want to write'.split(' ');
function write(stream) {
stream.write(my_path_arr[stream.i], 'utf8', function() {
stream.i++;
if(stream.i < my_path_arr.length) {
write(stream);
} else {
stream.end(function() {
assert.equal(my_path_arr.join(''), fs.readFileSync(makePath('my_other_writing_path'), {encoding: 'utf8'}));
done();
});
}
});
}
var count = 0,
streams = [];
for(var i=0;i<1000;i++) {
streams.push(fq.createWriteStream(makePath('my_other_writing_path'), {encoding: 'utf8'}));
streams[i].i = 0;
write(streams[i]);
}
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc