A standard/convention for running tasks over a list of files based around Node core streams2.
Compatible with Node 0.8.x as well thanks to readable-stream by isaacs.
minitask is a library I wrote for processing tasks on files. It is used in several of my libraries, including gluejs and generate-markdown.
minitask is based on the observation that most tasks involving files can be divided into three phases:

1. a listing phase, where a list of files and their metadata is built (e.g. by iterating over a directory)
2. a filtering and task definition phase, where the list is filtered and tasks are queued for each file
3. a task execution phase, where the queued tasks are run in parallel or sequentially, optionally with caching
When you try to do all these in one go (e.g. at the same time as you are iterating directories), things get messy. It's a lot easier to work with fully built directory/file metadata structures separately from the include/exclude logic; and easier to reason about execution order separate from task queueing.
It should be easy to specify tasks as sequences of transformations on a stream. While duplex streams are cool, expressing simple tasks like wrapping a stream's content in a string is quite tedious if you need to wrap it in a duplex stream class. To be fair, the core Transform stream is probably the best one can do if you want to express something as a stream; however, many 3rd party transforms are not streaming, or are easier to express as computations over a whole file - e.g. converting a file's content to markdown, or wrapping a file in a particular template.
Furthermore, Node's child_process API returns something that's not quite a duplex stream, though it has stdin and stdout. It should be possible to write functions, child_process pipes and tasks involving duplex streams/transform streams without worrying about the details of buffering and piping everything together.
Finally, during task execution, it is useful to be able to treat each set of transformations on a file individually and in an abstract manner. This allows a queue of tasks to be executed at some specific level of parallelism. It also makes it possible to implement a fairly generic caching mechanism, which simply redirects the input into a cache file while still producing the expected output.
All in all, this makes writing things that operate on files nicer without becoming overly burdensome.
The List class only has one method: add(path). For example:
var path = require('path'),
    List = require('minitask').list,
    files = new List();

files.add(path.resolve(process.cwd(), './foo'));
If the path is a directory, then it is iterated recursively.
Note that there is no "exclude" - the idea is that you exclude things in postprocessing rather than trying to build in a lot of complicated exclusion logic during iteration.
This produces an object with a .files property, which looks like this:
{
  files: [
    {
      name: '/full/path/to/file.js',
      stat: { ... fs.Stat object ... }
    }
  ]
}
Each file is annotated with an fs.Stat object, since you'll need that information anyway to distinguish between directories and files when iterating over directories.
Exclusions are applied by filtering out items from the list. For example, filter-regex.js:
// Filter out files from a list by a blacklist of regular expressions
module.exports = function(list, expressions) {
  list.files = list.files.filter(function(file) {
    var name = file.name,
        matchedExpr,
        match = expressions.some(function(expr) {
          var result = name.match(expr);
          if (result) {
            matchedExpr = expr;
          }
          return result;
        });
    if (match) {
      console.log('Excluded by regexp ', matchedExpr, ':', name);
    }
    return !match;
  });
};
Which might be applied like this:
var filterRegex = require('../lib/list-tasks/filter-regex.js');
// where `list` is an instance of List
filterRegex(list, [ new RegExp('\/dist\/'), new RegExp('[-.]min.js$') ]);
Since filtering is an operation that's separate from reading in the initial tree, it's much easier to see and configure what gets excluded, and to define new metadata-related operations. These tasks also become easier to reuse and test (no file I/O involved), and no unchangeable filtering logic gets embedded into the directory iteration code.
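As another illustration, here is a hypothetical filter-size.js written in the same style as filter-regex.js above (it is not part of minitask itself); it uses the fs.Stat metadata gathered during listing:

// filter-size.js (sketch): drop empty files using the fs.Stat metadata
module.exports = function(list) {
  list.files = list.files.filter(function(file) {
    return file.stat.size > 0;
  });
};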
Here, we are defining tasks that operate on input streams. These are generated by iterating over the file metadata in some appropriate manner.
The task queueing function is a function that takes a List as its first argument and produces task arrays.
There is one "master queue" into which each file processing task gets added. In phase 3, that queue is cleared by running it in parallel or sequentially.
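As a rough sketch (the shape of the returned task arrays below is illustrative, not minitask's documented format), such a queueing function might look like this:

// queue-tasks.js (sketch): takes a List and produces one task array per file
module.exports = function(list) {
  return list.files.map(function(file) {
    return {
      name: file.name,
      // tasks are functions so that no stream or child process is created
      // until the task is actually executed
      tasks: [
        function(input) {
          return '// ' + file.name + '\n' + input;
        }
      ]
    };
  });
};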
As I stated earlier, it should be possible to write functions, child_process pipes and tasks involving duplex streams/transform streams without worrying about the details of buffering and piping everything together. This is what the Task class does.
For example, here I am applying four transformations on a stream, each specified in a different manner (sync fn, async fn, child process, duplex stream):
var flow = new Task([
  // sync function
  function(input) {
    return 'bb' + input.trim() + 'bb';
  },
  // async function
  function(input, done) {
    setTimeout(function() {
      done(null, 'c' + input.trim() + 'c');
    }, 10);
  },
  // spawned child process
  function() {
    var spawn = require('child_process').spawn;
    return spawn('wc', [ '-c' ]);
  },
  // duplex stream (not showing the details on how you can write these;
  // see http://nodejs.org/api/stream.html#stream_class_stream_transform
  // for the details)
  function() {
    return new Duplex();
  }
]);
This unified interface means that you don't need to worry about how your transformation is implemented: as long as it follows one of the four forms above, the Task class takes care of calling the right functions (pipe / write / read) and of buffering when transitioning between streams and functions.
Also:

- anything compatible with the core stream.Transform is immediately usable
- anything that reads from stdin and writes to stdout is immediately usable

There is a reason why tasks are functions: this is so that we don't create instances of streams until they are executed. Otherwise, you can easily run out of resources - for example, if you spawn a new task for every file immediately.
The input and output can be strings or streams:
// from string input to string output
flow.input('AA')
  .output(function(output) {
    console.log(output);
  })
  .exec();

// from stream input to stream output
flow.input(fs.createReadStream('./foo.txt'))
  .output(fs.createWriteStream('./bar.txt'))
  .exec();
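Since the input and output forms can be combined freely, a stream input also works with a function output (a sketch; foo.txt is a placeholder file):

// from stream input to string output via a callback
flow.input(fs.createReadStream('./foo.txt'))
  .output(function(output) {
    console.log('result:', output);
  })
  .exec();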
API:

- new Task(tasks): creates a new flow with the given tasks
- .input(string | ReadableStream): sets the input
- .output(fn | WritableStream): sets the output

A small note on Node 0.8 and stream instances: passing a stream to .input() automatically calls .pause() on that stream. This is because the event handlers are only attached when .exec is called; Node 0.8 may prematurely start emitting data if not paused. If you're instantiating the input streams at a much earlier point in time, make sure you call pause() on them.
Events:

- exec: emitted when exec is called
- done: emitted when the task is done

Events that are only emitted if a cache is used:

- hit: emitted on a cache hit (useful for reporting how many files were fetched from the cache)
- miss: emitted on a cache miss

File processing tasks such as package builds and metadata reads are often run multiple times. It is useful to cache the output from these tasks and only re-run the processing when a file has changed. GNU Make, for example, relies on dependency resolution plus file last-modified timestamps to skip work where possible.
A cacheable task is any task that reads a specific file path and writes to a writable stream at the end.
The caching system can either use an md5 hash, or the last-modified time plus file size, to determine whether a task needs to be re-run. Additionally, an options hash can be passed so that tasks run with different options are cached separately.
When the caching system is used, the task output is additionally written to a separate file. The assumption here is that each file task (with a task options hash and input md5) performs the same deterministic transformation. When the current input file's md5 and task options hash match, then the previously written cached result is streamed directly rather than running the full stack of transformations.
The cache is separate from the task, and has the following API:

- new Cache(options): creates a new cache
- .global(): returns the global default cache, which lives in the user's home folder. This is a function to avoid initializing the cache when it is never used; multiple calls return the same instance.
- .lookup(filename, operationString): returns the cached file name, if any; operationString is a unique descriptor for the cached operation (e.g. the options used)
- .clear(): clears the cache completely
- .filename(): returns a new output file name inside the cache
- .complete(cacheFilename, filename, operationString): marks a given output file as completed and saves the cache metadata

Options is an object with the following properties:
- filepath: full path to the input file
- cachepath: the cache directory path; the directory where the cached results are stored (a metadata file plus the cached versions)
- method (optional, default stat): the method used to detect changes:
  - stat: calls fs.stat on the input file path; the cached version is used if the file size and date last modified have not changed
  - md5 | sha1 | sha256 | sha512: reads the input file in full and calculates the given hash using Node's crypto module; this uses OpenSSL, so it is still quite fast
- options (optional): a description of the options used for this task. You need to know something about the operation being applied, otherwise two different tasks on the same input file would share the same cache result. If you're just applying one set of tasks per file, just pass whatever global options were used.

Refactor to: add better handling of multiple cached files:
{
  inputFilePath: {
    stat: (expected stat meta)
    md5: (expected hash meta)
    taskResults: {
      taskHash: {
        path: (path in cache for this task)
      }
    }
  }
}
This means:
Example:
var Cache = require('minitask').Cache;

var cache = new Cache({
      path: __dirname + '/cache',
      method: 'stat'
    }),
    fileName = __dirname + '/example.txt',
    taskHash = Cache.hash(JSON.stringify({ foo: 'bar' }));

var cacheFile = cache.lookup(fileName, taskHash);

if (cacheFile) {
  // ... read result from cache file
} else {
  // create the file in the cache folder
  cacheFile = cache.filename();
  // ... perform processing, pipe result to the cache file
  // mark as complete
  cache.complete(fileName, taskHash, cacheFile);
}
In phase 2, some custom workflow creates Task instances. Each task instance is a pipe from an input file (ReadableStream) to a chain of transformations which produces some output.
If the tasks are independent, then running them is simple: just use any regular asynchronous concurrency control library that allows you to run each task.
The runner is only designed for cases where you are producing a single output out of many streams. One example is a packaging system, which produces a single output file out of several files.
The runner accepts a (linear) array of functions (each called with the output stream and a done callback) and Task instances with their input set.
For example:
// running a set of concatenated tasks
runner.concat(fs.createWriteStream('./tmp/concatenated.txt'), [
  function(out, done) {
    out.write('// begin \n');
    done();
  },
  new Flow([ tasks ]).input(file),
  new Flow([ tasks ]).input(file2),
  function(out, done) {
    out.write('// end \n');
    done();
  }
], {
  limit: 16
});
How is this executed?
When the tasks are concatenated: to enable greater parallelism (beyond level one, where each task is executed serially), the task outputs need to be written out to disk or to memory. If two tasks run concurrently and write into process.stdout, their outputs will be interspersed. This is why most task execution systems can only run one task at a time, and it was a key limitation of many of the earlier designs I did for command line tools.
Writing out to disk isn't that bad; it also enables caching.
The cache method is controlled by the cacheMethod option. Cache lookups are based on the input file (its path and its stat or hash metadata, depending on the method) and the task options hash.
Events that are only emitted if a cache is used:

- hit: emitted on a cache hit (useful for reporting how many files were fetched from the cache)
- miss: emitted on a cache miss

These are emitted as the task run starts: 'hit' if the cached version is used, 'miss' if the task has to be executed.