Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rabbitmq-queue-stream

Package Overview
Dependencies
Maintainers
3
Versions
20
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rabbitmq-queue-stream - npm Package Compare versions

Comparing version 0.1.9 to 0.1.10

8

index.js

@@ -287,3 +287,3 @@ var amqp = require("amqp");

var serializableMessage = {
data: isJSON ? message : message.data,
payload: isJSON ? message : message.data,
headers: headers,

@@ -337,7 +337,3 @@ deliveryInfo: deliveryInfo,

systemDebug("_transform prepareMessage");
if(_.isPlainObject(message.data)) {
this.push(_.merge(message.data, {_meta: message._meta}));
} else {
this.push(_.pick(message, "data", "_meta"));
}
this.push(_.pick(message, "payload", "_meta"));
next();

@@ -344,0 +340,0 @@ };

{
"name": "rabbitmq-queue-stream",
"version": "0.1.9",
"version": "0.1.10",
"description": "Reliable streaming interface to rabbitmq queues",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -58,6 +58,15 @@ # rabbitmq-queue-stream

var myProcessingStream = new stream.Transform({objectMode: true});
myProcessingStream._transform(function(data, enc, next) {
console.log("Doing something with", data);
this.push(data);
myProcessingStream._transform(function(obj, enc, next) {
/*
* Messages received from the source will have their data namespaced
* in the `obj.payload` property. `payload` will contain a parsed
* JSON object if clients specified contentType: application/json
* when enqueuing the message. Messages enqueued with contentType:
* application/json but are malformed will be automatically rejected.
* Add a listener to event `parseError`, which will be emitted by
* channel.source, to handle errors yourself.
*/
this.push(obj);
/*
* Messages are successfully acked and removed from the queue by default.

@@ -67,6 +76,6 @@ * RabbitMQStream provides methods to requeue and delete messages too.

* Requeue:
* this.push(RabbitMQStream.RequeueMessage(data));
* this.push(RabbitMQStream.RequeueMessage(obj));
*
* Reject:
* this.push(RabbitMQStream.RejectMessage(data));
* this.push(RabbitMQStream.RejectMessage(obj));
*/

@@ -108,3 +117,3 @@ next();

#### .source
* parseError - Emitted when a job cannot be json parsed.
* parseError - Emitted when a message specifies contentType: application/json but is malformed JSON.
```javascript

@@ -111,0 +120,0 @@ myQueueStream.source.on("parseError", function(err, message) {

@@ -486,6 +486,6 @@ var EventEmitter = require("events").EventEmitter;

instance._waitForMessage = sinon.stub();
instance._waitForMessage.onCall(0).yields({data: {"something": "somethingElse"}, _meta: { ackIndex: 10 }});
instance._waitForMessage.onCall(0).yields({payload: {"something": "somethingElse"}, _meta: { ackIndex: 10 }});
writable._write = function (message) {
expect(message).to.eql({something: "somethingElse", _meta: {ackIndex: 10}});
expect(message).to.eql({payload: {something: "somethingElse"}, _meta: {ackIndex: 10}});
done();

@@ -763,3 +763,2 @@ };

//ack stub defers control to something that injects a new message until
var ackStub = function(instance) {

@@ -794,2 +793,7 @@ return {

if(receivedMessages.length === 3) {
expect(receivedMessages).to.eql([message1, message2, message3].map(
function(m) {
return {payload: m, _meta: {ackIndex: 0}};
}
));
done();

@@ -796,0 +800,0 @@ }

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc