@node-ts/bus-core
Advanced tools
Comparing version 0.6.4 to 0.6.5
@@ -6,11 +6,15 @@ import { HookCallback, HookAction, ErrorHookCallback, StandardHookCallback } from './bus'; | ||
* of registered hooks | ||
* | ||
* @template TransportMessageType - The raw message type returned from the transport that will be passed to the hooks | ||
* @example BusHooks<InMemoryMessage> | ||
* @example BusHooks<SQS.Message> | ||
*/ | ||
export declare class BusHooks { | ||
export declare class BusHooks<TransportMessageType = unknown> { | ||
private messageHooks; | ||
on(action: Extract<HookAction, 'error'>, callback: ErrorHookCallback): void; | ||
on(action: Extract<HookAction, 'error'>, callback: ErrorHookCallback<TransportMessageType>): void; | ||
on(action: Exclude<HookAction, 'error'>, callback: StandardHookCallback): void; | ||
off(action: HookAction, callback: HookCallback): void; | ||
off(action: HookAction, callback: HookCallback<TransportMessageType>): void; | ||
readonly send: StandardHookCallback[]; | ||
readonly publish: StandardHookCallback[]; | ||
readonly error: ErrorHookCallback[]; | ||
readonly error: ErrorHookCallback<TransportMessageType>[]; | ||
} |
@@ -9,2 +9,6 @@ "use strict"; | ||
* of registered hooks | ||
* | ||
* @template TransportMessageType - The raw message type returned from the transport that will be passed to the hooks | ||
* @example BusHooks<InMemoryMessage> | ||
* @example BusHooks<SQS.Message> | ||
*/ | ||
@@ -16,2 +20,6 @@ let BusHooks = class BusHooks { | ||
* of registered hooks | ||
* | ||
* @template TransportMessageType - The raw message type returned from the transport that will be passed to the hooks | ||
* @example BusHooks<InMemoryMessage> | ||
* @example BusHooks<SQS.Message> | ||
*/ | ||
@@ -18,0 +26,0 @@ constructor() { |
import { Event, Command, MessageAttributes, Message } from '@node-ts/bus-messages'; | ||
import { TransportMessage } from '../transport'; | ||
export declare enum BusState { | ||
@@ -10,4 +11,4 @@ Stopped = "stopped", | ||
export declare type StandardHookCallback = (message: Message, messageAttributes?: MessageAttributes) => Promise<void> | void; | ||
export declare type ErrorHookCallback = (message: Message, error: Error, messageAttributes?: MessageAttributes) => Promise<void> | void; | ||
export declare type HookCallback = StandardHookCallback | ErrorHookCallback; | ||
export declare type ErrorHookCallback<TransportMessageType> = (message: Message, error: Error, messageAttributes?: MessageAttributes, rawMessage?: TransportMessage<TransportMessageType>) => Promise<void> | void; | ||
export declare type HookCallback<TransportMessageType> = StandardHookCallback | ErrorHookCallback<TransportMessageType>; | ||
export interface Bus { | ||
@@ -46,9 +47,11 @@ /** | ||
/** | ||
* Registers a @param callback function that is invoked for every instance of @param action occuring | ||
* Registers a @param callback function that is invoked for every instance of @param action occurring | ||
* @template TransportMessageType - The raw message type returned from the transport that will be passed to the hooks | ||
*/ | ||
on(action: HookAction, callback: HookCallback): void; | ||
on<TransportMessageType = unknown>(action: HookAction, callback: HookCallback<TransportMessageType>): void; | ||
/** | ||
* Deregisters a @param callback function from firing when an @param action occurs | ||
* @template TransportMessageType - The raw message type returned from the transport that will be passed to the hooks | ||
*/ | ||
off(action: HookAction, callback: HookCallback): void; | ||
off<TransportMessageType = unknown>(action: HookAction, callback: HookCallback<TransportMessageType>): void; | ||
} |
@@ -1,2 +0,2 @@ | ||
import { Bus, BusState, HookAction, HookCallback } from './bus'; | ||
import { Bus, BusState } from './bus'; | ||
import { Transport, TransportMessage } from '../transport'; | ||
@@ -27,3 +27,3 @@ import { Event, Command, MessageAttributes } from '@node-ts/bus-messages'; | ||
on: any; | ||
off(action: HookAction, callback: HookCallback): void; | ||
off: any; | ||
private applicationLoop; | ||
@@ -30,0 +30,0 @@ private handleNextMessage; |
@@ -28,2 +28,4 @@ "use strict"; | ||
this.on = this.busHooks.on.bind(this.busHooks); | ||
// tslint:disable-next-line:member-ordering | ||
this.off = this.busHooks.off.bind(this.busHooks); | ||
} | ||
@@ -75,5 +77,2 @@ async publish(event, messageOptions = new bus_messages_1.MessageAttributes()) { | ||
} | ||
off(action, callback) { | ||
this.busHooks.off(action, callback); | ||
} | ||
async applicationLoop() { | ||
@@ -100,3 +99,3 @@ this.runningWorkerCount++; | ||
this.logger.warn('Message was unsuccessfully handled. Returning to queue', { message, error: serializeError(error) }); | ||
await Promise.all(this.busHooks.error.map(callback => callback(message.domainMessage, error, message.attributes))); | ||
await Promise.all(this.busHooks.error.map(callback => callback(message.domainMessage, error, message.attributes, message))); | ||
await this.transport.returnMessage(message); | ||
@@ -103,0 +102,0 @@ return false; |
{ | ||
"name": "@node-ts/bus-core", | ||
"version": "0.6.4", | ||
"version": "0.6.5", | ||
"description": "A service bus for message-based, distributed node applications", | ||
@@ -51,3 +51,3 @@ "main": "./dist/index.js", | ||
], | ||
"gitHead": "2fad73410a7338dc0d89039085564e68ab68c58a" | ||
"gitHead": "73b0b9a89aa107f452ce31ffe25a84786aea77b9" | ||
} |
@@ -8,6 +8,10 @@ import { injectable } from 'inversify' | ||
* of registered hooks | ||
* | ||
* @template TransportMessageType - The raw message type returned from the transport that will be passed to the hooks | ||
* @example BusHooks<InMemoryMessage> | ||
* @example BusHooks<SQS.Message> | ||
*/ | ||
@injectable() | ||
export class BusHooks { | ||
private messageHooks: { [key: string]: HookCallback[] } = { | ||
export class BusHooks<TransportMessageType = unknown> { | ||
private messageHooks: { [key: string]: HookCallback<TransportMessageType>[] } = { | ||
send: [], | ||
@@ -18,9 +22,9 @@ publish: [], | ||
on (action: Extract<HookAction, 'error'>, callback: ErrorHookCallback): void | ||
on (action: Extract<HookAction, 'error'>, callback: ErrorHookCallback<TransportMessageType>): void | ||
on (action: Exclude<HookAction, 'error'>, callback: StandardHookCallback): void | ||
on (action: HookAction, callback: HookCallback): void { | ||
on (action: HookAction, callback: HookCallback<TransportMessageType>): void { | ||
this.messageHooks[action].push(callback) | ||
} | ||
off (action: HookAction, callback: HookCallback): void { | ||
off (action: HookAction, callback: HookCallback<TransportMessageType>): void { | ||
const index = this.messageHooks[action].indexOf(callback) | ||
@@ -40,5 +44,5 @@ if (index >= 0) { | ||
get error (): ErrorHookCallback[] { | ||
return this.messageHooks.error as ErrorHookCallback[] | ||
get error (): ErrorHookCallback<TransportMessageType>[] { | ||
return this.messageHooks.error as ErrorHookCallback<TransportMessageType>[] | ||
} | ||
} |
import { Event, Command, MessageAttributes, Message } from '@node-ts/bus-messages' | ||
import { TransportMessage } from '../transport' | ||
@@ -13,10 +14,14 @@ export enum BusState { | ||
export type StandardHookCallback = ( | ||
message: Message, messageAttributes?: MessageAttributes | ||
message: Message, | ||
messageAttributes?: MessageAttributes | ||
) => Promise<void> | void | ||
export type ErrorHookCallback = ( | ||
message: Message, error: Error, messageAttributes?: MessageAttributes | ||
export type ErrorHookCallback<TransportMessageType> = ( | ||
message: Message, | ||
error: Error, | ||
messageAttributes?: MessageAttributes, | ||
rawMessage?: TransportMessage<TransportMessageType> | ||
) => Promise<void> | void | ||
export type HookCallback = StandardHookCallback | ErrorHookCallback | ||
export type HookCallback<TransportMessageType> = StandardHookCallback | ErrorHookCallback<TransportMessageType> | ||
@@ -64,10 +69,12 @@ | ||
/** | ||
* Registers a @param callback function that is invoked for every instance of @param action occuring | ||
* Registers a @param callback function that is invoked for every instance of @param action occurring | ||
* @template TransportMessageType - The raw message type returned from the transport that will be passed to the hooks | ||
*/ | ||
on (action: HookAction, callback: HookCallback): void | ||
on<TransportMessageType = unknown> (action: HookAction, callback: HookCallback<TransportMessageType>): void | ||
/** | ||
* Deregisters a @param callback function from firing when an @param action occurs | ||
* @template TransportMessageType - The raw message type returned from the transport that will be passed to the hooks | ||
*/ | ||
off (action: HookAction, callback: HookCallback): void | ||
off<TransportMessageType = unknown> (action: HookAction, callback: HookCallback<TransportMessageType>): void | ||
} |
// tslint:disable:max-classes-per-file | ||
import { ServiceBus } from './service-bus' | ||
import { MemoryQueue } from '../transport' | ||
import { InMemoryMessage, MemoryQueue, TransportMessage } from '../transport' | ||
import { BusState } from './bus' | ||
@@ -180,2 +180,13 @@ import { TestEvent } from '../test/test-event' | ||
const expectedTransportMessage: TransportMessage<InMemoryMessage> = { | ||
id: undefined, | ||
attributes: new MessageAttributes(), | ||
domainMessage: event, | ||
raw: { | ||
inFlight: true, | ||
seenCount: 1, | ||
payload: event | ||
} | ||
} | ||
expect(errorCallback).toHaveBeenCalledTimes(1) | ||
@@ -193,3 +204,4 @@ expect(errorCallback).toHaveBeenCalledWith( | ||
stickyAttributes: expect.anything() | ||
}) | ||
}), | ||
expect.objectContaining(expectedTransportMessage) | ||
) | ||
@@ -196,0 +208,0 @@ sut.off('error', errorCallback) |
import { injectable, inject, optional } from 'inversify' | ||
import autobind from 'autobind-decorator' | ||
import { Bus, BusState, HookAction, HookCallback } from './bus' | ||
import { Bus, BusState } from './bus' | ||
import { BUS_SYMBOLS, BUS_INTERNAL_SYMBOLS } from '../bus-symbols' | ||
@@ -101,5 +101,4 @@ import { Transport, TransportMessage } from '../transport' | ||
off (action: HookAction, callback: HookCallback): void { | ||
this.busHooks.off(action, callback) | ||
} | ||
// tslint:disable-next-line:member-ordering | ||
off = this.busHooks.off.bind(this.busHooks) | ||
@@ -135,3 +134,4 @@ private async applicationLoop (): Promise<void> { | ||
(error as Error), | ||
message.attributes | ||
message.attributes, | ||
message | ||
))) | ||
@@ -138,0 +138,0 @@ await this.transport.returnMessage(message) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
184277
3474