# @blitzjs/file-pipeline

File Transformation Pipeline
## Design goals
- Stages: Related logic needs to live together
- Everything is a stream
- Efficiency
- Parallel processing
- Cleaner architecture for dirty-restart functionality
- Agnostic input: file watcher / glob
- Simplify tests
This package provides a gulp-based pipeline for transforming files from a source to a destination across a series of stages.
```ts
import {transformFiles} from '@blitzjs/file-pipeline'
import through from 'through2'
import File from 'vinyl'

const myStage = () => ({
  stream: through.obj((file: File, _, next) => {
    next(null, file)
  }),
})

const mySecondStage = () => ({
  stream: through.obj((file: File, _, next) => {
    next(null, file)
  }),
})

const src = '/path/to/src'
const stages = [myStage, mySecondStage]
const dest = '/path/to/dest'
const options = {
  watch: true,
  ignore: [],
  include: ['**/*'],
  bus: someTransformStream,
}

transformFiles(src, stages, dest, options)
```
## Stages
Stages are how you provide special behaviour to your file-pipeline.
The anatomy of your stage looks like this:
```ts
function myStage({
  // Stage config holds the basic info you need for the stage
  config: {
    // src folder
    src,
    // dest folder
    dest,
    // current working directory
    cwd,
    // include globs
    include,
    // ignore globs
    ignore,
    // whether we are in watch mode
    watch,
  },
  // Input writable stream - use input.write(file) to send a file to the input of the pipeline
  input,
  // Event bus stream - use this to send events to listeners within and outside of the pipeline
  bus,
  // Get the input cache.
  // This is an object that contains cached entries for all the files ingested.
  // Use this for things that require lists of files.
  getInputCache,
}: StageArgs) {
  const stream = createSomeKindOfTransformStream()
  const ready = {foo: 'This will appear in the object returned by the transformation promise'}
  return {stream, ready}
}
```
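The `ready` value lets a stage expose data once the pipeline is set up. Assuming `transformFiles` resolves with the stages' `ready` objects merged together (as the comment above describes), usage might look like this sketch, reusing `src`, `dest`, and `options` from the first example:

```ts
// Sketch (assumption): the transformation promise resolves with the
// merged `ready` objects of all stages once the pipeline is ready.
const result = await transformFiles(src, [myStage], dest, options)
console.log(result.foo) // -> value provided by myStage's `ready`
```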
## Why Streams?
Initially, Blitz will be used by people with small projects; however, as the number of files and the throughput increase, we will need an architecture that allows for large parallel throughput with low memory consumption. Node is built on streams as a primitive, so it makes sense to utilize what is available. The Gulp ecosystem provides several tools for managing streams of files, so it makes sense to use those tools where available. Because refactoring to streams later would be extremely difficult and painful, not starting with streams would be a design mistake.
## Why not RxJS?
RxJS could be a good match for streaming architectures, and it introduces some really powerful tools for managing stream operations. As we are using object streams, RxJS could also simplify some of the boilerplate. However, certain operators in RxJS can be unapproachable for newer developers and tend to encourage too much abstraction. It is also an extra dependency that increases the learning surface of the codebase, and since we need to understand basic Node streams in any case, it makes sense to avoid RxJS until it is absolutely necessary.
## File Transform Pipeline

### Stream helpers

Node streams are a little incompatible across older versions of Node, so there are a few compatibility libs we use to help us work with streams.

https://www.freecodecamp.org/news/rxjs-and-node-8f4e0acebc7c/

### Helper Libs

### A good way to work with streams
A pattern we have found works well is using a constructor function that accepts connectors (or config) and returns a stream along with any shared data you need to provide to other components' connectors. You will see this a lot around the synchronizer.
```ts
type CreatorFn = (connectionsOrConfig: ConnectionsOrConfig) => StreamAsWellAsSharedData
```
An example might look like this:
```ts
const source = agnosticSource({cwd: src, include, ignore, watch})
pipe(source.stream, fileTransformPipeline)
```
The reason we don't just return a stream is that we often need to return other data and share it elsewhere. For example, to analyze the input file structure in the pages rule, we use a file cache:
```ts
const fileCache = createFileCache(config)
const pageRule = createPageRule(fileCache.cache)

pipeline(
  fileCache.stream,
  pageRule.stream,
)
```
### View rendering and error handling
The CLI view is provided by a stream that accepts Events, which it manages and displays; it is responsible for rendering output to the terminal.

If you push an Error to a transform stream with `next(new Error(...))`, an Error Event will be sent over the event bus. This can then be handled by event-handling logic in your stage.
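For example, a stage's transform stream might surface a problem like this (a minimal sketch; the file check is illustrative):

```ts
import through from 'through2'
import File from 'vinyl'

// Sketch: erroring from within a stage's transform stream. The
// pipeline is assumed to turn this stream error into an Error Event
// on the bus, where your stage's event handling logic can react.
const stream = through.obj((file: File, _, next) => {
  if (/\.forbidden$/.test(file.path)) {
    next(new Error(`Cannot process ${file.path}`))
    return
  }
  next(null, file)
})
```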
### Evented Vinyl Files
Evented Vinyl Files are Vinyl files with a file system event attached to them:
```ts
import Vinyl from 'vinyl'

// A file with a delete event has null contents and event === "unlink"
const isDelete = (file) => file.isNull() && file.event === 'unlink'

// A deleted file
new Vinyl({
  path: '/path/to/foo',
  contents: null,
  event: 'unlink',
})

// A regular file whose contents are provided as a stream
new Vinyl({
  path: '/path/to/foo',
  contents: someContentStream,
})
```
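A stage can then branch on the event. For instance, a sketch of dropping deleted files from further processing using `isDelete` from above:

```ts
import through from 'through2'

// Sketch: swallow unlink events so deleted files are not
// passed on to later stages or written to the destination.
const dropDeletes = through.obj((file, _, next) => {
  if (isDelete(file)) {
    next() // drop the file from further processing
    return
  }
  next(null, file)
})
```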
### Input agnostic

The pipeline is input agnostic, i.e. it should not matter whether a file comes from a watcher or a folder glob. To help with that we have created an agnostic input stream that takes glob config and returns a file stream. It consumes input from both chokidar and vinyl-fs.
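A sketch of the watcher half of that idea (illustrative, not the actual implementation; `watchSource` is a hypothetical name): normalize chokidar events into evented Vinyl files so downstream stages never need to know where input came from.

```ts
import {join} from 'path'
import {Readable} from 'stream'
import chokidar from 'chokidar'
import Vinyl from 'vinyl'

// Sketch: turn chokidar events into evented Vinyl files.
function watchSource({cwd, include}: {cwd: string; include: string[]}) {
  const stream = new Readable({objectMode: true, read() {}})
  chokidar.watch(include, {cwd}).on('all', (event, relativePath) => {
    stream.push(
      new Vinyl({
        cwd,
        path: join(cwd, relativePath),
        contents: null, // contents can be filled in by a later stage
        event, // "add" | "change" | "unlink" | ...
      }),
    )
  })
  return {stream}
}
```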
### Optimization

Input manages the ingestion of evented Vinyl files. Files that have already been processed, or are currently being processed, should not be processed again. Here we try to maintain a running list of files to work on, based on a hash of their filename and mtime.
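A minimal sketch of that bookkeeping (the names are illustrative):

```ts
import crypto from 'crypto'
import File from 'vinyl'

const seen = new Set<string>()

// Key a file by its path and mtime so an unchanged file that
// re-enters the pipeline can be skipped.
function fileKey(file: File) {
  const mtime = file.stat ? file.stat.mtimeMs : 0
  return crypto.createHash('md5').update(`${file.path}:${mtime}`).digest('hex')
}

function shouldProcess(file: File) {
  const key = fileKey(file)
  if (seen.has(key)) return false
  seen.add(key)
  return true
}
```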
### Analysis

Some types of analysis need a list of all the files; other types do not.

Analysis needs to be done in-stream as new information comes in. E.g. when someone renames a file, that file goes to the analysis engine, which works out invariants as they occur without requiring a sweep of the entire file system.

For this, we can create file caches that represent projections of the file system and update based on input file events.
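A sketch of such a projection (illustrative names; in the pipeline the cache is reached through `getInputCache`):

```ts
import through from 'through2'
import File from 'vinyl'

// Sketch: a cache kept in sync with the file system by consuming
// evented Vinyl files as they flow through the pipeline.
const cache = new Map<string, File>()

const cacheStream = through.obj((file: File & {event?: string}, _, next) => {
  if (file.event === 'unlink') {
    cache.delete(file.path)
  } else {
    cache.set(file.path, file)
  }
  next(null, file)
})
```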
### Stages

Stage streams represent specific things we need the file-pipeline to do.

Possible things a stage can do:
- Change its path or contents
- Drop the file from further processing. Don't copy it.
- Add new files to the input stream, associating the new files with the original
- Write an error to the error stream
```ts
import {resolve} from 'path'
import {pathExistsSync} from 'fs-extra'
import File from 'vinyl'
import {through} from './streams'

export default function myStage({config, input, getInputCache}) {
  const service = createSomeService()
  const cache = getInputCache()

  // Add a new file to the input stream if it is missing
  if (!pathExistsSync(resolve(config.src, 'blitz.config.js'))) {
    input.write(
      new File({
        path: resolve(config.src, 'blitz.config.js'),
        contents: Buffer.from('Hello World'),
      }),
    )
  }

  const stream = through.obj(function (file, enc, next) {
    // Write an error to the error stream
    // (assumes the input cache can be filtered by ingested file path)
    if (cache.filter((p) => /next\.config\.js$/.test(p)).length > 0) {
      const err = new Error('Cannot have next config!')
      err.name = 'NextConfigError'
      next(err)
      return
    }

    // Change the file's path (or contents)
    file.path = file.path.toUpperCase()
    this.push(file)

    // Add a new file, associated with the original
    this.push(
      new File({
        path: '/path/to/foo',
        contents: Buffer.from('This is a file'),
      }),
    )

    next()
  })

  const ready = {
    foo: 'I am foo',
  }

  return {
    stream,
    ready,
    service,
  }
}
```