New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

rabbit-queue

Package Overview
Dependencies
Maintainers
1
Versions
49
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rabbit-queue - npm Package Compare versions

Comparing version 4.3.0 to 4.4.0

4

js/queue.d.ts

@@ -13,2 +13,6 @@ import * as amqp from 'amqplib';

};
static STOP_STREAM: string;
static STOP_STREAM_MESSAGE: {
stopStream: boolean;
};
defaultOptions: {

@@ -15,0 +19,0 @@ durable: boolean;

@@ -113,3 +113,8 @@ "use strict";

if (headers.backpressure) {
yield Queue.getReply(chunk, properties, this.channel, replyTo, null, headers.timeout);
let serviceResponse = yield Queue.getReply(chunk, properties, this.channel, replyTo, null, headers.timeout);
if (serviceResponse && serviceResponse.stopStream === Queue.STOP_STREAM_MESSAGE.stopStream) {
ack();
reply.destroy();
return;
}
}

@@ -196,3 +201,5 @@ else {

Queue.ERROR_DURING_REPLY = { error: true, error_code: 999 };
Queue.STOP_STREAM = 'stop_stream';
Queue.STOP_STREAM_MESSAGE = { stopStream: true };
exports.default = Queue;
//# sourceMappingURL=queue.js.map

@@ -20,2 +20,3 @@ "use strict";

let streamHandlers = {};
let stopped = {};
let options = { channel: null };

@@ -89,3 +90,3 @@ function createReplyQueue(channel) {

delete replyHandlers[id];
streamHandler = streamHandlers[id] = new stream_1.Readable({
streamHandler = new stream_1.Readable({
objectMode: true,

@@ -102,2 +103,6 @@ read() {

});
streamHandler.on(queue_1.default.STOP_STREAM, () => {
stopped[id] = true;
});
streamHandlers[id] = streamHandler;
replyHandler(null, streamHandler);

@@ -117,2 +122,10 @@ }

backpressure = !streamHandler.push(obj);
if (stopped[id]) {
options.channel.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(queue_1.default.STOP_STREAM_MESSAGE)), properties);
streamHandler.destroy();
delete options[id];
delete stopped[id];
delete streamHandlers[id];
return;
}
if (backpressure) {

@@ -119,0 +132,0 @@ options[id] = { replyTo: msg.properties.replyTo, properties };

2

package.json
{
"name": "rabbit-queue",
"version": "4.3.0",
"version": "4.4.0",
"description": "AMQP/RabbitMQ queue management library.",

@@ -5,0 +5,0 @@ "main": "js/index.js",

@@ -237,2 +237,20 @@ # Rabbit Queue

### v4.2.x to > v4.4.x
Attention: The following functionality works with nodejs v10.14.2 and higher. In previous node versions there was a problem not resolving async iteration on destroyed streams
RPC stream enhancement: When backpressure is enabled, the consumer can stop communication, when data received is sufficient
eg:
```js
const reply = await rabbit.getReply('demoQueue', { test: 'data' }, { headers: { test: 1, backpressure: true }, correlationId: '1' });
for await (const chunk of reply) {
console.log(`Received chunk: ${chunk.toString()}`);
if ("sufficient_data_received") {
reply.emit(Queue.STOP_STREAM);
}
}
```
### v4.x.x to > v4.2.x

@@ -239,0 +257,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc