rabbitmq-queue-stream
Advanced tools
Comparing version 0.1.8 to 0.1.9
@@ -341,2 +341,3 @@ var amqp = require("amqp"); | ||
} | ||
next(); | ||
}; | ||
@@ -361,3 +362,2 @@ this.source = queueStream.pipe(prepareMessage); | ||
} | ||
/* TODO: How do we handle errors from acking? */ | ||
var evt; | ||
@@ -364,0 +364,0 @@ if(message._meta.requeue) { |
{ | ||
"name": "rabbitmq-queue-stream", | ||
"version": "0.1.8", | ||
"version": "0.1.9", | ||
"description": "Reliable streaming interface to rabbitmq queues", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
56
test.js
@@ -750,2 +750,56 @@ var EventEmitter = require("events").EventEmitter; | ||
}); | ||
}); | ||
describe("integration test", function() { | ||
/* Integration test by injecting fake amqp messages into _handleMessage. Cause | ||
The acks to queue the next message into the system. | ||
*/ | ||
var streamInstance; | ||
var message1 = {_id: "1"}; | ||
var message2 = {_id: "2"}; | ||
var message3 = {_id: "3"}; | ||
var payloads = [message1, message2, message3]; | ||
//ack stub defers control to something that injects a new message until | ||
var ackStub = function(instance) { | ||
return { | ||
acknowledge: function() { | ||
if(payloads.length) { | ||
setTimeout(function() { | ||
injectNewMessage(instance); | ||
}, 10); | ||
} | ||
} | ||
}; | ||
}; | ||
var injectNewMessage = function(instance) { | ||
instance._handleIncomingMessage(payloads.shift(), {}, {contentType: "application/json"}, ackStub(instance)); | ||
}; | ||
before(function(done) { | ||
var connection = new EventEmitter(); | ||
streamInstance = new rabbitmq.AMQPStream(connection); | ||
//setup .source and .sink properties | ||
streamInstance._streamifyQueue(done); | ||
}); | ||
it("pipes all outstanding messages received by rabbit downstream when properly acked", function(done) { | ||
var receivedMessages = []; | ||
var collector = new stream.Transform({objectMode: true}); | ||
collector._transform = function(obj, enc, next) { | ||
receivedMessages.push(obj); | ||
if(receivedMessages.length === 3) { | ||
done(); | ||
} | ||
this.push(obj); | ||
next(); | ||
}; | ||
streamInstance.source.pipe(collector).pipe(streamInstance.sink); | ||
injectNewMessage(streamInstance); | ||
}); | ||
}); | ||
}); | ||
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
45134
1042