
@raywhite/workflow
A DSL (and compiler) for creating trees of dependencies (of async operations). Mostly for running heavy (containerized) workloads in a particular sequence.
What's the problem being solved here?
Through trial and error, and evaluation of many alternatives, we've settled on a relatively simple paradigm for ETL: some (compute-weak) central system should coordinate the triggering and monitoring of (compute-heavy) jobs. The triggers are either time-based, or the result of some dependency (another job) completing. Sometimes workloads need to run in sequence; where possible (no shared dependencies), they should run in parallel as a performance optimization.
This means these workflows can be described as DAGs (Directed Acyclic Graphs): they always proceed in one direction. They do not go backwards, or around like a circle in a spiral, like a wheel within a wheel, never ending or beginning on an ever spinning reel, like a snowball down a mountain, or a carnival balloon, like a carousel that's turning - running rings around the moon, like a clock whose hands are sweeping past the minutes of its face and the world is like an apple whirling silently in space, like the circles that you find in the windmills of your mind (to reiterate: they don't do this).
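To make the "no cycles" property concrete, here is a minimal sketch that checks a dependency map for cycles with a depth-first search. The `dependsOn` shape (job name mapped to the names it waits on) and `findCycle` are hypothetical, chosen for illustration; this is not the package's spec format.

```javascript
// Hypothetical representation: each job name maps to the jobs it waits on.
// Returns the first cycle found (as a list of names), or null for a DAG.
function findCycle (dependsOn) {
  const VISITING = 1
  const DONE = 2
  const marks = new Map()
  const path = []

  function visit (name) {
    if (marks.get(name) === DONE) return null
    if (marks.get(name) === VISITING) {
      // We reached a node that is already on the current path: a cycle.
      return path.slice(path.indexOf(name)).concat(name)
    }
    marks.set(name, VISITING)
    path.push(name)
    for (const dep of dependsOn[name] || []) {
      const cycle = visit(dep)
      if (cycle) return cycle
    }
    path.pop()
    marks.set(name, DONE)
    return null
  }

  for (const name of Object.keys(dependsOn)) {
    const cycle = visit(name)
    if (cycle) return cycle
  }
  return null
}

// The workflow from this document: b1 and b2 wait on a, and c waits on both.
const workflow = { a: [], b1: ['a'], b2: ['a'], c: ['b1', 'b2'] }
console.log(findCycle(workflow)) // null, so this is a DAG
console.log(findCycle({ a: ['c'], b: ['a'], c: ['b'] })) // [ 'a', 'c', 'b', 'a' ]
```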
How would one implement one of these DAGs?
Well... they can generally be described using `Promise`s (for instance, in Node.js). The examples in this document use a `timeout` operation as a placeholder for some work (for instance, starting a VM with a network request and giving it a container to run).
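```javascript
// NOTE: `timeout` and the `*_params` arrays are placeholders for real work.
const x = async function () {
  try {
    await timeout(...a_params) // a
    await Promise.all([ // b
      timeout(...b1_params), // b1
      timeout(...b2_params), // b2
    ])
    await timeout(...c_params) // c
  } catch (err) {
    // ... do something with this err?
    throw err
  }
}

// `x` is a function, so it has to be called to get a Promise.
x().then(console.log).catch(console.error)
```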
This looks pretty simple, but actually working out which job(s) threw (if any), or waiting for all `Promise`s to settle before resolving or throwing, would add a layer of complexity and verbosity. This in and of itself is not terrible, until it is being done multiple times (for multiple workflows), in which case one will find themselves parsing different files with different hand-coded sequences every time an issue arises.
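For comparison, here is a sketch of what that hand-coded bookkeeping tends to look like for this workflow (`a`, then `b1` and `b2` in parallel, then `c`), using `Promise.allSettled` to wait for both parallel jobs and report which ones failed. `sleep` and `runJob` are hypothetical stand-ins, not part of the package.

```javascript
// Hypothetical stand-in for real work: resolves after `ms`, or rejects if `fail` is set.
const sleep = (ms, fail = false) => new Promise((resolve, reject) => {
  setTimeout(() => (fail ? reject(new Error('BOOM!')) : resolve()), ms)
})

// Tags any error with the name of the job that threw it.
async function runJob (name, work) {
  try {
    await work()
  } catch (err) {
    err.job = name
    throw err
  }
}

async function x ({ failB2 = false } = {}) {
  await runJob('a', () => sleep(1))

  // Wait for *both* parallel jobs to settle, then collect every failure.
  const results = await Promise.allSettled([
    runJob('b1', () => sleep(2)),
    runJob('b2', () => sleep(3, failB2)),
  ])
  const failures = results
    .filter((r) => r.status === 'rejected')
    .map((r) => r.reason.job)
  if (failures.length > 0) {
    throw new Error(`parallel step b failed: ${failures.join(', ')}`)
  }

  await runJob('c', () => sleep(4))
}

x().then(() => console.log('done'))
x({ failB2: true }).catch((err) => console.error(err.message)) // parallel step b failed: b2
```

Every workflow written this way repeats the same tagging and collecting logic, which is the duplication the DSL and middleware are meant to remove.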
The whole sequence of steps can easily be described by a DSL, and middleware can be used to catch errors at every node, log the completion of stages, etc.
What does that DSL look like?
It's very similar to the CircleCI DSL, or even the one used by GitHub Actions. Some background on its design can be found in this GitHub PR.
The spec below is written in YAML (the parser expects a JS object, so the YAML has to be parsed first); it describes the same workflow as the one above.
```yaml
jobs:
  - name: a
    operation: timeout
    params:
      duration: 1
  - name: b1
    operation: timeout
    params:
      duration: 2
  - name: b2
    operation: timeout
    params:
      duration: 3
  - name: c
    operation: timeout
    params:
      duration: 4
workflows:
  - x:
      sequence:
        - a
        - b:
            parallel:
              - b1
              - b2
        - c
```
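Conceptually, a spec like this is a tree of `sequence` and `parallel` nodes whose leaves are job names. The sketch below walks that tree (using the same spec as a plain JS object) to show how it maps onto `await` and `Promise.all`. It is a hypothetical, simplified interpreter (no middleware, a context of just `{ name }`, fail-fast on errors), not how `@raywhite/workflow` is implemented; `run`, `operations` and `log` are illustrative names.

```javascript
// The YAML spec above, written as the plain JS object a YAML parser would produce.
const spec = {
  jobs: [
    { name: 'a', operation: 'timeout', params: { duration: 1 } },
    { name: 'b1', operation: 'timeout', params: { duration: 2 } },
    { name: 'b2', operation: 'timeout', params: { duration: 3 } },
    { name: 'c', operation: 'timeout', params: { duration: 4 } },
  ],
  workflows: [
    { x: { sequence: ['a', { b: { parallel: ['b1', 'b2'] } }, 'c'] } },
  ],
}

const log = []
// Hypothetical operation registry: name -> async (context, params, next).
const operations = {
  timeout: (context, params) => new Promise((resolve) => {
    setTimeout(() => { log.push(context.name); resolve() }, params.duration)
  }),
}

const jobs = new Map(spec.jobs.map((job) => [job.name, job]))

// Runs one node: either a job name, or a { name: { sequence | parallel } } object.
async function run (node) {
  if (typeof node === 'string') {
    const job = jobs.get(node)
    return operations[job.operation]({ name: node }, job.params)
  }
  const [name] = Object.keys(node)
  const { sequence, parallel } = node[name]
  if (sequence) {
    for (const child of sequence) await run(child) // one after another
  } else {
    await Promise.all(parallel.map((child) => run(child))) // all at once
  }
}

run(spec.workflows[0]).then(() => console.log(log)) // [ 'a', 'b1', 'b2', 'c' ]
```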
The operations referenced by the spec (such as `timeout`) are just `async` functions that have been registered under a name. Each `operation` (or middleware) has the signature `async (context, params, next)`:
- `params` - whatever parameters the job was called with.
- `context` - contextual values, such as a unique identifier for the execution (`id`) and `state`, which should be used to transmit state between jobs.
- `next` - intended for use by middleware (it's the next function in the middleware chain).

Consider this example:
```javascript
// Waits `params.duration` ms, failing roughly 10% of the time.
const timeout = function (_, params) {
  const { duration } = params
  return new Promise(function (resolve, reject) {
    return setTimeout(function () {
      if (Math.random() > 0.9) {
        const err = new Error('BOOM!')
        return reject(err)
      }
      return resolve()
    }, duration)
  })
}

// Logs the start and outcome of whatever runs inside it, then rethrows any error.
const logger = async function (context, _, next) {
  console.log(`log: \`${context.name}\` starting.`)
  try {
    await next()
    console.log(`log: \`${context.name}\` completed successfully.`)
  } catch (err) {
    console.log(`log: \`${context.name}\` failed.`)
    throw err
  }
}
```
Both `timeout` and `logger` are examples of `operation`s. `timeout` actually does some work, whereas `logger` is intended to be used as middleware. Both functions have the same signature, but `timeout` does not use `context` or `next`, and `logger` does not use `params`.
NOTE: The `logger` middleware doesn't swallow the error; it logs a line and then rethrows it. This is because the middleware above this job (its ancestors) should also receive the error and be able to deal with it.
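A minimal sketch of why rethrowing matters, with the middleware chain wired together by hand (the `next` callbacks are written out explicitly). `failing`, `rethrow`, `swallow`, `outer` and `wrap` are hypothetical names for illustration: the ancestor only learns about the failure if every middleware between it and the job rethrows.

```javascript
// A job that always fails.
const failing = async () => { throw new Error('BOOM!') }

// Logs and rethrows, like `logger` above.
const rethrow = async (context, params, next) => {
  try {
    await next()
  } catch (err) {
    console.log(`${context.name} failed: ${err.message}`)
    throw err
  }
}

// Anti-pattern: catches the error and returns normally.
const swallow = async (context, params, next) => {
  try {
    await next()
  } catch (err) {
    // The error stops here, so ancestors see a success.
  }
}

// An ancestor that reports whether everything beneath it succeeded.
const outer = async (context, params, next) => {
  try {
    await next()
    return 'ok'
  } catch (err) {
    return 'failed'
  }
}

const context = { name: 'b2' }
const wrap = (mw) => outer(context, {}, () => mw(context, {}, () => failing(context, {})))

wrap(rethrow).then(console.log) // resolves to 'failed' (after logging "b2 failed: BOOM!")
wrap(swallow).then(console.log) // resolves to 'ok', even though the job failed
```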
new Compiler(options) or createCompiler(options)
Instantiates a compiler, designed to be used as a singleton across an application. The only option is `createIdentifier`, which is used to create the identifier passed (via `context`) to executions.
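For example, execution identifiers could be random UUIDs. This sketch assumes `createIdentifier` is called once per execution with no arguments and should return a string; the exact signature (and the default used when the option is omitted) isn't documented here, so treat that as an assumption. `crypto.randomUUID` requires Node.js 14.17 or later.

```javascript
const crypto = require('crypto')

// Assumption: the compiler calls this once per execution, with no arguments.
const createIdentifier = () => crypto.randomUUID()

// Would then be passed as: new Compiler({ createIdentifier })
console.log(createIdentifier()) // a random v4 UUID string
```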
Below are the instance methods.
compiler.createOperation(name, operation)
Registers an operation under `name`. `operation` should be a function (possibly a middleware) with the signature discussed above. The compiler will throw when `compileSpec` is called if an unrecognized `name` is encountered, so all operations should be created ahead of time.
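Because unknown names only surface when `compileSpec` runs, it can help to check a spec against the registered operation names up front. A hypothetical helper (`findUnknownOperations` is not part of the package), assuming the spec shape shown above (`jobs[].operation`):

```javascript
// Returns the operation names a spec references that haven't been registered.
function findUnknownOperations (spec, registered) {
  const known = new Set(registered)
  const unknown = new Set()
  for (const job of spec.jobs || []) {
    if (!known.has(job.operation)) unknown.add(job.operation)
  }
  return [...unknown]
}

const spec = {
  jobs: [
    { name: 'a', operation: 'timeout', params: { duration: 1 } },
    { name: 'b', operation: 'sleep', params: { duration: 2 } },
  ],
}

console.log(findUnknownOperations(spec, ['logger', 'timeout'])) // [ 'sleep' ]
```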
compiler.compileSpec(spec)
Parses a specification (can be called more than once).
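```javascript
// js-yaml v3; in v4 `safeLoad` was removed and `load` is safe by default.
const YAML = require('js-yaml')

const compiler = new Compiler()

// NOTE: These operations are declared above.
compiler.createOperation('logger', logger)
compiler.createOperation('timeout', timeout)

// NOTE: `spec` is the YAML above, as a string.
const config = YAML.safeLoad(spec)
// Compile the parsed object, not the raw YAML string.
compiler.compileSpec(config)
```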
compiler.execute(name, ...middleware)
Calling this will execute the job `name`, and any of its dependencies, in the specified sequence. Note that any job (regardless of whether or not it was declared as a nested job) can be triggered this way. This method is what is actually used to schedule job executions (see below).
Note that execution will not settle until all internally triggered `Promise`s have settled, and where one or more `Promise`s reject, only the first error will be thrown (middleware can be used for more granular detection of errors).
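These settle-everything-then-throw-the-first-error semantics can be sketched with plain promises. This illustrates the described behaviour, not the package's code; `settleAll` and `delay` are hypothetical names, and "first" is taken here to mean the earliest rejection in time, which is an assumption.

```javascript
// Waits for every promise to settle, then rejects with the earliest rejection (if any).
async function settleAll (promises) {
  let failed = false
  let firstError
  const values = await Promise.all(promises.map((p) =>
    Promise.resolve(p).catch((err) => {
      if (!failed) {
        failed = true
        firstError = err
      }
    })
  ))
  if (failed) throw firstError
  return values
}

// Runs `fn` after `ms` milliseconds; a throw inside `fn` becomes a rejection.
const delay = (ms, fn) => new Promise((resolve) => setTimeout(resolve, ms)).then(fn)

const settled = []
settleAll([
  delay(50, () => { settled.push('slow'); throw new Error('slow failure') }),
  delay(10, () => { settled.push('fast'); throw new Error('fast failure') }),
  delay(30, () => { settled.push('ok'); return 'ok' }),
]).catch((err) => {
  // Rejects with the fast failure, but only once the 50 ms job has settled too.
  console.log(err.message, settled) // fast failure [ 'fast', 'ok', 'slow' ]
})
```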
Compiler.compose(...fns)
This is a static method of the `Compiler` constructor. It is mostly for internal use: it composes functions with the `operation` / `middleware` signature (left to right).
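A minimal sketch of left-to-right composition for the `(context, params, next)` signature, in the style of `koa-compose`. It illustrates the idea only; the package's own `Compiler.compose` may differ, for instance in how it treats the caller's `next`.

```javascript
// Composes operations left to right: fns[0] runs first, and each one's `next`
// invokes the following function. The result has the same signature, so it nests.
function compose (...fns) {
  return function composed (context, params, next = async () => {}) {
    let lastIndex = -1
    const dispatch = async (i) => {
      if (i <= lastIndex) throw new Error('next() called multiple times')
      lastIndex = i
      // After the last fn, fall through to the caller's `next`.
      const fn = i === fns.length ? next : fns[i]
      if (!fn) return undefined
      return fn(context, params, () => dispatch(i + 1))
    }
    return dispatch(0)
  }
}

const order = []
const logger = async (context, params, next) => {
  order.push(`${context.name} starting`)
  await next()
  order.push(`${context.name} done`)
}
const work = async (context, params) => {
  order.push(`${context.name} working for ${params.duration}ms`)
}

compose(logger, work)({ name: 'a' }, { duration: 1 }).then(() => console.log(order))
// [ 'a starting', 'a working for 1ms', 'a done' ]
```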
context
Each `async` call (even nodes in the tree that specify `parallel` or `sequence` groups) is called with `context` as the first param. That object has the methods `getState() => Object` and `setState(Object)`, which retrieve or store (using `Object.assign`) state that needs to be transmitted between operations. The `context` object also contains the following metadata properties:
- `root` - the name of the top level workflow.
- `id` - an identifier (created with `createIdentifier`) that is unique per execution.
- `name` - the name of this job / step.
- `type` - one of `'job'`, `'sequence'` or `'parallel'`.
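A hypothetical sketch of a context object matching that description: per-step metadata plus `getState` / `setState` backed by one state object shared across an execution. `createContext` is an illustrative name, and returning a shallow copy from `getState` is a design assumption rather than documented behaviour.

```javascript
// One shared state object per execution; each step gets its own metadata.
function createContext (sharedState, { root, id, name, type }) {
  return {
    root,
    id,
    name,
    type,
    // Shallow copy, so callers can't mutate the shared state directly.
    getState: () => ({ ...sharedState }),
    // Merge the patch into the shared state with Object.assign.
    setState: (patch) => { Object.assign(sharedState, patch) },
  }
}

const state = {}
const execution = { root: 'x', id: 'exec-1' }
const a = createContext(state, { ...execution, name: 'a', type: 'job' })
const c = createContext(state, { ...execution, name: 'c', type: 'job' })

a.setState({ rows: 42 })
c.setState({ uploaded: true })
console.log(c.getState()) // { rows: 42, uploaded: true }
```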
Consult the contrived example below of scheduling over HTTP (using `koa`).
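```javascript
const Koa = require('koa')
// NOTE: Koa has no built-in `app.get`; routing here uses `@koa/router`.
const Router = require('@koa/router')

const app = new Koa()
const router = new Router()
const compiler = new Compiler()
/**
 * ... add operations / middleware, then
 * a handler of some sort for HTTP requests.
 */
router.get('/cron/jobs/:name', function (ctx) {
  const { name } = ctx.params
  /**
   * NOTE: This runs in the background, so the
   * handler returns immediately.
   */
  compiler.execute(name).catch(console.error)
  ctx.body = null // Koa responds with 204 No Content.
})

app.use(router.routes())
```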
MIT © Ray White, 2019