Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@node-ts/bus-core

Package Overview
Dependencies
Maintainers
1
Versions
96
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@node-ts/bus-core - npm Package Compare versions

Comparing version 1.1.2 to 1.1.3

src/workflow/workflow-concurrency.integration.ts

38

dist/message-handling-context/message-handling-context.d.ts

@@ -1,16 +0,3 @@

/// <reference types="node" />
import * as asyncHooks from 'async_hooks';
import { Namespace } from 'cls-hooked';
import { TransportMessage } from '../transport';
interface HandlingContext {
/**
* A list of child async process ids that are running in the same execution context
* and need to be destroyed when the parent is destroyed.
*
* This can be handled with regular destroy hooks, however this is subject to GC
* and runs the risk of keeping a large number of contexts open after their parent
* context has closed.
*/
childAsyncIds: number[];
message: TransportMessage<unknown>;
}
/**

@@ -26,11 +13,22 @@ * This is an internal coordinator that tracks the execution context for

/**
* Executes a function within a context of cls-hooked
*/
run: (fn: (...args: any[]) => void) => void;
/**
* Sets a new handling context for the current execution async id. Child asyncs should
* only call this if they want to create a new context with themselves at the root.
*/
set: (message: TransportMessage<unknown>) => Map<number, HandlingContext>;
get: () => HandlingContext | undefined;
destroy: () => void;
enable: () => asyncHooks.AsyncHook;
disable: () => asyncHooks.AsyncHook;
set: (message: TransportMessage<unknown>) => any;
/**
* Fetches the message handling context of the active async stack
*/
get: () => TransportMessage<unknown>;
/**
* Hooks into the async_hooks module to track the current execution context. Must be called before other operations.
*/
enable: () => Namespace<Record<string, any>>;
/**
* Stops tracking the current execution context.
*/
disable: () => void;
};
export {};
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.messageHandlingContext = void 0;
const tslib_1 = require("tslib");
const asyncHooks = tslib_1.__importStar(require("async_hooks"));
const handlingContexts = new Map();
const init = (asyncId, _, triggerAsyncId) => {
// Ensures that child promises inherit the same context as their parent
if (handlingContexts.has(triggerAsyncId)) {
const context = handlingContexts.get(triggerAsyncId);
context.childAsyncIds.push(asyncId);
handlingContexts.set(asyncId, context);
}
};
const destroy = (asyncId) => {
if (handlingContexts.has(asyncId)) {
handlingContexts.delete(asyncId);
}
};
const hooks = asyncHooks.createHook({
init,
destroy
});
const cls_hooked_1 = require("cls-hooked");
const NAMESPACE = 'message-handling-context';
let namespace;
/**

@@ -34,21 +17,26 @@ * This is an internal coordinator that tracks the execution context for

/**
* Executes a function within a context of cls-hooked
*/
run: (fn) => namespace.run(fn),
/**
* Sets a new handling context for the current execution async id. Child asyncs should
* only call this if they want to create a new context with themselves at the root.
*/
set: (message) => handlingContexts.set(asyncHooks.executionAsyncId(), {
childAsyncIds: [],
message
}),
get: () => handlingContexts.get(asyncHooks.executionAsyncId()),
destroy: () => {
const asyncId = asyncHooks.executionAsyncId();
const context = handlingContexts.get(asyncId);
if (!context) {
return;
set: (message) => namespace === null || namespace === void 0 ? void 0 : namespace.set('message', message),
/**
* Fetches the message handling context of the active async stack
*/
get: () => namespace === null || namespace === void 0 ? void 0 : namespace.get('message'),
/**
* Hooks into the async_hooks module to track the current execution context. Must be called before other operations.
*/
enable: () => (namespace = (0, cls_hooked_1.createNamespace)(NAMESPACE)),
/**
* Stops tracking the current execution context.
*/
disable: () => {
if ((0, cls_hooked_1.getNamespace)(NAMESPACE)) {
(0, cls_hooked_1.destroyNamespace)(NAMESPACE);
}
context.childAsyncIds.forEach(childAsyncId => handlingContexts.delete(childAsyncId));
handlingContexts.delete(asyncId);
},
enable: () => hooks.enable(),
disable: () => hooks.disable()
}
};

