rabbit-queue
Advanced tools
Comparing version 4.3.0 to 4.4.0
@@ -13,2 +13,6 @@ import * as amqp from 'amqplib'; | ||
}; | ||
static STOP_STREAM: string; | ||
static STOP_STREAM_MESSAGE: { | ||
stopStream: boolean; | ||
}; | ||
defaultOptions: { | ||
@@ -15,0 +19,0 @@ durable: boolean; |
@@ -113,3 +113,8 @@ "use strict"; | ||
if (headers.backpressure) { | ||
yield Queue.getReply(chunk, properties, this.channel, replyTo, null, headers.timeout); | ||
let serviceResponse = yield Queue.getReply(chunk, properties, this.channel, replyTo, null, headers.timeout); | ||
if (serviceResponse && serviceResponse.stopStream === Queue.STOP_STREAM_MESSAGE.stopStream) { | ||
ack(); | ||
reply.destroy(); | ||
return; | ||
} | ||
} | ||
@@ -196,3 +201,5 @@ else { | ||
Queue.ERROR_DURING_REPLY = { error: true, error_code: 999 }; | ||
Queue.STOP_STREAM = 'stop_stream'; | ||
Queue.STOP_STREAM_MESSAGE = { stopStream: true }; | ||
exports.default = Queue; | ||
//# sourceMappingURL=queue.js.map |
@@ -20,2 +20,3 @@ "use strict"; | ||
let streamHandlers = {}; | ||
let stopped = {}; | ||
let options = { channel: null }; | ||
@@ -89,3 +90,3 @@ function createReplyQueue(channel) { | ||
delete replyHandlers[id]; | ||
streamHandler = streamHandlers[id] = new stream_1.Readable({ | ||
streamHandler = new stream_1.Readable({ | ||
objectMode: true, | ||
@@ -102,2 +103,6 @@ read() { | ||
}); | ||
streamHandler.on(queue_1.default.STOP_STREAM, () => { | ||
stopped[id] = true; | ||
}); | ||
streamHandlers[id] = streamHandler; | ||
replyHandler(null, streamHandler); | ||
@@ -117,2 +122,10 @@ } | ||
backpressure = !streamHandler.push(obj); | ||
if (stopped[id]) { | ||
options.channel.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(queue_1.default.STOP_STREAM_MESSAGE)), properties); | ||
streamHandler.destroy(); | ||
delete options[id]; | ||
delete stopped[id]; | ||
delete streamHandlers[id]; | ||
return; | ||
} | ||
if (backpressure) { | ||
@@ -119,0 +132,0 @@ options[id] = { replyTo: msg.properties.replyTo, properties }; |
{ | ||
"name": "rabbit-queue", | ||
"version": "4.3.0", | ||
"version": "4.4.0", | ||
"description": "AMQP/RabbitMQ queue management library.", | ||
@@ -5,0 +5,0 @@ "main": "js/index.js", |
@@ -237,2 +237,20 @@ # Rabbit Queue | ||
### v4.2.x to > v4.4.x | ||
Attention: The following functionality works with nodejs v10.14.2 and higher. In previous node versions there was a problem not resolving async iteration on destroyed streams | ||
RPC stream enhancement: When backpressure is enabled, the consumer can stop communication, when data received is sufficient | ||
eg: | ||
```js | ||
const reply = await rabbit.getReply('demoQueue', { test: 'data' }, { headers: { test: 1, backpressure: true }, correlationId: '1' }); | ||
for await (const chunk of reply) { | ||
console.log(`Received chunk: ${chunk.toString()}`); | ||
if ("sufficient_data_received") { | ||
reply.emit(Queue.STOP_STREAM); | ||
} | ||
} | ||
``` | ||
### v4.x.x to > v4.2.x | ||
@@ -239,0 +257,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
111813
1360
296