@node-ts/bus-workflow
Advanced tools
Comparing version 0.1.23 to 0.2.0
@@ -5,3 +5,2 @@ export * from './workflow-data'; | ||
export * from './decorators'; | ||
export * from './complete-workflow'; | ||
export * from './message-workflow-mapping'; |
@@ -5,6 +5,6 @@ "use strict"; | ||
tslib_1.__exportStar(require("./workflow-data"), exports); | ||
tslib_1.__exportStar(require("./workflow"), exports); | ||
tslib_1.__exportStar(require("./registry/workflow-registry"), exports); | ||
tslib_1.__exportStar(require("./decorators"), exports); | ||
tslib_1.__exportStar(require("./complete-workflow"), exports); | ||
tslib_1.__exportStar(require("./message-workflow-mapping"), exports); | ||
//# sourceMappingURL=index.js.map |
@@ -27,2 +27,5 @@ "use strict"; | ||
const workflowData = this.workflowData[workflowDataName]; | ||
if (!workflowData) { | ||
this.logger.error('Workflow data not initialized', { workflowDataName }); | ||
} | ||
return workflowData | ||
@@ -29,0 +32,0 @@ .filter(data => (includeCompleted || data.$status === workflow_data_1.WorkflowStatus.Running) |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const workflow_data_1 = require("../workflow-data"); | ||
const handler_with_id_1 = require("./handler-with-id"); | ||
@@ -26,6 +27,9 @@ class WorkflowHandlerProxy { | ||
const handlerPromises = workflowDataItems.map(async (workflowData) => { | ||
const immutableWorkflow = Object.freeze(Object.assign({}, workflowData)); | ||
const workflowDataOutput = await this.handler(message, immutableWorkflow); | ||
if (workflowDataOutput) { | ||
this.logger.debug('Changes detected in workflow data and will be persisted.'); | ||
const immutableWorkflowData = Object.freeze(Object.assign({}, workflowData)); | ||
const workflowDataOutput = await this.handler(message, immutableWorkflowData); | ||
if (workflowDataOutput && workflowDataOutput.$status === workflow_data_1.WorkflowStatus.Discard) { | ||
this.logger.debug('Workflow step is discarding state changes. State changes will not be persisted', { workflowId: immutableWorkflowData.$workflowId, workflowName: this.workflowDataConstructor.name }); | ||
} | ||
else if (workflowDataOutput) { | ||
this.logger.debug('Changes detected in workflow data and will be persisted.', { workflowId: immutableWorkflowData.$workflowId, workflowName: this.workflowDataConstructor.name }); | ||
const updatedWorkflowData = Object.assign(new this.workflowDataConstructor(), workflowData, workflowDataOutput); | ||
@@ -41,3 +45,3 @@ try { | ||
else { | ||
this.logger.trace('No changes detected in workflow data.'); | ||
this.logger.trace('No changes detected in workflow data.', { workflowId: immutableWorkflowData.$workflowId }); | ||
} | ||
@@ -50,2 +54,3 @@ }); | ||
await this.persistence.saveWorkflowData(data); | ||
this.logger.info('Saving workflow data', { data }); | ||
} | ||
@@ -52,0 +57,0 @@ catch (err) { |
@@ -86,3 +86,3 @@ "use strict"; | ||
}; | ||
this.handlerRegistry.register(messageName, Symbol.for(`node-ts/bus/workflow/${messageName}-started-by-proxy`), handlerFactory, step.messageConstructor); | ||
this.handlerRegistry.register(messageName, Symbol.for(`node-ts/bus/workflow/${registration.workflowConstructor.name}-${messageName}-started-by-proxy`), handlerFactory, step.messageConstructor); | ||
}); | ||
@@ -97,3 +97,3 @@ } | ||
}; | ||
this.handlerRegistry.register(messageName, Symbol.for(`node-ts/bus/workflow/${messageName}-handles-proxy`), handler, step.messageConstructor); | ||
this.handlerRegistry.register(messageName, Symbol.for(`node-ts/bus/workflow/${registration.workflowConstructor.name}-${messageName}-handles-proxy`), handler, step.messageConstructor); | ||
}); | ||
@@ -100,0 +100,0 @@ } |
@@ -9,3 +9,7 @@ export declare enum WorkflowStatus { | ||
*/ | ||
Complete = "complete" | ||
Complete = "complete", | ||
/** | ||
* A pseudo status to indicate that changes to the current state should not be persisted | ||
*/ | ||
Discard = "discard" | ||
} | ||
@@ -12,0 +16,0 @@ /** |
@@ -15,2 +15,6 @@ "use strict"; | ||
WorkflowStatus["Complete"] = "complete"; | ||
/** | ||
* A pseudo status to indicate that changes to the current state should not be persisted | ||
*/ | ||
WorkflowStatus["Discard"] = "discard"; | ||
})(WorkflowStatus = exports.WorkflowStatus || (exports.WorkflowStatus = {})); | ||
@@ -17,0 +21,0 @@ /** |
import { ClassConstructor } from '@node-ts/bus-core'; | ||
import { WorkflowData } from './workflow-data'; | ||
export declare type WorkflowConstructor<TWorkflowData extends WorkflowData, TWorkflow extends Workflow<TWorkflowData> = Workflow<TWorkflowData>> = ClassConstructor<TWorkflow>; | ||
export interface Workflow<TWorkflowData extends WorkflowData> { | ||
export declare const WORKFLOW_STEP_DISCARDED = "discarded"; | ||
export declare abstract class Workflow<TWorkflowData extends WorkflowData> { | ||
/** | ||
* Flags that the workflow is complete, thereby preventing it from reacting to any | ||
* subsequent messages. This should be called as part of the return value of | ||
* a handling function | ||
* @param data Any final modifications to the workflow data that will be persisted | ||
* @example | ||
* \@Handles<TaskRan, TestWorkflowData, 'handleTaskRan'>(TaskRan, event => event.value, 'property1') | ||
* async handleTaskRan (event: TaskRan): Promise<Partial<TestWorkflowData>> { | ||
* return this.complete({ taskRunDuration: event.duration }) | ||
* } | ||
*/ | ||
protected complete(data?: Partial<TWorkflowData>): Partial<TWorkflowData>; | ||
/** | ||
* Tells the workflow engine to avoid persisting state for this workflow step. If this is | ||
* returned in a StartedBy handler, then the handler state will not be created and stored. | ||
* If this is run in a Handles handler, then changes to the state will not be run. | ||
*/ | ||
protected discard(): Partial<TWorkflowData>; | ||
} |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const tslib_1 = require("tslib"); | ||
const workflow_data_1 = require("./workflow-data"); | ||
const inversify_1 = require("inversify"); | ||
exports.WORKFLOW_STEP_DISCARDED = 'discarded'; | ||
let Workflow = class Workflow { | ||
/** | ||
* Flags that the workflow is complete, thereby preventing it from reacting to any | ||
* subsequent messages. This should be called as part of the return value of | ||
* a handling function | ||
* @param data Any final modifications to the workflow data that will be persisted | ||
* @example | ||
* \@Handles<TaskRan, TestWorkflowData, 'handleTaskRan'>(TaskRan, event => event.value, 'property1') | ||
* async handleTaskRan (event: TaskRan): Promise<Partial<TestWorkflowData>> { | ||
* return this.complete({ taskRunDuration: event.duration }) | ||
* } | ||
*/ | ||
complete(data = {}) { | ||
return Object.assign({}, data, { $status: workflow_data_1.WorkflowStatus.Complete }); | ||
} | ||
/** | ||
* Tells the workflow engine to avoid persisting state for this workflow step. If this is | ||
* returned in a StartedBy handler, then the handler state will not be created and stored. | ||
* If this is run in a Handles handler, then changes to the state will not be run. | ||
*/ | ||
discard() { | ||
return { | ||
$status: workflow_data_1.WorkflowStatus.Discard | ||
}; | ||
} | ||
}; | ||
Workflow = tslib_1.__decorate([ | ||
inversify_1.injectable() | ||
], Workflow); | ||
exports.Workflow = Workflow; | ||
//# sourceMappingURL=workflow.js.map |
{ | ||
"name": "@node-ts/bus-workflow", | ||
"description": "A workflow engine for orchestrating logic flows in distributed applications.", | ||
"version": "0.1.23", | ||
"version": "0.2.0", | ||
"license": "MIT", | ||
@@ -24,3 +24,3 @@ "main": "./dist/index.js", | ||
"devDependencies": { | ||
"@node-ts/bus-core": "^0.1.18", | ||
"@node-ts/bus-core": "^0.2.0", | ||
"@node-ts/code-standards": "^0.0.10", | ||
@@ -59,3 +59,3 @@ "@node-ts/logger-core": "^0.0.17", | ||
], | ||
"gitHead": "d2136911ed371a8f60647791f42dc3f4d959cc33" | ||
"gitHead": "db665a2aa4ee1e7ee5a64dd30f38163bee47881d" | ||
} |
@@ -82,5 +82,40 @@ --- | ||
3. Contain at least 1 message handling function decorated with `StartedBy` | ||
4. Contain at least 1 message handling function that returns `completeWorkflow()` from `@node-ts/bus-workflow` | ||
4. Contain at least 1 message handling function that returns `this.complete()` from the super class `Workflow<>` | ||
5. Be registered with the `WorkflowRegistry` from `@node-ts/bus-workflow` | ||
### Completing a workflow | ||
Workflows that have completed their work should be marked as completed. This means that they will no longer react to any future events. This is done by returning `this.complete()` at the end of your message handler. | ||
```typescript | ||
@injectable() | ||
export class ProcessDocumentWorkflow extends Workflow<ProcessDocumentWorkflowData> { | ||
@Handles<DocumentSaved, ProcessDocumentWorkflowData, 'handlesDocumentSaved'>(DocumentSaved) | ||
handlesDocumentSaved (_: DocumentSaved): Partial<ProcessDocumentWorkflowData> { | ||
return this.complete() | ||
} | ||
} | ||
``` | ||
### Discarding state changes | ||
Occasionally there are times when the workflow data shouldn't persist after a message has been handled. This is particularly relevant in cases where a workflow should only handle a message under certain circumstances. | ||
For example, if your workflow is started by an `S3ObjectCreated` event, but should only create a new workflow if the object key is prefixed with `/documents`, then this can be achieved by returning `this.discard()` in the workflow like so: | ||
```typescript | ||
@injectable() | ||
export class ProcessDocumentWorkflow extends Workflow<ProcessDocumentWorkflowData> { | ||
@StartedBy<S3ObjectCreated, ProcessDocumentWorkflowData, 'handlesS3ObjectCreated'>(S3ObjectCreated) | ||
handlesS3ObjectCreated (s3ObjectCreated: S3ObjectCreated): Partial<ProcessDocumentWorkflowData> { | ||
if (s3ObjectCreated.key.indexOf('/documents') === 0) { | ||
return {} // Starts a new workflow | ||
} | ||
return this.discard() // Do not start a new workflow | ||
} | ||
} | ||
``` | ||
### Example | ||
@@ -116,3 +151,3 @@ | ||
} from 'contracts' | ||
import { Workflow, completeWorkflow } from '@node-ts/bus-workflow' | ||
import { Workflow } from '@node-ts/bus-workflow' | ||
import { injectable } from 'inversify' | ||
@@ -122,3 +157,3 @@ import { UserSignupWorkflowData } from './user-signup-workflow-data' | ||
@injectable() | ||
export class UserSignupWorkflow implements Workflow<UserSignupWorkflowData> { | ||
export class UserSignupWorkflow extends Workflow<UserSignupWorkflowData> { | ||
@@ -128,2 +163,3 @@ constructor ( | ||
) { | ||
super() | ||
} | ||
@@ -161,3 +197,3 @@ | ||
if (workflowData.subscribedToMailingList) { | ||
return completeWorkflow({ welcomeEmailSent: true }) | ||
return this.complete({ welcomeEmailSent: true }) | ||
} | ||
@@ -175,3 +211,3 @@ // We're still waiting for the mailing list subscription to go through, so just return these state changes to be persisted | ||
if (workflowData.welcomeEmailSent) { | ||
return completeWorkflow({ subscribedToMailingList: true }) | ||
return this.complete({ subscribedToMailingList: true }) | ||
} | ||
@@ -178,0 +214,0 @@ // We're still waiting for the welcome email to be sent, so just return these state changes to be persisted |
@@ -9,6 +9,5 @@ import { Workflow } from '../workflow' | ||
import { TaskRan } from './task-ran' | ||
import { completeWorkflow } from '../workflow/complete-workflow' | ||
@injectable() | ||
export class TestWorkflow implements Workflow<TestWorkflowData> { | ||
export class TestWorkflow extends Workflow<TestWorkflowData> { | ||
@@ -18,2 +17,3 @@ constructor ( | ||
) { | ||
super() | ||
} | ||
@@ -31,9 +31,7 @@ | ||
@Handles<TaskRan, TestWorkflowData, 'handleTaskRan'>(TaskRan, event => event.value, 'property1') | ||
async handleTaskRan (event: TaskRan, data: TestWorkflowData): Promise<Partial<TestWorkflowData>> { | ||
return completeWorkflow({ | ||
...data, | ||
async handleTaskRan (event: TaskRan): Promise<Partial<TestWorkflowData>> { | ||
return this.complete({ | ||
eventValue: event.value | ||
}) | ||
} | ||
} |
@@ -5,3 +5,2 @@ export * from './workflow-data' | ||
export * from './decorators' | ||
export * from './complete-workflow' | ||
export * from './message-workflow-mapping' |
@@ -48,2 +48,5 @@ import { Persistence } from './persistence' | ||
const workflowData = this.workflowData[workflowDataName] as WorkflowDataType[] | ||
if (!workflowData) { | ||
this.logger.error('Workflow data not initialized', { workflowDataName }) | ||
} | ||
return workflowData | ||
@@ -50,0 +53,0 @@ .filter(data => |
import { IMock, It, Mock, Times } from 'typemoq' | ||
import { StartedByProxy } from './started-by-proxy' | ||
import { completeWorkflow } from '../complete-workflow' | ||
import { Persistence } from '../persistence' | ||
@@ -78,3 +77,3 @@ import { WorkflowHandlerFn } from './workflow-handler-fn' | ||
.callback(() => { | ||
dataOutput = completeWorkflow(dataOutput) | ||
dataOutput = { ...dataOutput, $status: WorkflowStatus.Complete } | ||
}) | ||
@@ -81,0 +80,0 @@ .returns(async () => dataOutput) |
import { Message } from '@node-ts/bus-messages' | ||
import { WorkflowData, WorkflowDataConstructor } from '../workflow-data' | ||
import { WorkflowData, WorkflowDataConstructor, WorkflowStatus } from '../workflow-data' | ||
import { Logger } from '@node-ts/logger-core' | ||
@@ -41,7 +41,15 @@ import { Handler } from '@node-ts/bus-core' | ||
const handlerPromises = workflowDataItems.map(async workflowData => { | ||
const immutableWorkflow = Object.freeze({...workflowData}) | ||
const workflowDataOutput = await this.handler(message, immutableWorkflow) | ||
const immutableWorkflowData = Object.freeze({...workflowData}) | ||
const workflowDataOutput = await this.handler(message, immutableWorkflowData) | ||
if (workflowDataOutput) { | ||
this.logger.debug('Changes detected in workflow data and will be persisted.') | ||
if (workflowDataOutput && workflowDataOutput.$status === WorkflowStatus.Discard) { | ||
this.logger.debug( | ||
'Workflow step is discarding state changes. State changes will not be persisted', | ||
{ workflowId: immutableWorkflowData.$workflowId, workflowName: this.workflowDataConstructor.name } | ||
) | ||
} else if (workflowDataOutput) { | ||
this.logger.debug( | ||
'Changes detected in workflow data and will be persisted.', | ||
{ workflowId: immutableWorkflowData.$workflowId, workflowName: this.workflowDataConstructor.name } | ||
) | ||
const updatedWorkflowData = Object.assign( | ||
@@ -52,2 +60,3 @@ new this.workflowDataConstructor(), | ||
) | ||
try { | ||
@@ -63,3 +72,3 @@ await this.persist(updatedWorkflowData) | ||
} else { | ||
this.logger.trace('No changes detected in workflow data.') | ||
this.logger.trace('No changes detected in workflow data.', { workflowId: immutableWorkflowData.$workflowId }) | ||
} | ||
@@ -75,2 +84,3 @@ }) | ||
await this.persistence.saveWorkflowData(data) | ||
this.logger.info('Saving workflow data', { data }) | ||
} catch (err) { | ||
@@ -77,0 +87,0 @@ this.logger.error('Error persisting workflow data', { err }) |
@@ -16,2 +16,3 @@ import { WorkflowData, WorkflowDataConstructor } from '../workflow-data' | ||
import { LOGGER_SYMBOLS, Logger } from '@node-ts/logger-core' | ||
import * as uuid from 'uuid' | ||
@@ -139,3 +140,3 @@ interface WorkflowRegistration { | ||
messageName, | ||
Symbol.for(`node-ts/bus/workflow/${messageName}-started-by-proxy`), | ||
Symbol.for(`node-ts/bus/workflow/${registration.workflowConstructor.name}-${messageName}-started-by-proxy`), | ||
handlerFactory, | ||
@@ -165,3 +166,3 @@ step.messageConstructor | ||
messageName, | ||
Symbol.for(`node-ts/bus/workflow/${messageName}-handles-proxy`), | ||
Symbol.for(`node-ts/bus/workflow/${registration.workflowConstructor.name}-${messageName}-handles-proxy`), | ||
handler, | ||
@@ -168,0 +169,0 @@ step.messageConstructor |
@@ -12,3 +12,8 @@ import { injectable } from 'inversify' | ||
*/ | ||
Complete = 'complete' | ||
Complete = 'complete', | ||
/** | ||
* A pseudo status to indicate that changes to the current state should not be persisted | ||
*/ | ||
Discard = 'discard' | ||
} | ||
@@ -15,0 +20,0 @@ |
@@ -11,3 +11,12 @@ import { Container } from 'inversify' | ||
import { BusWorkflowModule } from '../bus-workflow-module' | ||
import { LoggerModule } from '@node-ts/logger-core' | ||
import { LoggerModule, LOGGER_SYMBOLS, Logger } from '@node-ts/logger-core' | ||
import { | ||
TestWorkflowStartedByCompletes, | ||
TestWorkflowStartedByCompletesData | ||
} from '../test/test-workflow-startedby-completes' | ||
import { | ||
TestWorkflowStartedByDiscard, | ||
TestWorkflowStartedByDiscardData | ||
} from '../test/test-workflow-startedby-discard' | ||
import { Mock } from 'typemoq' | ||
@@ -19,2 +28,6 @@ describe('Workflow', () => { | ||
const command = new TestCommand('abc') | ||
let bus: Bus | ||
const CONSUME_TIMEOUT = 500 | ||
beforeAll(async () => { | ||
@@ -25,2 +38,3 @@ container = new Container() | ||
container.load(new BusWorkflowModule()) | ||
container.rebind(LOGGER_SYMBOLS.Logger).toConstantValue(Mock.ofType<Logger>().object) | ||
@@ -31,2 +45,4 @@ persistence = container.get<Persistence>(BUS_WORKFLOW_SYMBOLS.Persistence) | ||
workflowRegistry.register(TestWorkflow, TestWorkflowData) | ||
workflowRegistry.register(TestWorkflowStartedByCompletes, TestWorkflowStartedByCompletesData) | ||
workflowRegistry.register(TestWorkflowStartedByDiscard, TestWorkflowStartedByDiscardData) | ||
await workflowRegistry.initializeWorkflows() | ||
@@ -36,2 +52,6 @@ | ||
await bootstrap.initialize(container) | ||
bus = container.get(BUS_SYMBOLS.Bus) | ||
await bus.send(command) | ||
await sleep(CONSUME_TIMEOUT) | ||
}) | ||
@@ -44,5 +64,2 @@ | ||
describe('when a message that starts a workflow is received', () => { | ||
let bus: Bus | ||
const command = new TestCommand('abc') | ||
const propertyMapping = new MessageWorkflowMapping<TestCommand, TestWorkflowData> ( | ||
@@ -53,8 +70,4 @@ cmd => cmd.property1, | ||
let workflowData: TestWorkflowData[] | ||
const CONSUME_TIMEOUT = 500 | ||
beforeAll(async () => { | ||
bus = container.get(BUS_SYMBOLS.Bus) | ||
await bus.send(command) | ||
await sleep(CONSUME_TIMEOUT) | ||
workflowData = await persistence.getWorkflowData<TestWorkflowData, TestCommand>( | ||
@@ -71,3 +84,3 @@ TestWorkflowData, | ||
expect(data.$status).toEqual(WorkflowStatus.Running) | ||
expect(data.$version).toEqual(1) | ||
expect(data.$version).toEqual(0) | ||
expect(data).toMatchObject({ property1: command.property1 }) | ||
@@ -94,4 +107,7 @@ }) | ||
expect(finalWorkflowData).toHaveLength(1) | ||
}) | ||
it('should mark the workflow as complete', () => { | ||
const data = finalWorkflowData[0] | ||
expect(data.$version).toEqual(2) | ||
expect(data.$status).toEqual(WorkflowStatus.Complete) | ||
}) | ||
@@ -101,2 +117,39 @@ }) | ||
describe('when a workflow is completed in a StartedBy handler', () => { | ||
const propertyMapping = new MessageWorkflowMapping<TestCommand, TestWorkflowStartedByCompletesData> ( | ||
cmd => cmd.property1, | ||
'property1' | ||
) | ||
it('should persist the workflow as completed', async () => { | ||
const workflowData = await persistence.getWorkflowData<TestWorkflowStartedByCompletesData, TestCommand>( | ||
TestWorkflowStartedByCompletesData, | ||
propertyMapping, | ||
command, | ||
true | ||
) | ||
expect(workflowData).toHaveLength(1) | ||
const data = workflowData[0] | ||
expect(data.$status).toEqual(WorkflowStatus.Complete) | ||
}) | ||
}) | ||
describe('when a StartedBy handler returns a discardStep', () => { | ||
const propertyMapping = new MessageWorkflowMapping<TestCommand, TestWorkflowStartedByDiscardData> ( | ||
cmd => cmd.property1, | ||
'property1' | ||
) | ||
it('should not persist the workflow', async () => { | ||
const workflowData = await persistence.getWorkflowData<TestWorkflowStartedByDiscardData, TestCommand>( | ||
TestWorkflowStartedByDiscardData, | ||
propertyMapping, | ||
command, | ||
true | ||
) | ||
expect(workflowData).toHaveLength(0) | ||
}) | ||
}) | ||
}) |
import { ClassConstructor } from '@node-ts/bus-core' | ||
import { WorkflowData } from './workflow-data' | ||
import { WorkflowData, WorkflowStatus } from './workflow-data' | ||
import { injectable } from 'inversify' | ||
@@ -9,2 +10,35 @@ export type WorkflowConstructor< | ||
export interface Workflow<TWorkflowData extends WorkflowData> {} | ||
export const WORKFLOW_STEP_DISCARDED = 'discarded' | ||
@injectable() | ||
export abstract class Workflow<TWorkflowData extends WorkflowData> { | ||
/** | ||
* Flags that the workflow is complete, thereby preventing it from reacting to any | ||
* subsequent messages. This should be called as part of the return value of | ||
* a handling function | ||
* @param data Any final modifications to the workflow data that will be persisted | ||
* @example | ||
* \@Handles<TaskRan, TestWorkflowData, 'handleTaskRan'>(TaskRan, event => event.value, 'property1') | ||
* async handleTaskRan (event: TaskRan): Promise<Partial<TestWorkflowData>> { | ||
* return this.complete({ taskRunDuration: event.duration }) | ||
* } | ||
*/ | ||
protected complete (data: Partial<TWorkflowData> = {}): Partial<TWorkflowData> { | ||
return { | ||
...data, | ||
$status: WorkflowStatus.Complete | ||
} | ||
} | ||
/** | ||
* Tells the workflow engine to avoid persisting state for this workflow step. If this is | ||
* returned in a StartedBy handler, then the handler state will not be created and stored. | ||
* If this is run in a Handles handler, then changes to the state will not be run. | ||
*/ | ||
protected discard (): Partial<TWorkflowData> { | ||
return { | ||
$status: WorkflowStatus.Discard | ||
} as Partial<TWorkflowData> | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
128312
2045
243
114