@@ -71,3 +71,2 @@ "use strict";

}
message_handling_context_1.messageHandlingContext.enable();
this.subscribeToInterruptSignals(this.coreDependencies.interruptSignals);

@@ -114,7 +113,6 @@ this.isInitialized = true;

return tslib_1.__awaiter(this, void 0, void 0, function* () {
const context = message_handling_context_1.messageHandlingContext.get();
if (!context) {
const message = message_handling_context_1.messageHandlingContext.get();
if (!message) {
throw new error_1.FailMessageOutsideHandlingContext();
}
const message = context.message;
this.logger.debug('Failing message', { message });

@@ -149,2 +147,3 @@ return this.transport.fail(message);

}
message_handling_context_1.messageHandlingContext.enable();
this.internalState = bus_state_1.BusState.Started;

@@ -165,2 +164,3 @@ for (var i = 0; i < this.concurrency; i++) {

return tslib_1.__awaiter(this, void 0, void 0, function* () {
message_handling_context_1.messageHandlingContext.disable();
const stoppedStates = [bus_state_1.BusState.Stopped, bus_state_1.BusState.Stopping];

@@ -214,9 +214,12 @@ if (stoppedStates.includes(this.state)) {

this.runningWorkerCount++;
while (this.internalState === bus_state_1.BusState.Started) {
const messageHandled = yield this.handleNextMessage();
// Avoids locking up CPU when there's no messages to be processed
if (!messageHandled) {
yield (0, util_1.sleep)(EMPTY_QUEUE_SLEEP_MS);
// Run the loop in a cls-hooked namespace to provide the message handling context to all async operations
message_handling_context_1.messageHandlingContext.run(() => tslib_1.__awaiter(this, void 0, void 0, function* () {
while (this.internalState === bus_state_1.BusState.Started) {
const messageHandled = yield this.handleNextMessage();
// Avoids locking up CPU when there's no messages to be processed
if (!messageHandled) {
yield (0, util_1.sleep)(EMPTY_QUEUE_SLEEP_MS);
}
}
}
}));
this.runningWorkerCount--;

@@ -232,4 +235,4 @@ });

this.afterReceive.emit({ message });
message_handling_context_1.messageHandlingContext.set(message);
try {
message_handling_context_1.messageHandlingContext.set(message);
yield this.messageReadMiddleware.dispatch(message);

@@ -255,5 +258,2 @@ this.afterDispatch.emit({

}
finally {
message_handling_context_1.messageHandlingContext.destroy();
}
return true;

@@ -299,9 +299,7 @@ }

(handlingContext
? handlingContext.message.attributes.correlationId
? handlingContext.attributes.correlationId
: undefined) ||
(0, uuid_1.v4)(),
attributes: clientOptions.attributes || {},
stickyAttributes: Object.assign(Object.assign({}, (handlingContext
? handlingContext.message.attributes.stickyAttributes
: {})), clientOptions.stickyAttributes)
stickyAttributes: Object.assign(Object.assign({}, (handlingContext ? handlingContext.attributes.stickyAttributes : {})), clientOptions.stickyAttributes)
};

@@ -308,0 +306,0 @@ this.logger.debug('Prepared transport options', { messageAttributes });

@@ -41,5 +41,4 @@ import { WorkflowState } from '../workflow-state';

private startWorkflowHandlingContext;
private endWorkflowHandlingContext;
private dispatchMessageToWorkflow;
private persist;
}

@@ -59,3 +59,3 @@ "use strict";

if (this.workflowRegistry.length === 0) {
this.logger.info('No workflows registered, skipping this step.');
this.logger.info('No workflows registered, skipping workflow initialization.');
return;

@@ -136,10 +136,11 @@ }

mapper.onStartedBy.forEach((options, messageConstructor) => handlerRegistry.register(messageConstructor, (message, messageAttributes) => tslib_1.__awaiter(this, void 0, void 0, function* () {
this.logger.debug('Starting new workflow instance', {
workflow: options.workflowCtor,
msg: message
});
const workflowState = this.createWorkflowState(mapper.workflowStateCtor);
const immutableWorkflowState = Object.freeze(Object.assign({}, workflowState));
this.startWorkflowHandlingContext(immutableWorkflowState);
try {
// Extend the current message handling context, and augment with workflow-specific context data
message_handling_context_1.messageHandlingContext.run(() => tslib_1.__awaiter(this, void 0, void 0, function* () {
this.logger.debug('Starting new workflow instance', {
workflow: options.workflowCtor,
msg: message
});
const workflowState = this.createWorkflowState(mapper.workflowStateCtor);
const immutableWorkflowState = Object.freeze(Object.assign({}, workflowState));
this.startWorkflowHandlingContext(immutableWorkflowState);
let workflow;

@@ -170,6 +171,3 @@ if (container) {

}
}
finally {
this.endWorkflowHandlingContext();
}
}));
})));

@@ -196,9 +194,7 @@ }

const workflowHandlers = workflowState.map((state) => tslib_1.__awaiter(this, void 0, void 0, function* () {
try {
// Extend the current message handling context, and augment with workflow-specific context data
message_handling_context_1.messageHandlingContext.run(() => tslib_1.__awaiter(this, void 0, void 0, function* () {
this.startWorkflowHandlingContext(state);
yield this.dispatchMessageToWorkflow(message, attributes, workflowCtor, state, mapper.workflowStateCtor, handler.workflowHandler, container);
}
finally {
this.endWorkflowHandlingContext();
}
}));
}));

@@ -235,3 +231,3 @@ const handlerResults = yield Promise.allSettled(workflowHandlers);

});
const handlingContext = message_handling_context_1.messageHandlingContext.get().message;
const handlingContext = message_handling_context_1.messageHandlingContext.get();
const workflowHandlingContext = structuredClone(handlingContext);

@@ -243,6 +239,2 @@ workflowHandlingContext.attributes.stickyAttributes.workflowId =

}
endWorkflowHandlingContext() {
this.logger.debug('Ending workflow handling context');
message_handling_context_1.messageHandlingContext.destroy();
}
dispatchMessageToWorkflow(message, attributes, workflowCtor, workflowState, workflowStateConstructor, workflowHandler, container) {

@@ -272,2 +264,3 @@ return tslib_1.__awaiter(this, void 0, void 0, function* () {

const handler = workflow[workflowHandler];
// Invoke the workflow handler
const workflowStateOutput = yield handler.bind(workflow)(message, immutableWorkflowState, attributes);

@@ -274,0 +267,0 @@ const workflowName = workflowCtor.prototype.name;

{
"name": "@node-ts/bus-core",
"version": "1.1.2",
"version": "1.1.3",
"description": "A service bus for message-based, distributed node applications",

@@ -13,2 +13,3 @@ "main": "./dist/index.js",

"dependencies": {
"cls-hooked": "^4.2.2",
"debug": "^4.3.4",

@@ -24,2 +25,3 @@ "reflect-metadata": "^0.1.13",

"@node-ts/code-standards": "^0.0.10",
"@types/cls-hooked": "^4.3.8",
"@types/debug": "^4.1.5",

@@ -26,0 +28,0 @@ "@types/faker": "^4.1.5",

@@ -22,51 +22,43 @@ import { TransportMessage } from '../transport'

it('should retrieve the message from within the same context', async () => {
const message = buildTransportMessage()
messageHandlingContext.set(message)
const retrievedMessage = messageHandlingContext.get()!.message
expect(retrievedMessage).toEqual(message)
messageHandlingContext.destroy()
})
it('should not retrieve a message after the context is destroyed', async () => {
const message = buildTransportMessage()
messageHandlingContext.set(message)
messageHandlingContext.destroy()
const context = messageHandlingContext.get()
expect(context).toBeUndefined()
})
it('should not retrieve a message from a different context', async () => {
const context1 = new Promise<void>(resolve => {
messageHandlingContext.run(() => {
const message = buildTransportMessage()
messageHandlingContext.set(message)
const retrievedMessage = messageHandlingContext.get()!.message
const retrievedMessage = messageHandlingContext.get()!
expect(retrievedMessage).toEqual(message)
messageHandlingContext.destroy()
resolve()
})
const context2 = new Promise<void>(resolve => {
const message = buildTransportMessage()
messageHandlingContext.set(message)
const retrievedMessage = messageHandlingContext.get()!.message
expect(retrievedMessage).toEqual(message)
messageHandlingContext.destroy()
resolve()
})
it('should not retrieve a message from a different context', async () => {
messageHandlingContext.run(async () => {
const context1 = new Promise<void>(resolve => {
const message = buildTransportMessage()
messageHandlingContext.set(message)
const retrievedMessage = messageHandlingContext.get()!
expect(retrievedMessage).toEqual(message)
resolve()
})
const context2 = new Promise<void>(resolve => {
const message = buildTransportMessage()
messageHandlingContext.set(message)
const retrievedMessage = messageHandlingContext.get()!
expect(retrievedMessage).toEqual(message)
resolve()
})
await Promise.all([context1, context2])
})
await Promise.all([context1, context2])
})
it('should retrieve a message from a nested async chain', async () => {
const message = buildTransportMessage()
messageHandlingContext.set(message)
messageHandlingContext.run(async () => {
const message = buildTransportMessage()
messageHandlingContext.set(message)
await new Promise<void>(resolve => {
const retrievedMessage = messageHandlingContext.get()!.message
expect(retrievedMessage).toEqual(message)
resolve()
await new Promise<void>(resolve => {
const retrievedMessage = messageHandlingContext.get()!
expect(retrievedMessage).toEqual(message)
resolve()
})
})
messageHandlingContext.destroy()
})
})
})

@@ -1,39 +0,12 @@

import * as asyncHooks from 'async_hooks'
import {
createNamespace,
destroyNamespace,
getNamespace,
Namespace
} from 'cls-hooked'
import { TransportMessage } from '../transport'
interface HandlingContext {
/**
* A list of child async process ids that are running in the same execution context
* and need to be destroyed when the parent is destroyed.
*
* This can be handled with regular destroy hooks, however this is subject to GC
* and runs the risk of keeping a large number of contexts open after their parent
* context has closed.
*/
childAsyncIds: number[]
message: TransportMessage<unknown>
}
const NAMESPACE = 'message-handling-context'
let namespace: Namespace
const handlingContexts = new Map<number, HandlingContext>()
const init = (asyncId: number, _: string, triggerAsyncId: number) => {
// Ensures that child promises inherit the same context as their parent
if (handlingContexts.has(triggerAsyncId)) {
const context = handlingContexts.get(triggerAsyncId)!
context.childAsyncIds.push(asyncId)
handlingContexts.set(asyncId, context)
}
}
const destroy = (asyncId: number) => {
if (handlingContexts.has(asyncId)) {
handlingContexts.delete(asyncId)
}
}
const hooks = asyncHooks.createHook({
init,
destroy
})
/**

@@ -49,2 +22,6 @@ * This is an internal coordinator that tracks the execution context for

/**
* Executes a function within a context of cls-hooked
*/
run: (fn: (...args: any[]) => void) => namespace.run(fn),
/**
* Sets a new handling context for the current execution async id. Child asyncs should

@@ -54,21 +31,19 @@ * only call this if they want to create a new context with themselves at the root.

set: (message: TransportMessage<unknown>) =>
handlingContexts.set(asyncHooks.executionAsyncId(), {
childAsyncIds: [],
message
}),
get: () => handlingContexts.get(asyncHooks.executionAsyncId()),
destroy: () => {
const asyncId = asyncHooks.executionAsyncId()
const context = handlingContexts.get(asyncId)
if (!context) {
return
namespace?.set('message', message),
/**
* Fetches the message handling context of the active async stack
*/
get: () => namespace?.get('message') as TransportMessage<unknown>,
/**
* Hooks into the async_hooks module to track the current execution context. Must be called before other operations.
*/
enable: () => (namespace = createNamespace(NAMESPACE)),
/**
* Stops tracking the current execution context.
*/
disable: () => {
if (getNamespace(NAMESPACE)) {
destroyNamespace(NAMESPACE)
}
context.childAsyncIds.forEach(childAsyncId =>
handlingContexts.delete(childAsyncId)
)
handlingContexts.delete(asyncId)
},
enable: () => hooks.enable(),
disable: () => hooks.disable()
}
}

@@ -5,8 +5,9 @@ import { InMemoryQueue } from '../transport'

import { sleep } from '../util'
import { Mock, IMock } from 'typemoq'
import { Mock, IMock, Times } from 'typemoq'
import { BusInstance } from './bus-instance'
import { handlerFor } from '../handler'
import { TestCommand } from '../test/test-command'
const event = new TestEvent()
type Callback = () => void
type Callback = (correlationId: string) => void

@@ -21,9 +22,15 @@ describe('BusInstance - Concurrency', () => {

const handler = handlerFor(TestEvent, async () => {
const eventHandler = handlerFor(TestEvent, async () => {
handleCount++
await new Promise(resolve => {
resolutions.push(resolve)
})
await bus.send(new TestCommand())
})
const commandHandler = handlerFor(TestCommand, async (_, attributes) => {
callback.object(attributes.correlationId!)
})
beforeAll(async () => {

@@ -35,3 +42,4 @@ queue = new InMemoryQueue()

.withTransport(queue)
.withHandler(handler)
.withHandler(eventHandler)
.withHandler(commandHandler)
.withConcurrency(CONCURRENCY)

@@ -48,8 +56,9 @@ .build()

beforeAll(async () => {
// These should be handled immediately
await bus.publish(event)
await bus.publish(event)
// This should be handled when the next worker becomes available
await bus.publish(event)
await Promise.all([
// These should be handled immediately
bus.publish(event, { correlationId: 'first' }),
bus.publish(event, { correlationId: 'second' }),
// This should be handled when the next worker becomes available
bus.publish(event, { correlationId: 'third' })
])
await sleep(100)

@@ -61,9 +70,20 @@ })

// Let the first handler complete
resolutions[0](undefined)
await sleep(10)
expect(handleCount).toEqual(CONCURRENCY + 1)
// Resolve subsequent handlers
resolutions[1](undefined)
resolutions[2](undefined)
})
describe('when the command handlers are run', () => {
it('the message handling context should have propagated all sticky attributes', () => {
callback.verify(x => x('first'), Times.once())
callback.verify(x => x('second'), Times.once())
callback.verify(x => x('third'), Times.once())
})
})
})
})

@@ -134,3 +134,2 @@ import { Transport, TransportMessage } from '../transport'

messageHandlingContext.enable()
this.subscribeToInterruptSignals(this.coreDependencies.interruptSignals)

@@ -180,7 +179,6 @@ this.isInitialized = true

async fail(): Promise<void> {
const context = messageHandlingContext.get()
if (!context) {
const message = messageHandlingContext.get()
if (!message) {
throw new FailMessageOutsideHandlingContext()
}
const message = context.message
this.logger.debug('Failing message', { message })

@@ -219,2 +217,3 @@ return this.transport.fail(message)

}
messageHandlingContext.enable()

@@ -236,2 +235,3 @@ this.internalState = BusState.Started

async stop(): Promise<void> {
messageHandlingContext.disable()
const stoppedStates = [BusState.Stopped, BusState.Stopping]

@@ -290,10 +290,14 @@ if (stoppedStates.includes(this.state)) {

this.runningWorkerCount++
while (this.internalState === BusState.Started) {
const messageHandled = await this.handleNextMessage()
// Avoids locking up CPU when there's no messages to be processed
if (!messageHandled) {
await sleep(EMPTY_QUEUE_SLEEP_MS)
// Run the loop in a cls-hooked namespace to provide the message handling context to all async operations
messageHandlingContext.run(async () => {
while (this.internalState === BusState.Started) {
const messageHandled = await this.handleNextMessage()
// Avoids locking up CPU when there's no messages to be processed
if (!messageHandled) {
await sleep(EMPTY_QUEUE_SLEEP_MS)
}
}
}
})
this.runningWorkerCount--

@@ -309,6 +313,5 @@ }

this.afterReceive.emit({ message })
messageHandlingContext.set(message)
try {
messageHandlingContext.set(message)
await this.messageReadMiddleware.dispatch(message)

@@ -336,4 +339,2 @@

return false
} finally {
messageHandlingContext.destroy()
}

@@ -402,3 +403,3 @@ return true

(handlingContext
? handlingContext.message.attributes.correlationId
? handlingContext.attributes.correlationId
: undefined) ||

@@ -408,5 +409,3 @@ generateUuid(),

stickyAttributes: {
...(handlingContext
? handlingContext.message.attributes.stickyAttributes
: {}),
...(handlingContext ? handlingContext.attributes.stickyAttributes : {}),
...clientOptions.stickyAttributes

@@ -413,0 +412,0 @@ }

@@ -82,3 +82,5 @@ import { WorkflowState, WorkflowStatus } from '../workflow-state'

if (this.workflowRegistry.length === 0) {
this.logger.info('No workflows registered, skipping this step.')
this.logger.info(
'No workflows registered, skipping workflow initialization.'
)
return

@@ -182,12 +184,13 @@ }

async (message, messageAttributes) => {
this.logger.debug('Starting new workflow instance', {
workflow: options.workflowCtor,
msg: message
})
const workflowState = this.createWorkflowState(
mapper.workflowStateCtor!
)
const immutableWorkflowState = Object.freeze({ ...workflowState })
this.startWorkflowHandlingContext(immutableWorkflowState)
try {
// Extend the current message handling context, and augment with workflow-specific context data
messageHandlingContext.run(async () => {
this.logger.debug('Starting new workflow instance', {
workflow: options.workflowCtor,
msg: message
})
const workflowState = this.createWorkflowState(
mapper.workflowStateCtor!
)
const immutableWorkflowState = Object.freeze({ ...workflowState })
this.startWorkflowHandlingContext(immutableWorkflowState)
let workflow: Workflow<WorkflowState>

@@ -227,5 +230,3 @@ if (container) {

}
} finally {
this.endWorkflowHandlingContext()
}
})
}

@@ -278,3 +279,4 @@ )

const workflowHandlers = workflowState.map(async state => {
try {
// Extend the current message handling context, and augment with workflow-specific context data
messageHandlingContext.run(async () => {
this.startWorkflowHandlingContext(state)

@@ -290,5 +292,3 @@ await this.dispatchMessageToWorkflow(

)
} finally {
this.endWorkflowHandlingContext()
}
})
})

@@ -334,3 +334,3 @@

})
const handlingContext = messageHandlingContext.get()!.message
const handlingContext = messageHandlingContext.get()!
const workflowHandlingContext = structuredClone(handlingContext)

@@ -342,7 +342,2 @@ workflowHandlingContext.attributes.stickyAttributes.workflowId =

private endWorkflowHandlingContext() {
this.logger.debug('Ending workflow handling context')
messageHandlingContext.destroy()
}
private async dispatchMessageToWorkflow(

@@ -378,2 +373,4 @@ message: Message,

const handler = workflow[workflowHandler] as Function
// Invoke the workflow handler
const workflowStateOutput = await handler.bind(workflow)(

@@ -380,0 +377,0 @@ message,

@@ -172,4 +172,4 @@ import * as uuid from 'uuid'

await bus.initialize()
await bus.start()
await bus.send(event)
await bus.start()
await sleep(CONSUME_TIMEOUT)

@@ -176,0 +176,0 @@ })

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc