@node-ts/bus-core
Advanced tools
Comparing version 1.1.2 to 1.1.3
@@ -1,16 +0,3 @@ | ||
/// <reference types="node" /> | ||
import * as asyncHooks from 'async_hooks'; | ||
import { Namespace } from 'cls-hooked'; | ||
import { TransportMessage } from '../transport'; | ||
interface HandlingContext { | ||
/** | ||
* A list of child async process ids that are running in the same execution context | ||
* and need to be destroyed when the parent is destroyed. | ||
* | ||
* This can be handled with regular destroy hooks, however this is subject to GC | ||
* and runs the risk of keeping a large number of contexts open after their parent | ||
* context has closed. | ||
*/ | ||
childAsyncIds: number[]; | ||
message: TransportMessage<unknown>; | ||
} | ||
/** | ||
@@ -26,11 +13,22 @@ * This is an internal coordinator that tracks the execution context for | ||
/** | ||
* Executes a function within a context of cls-hooked | ||
*/ | ||
run: (fn: (...args: any[]) => void) => void; | ||
/** | ||
* Sets a new handling context for the current execution async id. Child asyncs should | ||
* only call this if they want to create a new context with themselves at the root. | ||
*/ | ||
set: (message: TransportMessage<unknown>) => Map<number, HandlingContext>; | ||
get: () => HandlingContext | undefined; | ||
destroy: () => void; | ||
enable: () => asyncHooks.AsyncHook; | ||
disable: () => asyncHooks.AsyncHook; | ||
set: (message: TransportMessage<unknown>) => any; | ||
/** | ||
* Fetches the message handling context of the active async stack | ||
*/ | ||
get: () => TransportMessage<unknown>; | ||
/** | ||
* Hooks into the async_hooks module to track the current execution context. Must be called before other operations. | ||
*/ | ||
enable: () => Namespace<Record<string, any>>; | ||
/** | ||
* Stops tracking the current execution context. | ||
*/ | ||
disable: () => void; | ||
}; | ||
export {}; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.messageHandlingContext = void 0; | ||
const tslib_1 = require("tslib"); | ||
const asyncHooks = tslib_1.__importStar(require("async_hooks")); | ||
const handlingContexts = new Map(); | ||
const init = (asyncId, _, triggerAsyncId) => { | ||
// Ensures that child promises inherit the same context as their parent | ||
if (handlingContexts.has(triggerAsyncId)) { | ||
const context = handlingContexts.get(triggerAsyncId); | ||
context.childAsyncIds.push(asyncId); | ||
handlingContexts.set(asyncId, context); | ||
} | ||
}; | ||
const destroy = (asyncId) => { | ||
if (handlingContexts.has(asyncId)) { | ||
handlingContexts.delete(asyncId); | ||
} | ||
}; | ||
const hooks = asyncHooks.createHook({ | ||
init, | ||
destroy | ||
}); | ||
const cls_hooked_1 = require("cls-hooked"); | ||
const NAMESPACE = 'message-handling-context'; | ||
let namespace; | ||
/** | ||
@@ -34,21 +17,26 @@ * This is an internal coordinator that tracks the execution context for | ||
/** | ||
* Executes a function within a context of cls-hooked | ||
*/ | ||
run: (fn) => namespace.run(fn), | ||
/** | ||
* Sets a new handling context for the current execution async id. Child asyncs should | ||
* only call this if they want to create a new context with themselves at the root. | ||
*/ | ||
set: (message) => handlingContexts.set(asyncHooks.executionAsyncId(), { | ||
childAsyncIds: [], | ||
message | ||
}), | ||
get: () => handlingContexts.get(asyncHooks.executionAsyncId()), | ||
destroy: () => { | ||
const asyncId = asyncHooks.executionAsyncId(); | ||
const context = handlingContexts.get(asyncId); | ||
if (!context) { | ||
return; | ||
set: (message) => namespace === null || namespace === void 0 ? void 0 : namespace.set('message', message), | ||
/** | ||
* Fetches the message handling context of the active async stack | ||
*/ | ||
get: () => namespace === null || namespace === void 0 ? void 0 : namespace.get('message'), | ||
/** | ||
* Hooks into the async_hooks module to track the current execution context. Must be called before other operations. | ||
*/ | ||
enable: () => (namespace = (0, cls_hooked_1.createNamespace)(NAMESPACE)), | ||
/** | ||
* Stops tracking the current execution context. | ||
*/ | ||
disable: () => { | ||
if ((0, cls_hooked_1.getNamespace)(NAMESPACE)) { | ||
(0, cls_hooked_1.destroyNamespace)(NAMESPACE); | ||
} | ||
context.childAsyncIds.forEach(childAsyncId => handlingContexts.delete(childAsyncId)); | ||
handlingContexts.delete(asyncId); | ||
}, | ||
enable: () => hooks.enable(), | ||
disable: () => hooks.disable() | ||
} | ||
}; |
@@ -71,3 +71,2 @@ "use strict"; | ||
} | ||
message_handling_context_1.messageHandlingContext.enable(); | ||
this.subscribeToInterruptSignals(this.coreDependencies.interruptSignals); | ||
@@ -114,7 +113,6 @@ this.isInitialized = true; | ||
return tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
const context = message_handling_context_1.messageHandlingContext.get(); | ||
if (!context) { | ||
const message = message_handling_context_1.messageHandlingContext.get(); | ||
if (!message) { | ||
throw new error_1.FailMessageOutsideHandlingContext(); | ||
} | ||
const message = context.message; | ||
this.logger.debug('Failing message', { message }); | ||
@@ -149,2 +147,3 @@ return this.transport.fail(message); | ||
} | ||
message_handling_context_1.messageHandlingContext.enable(); | ||
this.internalState = bus_state_1.BusState.Started; | ||
@@ -165,2 +164,3 @@ for (var i = 0; i < this.concurrency; i++) { | ||
return tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
message_handling_context_1.messageHandlingContext.disable(); | ||
const stoppedStates = [bus_state_1.BusState.Stopped, bus_state_1.BusState.Stopping]; | ||
@@ -214,9 +214,12 @@ if (stoppedStates.includes(this.state)) { | ||
this.runningWorkerCount++; | ||
while (this.internalState === bus_state_1.BusState.Started) { | ||
const messageHandled = yield this.handleNextMessage(); | ||
// Avoids locking up CPU when there's no messages to be processed | ||
if (!messageHandled) { | ||
yield (0, util_1.sleep)(EMPTY_QUEUE_SLEEP_MS); | ||
// Run the loop in a cls-hooked namespace to provide the message handling context to all async operations | ||
message_handling_context_1.messageHandlingContext.run(() => tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
while (this.internalState === bus_state_1.BusState.Started) { | ||
const messageHandled = yield this.handleNextMessage(); | ||
// Avoids locking up CPU when there's no messages to be processed | ||
if (!messageHandled) { | ||
yield (0, util_1.sleep)(EMPTY_QUEUE_SLEEP_MS); | ||
} | ||
} | ||
} | ||
})); | ||
this.runningWorkerCount--; | ||
@@ -232,4 +235,4 @@ }); | ||
this.afterReceive.emit({ message }); | ||
message_handling_context_1.messageHandlingContext.set(message); | ||
try { | ||
message_handling_context_1.messageHandlingContext.set(message); | ||
yield this.messageReadMiddleware.dispatch(message); | ||
@@ -255,5 +258,2 @@ this.afterDispatch.emit({ | ||
} | ||
finally { | ||
message_handling_context_1.messageHandlingContext.destroy(); | ||
} | ||
return true; | ||
@@ -299,9 +299,7 @@ } | ||
(handlingContext | ||
? handlingContext.message.attributes.correlationId | ||
? handlingContext.attributes.correlationId | ||
: undefined) || | ||
(0, uuid_1.v4)(), | ||
attributes: clientOptions.attributes || {}, | ||
stickyAttributes: Object.assign(Object.assign({}, (handlingContext | ||
? handlingContext.message.attributes.stickyAttributes | ||
: {})), clientOptions.stickyAttributes) | ||
stickyAttributes: Object.assign(Object.assign({}, (handlingContext ? handlingContext.attributes.stickyAttributes : {})), clientOptions.stickyAttributes) | ||
}; | ||
@@ -308,0 +306,0 @@ this.logger.debug('Prepared transport options', { messageAttributes }); |
@@ -41,5 +41,4 @@ import { WorkflowState } from '../workflow-state'; | ||
private startWorkflowHandlingContext; | ||
private endWorkflowHandlingContext; | ||
private dispatchMessageToWorkflow; | ||
private persist; | ||
} |
@@ -59,3 +59,3 @@ "use strict"; | ||
if (this.workflowRegistry.length === 0) { | ||
this.logger.info('No workflows registered, skipping this step.'); | ||
this.logger.info('No workflows registered, skipping workflow initialization.'); | ||
return; | ||
@@ -136,10 +136,11 @@ } | ||
mapper.onStartedBy.forEach((options, messageConstructor) => handlerRegistry.register(messageConstructor, (message, messageAttributes) => tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
this.logger.debug('Starting new workflow instance', { | ||
workflow: options.workflowCtor, | ||
msg: message | ||
}); | ||
const workflowState = this.createWorkflowState(mapper.workflowStateCtor); | ||
const immutableWorkflowState = Object.freeze(Object.assign({}, workflowState)); | ||
this.startWorkflowHandlingContext(immutableWorkflowState); | ||
try { | ||
// Extend the current message handling context, and augment with workflow-specific context data | ||
message_handling_context_1.messageHandlingContext.run(() => tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
this.logger.debug('Starting new workflow instance', { | ||
workflow: options.workflowCtor, | ||
msg: message | ||
}); | ||
const workflowState = this.createWorkflowState(mapper.workflowStateCtor); | ||
const immutableWorkflowState = Object.freeze(Object.assign({}, workflowState)); | ||
this.startWorkflowHandlingContext(immutableWorkflowState); | ||
let workflow; | ||
@@ -170,6 +171,3 @@ if (container) { | ||
} | ||
} | ||
finally { | ||
this.endWorkflowHandlingContext(); | ||
} | ||
})); | ||
}))); | ||
@@ -196,9 +194,7 @@ } | ||
const workflowHandlers = workflowState.map((state) => tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
try { | ||
// Extend the current message handling context, and augment with workflow-specific context data | ||
message_handling_context_1.messageHandlingContext.run(() => tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
this.startWorkflowHandlingContext(state); | ||
yield this.dispatchMessageToWorkflow(message, attributes, workflowCtor, state, mapper.workflowStateCtor, handler.workflowHandler, container); | ||
} | ||
finally { | ||
this.endWorkflowHandlingContext(); | ||
} | ||
})); | ||
})); | ||
@@ -235,3 +231,3 @@ const handlerResults = yield Promise.allSettled(workflowHandlers); | ||
}); | ||
const handlingContext = message_handling_context_1.messageHandlingContext.get().message; | ||
const handlingContext = message_handling_context_1.messageHandlingContext.get(); | ||
const workflowHandlingContext = structuredClone(handlingContext); | ||
@@ -243,6 +239,2 @@ workflowHandlingContext.attributes.stickyAttributes.workflowId = | ||
} | ||
endWorkflowHandlingContext() { | ||
this.logger.debug('Ending workflow handling context'); | ||
message_handling_context_1.messageHandlingContext.destroy(); | ||
} | ||
dispatchMessageToWorkflow(message, attributes, workflowCtor, workflowState, workflowStateConstructor, workflowHandler, container) { | ||
@@ -272,2 +264,3 @@ return tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
const handler = workflow[workflowHandler]; | ||
// Invoke the workflow handler | ||
const workflowStateOutput = yield handler.bind(workflow)(message, immutableWorkflowState, attributes); | ||
@@ -274,0 +267,0 @@ const workflowName = workflowCtor.prototype.name; |
{ | ||
"name": "@node-ts/bus-core", | ||
"version": "1.1.2", | ||
"version": "1.1.3", | ||
"description": "A service bus for message-based, distributed node applications", | ||
@@ -13,2 +13,3 @@ "main": "./dist/index.js", | ||
"dependencies": { | ||
"cls-hooked": "^4.2.2", | ||
"debug": "^4.3.4", | ||
@@ -24,2 +25,3 @@ "reflect-metadata": "^0.1.13", | ||
"@node-ts/code-standards": "^0.0.10", | ||
"@types/cls-hooked": "^4.3.8", | ||
"@types/debug": "^4.1.5", | ||
@@ -26,0 +28,0 @@ "@types/faker": "^4.1.5", |
@@ -22,51 +22,43 @@ import { TransportMessage } from '../transport' | ||
it('should retrieve the message from within the same context', async () => { | ||
const message = buildTransportMessage() | ||
messageHandlingContext.set(message) | ||
const retrievedMessage = messageHandlingContext.get()!.message | ||
expect(retrievedMessage).toEqual(message) | ||
messageHandlingContext.destroy() | ||
}) | ||
it('should not retrieve a message after the context is destroyed', async () => { | ||
const message = buildTransportMessage() | ||
messageHandlingContext.set(message) | ||
messageHandlingContext.destroy() | ||
const context = messageHandlingContext.get() | ||
expect(context).toBeUndefined() | ||
}) | ||
it('should not retrieve a message from a different context', async () => { | ||
const context1 = new Promise<void>(resolve => { | ||
messageHandlingContext.run(() => { | ||
const message = buildTransportMessage() | ||
messageHandlingContext.set(message) | ||
const retrievedMessage = messageHandlingContext.get()!.message | ||
const retrievedMessage = messageHandlingContext.get()! | ||
expect(retrievedMessage).toEqual(message) | ||
messageHandlingContext.destroy() | ||
resolve() | ||
}) | ||
const context2 = new Promise<void>(resolve => { | ||
const message = buildTransportMessage() | ||
messageHandlingContext.set(message) | ||
const retrievedMessage = messageHandlingContext.get()!.message | ||
expect(retrievedMessage).toEqual(message) | ||
messageHandlingContext.destroy() | ||
resolve() | ||
}) | ||
it('should not retrieve a message from a different context', async () => { | ||
messageHandlingContext.run(async () => { | ||
const context1 = new Promise<void>(resolve => { | ||
const message = buildTransportMessage() | ||
messageHandlingContext.set(message) | ||
const retrievedMessage = messageHandlingContext.get()! | ||
expect(retrievedMessage).toEqual(message) | ||
resolve() | ||
}) | ||
const context2 = new Promise<void>(resolve => { | ||
const message = buildTransportMessage() | ||
messageHandlingContext.set(message) | ||
const retrievedMessage = messageHandlingContext.get()! | ||
expect(retrievedMessage).toEqual(message) | ||
resolve() | ||
}) | ||
await Promise.all([context1, context2]) | ||
}) | ||
await Promise.all([context1, context2]) | ||
}) | ||
it('should retrieve a message from a nested async chain', async () => { | ||
const message = buildTransportMessage() | ||
messageHandlingContext.set(message) | ||
messageHandlingContext.run(async () => { | ||
const message = buildTransportMessage() | ||
messageHandlingContext.set(message) | ||
await new Promise<void>(resolve => { | ||
const retrievedMessage = messageHandlingContext.get()!.message | ||
expect(retrievedMessage).toEqual(message) | ||
resolve() | ||
await new Promise<void>(resolve => { | ||
const retrievedMessage = messageHandlingContext.get()! | ||
expect(retrievedMessage).toEqual(message) | ||
resolve() | ||
}) | ||
}) | ||
messageHandlingContext.destroy() | ||
}) | ||
}) | ||
}) |
@@ -1,39 +0,12 @@ | ||
import * as asyncHooks from 'async_hooks' | ||
import { | ||
createNamespace, | ||
destroyNamespace, | ||
getNamespace, | ||
Namespace | ||
} from 'cls-hooked' | ||
import { TransportMessage } from '../transport' | ||
interface HandlingContext { | ||
/** | ||
* A list of child async process ids that are running in the same execution context | ||
* and need to be destroyed when the parent is destroyed. | ||
* | ||
* This can be handled with regular destroy hooks, however this is subject to GC | ||
* and runs the risk of keeping a large number of contexts open after their parent | ||
* context has closed. | ||
*/ | ||
childAsyncIds: number[] | ||
message: TransportMessage<unknown> | ||
} | ||
const NAMESPACE = 'message-handling-context' | ||
let namespace: Namespace | ||
const handlingContexts = new Map<number, HandlingContext>() | ||
const init = (asyncId: number, _: string, triggerAsyncId: number) => { | ||
// Ensures that child promises inherit the same context as their parent | ||
if (handlingContexts.has(triggerAsyncId)) { | ||
const context = handlingContexts.get(triggerAsyncId)! | ||
context.childAsyncIds.push(asyncId) | ||
handlingContexts.set(asyncId, context) | ||
} | ||
} | ||
const destroy = (asyncId: number) => { | ||
if (handlingContexts.has(asyncId)) { | ||
handlingContexts.delete(asyncId) | ||
} | ||
} | ||
const hooks = asyncHooks.createHook({ | ||
init, | ||
destroy | ||
}) | ||
/** | ||
@@ -49,2 +22,6 @@ * This is an internal coordinator that tracks the execution context for | ||
/** | ||
* Executes a function within a context of cls-hooked | ||
*/ | ||
run: (fn: (...args: any[]) => void) => namespace.run(fn), | ||
/** | ||
* Sets a new handling context for the current execution async id. Child asyncs should | ||
@@ -54,21 +31,19 @@ * only call this if they want to create a new context with themselves at the root. | ||
set: (message: TransportMessage<unknown>) => | ||
handlingContexts.set(asyncHooks.executionAsyncId(), { | ||
childAsyncIds: [], | ||
message | ||
}), | ||
get: () => handlingContexts.get(asyncHooks.executionAsyncId()), | ||
destroy: () => { | ||
const asyncId = asyncHooks.executionAsyncId() | ||
const context = handlingContexts.get(asyncId) | ||
if (!context) { | ||
return | ||
namespace?.set('message', message), | ||
/** | ||
* Fetches the message handling context of the active async stack | ||
*/ | ||
get: () => namespace?.get('message') as TransportMessage<unknown>, | ||
/** | ||
* Hooks into the async_hooks module to track the current execution context. Must be called before other operations. | ||
*/ | ||
enable: () => (namespace = createNamespace(NAMESPACE)), | ||
/** | ||
* Stops tracking the current execution context. | ||
*/ | ||
disable: () => { | ||
if (getNamespace(NAMESPACE)) { | ||
destroyNamespace(NAMESPACE) | ||
} | ||
context.childAsyncIds.forEach(childAsyncId => | ||
handlingContexts.delete(childAsyncId) | ||
) | ||
handlingContexts.delete(asyncId) | ||
}, | ||
enable: () => hooks.enable(), | ||
disable: () => hooks.disable() | ||
} | ||
} |
@@ -5,8 +5,9 @@ import { InMemoryQueue } from '../transport' | ||
import { sleep } from '../util' | ||
import { Mock, IMock } from 'typemoq' | ||
import { Mock, IMock, Times } from 'typemoq' | ||
import { BusInstance } from './bus-instance' | ||
import { handlerFor } from '../handler' | ||
import { TestCommand } from '../test/test-command' | ||
const event = new TestEvent() | ||
type Callback = () => void | ||
type Callback = (correlationId: string) => void | ||
@@ -21,9 +22,15 @@ describe('BusInstance - Concurrency', () => { | ||
const handler = handlerFor(TestEvent, async () => { | ||
const eventHandler = handlerFor(TestEvent, async () => { | ||
handleCount++ | ||
await new Promise(resolve => { | ||
resolutions.push(resolve) | ||
}) | ||
await bus.send(new TestCommand()) | ||
}) | ||
const commandHandler = handlerFor(TestCommand, async (_, attributes) => { | ||
callback.object(attributes.correlationId!) | ||
}) | ||
beforeAll(async () => { | ||
@@ -35,3 +42,4 @@ queue = new InMemoryQueue() | ||
.withTransport(queue) | ||
.withHandler(handler) | ||
.withHandler(eventHandler) | ||
.withHandler(commandHandler) | ||
.withConcurrency(CONCURRENCY) | ||
@@ -48,8 +56,9 @@ .build() | ||
beforeAll(async () => { | ||
// These should be handled immediately | ||
await bus.publish(event) | ||
await bus.publish(event) | ||
// This should be handled when the next worker becomes available | ||
await bus.publish(event) | ||
await Promise.all([ | ||
// These should be handled immediately | ||
bus.publish(event, { correlationId: 'first' }), | ||
bus.publish(event, { correlationId: 'second' }), | ||
// This should be handled when the next worker becomes available | ||
bus.publish(event, { correlationId: 'third' }) | ||
]) | ||
await sleep(100) | ||
@@ -61,9 +70,20 @@ }) | ||
// Let the first handler complete | ||
resolutions[0](undefined) | ||
await sleep(10) | ||
expect(handleCount).toEqual(CONCURRENCY + 1) | ||
// Resolve subsequent handlers | ||
resolutions[1](undefined) | ||
resolutions[2](undefined) | ||
}) | ||
describe('when the command handlers are run', () => { | ||
it('the message handling context should have propagated all sticky attributes', () => { | ||
callback.verify(x => x('first'), Times.once()) | ||
callback.verify(x => x('second'), Times.once()) | ||
callback.verify(x => x('third'), Times.once()) | ||
}) | ||
}) | ||
}) | ||
}) |
@@ -134,3 +134,2 @@ import { Transport, TransportMessage } from '../transport' | ||
messageHandlingContext.enable() | ||
this.subscribeToInterruptSignals(this.coreDependencies.interruptSignals) | ||
@@ -180,7 +179,6 @@ this.isInitialized = true | ||
async fail(): Promise<void> { | ||
const context = messageHandlingContext.get() | ||
if (!context) { | ||
const message = messageHandlingContext.get() | ||
if (!message) { | ||
throw new FailMessageOutsideHandlingContext() | ||
} | ||
const message = context.message | ||
this.logger.debug('Failing message', { message }) | ||
@@ -219,2 +217,3 @@ return this.transport.fail(message) | ||
} | ||
messageHandlingContext.enable() | ||
@@ -236,2 +235,3 @@ this.internalState = BusState.Started | ||
async stop(): Promise<void> { | ||
messageHandlingContext.disable() | ||
const stoppedStates = [BusState.Stopped, BusState.Stopping] | ||
@@ -290,10 +290,14 @@ if (stoppedStates.includes(this.state)) { | ||
this.runningWorkerCount++ | ||
while (this.internalState === BusState.Started) { | ||
const messageHandled = await this.handleNextMessage() | ||
// Avoids locking up CPU when there's no messages to be processed | ||
if (!messageHandled) { | ||
await sleep(EMPTY_QUEUE_SLEEP_MS) | ||
// Run the loop in a cls-hooked namespace to provide the message handling context to all async operations | ||
messageHandlingContext.run(async () => { | ||
while (this.internalState === BusState.Started) { | ||
const messageHandled = await this.handleNextMessage() | ||
// Avoids locking up CPU when there's no messages to be processed | ||
if (!messageHandled) { | ||
await sleep(EMPTY_QUEUE_SLEEP_MS) | ||
} | ||
} | ||
} | ||
}) | ||
this.runningWorkerCount-- | ||
@@ -309,6 +313,5 @@ } | ||
this.afterReceive.emit({ message }) | ||
messageHandlingContext.set(message) | ||
try { | ||
messageHandlingContext.set(message) | ||
await this.messageReadMiddleware.dispatch(message) | ||
@@ -336,4 +339,2 @@ | ||
return false | ||
} finally { | ||
messageHandlingContext.destroy() | ||
} | ||
@@ -402,3 +403,3 @@ return true | ||
(handlingContext | ||
? handlingContext.message.attributes.correlationId | ||
? handlingContext.attributes.correlationId | ||
: undefined) || | ||
@@ -408,5 +409,3 @@ generateUuid(), | ||
stickyAttributes: { | ||
...(handlingContext | ||
? handlingContext.message.attributes.stickyAttributes | ||
: {}), | ||
...(handlingContext ? handlingContext.attributes.stickyAttributes : {}), | ||
...clientOptions.stickyAttributes | ||
@@ -413,0 +412,0 @@ } |
@@ -82,3 +82,5 @@ import { WorkflowState, WorkflowStatus } from '../workflow-state' | ||
if (this.workflowRegistry.length === 0) { | ||
this.logger.info('No workflows registered, skipping this step.') | ||
this.logger.info( | ||
'No workflows registered, skipping workflow initialization.' | ||
) | ||
return | ||
@@ -182,12 +184,13 @@ } | ||
async (message, messageAttributes) => { | ||
this.logger.debug('Starting new workflow instance', { | ||
workflow: options.workflowCtor, | ||
msg: message | ||
}) | ||
const workflowState = this.createWorkflowState( | ||
mapper.workflowStateCtor! | ||
) | ||
const immutableWorkflowState = Object.freeze({ ...workflowState }) | ||
this.startWorkflowHandlingContext(immutableWorkflowState) | ||
try { | ||
// Extend the current message handling context, and augment with workflow-specific context data | ||
messageHandlingContext.run(async () => { | ||
this.logger.debug('Starting new workflow instance', { | ||
workflow: options.workflowCtor, | ||
msg: message | ||
}) | ||
const workflowState = this.createWorkflowState( | ||
mapper.workflowStateCtor! | ||
) | ||
const immutableWorkflowState = Object.freeze({ ...workflowState }) | ||
this.startWorkflowHandlingContext(immutableWorkflowState) | ||
let workflow: Workflow<WorkflowState> | ||
@@ -227,5 +230,3 @@ if (container) { | ||
} | ||
} finally { | ||
this.endWorkflowHandlingContext() | ||
} | ||
}) | ||
} | ||
@@ -278,3 +279,4 @@ ) | ||
const workflowHandlers = workflowState.map(async state => { | ||
try { | ||
// Extend the current message handling context, and augment with workflow-specific context data | ||
messageHandlingContext.run(async () => { | ||
this.startWorkflowHandlingContext(state) | ||
@@ -290,5 +292,3 @@ await this.dispatchMessageToWorkflow( | ||
) | ||
} finally { | ||
this.endWorkflowHandlingContext() | ||
} | ||
}) | ||
}) | ||
@@ -334,3 +334,3 @@ | ||
}) | ||
const handlingContext = messageHandlingContext.get()!.message | ||
const handlingContext = messageHandlingContext.get()! | ||
const workflowHandlingContext = structuredClone(handlingContext) | ||
@@ -342,7 +342,2 @@ workflowHandlingContext.attributes.stickyAttributes.workflowId = | ||
private endWorkflowHandlingContext() { | ||
this.logger.debug('Ending workflow handling context') | ||
messageHandlingContext.destroy() | ||
} | ||
private async dispatchMessageToWorkflow( | ||
@@ -378,2 +373,4 @@ message: Message, | ||
const handler = workflow[workflowHandler] as Function | ||
// Invoke the workflow handler | ||
const workflowStateOutput = await handler.bind(workflow)( | ||
@@ -380,0 +377,0 @@ message, |
@@ -172,4 +172,4 @@ import * as uuid from 'uuid' | ||
await bus.initialize() | ||
await bus.start() | ||
await bus.send(event) | ||
await bus.start() | ||
await sleep(CONSUME_TIMEOUT) | ||
@@ -176,0 +176,0 @@ }) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Debug access
Supply chain riskUses debug, reflection and dynamic code execution features.
Found 1 instance in 1 package
351692
261
8528
1
8
10
+ Addedcls-hooked@^4.2.2
+ Addedasync-hook-jl@1.7.6(transitive)
+ Addedcls-hooked@4.2.2(transitive)
+ Addedemitter-listener@1.1.2(transitive)
+ Addedsemver@5.7.2(transitive)
+ Addedshimmer@1.2.1(transitive)
+ Addedstack-chain@1.3.7(transitive)