@emartech/program-executor - npm Package Compare versions

Comparing version 1.4.1 to 2.0.0
+5
-1
index.js
'use strict';
const ProgramExecutor = require('./src');
const RetryableError = require('./src/retryable-error');
const IgnorableError = require('./src/ignorable-error');
module.exports = ProgramExecutor;
module.exports.ProgramExecutor = ProgramExecutor;
module.exports.RetryableError = RetryableError;
module.exports.IgnorableError = IgnorableError;
+1
-1

@@ -45,3 +45,3 @@ {

},
- "version": "1.4.1"
+ "version": "2.0.0"
}
+229
-1
# Program Executor JS
Program executor is a scalable, resilient job framework that is capable of concurrently executing programs, while jobs inside a program are executed in order. Execution can be distributed between resources (multiple workers/threads). It is fault tolerant by storing its progress, so it can continue execution in case of failures.
# Prerequisites
* NodeJS 7.10.1+
* RabbitMQ
* Database (Postgres, MSSQL, MySQL, MariaDB, SQLite3, Oracle, and Amazon Redshift) and a connected [knex](https://knexjs.org/#Installation-client) instance
# Install
`npm i -E @emartech/program-executor`
# Usage
## Configuration
| Config | Required | Description |
| --- |:----:|----|
| `knex` | true | Connected `knex` instance ([docs](https://knexjs.org/#Installation-client)).
| `amqpUrl` | true | Connection string for RabbitMQ instance.
| `tableName` | true | The name of the table to store bookkeeping data. The table is created automatically if it does not exist.
| `queueName` | true | RabbitMQ queue used by the executor. The queue is created automatically if it does not exist.
```javascript
const { ProgramExecutor } = require('@emartech/program-executor');

const config = {
  knex: require('knex')({
    client: 'mysql',
    connection: {
      host: '127.0.0.1',
      user: 'your_database_user',
      password: 'your_database_password',
      database: 'myapp_test'
    }
  }),
  amqpUrl: 'amqp://guest:guest@localhost:9999',
  tableName: 'programs',
  queueName: 'program-executor'
};

const programExecutor = ProgramExecutor.create(config);
```
## How to create a program
Creating a program adds a row to the database table and publishes a message to the program queue. `createProgram` returns a unique `runId` that can be used to query the table and track progress.
| Config | Required | Type | Description |
| --- |:----:|----|----|
| `jobs` | true | Array\<String\> | Ordered list of job names to be executed.
| `programData` | false | Object | Optional data available for all jobs.
| `jobsData` | false | Object | Optional job specific data keyed by job name.
```javascript
const runId = await this._programExecutor.createProgram({
  jobs: ['first_job', 'second_job'],
  programData: {
    global: 'data for every job'
  },
  // Key name follows the config table above and the JSDoc (`data.jobsData`)
  jobsData: {
    first_job: {
      property: 'value for first job only'
    },
    second_job: {
      property: 'value for second job only'
    }
  }
});
```
## How to execute programs
In a long-running process (e.g. [throng](https://github.com/hunterloftis/throng)), call `processPrograms` with a collection of executable jobs (see [Job Library](#job-library)).
```javascript
const throng = require('throng');
const jobLibrary = require('../../../lib/jobs');

throng(id => {
  ProgramExecutor.create(config).processPrograms(jobLibrary);
});
```
## Built-in logging
This library uses `@emartech/json-logger`, so to see the logs you have to enable the program-executor namespace (`DEBUG=...,program-executor*,...`) in your environment.
If a program execution fails, search for the following pattern in your logs:
```javascript
{
  "name": "program-executor-<queueName>-consumer",
  "action": "Consumer error finish",
  "event": "Consumer error finish",
  "error_message": "...",
  "error_stack": "...",
  "content": "<whole RabbitMQ message>"
}
```
You may also build metrics on `"Consumer error retry"` events.
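As a rough illustration of such a metric, the sketch below counts consumer error events in captured log output. It assumes the logs are available as newline-delimited JSON and that retry events carry the same `event` field as the pattern shown above; `countConsumerErrors` is a hypothetical helper, not part of this library.

```javascript
// Hypothetical helper: counts consumer error events in newline-delimited JSON logs.
// Assumes one JSON object per line, each with an `event` field.
function countConsumerErrors(logText) {
  const counts = { 'Consumer error retry': 0, 'Consumer error finish': 0 };
  for (const line of logText.split('\n')) {
    let entry;
    try {
      entry = JSON.parse(line);
    } catch (error) {
      continue; // skip lines that are not valid JSON
    }
    if (entry && Object.prototype.hasOwnProperty.call(counts, entry.event)) {
      counts[entry.event] += 1;
    }
  }
  return counts;
}

// Usage with made-up log lines
const sampleLogs = [
  '{"event":"Consumer error retry","error_message":"timeout"}',
  'some non-JSON output',
  '{"event":"Consumer error finish","error_message":"fatal"}'
].join('\n');
console.log(countConsumerErrors(sampleLogs));
```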
# Program
A program is a list of jobs executed in serial order. Because execution is serial, a job may depend on the results of jobs earlier in the order. **If any job fails to execute successfully, the program will be cancelled, the error will be logged and the remaining jobs will be skipped.**
Programs can be executed concurrently, and therefore cannot depend on each other. However, a job in a program may start another program.
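The serial, fail-fast behaviour described above can be illustrated with a simplified in-memory sketch. This is not the library's implementation: `runProgram`, the returned state object, and the toy jobs are hypothetical, and the real executor persists progress in the database and moves work through RabbitMQ.

```javascript
// Simplified sketch of serial execution; NOT the library's actual code.
// `runProgram` and the shape of the returned state are assumptions.
async function runProgram(jobNames, jobLibrary, programData) {
  const state = { step: 0, finished: false, errorMessage: null };
  for (let step = 0; step < jobNames.length; step++) {
    state.step = step;
    try {
      const job = jobLibrary[jobNames[step]].create(programData);
      await job.execute();
    } catch (error) {
      // The first failure cancels the program; the remaining jobs never run.
      state.errorMessage = error.message;
      return state;
    }
  }
  state.finished = true;
  return state;
}

// Toy job factory used only for this demonstration.
const toyJob = (name, shouldFail) => ({
  create: () => ({
    execute: async () => {
      if (shouldFail) throw new Error(`${name} failed`);
    }
  })
});

const toyLibrary = {
  first_job: toyJob('first_job', false),
  second_job: toyJob('second_job', true),
  third_job: toyJob('third_job', false)
};

runProgram(['first_job', 'second_job', 'third_job'], toyLibrary, {})
  .then(state => console.log(state));
```

In this sketch `third_job` is never created or executed, and `state.step` stays at the index of the failing job, mirroring the `step` column described under Bookkeeping.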
## Bookkeeping
The following information will be managed for every program in the configured database table:
| Column | Type | Description |
| --- |:----:|---|
| id | Integer | Increment id.
| run_id | String | Generated UUID of the program.
| program_data | Object | Data available for all jobs.
| jobs | Array\<String\> | Ordered list of job names to be executed.
| job_data | Object | Job specific data keyed by job name.
| step | Integer | Index of the currently executed job. On program error it indicates the job index where the error happened.
| step_retry_count | Integer | Indicates the retry count of the current job.
| finished_at | Date | Filled in **only** if the program completed successfully.
| errored_at | Date | Filled in **only** if the program failed permanently.
| error_message | String | Contains the message of the last caught error. May be a retryable error message, but in this case `errored_at` will not be filled.
| created_at | Date | Timestamp of program creation.
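To show how these columns can be read together, here is a minimal sketch that classifies a bookkeeping row. `describeProgram` and its status labels are hypothetical, and the sketch assumes the row has already been fetched and that `jobs` has been parsed into an array.

```javascript
// Hypothetical helper: classifies a bookkeeping row using the columns above.
// Assumes `row.jobs` is already an array of job names.
function describeProgram(row) {
  const currentJob = row.jobs[row.step];
  if (row.finished_at) {
    return { status: 'finished', currentJob: null };
  }
  if (row.errored_at) {
    // `step` points at the job where the error happened
    return { status: 'failed', currentJob };
  }
  if (row.error_message) {
    // error_message without errored_at: a retryable error was caught
    return { status: 'retrying', currentJob };
  }
  return { status: 'in progress', currentJob };
}

// Usage with a made-up row
console.log(describeProgram({
  jobs: ['first_job', 'second_job'],
  step: 1,
  step_retry_count: 2,
  finished_at: null,
  errored_at: null,
  error_message: 'connection timed out'
}));
```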
# Job
A job is an individually executable part of a program. A program refers to it by name, so it has to expose a **globally unique** name, a static `create` method that instantiates the job, and an `execute` method.
* `create()` will be called with the `programData` object that may contain globally available data of the program.
* `execute()` will be called with the queue message and the corresponding `jobDataHandler`. `jobDataHandler` is used to manage data specific for the job.
## Anatomy
```javascript
class SampleJob {
  static get name() {
    return 'sample_job_name';
  }

  static create(programData) {
    return new SampleJob(programData);
  }

  constructor(programData) {
    /// ... initialize member variables based on programData if needed
  }

  async execute(message, jobDataHandler) {
    const jobSpecificData = await jobDataHandler.get();
    /// ... do some processing
    await jobDataHandler.set({ stored: 'progress' });
  }
}

module.exports = SampleJob;
```
## Error handling
Unhandled job errors bubble up to the executor, which terminates further execution of the program. [See bookkeeping](#bookkeeping).
The executor can handle retryable and ignorable errors.
### Retryable errors
On transient errors a job can throw a retryable error, so that the executor restarts program execution from that job. Using the `jobDataHandler`, the job may implement logic to resume from stored progress (e.g. `{ currentPage: 50, totalPages: 200 }`).
You may use the `RetryableError` exposed by this library
`const { RetryableError } = require('@emartech/program-executor');`
or throw any other error with a property `retryable: true`.
Example RetryableError implementation:
```javascript
class RetryableError extends Error {
  constructor(message, code) {
    super();
    this.message = message;
    this.code = code;
    RetryableError.decorate(this);
  }

  static decorate(error) {
    error.retryable = true;
    return error;
  }
}

module.exports = RetryableError;
```
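The resume-from-stored-progress idea can be sketched as follows. Everything here is illustrative: `PagedExportJob`, `fetchPage`, the in-memory handler, and the retry driver are hypothetical stand-ins, and the local `RetryableError` only mimics the exported one so the example runs without the library. The sketch also assumes `jobDataHandler.get()` may return nothing before any progress is stored.

```javascript
// Local stand-in for the library's RetryableError, to keep the sketch self-contained.
class RetryableError extends Error {
  constructor(message) {
    super(message);
    this.retryable = true;
  }
}

// Hypothetical data source that fails once on page 2 to simulate a transient error.
let failOnce = true;
const fetchedPages = [];
async function fetchPage(page) {
  if (page === 2 && failOnce) {
    failOnce = false;
    throw new Error('timeout');
  }
  fetchedPages.push(page);
}

class PagedExportJob {
  static get name() { return 'paged_export'; }
  static create(programData) { return new PagedExportJob(programData); }

  async execute(message, jobDataHandler) {
    const saved = (await jobDataHandler.get()) || {};
    const totalPages = saved.totalPages || 3;
    for (let page = saved.currentPage || 1; page <= totalPages; page++) {
      try {
        await fetchPage(page);
      } catch (error) {
        // Signal a transient failure so the job is executed again later.
        throw new RetryableError(`page ${page} failed: ${error.message}`);
      }
      // Persist progress after every page so a retry resumes from here.
      await jobDataHandler.set({ currentPage: page + 1, totalPages });
    }
  }
}

// In-memory stand-in for jobDataHandler (the real one is provided by the executor).
function createInMemoryHandler() {
  let data = {};
  return { get: async () => data, set: async next => { data = next; } };
}

// Imitates one retry cycle: run, catch a retryable error, run again.
async function runWithOneRetry() {
  const handler = createInMemoryHandler();
  const job = PagedExportJob.create({});
  try {
    await job.execute({}, handler);
  } catch (error) {
    if (!error.retryable) throw error;
    await job.execute({}, handler);
  }
  return fetchedPages;
}
```

Because progress is stored after each page, the second run starts at page 2 instead of repeating page 1.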
# Job library
Job Library is a JavaScript object containing jobs keyed by job name. Program executor instantiates and executes jobs from the library while working on a program.
## Anatomy
```javascript
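class FirstJob {
  static get name() { return 'first_job'; }
  // create() is static, as described in the Job section
  static create(programData) { return new FirstJob(programData); }
  async execute(message, jobDataHandler) { /* ... */ }
}

class SecondJob {
  static get name() { return 'second_job'; }
  static create(programData) { return new SecondJob(programData); }
  async execute(message, jobDataHandler) { /* ... */ }
}

const jobLibrary = {
  [FirstJob.name]: FirstJob,
  // ... or manually
  'second_job': SecondJob
};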
```
## Helper script to re-export every job from given sub-directories
```javascript
const glob = require('glob');
const path = require('path');

glob.sync('./server/lib/jobs/+(common|other|folders)/*/index.js').forEach(function(file) {
  const job = require(path.resolve(file));
  const jobName = job.name;

  // Duplicates are only logged; the job loaded last overwrites the earlier one
  if (module.exports[jobName]) {
    console.log(`${jobName} job already exists, please choose a unique name!\n\n`);
  }

  module.exports[jobName] = job;
});
```
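A stricter variant of the same idea is sketched below: it builds the library from an array of job classes and throws on duplicate names or on jobs missing the expected methods, instead of only logging. `buildJobLibrary` is a hypothetical helper, not something this package exports.

```javascript
// Hypothetical helper: builds a job library and fails fast on invalid or duplicate jobs.
function buildJobLibrary(jobClasses) {
  const library = {};
  for (const job of jobClasses) {
    const hasCreate = typeof job.create === 'function';
    const hasExecute = Boolean(job.prototype) && typeof job.prototype.execute === 'function';
    if (!hasCreate || !hasExecute) {
      throw new Error(`${job.name} must expose a static create() and an execute() method`);
    }
    if (library[job.name]) {
      throw new Error(`Duplicate job name: ${job.name}`);
    }
    library[job.name] = job;
  }
  return library;
}

// Usage with two toy jobs
class ImportJob {
  static get name() { return 'import_job'; }
  static create(programData) { return new ImportJob(programData); }
  async execute() {}
}

class ExportJob {
  static get name() { return 'export_job'; }
  static create(programData) { return new ExportJob(programData); }
  async execute() {}
}

const exampleLibrary = buildJobLibrary([ImportJob, ExportJob]);
console.log(Object.keys(exampleLibrary));
```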
# Manually stopping a program
To stop a stuck program and remove it from RabbitMQ, you may set the program's `errored_at` column manually in the database; the program will be discarded in the next execution cycle.
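If you prefer to do this from code rather than a database console, a minimal sketch using the configured `knex` instance might look like the following. `stopProgram` is a hypothetical helper; it assumes the `run_id` and `errored_at` columns from the Bookkeeping table and relies only on knex's standard `where` and `update` query builder methods.

```javascript
// Hypothetical helper: marks a program as errored so it is discarded on the next cycle.
// `knex` is the connected instance from the executor config.
function stopProgram(knex, tableName, runId) {
  return knex(tableName)
    .where({ run_id: runId })
    .update({ errored_at: new Date() });
}

// Usage (inside an async function):
//   await stopProgram(config.knex, config.tableName, runId);
```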
# Development

@@ -10,0 +238,0 @@ ## Default Commit Message Format

@@ -12,6 +12,6 @@ 'use strict';

* @param {object} config
- * @param {object} object.knex - Connected Knex instance
- * @param {string} object.amqpUrl - RabbitMq Url
- * @param {string} object.tableName - Table name for bookkeeping
- * @param {string} object.queueName - Queue name to publish to
+ * @param {object} config.knex - Connected Knex instance
+ * @param {string} config.amqpUrl - RabbitMq Url
+ * @param {string} config.tableName - Table name for bookkeeping
+ * @param {string} config.queueName - Queue name to publish to
*/

@@ -27,5 +27,5 @@ constructor(config) {

* @param {object} data
- * @param {object} object.programData
- * @param {array} object.jobs
- * @param {object} object.jobsData
+ * @param {object} data.programData
+ * @param {array} data.jobs
+ * @param {object} data.jobsData
*/

@@ -59,2 +59,9 @@ createProgram(data) {

/**
* @param {object} config
* @param {object} config.knex - Connected Knex instance
* @param {string} config.amqpUrl - RabbitMq Url
* @param {string} config.tableName - Table name for bookkeeping
* @param {string} config.queueName - Queue name to publish to
*/
static create(config) {

@@ -61,0 +68,0 @@ return new ProgramExecutor(config);