@node-ts/bus-core
Advanced tools
Comparing version 1.3.0 to 1.3.1
import ALS from 'alscontext'; | ||
type Context = { | ||
/** | ||
* Flags that the application has requested that the current message be | ||
* returned to the queue for retry. | ||
*/ | ||
messageReturnedToQueue: boolean; | ||
@@ -4,0 +8,0 @@ }; |
@@ -72,4 +72,7 @@ "use strict"; | ||
const { messageReturnedToQueue } = message_lifecycle_context_1.messageLifecycleContext.get(); | ||
if (!messageReturnedToQueue) { | ||
if (messageReturnedToQueue) { | ||
this.logger.debug('Message was returned to queue by a handler and will not be deleted', { message }); | ||
// Receivers assume that the the host is responsible for deleting successful messages | ||
} | ||
else if (!this.receiver) { | ||
yield this.transport.deleteMessage(message); | ||
@@ -356,2 +359,6 @@ } | ||
}); | ||
// Receivers expect the host to return the message to the queue for retry | ||
if (this.receiver) { | ||
throw error; | ||
} | ||
yield this.transport.returnMessage(message); | ||
@@ -364,2 +371,6 @@ return false; | ||
this.logger.error('Failed to handle and dispatch message from transport', { error: (0, serialize_error_1.serializeError)(error) }); | ||
// Receivers expect the host to return the message to the queue for retry | ||
if (this.receiver) { | ||
throw error; | ||
} | ||
} | ||
@@ -366,0 +377,0 @@ return false; |
{ | ||
"name": "@node-ts/bus-core", | ||
"version": "1.3.0", | ||
"version": "1.3.1", | ||
"description": "A service bus for message-based, distributed node applications", | ||
@@ -5,0 +5,0 @@ "main": "./dist/index.js", |
import ALS from 'alscontext' | ||
type Context = { | ||
/** | ||
* Flags that the application has requested that the current message be | ||
* returned to the queue for retry. | ||
*/ | ||
messageReturnedToQueue: boolean | ||
@@ -5,0 +9,0 @@ } |
import { Message, MessageAttributes } from '@node-ts/bus-messages' | ||
import { Receiver } from '../receiver' | ||
import { MessageSerializer } from '../serialization' | ||
import { TransportMessage } from '../transport' | ||
import { InMemoryQueue, TransportMessage } from '../transport' | ||
import { Bus } from './bus' | ||
@@ -9,3 +9,4 @@ import { BusInstance } from './bus-instance' | ||
import { handlerFor } from '../handler' | ||
import { TestCommand } from '../test' | ||
import { TestCommand, TestEvent } from '../test' | ||
import { It, Mock, Times } from 'typemoq' | ||
@@ -43,2 +44,6 @@ const emptyAttributes: MessageAttributes = { | ||
let testCommandHandler = handlerFor(TestCommand, commandHandler) | ||
const handlerThatThrows = handlerFor(TestEvent, () => { | ||
throw new Error() | ||
}) | ||
const queue = Mock.ofType<InMemoryQueue>() | ||
@@ -49,2 +54,4 @@ beforeAll(async () => { | ||
.withHandler(testCommandHandler) | ||
.withHandler(handlerThatThrows) | ||
.withTransport(queue.object) | ||
.build() | ||
@@ -74,4 +81,17 @@ await bus.initialize() | ||
}) | ||
it('should not call delete message, as the receiver implementation should handle it', () => { | ||
queue.verify(q => q.deleteMessage(It.isAny()), Times.never()) | ||
}) | ||
}) | ||
describe('when an error is thrown when receiving a message', () => { | ||
it('the error should be re-thrown so the receiver host can retry the message', async () => { | ||
const event = new TestEvent() | ||
await expect(bus.receive(event)).rejects.toThrow() | ||
// Receiver host should return the message, not the application | ||
queue.verify(q => q.returnMessage(It.isAny()), Times.never()) | ||
}) | ||
}) | ||
describe('when a batch of messages are passed through to bus.receive()', () => { | ||
@@ -78,0 +98,0 @@ const commands = Array(10) |
@@ -474,2 +474,7 @@ import { Transport, TransportMessage } from '../transport' | ||
}) | ||
// Receivers expect the host to return the message to the queue for retry | ||
if (this.receiver) { | ||
throw error | ||
} | ||
await this.transport.returnMessage(message) | ||
@@ -486,2 +491,6 @@ return false | ||
) | ||
// Receivers expect the host to return the message to the queue for retry | ||
if (this.receiver) { | ||
throw error | ||
} | ||
} | ||
@@ -642,3 +651,3 @@ return false | ||
const { messageReturnedToQueue } = messageLifecycleContext.get() | ||
if (!messageReturnedToQueue) { | ||
if (messageReturnedToQueue) { | ||
this.logger.debug( | ||
@@ -648,2 +657,4 @@ 'Message was returned to queue by a handler and will not be deleted', | ||
) | ||
// Receivers assume that the the host is responsible for deleting successful messages | ||
} else if (!this.receiver) { | ||
await this.transport.deleteMessage(message) | ||
@@ -650,0 +661,0 @@ } |
389061
9521