Comparing version 1.0.0 to 2.0.0
@@ -11,5 +11,5 @@ #!/usr/bin/env node | ||
backendConfig: { | ||
pollingDelay: 5000 | ||
pollingDelayMs: 5000 | ||
}, | ||
heartbeatDelay: 1000, | ||
heartbeatDelayMs: 1000, | ||
maxProcessingMessages: 10 | ||
@@ -16,0 +16,0 @@ }; |
@@ -10,3 +10,3 @@ #!/usr/bin/env node | ||
backendConfig: { | ||
pollingRate: 1000 | ||
pollingDelayMs: 1000 | ||
} | ||
@@ -13,0 +13,0 @@ }; |
272
lib/ddq.js
"use strict"; | ||
/** | ||
@@ -8,4 +7,3 @@ * @typeDef {Object} Ddq~config | ||
* @property {Object} backendConfig | ||
* @property {string} heartbeatDelay | ||
* @property {string} hostname | ||
* @property {string} heartbeatDelayMs | ||
* @property {number} maxProcessingMessages | ||
@@ -19,6 +17,9 @@ */ | ||
* @property {Object} config | ||
* @property {interger} heartbeatDelay | ||
* @property {integer} heartbeatDelayMs | ||
* @property {boolean} isBusy | ||
* @property {boolean} isClosing | ||
* @property {boolean} isListening | ||
* @property {boolean} isOpen | ||
* @property {boolean} isPausedByLimits | ||
* @property {boolean} isPausedByUser | ||
* @proprety {number} messagesBeingProcessed | ||
* @property {integer} messagesInTransit | ||
*/ | ||
@@ -36,2 +37,3 @@ | ||
* @property {Function} requeue | ||
* @property {string} topic | ||
*/ | ||
@@ -42,23 +44,17 @@ | ||
module.exports = (EventEmitter, timers) => { | ||
module.exports = (configValidation, EventEmitter, timers) => { | ||
/** | ||
* Signals that a message is done being processed. | ||
* Ensures that messagesInTransit is properly decremented and calls the | ||
* function to check whether or not the connection should be closed. | ||
* | ||
* When we were at our limit of simultaneous messages being processed, | ||
* this will start the backend up and let it poll again for more messages. | ||
* | ||
* @param {Ddq~DdqInstance} ddqInstance | ||
* @param {DDQ~instance} ddqInstance | ||
* @param {Function} callback | ||
* @return {Function} callback | ||
*/ | ||
function messageCompleted(ddqInstance) { | ||
ddqInstance.messagesBeingProcessed -= 1; | ||
if (ddqInstance.isPausedByLimits) { | ||
if (ddqInstance.messagesBeingProcessed < ddqInstance.maxProcessingMessages) { | ||
ddqInstance.isPausedByLimits = false; | ||
if (!ddqInstance.isPausedByUser) { | ||
ddqInstance.resumePolling(); | ||
} | ||
} | ||
} | ||
function applyCloseConditions(ddqInstance, callback) { | ||
return (err) => { | ||
ddqInstance.messagesInTransit -= 1; | ||
callback(err); | ||
ddqInstance.handleClosing(); | ||
}; | ||
} | ||
@@ -89,7 +85,7 @@ | ||
timeoutHandle = timers.setTimeout(heartbeat, ddqInstance.heartbeatDelay); | ||
timeoutHandle = timers.setTimeout(heartbeat, ddqInstance.heartbeatDelayMs); | ||
}); | ||
} | ||
timeoutHandle = timers.setTimeout(heartbeat, ddqInstance.heartbeatDelay); | ||
timeoutHandle = timers.setTimeout(heartbeat, ddqInstance.heartbeatDelayMs); | ||
@@ -103,2 +99,26 @@ return () => { | ||
/** | ||
* Calls and resets the closeCallback if it already exists. | ||
* | ||
* @param {Ddq~DdqBackendPluginInstance} ddqBackendInstance | ||
*/ | ||
function finishClosing(ddqBackendInstance) { | ||
ddqBackendInstance.closeCallback(); | ||
ddqBackendInstance.closeCallback = null; | ||
} | ||
/** | ||
* Calls the backend to stop listening and unsets the isListening flag. | ||
* | ||
* @param {DDQ~Instance} ddqInstance | ||
*/ | ||
function listenStop(ddqInstance) { | ||
ddqInstance.backend.stopListening(() => { | ||
finishClosing(ddqInstance); | ||
}); | ||
ddqInstance.isListening = false; | ||
} | ||
/** | ||
* Updates counters to indicate another message is being processed. | ||
@@ -111,7 +131,7 @@ * If we are at our limit, this pauses the backend's polling for more | ||
function messageBeingProcessed(ddqInstance) { | ||
ddqInstance.messagesBeingProcessed += 1; | ||
ddqInstance.messagesInTransit += 1; | ||
if (ddqInstance.messagesBeingProcessed >= ddqInstance.maxProcessingMessages) { | ||
if (ddqInstance.messagesInTransit >= ddqInstance.maxProcessingMessages) { | ||
ddqInstance.isPausedByLimits = true; | ||
ddqInstance.backend.pausePolling(); | ||
ddqInstance.backend.stopListening(); | ||
} | ||
@@ -128,23 +148,43 @@ } | ||
/** | ||
* Class for DDQ. | ||
* Signals a message is done being processed. | ||
* | ||
* When we were at our limit of simultaneous messages being processed, | ||
* this will start the backend up and let it poll again for more messages. | ||
* | ||
* @param {Ddq~DdqInstance} ddqInstance | ||
* @param {Function} [callback] | ||
*/ | ||
function messageCompleted(ddqInstance, callback) { | ||
if (ddqInstance.isPausedByLimits) { | ||
if (ddqInstance.messagesInTransit < ddqInstance.maxProcessingMessages) { | ||
ddqInstance.isPausedByLimits = false; | ||
ddqInstance.listenStart(callback); | ||
} | ||
} | ||
} | ||
/** | ||
* DDQ class. | ||
*/ | ||
class Ddq extends EventEmitter { | ||
/** | ||
* @param {Ddq~config} config | ||
* @throws {Error} when a config is not found | ||
*/ | ||
constructor(config) { | ||
if (!config) { | ||
throw new Error("No config was passed."); | ||
} | ||
var Plugin; | ||
configValidation.validateConfig(config); | ||
Plugin = require(`ddq-backend-${config.backend}`); | ||
super(); | ||
this.backend = require(`ddq-backend-${config.backend}`)(config.backendConfig); | ||
this.heartbeatDelay = config.heartbeatDelay; | ||
this.backend = new Plugin(config.backendConfig); | ||
this.heartbeatDelayMs = config.heartbeatDelayMs; | ||
this.isBusy = false; | ||
this.isClosing = false; | ||
this.isOpen = false; | ||
this.isListening = false; | ||
this.isPausedByLimits = false; | ||
this.isPausedByUser = false; | ||
this.maxProcessingMessages = config.maxProcessingMessages; | ||
this.messagesBeingProcessed = 0; | ||
this.pollingPaused = false; | ||
this.messagesInTransit = 0; | ||
} | ||
@@ -154,3 +194,4 @@ | ||
/** | ||
* Kills the listeners and polling. | ||
* Sets up conditions for closing and calls the function to attempt to | ||
* close. | ||
* | ||
@@ -164,4 +205,16 @@ * @param {Function} [callback] | ||
this.isListening = false; | ||
this.backend.close(callback); | ||
if (!this.isBusy && this.isOpen) { | ||
this.isBusy = true; | ||
this.isClosing = true; | ||
this.closeCallback = () => { | ||
this.backend.disconnect((err) => { | ||
this.isBusy = false; | ||
this.isOpen = false; | ||
callback(err); | ||
}); | ||
}; | ||
this.handleClosing(); | ||
} else { | ||
callback(new Error("Could not close.")); | ||
} | ||
} | ||
@@ -171,24 +224,47 @@ | ||
/** | ||
* Starts the polling and emits when data changes or an error happens. | ||
* Checks if the conditions for closing are met and initiates closing if they | ||
* are. This will also cause listening to stop. | ||
*/ | ||
listen() { | ||
this.backend.listen(); | ||
this.isListening = true; | ||
handleClosing() { | ||
if (this.isClosing && !this.messagesInTransit) { | ||
if (this.isListening) { | ||
listenStop(this); | ||
} else { | ||
finishClosing(this); | ||
} | ||
this.isBusy = false; | ||
} | ||
} | ||
/* The backend will emit a data event with a wrapped message. | ||
* We unwrap the message, construct convenience functions and | ||
* send that to whatever is listening to these messages from us. | ||
*/ | ||
this.backend.on(EMIT_DATA, (wrappedMessage) => { | ||
var doneWasCalled, timeoutRemover; | ||
if (this.isPausedByUser) { | ||
wrappedMessage.requeue(); | ||
} else { | ||
/** | ||
* Calls the backend plugin to begin listening and sets up listeners for | ||
* both data and error events. | ||
* | ||
* @param {Function} [callback] | ||
*/ | ||
listenStart(callback) { | ||
if (!callback) { | ||
callback = noop; | ||
} | ||
if (!this.isBusy && this.isOpen && !this.isListening) { | ||
this.backend.startListening(() => { | ||
this.isListening = true; | ||
callback(); | ||
}); | ||
/* The backend will emit a data event with a wrapped message. We | ||
* unwrap the message, construct convenience functions and send | ||
* that to whatever is listening to these messages from us. | ||
*/ | ||
this.backend.on(EMIT_DATA, (wrappedMessage) => { | ||
var doneWasCalled, timeoutRemover; | ||
doneWasCalled = false; | ||
timeoutRemover = doTheHeartbeat(this, wrappedMessage, timers); | ||
timeoutRemover = doTheHeartbeat(this, wrappedMessage); | ||
messageBeingProcessed(this); | ||
this.emit(EMIT_DATA, wrappedMessage.message, (err) => { | ||
if (doneWasCalled) { | ||
this.emit(EMIT_ERROR, new Error("Message completion callback was called multiple times")); | ||
this.emit(EMIT_ERROR, new Error("Message completion callback was called multiple times.")); | ||
} | ||
@@ -198,21 +274,24 @@ | ||
timeoutRemover(); | ||
messageCompleted(this); | ||
if (err) { | ||
wrappedMessage.requeue(); | ||
wrappedMessage.requeue(applyCloseConditions(this, callback)); | ||
} else { | ||
wrappedMessage.remove(); | ||
wrappedMessage.remove(applyCloseConditions(this, callback)); | ||
} | ||
messageCompleted(this, callback); | ||
}); | ||
} | ||
}); | ||
}); | ||
/** | ||
* Just want to relay the error event to the another listener. | ||
* The listener should then decide what to do next. | ||
*/ | ||
this.backend.on(EMIT_ERROR, (err) => { | ||
this.emit(EMIT_ERROR, err); | ||
}); | ||
/** | ||
* Just want to relay the error event to the another listener. | ||
* The listener should then decide what to do next. | ||
*/ | ||
this.backend.on(EMIT_ERROR, (err) => { | ||
this.emit(EMIT_ERROR, err); | ||
}); | ||
} else { | ||
callback(new Error("Could not start listening.")); | ||
} | ||
} | ||
@@ -222,26 +301,21 @@ | ||
/** | ||
* Tells the backend to pause the polling. This can be used if | ||
* the consumer needs to take care of something and does not want | ||
* the polling to continue. | ||
* Opens a connection to the database. | ||
* | ||
* @param {Function} [callback] | ||
*/ | ||
pausePolling() { | ||
if (this.isListening) { | ||
this.isPausedByUser = true; | ||
this.backend.pausePolling(); | ||
open(callback) { | ||
if (!callback) { | ||
callback = noop; | ||
} | ||
} | ||
if (!this.isBusy && !this.isOpen) { | ||
this.isBusy = true; | ||
/** | ||
* Resumes polling if a user has not paused the polling, DDQ has | ||
* not reached its limit of simultaneous messages being processed and | ||
* DDQ is listening for events. | ||
*/ | ||
resumePolling() { | ||
if (this.isPausedByUser) { | ||
this.isPausedByUser = false; | ||
if (!this.isPausedByLimits && this.isListening) { | ||
this.backend.resumePolling(); | ||
} | ||
this.backend.connect((err) => { | ||
this.isBusy = false; | ||
this.isOpen = true; | ||
callback(err); | ||
}); | ||
} else { | ||
callback(new Error("Could not open.")); | ||
} | ||
@@ -255,10 +329,22 @@ } | ||
* @param {*} message | ||
* @param {string} [topic] | ||
* @param {Function} [callback] | ||
*/ | ||
sendMessage(message, callback) { | ||
if (!callback) { | ||
callback = noop; | ||
sendMessage(message, topic, callback) { | ||
callback = callback || noop; | ||
if (typeof topic === "function") { | ||
callback = topic; | ||
topic = null; | ||
} | ||
this.backend.sendMessage(message, callback); | ||
if (!this.isBusy && this.isOpen) { | ||
this.messagesInTransit += 1; | ||
// Ensure that messagesInTransit is decremented. | ||
callback = applyCloseConditions(this, callback); | ||
this.backend.sendMessage(message, topic, callback); | ||
} else { | ||
callback(new Error("Could not send message.")); | ||
} | ||
} | ||
@@ -265,0 +351,0 @@ } |
"use strict"; | ||
var ddq, EventEmitter, timers; | ||
var configValidation, ddq, EventEmitter, timers; | ||
configValidation = require("./config-validation")(); | ||
ddq = require("./ddq"); | ||
EventEmitter = require("events"); | ||
timers = require("timers"); | ||
module.exports = ddq(EventEmitter, timers); | ||
module.exports = ddq(configValidation, EventEmitter, timers); |
{ | ||
"author": "Christopher Tetreault", | ||
"contributors": [ | ||
{ | ||
"name": "Christopher Tetreault", | ||
"url": "https://github.com/AbsentSemicolon" | ||
}, | ||
{ | ||
"name": "Andy Gertjejansen", | ||
"email": "andygertjejansen@outlook.com", | ||
"url": "https://github.com/quantumew" | ||
}, | ||
{ | ||
"name": "Lucas Nagle", | ||
"email": "lnagle@gmail.com", | ||
"url": "https://github.com/lnagle93" | ||
} | ||
], | ||
"bugs": { | ||
"url": "https://github.com/tests-always-included/ddq/issues" | ||
"url": "https://github.com/tests-always-included/ddq/issues" | ||
}, | ||
"dependencies": {}, | ||
"description": "DeDuplicated Queue", | ||
"devDependencies": { | ||
"codecov": "^1.0.1", | ||
"ddq-backend-mock": "^1.0.0", | ||
"ddq-backend-mock": "^2.0.1", | ||
"eslint": "^3.5.0", | ||
@@ -17,3 +32,3 @@ "istanbul": "^0.4.5", | ||
"engines": { | ||
"node": ">=4" | ||
"node": ">=4" | ||
}, | ||
@@ -34,6 +49,6 @@ "homepage": "https://github.com/tests-always-included/ddq#readme", | ||
"scripts": { | ||
"test": "istanbul cover jasmine-node node_modules/jasmine-test-helpers/lib/ --captureExceptions spec/ && codecov && eslint .", | ||
"test": "istanbul cover jasmine-node node_modules/jasmine-test-helpers/lib/ --captureExceptions spec/lib && eslint .", | ||
"watch": "jasmine-node node_modules/jasmine-test-helpers/lib/ lib spec --autotest --watch" | ||
}, | ||
"version": "1.0.0" | ||
"version": "2.0.0" | ||
} |
@@ -25,3 +25,3 @@ DeDuplicated Queue (DDQ) | ||
Another config value is the `heartbeatDelay`. DDQ uses a method on the wrapped message, which we'll get to later. A heartbeat routine is called every so often to update the task/job in the queue currently being processed. The `heartbeatDelay` is configured in milliseconds, so a value of "1000" would have the heartbeat execute every second. | ||
Another config value is the `heartbeatDelayMs`. DDQ uses a method on the wrapped message, which we'll get to later. A heartbeat routine is called every so often to update the task/job in the queue currently being processed. The `heartbeatDelayMs` is configured in milliseconds, so a value of "1000" would have the heartbeat execute every second. | ||
@@ -44,3 +44,3 @@ Also, servers might not be able to handle a certain number of processes, or you might only want to handle a few at a time, so setting the `maxProcessingMessages` to a number will make it so only up to that number of processes are created. DDQ will automatically tell the backend to pause the polling when this limit is reached. It will resume polling once the number of processing messages is lower than the max. | ||
password: "someReallyNiceLongSecurePassword", | ||
pollingDelay: 5000, | ||
pollingDelayMs: 5000, | ||
port: 3306, | ||
@@ -50,3 +50,3 @@ table: "query", | ||
}, | ||
heartbeatDelay: 1000, | ||
heartbeatDelayMs: 1000, | ||
maxProcessingMessages: 10 | ||
@@ -73,3 +73,3 @@ }); | ||
When the `data` event is triggered from the backend DDQ will use methods on the data received from the `backend`. These methods include: `heartbeat`, `heartbeatKill`, `requeue`, and `remove`. The only piece of information the software running DDQ will receive is the message and the callback from DDQ in order to say whether the processing of the message was successful or not. When the message is being processed DDQ with call the `heartbeat` method on the `wrappedMessage` using `heartbeatDelay` from the config to tell the `backend` to update the heartbeat at that interval. | ||
When the `data` event is triggered from the backend, DDQ will use methods on the data received from the `backend`. These methods include: `heartbeat`, `requeue`, and `remove`. The only piece of information the software running DDQ will receive is the message and the callback from DDQ in order to say whether the processing of the message was successful or not. When the message is being processed, DDQ will call the `heartbeat` method on the `wrappedMessage` using `heartbeatDelayMs` from the config to tell the `backend` to update the heartbeat at that interval. | ||
@@ -76,0 +76,0 @@ // Starts listening for events. |
"use strict"; | ||
describe("tests", () => { | ||
var config, Ddq, events, timersMock; | ||
describe("DDQ", () => { | ||
var config, configValidatorMock, Ddq, events, timersMock; | ||
configValidatorMock = jasmine.createSpyObj("configValidatorMock", [ | ||
"validateConfig" | ||
]); | ||
events = require("events"); | ||
@@ -12,8 +15,8 @@ timersMock = require("../mock/timers-mock"); | ||
backendConfig: { | ||
pollingDelay: 5000 | ||
pollingDelayMs: 5000 | ||
}, | ||
heartbeatDelay: 1000, | ||
heartbeatDelayMs: 1000, | ||
maxProcessingMessages: 2 | ||
}; | ||
Ddq = require("../../lib/ddq")(events, timersMock); | ||
Ddq = require("../../lib/ddq")(configValidatorMock, events, timersMock); | ||
}); | ||
@@ -40,6 +43,5 @@ describe(".constructor()", () => { | ||
ddq = new Ddq(config); | ||
ddq.listen(); | ||
ddq.open(); | ||
ddq.close((err) => { | ||
expect(err).not.toBeDefined(); | ||
done(); | ||
done(err); | ||
}); | ||
@@ -52,3 +54,3 @@ }); | ||
ddq = new Ddq(config); | ||
ddq.listen(); | ||
ddq.open(); | ||
ddq.close((err) => { | ||
@@ -63,7 +65,26 @@ expect(err).toEqual(jasmine.any(Error)); | ||
ddq = new Ddq(config); | ||
ddq.listen(); | ||
ddq.open(); | ||
ddq.close(); | ||
}); | ||
it("attempts to close with no connection open", () => { | ||
var ddq; | ||
ddq = new Ddq(config); | ||
ddq.close(() => { | ||
expect(ddq.closeCallback).not.toEqual(jasmine.any(Function)); | ||
}); | ||
}); | ||
it("closes while listening", () => { | ||
var ddq; | ||
ddq = new Ddq(config); | ||
ddq.isListening = true; | ||
ddq.open(); | ||
spyOn(ddq.backend, "stopListening").andCallThrough(); | ||
ddq.close(() => { | ||
expect(ddq.backend.stopListening).toHaveBeenCalled(); | ||
}); | ||
}); | ||
}); | ||
describe(".listen()", () => { | ||
describe(".open()", () => { | ||
it("starts listening on the backend", () => { | ||
@@ -73,241 +94,164 @@ var ddq; | ||
ddq = new Ddq(config); | ||
ddq.backend.listen = jasmine.createSpy("ddq.backend.listen"); | ||
ddq.listen(); | ||
expect(ddq.backend.listen).toHaveBeenCalled(); | ||
ddq.backend.connect = jasmine.createSpy("ddq.backend.connect"); | ||
ddq.open(); | ||
expect(ddq.backend.connect).toHaveBeenCalled(); | ||
}); | ||
it("fails to open a connection.", () => { | ||
var ddq; | ||
ddq = new Ddq(config); | ||
ddq.isBusy = true; | ||
ddq.open((err) => { | ||
expect(err).toEqual(jasmine.any(Error)); | ||
}); | ||
}); | ||
}); | ||
describe("EventEmitter", () => { | ||
var ddq, wrappedMessage; | ||
describe(".listenStart()", () => { | ||
var ddq, wmMock; | ||
beforeEach(() => { | ||
wrappedMessage = require("../mock/wrapped-message-mock")(); | ||
config.backendConfig.noPolling = true; | ||
ddq = new Ddq(config); | ||
ddq.listen(); | ||
timersMock.setTimeout = jasmine.createSpy("timeout"); | ||
spyOn(ddq.backend, "startListening").andCallThrough(); | ||
wmMock = require("../mock/wrapped-message-mock")(); | ||
ddq.backend.storedData = [ | ||
{ | ||
id: 0, | ||
isProcessing: false, | ||
requeded: false, | ||
owner: null | ||
}, | ||
{ | ||
id: 1, | ||
isProcessing: false, | ||
requeded: false, | ||
owner: null | ||
}, | ||
{ | ||
id: 2, | ||
isProcessing: false, | ||
requeded: false, | ||
owner: null | ||
} | ||
]; | ||
}); | ||
describe("data event", () => { | ||
it("requeues when polling was stopped and does not trigger a 'data' event", (done) => { | ||
ddq.on("data", jasmine.fail); | ||
wrappedMessage.requeue.andCallFake(() => { | ||
done(); | ||
}); | ||
ddq.pausePolling(); | ||
ddq.backend.emit("data", wrappedMessage); | ||
it("ddq is busy and fails to start connection", () => { | ||
ddq.open(); | ||
ddq.isBusy = true; | ||
ddq.listenStart((err) => { | ||
expect(err).toEqual(jasmine.any(Error)); | ||
}); | ||
it("pauses when reaching its limit", (done) => { | ||
var emitted; | ||
}); | ||
it("starts connection", (done) => { | ||
var called; | ||
emitted = false; | ||
ddq.backend.pausePolling = jasmine.createSpy("ddq.backend.pausePolling"); | ||
ddq.on("data", (message, callback) => { | ||
emitted = true; | ||
expect(message).toBe("mock message"); | ||
expect(callback).toEqual(jasmine.any(Function)); | ||
expect(ddq.backend.pausePolling).toHaveBeenCalled(); | ||
expect(emitted).toBe(true); | ||
expect(wrappedMessage.requeue).not.toHaveBeenCalled(); | ||
expect(wrappedMessage.remove).not.toHaveBeenCalled(); | ||
done(); | ||
}); | ||
ddq.messagesBeingProcessed = 5; | ||
ddq.backend.emit("data", wrappedMessage); | ||
}); | ||
it("removes on success", (done) => { | ||
ddq.on("data", (message, callback) => { | ||
called = false; | ||
timersMock.setTimeout.andCallFake((callback) => { | ||
if (!called) { | ||
called = true; | ||
callback(); | ||
expect(wrappedMessage.remove).toHaveBeenCalled(); | ||
done(); | ||
}); | ||
ddq.backend.emit("data", wrappedMessage); | ||
} | ||
}); | ||
it("requeues on failure", (done) => { | ||
ddq.on("data", (message, callback) => { | ||
callback(true); | ||
expect(wrappedMessage.requeue).toHaveBeenCalled(); | ||
done(); | ||
}); | ||
ddq.backend.emit("data", wrappedMessage); | ||
ddq.open(); | ||
ddq.on("data", (msg, callback) => { | ||
callback(); | ||
expect(ddq.backend.startListening).toHaveBeenCalled(); | ||
done(); | ||
}); | ||
it("resumes polling after processes count is under the limit again", (done) => { | ||
ddq.messagesBeingProcessed = 1; | ||
ddq.isPausedByLimits = true; | ||
ddq.on("data", (message, callback) => { | ||
ddq.listenStart(); | ||
ddq.backend.checkAndEmitData(); | ||
}); | ||
it("healthcheck fails.", (done) => { | ||
var called; | ||
called = false; | ||
timersMock.setTimeout.andCallFake((callback) => { | ||
if (!called) { | ||
called = true; | ||
callback(); | ||
expect(ddq.messagesBeingProcessed).toBe(1); | ||
expect(wrappedMessage.remove).toHaveBeenCalled(); | ||
expect(ddq.isPausedByLimits).toBe(false); | ||
done(); | ||
}); | ||
ddq.listen(); | ||
ddq.backend.emit("data", wrappedMessage); | ||
} | ||
}); | ||
it("requeues when too many processes are going", (done) => { | ||
ddq.messagesBeingProcessed = 5; | ||
ddq.isPausedByLimits = false; | ||
ddq.on("data", (message, callback) => { | ||
callback(); | ||
expect(ddq.messagesBeingProcessed).toBe(5); | ||
expect(ddq.isPausedByLimits).toBe(true); | ||
done(); | ||
}); | ||
ddq.listen(); | ||
ddq.backend.emit("data", wrappedMessage); | ||
ddq.open(); | ||
ddq.on("error", () => { | ||
expect(ddq.backend.startListening).toHaveBeenCalled(); | ||
done(); | ||
}); | ||
it("sets isPausedByUser in the process of doing a heartbeat", (done) => { | ||
wrappedMessage.heartbeat = jasmine.createSpy("wrappedMessage.heartbeat") | ||
.andCallFake((hbCallback) => { | ||
ddq.isPausedByUser = true; | ||
if (wrappedMessage.heartbeat.callCount === 1) { | ||
hbCallback(); | ||
} | ||
}); | ||
ddq.messagesBeingProcessed = 1; | ||
ddq.isPausedByLimits = false; | ||
ddq.on("data", (message, callback) => { | ||
callback(); | ||
expect(ddq.messagesBeingProcessed).toBe(1); | ||
expect(wrappedMessage.remove).toHaveBeenCalled(); | ||
expect(ddq.isPausedByLimits).toBe(false); | ||
done(); | ||
}); | ||
ddq.listen(); | ||
ddq.backend.emit("data", wrappedMessage); | ||
ddq.listenStart(); | ||
wmMock.heartbeat.andCallFake((callback) => { | ||
callback(new Error("err")); | ||
}); | ||
it("removes the message but emits that the callback was done repeatedly", (done) => { | ||
ddq.on("error", (err) => { | ||
expect(err).toEqual(jasmine.any(Error)); | ||
done(); | ||
}); | ||
ddq.on("data", (message, callback) => { | ||
callback(); | ||
callback(); | ||
}); | ||
ddq.backend.emit("data", wrappedMessage); | ||
expect(wrappedMessage.remove).toHaveBeenCalled(); | ||
}); | ||
ddq.backend.emit("data", wmMock); | ||
}); | ||
describe("error event", () => { | ||
it("is forwarded", (done) => { | ||
ddq.on("error", (err) => { | ||
expect(err).toBe("some error"); | ||
done(); | ||
}); | ||
ddq.backend.emit("error", "some error"); | ||
}); | ||
}); | ||
}); | ||
describe("heartbeat", () => { | ||
var ddq, errorCalled, wrappedMessage; | ||
it("message is requeued due to error", (done) => { | ||
var err; | ||
beforeEach(() => { | ||
errorCalled = false; | ||
config.backendConfig.noPolling = true; | ||
wrappedMessage = require("../mock/wrapped-message-mock")(); | ||
ddq = new Ddq(config); | ||
ddq.listen(); | ||
}); | ||
it("gets a good heartbeat", (done) => { | ||
ddq.on("data", (message, callback) => { | ||
callback(); | ||
expect(wrappedMessage.heartbeat).toHaveBeenCalled(); | ||
expect(wrappedMessage.remove).toHaveBeenCalled(); | ||
ddq.open(); | ||
ddq.on("data", (msg, callback) => { | ||
err = new Error("Cray cray"); | ||
callback(err); | ||
done(); | ||
}); | ||
ddq.backend.emit("data", wrappedMessage); | ||
ddq.listenStart(); | ||
ddq.backend.checkAndEmitData(); | ||
}); | ||
it("gets a bad heartbeat", (done) => { | ||
ddq.on("error", () => { | ||
errorCalled = true; | ||
it("duplicate calls to done", (done) => { | ||
ddq.open(); | ||
ddq.on("data", (msg, callback) => { | ||
callback(); | ||
callback(); | ||
}); | ||
ddq.on("data", (message, callback) => { | ||
callback(); | ||
expect(wrappedMessage.heartbeat).toHaveBeenCalled(); | ||
expect(wrappedMessage.remove).toHaveBeenCalled(); | ||
expect(errorCalled).toBe(true); | ||
ddq.on("error", (err) => { | ||
expect(err).toEqual(jasmine.any(Error)); | ||
done(); | ||
}); | ||
wrappedMessage.heartbeatError = true; | ||
ddq.backend.emit("data", wrappedMessage); | ||
ddq.listenStart(); | ||
ddq.backend.checkAndEmitData(); | ||
}); | ||
// Primarily for coverage of branches. | ||
it("gets a heartbeat where false was passed to callback", (done) => { | ||
wrappedMessage.heartbeat = jasmine.createSpy("wrappedMessage.heartbeat") | ||
.andCallFake((hbCallback) => { | ||
if (!wrappedMessage.heartbeat.callCount) { | ||
hbCallback(false); | ||
} | ||
}); | ||
ddq.on("data", (message, callback) => { | ||
callback(true); | ||
expect(wrappedMessage.requeue).toHaveBeenCalled(); | ||
it("fails with error relayed from backend", (done) => { | ||
ddq.open(); | ||
ddq.on("error", (err) => { | ||
expect(err).toEqual(jasmine.any(Error)); | ||
done(); | ||
}); | ||
wrappedMessage.heartbeatError = true; | ||
ddq.backend.emit("data", wrappedMessage); | ||
ddq.listenStart(); | ||
ddq.backend.emit("error", new Error("something")); | ||
}); | ||
}); | ||
describe(".pausePolling()", () => { | ||
var ddq; | ||
describe(".sendMessage()", () => { | ||
it("sends successfully", (done) => { | ||
var ddq; | ||
beforeEach(() => { | ||
ddq = new Ddq(config); | ||
ddq.backend.listen = jasmine.createSpy("ddq.backend.listen"); | ||
ddq.backend.pausePolling = jasmine.createSpy("ddq.backend.pausePolling"); | ||
ddq.open(); | ||
ddq.sendMessage("message", "topic", (err) => { | ||
expect(err).not.toBeDefined(); | ||
done(); | ||
}); | ||
}); | ||
it("sets the flag indicating the user paused polling", () => { | ||
ddq.listen(); | ||
ddq.pausePolling(); | ||
expect(ddq.isPausedByUser).toBe(true); | ||
expect(ddq.backend.pausePolling).toHaveBeenCalled(); | ||
}); | ||
it("does not pause when not listening", () => { | ||
ddq.pausePolling(); | ||
expect(ddq.isPausedByUser).toBe(false); | ||
expect(ddq.backend.pausePolling).not.toHaveBeenCalled(); | ||
}); | ||
}); | ||
describe(".resumePolling()", () => { | ||
var ddq; | ||
it("sends successfully no callback or topic", (done) => { | ||
var ddq; | ||
beforeEach(() => { | ||
ddq = new Ddq(config); | ||
ddq.backend.listen = jasmine.createSpy("ddq.backend.listen"); | ||
ddq.backend.resumePolling = jasmine.createSpy("ddq.backend.resumePolling"); | ||
ddq.listen(); | ||
ddq.open(); | ||
ddq.sendMessage("message"); | ||
done(); | ||
}); | ||
it("has conditions to resume polling", () => { | ||
ddq.pausePolling(); | ||
expect(ddq.backend.resumePolling).not.toHaveBeenCalled(); | ||
ddq.resumePolling(); | ||
expect(ddq.backend.resumePolling).toHaveBeenCalled(); | ||
}); | ||
it("is paused by user and will not resume", () => { | ||
ddq.isPausedByUser = false; | ||
ddq.resumePolling(); | ||
expect(ddq.backend.resumePolling).not.toHaveBeenCalled(); | ||
}); | ||
it("is paused by user and limits and will not resume", () => { | ||
ddq.isPausedByUser = true; | ||
ddq.isPausedByLimits = true; | ||
ddq.resumePolling(); | ||
expect(ddq.backend.resumePolling).not.toHaveBeenCalled(); | ||
}); | ||
}); | ||
describe(".sendMessage()", () => { | ||
it("sends successfully", (done) => { | ||
it("sends successfully no topic", (done) => { | ||
var ddq; | ||
ddq = new Ddq(config); | ||
ddq.open(); | ||
ddq.sendMessage("message", (err) => { | ||
expect(err).not.toBeDefined(); | ||
done(); | ||
done(err); | ||
}); | ||
}); | ||
it("reports errors", (done) => { | ||
it("fails sending due to an error", (done) => { | ||
var ddq; | ||
config.backendConfig.sendFail = true; | ||
ddq = new Ddq(config); | ||
ddq.sendMessage("message", (err) => { | ||
ddq.open(); | ||
ddq.isBusy = true; | ||
ddq.sendMessage("message", "topic", (err) => { | ||
expect(err).toEqual(jasmine.any(Error)); | ||
@@ -317,10 +261,3 @@ done(); | ||
}); | ||
it("reports errors", () => { | ||
var ddq; | ||
config.backendConfig.sendFail = true; | ||
ddq = new Ddq(config); | ||
ddq.sendMessage("message"); | ||
}); | ||
}); | ||
}); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
1011
48592
22
2