Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

minitask

Package Overview
Dependencies
Maintainers
1
Versions
9
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

minitask - npm Package Compare versions

Comparing version 0.0.1 to 0.1.0

lib/task.js

3

index.js
module.exports = {
list: require('./lib/list.js'),
runner: require('./lib/runner.js'),
cache: require('./lib/cache.js')
Cache: require('./lib/cache.js'),
Task: require('./lib/task.js')
};
var fs = require('fs'),
assert = require('assert'),
runner = require('./runner.js');
crypto = require('crypto'),
path = require('path'),
mkdirp = require('mkdirp');
// { metapath: {
// filepath: { stat: ... , md5: ..., filepath: ... }
// }
// }
function Cache(opts) {
this.opts = opts;
this.data = null;
this.path = opts.path;
var metaCache = {};
// can either set the path, or set 'appHash'
if(opts.path) {
this.metaPath = opts.path + '/meta.json';
this.data = (fs.existsSync(this.metaPath) ? require(this.metaPath) : {});
function loadMeta(opts) {
var metaPath = opts.cachepath + '/meta.json';
// have we loaded the cache meta.json?
if(!metaCache[metaPath]) {
// does the cache metadata file exist?
metaCache[metaPath] = (fs.existsSync(metaPath) ? require(metaPath) : {});
if(!fs.existsSync(opts.cachepath)) {
fs.mkdirSync(opts.cachepath);
// need to do this early on, since if the path is missing,
// writes to the cache dir will fail
if(!fs.existsSync(this.opts.path)) {
mkdirp.sync(this.opts.path);
}
} else if(opts.appHash) {
// global cache dir, with subdir for this
} else {
throw new Error('Must set either the path or the appHash');
}
return metaCache[metaPath];
}
function saveMeta(opts) {
var metaPath = opts.cachepath + '/meta.json';
if(!fs.existsSync(opts.cachepath)) {
fs.mkdirSync(opts.cachepath);
Cache.prototype.save = function() {
// just in case
if(!fs.existsSync(this.opts.path)) {
mkdirp.sync(this.opts.path);
}
fs.writeFileSync(metaPath, JSON.stringify(metaCache[metaPath], null, 2));
}
fs.writeFileSync(this.metaPath, JSON.stringify(this.data, null, 2));
};
function removeCached(meta, filepath) {
var item = meta[filepath];
if(!item) {
// invalidates all the cached items for the given inputFilePath
Cache.prototype.junk = function(inputFilePath) {
var self = this;
inputFilePath = path.normalize(inputFilePath);
if(!this.data[inputFilePath]) {
return; // nothing to do
}
if(fs.existsSync(item.filepath)) {
fs.unlink(item.filepath);
// for each .taskResults
Object.keys(this.data[inputFilePath].taskResults).forEach(function(taskHash) {
// .taskResults[hash] = { path: '...' }
var cacheFile = self.data[inputFilePath].taskResults[taskHash].path;
if(fs.existsSync(cacheFile)) {
fs.unlink(cacheFile);
}
});
delete this.data[inputFilePath];
};
Cache.prototype.clear = function() {
var self = this;
// delete any lingering files
Object.keys(this.data).forEach(function(inputFilePath) {
self.junk(inputFilePath);
});
this.data = {};
this.save();
};
Cache.prototype.filename = function() {
var cacheName;
// generate a new file name
do {
cacheName = this.path + '/' + Math.random().toString(36).substring(2);
} while(fs.existsSync(cacheName));
return cacheName;
};
Cache.prototype.complete = function(inputFilePath, taskHash, cacheFilePath) {
if(arguments.length < 3) {
throw new Error('Invalid call to Cache.complete()');
}
delete meta[filepath];
}
function lookup(opts) {
var meta = loadMeta(opts),
fileMeta = meta[opts.filepath];
if(!opts.filepath || !fileMeta) {
return false;
var method = this.opts.method || 'stat';
if(!this.data[inputFilePath]) {
this.data[inputFilePath] = { taskResults: {} };
}
// if the options do not match, then this is not a match
// - also, invalidate the cache entry
if(opts.options) {
if(!fileMeta.options) {
removeCached(meta, opts.filepath);
return false;
}
// assert.deepEqual is quite accurate
try {
assert.deepEqual(opts.options, fileMeta.options);
} catch(e) {
removeCached(meta, opts.filepath);
return false;
}
if(!this.data[inputFilePath].taskResults) {
this.data[inputFilePath].taskResults = {};
}
// use the stat attribute
if(fileMeta.stat && opts.stat) {
if(!(fileMeta.stat.mtime instanceof Date)) {
fileMeta.stat.mtime = new Date(fileMeta.stat.mtime);
}
if(!(opts.stat.mtime instanceof Date)) {
opts.stat.mtime = new Date(opts.stat.mtime);
}
if(fileMeta.stat.size == opts.stat.size && fileMeta.stat.mtime.getTime() == opts.stat.mtime.getTime()) {
return fileMeta.filepath;
}
if(method == 'stat') {
this.data[inputFilePath].stat = fs.statSync(inputFilePath);
} else {
this.data[inputFilePath][method] = Cache.hash(method, fs.readFileSync(inputFilePath));
}
return false;
}
this.data[inputFilePath].taskResults[taskHash] = { path: cacheFilePath };
this.save();
};
function clear(opts) {
var metaPath = opts.cachepath + '/meta.json',
meta = loadMeta(opts);
Object.keys(meta).forEach(function(filepath) {
removeCached(meta, filepath);
});
metaCache[metaPath] = {};
saveMeta(opts);
function statEqual(actual, expected) {
if(!actual || !expected) {
return false;
}
var a = (actual.mtime instanceof Date ? actual.mtime : new Date(actual.mtime)),
b = (expected.mtime instanceof Date ? expected.mtime : new Date(expected.mtime));
return actual.size == expected.size && a.getTime() == b.getTime();
}
// execute:
// cache({ cachepath: ..., filepath: ..., stat: ... }, tasks, done)
module.exports = function(opts, tasks, done) {
var cacheFile = lookup(opts), last, cacheName;
if(cacheFile) {
last = { stdout: fs.createReadStream(cacheFile) };
// attach done
if(typeof done == 'function') {
last.stdout.once('end', done);
}
if(typeof opts.onHit == 'function') {
opts.onHit();
}
// stream from cache
return last;
Cache.prototype.lookup = function(inputFilePath, taskHash) {
var method = this.opts.method || 'stat',
cacheMeta = this.data[inputFilePath];
// {
// inputFilePath: {
// stat: (expected stat meta)
// md5: (expected hash meta)
//
// taskResults: {
// taskHash: {
// path: (path in cache for this task)
// }
// }
// }
// }
// if:
// 1) the input file file path or
// 2) the cache metadata for the file or
// 3) the input task hash
// is missing no cached values can be fetched
if(!inputFilePath || !cacheMeta || !taskHash) {
return false;
}
if(typeof opts.onMiss == 'function') {
opts.onMiss();
var inputFileChanged = true; // assume changed
// has the file changed?
// console.log('Cache lookup!', method, cacheMeta);
if(method == 'stat') {
inputFileChanged = !statEqual(fs.statSync(inputFilePath), cacheMeta.stat);
} else if(cacheMeta[method]) {
inputFileChanged = (cacheMeta[method] != Cache.hash(method, fs.readFileSync(inputFilePath)));
}
// generate a new file name
cacheName = opts.cachepath + '/' + Math.random().toString(36).substring(2);
while(fs.existsSync(cacheName)) {
cacheName = opts.cachepath + '/' + Math.random().toString(36).substring(2);
// if the input file changes, then all the cached values are invalidated
if(inputFileChanged) {
this.junk(inputFilePath);
return false;
}
// return the runner result, but pipe it to the cache file first
last = runner({ stdout: fs.createReadStream(opts.filepath) }, tasks, function() {
// update the metadata cache
var meta = loadMeta(opts);
meta[opts.filepath] = {
stat: opts.stat || fs.statSync(opts.filepath),
filepath: cacheName
};
// store options
if(opts.options) {
meta[opts.filepath].options = JSON.parse(JSON.stringify(opts.options));
}
saveMeta(opts);
done && done();
});
// pipe to the writable stream
last.stdout.pipe(fs.createWriteStream(cacheName));
return last;
// now, search for a cached file that corresponds to the current task hash
if(!cacheMeta.taskResults || !cacheMeta.taskResults[taskHash] || !cacheMeta.taskResults[taskHash].path) {
return false;
}
return (cacheMeta.taskResults[taskHash] ? cacheMeta.taskResults[taskHash].path : false);
};
// lookup
module.exports.lookup = lookup;
Cache.hash = Cache.prototype.hash = function(method, str) {
// method is optional, defaults to md5
if(arguments.length === 1) {
str = method;
method = 'md5';
}
return crypto.createHash(method).update(str).digest('hex');
};
// clear
module.exports.clear = clear;
module.exports = Cache;

@@ -1,38 +0,153 @@

// Takes a input stream and pipes it to a number of tasks
// For example:
// runner({ stdout: fs.createReadStream(filename) }, [ wrapCommonJs ], function() { console.log("done!"); });
// where wrapCommonJs is either:
// - an object with two streams: { stdin: WritableStream, stdout: ReadableStream }
// - or a function that when called returns an object just like above
//
// Note that child_process.spawn() returns exactly the right kind of object for this runner
var fs = require('fs'),
Task = require('./task.js'),
Cache = require('./cache.js');
module.exports = function(input, tasks, done) {
var last = input;
tasks.forEach(function(task) {
var current = task;
// item is either a object (e.g. process) with .stdout/.stdin
// or a function that returns an object
if(typeof current == 'function') {
current = task();
exports.parallel = function(tasks, opts) {
var running = 0,
limit = opts.limit,
flows = [];
if(!opts.cachePath || !opts.cacheMethod) {
throw new Error('Missing required metadata (cachePath and/or cacheMethod).');
return;
}
// TODO: since we have one instance per run, may cause weirdness, might want to
// prevent creating multiple instances of the cache with the same target path
var cache = new Cache({
method: opts.cacheMethod,
path: opts.cachePath
});
// Scan through the input; find flows, replace with read from cache task
tasks = tasks.map(function(task) {
if(task instanceof Task) {
// require the following metadata:
// - .inputFilePath
// - .taskHash
if(!task.inputFilePath || !task.taskHash) {
throw new Error('Missing required metadata (inputFilePath and/or taskHash)');
return;
}
// override the input file name and description from the task description
var cacheFile = cache.lookup(task.inputFilePath, task.taskHash);
// when the cache is disabled, we still use it for intermediate results
// since that enables parallel execution. However, we will never reuse old results.
if(!cacheFile || !opts.cacheEnabled) {
// create the file in the cache folder
cacheFile = cache.filename();
// console.log('Writing transform', task.taskHash, 'from', task.inputFilePath, 'to', cacheFile);
// set the flow output to the cache file
// Note: to avoid allocating file handles early, use a arity-0 function as a wrapper
task.output(function() {
return fs.createWriteStream(cacheFile);
});
task.once('done', function() {
// console.log('Completed transform', task.taskHash, 'from', task.inputFilePath, 'to', cacheFile);
// mark as complete
cache.complete(task.inputFilePath, task.taskHash, cacheFile);
});
// queue the flow
flows.push(task);
} else {
// Flow.emit "hit" -> since we skipped the task
task.emit('hit');
}
// read result from cache file
return function(out, done) {
fs.createReadStream(cacheFile)
.once('close', done)
.pipe(out, { end: false});
};
}
last.stdout.pipe(current.stdin);
// if there is a stderr, pipe that - this avoids issues where the task fails to stderr
// and the stdout is not flushed due to buffering
if(current.stderr) {
current.stderr.pipe(process.stderr);
return task;
});
// Run each flow at specified level of parallelism (into the temp file dir)
function parallel() {
while(running < limit && flows.length > 0) {
var task = flows.shift(),
outName;
running++;
// Flow.emit "miss" -> since we didn't just read it from the cache
task.emit('miss');
// console.log('EXEC', task.tasks.map(function(t) { return t.toString(); } ));
task.once('done', function() {
running--;
if(flows.length > 0) {
// avoid issues caused by deep nesting
process.nextTick(parallel);
} else if(running == 0) {
done();
}
}).exec();
}
}
last = current;
});
if(flows.length === 0) {
done();
} else {
parallel();
}
if(typeof done == 'function') {
last.stdout.once('end', done);
function done() {
var ranDone = false;
// Once all the flows have run, run each the fn(out) in the order specified
// streaming the flow task outputs in the correct order to produce the final file.
if(tasks.length > 0) {
series(tasks.map(function(task) {
return function(onDone) {
// from fn(done) => fn(out, done)
return task(opts.output, onDone);
};
}), function() {
function doneFn() {
if(!ranDone && opts.onDone) {
ranDone = true;
opts.onDone();
}
}
// e.g. process.stdout
if(opts.end !== false) {
// really, avoid closing process.stdout
if(opts.output === process.stdout) {
doneFn();
} else {
opts.output.end();
opts.output.once('close', doneFn);
opts.output.once('finish', doneFn);
}
} else {
doneFn();
}
});
} else if(opts.onDone) {
opts.onDone();
}
}
};
// return the last item (so that the caller can bind
// e.g. to input.stdout.on("end") or pipe to process.stdout
// Piping to process.stdout has to be handled a bit differently since it
// will not emit "end" if part of the pipeline (and should not, since multiple things can pipe to it)
return last;
};
function series(callbacks, last) {
var results = [];
function next() {
var callback = callbacks.shift();
if(callback) {
callback(next);
} else {
last();
}
}
next();
}
{
"name": "minitask",
"version": "0.0.1",
"version": "0.1.0",
"description": "A standard/convention for running tasks over a list of files based around Node core streams2",

@@ -13,3 +13,4 @@ "main": "index.js",

"devDependencies": {
"minilog": "2.x"
"minilog": "2.x",
"readable-stream": "~1.1.8"
},

@@ -31,3 +32,6 @@ "repository": {

"readmeFilename": "readme.md",
"gitHead": "de68d793f30add8746b519c3e0fde3f54d484228"
"gitHead": "de68d793f30add8746b519c3e0fde3f54d484228",
"dependencies": {
"mkdirp": "~0.3.5"
}
}

@@ -7,31 +7,79 @@ # minitask

## Key features
- Provides a decent convention for writing programs that deal with files and/or data streams (with caching, task pipeline specification and parallel execution).
- Makes it easier to combine different ways to express a transformation: allows you to treat synchronous functions, asynchronous functions, child processes and duplex streams as equivalent parts of a stream transformation task.
- Buffering only if necessary. If all tasks are streaming (e.g. duplex / transform streams and child processes from input to output), then no buffering is performed. If a task queue consists of sync / async functions and streams, then buffering is performed automatically at the transitions between different transformation types. It's often easier to prototype using functions; you can rewrite functions into streams to get rid of buffering.
- Caching support: each task's result can be cached; the cached result is reused if the metadata (e.g. file modification date and file size or, alternatively, the md5 of the file) don't change.
- Specifically designed for dealing with the common special case where multiple files are concatenated into one in a specific order, but the subtasks need to be run in parallel.
## Introduction
[grunt](http://gruntjs.com/) is the Javascript task runner that's most popular, but I mostly prefer using makefiles since they require less ceremony.
minitask is a library I wrote for processing tasks on files.
However, sometimes you want to express something as an operation applied to a list of files, while keeping the ability to plug in more tasks via unix pipes and custom functions. That's what this is, a simple convention for working on a list of files using constructs from the Node core.
It is used in several of my libraries, including `gluejs` and `generate-markdown`.
minitask is not a makefile replacement, it is a convention for writing things that apply a bunch of pipes to a list of files.
minitask is based on the observation that most tasks including files can be divided into three phases:
minitask doesn't even define any new APIs (unlike, say, [node-task](https://github.com/node-task/spec/wiki), which is destined to become grunt's next set of internals which seems to implement their own (!) synchronous (!!) version of core streams). In minitask, everything is just based on using [Node core streams](http://nodejs.org/api/stream.html) in a specific way and structuring your code into reusable tasks. The minitask repo is a bunch of functions that support those conventions.
1. Directory iteration. This is where some set of input files are selected based on user input (e.g. via the command line or config options). Some files may be excluded from the list.
2. Task queuing. Given a list of included files, some tasks are queued for each file (e.g. based on their file extensions).
3. Task execution. The tasks are executed in parallel or sequentially, and the output is potentially cached and written out to stdout or a file.
## The first step: creating and annotating the list of files
When you try to do all these in one go (e.g. at the same time as you are iterating directories), things get messy. It's a lot easier to work with fully built directory/file metadata structures separately from the include/exclude logic; and easier to reason about execution order separate from task queueing.
Each minitask starts with a list of files, which simply an object that looks like this:
It should be easy to specify tasks as sequences of transformations on a stream. While duplex streams are cool, expressing simple tasks like wrapping a stream in a string is [quite tedious](http://nodejs.org/api/stream.html#stream_example_simpleprotocol_parser_v2) if you need to wrap it in a duplex stream class. To be fair, the core Transform stream is probably the best one can do if you want to express something as a stream; however, many 3rd party transforms are not streaming or are easier to express as computations over a whole - e.g. convert a file's content to markdown, or wrap a file in a particular template.
Furthermore, Node's `child_process` API returns [something that's not quite a duplex stream](http://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options), though it has `stdin` and `stdout`. It should be possible to write functions, child_process pipes and tasks involving duplex streams/transform streams without worrying about the details of buffering and piping everything together.
Finally, during task execution, it is useful to be able to treat each set of transformations on a file individually and in an abstract manner. This allows a queue of tasks to be executed at some specific level of parallelism. It also makes it possible to implement a fairly generic caching mechanism, which simply redirects the input into a cache file while still producing the expected output.
All in all, this makes writing things that operate on files nicer without becoming overly burdensome.
## List API: Reading input directories
The `List` class only has one method: `add(path)`. For example:
var List = require('minitask').list,
files = new List();
files.add(path.resolve(process.cwd(), './foo'));
If the path is a directory, then it is iterated recursively.
Note that there is no "exclude" - the idea is that you exclude things in postprocessing rather than trying to build in a lot of complicated exclusion logic during iteration.
This produces an object with at `.files` property, which looks like this:
{
files: [
{ name: '/full/path/to/file.js' }
{
name: '/full/path/to/file.js',
stat: { ... fs.Stat object ... }
}
]
}
The minitask core API has a file iterator that can build these lists for consumption, given path specifications as inputs.
Each file is annotated with a `fs.Stat` object, since you'll need that information anyway to distinguish between directories and files when iterating over directories..
This array of files is then filtered an annotated using list tasks, which are functions. For example, `filter-git.js`:
### Example: List filtering
Exclusions are applied by filtering out items from the list. For example, `filter-regex.js`:
````javascript
// filter-git-directories: a list task that filters out .git directories from the list
module.exports = function(list) {
list.files = list.files.filter(function(item) {
return !item.name.match(new RegExp('/\.git/'));
// Filter out files from a list by a blacklist of regular expressions
module.exports = function(list, expressions) {
list.files = list.files.filter(function(file) {
var name = file.name,
matchedExpr,
match = expressions.some(function(expr) {
var result = name.match(expr);
if(result) {
matchedExpr = expr;
}
return result;
});
if(match) {
console.log('Excluded by regexp ', matchedExpr, ':', name);
}
return !match;
});

@@ -41,170 +89,233 @@ };

List tasks are basically any tasks that include / exclude or otherwise work on metadata.
Which might be applied like this:
To add metadata, you should add properties either to each file, or to the list object itself. For example, `annotate-stat.js`:
````javascript
var fs = require('fs');
// This task adds a .stat property to every file in the list
module.exports = function(list) {
list.files.forEach(function(item, i) {
list.files[i].stat = fs.statSync(item.name);
});
};
var filterRegex = require('../lib/list-tasks/filter-regex.js');
// where `list` is an instance of List
filterRegex(list, [ new RegExp('\/dist\/'), new RegExp('[-.]min.js$') ]);
````
The key benefit of separating tasks such as filtering and annotating metadata into a step that occurs after the list of files is created is that it makes those tasks easier to reuse and test. Previously, I would perform filtering at the same time as I was reading in the file tree. The problem with doing both filtering and file tree iteration is that you end up with some unchangeable filtering logic that's embedded inside your file iterator.
Since filtering is a operation that's separate from reading in the initial tree, it's much easier to see and configure what gets excluded and to define new metadata -related operations. These tasks also becomes easier to reuse and test (no file I/O involved). No unchangeable filtering logic gets embedded into the directory iteration code.
Having your filtering and annotation embedded in the file iterator gets really annoying in some cases: for example, for [gluejs](http://mixu.net/gluejs/) there are multiple filtering rules: package.json files, .npmignore files and user-specified rules. Those were applied in various separate components that basically excluded some paths from traversal based on custom logic.
## Task API: Defining tasks on input files
Rather than special casing and doing two things at the same time, with minitask you read in a file tree and then all filters work on the same structure: an array of paths with metadata. Since filtering is a operation that's separate from reading in the initial tree, it's much easier to see and configure what gets excluded and to define new metadata -related operations.
Here, we are defining tasks that operate on input streams. These are generated by iterating over the file metadata in some appropriate manner.
## Defining tasks that operate on files (= streams)
The task queueing function is a function that takes a `List` as a first argument and produces task arrays.
File tasks are the other type of task.
There is one "master queue" into which each file processing task gets added. In phase 3, that queue is cleared by running it in parallel or sequentially.
There are three different alternatives, corresponding to different native APIs:
As I stated earlier, it should be possible to write functions, child_process pipes and tasks involving duplex streams/transform streams without worrying about the details of buffering and piping everything together. This is what the `Task` class does.
- streams: returning an object with { stdout: ..., stdin: ... }
- async calls: returning a function of arity 2: function(onEach, onDone) {}
For example, here I am applying four transformations on a stream, each specified in a different manner (sync fn, async fn, child process, duplex stream):
````javascript
var flow = new Task([
// sync function
function (input) {
return 'bb' + input.trim() + 'bb';
}),
// async function
function (input, done) {
setTimeout(function() {
done(null, 'c' + input.trim() + 'c');
}, 10);
},
// spawned child process
function() {
var spawn = require('child_process').spawn;
return spawn('wc', [ '-c']);
},
// duplex stream (not showing the details on how you can write these;
// see http://nodejs.org/api/stream.html#stream_class_stream_transform
// for the details)
function() {
return new Duplex();
}
]);
````
They use the Node 0.10.x stream interface based on a convention that makes using child_process.spawn particularly easy:
This unified interface means that you don't need to worry about how your transformation is implemented, as long as it follows one of the four forms above, the Task class will take care of calling the right functions (`pipe` / `write` / `read`) and it takes care of buffering when transitioning between streams and functions.
Also:
- any 3rd party code that implements on `stream.Transform` is immediately usable
- any external tool that reads from `stdin` and writes to `stdout` is immediately usable
There is a reason why tasks are functions. This is so that we don't create instances of streams until they are executed. Otherwise, you can easily run out of resources - for example, if you spawn a new task for every file immediately.
The input and output can be strings or streams:
````javascript
// uglify-task: runs uglify
var spawn = require('child_process').spawn;
module.exports = function(options) {
var task = spawn('uglifyjs', ['--no-copyright']);
task.on('exit', function(code) {
task.emit('error', 'Child process exited with nonzero exit code: '+ code);
});
return task;
};
// from string input to string output
flow.input('AA')
.output(function(output) {
console.log(output);
}).exec();
// from stream input to stream output
flow.input(fs.createReadStream('./foo.txt'))
.output(fs.createWriteStream('./bar.txt'))
.exec();
````
You have to return:
API:
- an object with two streams: { stdin: WritableStream, stdout: ReadableStream }
- or a function that when called returns an object with the stdin and stdout properties
- `new Task(tasks)`: creates a new flow with the given tasks
- `.input(string | ReadableStream)`:
- `.output(fn | WritableStream) `:
Note that child_process.spawn() returns exactly the right kind of object.
A small note on Node 0.8 and stream instances: Passing a stream to `.input()` automatically calls `.pause()` on that stream. This is because the event handlers are only attached when `.exec` is called; Node (0.8) may prematurely start emitting data if not paused. If you're instantiating the writable streams at a much earlier point in time, make sure you call `pause()` on them.
The key here is that every file task is a Node 0.10.x stream. Streams are easy to compose together via pipe(), and all I/O objects in Node are streams. This makes it easy to compose file tasks and to redirect them to different places.
Events:
If you're doing a JS-based stream transformation, then you can return a instance of Node core's [stream.Transform](stream.Transform) duplex stream, wrapped to look like a process:
- `exec`: emitted when exec is called
- `done`: emitted when done
````javascript
// use readable-stream to use Node 0.10.x streams in Node 0.8.x
var Transform = require('readable-stream').Transform;
Events that are only emitted if a cache is used
function Wrap(options) {
Transform.call(this, options);
this.first = true;
}
- `hit`: function to run when cache hit (useful for reporting on how many files were fetched from the cache).
- `miss`: function to run when cache miss
// this is just the recommended boilerplate from the Node core docs
Wrap.prototype = Object.create(Transform.prototype, { constructor: { value: Wrap }});
## Cache API: storing
Wrap.prototype._transform = function(chunk, encoding, done) {
if(this.first) {
this.push('!!');
this.first = false;
}
this.push(chunk);
done();
};
File processing tasks such as package builds and metadata reads are often run multiple times. It is useful to cache the output from these tasks and only re-run the processing when a file has changed. GNU Make, for example, relies on dependency resolution + file last modified timestamps to skip work where possible.
Wrap.prototype._flush = function(done) {
this.push('!!');
done();
};
A cacheable task is any task that reads a specific file path and writes to a writable stream at the end.
module.exports = function(options) {
var instance = new Wrap(options);
// since it's a duplex stream, let the stdin and stdout point to the same thing
return {
stdin: instance,
stdout: instance
};
};
````
The caching system can either use a md5 hash, or the last modified+file size information to determine whether a task needs to be re-run. Additionally, an options hash can be passed to take into account different additional options.
This also means that any 3rd party code that implements on `stream.Transform` is immediately usable with just a wrapping function that creates a new instance.
When the caching system is used, the task output is additionally written to a separate file. The assumption here is that each file task (with a task options hash and input md5) performs the same deterministic transformation. When the current input file's md5 and task options hash match, then the previously written cached result is streamed directly rather than running the full stack of transformations.
## Running tasks
The cache is separate from the task, and has the following API:
The last piece of minitask is the runner.
- `new Cache(options)`: creates a new cache
- `.global()`: returns the global default cache, which lives in the user's home folder. This is a function to avoid initializing the cache when it is never used. Multiple calls will return the same instance.
- `.lookup(filename, operationString)`: returns a file name; operationString is a unique descriptor for the cache (e.g. the options used)
- `.clear()`: clears the cache complely
- `.filename()`: returns a new output file name that is in the cache
- `.complete(cacheFilename, filename, operationString)`: marks a given output file as completed and saves the cache metadata
The runner is the last task, it is responsible for using list tasks and file tasks to achieve whatever it wants. There are no strong requirements here; it's not worth it to really try to standardize the runner in my opinion - the overhead of dealing with some kind of standard for expressing a workflow is less than the benefits of reuse. Whatever can be reused should be extracted into file tasks and list tasks and the runner is everything that can't be reused.
Options is a object with the following properties:
The first parameter is the list structure of files, without any filters or tasks applied to it.
- `filepath`: Full path to the input file.
- `cachepath`: the cache directory path. A directory where to store the cached results (a metadata file and a cached version are stored)
- `method` (optional, default `stat`): the method to use.
- `stat`: calls `fs.stat` on the input file path; the cached version is used if the file size and date last modified have not changed.
- `md5` | `sha1` | `sha256` | `sha512`: reads the input file in full and calculates the given hash using Node's crypto; this uses openSSL so it is still quite fast.
- `options` (optional): a description of the options used for this task. You need to know something about the operation which is being applied, otherwise two different tasks on the same input file would share the same cache result. If you're just applying one set of tasks per file, then just pass whatever global options were used here.
````javascript
// serve-index:
var http = require('http');
### TODO
module.exports = function(list, options) {
http.createServer(function(req, res) {
if(req.url == '/') {
res.end('<html><ul><li>'+ tree.files.join('</li><li>') +'</li></ul></html>');
} else {
res.end('Unknown: ' + req.url);
Refactor to:
- appHash <= new
- inputFilePath <= filepath
- taskHash <= options
Add better handling of multiple cached files:
{
inputFilePath: {
stat: (expected stat meta)
md5: (expected hash meta)
taskResults: {
taskHash: {
path: (path in cache for this task)
}
}
}
}
}).listen(8000).on('listening', function() {
console.log('Listening on localhost:8000');
});
};
````
The runner is king, it gets to decide what to do with the tree and options it's supplied.
This means:
## API docs
- supporting multiple apps
- supporting a single version of a input file (e.g. invalidate all if the input file changes)
- supporting multiple cached tasks per input file (e.g. keep adding as long as the task is different but the file is the same)
The minitask core basically defines a set of helpers that support these convetions:
Example:
- `list.js` is the thing that iterates paths and returns a file list array for further consumption
- `runner.js` is a function that applies a set of file tasks on a readable stream and returns a writable stream
var Cache = require('minitask').Cache;
TODO: document the list
var cache = new Cache({
path: __dirname + '/cache',
method: 'stat'
}),
fileName = __dirname + '/example.txt',
taskHash = Cache.hash(JSON.stringify({ foo: 'bar'}));
TODO: specify how the list should be annotated with tasks
var cacheFile = cache.lookup(fileName, taskHash);
if(cacheFile) {
// ... read result from cache file
} else {
// create the file in the cache folder
cacheFile = cache.filename();
// ... perform processing, pipe result to the cache file
// mark as complete
cache.complete(fileName, taskHash, cacheFile);
}
### Runner API
## 3. Running tasks
The runner is a helper method that takes an input stream (e.g. an object { stdout: ... }), an array of tasks and a done function. It instantiates tasks if necessary, and pipes the tasks together, and ensures that the last task in the pipeline calls the done function.
In phase 2, some custom workflow creates `Task` instances.
Usage example:
Each task instance is a pipe from a input file (ReadableStream) to a chain of transformations which produces some output.
var runner = require('minitask').runner,
tasks = [ fileTask, ... ];
If the tasks are independent, then running them is simple: just use any regular asynchronous concurrency control library that allows you to run each task.
var last = runner({ stdout: fs.createReadStream(filename) }, tasks, function() {
console.log('done');
});
// need to do this here so we can catch the second-to-last stream's "end" event;
last.stdout.pipe(process.stdout, { end: false });
The runner is only designed for cases where you are producing a single output out of many streams. One example is a packaging system, which produces a single output file out of several files.
The runner accepts a (linear) array of:
## Caching
- Task objects and
- functions that write into a stream
File processing tasks such as package builds and metadata reads are often run multiple times. It is useful to cache the output from these tasks and only re-run the processing when a file has changed. GNU Make, for example, relies on dependency resolution + file last modified timestamps to skip work where possible.
For example:
A cacheable task is any task that reads a specific file path and writes to a writable stream at the end.
// running a set of concatenated tasks
runner.concat(fs.createWriteStream('./tmp/concatenated.txt'), [
function(out, done) {
out.write('// begin \n');
done();
},
new Flow([ tasks ]).input(file),
new Flow([ tasks ]).input(file2),
function(out) {
out.write('// end \n');
done();
},
], {
limit: 16
})
The caching system can either use a md5 hash, or the last modified+file size information to determine whether a task needs to be re-run. Additionally, an options hash can be passed to take into account different additional options.
How is this executed?
When the caching system is used, the task output is additionally written to a separate file. The assumption here is that each file task (with a task options hash and input md5) performs the same deterministic transformation. When the current input file's md5 and task options hash match, then the previously written cached result is streamed directly rather than running the full stack of transformations.
- First, the runner scans through the input and finds each flow
- Next, it replaces each flow with a "read from file" task; where the file is the temp file or cache file
- Next, it runs each flow at the specified level of parallelism, directing the output into the cache or a temp file
- Once all the task flows have run, it creates a new writable stream, runs each function(out) in the order specified, streaming the flow task outputs in the correct order to produce the final file.
### Cache API
When the tasks are concatenated: to enable greater parallelism (than level one, where each task is executed serially), the tasks need to written out to disk or memory. If two tasks are running concurrently and writing into process.stdout, then their outputs will be interspersed. This is why most task execution systems can only run one task at a time and a key limitation of many of the earlier designs I did for command line tools.
The cache API looks a lot like the runner API, but it requires an explicit file path and options hash.
Writing out to disk isn't that bad; it also enables caching.
var last = cache({ filepath: filepath, cachepath: ..., md5: ..., stat: ..., options: ... }, tasks, function() {
## Cache options
});
The method is controlled by: `cacheMethod`
Cache lookups are based on:
## Command line tool
- cachePath: (or application identifier)
- task.inputFilePath: (unique input)
- task.taskHash: (unique transformation pipeline description in the context of the app and input)
## Task extras when using the runner
Events that are only emitted if a cache is used
- `hit`: function to run when cache hit (useful for reporting on how many files were fetched from the cache).
- `miss`: function to run when cache miss
These are emitted as the task running starts, e.g. 'hit' if we use the cached version, 'miss' if we have to exec the task.
var fs = require('fs'),
assert = require('assert'),
cache = require('minitask').cache;
Cache = require('minitask').Cache;
var opt = {
cachepath: __dirname+'/cache',
filepath: __dirname+'/fixtures/bar.txt',
stat: fs.statSync(__dirname+'/fixtures/bar.txt')
};
var cache = null;
exports['cache tests'] = {
before: function() {
cache = new Cache({
path: __dirname+'/cache'
});
},
beforeEach: function() {
cache.clear(opt);
cache.clear();
},
'can look up a cached item by fs.stat': function(done) {
assert.ok(!cache.lookup(opt));
'can look up a cached item by fs.stat': function() {
assert.ok(!cache.lookup(__dirname+'/fixtures/bar.txt', 'simple'));
// create the file in the cache folder
var last = cache(opt, [], function() {
assert.ok(cache.lookup(opt));
done();
});
last.stdout.pipe(process.stdout, { end: false });
cacheFile = cache.filename();
fs.writeFileSync(cacheFile, 'foo');
// mark as complete
cache.complete(__dirname+'/fixtures/bar.txt', 'simple', cacheFile);
assert.ok(cache.lookup(__dirname+'/fixtures/bar.txt', 'simple'));
},
'can store a execution result and reuse it': function(done) {
var last = cache(opt, [], function() {
assert.ok(cache.lookup(opt));
// run a second time
var second = cache(opt, [
function() {
throw new Error('Cache reuse failed!');
}
], function() {
done();
});
second.stdout.pipe(process.stdout, { end: false });
'can look up a cached item by md5': function() {
cache = new Cache({
path: __dirname+'/cache',
method: 'md5'
});
last.stdout.pipe(process.stdout, { end: false });
fs.writeFileSync(__dirname+'/fixtures/hash.txt', 'first');
assert.ok(!cache.lookup(__dirname+'/fixtures/hash.txt', 'test2'));
// create the file in the cache folder
cacheFile = cache.filename();
fs.writeFileSync(cacheFile, 'foo');
// mark as complete
cache.complete(__dirname+'/fixtures/hash.txt', 'test2', cacheFile);
assert.ok(cache.lookup(__dirname+'/fixtures/hash.txt', 'test2'));
// change the input file => invalidate
fs.writeFileSync(__dirname+'/fixtures/hash.txt', 'second');
assert.ok(!cache.lookup(__dirname+'/fixtures/hash.txt', 'test2'));
},
'when the execution result does not match, it is not reused': function(done) {
opt.options = {
foo: 'foo',
bar: 'bar'
};
var last = cache(opt, [], function() {
assert.ok(cache.lookup(opt));
// order of definition should not matter
opt.options = {
bar: 'bar',
foo: 'foo'
};
assert.ok(cache.lookup(opt));
// when options are changed, the lookup is invalidated
opt.options = {
foo: 'bar',
bar: 'foo'
};
assert.ok(!cache.lookup(opt));
done();
'when the execution result does not match, it is not reused': function() {
cache = new Cache({
path: __dirname+'/cache',
method: 'stat'
});
last.stdout.pipe(process.stdout, { end: false });
var taskHash = 'test3';
// create the file in the cache folder
cacheFile = cache.filename();
fs.writeFileSync(cacheFile, 'foo');
// mark as complete
cache.complete(__dirname+'/fixtures/hash.txt', taskHash, cacheFile);
assert.ok(cache.lookup(__dirname+'/fixtures/hash.txt', taskHash));
// when the task hash is changed, the lookup is invalidated
taskHash = 'test4';
assert.ok(!cache.lookup(__dirname+'/fixtures/hash.txt', taskHash));
}

@@ -67,0 +70,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc