Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rabbitmq-queue-stream

Package Overview
Dependencies
Maintainers
3
Versions
20
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rabbitmq-queue-stream - npm Package Compare versions

Comparing version 0.1.5 to 0.1.6

12

index.js

@@ -34,7 +34,7 @@ var amqp = require("amqp");

exports.DeleteMessage = function(message) {
exports.RejectMessage = function(message) {
if(!message._meta) {
return console.error();
}
message._meta.delete = true;
message._meta.reject = true;
return message;

@@ -355,11 +355,15 @@ };

/* TODO: How do we handle errors from acking? */
var evt;
if(message._meta.requeue) {
me.__outstandingAcks[ackIndex].reject(true);
} else if(message._meta.delete) {
evt = "requeued";
} else if(message._meta.reject) {
me.__outstandingAcks[ackIndex].reject(false);
evt = "rejected";
} else {
me.__outstandingAcks[ackIndex].acknowledge(false);
evt = "acknowledged";
}
me.__outstandingAcks[ackIndex] = null;
this.emit("deleted", message);
this.emit(evt, message);
next();

@@ -366,0 +370,0 @@ };

{
"name": "rabbitmq-queue-stream",
"version": "0.1.5",
"version": "0.1.6",
"description": "Reliable streaming interface to rabbitmq queues",
"main": "index.js",
"scripts": {
"test": "make test-codecov.io"
"test": "make test"
},

@@ -9,0 +9,0 @@ "repository": {

@@ -68,4 +68,4 @@ # rabbitmq-queue-stream

*
* Delete:
* this.push(RabbitMQStream.DeleteMessage(data));
* Reject:
* this.push(RabbitMQStream.RejectMessage(data));
*/

@@ -107,3 +107,3 @@ next();

#### .source
* parseError - Emitted when a job cannot be json parsed. Passes in malform
* parseError - Emitted when a job cannot be json parsed.
```javascript

@@ -116,11 +116,13 @@ myQueueStream.source.on("parseError", function(err, message) {

#### .sink
* deleted - Emitted everytime a job is deleted from the queue
* acknowledged - Emitted everytime a message is acknowledged
* rejected - Emitted when a message is rejected
* requeued - Emitted when a message is requeued
```javascript
var totalDeleted = 0;
myQueueStream.source.on("deleted", function() {
console.log("Deleted", totalDeleted++);
var totalAcked = 0;
myQueueStream.source.on("acknowledged", function(message) {
console.log("Acknowledged:", message);
totalAcked++;
});
```
* formatError - Sink received a job that does not have the necessary information to be deleted from the queue.
Most likely emitted when objects not originating from .source are written to sink.
* formatError - Sink received a job that does not have the necessary information to be deleted from the queue. Most likely emitted when objects not originating from .source are written to sink.
```javascript

@@ -127,0 +129,0 @@ myQueueStream.sink.on("formatError", function(err, message) {

@@ -501,3 +501,3 @@ var EventEmitter = require("events").EventEmitter;

it("releases messages from queue when tagged with rabbitmq.ReleaseMessage", function(done) {
it("releases messages from queue when tagged with rabbitmq.RequeueMessage", function(done) {
instance.__outstandingAcks = [

@@ -511,3 +511,3 @@ undefined,

instance._streamifyQueue(cb);
instance.sink.on("deleted", function () {
instance.sink.on("requeued", function () {
expect(amqpResponseStub.reject.callCount).to.be(1);

@@ -521,3 +521,3 @@ expect(amqpResponseStub.reject.args[0][0]).to.be(true);

it("removes messages from queue when tagged with AMQPStream.RemoveMessage", function(done) {
it("removes messages from queue when tagged with AMQPStream.RejectMessage", function(done) {
instance.__outstandingAcks = [

@@ -528,6 +528,6 @@ undefined,

readable._read = function () {
this.push(rabbitmq.DeleteMessage(goodMessage));
this.push(rabbitmq.RejectMessage(goodMessage));
};
instance._streamifyQueue(cb);
instance.sink.on("deleted", function () {
instance.sink.on("rejected", function () {
expect(amqpResponseStub.reject.callCount).to.be(1);

@@ -551,3 +551,3 @@ expect(amqpResponseStub.reject.args[0][0]).to.be(false);

instance._streamifyQueue(cb);
instance.sink.on("deleted", function () {
instance.sink.on("acknowledged", function () {
expect(amqpResponseStub.acknowledge.callCount).to.be(1);

@@ -554,0 +554,0 @@ expect(amqpResponseStub.acknowledge.args[0][0]).to.be(false);

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc