rethinkdb-job-queue
Advanced tools
Comparing version 0.2.1 to 0.3.0
# `rethinkdb-job-queue` Change log | ||
## v0.3.0 / 2016-09-28 | ||
* Added global queue pause. | ||
* Fixed tests for global queue pause. | ||
## v0.2.1 / 2016-09-27 | ||
@@ -4,0 +9,0 @@ |
@@ -13,2 +13,7 @@ 'use strict'; | ||
state: { | ||
docId: '86f6ff5b-0c4e-46ad-9a5f-e90eb19c9b00', | ||
global: 'global', | ||
local: 'local' | ||
}, | ||
priority: { | ||
@@ -96,4 +101,7 @@ lowest: 60, | ||
concurrencyInvalid: 'Invalid concurrency value', | ||
cancelCallbackInvalid: 'The onCancel callback is not a function' | ||
cancelCallbackInvalid: 'The onCancel callback is not a function', | ||
globalStateError: 'The global state document change feed is invalid', | ||
noErrorStack: 'The error has no stack detail', | ||
noErrorMessage: 'The error has no messag' | ||
} | ||
}; |
@@ -27,6 +27,7 @@ 'use strict'; | ||
var errMessage = err && err.message ? err.message : err; | ||
var log = job.createLog(errMessage, logType); | ||
var log = job.createLog(enums.message.failed, logType); | ||
log.duration = duration; | ||
log.retryCount = job.retryCount; | ||
log.errorMessage = err && err.message ? err.message : enums.message.noErrorMessage; | ||
log.errorStack = err && err.stack ? err.stack : enums.message.noErrorStack; | ||
@@ -33,0 +34,0 @@ return Promise.resolve().then(function () { |
@@ -10,3 +10,2 @@ 'use strict'; | ||
// skipStatusCheck is for ease of adding jobs during tests | ||
module.exports = function queueAddJob(q, job, skipStatusCheck) { | ||
@@ -13,0 +12,0 @@ logger('addJob', job); |
@@ -8,6 +8,8 @@ 'use strict'; | ||
var queueProcess = require('./queue-process'); | ||
var queueInterruption = require('./queue-interruption'); | ||
// Following is the list of supported change feed events; | ||
// paused - Global event | ||
// resumed - Global event | ||
// added | ||
// log | ||
// active | ||
@@ -20,2 +22,3 @@ // progress | ||
// removed | ||
// log | ||
@@ -25,11 +28,12 @@ module.exports = function queueChange(q, err) { | ||
logger('queueChange'); | ||
logger('queueChange', change); | ||
var newVal = change.new_val; | ||
var oldVal = change.old_val; | ||
var queueId = false; | ||
if (is.job(newVal)) { | ||
if (newVal && newVal.queueId) { | ||
queueId = newVal.queueId; | ||
} | ||
if (!is.job(newVal) && is.job(oldVal)) { | ||
if (!newVal && oldVal && oldVal.queueId) { | ||
queueId = oldVal.queueId; | ||
@@ -43,3 +47,3 @@ } | ||
// Prevent any change processing if change is caused by this queue | ||
if (queueId === q.id && !q.testing) { | ||
if (queueId === q.id) { | ||
logger('Change feed by self, skipping events'); | ||
@@ -53,7 +57,27 @@ return; | ||
if (q.testing) { | ||
logger('------------- QUEUE CHANGE -------------'); | ||
logger(util.inspect(change, { colors: true })); | ||
logger('----------------------------------------'); | ||
logger('------------- QUEUE CHANGE -------------'); | ||
logger(util.inspect(change, { colors: true })); | ||
logger(queueId); | ||
logger('----------------------------------------'); | ||
// Queue global state change | ||
if (!newVal && oldVal && oldVal.id && oldVal.id === enums.state.docId) { | ||
// Ignoring state document deletion. | ||
return enums.status.active; | ||
} | ||
if (newVal && newVal.id && newVal.id === enums.state.docId) { | ||
logger('State document changed'); | ||
if (newVal && newVal.state) { | ||
if (newVal.state === enums.status.paused) { | ||
logger('Global queue state paused'); | ||
return queueInterruption.pause(q, enums.state.global); | ||
} | ||
if (newVal.state === enums.status.active) { | ||
logger('Global queue state active'); | ||
return queueInterruption.resume(q, enums.state.global); | ||
} | ||
} | ||
q.emit(enums.status.error, new Error(enums.message.globalStateError)); | ||
return enums.status.error; | ||
} | ||
@@ -77,2 +101,9 @@ // Job added | ||
// Job progress | ||
if (is.job(newVal) && is.job(oldVal) && newVal.progress !== oldVal.progress) { | ||
logger('Event: progress [' + newVal.progress + ']'); | ||
q.emit(enums.status.progress, newVal.id, newVal.progress); | ||
return enums.status.progress; | ||
} | ||
// Job completed | ||
@@ -85,9 +116,2 @@ if (is.completed(newVal) && !is.completed(oldVal)) { | ||
// Job removed | ||
if (!is.job(newVal) && is.job(oldVal)) { | ||
logger('Event: removed [' + oldVal.id + ']'); | ||
q.emit(enums.status.removed, oldVal.id); | ||
return enums.status.removed; | ||
} | ||
// Job cancelled | ||
@@ -114,7 +138,7 @@ if (is.cancelled(newVal) && !is.cancelled(oldVal)) { | ||
// Job progress | ||
if (is.job(newVal) && is.job(oldVal) && newVal.progress !== oldVal.progress) { | ||
logger('Event: progress [' + newVal.progress + ']'); | ||
q.emit(enums.status.progress, newVal.id, newVal.progress); | ||
return enums.status.progress; | ||
// Job removed | ||
if (!is.job(newVal) && is.job(oldVal)) { | ||
logger('Event: removed [' + oldVal.id + ']'); | ||
q.emit(enums.status.removed, oldVal.id); | ||
return enums.status.removed; | ||
} | ||
@@ -121,0 +145,0 @@ |
@@ -6,23 +6,34 @@ 'use strict'; | ||
var enums = require('./enums'); | ||
var is = require('./is'); | ||
var queueProcess = require('./queue-process'); | ||
var queueState = require('./queue-state'); | ||
module.exports.pause = function interruptionPause(q) { | ||
logger('pause'); | ||
return new Promise(function (resolve, reject) { | ||
q._paused = true; | ||
logger('Event: pausing [' + q.id + ']'); | ||
q.emit(enums.status.pausing, q.id); | ||
if (q.running < 1) { | ||
return resolve(); | ||
module.exports.pause = function interruptionPause(q, source) { | ||
logger('pause', source); | ||
q._paused = true; | ||
var makeGlobal = is.true(source); | ||
var eventGlobal = makeGlobal || source === enums.state.global; | ||
return q.ready().then(function () { | ||
if (makeGlobal) { | ||
return queueState(q, enums.status.paused); | ||
} | ||
var intId = setInterval(function pausing() { | ||
logger('Pausing, waiting on running jobs: [' + q.running + ']'); | ||
return; | ||
}).then(function () { | ||
return new Promise(function (resolve, reject) { | ||
logger('Event: pausing [' + q.id + ']'); | ||
q.emit(enums.status.pausing, eventGlobal, q.id); | ||
if (q.running < 1) { | ||
clearInterval(intId); | ||
resolve(); | ||
return resolve(); | ||
} | ||
}, 400); | ||
var intId = setInterval(function pausing() { | ||
logger('Pausing, waiting on running jobs: [' + q.running + ']'); | ||
if (q.running < 1) { | ||
clearInterval(intId); | ||
resolve(); | ||
} | ||
}, 400); | ||
}); | ||
}).then(function () { | ||
logger('Event: paused [' + q.id + ']'); | ||
q.emit(enums.status.paused, q.id); | ||
logger('Event: paused [global:' + eventGlobal + '] [' + q.id + ']'); | ||
q.emit(enums.status.paused, eventGlobal, q.id); | ||
return true; | ||
@@ -32,11 +43,18 @@ }); | ||
module.exports.resume = function interruptionResume(q) { | ||
logger('resume'); | ||
return Promise.resolve().then(function () { | ||
q._paused = false; | ||
module.exports.resume = function interruptionResume(q, source) { | ||
logger('resume', source); | ||
q._paused = false; | ||
var makeGlobal = is.true(source); | ||
var eventGlobal = makeGlobal || source === enums.state.global; | ||
return q.ready().then(function () { | ||
if (makeGlobal) { | ||
return queueState(q, enums.status.active); | ||
} | ||
return; | ||
}).then(function () { | ||
queueProcess.restart(q); | ||
logger('Event: resumed [' + q.id + ']'); | ||
q.emit(enums.status.resumed, q.id); | ||
logger('Event: resumed [global:' + eventGlobal + '] [' + q.id + ']'); | ||
q.emit(enums.status.resumed, eventGlobal, q.id); | ||
return true; | ||
}); | ||
}; |
@@ -203,3 +203,3 @@ 'use strict'; | ||
key: 'pause', | ||
value: function pause() { | ||
value: function pause(global) { | ||
var _this10 = this; | ||
@@ -209,3 +209,3 @@ | ||
return this.ready().then(function () { | ||
return queueInterruption.pause(_this10); | ||
return queueInterruption.pause(_this10, global); | ||
}).catch(function (err) { | ||
@@ -219,3 +219,3 @@ logger('pause Error:', err); | ||
key: 'resume', | ||
value: function resume() { | ||
value: function resume(global) { | ||
var _this11 = this; | ||
@@ -225,3 +225,3 @@ | ||
return this.ready().then(function () { | ||
return queueInterruption.resume(_this11); | ||
return queueInterruption.resume(_this11, global); | ||
}).catch(function (err) { | ||
@@ -228,0 +228,0 @@ logger('resume Error:', err); |
{ | ||
"name": "rethinkdb-job-queue", | ||
"version": "0.2.1", | ||
"version": "0.3.0", | ||
"description": "A persistent job or task queue backed by RethinkDB.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -37,3 +37,3 @@ # Introduction | ||
* Rich job [history log][job-log-url] | ||
* Over 1200 [integration tests][testing-url] | ||
* Over 1300 [integration tests][testing-url] | ||
@@ -65,3 +65,3 @@ | ||
* This `rethinkdb-job-queue` module is fully functional. | ||
* There are over 1200 integration tests. | ||
* There are over 1300 integration tests. | ||
* This project is complete and needs to be taken out for a spin. | ||
@@ -68,0 +68,0 @@ * In a few months I will bump the version up to 1.0.0 to support [SemVer](http://semver.org/). |
@@ -7,3 +7,3 @@ const test = require('tape') | ||
test('enums', (t) => { | ||
t.plan(12) | ||
t.plan(13) | ||
@@ -17,8 +17,9 @@ try { | ||
t.equal(enums.priorityFromValue(10), 'highest', 'Priority from value 10 returns highest') | ||
t.equal(Object.keys(enums.priority).length, 6, 'Enums priority has has correct number of keys') | ||
t.equal(Object.keys(enums.state).length, 3, 'Enums state has the correct number of keys') | ||
t.equal(Object.keys(enums.priority).length, 6, 'Enums priority has correct number of keys') | ||
t.equal(Object.keys(enums.status).length, 25, 'Enums status has correct number of keys') | ||
t.equal(Object.keys(enums.options).length, 11, 'Enums options has correct number of keys') | ||
t.equal(Object.keys(enums.index).length, 3, 'Enums index has has correct number of keys') | ||
t.equal(Object.keys(enums.log).length, 3, 'Enums log has has correct number of keys') | ||
t.equal(Object.keys(enums.message).length, 13, 'Enums message has has correct number of keys') | ||
t.equal(Object.keys(enums.index).length, 3, 'Enums index has correct number of keys') | ||
t.equal(Object.keys(enums.log).length, 3, 'Enums log has correct number of keys') | ||
t.equal(Object.keys(enums.message).length, 16, 'Enums message has correct number of keys') | ||
} catch (err) { | ||
@@ -25,0 +26,0 @@ tError(err, module, t) |
@@ -14,3 +14,3 @@ const test = require('tape') | ||
test('job-failed', (t) => { | ||
t.plan(76) | ||
t.plan(84) | ||
@@ -83,3 +83,5 @@ const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
t.ok(retry1[0].log[1].message, 'Log message exists') | ||
t.equal(retry1[0].log[1].message, err.message, 'Log message is valid') | ||
t.equal(retry1[0].log[1].message, enums.message.failed, 'Log message is valid') | ||
t.equal(retry1[0].log[1].errorMessage, err.message, 'Log error message is valid') | ||
t.equal(retry1[0].log[1].errorStack, err.stack, 'Log stack is valid') | ||
t.ok(retry1[0].log[1].duration >= 0, 'Log duration is >= 0') | ||
@@ -106,3 +108,5 @@ | ||
t.ok(retry2[0].log[2].message, 'Log message exists') | ||
t.equal(retry2[0].log[2].message, err.message, 'Log message is valid') | ||
t.equal(retry2[0].log[2].message, enums.message.failed, 'Log message is valid') | ||
t.equal(retry2[0].log[2].errorMessage, err.message, 'Log error message is valid') | ||
t.equal(retry2[0].log[2].errorStack, err.stack, 'Log stack is valid') | ||
t.ok(retry2[0].log[2].duration >= 0, 'Log duration is >= 0') | ||
@@ -130,3 +134,5 @@ | ||
t.ok(retry3[0].log[3].message, 'Log message exists') | ||
t.equal(retry3[0].log[3].message, err.message, 'Log message is valid') | ||
t.equal(retry3[0].log[3].message, enums.message.failed, 'Log message is valid') | ||
t.equal(retry3[0].log[3].errorMessage, err.message, 'Log error message is valid') | ||
t.equal(retry3[0].log[3].errorStack, err.stack, 'Log stack is valid') | ||
t.ok(retry3[0].log[3].duration >= 0, 'Log duration is >= 0') | ||
@@ -154,3 +160,5 @@ | ||
t.ok(failed[0].log[4].message, 'Log message exists') | ||
t.equal(failed[0].log[4].message, err.message, 'Log message is valid') | ||
t.equal(failed[0].log[4].message, enums.message.failed, 'Log message is valid') | ||
t.equal(failed[0].log[4].errorMessage, err.message, 'Log error message is valid') | ||
t.equal(failed[0].log[4].errorStack, err.stack, 'Log stack is valid') | ||
t.ok(failed[0].log[4].duration >= 0, 'Log duration is >= 0') | ||
@@ -157,0 +165,0 @@ |
@@ -12,18 +12,46 @@ const test = require('tape') | ||
test('queue-change', (t) => { | ||
t.plan(48) | ||
t.plan(77) | ||
const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
// ---------- Event Handler Setup ---------- | ||
let testEvents = false | ||
let readyEventCount = 0 | ||
let readyEventTotal = 0 | ||
function readyEventHandler (queueId) { | ||
readyEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: ready [${readyEventCount} of ${readyEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
let addedEventCount = 0 | ||
let addedEventTotal = 3 | ||
function addedEventHandler (jobId) { | ||
addedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: added [${jobId}]`) | ||
t.ok(is.uuid(jobId), `Event: added [${addedEventCount} of ${addedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let activeEventCount = 0 | ||
let activeEventTotal = 3 | ||
function activeEventHandler (jobId) { | ||
activeEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: active [${jobId}]`) | ||
t.ok(is.uuid(jobId), `Event: active [${activeEventCount} of ${activeEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let processingEventCount = 0 | ||
let processingEventTotal = 3 | ||
function processingEventHandler (jobId) { | ||
processingEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: processing [${processingEventCount} of ${processingEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let progressEventCount = 0 | ||
let progressEventTotal = 3 | ||
function progressEventHandler (jobId, percent) { | ||
progressEventCount++ | ||
if (testEvents) { | ||
t.pass(`Event: progress [${progressEventCount} of ${progressEventTotal}]`) | ||
t.ok(is.uuid(jobId), `Event: progress [${jobId}]`) | ||
@@ -33,39 +61,159 @@ t.ok(is.number(percent), `Event: progress [${percent}%]`) | ||
} | ||
let completedEventCount = 0 | ||
let completedEventTotal = 1 | ||
function completedEventHandler (jobId) { | ||
completedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: completed [${jobId}]`) | ||
t.ok(is.uuid(jobId), `Event: completed [${completedEventCount} of ${completedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let cancelledEventCount = 0 | ||
let cancelledEventTotal = 1 | ||
function cancelledEventHandler (jobId) { | ||
cancelledEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: cancelled [${jobId}]`) | ||
t.ok(is.uuid(jobId), `Event: cancelled [${cancelledEventCount} of ${cancelledEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let failedEventCount = 0 | ||
let failedEventTotal = 1 | ||
function failedEventHandler (jobId) { | ||
failedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: failed [${jobId}]`) | ||
t.ok(is.uuid(jobId), `Event: failed [${failedEventCount} of ${failedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let terminatedEventCount = 0 | ||
let terminatedEventTotal = 1 | ||
function terminatedEventHandler (jobId) { | ||
terminatedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: terminated [${jobId}]`) | ||
t.ok(is.uuid(jobId), `Event: terminated [${terminatedEventCount} of ${terminatedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let logEventCount = 0 | ||
let logEventTotal = 1 | ||
function logEventHandler (jobId) { | ||
logEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: log [${logEventCount} of ${logEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let updatedEventCount = 0 | ||
let updatedEventTotal = 0 | ||
function updatedEventHandler (jobId) { | ||
updatedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: updated [${updatedEventCount} of ${updatedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let pausingEventCount = 0 | ||
let pausingEventTotal = 2 | ||
function pausingEventHandler (global, queueId) { | ||
pausingEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: pausing [${pausingEventCount} of ${pausingEventTotal}]`) | ||
t.ok(is.boolean(global), `Event: pausing [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: pausing [queueId: ${queueId}]`) | ||
} | ||
} | ||
let pausedEventCount = 0 | ||
let pausedEventTotal = 2 | ||
function pausedEventHandler (global, queueId) { | ||
pausedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: paused [${pausedEventCount} of ${pausedEventTotal}]`) | ||
t.ok(is.boolean(global), `Event: paused [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: paused [queueId: ${queueId}]`) | ||
} | ||
} | ||
let resumedEventCount = 0 | ||
let resumedEventTotal = 2 | ||
function resumedEventHandler (global, queueId) { | ||
resumedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: resumed [${resumedEventCount} of ${resumedEventTotal}]`) | ||
t.ok(is.boolean(global), `Event: resumed [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: resumed [queueId: ${queueId}]`) | ||
} | ||
} | ||
let removedEventCount = 0 | ||
let removedEventTotal = 1 | ||
function removedEventHandler (jobId) { | ||
removedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: removed [${jobId}]`) | ||
t.ok(is.uuid(jobId), `Event: removed [${removedEventCount} of ${removedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
function logEventHandler (jobId) { | ||
let idleEventCount = 0 | ||
let idleEventTotal = 1 | ||
function idleEventHandler (queueId) { | ||
idleEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: log [${jobId}]`) | ||
t.ok(is.string(queueId), `Event: idle [${idleEventCount} of ${idleEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
let testEvents = false | ||
let reviewedEventCount = 0 | ||
let reviewedEventTotal = 0 | ||
function reviewedEventHandler (queueId) { | ||
reviewedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: reviewed [${reviewedEventCount} of ${reviewedEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
let resetEventCount = 0 | ||
let resetEventTotal = 1 | ||
function resetEventHandler (total) { | ||
resetEventCount++ | ||
if (testEvents) { | ||
t.ok(is.integer(total), `Event: reset [${resetEventCount} of ${resetEventTotal}] [${total}]`) | ||
} | ||
} | ||
let errorEventCount = 0 | ||
let errorEventTotal = 0 | ||
function errorEventHandler (err) { | ||
errorEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(err.message), `Event: error [${errorEventCount} of ${errorEventTotal}] [${err.message}]`) | ||
} | ||
} | ||
let detachedEventCount = 0 | ||
let detachedEventTotal = 0 | ||
function detachedEventHandler (queueId) { | ||
detachedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: detached [${detachedEventCount} of ${detachedEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
let stoppingEventCount = 0 | ||
let stoppingEventTotal = 0 | ||
function stoppingEventHandler (queueId) { | ||
stoppingEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: stopping [${stoppingEventCount} of ${stoppingEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
let stoppedEventCount = 0 | ||
let stoppedEventTotal = 0 | ||
function stoppedEventHandler (queueId) { | ||
stoppedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: stopped [${stoppedEventCount} of ${stoppedEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
let droppedEventCount = 0 | ||
let droppedEventTotal = 0 | ||
function droppedEventHandler (queueId) { | ||
droppedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: dropped [${droppedEventCount} of ${droppedEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
function addEventHandlers () { | ||
testEvents = true | ||
q.testing = true | ||
q.on(enums.status.ready, readyEventHandler) | ||
q.on(enums.status.added, addedEventHandler) | ||
q.on(enums.status.log, logEventHandler) | ||
q.on(enums.status.active, activeEventHandler) | ||
q.on(enums.status.processing, processingEventHandler) | ||
q.on(enums.status.progress, progressEventHandler) | ||
@@ -76,10 +224,23 @@ q.on(enums.status.completed, completedEventHandler) | ||
q.on(enums.status.terminated, terminatedEventHandler) | ||
q.on(enums.status.paused, pausedEventHandler) | ||
q.on(enums.status.pausing, pausingEventHandler) | ||
q.on(enums.status.resumed, resumedEventHandler) | ||
q.on(enums.status.removed, removedEventHandler) | ||
q.on(enums.status.log, logEventHandler) | ||
q.on(enums.status.updated, updatedEventHandler) | ||
q.on(enums.status.idle, idleEventHandler) | ||
q.on(enums.status.reviewed, reviewedEventHandler) | ||
q.on(enums.status.reset, resetEventHandler) | ||
q.on(enums.status.error, errorEventHandler) | ||
q.on(enums.status.detached, detachedEventHandler) | ||
q.on(enums.status.stopping, stoppingEventHandler) | ||
q.on(enums.status.stopped, stoppedEventHandler) | ||
q.on(enums.status.dropped, droppedEventHandler) | ||
} | ||
function removeEventHandlers () { | ||
testEvents = false | ||
q.testing = false | ||
q.removeListener(enums.status.ready, readyEventHandler) | ||
q.removeListener(enums.status.added, addedEventHandler) | ||
q.removeListener(enums.status.log, logEventHandler) | ||
q.removeListener(enums.status.active, activeEventHandler) | ||
q.removeListener(enums.status.processing, processingEventHandler) | ||
q.removeListener(enums.status.progress, progressEventHandler) | ||
@@ -90,3 +251,16 @@ q.removeListener(enums.status.completed, completedEventHandler) | ||
q.removeListener(enums.status.terminated, terminatedEventHandler) | ||
q.removeListener(enums.status.paused, pausedEventHandler) | ||
q.removeListener(enums.status.pausing, pausingEventHandler) | ||
q.removeListener(enums.status.resumed, resumedEventHandler) | ||
q.removeListener(enums.status.removed, removedEventHandler) | ||
q.removeListener(enums.status.log, logEventHandler) | ||
q.removeListener(enums.status.updated, updatedEventHandler) | ||
q.removeListener(enums.status.idle, idleEventHandler) | ||
q.removeListener(enums.status.reviewed, reviewedEventHandler) | ||
q.removeListener(enums.status.reset, resetEventHandler) | ||
q.removeListener(enums.status.error, errorEventHandler) | ||
q.removeListener(enums.status.detached, detachedEventHandler) | ||
q.removeListener(enums.status.stopping, stoppingEventHandler) | ||
q.removeListener(enums.status.stopped, stoppedEventHandler) | ||
q.removeListener(enums.status.dropped, droppedEventHandler) | ||
return q.resume() | ||
@@ -156,4 +330,30 @@ } | ||
t.ok(resetResult >= 0, 'Queue reset') | ||
return removeEventHandlers() | ||
}).then(() => { | ||
removeEventHandlers() | ||
// ---------- Event summary Test ---------- | ||
t.comment('queue: Event Summary') | ||
t.equal(readyEventCount, readyEventTotal, 'Ready event count valid') | ||
t.equal(addedEventCount, addedEventTotal, 'Added event count valid') | ||
t.equal(activeEventCount, activeEventTotal, 'Active event count valid') | ||
t.equal(processingEventCount, processingEventTotal, 'Processing event count valid') | ||
t.equal(progressEventCount, progressEventTotal, 'Progress event count valid') | ||
t.equal(completedEventCount, completedEventTotal, 'Completed event count valid') | ||
t.equal(cancelledEventCount, cancelledEventTotal, 'Cancelled event count valid') | ||
t.equal(failedEventCount, failedEventTotal, 'Failed event count valid') | ||
t.equal(terminatedEventCount, terminatedEventTotal, 'Terminated event count valid') | ||
t.equal(pausingEventCount, pausingEventTotal, 'Pausing event count valid') | ||
t.equal(pausedEventCount, pausedEventTotal, 'Paused event count valid') | ||
t.equal(resumedEventCount, resumedEventTotal, 'Resumed event count valid') | ||
t.equal(removedEventCount, removedEventTotal, 'Removed event count valid') | ||
t.equal(logEventCount, logEventTotal, 'Log event count valid') | ||
t.equal(updatedEventCount, updatedEventTotal, 'Updated event count valid') | ||
t.equal(idleEventCount, idleEventTotal, 'Idle event count valid') | ||
t.equal(reviewedEventCount, reviewedEventTotal, 'Reviewed event count valid') | ||
t.equal(resetEventCount, resetEventTotal, 'Reset event count valid') | ||
t.equal(errorEventCount, errorEventTotal, 'Error event count valid') | ||
t.equal(detachedEventCount, detachedEventTotal, 'Detached event count valid') | ||
t.equal(stoppingEventCount, stoppingEventTotal, 'Stopping event count valid') | ||
t.equal(stoppedEventCount, stoppedEventTotal, 'Stopped event count valid') | ||
t.equal(droppedEventCount, droppedEventTotal, 'Dropped event count valid') | ||
q.stop() | ||
@@ -160,0 +360,0 @@ return resolve(t.end()) |
@@ -48,2 +48,3 @@ const test = require('tape') | ||
jobFailed.data = 'Failed' | ||
jobFailed.dateEnable = datetime.add.sec(new Date(), -100) | ||
jobFailed.dateCreated = datetime.add.sec(new Date(), -100) | ||
@@ -50,0 +51,0 @@ const jobActive = q.createJob({priority: 'normal'}) |
const test = require('tape') | ||
const Promise = require('bluebird') | ||
const enums = require('../src/enums') | ||
const is = require('../src/is') | ||
const tError = require('./test-error') | ||
@@ -15,3 +16,3 @@ const Queue = require('../src/queue') | ||
test('queue-interruption', (t) => { | ||
t.plan(10) | ||
t.plan(19) | ||
@@ -25,15 +26,30 @@ const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
let testEvents = false | ||
function pausingEventHandler (qId) { | ||
let pausingEventCount = 0 | ||
let pausingEventTotal = 1 | ||
function pausingEventHandler (global, queueId) { | ||
pausingEventCount++ | ||
if (testEvents) { | ||
t.equal(qId, q.id, `Event: pausing [${qId}]`) | ||
t.pass(`Event: pausing [${pausingEventCount} of ${pausingEventTotal}] `) | ||
t.ok(is.boolean(global), `Event: pausing [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: pausing [queueId: ${queueId}]`) | ||
} | ||
} | ||
function pausedEventHandler (qId) { | ||
let pausedEventCount = 0 | ||
let pausedEventTotal = 1 | ||
function pausedEventHandler (global, queueId) { | ||
pausedEventCount++ | ||
if (testEvents) { | ||
t.equal(qId, q.id, `Event: paused [${qId}]`) | ||
t.pass(`Event: paused [${pausedEventCount} of ${pausedEventTotal}] `) | ||
t.ok(is.boolean(global), `Event: paused [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: paused [queueId: ${queueId}]`) | ||
} | ||
} | ||
function resumedEventHandler (qId) { | ||
let resumedEventCount = 0 | ||
let resumedEventTotal = 1 | ||
function resumedEventHandler (global, queueId) { | ||
resumedEventCount++ | ||
if (testEvents) { | ||
t.equal(qId, q.id, `Event: resumed [${qId}]`) | ||
t.pass(`Event: resumed [${resumedEventCount} of ${resumedEventTotal}] `) | ||
t.ok(is.boolean(global), `Event: resumed [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: resumed [queueId: ${queueId}]`) | ||
} | ||
@@ -78,2 +94,9 @@ } | ||
removeEventHandlers() | ||
// ---------- Event summary Test ---------- | ||
t.comment('queue-interruption: Event Summary') | ||
t.equal(pausingEventCount, pausingEventTotal, 'Pausing event count valid') | ||
t.equal(pausedEventCount, pausedEventTotal, 'Paused event count valid') | ||
t.equal(resumedEventCount, resumedEventTotal, 'Resumed event count valid') | ||
q.stop() | ||
@@ -80,0 +103,0 @@ return resolve(t.end()) |
@@ -12,3 +12,3 @@ const test = require('tape') | ||
test('queue', (t) => { | ||
t.plan(110) | ||
t.plan(154) | ||
@@ -33,66 +33,192 @@ let q = new Queue(tOpts.cxn(), tOpts.queueNameOnly()) | ||
let testEvents = false | ||
function readyEventHandler (qId) { | ||
let readyEventCount = 0 | ||
let readyEventTotal = 0 | ||
function readyEventHandler (queueId) { | ||
readyEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(qId), `Event: ready [${qId}]`) | ||
t.ok(is.string(queueId), `Event: ready [${readyEventCount} of ${readyEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
let addedEventCount = 0 | ||
let addedEventTotal = 4 | ||
function addedEventHandler (jobId) { | ||
addedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: added [${jobId}]`) | ||
t.ok(is.uuid(jobId), `Event: added [${addedEventCount} of ${addedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let activeEventCount = 0 | ||
let activeEventTotal = 3 | ||
function activeEventHandler (jobId) { | ||
activeEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: active [${activeEventCount} of ${activeEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let processingEventCount = 0 | ||
let processingEventTotal = 3 | ||
function processingEventHandler (jobId) { | ||
processingEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: processing [${jobId}]`) | ||
t.ok(is.uuid(jobId), `Event: processing [${processingEventCount} of ${processingEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
function pausedEventHandler (qId) { | ||
let progressEventCount = 0 | ||
let progressEventTotal = 0 | ||
function progressEventHandler (jobId, percent) { | ||
progressEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(qId), `Event: paused [${qId}]`) | ||
t.pass(`Event: progress [${progressEventCount} of ${progressEventTotal}]`) | ||
t.ok(is.uuid(jobId), `Event: progress [${jobId}]`) | ||
t.ok(is.number(percent), `Event: progress [${percent}%]`) | ||
} | ||
} | ||
function resumedEventHandler (qId) { | ||
let completedEventCount = 0 | ||
let completedEventTotal = 3 | ||
function completedEventHandler (jobId) { | ||
completedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(qId), `Event: resumed [${qId}]`) | ||
t.ok(is.uuid(jobId), `Event: completed [${completedEventCount} of ${completedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let cancelledEventCount = 0 | ||
let cancelledEventTotal = 1 | ||
function cancelledEventHandler (jobId) { | ||
cancelledEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: cancelled [${cancelledEventCount} of ${cancelledEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let failedEventCount = 0 | ||
let failedEventTotal = 0 | ||
function failedEventHandler (jobId) { | ||
failedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: failed [${failedEventCount} of ${failedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let terminatedEventCount = 0 | ||
let terminatedEventTotal = 0 | ||
function terminatedEventHandler (jobId) { | ||
terminatedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: terminated [${terminatedEventCount} of ${terminatedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let logEventCount = 0 | ||
let logEventTotal = 0 | ||
function logEventHandler (jobId) { | ||
logEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: log [${logEventCount} of ${logEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let updatedEventCount = 0 | ||
let updatedEventTotal = 0 | ||
function updatedEventHandler (jobId) { | ||
updatedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: updated [${updatedEventCount} of ${updatedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let pausingEventCount = 0 | ||
let pausingEventTotal = 2 | ||
function pausingEventHandler (global, queueId) { | ||
pausingEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: pausing [${pausingEventCount} of ${pausingEventTotal}]`) | ||
t.ok(is.boolean(global), `Event: pausing [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: pausing [queueId: ${queueId}]`) | ||
} | ||
} | ||
let pausedEventCount = 0 | ||
let pausedEventTotal = 2 | ||
function pausedEventHandler (global, queueId) { | ||
pausedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: paused [${pausedEventCount} of ${pausedEventTotal}]`) | ||
t.ok(is.boolean(global), `Event: paused [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: paused [queueId: ${queueId}]`) | ||
} | ||
} | ||
let resumedEventCount = 0 | ||
let resumedEventTotal = 1 | ||
function resumedEventHandler (global, queueId) { | ||
resumedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: resumed [${resumedEventCount} of ${resumedEventTotal}]`) | ||
t.ok(is.boolean(global), `Event: resumed [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: resumed [queueId: ${queueId}]`) | ||
} | ||
} | ||
let removedEventCount = 0 | ||
let removedEventTotal = 1 | ||
function removedEventHandler (jobId) { | ||
removedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: removed [${jobId}]`) | ||
t.ok(is.uuid(jobId), `Event: removed [${removedEventCount} of ${removedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
function idleEventHandler (qId) { | ||
let idleEventCount = 0 | ||
let idleEventTotal = 4 | ||
function idleEventHandler (queueId) { | ||
idleEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(qId), `Event: idle [${qId}]`) | ||
q.removeListener(enums.status.idle, idleEventHandler) | ||
t.ok(is.string(queueId), `Event: idle [${idleEventCount} of ${idleEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
let reviewedEventCount = 0 | ||
let reviewedEventTotal = 0 | ||
function reviewedEventHandler (queueId) { | ||
reviewedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: reviewed [${reviewedEventCount} of ${reviewedEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
let resetEventCount = 0 | ||
let resetEventTotal = 1 | ||
function resetEventHandler (total) { | ||
resetEventCount++ | ||
if (testEvents) { | ||
t.ok(is.integer(total), `Event: reset [${total}]`) | ||
t.ok(is.integer(total), `Event: reset [${resetEventCount} of ${resetEventTotal}] [${total}]`) | ||
} | ||
} | ||
let errorEventCount = 0 | ||
let errorEventTotal = 3 | ||
function errorEventHandler (err) { | ||
errorEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(err.message), `Event: error [${err.message}]`) | ||
t.ok(is.string(err.message), `Event: error [${errorEventCount} of ${errorEventTotal}] [${err.message}]`) | ||
} | ||
} | ||
function detachedEventHandler (qId) { | ||
let detachedEventCount = 0 | ||
let detachedEventTotal = 1 | ||
function detachedEventHandler (queueId) { | ||
detachedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(qId), `Event: detached [${qId}]`) | ||
t.ok(is.string(queueId), `Event: detached [${detachedEventCount} of ${detachedEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
function stoppingEventHandler (qId) { | ||
let stoppingEventCount = 0 | ||
let stoppingEventTotal = 1 | ||
function stoppingEventHandler (queueId) { | ||
stoppingEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(qId), `Event: stopping [${qId}]`) | ||
t.ok(is.string(queueId), `Event: stopping [${stoppingEventCount} of ${stoppingEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
function stoppedEventHandler (qId) { | ||
let stoppedEventCount = 0 | ||
let stoppedEventTotal = 1 | ||
function stoppedEventHandler (queueId) { | ||
stoppedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(qId), `Event: stopped [${qId}]`) | ||
t.ok(is.string(queueId), `Event: stopped [${stoppedEventCount} of ${stoppedEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
function droppedEventHandler (qId) { | ||
let droppedEventCount = 0 | ||
let droppedEventTotal = 1 | ||
function droppedEventHandler (queueId) { | ||
droppedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(qId), `Event: dropped[${qId}]`) | ||
t.ok(is.string(queueId), `Event: dropped [${droppedEventCount} of ${droppedEventTotal}] [${queueId}]`) | ||
} | ||
@@ -105,7 +231,17 @@ } | ||
q.on(enums.status.added, addedEventHandler) | ||
q.on(enums.status.active, activeEventHandler) | ||
q.on(enums.status.processing, processingEventHandler) | ||
q.on(enums.status.progress, progressEventHandler) | ||
q.on(enums.status.completed, completedEventHandler) | ||
q.on(enums.status.cancelled, cancelledEventHandler) | ||
q.on(enums.status.failed, failedEventHandler) | ||
q.on(enums.status.terminated, terminatedEventHandler) | ||
q.on(enums.status.paused, pausedEventHandler) | ||
q.on(enums.status.pausing, pausingEventHandler) | ||
q.on(enums.status.resumed, resumedEventHandler) | ||
q.on(enums.status.removed, removedEventHandler) | ||
q.on(enums.status.log, logEventHandler) | ||
q.on(enums.status.updated, updatedEventHandler) | ||
q.on(enums.status.idle, idleEventHandler) | ||
q.on(enums.status.reviewed, reviewedEventHandler) | ||
q.on(enums.status.reset, resetEventHandler) | ||
@@ -122,7 +258,17 @@ q.on(enums.status.error, errorEventHandler) | ||
q.removeListener(enums.status.added, addedEventHandler) | ||
q.removeListener(enums.status.active, activeEventHandler) | ||
q.removeListener(enums.status.processing, processingEventHandler) | ||
q.removeListener(enums.status.progress, progressEventHandler) | ||
q.removeListener(enums.status.completed, completedEventHandler) | ||
q.removeListener(enums.status.cancelled, cancelledEventHandler) | ||
q.removeListener(enums.status.failed, failedEventHandler) | ||
q.removeListener(enums.status.terminated, terminatedEventHandler) | ||
q.removeListener(enums.status.paused, pausedEventHandler) | ||
q.removeListener(enums.status.pausing, pausingEventHandler) | ||
q.removeListener(enums.status.resumed, resumedEventHandler) | ||
q.removeListener(enums.status.removed, removedEventHandler) | ||
q.removeListener(enums.status.log, logEventHandler) | ||
q.removeListener(enums.status.updated, updatedEventHandler) | ||
q.removeListener(enums.status.idle, idleEventHandler) | ||
q.removeListener(enums.status.reviewed, reviewedEventHandler) | ||
q.removeListener(enums.status.reset, resetEventHandler) | ||
@@ -362,2 +508,29 @@ q.removeListener(enums.status.error, errorEventHandler) | ||
removeEventHandlers() | ||
// ---------- Event summary Test ---------- | ||
t.comment('queue: Event Summary') | ||
t.equal(readyEventCount, readyEventTotal, 'Ready event count valid') | ||
t.equal(addedEventCount, addedEventTotal, 'Added event count valid') | ||
t.equal(activeEventCount, activeEventTotal, 'Active event count valid') | ||
t.equal(processingEventCount, processingEventTotal, 'Processing event count valid') | ||
t.equal(progressEventCount, progressEventTotal, 'Progress event count valid') | ||
t.equal(completedEventCount, completedEventTotal, 'Completed event count valid') | ||
t.equal(cancelledEventCount, cancelledEventTotal, 'Cancelled event count valid') | ||
t.equal(failedEventCount, failedEventTotal, 'Failed event count valid') | ||
t.equal(terminatedEventCount, terminatedEventTotal, 'Terminated event count valid') | ||
t.equal(pausingEventCount, pausingEventTotal, 'Pausing event count valid') | ||
t.equal(pausedEventCount, pausedEventTotal, 'Paused event count valid') | ||
t.equal(resumedEventCount, resumedEventTotal, 'Resumed event count valid') | ||
t.equal(removedEventCount, removedEventTotal, 'Removed event count valid') | ||
t.equal(logEventCount, logEventTotal, 'Log event count valid') | ||
t.equal(updatedEventCount, updatedEventTotal, 'Updated event count valid') | ||
t.equal(idleEventCount, idleEventTotal, 'Idle event count valid') | ||
t.equal(reviewedEventCount, reviewedEventTotal, 'Reviewed event count valid') | ||
t.equal(resetEventCount, resetEventTotal, 'Reset event count valid') | ||
t.equal(errorEventCount, errorEventTotal, 'Error event count valid') | ||
t.equal(detachedEventCount, detachedEventTotal, 'Detached event count valid') | ||
t.equal(stoppingEventCount, stoppingEventTotal, 'Stopping event count valid') | ||
t.equal(stoppedEventCount, stoppedEventTotal, 'Stopped event count valid') | ||
t.equal(droppedEventCount, droppedEventTotal, 'Dropped event count valid') | ||
q.stop() | ||
@@ -364,0 +537,0 @@ q2.stop() |
@@ -18,6 +18,7 @@ // const Promise = require('bluebird') | ||
// const jobUpdate = require('./job-update.spec') | ||
// const jobFailed = require('./job-failed.spec') | ||
const jobFailed = require('./job-failed.spec') | ||
// const jobAddLog = require('./job-add-log.spec') | ||
const queue = require('./queue.spec') | ||
// const queue = require('./queue.spec') | ||
// const queueDb = require('./queue-db.spec') | ||
// const queueState = require('./queue-state.spec') | ||
// const queueAddJob = require('./queue-add-job.spec') | ||
@@ -39,3 +40,3 @@ // const queueGetJob = require('./queue-get-job.spec') | ||
}).then(() => { | ||
return queue() | ||
return jobFailed() | ||
}) |
@@ -22,2 +22,3 @@ const Promise = require('bluebird') | ||
const queueDb = require('./queue-db.spec') | ||
const queueState = require('./queue-state.spec') | ||
const queueAddJob = require('./queue-add-job.spec') | ||
@@ -79,2 +80,4 @@ const queueGetJob = require('./queue-get-job.spec') | ||
}).then(() => { | ||
return queueState() | ||
}).then(() => { | ||
return queueProcess() | ||
@@ -81,0 +84,0 @@ }).then(() => { |
279081
83
6317