
exframe-workflow
A lightweight workflow engine that can be embedded into any node service
A library for facilitating a distributed workflow as a service system
const logger = require('exframe-logger').create();
const dbClient = require('exframe-db').init(options);
const cacheManager = require('exframe-cache-manager').create(options);
const { WorkflowManager } = require('exframe-workflow');
dbClient.connect();
const serviceName = 'test-service';
const workflowManager = WorkflowManager.default(logger, serviceName, cacheManager, dbClient);
function addOne(context, { inputValue }) { return { result: inputValue + 1 }; }
const myWorkflow = workflowManager.createWorkflow('my-workflow');
myWorkflow.task(addOne);
const context = {
  user: { userId: 'test-id', userName: 'test-user' },
  log: logger
};
app.post('/addOne', async (request, response) => {
  const instance = await myWorkflow.start(context, { inputValue: request.body.inputValue });
  response.send(await instance.getWorkContext());
});
The WorkflowManager handles the instantiation of the WorkflowInstanceManager and the Workflow. The Workflow is bound to the WorkflowInstanceManager, and when a workflow is started or resumed it coordinates with the WorkflowInstanceManager to create or load a WorkflowInstance. A WorkflowInstance manages traversing through the tasks, joining on child workflow results, and keeping work context state.
Handles the instantiation of the WorkflowInstanceManager and the Workflow. It has been configured with a default implementation that wraps the exframe-cache-manager and exframe-db for logging, caching, and coordination. The WorkflowInstanceManager will listen on a queue that is unique per serviceName; the distributed coordination for starting workflows across multiple instances relies on this.
field | description |
---|---|
logger | exframe-logger |
serviceName | string -- the unique name for the service to use for coordinating the start of new workflow instances |
cacheManager | exframe-cache-manager -- provides the redis backing for the WorkflowInstance cache and the expiry queue for task execution expiration |
dbClient | exframe-db -- provides the mongo backing for the WorkflowInstance cache and the WorkLog storage |
options | optional, object -- configures the WorkflowManager |
field | description |
---|---|
serverId | string -- a unique identifier for the server running the service, can help with logging |
timeoutDuration | integer, default - 30 -- the number of seconds a service will wait between trying to pull items off the queue |
maxWorkers | integer, default - 30 -- the number of workers for the worker pool, this governs the number of concurrent workflows that can execute at any given time |
overflowWorkers | integer, default - 15 -- the number of workers over the max number of workers that can execute; this is a bucket reserved for incoming requests or parent workflows that are resuming after all children have completed |
workTtl | integer, default - 600 -- the number of seconds that a workflow execution is locked from being taken over by another service |
keepaliveInterval | integer, default - 60 -- the number of seconds between refreshing the locks |
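As a sketch of how these fields fit together, assuming the options object is passed as the final argument to WorkflowManager.default (the exact call shape is an assumption, so treat the values as illustrative defaults):
const tunedManager = WorkflowManager.default(logger, 'test-service', cacheManager, dbClient, {
  serverId: 'server-01',   // hypothetical identifier, helps correlate logs across instances
  timeoutDuration: 30,     // seconds to wait between pulls off the queue
  maxWorkers: 30,          // concurrent workflow executions in the worker pool
  overflowWorkers: 15,     // extra capacity for incoming requests and resuming parent workflows
  workTtl: 600,            // seconds a workflow execution stays locked to this service
  keepaliveInterval: 60    // seconds between lock refreshes
});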
field | description |
---|---|
name | string -- specifies the name of the workflow, generally for logging purposes |
options? | optional, object -- configures the workflow execution |
field | description |
---|---|
parallel | boolean -- specifies whether the workflow will execute its tasks sequentially or in parallel (essentially, vertically versus horizontally) |
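For instance, a workflow whose tasks should run horizontally rather than one after another could be declared as below (a sketch that reuses the manager instance from the getting-started example; the task names are placeholders):
const parallelWorkflow = workflowManager.createWorkflow('parallel-example', { parallel: true });
// Both tasks execute in parallel and their results merge into the same work context.
parallelWorkflow.task('fetchAccounts', (context, workContext) => ({ accounts: [] }));
parallelWorkflow.task('fetchOrders', (context, workContext) => ({ orders: [] }));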
task(name|action, action|options, options?) -> void
Adds a task to the workflow. As a shortcut, action can be a named function and used in lieu of entering the string name.
workflow.task('test', (context, workContext) => {});
workflow.task(function test(context, workContext) {});
field | description |
---|---|
name | string -- the name of the task, must be unique |
action | function -- (context, workContext, { instanceId, index }) -> Promise<Result> -- the action that will be executed when the task is executed by the WorkflowInstance . The instanceId is the id for the workflow and index is the item index from a sourceIterator (if there is one). The result will be merged with the workContext. See Result |
options | optional, object -- configures the task execution |
field | description |
---|---|
next | optional, string -- the name of the next task to execute. If not set, next will automatically be set by whatever the subsequently added task is. If null, next will not be automatically set. If the value is a task command that command will be operated on. See Commands |
pre | optional, function -- (context, workContext) -> Promise<Result> -- a function that can mutate the context to be given to the action; this is mostly useful for blocks |
post | optional, function -- (context, workContext, result) -> Promise<Result> -- a function that can process the result relative to the work context and can further modify the result before it is merged into the work context; this is mostly useful for blocks |
case | optional, function -- (context, workContext) -> Promise<string> -- a function that can return the name of a task that should be executed next based upon the current work context following the execution of the task. If the value is a task command that command will be operated on. See Commands |
catch | optional, function -- (context, workContext, error) -> Promise<string> -- a function that can return the name of a task that should be executed next given the error that was thrown by the task. If the value is a task command that command will be operated on. See Commands |
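The pre, post, case, and catch options each have an example below. next does not, so here is a minimal sketch of explicit routing (task names are placeholders):
// 'validate' jumps straight to 'finalize', skipping 'enrich' unless something else routes to it.
workflow.task('validate', (context, workContext) => ({ valid: true }), { next: 'finalize' });
workflow.task('enrich', (context, workContext) => ({ enriched: true }));
workflow.task('finalize', (context, workContext) => ({ done: true }), { next: null });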
workflow.task('test',
  (context, preWorkContext) => {}, {
    pre: (context, workContext) => {
      const preWorkContext = workContext;
      return preWorkContext;
    }
  });
workflow.task('test',
  (context, workContext) => {
    const result = workContext;
    return result;
  }, {
    post: (context, workContext, result) => {
      const postResult = result;
      return postResult;
    }
  });
workflow.task('test',
  (context, workContext) => {},
  {
    case: (context, workContext) => {
      switch (workContext.value) {
        case 'test': return 'test';
        default: return null;
      }
    }
  }
);
workflow.task('test',
  (context, workContext) => {},
  {
    catch: (context, workContext, error) => {
      switch (error.status) {
        case 403: return 'authorize';
        default: return null;
      }
    }
  }
);
block(name, buildSubWorkflow, options?) -> void
Adds a block task to the workflow. A block task is a wrapper around another workflow that will be executed with a separate and distinct work context. These can be added until the server runs out of resources.
workflow.block('test', w => {
  w.task('sub-task', (context, workContext) => {});
  w.block('sub-block', subW => {
    subW.task('sub-block-task', (context, workContext) => {});
  });
});
field | description |
---|---|
name | string -- the name of the task, must be unique |
buildSubWorkflow | function -- (Workflow) -> void -- will give a Workflow with an identical signature to the current Workflow but will not be directly callable by the WorkflowInstanceManager |
options | optional, object -- configures the block task execution |
field | description |
---|---|
sourceIterator | optional, function -- (context, workContext) -> asyncIterator -- the source for all the items to apply the action to. This can be an array but it would be best if it's some sort of async iterator. Even a stream can be used as an async iterator source. So, theoretically, an endpoint whose response can be parsed in a streaming fashion could be used as a source. |
parallel | boolean, default - false -- indicates whether the created workflow should run in parallel |
... | see task options for the rest |
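A hedged sketch of a block combined with sourceIterator, assuming each item yielded by the iterator gets its own sub-workflow execution with a distinct work context (the field names are made up):
workflow.block('process-each-order', w => {
  w.task('charge', (context, workContext) => ({ charged: true }));
  w.task('notify', (context, workContext) => ({ notified: true }));
}, {
  // An array satisfies the source contract; an async iterator or stream would scale better.
  sourceIterator: (context, workContext) => workContext.orders || [],
  parallel: true
});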
iterate(name, source, action, options?) -> void
Adds an iteration task to the workflow. An iteration task will execute the given action over each item returned by the source. Iteration is done over an asyncIterator, so the source can be optimized to be something that is not fully resident in memory, such as a cursor or stream of some sort.
workflow.iterate('test',
  /* source */
  (context, workContext) => axios({
    ...,
    responseType: 'stream'
  }).then(response => response.data),
  /* action */
  (context, workContext) => {}
);
field | description |
---|---|
name | string -- the name of the task, must be unique |
source | function -- (context, workContext) -> asyncIterator -- the source for all the items to apply the action to. This can be an array but it would be best if it's some sort of async iterator. Even a stream can be used as an async iterator source. So, theoretically, an endpoint whose response can be parsed in a streaming fashion could be used as a source. |
action | function -- (context, workContext) -> Promise<Result> -- the action that will be executed for each item returned by the source |
options | optional, object -- configures the iterate task execution |
field | description |
---|---|
parallel | boolean, default - false -- indicates whether each item should be executed in parallel. This should be used with caution; if there are too many tasks, it could overload the system. Subsequent versions could limit the max number of tasks executing across all the services |
... | see task options for the rest |
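Besides a stream, an async generator also satisfies the asyncIterator contract, which keeps paged data out of memory. A sketch under that assumption; fetchUserPage is a hypothetical paging helper:
workflow.iterate('import-users',
  /* source: pages through results instead of loading them all at once */
  async function* (context, workContext) {
    let page = 0;
    let batch;
    do {
      batch = await fetchUserPage(page++);   // hypothetical helper returning an array
      yield* batch;
    } while (batch.length > 0);
  },
  /* action: runs once per yielded item */
  (context, workContext) => ({ imported: true }),
  { parallel: false }   // enable with caution, per the note above
);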
The result object is what augments the work context. Fields in the result object will be applied not only to the root object but also to nested fields. To assist, there are pre-merge operations that can be applied to the fields to further augment the update.
If you have data that you want to either overwrite or delete on the work context, you can do so by using the $overwrite and $delete pre-merge operations. These operations will be performed on the workContext before the workItemContext is merged into the workContext and persisted.
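As an illustration of the default merge, assuming result fields are merged recursively into the existing work context rather than replacing whole objects:
// workContext before: { customer: { name: 'Ada', tier: 'basic' }, total: 10 }
// task result:        { customer: { tier: 'gold' }, total: 12 }
// workContext after:  { customer: { name: 'Ada', tier: 'gold' }, total: 12 }
workflow.task('upgradeTier', (context, workContext) => ({
  customer: { tier: 'gold' },
  total: workContext.total + 2
}));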
### $delete operation
workflow.task(someTask, {
  post: (context, workContext, result) => ({
    ...result,
    unwantedProperty: { $delete: 1 }
  })
});
OR
workflow.task('someTask', {
  action: (context, workContext) => {
    return {
      unwantedProperty: { $delete: 1 }
    };
  }
});
### $overwrite operation
workflow.task(someTask, {
  post: (context, workContext, result) => ({
    ...result,
    changingProperty: {
      $overwrite: {
        newData: 'x'
      }
    }
  })
});
OR
workflow.task('someTask', {
  action: (context, workContext) => {
    return {
      changingProperty: {
        $overwrite: {
          newData: ['x']
        }
      }
    };
  }
});
Task commands influence what the workflow will do. Inherently, all next values are commands to go to the next task. Accordingly, commands will adjust what happens next following the execution of a task.
workflow.task('someTask', {
  action: (context, workContext) => ({ test: true }),
  case: (context, workContext) => {
    if (workContext.test) return '$pause';
  }
});