Build highly reliable workflows across microservices
(current version: 0.0.2, beta usage only)
Why?
This tool makes it simple to execute a complex sequence of tasks, where each task can run in its own process, thread, or microservice. It's simpler to configure than Amazon SWF, and less complex than a home-grown solution that passes messages between microservices over a message queue (Redis, NSQ.io, Amazon SQS) or via IPC.
How it works:
- Build and visualize Workflows - the glue that links together a sequence of separate processes, or Workers.
- Workflows can include branches and parallelized executions.
- Workflows consist of a series of Tasks. Each Worker completes one Task. Each initiation of a Workflow is a Job. Branching and parallelization create additional JobThreads, which reference the original Job.
- A message queue of your choice is used to communicate between processes.
- A database of your choice is used to track completion details of all Jobs, JobThreads, and Tasks.
- Workers only need access to the message queue endpoints. Run Workers anywhere these endpoints are reachable (locally, in a cloud, etc.). Run multiple Workers for each Task for redundancy.
- Run WorkflowManager instances on infrastructure with access to both the message queue and the database; these processes ensure Jobs move through your Workflow appropriately (sketched below).
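Put together, the moving parts look roughly like this:

    [Worker] [Worker] [Worker]   <- run anywhere with queue access
         \       |       /
          message queue(s)
                |
        [WorkflowManager]        <- moves Jobs through the Workflow
                |
            [database]           <- records Jobs, JobThreads, and Tasks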
Usage
npm install altflo
import altflo from 'altflo' // ES modules
let altflo = require('altflo') // CommonJS
Creating a workflow (and visualizing it)
let altflo = require('altflo');
let WorkflowStep = altflo.WorkflowStep;
let WorkflowGraph = altflo.WorkflowGraph;
let GraphUtils = altflo.GraphUtils;
let step1 = new WorkflowStep('STEP_1', 'http://localhost/sample_queue_uri_1');
let step2 = new WorkflowStep('STEP_2', 'http://localhost/sample_queue_uri_2');
let step3 = new WorkflowStep('STEP_3', 'http://localhost/sample_queue_uri_3');
let step4 = new WorkflowStep('STEP_4', 'http://localhost/sample_queue_uri_4');
let step5 = new WorkflowStep('STEP_5', 'http://localhost/sample_queue_uri_5');
let step6 = new WorkflowStep('STEP_6', 'http://localhost/sample_queue_uri_6');
let step7 = new WorkflowStep('STEP_7', 'http://localhost/sample_queue_uri_7');
let step8 = new WorkflowStep('STEP_8', 'http://localhost/sample_queue_uri_8');
// Wire the steps together, leaves first (step3 fans out to three parallel branches)
step7.addNextStep(step8);
step3.addNextSteps([step5, step6, step7]);
step2.addNextSteps([step3, step4]);
step1.addNextStep(step2);
let graph = new WorkflowGraph('SAMPLE_GRAPH_1', 'V1');
graph.loadWorkflowFromStepGraph(step1);
console.log('graph structure: ' + GraphUtils.visualizeGraph(graph));
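For this example, the step graph has the following shape (indentation marks child steps; STEP_5, STEP_6, and STEP_7 run in parallel):

STEP_1
  STEP_2
    STEP_3
      STEP_5
      STEP_6
      STEP_7
        STEP_8
    STEP_4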
Running your workflow
This example leverages Amazon SQS and PostgreSQL:
let altflo = require('altflo');
let WorkflowManager = altflo.WorkflowManager;
let AmazonSQSBackend = altflo.AmazonSQSBackend;
let PostgreSQLBackend = altflo.PostgreSQLBackend;
let WORKFLOW_QUEUE_URI = 'http://localhost/workflow_manager_queue';
let manager = new WorkflowManager(graph, WORKFLOW_QUEUE_URI, new AmazonSQSBackend(), new PostgreSQLBackend());
manager.initialize();
Individual processes (Workers)
Run these anywhere. Process data, crunch numbers, wait for events, etc.
let altflo = require('altflo');
let Worker = altflo.Worker;
let AmazonSQSBackend = altflo.AmazonSQSBackend;
let workerQueue = 'http://localhost/sample_queue_uri_1';
let managerQueue = 'http://localhost/workflow_manager_queue';
function performWork(messageBody, messageMetaData) {
  console.log('DOING SOME TASK, PART OF THE WORKFLOW');
  // Signal completion so the WorkflowManager can advance the Job to the next step
  worker.resumeWorkflow(messageBody, messageMetaData);
}
let worker = new Worker(workerQueue, managerQueue, new AmazonSQSBackend(), performWork);
worker.initialize();
A full, functional example
See /examples/example1 for a simple configuration that uses Amazon SQS and PostgreSQL.
The following steps walk through setting up and running this workflow:
Configure SQS
Create the following .fifo queues in console.aws.amazon.com/sqs
w1-step1.fifo
w1-step2.fifo
w1-step3.fifo
w1-step4.fifo
workflow1.fifo
All queues should have Content-Based Deduplication turned on.
NOTE: this means messages with identical bodies sent within SQS's five-minute deduplication window are silently dropped, so each message you send must have different contents.
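If you prefer the command line, each queue can be created with the AWS CLI instead, for example:
aws sqs create-queue --queue-name w1-step1.fifo --attributes FifoQueue=true,ContentBasedDeduplication=true
(Repeat for each queue name above.)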
Configure PostgreSQL
You'll need the following tables:
Jobs: jobId (serial, primary key), startTime (timestamp), endTime (timestamp)
JobThreads: jobId (integer, foreign key), jobThreadId (serial, primary key), startTime (timestamp), endTime (timestamp), startStepName (varchar)
Tasks: jobId (integer, foreign key), jobThreadId (integer, foreign key), taskId (serial, primary key), startTime (timestamp), endTime (timestamp), stepName (varchar)
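Here is a minimal DDL sketch matching the schema above (the quoted camelCase identifiers match the queries used later in this README):

CREATE TABLE "Jobs" (
  "jobId"     SERIAL PRIMARY KEY,
  "startTime" TIMESTAMP,
  "endTime"   TIMESTAMP
);

CREATE TABLE "JobThreads" (
  "jobId"         INTEGER REFERENCES "Jobs" ("jobId"),
  "jobThreadId"   SERIAL PRIMARY KEY,
  "startTime"     TIMESTAMP,
  "endTime"       TIMESTAMP,
  "startStepName" VARCHAR
);

CREATE TABLE "Tasks" (
  "jobId"       INTEGER REFERENCES "Jobs" ("jobId"),
  "jobThreadId" INTEGER REFERENCES "JobThreads" ("jobThreadId"),
  "taskId"      SERIAL PRIMARY KEY,
  "startTime"   TIMESTAMP,
  "endTime"     TIMESTAMP,
  "stepName"    VARCHAR
);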
Set Node environment variables
Any machine that runs a WorkflowManager needs access to the database backend.
If PostgreSQL is running locally, your configuration for the demo may look like this:
export PGUSER='postgres'
export PGHOST='localhost'
export PGDATABASE='altflo-dev'
export PGPORT='5432'
export PGPASSWORD='yourpassword'
For this specific example, you'll also need to provide your Amazon account ID (see queueHandles.js)
export AWS_ACCOUNT_NUMBER='abc123'
Run processes with a process manager
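The commands below use pm2; if it isn't installed yet, install it globally first:
npm install -g pm2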
cd examples/example1
pm2 start workflowManagerExample.js
pm2 start worker1Example.js
pm2 start worker2Example.js
pm2 start worker3Example.js
pm2 start worker4Example.js
pm2 list
To run a Job through your workflow:
Either programmatically or through the SQS console, send a message to workflow1.fifo with the following JSON body:
{
  "workflowStep": "INIT_WORKFLOW",
  "messageBody": {
    "sampleVar": "add any data you want in the messageBody"
  }
}
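To send it programmatically instead, here is a minimal sketch using the AWS SDK for JavaScript (aws-sdk v2); the region is an assumption, and the queue URL is built from the AWS_ACCOUNT_NUMBER exported earlier:

let AWS = require('aws-sdk');
let sqs = new AWS.SQS({ region: 'us-east-1' }); // use the region your queues live in

sqs.sendMessage({
  QueueUrl: 'https://sqs.us-east-1.amazonaws.com/' + process.env.AWS_ACCOUNT_NUMBER + '/workflow1.fifo',
  MessageGroupId: 'workflow1', // FIFO queues require a message group
  MessageBody: JSON.stringify({
    workflowStep: 'INIT_WORKFLOW',
    messageBody: { sampleVar: 'add any data you want in the messageBody' }
  })
}, function (err, data) {
  if (err) throw err;
  console.log('job started, MessageId: ' + data.MessageId);
});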
To check Job completion:
Query the Jobs, JobThreads, and Tasks tables in your provided database backend:
SELECT * FROM "Jobs";
SELECT * FROM "JobThreads";
SELECT * FROM "Tasks";
Planned Features:
- Unit tests
- More robust workers
- Recommended best-practices for retry & fail-over
- Conditional branching
- More comprehensive transaction logging
- Debug mode
- A browser admin component (D3 + React?) for visualizing workflows & job history.
Debatable Features:
- Merging of Workflow branches or parallel threads.
- Option to automatically create & manage message queue endpoints (on first run of a new workflow). Quicker setup for new users.
Contributing
Before implementing any code, please open an Issue and discuss. Fixes for bugs with open Issues are welcome, and implementations of new features will be merged if coordinated through an Issue.
Implementing additional backends is probably the easiest way to contribute (see the sketch after this list)! e.g.
- Redis (QueueBackend)
- NSQ.io (QueueBackend)
- MongoDB (DatabaseBackend)
- MySQL (DatabaseBackend)
- Google Cloud Pub/Sub (QueueBackend)
- Azure Service Bus Queues (QueueBackend)
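The backend interfaces aren't spelled out in this README, so treat the bundled AmazonSQSBackend and PostgreSQLBackend sources as the authoritative reference. Purely as a hypothetical illustration (every method name below is an assumption, not the real API), a new queue backend boils down to sending and receiving against your broker:

// HYPOTHETICAL sketch only: method names are assumptions,
// mirror the real AmazonSQSBackend source when implementing.
class RedisQueueBackend {
  sendMessage(queueUri, messageBody) {
    // publish messageBody to the queue identified by queueUri
  }
  listenForMessages(queueUri, onMessage) {
    // poll or subscribe, invoking onMessage(messageBody, messageMetaData) per message
  }
}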
Questions?
Create an Issue for any usage questions!