@node-ts/bus-core
Advanced tools
Comparing version 1.1.9 to 1.1.10
import { TransportMessage } from '../transport'; | ||
import ALS from 'alscontext'; | ||
type Context = TransportMessage<unknown> & { | ||
isInHandlerContext?: boolean; | ||
}; | ||
/** | ||
* A context that stores the transport message when it is received from the bus. Any calls in deeper stacks can | ||
* access the context by calling `messageHandlingContext.get()`. | ||
*/ | ||
declare class MessageHandlingContext extends ALS { | ||
@@ -7,13 +14,17 @@ /** | ||
*/ | ||
get(): TransportMessage<unknown>; | ||
get(): Context; | ||
/** | ||
* Set the message context for the current async stack | ||
*/ | ||
set(message: TransportMessage<unknown>): any; | ||
set(message: Context): any; | ||
/** | ||
* Start and run a new async context | ||
*/ | ||
run<T>(context: TransportMessage<unknown>, fn: () => T | Promise<T>): any; | ||
run<T>(context: Context, fn: () => T | Promise<T>, isInHandlerContext?: boolean): T | Promise<T>; | ||
/** | ||
* Check if the call stack is within a handler or workflow handler context | ||
*/ | ||
get isInHandlerContext(): boolean; | ||
} | ||
export declare const messageHandlingContext: MessageHandlingContext; | ||
export {}; |
@@ -6,2 +6,6 @@ "use strict"; | ||
const alscontext_1 = tslib_1.__importDefault(require("alscontext")); | ||
/** | ||
* A context that stores the transport message when it is received from the bus. Any calls in deeper stacks can | ||
* access the context by calling `messageHandlingContext.get()`. | ||
*/ | ||
class MessageHandlingContext extends alscontext_1.default { | ||
@@ -23,6 +27,13 @@ /** | ||
*/ | ||
run(context, fn) { | ||
return super.run({ message: context }, fn); | ||
run(context, fn, isInHandlerContext = false) { | ||
return super.run({ message: context, isInHandlerContext }, fn); | ||
} | ||
/** | ||
* Check if the call stack is within a handler or workflow handler context | ||
*/ | ||
get isInHandlerContext() { | ||
const isInHandlerContext = super.get('isInHandlerContext'); | ||
return isInHandlerContext === true; | ||
} | ||
} | ||
exports.messageHandlingContext = new MessageHandlingContext(); |
@@ -91,4 +91,3 @@ "use strict"; | ||
this.beforePublish.emit({ event, attributes }); | ||
const isInMessageHandler = message_handling_context_1.messageHandlingContext.get(); | ||
if (isInMessageHandler) { | ||
if (message_handling_context_1.messageHandlingContext.isInHandlerContext) { | ||
// Add message to outbox and only send when the handler resolves | ||
@@ -114,4 +113,3 @@ const outbox = this.outbox.get('outbox'); | ||
this.beforeSend.emit({ command, attributes }); | ||
const isInMessageHandler = message_handling_context_1.messageHandlingContext.get(); | ||
if (isInMessageHandler) { | ||
if (message_handling_context_1.messageHandlingContext.isInHandlerContext) { | ||
// Add message to outbox and only send when the handler resolves | ||
@@ -272,3 +270,3 @@ const outbox = this.outbox.get('outbox'); | ||
} | ||
})); | ||
}), true); | ||
} | ||
@@ -275,0 +273,0 @@ } |
@@ -145,3 +145,3 @@ "use strict"; | ||
yield this.dispatchMessageToWorkflow(message, messageAttributes, options.workflowCtor, immutableWorkflowState, mapper.workflowStateCtor, options.workflowHandler, container); | ||
})); | ||
}), true); | ||
}))); | ||
@@ -173,3 +173,3 @@ } | ||
yield this.dispatchMessageToWorkflow(message, attributes, workflowCtor, immutableWorkflowState, mapper.workflowStateCtor, handler.workflowHandler, container); | ||
})); | ||
}), true); | ||
})); | ||
@@ -176,0 +176,0 @@ const handlerResults = yield Promise.allSettled(workflowHandlers); |
{ | ||
"name": "@node-ts/bus-core", | ||
"version": "1.1.9", | ||
"version": "1.1.10", | ||
"description": "A service bus for message-based, distributed node applications", | ||
@@ -5,0 +5,0 @@ "main": "./dist/index.js", |
@@ -13,2 +13,20 @@ import { TransportMessage } from '../transport' | ||
describe('when a message is added', () => { | ||
it('should default to not being in a handler context', () => { | ||
const message = buildTransportMessage() | ||
messageHandlingContext.run(message, () => { | ||
expect(messageHandlingContext.isInHandlerContext).toEqual(false) | ||
}) | ||
}) | ||
it('should override being in a handler context', () => { | ||
const message = buildTransportMessage() | ||
messageHandlingContext.run( | ||
message, | ||
() => { | ||
expect(messageHandlingContext.isInHandlerContext).toEqual(true) | ||
}, | ||
true | ||
) | ||
}) | ||
it('should retrieve the message from within the same context', () => { | ||
@@ -15,0 +33,0 @@ const message = buildTransportMessage() |
import { TransportMessage } from '../transport' | ||
import ALS from 'alscontext' | ||
type Context = TransportMessage<unknown> & { isInHandlerContext?: boolean } | ||
/** | ||
* A context that stores the transport message when it is received from the bus. Any calls in deeper stacks can | ||
* access the context by calling `messageHandlingContext.get()`. | ||
*/ | ||
class MessageHandlingContext extends ALS { | ||
@@ -8,3 +14,3 @@ /** | ||
*/ | ||
get(): TransportMessage<unknown> { | ||
get(): Context { | ||
return super.get('message') | ||
@@ -16,3 +22,3 @@ } | ||
*/ | ||
set(message: TransportMessage<unknown>) { | ||
set(message: Context) { | ||
return super.set('message', message) | ||
@@ -24,7 +30,19 @@ } | ||
*/ | ||
run<T>(context: TransportMessage<unknown>, fn: () => T | Promise<T>) { | ||
return super.run({ message: context }, fn) | ||
run<T>( | ||
context: Context, | ||
fn: () => T | Promise<T>, | ||
isInHandlerContext = false | ||
): T | Promise<T> { | ||
return super.run({ message: context, isInHandlerContext }, fn) | ||
} | ||
/** | ||
* Check if the call stack is within a handler or workflow handler context | ||
*/ | ||
get isInHandlerContext(): boolean { | ||
const isInHandlerContext = super.get('isInHandlerContext') | ||
return isInHandlerContext === true | ||
} | ||
} | ||
export const messageHandlingContext = new MessageHandlingContext() |
@@ -8,6 +8,32 @@ import { Mock, Times } from 'typemoq' | ||
import { sleep } from '../util' | ||
import { InMemoryQueue } from '../transport' | ||
import { InMemoryQueue, TransportMessage } from '../transport' | ||
import { Workflow, WorkflowMapper, WorkflowState } from '../workflow' | ||
import { messageHandlingContext } from '../message-handling-context' | ||
describe('BusInstance Outboxing', () => { | ||
describe('when a message is sent from outside of a handler', () => { | ||
let bus: BusInstance | ||
beforeAll(async () => { | ||
bus = Bus.configure().build() | ||
await bus.initialize() | ||
}) | ||
it('should not outbox the message', async () => { | ||
await messageHandlingContext.run( | ||
{ | ||
attributes: { | ||
stickyAttributes: { | ||
originatorIP: '127.0.0.1', | ||
originatorClientUserId: 2 | ||
} | ||
} as any | ||
} as TransportMessage<unknown>, | ||
async () => { | ||
await bus.send(new TestCommand()) | ||
} | ||
) | ||
}) | ||
}) | ||
describe('when a message is sent from two handlers, and one fails', () => { | ||
@@ -14,0 +40,0 @@ let bus: BusInstance |
@@ -156,4 +156,3 @@ import { Transport, TransportMessage } from '../transport' | ||
const isInMessageHandler = messageHandlingContext.get() | ||
if (isInMessageHandler) { | ||
if (messageHandlingContext.isInHandlerContext) { | ||
// Add message to outbox and only send when the handler resolves | ||
@@ -181,4 +180,3 @@ const outbox = this.outbox.get('outbox')! | ||
const isInMessageHandler = messageHandlingContext.get() | ||
if (isInMessageHandler) { | ||
if (messageHandlingContext.isInHandlerContext) { | ||
// Add message to outbox and only send when the handler resolves | ||
@@ -327,29 +325,33 @@ const outbox = this.outbox.get('outbox')! | ||
return await messageHandlingContext.run(message, async () => { | ||
try { | ||
await this.messageReadMiddleware.dispatch(message) | ||
return await messageHandlingContext.run( | ||
message, | ||
async () => { | ||
try { | ||
await this.messageReadMiddleware.dispatch(message) | ||
this.afterDispatch.emit({ | ||
message: message.domainMessage, | ||
attributes: message.attributes | ||
}) | ||
return true | ||
} catch (error) { | ||
this.logger.error( | ||
'Message was unsuccessfully handled. Returning to queue.', | ||
{ | ||
busMessage: message, | ||
error: serializeError(error) | ||
} | ||
) | ||
this.onError.emit({ | ||
message: message.domainMessage, | ||
error: error as Error, | ||
attributes: message.attributes, | ||
rawMessage: message | ||
}) | ||
await this.transport.returnMessage(message) | ||
return false | ||
} | ||
}) | ||
this.afterDispatch.emit({ | ||
message: message.domainMessage, | ||
attributes: message.attributes | ||
}) | ||
return true | ||
} catch (error) { | ||
this.logger.error( | ||
'Message was unsuccessfully handled. Returning to queue.', | ||
{ | ||
busMessage: message, | ||
error: serializeError(error) | ||
} | ||
) | ||
this.onError.emit({ | ||
message: message.domainMessage, | ||
error: error as Error, | ||
attributes: message.attributes, | ||
rawMessage: message | ||
}) | ||
await this.transport.returnMessage(message) | ||
return false | ||
} | ||
}, | ||
true | ||
) | ||
} | ||
@@ -356,0 +358,0 @@ } catch (error) { |
@@ -196,13 +196,17 @@ import { WorkflowState, WorkflowStatus } from '../workflow-state' | ||
// Extend the current message handling context, and augment with workflow-specific context data | ||
await messageHandlingContext.run(workflowContext, async () => { | ||
await this.dispatchMessageToWorkflow( | ||
message, | ||
messageAttributes, | ||
options.workflowCtor, | ||
immutableWorkflowState, | ||
mapper.workflowStateCtor!, | ||
options.workflowHandler as keyof Workflow<any>, | ||
container | ||
) | ||
}) | ||
await messageHandlingContext.run( | ||
workflowContext, | ||
async () => { | ||
await this.dispatchMessageToWorkflow( | ||
message, | ||
messageAttributes, | ||
options.workflowCtor, | ||
immutableWorkflowState, | ||
mapper.workflowStateCtor!, | ||
options.workflowHandler as keyof Workflow<any>, | ||
container | ||
) | ||
}, | ||
true | ||
) | ||
} | ||
@@ -260,13 +264,17 @@ ) | ||
) | ||
await messageHandlingContext.run(workflowContext, async () => { | ||
await this.dispatchMessageToWorkflow( | ||
message, | ||
attributes, | ||
workflowCtor, | ||
immutableWorkflowState, | ||
mapper.workflowStateCtor!, | ||
handler.workflowHandler, | ||
container | ||
) | ||
}) | ||
await messageHandlingContext.run( | ||
workflowContext, | ||
async () => { | ||
await this.dispatchMessageToWorkflow( | ||
message, | ||
attributes, | ||
workflowCtor, | ||
immutableWorkflowState, | ||
mapper.workflowStateCtor!, | ||
handler.workflowHandler, | ||
container | ||
) | ||
}, | ||
true | ||
) | ||
}) | ||
@@ -273,0 +281,0 @@ |
360345
8781