rethinkdb-job-queue
Advanced tools
Comparing version 1.1.2 to 2.0.0
@@ -5,30 +5,42 @@ 'use strict'; | ||
// Logging has been removed due to circular dependencies. | ||
var enums = require('./enums'); | ||
function isDate(value) { | ||
// logger(`isDate`, value) | ||
return value instanceof Date || Object.prototype.toString.call(value) === '[object Date]'; | ||
} | ||
function isInteger(value) { | ||
return Object.prototype.toString.call(value) === '[object Number]' && !Number.isNaN(value) && value % 1 === 0; | ||
} | ||
function addMs(dateObject, value) { | ||
var multiplier = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : 0; | ||
if (isInteger(dateObject)) { | ||
value = dateObject; | ||
dateObject = new Date(); | ||
} | ||
if (isDate(dateObject) && isInteger(value)) { | ||
return new Date(dateObject.getTime() + value * multiplier); | ||
} | ||
throw new Error(enums.message.datetimeInvalid); | ||
} | ||
function addMilliseconds(dateObject, ms) { | ||
dateObject = isDate(dateObject) ? dateObject : new Date(dateObject); | ||
return new Date(dateObject.getTime() + ms); | ||
return addMs(dateObject, ms, 1); | ||
} | ||
function addSeconds(dateObject, sec) { | ||
dateObject = isDate(dateObject) ? dateObject : new Date(dateObject); | ||
return addMilliseconds(dateObject, sec * 1000); | ||
return addMs(dateObject, sec, 1000); | ||
} | ||
function addMinutes(dateObject, min) { | ||
dateObject = isDate(dateObject) ? dateObject : new Date(dateObject); | ||
return addMilliseconds(dateObject, min * 60000); | ||
return addMs(dateObject, min, 60000); | ||
} | ||
function addHours(dateObject, hours) { | ||
dateObject = isDate(dateObject) ? dateObject : new Date(dateObject); | ||
return addMilliseconds(dateObject, hours * 3600000); | ||
return addMs(dateObject, hours, 3600000); | ||
} | ||
function addDays(dateObject, days) { | ||
dateObject = isDate(dateObject) ? dateObject : new Date(dateObject); | ||
return addMilliseconds(dateObject, days * 86400000); | ||
return addMs(dateObject, days, 86400000); | ||
} | ||
@@ -35,0 +47,0 @@ |
@@ -52,5 +52,18 @@ 'use strict'; | ||
function createIndexStatus(q) { | ||
logger('createIndexStatus'); | ||
var indexName = enums.index.indexStatus; | ||
return Promise.resolve().then(function () { | ||
return q.r.db(q.db).table(q.name).indexList().contains(indexName).run(); | ||
}).then(function (exists) { | ||
if (exists) { | ||
return exists; | ||
} | ||
return q.r.db(q.db).table(q.name).indexCreate(indexName).run(); | ||
}); | ||
} | ||
module.exports = function assertIndex(q) { | ||
logger('assertIndex'); | ||
return Promise.all([createIndexActiveDateEnable(q), createIndexInactivePriorityDateCreated(q), createIndexFinishedDateFinished(q)]).then(function (indexCreateResult) { | ||
return Promise.all([createIndexActiveDateEnable(q), createIndexInactivePriorityDateCreated(q), createIndexFinishedDateFinished(q), createIndexStatus(q)]).then(function (indexCreateResult) { | ||
logger('Waiting for index...'); | ||
@@ -57,0 +70,0 @@ return q.r.db(q.db).table(q.name).indexWait().run(); |
'use strict'; | ||
var logger = require('./logger')(module); | ||
var Promise = require('bluebird'); | ||
var dbAssertDatabase = require('./db-assert-database'); | ||
@@ -10,3 +11,15 @@ var dbAssertTable = require('./db-assert-table'); | ||
logger('dbAssert'); | ||
return dbAssertDatabase(q).then(function () { | ||
// The delay algorithm below is to prevent multiple Queue objects | ||
// attempting to create the database/table/indexes at the same time. | ||
// Before the delay was introduced it was possible to end up with two | ||
// databases in RethinkDB with the same name. | ||
var randomDelay = Math.floor(Math.random() * 1000); | ||
if (!q.master) { | ||
randomDelay += q._databaseInitDelay; | ||
} | ||
return Promise.delay(randomDelay).then(function () { | ||
return dbAssertDatabase(q); | ||
}).then(function () { | ||
return dbAssertTable(q); | ||
@@ -13,0 +26,0 @@ }).then(function () { |
@@ -27,2 +27,3 @@ 'use strict'; | ||
retryCount: q.r.row('retryCount'), | ||
processCount: q.r.row('processCount'), | ||
message: 'Master: ' + enums.message.failed, | ||
@@ -75,3 +76,3 @@ dateEnable: q.r.row('dateEnable') | ||
logger('Event: reviewed', runReviewTasksResult); | ||
q.emit(enums.status.reviewed, runReviewTasksResult); | ||
q.emit(enums.status.reviewed, q.id, runReviewTasksResult); | ||
queueProcess.restart(q); | ||
@@ -78,0 +79,0 @@ return Promise.props({ |
@@ -43,2 +43,3 @@ 'use strict'; | ||
terminated: 'terminated', | ||
reanimated: 'reanimated', | ||
log: 'log', // Event only, not a job status | ||
@@ -52,2 +53,3 @@ updated: 'updated' // Event only, not a job status | ||
db: 'rjqJobQueue', | ||
databaseInitDelay: 1000, | ||
masterInterval: 310000, // 5 minutes and 10 seconds | ||
@@ -58,2 +60,4 @@ priority: 'normal', | ||
retryDelay: 600000, // 10 minutes | ||
repeat: false, | ||
repeatDelay: 300000, // 5 minutes | ||
concurrency: 1, | ||
@@ -65,3 +69,4 @@ removeFinishedJobs: 15552000000 // 180 days | ||
indexInactivePriorityDateCreated: 'indexInactivePriorityDateCreated', | ||
indexFinishedDateFinished: 'indexFinishedDateFinished' | ||
indexFinishedDateFinished: 'indexFinishedDateFinished', | ||
indexStatus: 'status' | ||
}); | ||
@@ -91,6 +96,7 @@ var dbResult = Object.freeze({ | ||
jobProgress: 'Job progress updated. Old value in log data', | ||
jobNotActive: 'Job is not at an active status', | ||
jobNotAdded: 'Job not added to the queue', | ||
jobAlreadyAdded: 'Job is already in the queue', | ||
jobDataInvalid: 'Job data can not be a function', | ||
jobInvalid: 'Job object is invalid', | ||
jobReanimated: 'Job has been reanimated', | ||
processTwice: 'Cannot call queue process twice', | ||
@@ -102,2 +108,4 @@ idInvalid: 'The job id is invalid', | ||
retryDelayIvalid: 'The job retryDelay value is invalid', | ||
repeatInvalid: 'The job repeat value is invalid', | ||
repeatDelayInvalid: 'The job repeatDelay value is invalid', | ||
dateEnableIvalid: 'The job dateEnable value is invalid', | ||
@@ -108,2 +116,3 @@ dbError: 'RethinkDB returned an error', | ||
globalStateError: 'The global state document change feed is invalid', | ||
datetimeInvalid: 'Invalid datetime arguments', | ||
noErrorStack: 'The error has no stack detail', | ||
@@ -110,0 +119,0 @@ noErrorMessage: 'The error has no message' |
@@ -57,3 +57,3 @@ 'use strict'; | ||
logger('ensureDate', value); | ||
return isDate(value) ? value : new Date(value); | ||
return isDate(value) ? value : new Date(); | ||
} | ||
@@ -111,2 +111,8 @@ | ||
function isError(value) { | ||
logger('isError', value); | ||
return value instanceof Error; | ||
} | ||
module.exports.error = isError; | ||
function isLog(value) { | ||
@@ -171,2 +177,13 @@ logger('isLog', value); | ||
module.exports.repeating = function isRepeating(job) { | ||
logger('isRepeating', job); | ||
if (isTrue(job.repeat)) { | ||
return true; | ||
} | ||
if (isInteger(job.repeat) && job.repeat > 0 && job.processCount <= job.repeat) { | ||
return true; | ||
} | ||
return false; | ||
}; | ||
module.exports.active = function isActive(job) { | ||
@@ -173,0 +190,0 @@ logger('isActive', job); |
@@ -12,9 +12,10 @@ 'use strict'; | ||
logger('completed: [' + job.id + ']', result); | ||
job.status = enums.status.completed; | ||
var isRepeating = is.repeating(job); | ||
job.status = isRepeating ? enums.status.waiting : enums.status.completed; | ||
job.dateFinished = new Date(); | ||
job.progress = 100; | ||
job.progress = isRepeating ? 0 : 100; | ||
var duration = job.dateFinished - job.dateStarted; | ||
duration = duration >= 0 ? duration : 0; | ||
var log = jobLog.createLogObject(job, result, enums.message.completed); | ||
var log = jobLog.createLogObject(job, result, enums.status.completed); | ||
log.duration = duration; | ||
@@ -25,2 +26,3 @@ | ||
status: job.status, | ||
dateEnable: job.q.r.branch(isRepeating, job.q.r.now().add(job.q.r.row('repeatDelay').div(1000)), job.q.r.row('dateEnable')), | ||
dateFinished: job.dateFinished, | ||
@@ -35,5 +37,5 @@ progress: job.progress, | ||
}).then(function (jobIds) { | ||
logger('Event: completed [' + jobIds[0] + ']'); | ||
job.q.emit(enums.status.completed, jobIds[0]); | ||
if (is.true(job.q.removeFinishedJobs)) { | ||
logger('Event: completed', jobIds[0], isRepeating); | ||
job.q.emit(enums.status.completed, job.q.id, jobIds[0], isRepeating); | ||
if (!isRepeating && is.true(job.q.removeFinishedJobs)) { | ||
return job.q.removeJob(job).then(function (deleteResult) { | ||
@@ -40,0 +42,0 @@ return jobIds; |
@@ -6,2 +6,3 @@ 'use strict'; | ||
var is = require('./is'); | ||
var datetime = require('./datetime'); | ||
var enums = require('./enums'); | ||
@@ -18,9 +19,20 @@ var jobLog = require('./job-log'); | ||
var isRetry = job.retryCount < job.retryMax; | ||
var isRepeating = is.repeating(job); | ||
var dateEnable = new Date(); | ||
job.status = enums.status.terminated; | ||
if (isRetry) { | ||
job.status = enums.status.failed; | ||
dateEnable = datetime.add.ms(job.retryDelay * job.retryCount); | ||
job.retryCount++; | ||
logType = enums.log.warning; | ||
} else { | ||
job.status = enums.status.terminated; | ||
} | ||
if (!isRetry && isRepeating) { | ||
job.status = enums.status.waiting; | ||
dateEnable = datetime.add.ms(job.repeatDelay); | ||
job.retryCount = 0; | ||
} | ||
job.dateFinished = new Date(); | ||
@@ -44,3 +56,3 @@ job.progress = 0; | ||
dateFinished: job.dateFinished, | ||
dateEnable: job.q.r.now().add(job.q.r.row('retryDelay').div(1000).mul(job.q.r.row('retryCount'))), | ||
dateEnable: dateEnable, | ||
log: job.q.r.row('log').append(log), | ||
@@ -53,10 +65,10 @@ queueId: job.q.id | ||
}).then(function (jobIds) { | ||
if (isRetry) { | ||
logger('Event: failed [' + jobIds[0] + ']'); | ||
job.q.emit(enums.status.failed, jobIds[0]); | ||
if (isRetry || isRepeating) { | ||
logger('Event: failed', job.q.id, jobIds[0]); | ||
job.q.emit(enums.status.failed, job.q.id, jobIds[0]); | ||
} else { | ||
logger('Event: terminated [' + jobIds[0] + ']'); | ||
job.q.emit(enums.status.terminated, jobIds[0]); | ||
logger('Event: terminated', job.q.id, jobIds[0]); | ||
job.q.emit(enums.status.terminated, job.q.id, jobIds[0]); | ||
} | ||
if (!isRetry && is.true(job.q.removeFinishedJobs)) { | ||
if (!isRetry && !isRepeating && is.true(job.q.removeFinishedJobs)) { | ||
return job.q.removeJob(job).then(function (deleteResult) { | ||
@@ -63,0 +75,0 @@ return jobIds; |
@@ -21,3 +21,4 @@ 'use strict'; | ||
status: status, | ||
retryCount: job.retryCount | ||
retryCount: job.retryCount, | ||
processCount: job.processCount | ||
}; | ||
@@ -47,4 +48,4 @@ } | ||
job.log.push(newLog); | ||
logger('Event: log [' + job.id + ']', updateResult); | ||
job.q.emit(enums.status.log, job.id); | ||
logger('Event: log', job.q.id, job.id); | ||
job.q.emit(enums.status.log, job.q.id, job.id); | ||
return true; | ||
@@ -51,0 +52,0 @@ }); |
@@ -18,2 +18,4 @@ 'use strict'; | ||
finalOptions.retryDelay = enums.options.retryDelay; | ||
finalOptions.repeat = enums.options.repeat; | ||
finalOptions.repeatDelay = enums.options.repeatDelay; | ||
@@ -36,2 +38,10 @@ if (Object.keys(enums.priority).includes(oldOptions.priority)) { | ||
if (is.true(oldOptions.repeat) || is.false(oldOptions.repeat) || is.integer(oldOptions.repeat) && oldOptions.repeat >= 0) { | ||
finalOptions.repeat = oldOptions.repeat; | ||
} | ||
if (is.integer(oldOptions.repeatDelay) && oldOptions.repeatDelay >= 0) { | ||
finalOptions.repeatDelay = oldOptions.repeatDelay; | ||
} | ||
if (Object.keys(enums.priority).includes(newOptions.priority)) { | ||
@@ -53,3 +63,11 @@ finalOptions.priority = newOptions.priority; | ||
if (is.true(newOptions.repeat) || is.false(newOptions.repeat) || is.integer(newOptions.repeat) && newOptions.repeat >= 0) { | ||
finalOptions.repeat = newOptions.repeat; | ||
} | ||
if (is.integer(newOptions.repeatDelay) && newOptions.repeatDelay >= 0) { | ||
finalOptions.repeatDelay = newOptions.repeatDelay; | ||
} | ||
return finalOptions; | ||
}; |
@@ -8,2 +8,3 @@ 'use strict'; | ||
var jobLog = require('./job-log'); | ||
var dbResult = require('./db-result'); | ||
@@ -14,3 +15,3 @@ module.exports = function jobProgress(job, percent) { | ||
logger('Error: progress called on non-active job', job); | ||
return Promise.resolve(false); | ||
return Promise.reject(new Error(enums.message.jobNotActive)); | ||
} | ||
@@ -34,8 +35,10 @@ if (!percent || !is.number(percent) || percent < 0) { | ||
log: job.q.r.row('log').append(newLog) | ||
}).run(); | ||
}, { returnChanges: true }).run(); | ||
}).then(function (updateResult) { | ||
logger('Event: progress [' + job.id + '] [' + percent + ']'); | ||
job.q.emit(enums.status.progress, job.id, percent); | ||
return true; | ||
return dbResult.toJob(job.q, updateResult); | ||
}).then(function (updateResult) { | ||
logger('Event: progress', job.q.id, job.id, percent); | ||
job.q.emit(enums.status.progress, job.q.id, job.id, percent); | ||
return updateResult[0]; | ||
}); | ||
}; |
@@ -24,6 +24,6 @@ 'use strict'; | ||
logger('updateResult', updateResult); | ||
logger('Event: updated [' + job.id + ']'); | ||
job.q.emit(enums.status.updated, job.id); | ||
return true; | ||
logger('Event: updated', job.q.id, job.id); | ||
job.q.emit(enums.status.updated, job.q.id, job.id); | ||
return job; | ||
}); | ||
}; |
@@ -40,2 +40,5 @@ 'use strict'; | ||
this.retryCount = 0; | ||
this.repeat = options.repeat; | ||
this.repeatDelay = options.repeatDelay; | ||
this.processCount = 0; | ||
this.progress = 0; | ||
@@ -98,2 +101,20 @@ this.status = enums.status.created; | ||
}, { | ||
key: 'setRepeat', | ||
value: function setRepeat(newRepeat) { | ||
if (is.boolean(newRepeat) || is.integer(newRepeat) && newRepeat >= 0) { | ||
this.repeat = newRepeat; | ||
return this; | ||
} | ||
throw new Error(enums.message.repeatInvalid); | ||
} | ||
}, { | ||
key: 'setRepeatDelay', | ||
value: function setRepeatDelay(newRepeatDelay) { | ||
if (is.integer(newRepeatDelay) && newRepeatDelay >= 0) { | ||
this.repeatDelay = newRepeatDelay; | ||
return this; | ||
} | ||
throw new Error(enums.message.repeatDelayInvalid); | ||
} | ||
}, { | ||
key: 'setDateEnable', | ||
@@ -108,12 +129,12 @@ value: function setDateEnable(newDateEnable) { | ||
}, { | ||
key: 'setProgress', | ||
value: function setProgress(percent) { | ||
key: 'updateProgress', | ||
value: function updateProgress(percent) { | ||
var _this = this; | ||
logger('setProgress [' + percent + ']'); | ||
logger('updateProgress [' + percent + ']'); | ||
return this.q.ready().then(function () { | ||
return jobProgress(_this, percent); | ||
}).catch(function (err) { | ||
logger('setProgress Error:', err); | ||
_this.q.emit(enums.status.error, err); | ||
logger('Event: updateProgress error', err, _this.q.id); | ||
_this.q.emit(enums.status.error, _this.q.id, err); | ||
return Promise.reject(err); | ||
@@ -131,4 +152,4 @@ }); | ||
}).catch(function (err) { | ||
logger('update Error:', err); | ||
_this2.q.emit(enums.status.error, err); | ||
logger('Event: update error', err, _this2.q.id); | ||
_this2.q.emit(enums.status.error, _this2.q.id, err); | ||
return Promise.reject(err); | ||
@@ -163,4 +184,4 @@ }); | ||
}).catch(function (err) { | ||
logger('addLog Error:', err); | ||
_this3.q.emit(enums.status.error, err); | ||
logger('Event: addLog error', err, _this3.q.id); | ||
_this3.q.emit(enums.status.error, _this3.q.id, err); | ||
return Promise.reject(err); | ||
@@ -167,0 +188,0 @@ }); |
@@ -11,3 +11,3 @@ 'use strict'; | ||
module.exports = function queueAddJob(q, job, skipStatusCheck) { | ||
module.exports = function queueAddJob(q, job) { | ||
logger('addJob', job); | ||
@@ -17,6 +17,3 @@ return Promise.resolve().then(function () { | ||
}).map(function (oneJob) { | ||
if (!skipStatusCheck && oneJob.status !== enums.status.created) { | ||
return Promise.reject(new Error(enums.message.jobAlreadyAdded)); | ||
} | ||
if (!skipStatusCheck) { | ||
if (oneJob.status === enums.status.created) { | ||
oneJob.status = enums.status.waiting; | ||
@@ -41,6 +38,6 @@ } | ||
for (var _iterator = savedJobs[Symbol.iterator](), _step; !(_iteratorNormalCompletion = (_step = _iterator.next()).done); _iteratorNormalCompletion = true) { | ||
var _job = _step.value; | ||
var savedjob = _step.value; | ||
logger('Event: added [' + _job.id + ']'); | ||
q.emit(enums.status.added, _job.id); | ||
logger('Event: added [' + savedjob.id + ']'); | ||
q.emit(enums.status.added, q.id, savedjob.id); | ||
} | ||
@@ -47,0 +44,0 @@ } catch (err) { |
@@ -29,2 +29,3 @@ 'use strict'; | ||
retryCount: q.r.row('retryCount'), | ||
processCount: q.r.row('processCount'), | ||
message: reason | ||
@@ -39,4 +40,4 @@ }), | ||
jobIds.forEach(function (jobId) { | ||
logger('Event: cancelled [' + jobId + ']'); | ||
q.emit(enums.status.cancelled, jobId); | ||
logger('Event: cancelled', q.id, jobId); | ||
q.emit(enums.status.cancelled, q.id, jobId); | ||
}); | ||
@@ -43,0 +44,0 @@ if (is.true(q.removeFinishedJobs)) { |
@@ -85,3 +85,3 @@ 'use strict'; | ||
logger('Event: reviewed [local: false]'); | ||
q.emit(enums.status.reviewed, { | ||
q.emit(enums.status.reviewed, newVal.queueId, { | ||
local: false, | ||
@@ -97,3 +97,5 @@ reviewed: null, | ||
} | ||
q.emit(enums.status.error, new Error(enums.message.globalStateError)); | ||
var _err = new Error(enums.message.globalStateError); | ||
logger('Event: State document change error', _err, q.id); | ||
q.emit(enums.status.error, q.id, _err); | ||
return enums.status.error; | ||
@@ -105,3 +107,3 @@ } | ||
logger('Event: added [' + newVal.id + ']'); | ||
q.emit(enums.status.added, newVal.id); | ||
q.emit(enums.status.added, newVal.queueId, newVal.id); | ||
restartProcessing(q); | ||
@@ -114,3 +116,3 @@ return enums.status.added; | ||
logger('Event: active [' + newVal.id + ']'); | ||
q.emit(enums.status.active, newVal.id); | ||
q.emit(enums.status.active, newVal.queueId, newVal.id); | ||
return enums.status.active; | ||
@@ -122,3 +124,3 @@ } | ||
logger('Event: progress [' + newVal.progress + ']'); | ||
q.emit(enums.status.progress, newVal.id, newVal.progress); | ||
q.emit(enums.status.progress, newVal.queueId, newVal.id, newVal.progress); | ||
return enums.status.progress; | ||
@@ -129,4 +131,5 @@ } | ||
if (is.completed(newVal) && !is.completed(oldVal)) { | ||
logger('Event: completed [' + newVal.id + ']'); | ||
q.emit(enums.status.completed, newVal.id); | ||
var isRepeating = is.repeating(newVal); | ||
logger('Event: completed', newVal.queueId, newVal.id, isRepeating); | ||
q.emit(enums.status.completed, newVal.queueId, newVal.id, isRepeating); | ||
return enums.status.completed; | ||
@@ -137,4 +140,4 @@ } | ||
if (is.cancelled(newVal) && !is.cancelled(oldVal)) { | ||
logger('Event: cancelled [' + newVal.id + ']'); | ||
q.emit(enums.status.cancelled, newVal.id); | ||
logger('Event: cancelled', newVal.queueId, newVal.id); | ||
q.emit(enums.status.cancelled, newVal.queueId, newVal.id); | ||
return enums.status.cancelled; | ||
@@ -145,4 +148,4 @@ } | ||
if (is.failed(newVal) && !is.failed(oldVal)) { | ||
logger('Event: failed [' + newVal.id + ']'); | ||
q.emit(enums.status.failed, newVal.id); | ||
logger('Event: failed', newVal.queueId, newVal.id); | ||
q.emit(enums.status.failed, newVal.queueId, newVal.id); | ||
return enums.status.failed; | ||
@@ -153,4 +156,4 @@ } | ||
if (is.terminated(newVal) && !is.terminated(oldVal)) { | ||
logger('Event: terminated [' + newVal.id + ']'); | ||
q.emit(enums.status.terminated, newVal.id); | ||
logger('Event: terminated', newVal.queueId, newVal.id); | ||
q.emit(enums.status.terminated, newVal.queueId, newVal.id); | ||
return enums.status.terminated; | ||
@@ -161,4 +164,4 @@ } | ||
if (!is.job(newVal) && is.job(oldVal)) { | ||
logger('Event: removed [' + oldVal.id + ']'); | ||
q.emit(enums.status.removed, oldVal.id); | ||
logger('Event: removed', oldVal.id); | ||
q.emit(enums.status.removed, null, oldVal.id); | ||
return enums.status.removed; | ||
@@ -169,4 +172,4 @@ } | ||
if (is.job(newVal) && is.job(oldVal) && is.array(newVal.log) && is.array(oldVal.log) && newVal.log.length > oldVal.log.length) { | ||
logger('Event: log', newVal.log); | ||
q.emit(enums.status.log, newVal.id); | ||
logger('Event: log', newVal.queueId, newVal.log); | ||
q.emit(enums.status.log, newVal.queueId, newVal.id); | ||
return enums.status.log; | ||
@@ -173,0 +176,0 @@ } |
@@ -24,3 +24,3 @@ 'use strict'; | ||
q._changeFeedCursor = changeFeed; | ||
q._changeFeedCursor.each(function (err, change) { | ||
return q._changeFeedCursor.each(function (err, change) { | ||
return queueChange(q, err, change); | ||
@@ -46,3 +46,3 @@ }); | ||
module.exports.detach = function dbDetach(q, drainPool) { | ||
module.exports.detach = function dbDetach(q) { | ||
logger('detach'); | ||
@@ -56,3 +56,3 @@ return Promise.resolve().then(function () { | ||
} | ||
return null; | ||
return true; | ||
}).then(function () { | ||
@@ -63,17 +63,20 @@ if (q.master) { | ||
} | ||
return null; | ||
}).then(function () { | ||
if (drainPool) { | ||
q._ready = Promise.resolve(false); | ||
logger('draining connection pool'); | ||
return q.r.getPoolMaster().drain(); | ||
} | ||
return null; | ||
}).then(function () { | ||
if (drainPool) { | ||
logger('Event: detached [' + q.id + ']'); | ||
q.emit(enums.status.detached, q.id); | ||
} | ||
return null; | ||
return true; | ||
}); | ||
}; | ||
module.exports.drain = function drain(q) { | ||
return Promise.resolve().then(function () { | ||
q._ready = Promise.resolve(false); | ||
logger('draining connection pool'); | ||
return q.r.getPoolMaster().drain(); | ||
}).delay(1000).then(function () { | ||
logger('Event: detached [' + q.id + ']'); | ||
q.emit(enums.status.detached, q.id); | ||
}).delay(1000).then(function () { | ||
q.eventNames().forEach(function (key) { | ||
q.removeAllListeners(key); | ||
}); | ||
return true; | ||
}); | ||
}; |
@@ -11,14 +11,12 @@ 'use strict'; | ||
logger('queueDrop'); | ||
return queueStop(q, false).then(function () { | ||
return queueStop(q).then(function () { | ||
q._ready = Promise.resolve(false); | ||
return queueDb.detach(q, false); | ||
return queueDb.detach(q); | ||
}).then(function () { | ||
return q.r.db(q.db).tableDrop(q.name).run(); | ||
}).then(function () { | ||
return queueDb.detach(q, true); | ||
}).then(function () { | ||
logger('Event: dropped [' + q.id + ']'); | ||
q.emit(enums.status.dropped, q.id); | ||
return true; | ||
return queueDb.drain(q); | ||
}); | ||
}; |
@@ -22,2 +22,3 @@ 'use strict'; | ||
queueId: q.id, | ||
processCount: q.r.row('processCount').add(1), | ||
log: q.r.row('log').append({ | ||
@@ -29,2 +30,3 @@ date: q.r.now(), | ||
retryCount: q.r.row('retryCount'), | ||
processCount: q.r.row('processCount'), | ||
message: enums.message.active | ||
@@ -46,3 +48,3 @@ }) | ||
logger('Event: active [' + job.id + ']'); | ||
q.emit(enums.status.active, job.id); | ||
q.emit(enums.status.active, q.id, job.id); | ||
} | ||
@@ -49,0 +51,0 @@ } catch (err) { |
@@ -22,4 +22,4 @@ 'use strict'; | ||
return new Promise(function (resolve, reject) { | ||
logger('Event: pausing [' + q.id + ']'); | ||
q.emit(enums.status.pausing, eventGlobal, q.id); | ||
logger('Event: pausing', q.id, eventGlobal); | ||
q.emit(enums.status.pausing, q.id, eventGlobal); | ||
if (q.running < 1) { | ||
@@ -37,4 +37,4 @@ return resolve(); | ||
}).then(function () { | ||
logger('Event: paused [global:' + eventGlobal + '] [' + q.id + ']'); | ||
q.emit(enums.status.paused, eventGlobal, q.id); | ||
logger('Event: paused', q.id, eventGlobal); | ||
q.emit(enums.status.paused, q.id, eventGlobal); | ||
return true; | ||
@@ -56,6 +56,6 @@ }); | ||
queueProcess.restart(q); | ||
logger('Event: resumed [global:' + eventGlobal + '] [' + q.id + ']'); | ||
q.emit(enums.status.resumed, eventGlobal, q.id); | ||
logger('Event: resumed', q.id, eventGlobal); | ||
q.emit(enums.status.resumed, q.id, eventGlobal); | ||
return true; | ||
}); | ||
}; |
@@ -32,4 +32,4 @@ 'use strict'; | ||
var err = new Error(enums.message.cancelCallbackInvalid); | ||
logger('Event: error [' + err + ']'); | ||
job.q.emit(enums.status.error, err); | ||
logger('Event: addOnCancelHandler error', err, job.q.id); | ||
job.q.emit(enums.status.error, job.q.id, err); | ||
throw err; | ||
@@ -61,4 +61,4 @@ } | ||
function restartJobTimeout(jobId) { | ||
logger('resetJobTimeout', jobId); | ||
function restartJobTimeout(queueId, jobId) { | ||
logger('resetJobTimeout', queueId, jobId); | ||
var jobTimeout = void 0; | ||
@@ -76,4 +76,4 @@ if (jobTimeouts.has(jobId)) { | ||
function nextHandler(jobResult) { | ||
logger('nextHandler', 'Running: [' + job.q.running + ']', jobResult); | ||
function nextHandler(err, jobResult) { | ||
logger('nextHandler', 'Running: [' + job.q.running + ']', err, jobResult); | ||
logger('handled', handled); | ||
@@ -88,19 +88,16 @@ // Ignore mulpiple calls to next() | ||
return new Promise(function (resolve, reject) { | ||
logger('Promise resolving or rejecting jobResult'); | ||
if (jobResult instanceof Error) { | ||
reject(jobResult); | ||
} | ||
return resolve(jobResult); | ||
}).then(function (successResult) { | ||
logger('jobResult resolved successfully'); | ||
return jobCompleted(job, successResult); | ||
}).catch(function (err) { | ||
var returnPromise = void 0; | ||
if (err) { | ||
logger('jobResult is an error'); | ||
if (err && err.cancelJob) { | ||
return queueCancelJob(job.q, job, err.cancelJob); | ||
} else if (err) { | ||
return jobFailed(job, err); | ||
if (err.cancelJob) { | ||
returnPromise = queueCancelJob(job.q, job, err.cancelJob); | ||
} else { | ||
returnPromise = jobFailed(job, err); | ||
} | ||
}).then(function (finalResult) { | ||
} else { | ||
logger('jobResult processed successfully'); | ||
returnPromise = jobCompleted(job, jobResult); | ||
} | ||
return returnPromise.then(function (finalResult) { | ||
job.q._running--; | ||
@@ -110,4 +107,4 @@ setImmediate(jobTick, job.q); | ||
}).catch(function (err) { | ||
logger('next() Promise Error:', err); | ||
job.q.emit(enums.status.error, err); | ||
logger('Event: next() Promise error', err, job.q.id); | ||
job.q.emit(enums.status.error, job.q.id, err); | ||
return Promise.reject(err); | ||
@@ -125,3 +122,3 @@ }); | ||
logger('Event: processing [' + job.id + ']'); | ||
job.q.emit(enums.status.processing, job.id); | ||
job.q.emit(enums.status.processing, job.q.id, job.id); | ||
logger('calling handler function'); | ||
@@ -180,5 +177,5 @@ job.q._handler(job, nextHandler, addOnCancelHandler); | ||
}).catch(function (err) { | ||
logger('queueGetNextJob Error:', err); | ||
getNextJobCleanup(q._getNextJobCalled); | ||
q.emit(enums.status.error, err); | ||
logger('Event: queueGetNextJob error:', err, q.id); | ||
q.emit(enums.status.error, q.id, err); | ||
return Promise.reject(err); | ||
@@ -198,3 +195,3 @@ }); | ||
q.on(enums.status.progress, restartJobTimeout); | ||
q.on(enums.status.cancelled, function (jobId) { | ||
q.on(enums.status.cancelled, function (queueId, jobId) { | ||
return onCancelJob(jobId, q); | ||
@@ -201,0 +198,0 @@ }); |
@@ -31,4 +31,4 @@ 'use strict'; | ||
logger('Event: removed [' + id + ']'); | ||
q.emit(enums.status.removed, id); | ||
logger('Event: removed', q.id, id); | ||
q.emit(enums.status.removed, q.id, id); | ||
} | ||
@@ -35,0 +35,0 @@ } catch (err) { |
@@ -17,5 +17,5 @@ 'use strict'; | ||
logger('Event: reset [' + totalRemoved + ']'); | ||
q.emit(enums.status.reset, totalRemoved); | ||
q.emit(enums.status.reset, q.id, totalRemoved); | ||
return totalRemoved; | ||
}); | ||
}; |
@@ -8,9 +8,7 @@ 'use strict'; | ||
module.exports = function queueStop(q) { | ||
var drainPool = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : true; | ||
logger('queueStop with drain:', drainPool); | ||
logger('queueStop'); | ||
logger('Event: stopping [' + q.id + ']'); | ||
q.emit(enums.status.stopping, q.id); | ||
return q.pause().then(function () { | ||
return queueDb.detach(q, drainPool); | ||
return queueDb.detach(q); | ||
}).then(function () { | ||
@@ -17,0 +15,0 @@ logger('Event: stopped [' + q.id + ']'); |
@@ -9,5 +9,3 @@ 'use strict'; | ||
return Promise.resolve().then(function () { | ||
return q.r.db(q.db).table(q.name).hasFields('status').group(function (job) { | ||
return job.pluck('status'); | ||
}).count(); | ||
return q.r.db(q.db).table(q.name).group({ index: 'status' }).count(); | ||
}).then(function (reduction) { | ||
@@ -30,3 +28,5 @@ var summary = { | ||
summary[stat.group.status] = stat.reduction; | ||
if (stat.group) { | ||
summary[stat.group] = stat.reduction; | ||
} | ||
} | ||
@@ -33,0 +33,0 @@ } catch (err) { |
@@ -55,2 +55,3 @@ 'use strict'; | ||
_this._masterInterval = options.masterInterval == null ? enums.options.masterInterval : options.masterInterval; | ||
_this._databaseInitDelay = options.databaseInitDelay == null ? enums.options.databaseInitDelay : options.databaseInitDelay; | ||
_this._jobOptions = jobOptions(); | ||
@@ -84,4 +85,4 @@ _this._changeFeedCursor = false; | ||
}).catch(function (err) { | ||
logger('addJob Error:', err); | ||
_this2.emit(enums.status.error, err); | ||
logger('Event: addJob error', _this2.id, err); | ||
_this2.emit(enums.status.error, _this2.id, err); | ||
return Promise.reject(err); | ||
@@ -99,4 +100,4 @@ }); | ||
}).catch(function (err) { | ||
logger('getJob Error:', err); | ||
_this3.emit(enums.status.error, err); | ||
logger('Event: getJob error', _this3.id, err); | ||
_this3.emit(enums.status.error, _this3.id, err); | ||
return Promise.reject(err); | ||
@@ -114,4 +115,4 @@ }); | ||
}).catch(function (err) { | ||
logger('findJob Error:', err); | ||
_this4.emit(enums.status.error, err); | ||
logger('Event: findJob error', _this4.id, err); | ||
_this4.emit(enums.status.error, _this4.id, err); | ||
return Promise.reject(err); | ||
@@ -129,4 +130,4 @@ }); | ||
}).catch(function (err) { | ||
logger('cancelJob Error:', err); | ||
_this5.emit(enums.status.error, err); | ||
logger('Event: cancelJob error', _this5.id, err); | ||
_this5.emit(enums.status.error, _this5.id, err); | ||
return Promise.reject(err); | ||
@@ -144,4 +145,4 @@ }); | ||
}).catch(function (err) { | ||
logger('removeJob Error:', err); | ||
_this6.emit(enums.status.error, err); | ||
logger('Event: removeJob error', _this6.id, err); | ||
_this6.emit(enums.status.error, _this6.id, err); | ||
return Promise.reject(err); | ||
@@ -159,4 +160,4 @@ }); | ||
}).catch(function (err) { | ||
logger('process Error:', err); | ||
_this7.emit(enums.status.error, err); | ||
logger('Event: process error', _this7.id, err); | ||
_this7.emit(enums.status.error, _this7.id, err); | ||
return Promise.reject(err); | ||
@@ -174,4 +175,4 @@ }); | ||
}).catch(function (err) { | ||
logger('review Error:', err); | ||
_this8.emit(enums.status.error, err); | ||
logger('Event: review error', _this8.id, err); | ||
_this8.emit(enums.status.error, _this8.id, err); | ||
return Promise.reject(err); | ||
@@ -189,4 +190,4 @@ }); | ||
}).catch(function (err) { | ||
logger('summary Error:', err); | ||
_this9.emit(enums.status.error, err); | ||
logger('Event: summary error', _this9.id, err); | ||
_this9.emit(enums.status.error, _this9.id, err); | ||
return Promise.reject(err); | ||
@@ -210,4 +211,4 @@ }); | ||
}).catch(function (err) { | ||
logger('pause Error:', err); | ||
_this10.emit(enums.status.error, err); | ||
logger('Event: pause error', _this10.id, err); | ||
_this10.emit(enums.status.error, _this10.id, err); | ||
return Promise.reject(err); | ||
@@ -225,4 +226,4 @@ }); | ||
}).catch(function (err) { | ||
logger('resume Error:', err); | ||
_this11.emit(enums.status.error, err); | ||
logger('Event: resume error', _this11.id, err); | ||
_this11.emit(enums.status.error, _this11.id, err); | ||
return Promise.reject(err); | ||
@@ -240,4 +241,4 @@ }); | ||
}).catch(function (err) { | ||
logger('reset Error:', err); | ||
_this12.emit(enums.status.error, err); | ||
logger('Event: reset error', _this12.id, err); | ||
_this12.emit(enums.status.error, _this12.id, err); | ||
return Promise.reject(err); | ||
@@ -252,5 +253,7 @@ }); | ||
logger('stop'); | ||
return queueStop(this).catch(function (err) { | ||
logger('stop Error:', err); | ||
_this13.emit(enums.status.error, err); | ||
return queueStop(this).then(function () { | ||
return queueDb.drain(_this13); | ||
}).catch(function (err) { | ||
logger('Event: stop error', _this13.id, err); | ||
_this13.emit(enums.status.error, _this13.id, err); | ||
return Promise.reject(err); | ||
@@ -266,4 +269,4 @@ }); | ||
return queueDrop(this).catch(function (err) { | ||
logger('drop Error:', err); | ||
_this14.emit(enums.status.error, err); | ||
logger('Event: drop error', _this14.id, err); | ||
_this14.emit(enums.status.error, _this14.id, err); | ||
return Promise.reject(err); | ||
@@ -344,3 +347,5 @@ }); | ||
if (!is.integer(value) || value < 1) { | ||
this.emit(enums.status.error, new Error(enums.message.concurrencyInvalid), value); | ||
var err = new Error(enums.message.concurrencyInvalid); | ||
logger('Event: concurrency error', this.id, err); | ||
this.emit(enums.status.error, this.id, err); | ||
return; | ||
@@ -347,0 +352,0 @@ } |
{ | ||
"name": "rethinkdb-job-queue", | ||
"version": "1.1.2", | ||
"version": "2.0.0", | ||
"description": "A persistent job or task queue backed by RethinkDB.", | ||
@@ -36,6 +36,13 @@ "main": "index.js", | ||
"clean": "rm -Rf dist", | ||
"test": "tape ./tests/test-runner.js | tap-spec", | ||
"tc": "tape ./tests/test-current.js | tap-spec", | ||
"build": "npm run clean && babel src --presets babel-preset-latest --out-dir dist", | ||
"test": "tap --timeout 1000 ./tests/test-runner.js", | ||
"tv": "tap --timeout 1000 --reporter tap ./tests/test-runner.js", | ||
"tc": "node ./tests/test-current.js", | ||
"tcd": "node --inspect --debug-brk ./tests/test-current.js", | ||
"lint": "standard", | ||
"coverage": "npm run coverage:rm && npm run coverage:cover && npm run coverage:report && npm run coverage:check", | ||
"coverage:cover": "istanbul cover ./tests/test-runner.js", | ||
"coverage:rm": "rm -Rf coverage", | ||
"coverage:report": "istanbul report", | ||
"coverage:check": "istanbul check-coverage ./coverage/coverage.json", | ||
"upgrade": "npm run upgrade:rm && npm run upgrade:ncu && npm run upgrade:install && npm run upgrade:finish", | ||
@@ -52,6 +59,6 @@ "upgrade:rm": "rm -Rf node_modules", | ||
"bluebird": "^3.4.6", | ||
"debug": "^2.2.0", | ||
"rethinkdbdash": "^2.3.23", | ||
"debug": "^2.3.2", | ||
"rethinkdbdash": "^2.3.26", | ||
"serialize-error": "^2.0.0", | ||
"uuid": "^2.0.3" | ||
"uuid": "^3.0.0" | ||
}, | ||
@@ -61,2 +68,3 @@ "devDependencies": { | ||
"babel-preset-latest": "^6.16.0", | ||
"istanbul": "0.4.5", | ||
"npm-check-updates": "^2.8.6", | ||
@@ -66,4 +74,4 @@ "proxyquire": "^1.7.10", | ||
"tap-spec": "^4.1.1", | ||
"tape": "^4.6.2" | ||
"tap": "^8.0.1" | ||
} | ||
} |
@@ -36,6 +36,7 @@ # Introduction | ||
* [Retrying failed jobs][job-retry-url] | ||
* [Repeatable jobs][job-repeat-url] | ||
* [Job reanimation][job-reanimation-url] | ||
* [Job Editing][job-editing-url] | ||
* Rich job [history log][job-log-url] | ||
* Over 1400 [integration tests][testing-url] | ||
* Over 2000 [integration tests][testing-url] | ||
@@ -47,3 +48,3 @@ [queue-connection-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Queue-Connection | ||
[job-priority-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Job-Options#job-priority-option | ||
[job-progress-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Job.setProgress | ||
[job-progress-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Job.updateProgress | ||
[job-delayed-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Delayed-Job | ||
@@ -55,3 +56,4 @@ [find-job-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Queue.findJob | ||
[job-retry-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Job-Retry | ||
[job-reanimation-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Job-Editing#job-reanimation | ||
[job-repeat-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Job.repeat | ||
[job-reanimation-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Queue.reanimateJob | ||
[job-editing-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Job-Editing | ||
@@ -63,12 +65,6 @@ [job-log-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Job.log | ||
For full documentation [please see the wiki][rjq-wiki-url] | ||
* [Full Documentation][rjq-wiki-url] | ||
* [Change Log][rjq-changelog-url] | ||
* [Code Coverage Report][rjq-coverage-url] from more than 2000 Integration Tests | ||
See the [Change Log][rjq-changelog-url] for recent changes. | ||
## Project Status | ||
* This `rethinkdb-job-queue` module is fully functional. | ||
* There are over 1400 integration tests. | ||
* Updated to v1.0.0 to support [SemVer](http://semver.org/). | ||
## Quick Start | ||
@@ -109,3 +105,3 @@ | ||
// Do something with your result | ||
return next(result) | ||
return next(null, result) | ||
} catch (err) { | ||
@@ -188,3 +184,3 @@ console.error(err) | ||
console.dir(info) | ||
return next(info) | ||
return next(null, info) | ||
}).catch((err) => { | ||
@@ -244,2 +240,3 @@ // This catch is for nodemailer sendMail errors. | ||
[rjq-changelog-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Change-Log | ||
[rjq-coverage-url]: https://rawgit.com/grantcarthew/node-rethinkdb-job-queue/master/coverage/lcov-report/index.html | ||
[thinker-image]: https://cdn.rawgit.com/grantcarthew/node-rethinkdb-job-queue/master/thinkerjoblist.png | ||
@@ -246,0 +243,0 @@ [nodemailer-url]: https://www.npmjs.com/package/nodemailer |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const is = require('../src/is') | ||
@@ -7,4 +7,4 @@ const tError = require('./test-error') | ||
module.exports = function () { | ||
test('chrono', (t) => { | ||
t.plan(13) | ||
test('datetime', (t) => { | ||
t.plan(23) | ||
@@ -21,10 +21,20 @@ try { | ||
t.equal(datetime.add.ms(tDate, tValue) - tDate, tValue, 'Add ms is valid') | ||
t.ok(is.date(datetime.add.ms(tValue)), 'Add ms only is a date object') | ||
t.ok(is.dateAfter(datetime.add.ms(tValue)), 'Add ms only is valid') | ||
t.ok(is.date(datetime.add.sec(tDate, tValue)), 'Add sec is a date object') | ||
t.equal(datetime.add.sec(tDate, tValue) - tDate, tValue * 1000, 'Add sec is valid') | ||
t.ok(is.date(datetime.add.sec(tValue)), 'Add sec only is a date object') | ||
t.ok(is.dateAfter(datetime.add.sec(tValue)), 'Add sec only is valid') | ||
t.ok(is.date(datetime.add.min(tDate, tValue)), 'Add min is a date object') | ||
t.equal(datetime.add.min(tDate, tValue) - tDate, tValue * 60000, 'Add min is valid') | ||
t.ok(is.date(datetime.add.min(tValue)), 'Add min only is a date object') | ||
t.ok(is.dateAfter(datetime.add.min(tValue)), 'Add min only is valid') | ||
t.ok(is.date(datetime.add.hours(tDate, tValue)), 'Add hours is a date object') | ||
t.equal(datetime.add.hours(tDate, tValue) - tDate, tValue * 3600000, 'Add hours is valid') | ||
t.ok(is.date(datetime.add.hours(tValue)), 'Add hours only is a date object') | ||
t.ok(is.dateAfter(datetime.add.hours(tValue)), 'Add hours only is valid') | ||
t.ok(is.date(datetime.add.days(tDate, tValue)), 'Add days is a date object') | ||
t.equal(datetime.add.days(tDate, tValue) - tDate, tValue * 86400000, 'Add days is valid') | ||
t.ok(is.date(datetime.add.days(tValue)), 'Add days only is a date object') | ||
t.ok(is.dateAfter(datetime.add.days(tValue)), 'Add days only is valid') | ||
t.equal(datetime.formatDate(tDate), dateStings.date, 'formatDate is valid') | ||
@@ -31,0 +41,0 @@ t.equal(datetime.formatTime(tDate), dateStings.time, 'formatTime is valid') |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -3,0 +3,0 @@ const tError = require('./test-error') |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -3,0 +3,0 @@ const tError = require('./test-error') |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -3,0 +3,0 @@ const tError = require('./test-error') |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -3,0 +3,0 @@ const tError = require('./test-error') |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const tError = require('./test-error') | ||
@@ -3,0 +3,0 @@ const tOpts = require('./test-options') |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -3,0 +3,0 @@ const tError = require('./test-error') |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -16,7 +16,8 @@ const is = require('../src/is') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'db-review' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('db-review', (t) => { | ||
t.plan(85) | ||
test(testName, (t) => { | ||
t.plan(86) | ||
@@ -33,2 +34,3 @@ let processRestart = 0 | ||
let state = { | ||
testName, | ||
enabled: false, | ||
@@ -57,2 +59,3 @@ ready: 0, | ||
terminated: 0, | ||
reanimated: 0, | ||
log: 0, | ||
@@ -246,3 +249,2 @@ updated: 0 | ||
t.comment('db-review: event summary') | ||
eventHandlers.remove(t, q, state) | ||
@@ -249,0 +251,0 @@ return q.reset() |
@@ -1,4 +0,4 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const tError = require('./test-error') | ||
const enums = require('../dist/enums') | ||
const enums = require('../src/enums') | ||
@@ -18,7 +18,7 @@ module.exports = function () { | ||
t.equal(Object.keys(enums.priority).length, 6, 'Enums priority has correct number of keys') | ||
t.equal(Object.keys(enums.status).length, 25, 'Enums status has correct number of keys') | ||
t.equal(Object.keys(enums.options).length, 11, 'Enums options has correct number of keys') | ||
t.equal(Object.keys(enums.index).length, 3, 'Enums index has correct number of keys') | ||
t.equal(Object.keys(enums.status).length, 26, 'Enums status has correct number of keys') | ||
t.equal(Object.keys(enums.options).length, 14, 'Enums options has correct number of keys') | ||
t.equal(Object.keys(enums.index).length, 4, 'Enums index has correct number of keys') | ||
t.equal(Object.keys(enums.log).length, 3, 'Enums log has correct number of keys') | ||
t.equal(Object.keys(enums.message).length, 25, 'Enums message has correct number of keys') | ||
t.equal(Object.keys(enums.message).length, 29, 'Enums message has correct number of keys') | ||
} catch (err) { | ||
@@ -25,0 +25,0 @@ tError(err, module, t) |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const uuid = require('uuid') | ||
@@ -8,3 +8,3 @@ const is = require('../src/is') | ||
test('is', (t) => { | ||
t.plan(62) | ||
t.plan(71) | ||
@@ -20,3 +20,5 @@ const ms = 5000 | ||
priority: enums.priority.normal, | ||
status: enums.status.created | ||
status: enums.status.created, | ||
repeat: false, | ||
processCount: 0 | ||
} | ||
@@ -62,2 +64,5 @@ const log = { | ||
t.notOk(is.array({}), 'Is array false with object') | ||
t.ok(is.error(new Error()), 'Is error true with new Error') | ||
t.ok(is.error(Error()), 'Is error true with Error') | ||
t.notOk(is.error('not an error'), 'Is error false with string') | ||
t.ok(is.job(job), 'Is job true with mock job') | ||
@@ -81,2 +86,14 @@ t.notOk(is.job(), 'Is job false with null') | ||
t.notOk(is.job(job), 'Is job false with invalid status') | ||
job.repeat = true | ||
t.ok(is.repeating(job), 'Is job repeating true when repeat is true') | ||
job.repeat = 5 | ||
t.ok(is.repeating(job), 'Is job repeating true when repeat is integer') | ||
job.processCount = 5 | ||
t.ok(is.repeating(job), 'Is job repeating true when processCount equals repeat') | ||
job.processCount = 6 | ||
t.notOk(is.repeating(job), 'Is job repeating false when processCount > repeat') | ||
job.repeat = 0 | ||
t.notOk(is.repeating(job), 'Is job repeating false when repeat is 0') | ||
job.repeat = false | ||
t.notOk(is.repeating(job), 'Is job repeating false when repeat is false') | ||
job.status = enums.status.created | ||
@@ -83,0 +100,0 @@ t.notOk(is.active(job), 'Is active false with invalid status') |
@@ -1,4 +0,5 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
const is = require('../src/is') | ||
const datetime = require('../src/datetime') | ||
const tError = require('./test-error') | ||
@@ -10,7 +11,9 @@ const enums = require('../src/enums') | ||
const tOpts = require('./test-options') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'job-completed' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('job-completed', (t) => { | ||
t.plan(23) | ||
test(testName, (t) => { | ||
t.plan(137) | ||
@@ -20,25 +23,34 @@ const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
job.data = tData | ||
let beforeDate | ||
let afterDate | ||
// ---------- Event Handler Setup ---------- | ||
let testEvents = false | ||
function completedEventHandler (jobId) { | ||
if (testEvents) { | ||
t.equal(jobId, job.id, `Event: Job completed [${jobId}]`) | ||
} | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 0, | ||
processing: 0, | ||
progress: 0, | ||
pausing: 0, | ||
paused: 0, | ||
resumed: 0, | ||
removed: 1, | ||
reset: 1, | ||
error: 0, | ||
reviewed: 0, | ||
detached: 0, | ||
stopping: 0, | ||
stopped: 0, | ||
dropped: 0, | ||
added: 3, | ||
waiting: 0, | ||
active: 0, | ||
completed: 7, | ||
cancelled: 0, | ||
failed: 0, | ||
terminated: 0, | ||
reanimated: 0, | ||
log: 0, | ||
updated: 5 | ||
} | ||
function removedEventHandler (jobId) { | ||
if (testEvents) { | ||
t.equal(jobId, job.id, `Event: Job removed [${jobId}]`) | ||
} | ||
} | ||
function addEventHandlers () { | ||
testEvents = true | ||
q.on(enums.status.completed, completedEventHandler) | ||
q.on(enums.status.removed, removedEventHandler) | ||
} | ||
function removeEventHandlers () { | ||
testEvents = false | ||
q.removeListener(enums.status.completed, completedEventHandler) | ||
q.removeListener(enums.status.removed, removedEventHandler) | ||
} | ||
@@ -52,3 +64,3 @@ return q.reset().then((resetResult) => { | ||
// ---------- Job Completed Test ---------- | ||
addEventHandlers() | ||
eventHandlers.add(t, q, state) | ||
t.comment('job-completed: Job Completed') | ||
@@ -74,2 +86,143 @@ return jobCompleted(savedJob[0], tData) | ||
// ---------- Job Repeat True Test ---------- | ||
t.comment('job-completed: Job Repeat True') | ||
job = q.createJob().setRepeat(true) | ||
job.data = tData | ||
return q.addJob(job) | ||
}).then((savedJob) => { | ||
t.equal(savedJob[0].id, job.id, 'Job saved successfully') | ||
savedJob[0].processCount = 1 | ||
return savedJob[0].update() | ||
}).then((updatedJob) => { | ||
t.ok(is.job(updatedJob), 'Job updated successfully') | ||
return jobCompleted(updatedJob, tData) | ||
}).then((completedIds) => { | ||
t.ok(is.uuid(completedIds[0]), 'Job completed returned job ids') | ||
return q.getJob(completedIds[0]) | ||
}).then((repeatedJobs) => { | ||
beforeDate = datetime.add.min(-1) | ||
afterDate = datetime.add.min(6) | ||
t.equal(repeatedJobs[0].status, enums.status.waiting, 'Repeat job completed status is waiting') | ||
t.ok(is.dateBetween(repeatedJobs[0].dateEnable, beforeDate, afterDate), 'Repeat job completed dateEnable is set for five minutes') | ||
t.ok(is.dateBefore(repeatedJobs[0].dateFinished), 'Repeat job completed dateFinished is before now') | ||
t.ok(repeatedJobs[0].progress === 0, 'Repeat job completed progress is 0 ') | ||
t.equal(repeatedJobs[0].processCount, 1, 'Repeat job completed processCount is 1 ') | ||
t.equal(repeatedJobs[0].log.length, 3, 'Repeat job completed log count is valid') | ||
let log = repeatedJobs[0].getLastLog() | ||
t.ok(is.date(log.date), 'Repeat log date is a date') | ||
t.equal(log.data, tData, 'Repeat log data is valid') | ||
t.equal(log.message, enums.status.completed, 'Repeat log message is valid') | ||
t.equal(log.queueId, q.id, 'Repeat log queueId is valid') | ||
t.equal(log.processCount, 1, 'Repeat log processCount is 1') | ||
t.equal(log.status, enums.status.waiting, 'Repeat log status is valid') | ||
t.equal(log.type, enums.log.information, 'Repeat log type is valid') | ||
repeatedJobs[0].processCount++ | ||
return repeatedJobs[0].update() | ||
}).then((updatedJob) => { | ||
t.ok(is.job(updatedJob), 'Job updated successfully') | ||
return jobCompleted(updatedJob, tData) | ||
}).then((completedIds) => { | ||
t.ok(is.uuid(completedIds[0]), 'Job completed returned job ids') | ||
return q.getJob(completedIds[0]) | ||
}).then((repeatedJobs) => { | ||
beforeDate = datetime.add.min(-1) | ||
afterDate = datetime.add.min(6) | ||
t.equal(repeatedJobs[0].status, enums.status.waiting, 'Repeat job completed status is waiting') | ||
t.ok(is.dateBetween(repeatedJobs[0].dateEnable, beforeDate, afterDate), 'Repeat job completed dateEnable is set for five minutes') | ||
t.ok(is.dateBefore(repeatedJobs[0].dateFinished), 'Repeat job completed dateFinished is before now') | ||
t.ok(repeatedJobs[0].progress === 0, 'Repeat job completed progress is 0 ') | ||
t.equal(repeatedJobs[0].processCount, 2, 'Repeat job completed processCount is 2 ') | ||
t.equal(repeatedJobs[0].log.length, 5, 'Repeat job completed log count is valid') | ||
let log = repeatedJobs[0].getLastLog() | ||
t.ok(is.date(log.date), 'Repeat log date is a date') | ||
t.equal(log.data, tData, 'Repeat log data is valid') | ||
t.equal(log.message, enums.status.completed, 'Repeat log message is valid') | ||
t.equal(log.queueId, q.id, 'Repeat log queueId is valid') | ||
t.equal(log.processCount, 2, 'Repeat log processCount is 2') | ||
t.equal(log.status, enums.status.waiting, 'Repeat log status is valid') | ||
t.equal(log.type, enums.log.information, 'Repeat log type is valid') | ||
// ---------- Job Repeat Number Test ---------- | ||
t.comment('job-completed: Job Repeat Number') | ||
job = q.createJob().setRepeat(2) | ||
job.data = tData | ||
return q.addJob(job) | ||
}).then((savedJob) => { | ||
t.equal(savedJob[0].id, job.id, 'Job saved successfully') | ||
savedJob[0].processCount++ | ||
return savedJob[0].update() | ||
}).then((updatedJob) => { | ||
t.ok(is.job(updatedJob), 'Job updated successfully') | ||
return jobCompleted(updatedJob, tData) | ||
}).then((completedIds) => { | ||
t.ok(is.uuid(completedIds[0]), 'Job completed returned job ids') | ||
return q.getJob(completedIds[0]) | ||
}).then((repeatedJobs) => { | ||
beforeDate = datetime.add.min(-1) | ||
afterDate = datetime.add.min(6) | ||
t.equal(repeatedJobs[0].status, enums.status.waiting, 'Repeat job completed status is waiting') | ||
t.ok(is.dateBetween(repeatedJobs[0].dateEnable, beforeDate, afterDate), 'Repeat job completed dateEnable is set for five minutes') | ||
t.ok(is.dateBefore(repeatedJobs[0].dateFinished), 'Repeat job completed dateFinished is before now') | ||
t.ok(repeatedJobs[0].progress === 0, 'Repeat job completed progress is 0 ') | ||
t.equal(repeatedJobs[0].processCount, 1, 'Repeat job completed processCount is 1 ') | ||
t.equal(repeatedJobs[0].log.length, 3, 'Repeat job completed log count is valid') | ||
let log = repeatedJobs[0].getLastLog() | ||
t.ok(is.date(log.date), 'Repeat log date is a date') | ||
t.equal(log.data, tData, 'Repeat log data is valid') | ||
t.equal(log.message, enums.status.completed, 'Repeat log message is valid') | ||
t.equal(log.queueId, q.id, 'Repeat log queueId is valid') | ||
t.equal(log.processCount, 1, 'Repeat log processCount is 1') | ||
t.equal(log.status, enums.status.waiting, 'Repeat log status is valid') | ||
t.equal(log.type, enums.log.information, 'Repeat log type is valid') | ||
repeatedJobs[0].processCount++ | ||
return repeatedJobs[0].update() | ||
}).then((updatedJob) => { | ||
t.ok(is.job(updatedJob), 'Job updated successfully') | ||
return jobCompleted(updatedJob, tData) | ||
}).then((completedIds) => { | ||
t.ok(is.uuid(completedIds[0]), 'Job completed returned job ids') | ||
return q.getJob(completedIds[0]) | ||
}).then((repeatedJobs) => { | ||
beforeDate = datetime.add.min(-1) | ||
afterDate = datetime.add.min(6) | ||
t.equal(repeatedJobs[0].status, enums.status.waiting, 'Repeat job completed status is waiting') | ||
t.ok(is.dateBetween(repeatedJobs[0].dateEnable, beforeDate, afterDate), 'Repeat job completed dateEnable is set for five minutes') | ||
t.ok(is.dateBefore(repeatedJobs[0].dateFinished), 'Repeat job completed dateFinished is before now') | ||
t.ok(repeatedJobs[0].progress === 0, 'Repeat job completed progress is 0 ') | ||
t.equal(repeatedJobs[0].processCount, 2, 'Repeat job completed processCount is 2 ') | ||
t.equal(repeatedJobs[0].log.length, 5, 'Repeat job completed log count is valid') | ||
let log = repeatedJobs[0].getLastLog() | ||
t.ok(is.date(log.date), 'Repeat log date is a date') | ||
t.equal(log.data, tData, 'Repeat log data is valid') | ||
t.equal(log.message, enums.status.completed, 'Repeat log message is valid') | ||
t.equal(log.queueId, q.id, 'Repeat log queueId is valid') | ||
t.equal(log.processCount, 2, 'Repeat log processCount is 2') | ||
t.equal(log.status, enums.status.waiting, 'Repeat log status is valid') | ||
t.equal(log.type, enums.log.information, 'Repeat log type is valid') | ||
repeatedJobs[0].processCount++ | ||
return repeatedJobs[0].update() | ||
}).then((updatedJob) => { | ||
t.ok(is.job(updatedJob), 'Job updated successfully') | ||
return jobCompleted(updatedJob, tData) | ||
}).then((completedIds) => { | ||
t.ok(is.uuid(completedIds[0]), 'Job completed returned job ids') | ||
return q.getJob(completedIds[0]) | ||
}).then((repeatedJobs) => { | ||
beforeDate = datetime.add.min(-1) | ||
afterDate = datetime.add.min(6) | ||
t.equal(repeatedJobs[0].status, enums.status.completed, 'Repeat job completed status is completed') | ||
t.ok(is.dateBetween(repeatedJobs[0].dateEnable, beforeDate, afterDate), 'Repeat job completed dateEnable is set for five minutes') | ||
t.ok(is.dateBefore(repeatedJobs[0].dateFinished), 'Repeat job completed dateFinished is before now') | ||
t.ok(repeatedJobs[0].progress === 100, 'Repeat job completed progress is 100 ') | ||
t.equal(repeatedJobs[0].processCount, 3, 'Repeat job completed processCount is 3 ') | ||
t.equal(repeatedJobs[0].log.length, 7, 'Repeat job completed log count is valid') | ||
let log = repeatedJobs[0].getLastLog() | ||
t.ok(is.date(log.date), 'Repeat log date is a date') | ||
t.equal(log.data, tData, 'Repeat log data is valid') | ||
t.equal(log.message, enums.status.completed, 'Repeat log message is valid') | ||
t.equal(log.queueId, q.id, 'Repeat log queueId is valid') | ||
t.equal(log.processCount, 3, 'Repeat log processCount is 3') | ||
t.equal(log.status, enums.status.completed, 'Repeat log status is valid') | ||
t.equal(log.type, enums.log.information, 'Repeat log type is valid') | ||
// ---------- Job Completed with Remove Test ---------- | ||
@@ -92,3 +245,5 @@ t.comment('job-completed: Job Completed with Remove') | ||
t.ok(resetResult >= 0, 'Queue reset') | ||
removeEventHandlers() | ||
// ---------- Event Summary ---------- | ||
eventHandlers.remove(t, q, state) | ||
q.stop() | ||
@@ -95,0 +250,0 @@ return resolve(t.end()) |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -10,7 +10,9 @@ const is = require('../src/is') | ||
const tOpts = require('./test-options') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'job-failed' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('job-failed', (t) => { | ||
t.plan(88) | ||
test(testName, (t) => { | ||
t.plan(155) | ||
@@ -20,33 +22,30 @@ const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
// ---------- Event Handler Setup ---------- | ||
let testEvents = false | ||
function failedEventHandler (jobId) { | ||
if (testEvents) { | ||
t.equal(jobId, job.id, | ||
`Event: Job failed [${jobId}]`) | ||
} | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 0, | ||
processing: 0, | ||
progress: 0, | ||
pausing: 0, | ||
paused: 0, | ||
resumed: 0, | ||
removed: 1, | ||
reset: 1, | ||
error: 0, | ||
reviewed: 0, | ||
detached: 0, | ||
stopping: 0, | ||
stopped: 0, | ||
dropped: 0, | ||
added: 2, | ||
waiting: 0, | ||
active: 0, | ||
completed: 0, | ||
cancelled: 0, | ||
failed: 5, | ||
terminated: 2, | ||
reanimated: 0, | ||
log: 0, | ||
updated: 0 | ||
} | ||
function terminatedEventHandler (jobId) { | ||
if (testEvents) { | ||
t.equal(jobId, job.id, | ||
`Event: Job terminated [${jobId}]`) | ||
} | ||
} | ||
function removedEventHandler (jobId) { | ||
if (testEvents) { | ||
t.equal(jobId, job.id, | ||
`Event: Job removed [${jobId}]`) | ||
} | ||
} | ||
function addEventHandlers () { | ||
testEvents = true | ||
q.on(enums.status.failed, failedEventHandler) | ||
q.on(enums.status.terminated, terminatedEventHandler) | ||
q.on(enums.status.removed, removedEventHandler) | ||
} | ||
function removeEventHandlers () { | ||
testEvents = false | ||
q.removeListener(enums.status.failed, failedEventHandler) | ||
q.removeListener(enums.status.terminated, terminatedEventHandler) | ||
q.removeListener(enums.status.removed, removedEventHandler) | ||
} | ||
@@ -64,3 +63,3 @@ let job = q.createJob() | ||
// ---------- Job Failed Retry 0 Test ---------- | ||
addEventHandlers() | ||
eventHandlers.add(t, q, state) | ||
t.comment('job-failed: Original Job Failure') | ||
@@ -184,6 +183,70 @@ return jobFailed(savedJob[0], err) | ||
t.equal(exist.length, 0, 'Job not in database') | ||
// ---------- Job Failed with Repeat Test ---------- | ||
t.comment('job-failed: Job Failed with Repeat') | ||
job = q.createJob() | ||
job.data = tData | ||
job.retryMax = 1 | ||
job.retryDelay = 200 | ||
job.repeat = 1 | ||
job.repeatDelay = 500 | ||
q._removeFinishedJobs = false | ||
return q.addJob(job) | ||
}).then((savedJob) => { | ||
t.equal(savedJob[0].id, job.id, 'Job saved successfully') | ||
return jobFailed(savedJob[0], err) | ||
}).then((retry1id) => { | ||
t.equal(retry1id.length, 1, 'Job failed successfully') | ||
t.equal(retry1id[0], job.id, 'Job failed returned job id') | ||
return q.getJob(retry1id[0]) | ||
}).then((retry1) => { | ||
t.equal(retry1[0].status, enums.status.failed, 'Job status is failed') | ||
t.equal(retry1[0].retryCount, 1, 'Job retryCount is 1') | ||
t.equal(retry1[0].progress, 0, 'Job progress is 0') | ||
t.equal(retry1[0].queueId, q.id, 'Job queueId is valid') | ||
t.ok(is.date(retry1[0].dateFinished), 'Job dateFinished is a date') | ||
t.ok(is.date(retry1[0].dateEnable), 'Job dateEnable is a date') | ||
t.equal(retry1[0].log.length, 2, 'Job has 1 log entry') | ||
t.ok(is.date(retry1[0].log[1].date), 'Log date is a date') | ||
t.equal(retry1[0].log[1].queueId, q.id, 'Log queueId is valid') | ||
t.equal(retry1[0].log[1].type, enums.log.warning, 'Log type is warning') | ||
t.equal(retry1[0].log[1].status, enums.status.failed, 'Log status is failed') | ||
t.ok(retry1[0].log[1].retryCount = 1, 'Log retryCount is valid') | ||
t.ok(retry1[0].log[1].message, 'Log message exists') | ||
t.equal(retry1[0].log[1].message, enums.message.failed, 'Log message is valid') | ||
t.equal(retry1[0].log[1].errorMessage, err.message, 'Log error message is valid') | ||
t.equal(retry1[0].log[1].errorStack, err.stack, 'Log stack is valid') | ||
t.ok(retry1[0].log[1].duration >= 0, 'Log duration is >= 0') | ||
// ---------- Final Retry Job Repeat Failure Test ---------- | ||
t.comment('job-failed: Final Retry Job Repeat Failure') | ||
return jobFailed(retry1[0], err) | ||
}).then((retry2id) => { | ||
t.equal(retry2id.length, 1, 'Job failed successfully') | ||
t.equal(retry2id[0], job.id, 'Job failed returned job id') | ||
return q.getJob(retry2id[0]) | ||
}).then((retry2) => { | ||
t.equal(retry2[0].status, enums.status.waiting, 'Job status is waiting') | ||
t.equal(retry2[0].retryCount, 0, 'Job retryCount is 0') | ||
t.equal(retry2[0].progress, 0, 'Job progress is 0') | ||
t.equal(retry2[0].queueId, q.id, 'Job queueId is valid') | ||
t.ok(is.date(retry2[0].dateFinished), 'Job dateFinished is a date') | ||
t.ok(is.date(retry2[0].dateEnable), 'Job dateEnable is a date') | ||
t.equal(retry2[0].log.length, 3, 'Job has 2 log entries') | ||
t.ok(is.date(retry2[0].log[2].date), 'Log date is a date') | ||
t.equal(retry2[0].log[2].queueId, q.id, 'Log queueId is valid') | ||
t.equal(retry2[0].log[2].type, enums.log.error, 'Log type is error') | ||
t.equal(retry2[0].log[2].status, enums.status.waiting, 'Log status is waiting') | ||
t.ok(retry2[0].log[2].retryCount = 2, 'Log retryCount is valid') | ||
t.ok(retry2[0].log[2].message, 'Log message exists') | ||
t.equal(retry2[0].log[2].message, enums.message.failed, 'Log message is valid') | ||
t.equal(retry2[0].log[2].errorMessage, err.message, 'Log error message is valid') | ||
t.equal(retry2[0].log[2].errorStack, err.stack, 'Log stack is valid') | ||
t.ok(retry2[0].log[2].duration >= 0, 'Log duration is >= 0') | ||
return q.reset() | ||
}).then((resetResult) => { | ||
t.ok(resetResult >= 0, 'Queue reset') | ||
removeEventHandlers() | ||
// ---------- Event Summary ---------- | ||
eventHandlers.remove(t, q, state) | ||
q.stop() | ||
@@ -190,0 +253,0 @@ return resolve(t.end()) |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -10,7 +10,9 @@ const is = require('../src/is') | ||
const tOpts = require('./test-options') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'job-log' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('job-log', (t) => { | ||
t.plan(47) | ||
test(testName, (t) => { | ||
t.plan(76) | ||
@@ -22,20 +24,35 @@ const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
let testEvents = false | ||
function logEventHandler (jobId) { | ||
if (testEvents) { | ||
t.equal(jobId, job.id, `Event: log [${jobId}]`) | ||
} | ||
// ---------- Event Handler Setup ---------- | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 0, | ||
processing: 0, | ||
progress: 0, | ||
pausing: 0, | ||
paused: 0, | ||
resumed: 0, | ||
removed: 0, | ||
reset: 1, | ||
error: 0, | ||
reviewed: 0, | ||
detached: 0, | ||
stopping: 0, | ||
stopped: 0, | ||
dropped: 0, | ||
added: 1, | ||
waiting: 0, | ||
active: 0, | ||
completed: 0, | ||
cancelled: 0, | ||
failed: 0, | ||
terminated: 0, | ||
reanimated: 0, | ||
log: 4, | ||
updated: 0 | ||
} | ||
function addEventHandlers () { | ||
testEvents = true | ||
q.on(enums.status.log, logEventHandler) | ||
} | ||
function removeEventHandlers () { | ||
testEvents = false | ||
q.removeListener(enums.status.log, logEventHandler) | ||
} | ||
return q.reset().then((resetResult) => { | ||
t.ok(is.integer(resetResult), 'Queue reset') | ||
addEventHandlers() | ||
eventHandlers.add(t, q, state) | ||
return q.addJob(job) | ||
@@ -59,2 +76,3 @@ }).then((newJob) => { | ||
t.ok(jobWithLog1[0].log[1].retryCount >= 0, 'Log retryCount is valid') | ||
t.ok(jobWithLog1[0].log[1].processCount >= 0, 'Log processCount is valid') | ||
t.equal(jobWithLog1[0].log[1].message, tData, 'Log 1 message is valid') | ||
@@ -77,2 +95,3 @@ t.equal(jobWithLog1[0].log[1].data, tData, 'Log 1 detail is valid') | ||
t.ok(jobWithLog2[0].log[2].retryCount >= 0, 'Log retryCount is valid') | ||
t.ok(jobWithLog2[0].log[2].processCount >= 0, 'Log processCount is valid') | ||
t.equal(jobWithLog2[0].log[2].message, tData, 'Log 2 message is valid') | ||
@@ -95,2 +114,3 @@ t.equal(jobWithLog2[0].log[2].data, tData, 'Log 2 data is valid') | ||
t.ok(jobWithLog3[0].log[3].retryCount >= 0, 'Log retryCount is valid') | ||
t.ok(jobWithLog3[0].log[3].processCount >= 0, 'Log processCount is valid') | ||
t.equal(jobWithLog3[0].log[3].message, enums.message.seeLogData, 'Log 3 message is valid') | ||
@@ -113,2 +133,3 @@ t.ok(is.object(jobWithLog3[0].log[3].data), 'Log 3 data is valid') | ||
t.ok(jobWithLog4[0].log[4].retryCount >= 0, 'Log retryCount is valid') | ||
t.ok(jobWithLog4[0].log[4].processCount >= 0, 'Log processCount is valid') | ||
t.equal(jobWithLog4[0].log[4].message, enums.message.seeLogData, 'Log 4 message is valid') | ||
@@ -121,3 +142,5 @@ t.equal(jobWithLog4[0].log[4].data.foo, 'bar', 'Log 4 data object is valid') | ||
t.ok(resetResult >= 0, 'Queue reset') | ||
removeEventHandlers() | ||
// ---------- Event Summary ---------- | ||
eventHandlers.remove(t, q, state) | ||
q.stop() | ||
@@ -124,0 +147,0 @@ return resolve(t.end()) |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const tError = require('./test-error') | ||
@@ -8,3 +8,3 @@ const enums = require('../src/enums') | ||
test('job-options', (t) => { | ||
t.plan(28) | ||
t.plan(54) | ||
@@ -15,5 +15,6 @@ try { | ||
t.equal(to.timeout, enums.options.timeout, 'Job default timeout option is valid') | ||
t.equal(to.retryMax, enums.options.retryMax, 'Job default retryMax option is 3') | ||
t.equal(to.retryDelay, enums.options.retryDelay, 'Job default retryDelay option is 600') | ||
t.equal(to.retryMax, enums.options.retryMax, 'Job default retryMax option is valid') | ||
t.equal(to.retryDelay, enums.options.retryDelay, 'Job default retryDelay option is valid') | ||
t.equal(to.repeat, enums.options.repeat, 'Job default repeat option is valid') | ||
t.equal(to.repeatDelay, enums.options.repeatDelay, 'Job default repeatDelay option is valid') | ||
to = jobOptions({ | ||
@@ -23,33 +24,61 @@ priority: 'high', | ||
retryMax: 8, | ||
retryDelay: 200000 | ||
retryDelay: 200000, | ||
repeat: 4, | ||
repeatDelay: 1000 | ||
}, to) | ||
t.equal(to.priority, 'high', 'Job custom priority option is correct') | ||
t.equal(to.timeout, 100000, 'Job custom timeout option is correct') | ||
t.equal(to.retryMax, 8, 'Job custom retryMax option is correct') | ||
t.equal(to.retryDelay, 200000, 'Job custom retryDelay option is correct') | ||
t.equal(to.priority, 'high', 'Job custom priority option is valid') | ||
t.equal(to.timeout, 100000, 'Job custom timeout option is valid') | ||
t.equal(to.retryMax, 8, 'Job custom retryMax option is valid') | ||
t.equal(to.retryDelay, 200000, 'Job custom retryDelay option is valid') | ||
t.equal(to.repeat, 4, 'Job custom repeat option is valid') | ||
t.equal(to.repeatDelay, 1000, 'Job custom repeatDelay option is valid') | ||
to = jobOptions({ priority: 'lowest' }, to) | ||
t.equal(to.priority, 'lowest', 'Job priority custom priority option is correct') | ||
t.equal(to.timeout, 100000, 'Job priority custom timeout option is correct') | ||
t.equal(to.retryMax, 8, 'Job priority custom retryMax option is correct') | ||
t.equal(to.retryDelay, 200000, 'Job priority custom retryDelay option is correct') | ||
t.equal(to.priority, 'lowest', 'Job priority custom priority option is valid') | ||
t.equal(to.timeout, 100000, 'Job priority custom timeout option is valid') | ||
t.equal(to.retryMax, 8, 'Job priority custom retryMax option is valid') | ||
t.equal(to.retryDelay, 200000, 'Job priority custom retryDelay option is valid') | ||
t.equal(to.repeat, 4, 'Job priority custom repeat option is valid') | ||
t.equal(to.repeatDelay, 1000, 'Job priority custom repeatDelay option is valid') | ||
to = jobOptions({ timeout: 700000 }, to) | ||
t.equal(to.priority, 'lowest', 'Job timeout custom priority option is correct') | ||
t.equal(to.timeout, 700000, 'Job timeout custom timeout option is correct') | ||
t.equal(to.retryMax, 8, 'Job timeout custom retryMax option is correct') | ||
t.equal(to.retryDelay, 200000, 'Job timeout custom retryDelay option is correct') | ||
t.equal(to.priority, 'lowest', 'Job timeout custom priority option is valid') | ||
t.equal(to.timeout, 700000, 'Job timeout custom timeout option is valid') | ||
t.equal(to.retryMax, 8, 'Job timeout custom retryMax option is valid') | ||
t.equal(to.retryDelay, 200000, 'Job timeout custom retryDelay option is valid') | ||
t.equal(to.repeat, 4, 'Job timeout custom repeat option is valid') | ||
t.equal(to.repeatDelay, 1000, 'Job timeout custom repeatDelay option is valid') | ||
to = jobOptions({ retryMax: 2 }, to) | ||
t.equal(to.priority, 'lowest', 'Job retryMax custom priority option is correct') | ||
t.equal(to.timeout, 700000, 'Job retryMax custom timeout option is correct') | ||
t.equal(to.retryMax, 2, 'Job retryMax custom retryMax option is correct') | ||
t.equal(to.retryDelay, 200000, 'Job retryMax custom retryDelay option is correct') | ||
t.equal(to.priority, 'lowest', 'Job retryMax custom priority option is valid') | ||
t.equal(to.timeout, 700000, 'Job retryMax custom timeout option is valid') | ||
t.equal(to.retryMax, 2, 'Job retryMax custom retryMax option is valid') | ||
t.equal(to.retryDelay, 200000, 'Job retryMax custom retryDelay option is valid') | ||
t.equal(to.repeat, 4, 'Job retryMax custom repeat option is valid') | ||
t.equal(to.repeatDelay, 1000, 'Job retryMax custom repeatDelay option is valid') | ||
to = jobOptions({ retryDelay: 800000 }, to) | ||
t.equal(to.priority, 'lowest', 'Job retryDelay custom priority option is correct') | ||
t.equal(to.timeout, 700000, 'Job retryDelay custom timeout option is correct') | ||
t.equal(to.retryMax, 2, 'Job retryDelay custom retryMax option is correct') | ||
t.equal(to.retryDelay, 800000, 'Job retryDelay custom retryDelay option is correct') | ||
t.equal(to.priority, 'lowest', 'Job retryDelay custom priority option is valid') | ||
t.equal(to.timeout, 700000, 'Job retryDelay custom timeout option is valid') | ||
t.equal(to.retryMax, 2, 'Job retryDelay custom retryMax option is valid') | ||
t.equal(to.retryDelay, 800000, 'Job retryDelay custom retryDelay option is valid') | ||
t.equal(to.repeat, 4, 'Job retryDelay custom repeat option is valid') | ||
t.equal(to.repeatDelay, 1000, 'Job retryDelay custom repeatDelay option is valid') | ||
to = jobOptions({ repeat: false }, to) | ||
t.equal(to.priority, 'lowest', 'Job repeat custom priority option is valid') | ||
t.equal(to.timeout, 700000, 'Job repeat custom timeout option is valid') | ||
t.equal(to.retryMax, 2, 'Job repeat custom retryMax option is valid') | ||
t.equal(to.retryDelay, 800000, 'Job repeat custom retryDelay option is valid') | ||
t.equal(to.repeat, false, 'Job repeat custom repeat option is valid') | ||
t.equal(to.repeatDelay, 1000, 'Job repeat custom repeatDelay option is valid') | ||
to = jobOptions({ repeatDelay: 2000 }, to) | ||
t.equal(to.priority, 'lowest', 'Job repeatDelay custom priority option is valid') | ||
t.equal(to.timeout, 700000, 'Job repeatDelay custom timeout option is valid') | ||
t.equal(to.retryMax, 2, 'Job repeatDelay custom retryMax option is valid') | ||
t.equal(to.retryDelay, 800000, 'Job repeatDelay custom retryDelay option is valid') | ||
t.equal(to.repeat, false, 'Job repeatDelay custom repeat option is valid') | ||
t.equal(to.repeatDelay, 2000, 'Job repeatDelay custom repeatDelay option is valid') | ||
to = jobOptions({ | ||
@@ -59,8 +88,12 @@ priority: 'oops', | ||
retryMax: -30, | ||
retryDelay: -40 | ||
retryDelay: -40, | ||
repeat: -50, | ||
repeatDelay: -60 | ||
}, to) | ||
t.equal(to.priority, 'lowest', 'Job invalid priority option is correct') | ||
t.equal(to.timeout, 700000, 'Job invalid timeout option is correct') | ||
t.equal(to.retryMax, 2, 'Job invalid retryMax option is correct') | ||
t.equal(to.retryDelay, 800000, 'Job invalid retryDelay option is correct') | ||
t.equal(to.priority, 'lowest', 'Job invalid priority option is reverted') | ||
t.equal(to.timeout, 700000, 'Job invalid timeout option is reverted') | ||
t.equal(to.retryMax, 2, 'Job invalid retryMax option is reverted') | ||
t.equal(to.retryDelay, 800000, 'Job invalid retryDelay option is reverted') | ||
t.equal(to.repeat, false, 'Job invalid repeat option is reverted') | ||
t.equal(to.repeatDelay, 2000, 'Job invalid repeatDelay option is reverted') | ||
} catch (err) { | ||
@@ -67,0 +100,0 @@ tError(err, module, t) |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const is = require('../src/is') | ||
@@ -3,0 +3,0 @@ const jobParse = require('../src/job-parse') |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -11,7 +11,8 @@ const datetime = require('../src/datetime') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'job-progress' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('job-progress', (t) => { | ||
t.plan(46) | ||
test(testName, (t) => { | ||
t.plan(47) | ||
@@ -23,6 +24,6 @@ const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
job.retryCount = 0 | ||
let oldPercent | ||
// ---------- Event Handler Setup ---------- | ||
let state = { | ||
testName, | ||
enabled: false, | ||
@@ -51,2 +52,3 @@ ready: 0, | ||
terminated: 0, | ||
reanimated: 0, | ||
log: 0, | ||
@@ -64,3 +66,3 @@ updated: 0 | ||
}).then((updatedJob) => { | ||
t.ok(updatedJob, 'Job updated successfully') | ||
t.ok(is.job(updatedJob), 'Job updated successfully') | ||
return job.q.r.db(job.q.db).table(job.q.name).get(job.id).update({ | ||
@@ -82,3 +84,3 @@ retryCount: 1 | ||
}).then((updatedJob) => { | ||
t.ok(updatedJob, 'Job updated successfully') | ||
t.ok(is.job(updatedJob), 'Job updated successfully') | ||
return q.getJob(job.id) | ||
@@ -96,3 +98,3 @@ }).then((updatedJob) => { | ||
}).then((updatedJob) => { | ||
t.ok(updatedJob, 'Job updated successfully') | ||
t.ok(is.job(updatedJob), 'Job updated successfully') | ||
return q.getJob(job.id) | ||
@@ -104,3 +106,3 @@ }).then((updatedJob) => { | ||
}).then((updatedJob) => { | ||
t.ok(updatedJob, 'Job updated successfully') | ||
t.ok(is.job(updatedJob), 'Job updated successfully') | ||
return q.getJob(job.id) | ||
@@ -112,3 +114,3 @@ }).then((updatedJob) => { | ||
}).then((updatedJob) => { | ||
t.ok(updatedJob, 'Job updated successfully') | ||
t.ok(is.job(updatedJob), 'Job updated successfully') | ||
return q.getJob(job.id) | ||
@@ -121,3 +123,3 @@ }).then((updatedJob) => { | ||
}).then((updatedJob) => { | ||
t.ok(updatedJob, 'Job updated successfully') | ||
t.ok(is.job(updatedJob), 'Job updated successfully') | ||
return q.getJob(job.id) | ||
@@ -127,8 +129,8 @@ }).then((updatedJob) => { | ||
updatedJob[0].status = enums.status.failed | ||
return jobProgress(updatedJob[0], 101) | ||
}).then((inactiveResult) => { | ||
t.notOk(inactiveResult, 'Inactive job returns false') | ||
return jobProgress(updatedJob[0], 101).catch((err) => { | ||
t.ok(is.error(err), 'Inactive job rejects Promise') | ||
}) | ||
}).then(() => { | ||
// | ||
// ---------- Event Summary ---------- | ||
t.comment('job-progress: Event Summary') | ||
eventHandlers.remove(t, q, state) | ||
@@ -135,0 +137,0 @@ return q.reset() |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -10,7 +10,9 @@ const is = require('../src/is') | ||
const tOpts = require('./test-options') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'job-update' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('job-update', (t) => { | ||
t.plan(31) | ||
test(testName, (t) => { | ||
t.plan(55) | ||
@@ -23,16 +25,30 @@ const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
// ---------- Event Handler Setup ---------- | ||
let testEvents = false | ||
function updatedEventHandler (jobId) { | ||
if (testEvents) { | ||
t.equal(jobId, job.id, `Event: updated [${jobId}]`) | ||
} | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 0, | ||
processing: 0, | ||
progress: 0, | ||
pausing: 0, | ||
paused: 0, | ||
resumed: 0, | ||
removed: 0, | ||
reset: 1, | ||
error: 0, | ||
reviewed: 0, | ||
detached: 0, | ||
stopping: 0, | ||
stopped: 0, | ||
dropped: 0, | ||
added: 0, | ||
waiting: 0, | ||
active: 0, | ||
completed: 0, | ||
cancelled: 0, | ||
failed: 0, | ||
terminated: 0, | ||
reanimated: 0, | ||
log: 0, | ||
updated: 1 | ||
} | ||
function addEventHandlers () { | ||
testEvents = true | ||
q.on(enums.status.updated, updatedEventHandler) | ||
} | ||
function removeEventHandlers () { | ||
testEvents = false | ||
q.removeListener(enums.status.updated, updatedEventHandler) | ||
} | ||
@@ -51,3 +67,3 @@ return q.reset().then((resetResult) => { | ||
// ---------- Job Update Test ---------- | ||
addEventHandlers() | ||
eventHandlers.add(t, q, state) | ||
t.comment('job-update: Update') | ||
@@ -59,3 +75,3 @@ job = savedJobs1[0] | ||
}).then((updateResult) => { | ||
t.ok(updateResult, 'Job updated successfully') | ||
t.ok(is.job(updateResult), 'Job updated successfully') | ||
return q.getJob(job.id) | ||
@@ -90,3 +106,5 @@ }).then((updatedJob) => { | ||
t.ok(resetResult >= 0, 'Queue reset') | ||
removeEventHandlers() | ||
// ---------- Event Summary ---------- | ||
eventHandlers.remove(t, q, state) | ||
q.stop() | ||
@@ -93,0 +111,0 @@ return resolve(t.end()) |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -10,11 +10,13 @@ const is = require('../src/is') | ||
const tOpts = require('./test-options') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'job' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('job', (t) => { | ||
t.plan(81) | ||
test(testName, (t) => { | ||
t.plan(114) | ||
const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
const newJob = new Job(q) | ||
let newJob = new Job(q) | ||
newJob.data = tData | ||
@@ -24,38 +26,31 @@ let savedJob | ||
// ---------- Event Handler Setup ---------- | ||
let testEvents = false | ||
function addedEventHandler (jobId) { | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: added [${jobId}]`) | ||
} | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 1, | ||
processing: 0, | ||
progress: 1, | ||
pausing: 0, | ||
paused: 0, | ||
resumed: 0, | ||
removed: 0, | ||
reset: 1, | ||
error: 0, | ||
reviewed: 0, | ||
detached: 0, | ||
stopping: 0, | ||
stopped: 0, | ||
dropped: 0, | ||
added: 1, | ||
waiting: 0, | ||
active: 0, | ||
completed: 0, | ||
cancelled: 0, | ||
failed: 0, | ||
terminated: 0, | ||
reanimated: 0, | ||
log: 1, | ||
updated: 0 | ||
} | ||
function logEventHandler (jobId) { | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: log [${jobId}]`) | ||
} | ||
} | ||
function progressEventHandler (jobId) { | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: progress [${jobId}]`) | ||
} | ||
} | ||
function updatedEventHandler (jobId) { | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: updated [${jobId}]`) | ||
} | ||
} | ||
function addEventHandlers () { | ||
testEvents = true | ||
q.on(enums.status.added, addedEventHandler) | ||
q.on(enums.status.log, logEventHandler) | ||
q.on(enums.status.progress, progressEventHandler) | ||
q.on(enums.status.updated, updatedEventHandler) | ||
} | ||
function removeEventHandlers () { | ||
testEvents = false | ||
q.removeListener(enums.status.added, addedEventHandler) | ||
q.removeListener(enums.status.log, logEventHandler) | ||
q.removeListener(enums.status.progress, progressEventHandler) | ||
q.removeListener(enums.status.updated, updatedEventHandler) | ||
} | ||
addEventHandlers() | ||
eventHandlers.add(t, q, state) | ||
@@ -94,2 +89,14 @@ // ---------- New Job Tests ---------- | ||
t.equal(newJob.retryDelay, 100, 'Job setRetryDelay successfully changed value') | ||
t.throws(() => { newJob.setRepeat('not valid') }, 'Job setRepeat thows if invalid') | ||
newJob.setRepeat(100) | ||
t.equal(newJob.repeat, 100, 'Job setRepeat successfully changed value') | ||
t.throws(() => { newJob.setRepeat(-10) }, 'Job setRepeat thows if negative') | ||
newJob.setRepeat(true) | ||
t.ok(is.true(newJob.repeat), 'Job setRepeat successfully changed value to true') | ||
newJob.setRepeat(false) | ||
t.ok(is.false(newJob.repeat), 'Job setRepeat successfully changed value to false') | ||
t.throws(() => { newJob.setRepeatDelay('not valid') }, 'Job setRepeatDelay thows if invalid') | ||
t.throws(() => { newJob.setRepeatDelay(-10) }, 'Job setRepeatDelay thows if negative') | ||
newJob.setRepeatDelay(100) | ||
t.equal(newJob.repeatDelay, 100, 'Job setRetryDelay successfully changed value') | ||
t.throws(() => { newJob.setDateEnable('not valid') }, 'Job setDateEnable thows if invalid') | ||
@@ -103,3 +110,3 @@ const testDate = new Date() | ||
let cleanJob = newJob.getCleanCopy() | ||
t.equal(Object.keys(cleanJob).length, 13, 'Clean job has valid number of properties') | ||
t.equal(Object.keys(cleanJob).length, 16, 'Clean job has valid number of properties') | ||
t.equal(cleanJob.id, newJob.id, 'Clean job has valid id') | ||
@@ -165,3 +172,3 @@ t.equal(cleanJob.data, newJob.data, 'Clean job data is valid') | ||
t.equal(custJob.priority, 'high', 'New job with new priority created successfully') | ||
t.throws(() => { new Job(q, () => { }) }, 'New job with function throws error') | ||
t.throws(() => { newJob = new Job(q, () => { }) }, 'New job with function throws error') | ||
@@ -188,5 +195,5 @@ // ---------- Add Job Log ---------- | ||
savedJob.status = enums.status.active | ||
return savedJob.setProgress(50) | ||
return savedJob.updateProgress(50) | ||
}).then((progressResult) => { | ||
t.ok(progressResult, 'Job setProgress returned true') | ||
t.ok(progressResult, 'Job updateProgress returned true') | ||
return q.getJob(savedJob.id) | ||
@@ -197,3 +204,3 @@ }).then((jobsFromDb) => { | ||
removeEventHandlers() | ||
eventHandlers.remove(t, q, state) | ||
return q.reset() | ||
@@ -200,0 +207,0 @@ }).then((resetResult) => { |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -9,15 +9,41 @@ const is = require('../src/is') | ||
const tOpts = require('./test-options') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'queue-add-job' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('queue-add-job', (t) => { | ||
t.plan(30) | ||
test(testName, (t) => { | ||
t.plan(51) | ||
const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
let addedCount = 0 | ||
function addedEventHandler (jobId) { | ||
addedCount++ | ||
t.ok(is.uuid(jobId), `Event: added [${addedCount}] [${jobId}]`) | ||
// ---------- Event Handler Setup ---------- | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 0, | ||
processing: 0, | ||
progress: 0, | ||
pausing: 0, | ||
paused: 0, | ||
resumed: 0, | ||
removed: 0, | ||
reset: 0, | ||
error: 0, | ||
reviewed: 0, | ||
detached: 0, | ||
stopping: 0, | ||
stopped: 0, | ||
dropped: 0, | ||
added: 3, | ||
waiting: 0, | ||
active: 0, | ||
completed: 0, | ||
cancelled: 0, | ||
failed: 0, | ||
terminated: 0, | ||
reanimated: 0, | ||
log: 0, | ||
updated: 0 | ||
} | ||
q.on(enums.status.added, addedEventHandler) | ||
@@ -32,2 +58,3 @@ const job = q.createJob() | ||
t.ok(is.integer(resetResult), 'Queue reset') | ||
eventHandlers.add(t, q, state) | ||
@@ -88,14 +115,5 @@ // ---------- Add Single Job Tests ---------- | ||
}).then(() => { | ||
job.status = enums.status.waiting | ||
// ---------- Add Invalid Status Job Tests ---------- | ||
t.comment('queue-add-job: Add Invalid Status Job') | ||
return queueAddJob(q, job).then(() => { | ||
t.fail('Promise is not being rejected when job status is invalid') | ||
}).catch((err) => { | ||
t.equal(err.message, enums.message.jobAlreadyAdded, 'Job with status not equal to created returns a rejected promise') | ||
}) | ||
}).then(() => { | ||
t.equal(addedCount, 3, 'Jobs added event count is valid') | ||
q.removeListener(enums.status.added, addedEventHandler) | ||
// | ||
// ---------- Event Summary ---------- | ||
eventHandlers.remove(t, q, state) | ||
return q.reset() | ||
@@ -102,0 +120,0 @@ }).then((resetResult) => { |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -10,7 +10,9 @@ const is = require('../src/is') | ||
const tOpts = require('./test-options') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'queue-cancel-job' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('queue-cancel-job', (t) => { | ||
t.plan(40) | ||
test(testName, (t) => { | ||
t.plan(70) | ||
@@ -20,23 +22,30 @@ const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
// ---------- Event Handler Setup ---------- | ||
let testEvents = false | ||
function cancelledEventHandler (jobId) { | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: Job cancelled [${jobId}]`) | ||
} | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 0, | ||
processing: 0, | ||
progress: 0, | ||
pausing: 0, | ||
paused: 0, | ||
resumed: 0, | ||
removed: 5, | ||
reset: 1, | ||
error: 0, | ||
reviewed: 0, | ||
detached: 0, | ||
stopping: 0, | ||
stopped: 0, | ||
dropped: 0, | ||
added: 6, | ||
waiting: 0, | ||
active: 0, | ||
completed: 0, | ||
cancelled: 11, | ||
failed: 0, | ||
terminated: 0, | ||
reanimated: 0, | ||
log: 0, | ||
updated: 0 | ||
} | ||
function removedEventHandler (jobId) { | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: Job removed [${jobId}]`) | ||
} | ||
} | ||
function addEventHandlers () { | ||
testEvents = true | ||
q.on(enums.status.cancelled, cancelledEventHandler) | ||
q.on(enums.status.removed, removedEventHandler) | ||
} | ||
function removeEventHandlers () { | ||
testEvents = false | ||
q.removeListener(enums.status.cancelled, cancelledEventHandler) | ||
q.removeListener(enums.status.removed, removedEventHandler) | ||
} | ||
@@ -55,3 +64,3 @@ const jobsToCreate = 5 | ||
// ---------- Cancel Multiple Jobs Tests ---------- | ||
addEventHandlers() | ||
eventHandlers.add(t, q, state) | ||
t.comment('queue-cancel-job: Cancel Multiple Jobs') | ||
@@ -107,3 +116,6 @@ return queueCancelJob(q, savedJobs, tData) | ||
t.ok(resetResult >= 0, 'Queue reset') | ||
removeEventHandlers() | ||
// ---------- Event Summary ---------- | ||
eventHandlers.remove(t, q, state) | ||
q.stop() | ||
@@ -110,0 +122,0 @@ return resolve(t.end()) |
@@ -1,14 +0,15 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
const is = require('../src/is') | ||
const tError = require('./test-error') | ||
const enums = require('../src/enums') | ||
const Queue = require('../src/queue') | ||
const dbReview = require('../src/db-review') | ||
const tOpts = require('./test-options') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'queue-change' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('queue-change', (t) => { | ||
t.plan(84) | ||
test(testName, (t) => { | ||
t.plan(61) | ||
@@ -19,256 +20,35 @@ const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
// ---------- Event Handler Setup ---------- | ||
let testEvents = false | ||
let readyEventCount = 0 | ||
let readyEventTotal = 1 | ||
function readyEventHandler (queueId) { | ||
readyEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: ready [${readyEventCount} of ${readyEventTotal}] [${queueId}]`) | ||
} | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 1, | ||
processing: 3, | ||
progress: 3, | ||
pausing: 3, | ||
paused: 3, | ||
resumed: 2, | ||
removed: 1, | ||
reset: 0, | ||
error: 0, | ||
reviewed: 2, | ||
detached: 0, | ||
stopping: 0, | ||
stopped: 0, | ||
dropped: 0, | ||
added: 3, | ||
waiting: 0, | ||
active: 3, | ||
completed: 1, | ||
cancelled: 1, | ||
failed: 1, | ||
terminated: 1, | ||
reanimated: 0, | ||
log: 1, | ||
updated: 0 | ||
} | ||
let addedEventCount = 0 | ||
let addedEventTotal = 3 | ||
function addedEventHandler (jobId) { | ||
addedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: added [${addedEventCount} of ${addedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let activeEventCount = 0 | ||
let activeEventTotal = 3 | ||
function activeEventHandler (jobId) { | ||
activeEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: active [${activeEventCount} of ${activeEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let processingEventCount = 0 | ||
let processingEventTotal = 3 | ||
function processingEventHandler (jobId) { | ||
processingEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: processing [${processingEventCount} of ${processingEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let progressEventCount = 0 | ||
let progressEventTotal = 3 | ||
function progressEventHandler (jobId, percent) { | ||
progressEventCount++ | ||
if (testEvents) { | ||
t.pass(`Event: progress [${progressEventCount} of ${progressEventTotal}]`) | ||
t.ok(is.uuid(jobId), `Event: progress [${jobId}]`) | ||
t.ok(is.number(percent), `Event: progress [${percent}%]`) | ||
} | ||
} | ||
let completedEventCount = 0 | ||
let completedEventTotal = 1 | ||
function completedEventHandler (jobId) { | ||
completedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: completed [${completedEventCount} of ${completedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let cancelledEventCount = 0 | ||
let cancelledEventTotal = 1 | ||
function cancelledEventHandler (jobId) { | ||
cancelledEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: cancelled [${cancelledEventCount} of ${cancelledEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let failedEventCount = 0 | ||
let failedEventTotal = 1 | ||
function failedEventHandler (jobId) { | ||
failedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: failed [${failedEventCount} of ${failedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let terminatedEventCount = 0 | ||
let terminatedEventTotal = 1 | ||
function terminatedEventHandler (jobId) { | ||
terminatedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: terminated [${terminatedEventCount} of ${terminatedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let logEventCount = 0 | ||
let logEventTotal = 1 | ||
function logEventHandler (jobId) { | ||
logEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: log [${logEventCount} of ${logEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let updatedEventCount = 0 | ||
let updatedEventTotal = 0 | ||
function updatedEventHandler (jobId) { | ||
updatedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: updated [${updatedEventCount} of ${updatedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let pausingEventCount = 0 | ||
let pausingEventTotal = 3 | ||
function pausingEventHandler (global, queueId) { | ||
pausingEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: pausing [${pausingEventCount} of ${pausingEventTotal}]`) | ||
t.ok(is.boolean(global), `Event: pausing [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: pausing [queueId: ${queueId}]`) | ||
} | ||
} | ||
let pausedEventCount = 0 | ||
let pausedEventTotal = 3 | ||
function pausedEventHandler (global, queueId) { | ||
pausedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: paused [${pausedEventCount} of ${pausedEventTotal}]`) | ||
t.ok(is.boolean(global), `Event: paused [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: paused [queueId: ${queueId}]`) | ||
} | ||
} | ||
let resumedEventCount = 0 | ||
let resumedEventTotal = 2 | ||
function resumedEventHandler (global, queueId) { | ||
resumedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: resumed [${resumedEventCount} of ${resumedEventTotal}]`) | ||
t.ok(is.boolean(global), `Event: resumed [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: resumed [queueId: ${queueId}]`) | ||
} | ||
} | ||
let removedEventCount = 0 | ||
let removedEventTotal = 1 | ||
function removedEventHandler (jobId) { | ||
removedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: removed [${removedEventCount} of ${removedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let idleEventCount = 0 | ||
let idleEventTotal = 1 | ||
function idleEventHandler (queueId) { | ||
if (idleEventCount < 1) { | ||
idleEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: idle [${idleEventCount} of ${idleEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
} | ||
let reviewedEventCount = 0 | ||
let reviewedEventTotal = 2 | ||
function reviewedEventHandler (result) { | ||
reviewedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.object(result), `Event: reviewed [${reviewedEventCount} of ${reviewedEventTotal}]`) | ||
} | ||
} | ||
let resetEventCount = 0 | ||
let resetEventTotal = 0 | ||
function resetEventHandler (total) { | ||
resetEventCount++ | ||
if (testEvents) { | ||
t.ok(is.integer(total), `Event: reset [${resetEventCount} of ${resetEventTotal}] [${total}]`) | ||
} | ||
} | ||
let errorEventCount = 0 | ||
let errorEventTotal = 0 | ||
function errorEventHandler (err) { | ||
errorEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(err.message), `Event: error [${errorEventCount} of ${errorEventTotal}] [${err.message}]`) | ||
} | ||
} | ||
let detachedEventCount = 0 | ||
let detachedEventTotal = 0 | ||
function detachedEventHandler (queueId) { | ||
detachedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: detached [${detachedEventCount} of ${detachedEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
let stoppingEventCount = 0 | ||
let stoppingEventTotal = 0 | ||
function stoppingEventHandler (queueId) { | ||
stoppingEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: stopping [${stoppingEventCount} of ${stoppingEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
let stoppedEventCount = 0 | ||
let stoppedEventTotal = 0 | ||
function stoppedEventHandler (queueId) { | ||
stoppedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: stopped [${stoppedEventCount} of ${stoppedEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
let droppedEventCount = 0 | ||
let droppedEventTotal = 0 | ||
function droppedEventHandler (queueId) { | ||
droppedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: dropped [${droppedEventCount} of ${droppedEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
function addEventHandlers () { | ||
testEvents = true | ||
q.on(enums.status.ready, readyEventHandler) | ||
q.on(enums.status.added, addedEventHandler) | ||
q.on(enums.status.active, activeEventHandler) | ||
q.on(enums.status.processing, processingEventHandler) | ||
q.on(enums.status.progress, progressEventHandler) | ||
q.on(enums.status.completed, completedEventHandler) | ||
q.on(enums.status.cancelled, cancelledEventHandler) | ||
q.on(enums.status.failed, failedEventHandler) | ||
q.on(enums.status.terminated, terminatedEventHandler) | ||
q.on(enums.status.paused, pausedEventHandler) | ||
q.on(enums.status.pausing, pausingEventHandler) | ||
q.on(enums.status.resumed, resumedEventHandler) | ||
q.on(enums.status.removed, removedEventHandler) | ||
q.on(enums.status.log, logEventHandler) | ||
q.on(enums.status.updated, updatedEventHandler) | ||
q.on(enums.status.idle, idleEventHandler) | ||
q.on(enums.status.reviewed, reviewedEventHandler) | ||
q.on(enums.status.reset, resetEventHandler) | ||
q.on(enums.status.error, errorEventHandler) | ||
q.on(enums.status.detached, detachedEventHandler) | ||
q.on(enums.status.stopping, stoppingEventHandler) | ||
q.on(enums.status.stopped, stoppedEventHandler) | ||
q.on(enums.status.dropped, droppedEventHandler) | ||
} | ||
function removeEventHandlers () { | ||
testEvents = false | ||
q.removeListener(enums.status.ready, readyEventHandler) | ||
q.removeListener(enums.status.added, addedEventHandler) | ||
q.removeListener(enums.status.active, activeEventHandler) | ||
q.removeListener(enums.status.processing, processingEventHandler) | ||
q.removeListener(enums.status.progress, progressEventHandler) | ||
q.removeListener(enums.status.completed, completedEventHandler) | ||
q.removeListener(enums.status.cancelled, cancelledEventHandler) | ||
q.removeListener(enums.status.failed, failedEventHandler) | ||
q.removeListener(enums.status.terminated, terminatedEventHandler) | ||
q.removeListener(enums.status.paused, pausedEventHandler) | ||
q.removeListener(enums.status.pausing, pausingEventHandler) | ||
q.removeListener(enums.status.resumed, resumedEventHandler) | ||
q.removeListener(enums.status.removed, removedEventHandler) | ||
q.removeListener(enums.status.log, logEventHandler) | ||
q.removeListener(enums.status.updated, updatedEventHandler) | ||
q.removeListener(enums.status.idle, idleEventHandler) | ||
q.removeListener(enums.status.reviewed, reviewedEventHandler) | ||
q.removeListener(enums.status.reset, resetEventHandler) | ||
q.removeListener(enums.status.error, errorEventHandler) | ||
q.removeListener(enums.status.detached, detachedEventHandler) | ||
q.removeListener(enums.status.stopping, stoppingEventHandler) | ||
q.removeListener(enums.status.stopped, stoppedEventHandler) | ||
q.removeListener(enums.status.dropped, droppedEventHandler) | ||
return q.resume() | ||
} | ||
let job = qPub.createJob() | ||
let processDelay = 500 | ||
addEventHandlers() | ||
eventHandlers.add(t, q, state) | ||
@@ -282,5 +62,5 @@ return qPub.reset().then((resetResult) => { | ||
t.equal(j.id, job.id, `Job Processed [${j.id}]`) | ||
next('queue-change') | ||
next(null, 'queue-change') | ||
}, processDelay) | ||
return j.setProgress(50) | ||
return j.updateProgress(50) | ||
}) | ||
@@ -335,30 +115,6 @@ | ||
}).delay(processDelay).then(() => { | ||
removeEventHandlers() | ||
// | ||
// ---------- Event Summary ---------- | ||
eventHandlers.remove(t, q, state) | ||
// ---------- Event summary Test ---------- | ||
t.comment('queue-change: Event Summary') | ||
t.equal(readyEventCount, readyEventTotal, 'Ready event count valid') | ||
t.equal(addedEventCount, addedEventTotal, 'Added event count valid') | ||
t.equal(activeEventCount, activeEventTotal, 'Active event count valid') | ||
t.equal(processingEventCount, processingEventTotal, 'Processing event count valid') | ||
t.equal(progressEventCount, progressEventTotal, 'Progress event count valid') | ||
t.equal(completedEventCount, completedEventTotal, 'Completed event count valid') | ||
t.equal(cancelledEventCount, cancelledEventTotal, 'Cancelled event count valid') | ||
t.equal(failedEventCount, failedEventTotal, 'Failed event count valid') | ||
t.equal(terminatedEventCount, terminatedEventTotal, 'Terminated event count valid') | ||
t.equal(pausingEventCount, pausingEventTotal, 'Pausing event count valid') | ||
t.equal(pausedEventCount, pausedEventTotal, 'Paused event count valid') | ||
t.equal(resumedEventCount, resumedEventTotal, 'Resumed event count valid') | ||
t.equal(removedEventCount, removedEventTotal, 'Removed event count valid') | ||
t.equal(logEventCount, logEventTotal, 'Log event count valid') | ||
t.equal(updatedEventCount, updatedEventTotal, 'Updated event count valid') | ||
t.equal(idleEventCount, idleEventTotal, 'Idle event count valid') | ||
t.equal(reviewedEventCount, reviewedEventTotal, 'Reviewed event count valid') | ||
t.equal(resetEventCount, resetEventTotal, 'Reset event count valid') | ||
t.equal(errorEventCount, errorEventTotal, 'Error event count valid') | ||
t.equal(detachedEventCount, detachedEventTotal, 'Detached event count valid') | ||
t.equal(stoppingEventCount, stoppingEventTotal, 'Stopping event count valid') | ||
t.equal(stoppedEventCount, stoppedEventTotal, 'Stopped event count valid') | ||
t.equal(droppedEventCount, droppedEventTotal, 'Dropped event count valid') | ||
return q.reset() | ||
@@ -365,0 +121,0 @@ }).then((resetResult) => { |
@@ -1,5 +0,4 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
const is = require('../src/is') | ||
const enums = require('../src/enums') | ||
const tError = require('./test-error') | ||
@@ -10,32 +9,45 @@ const queueDb = require('../src/queue-db') | ||
const tOpts = require('./test-options') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'queue-db' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('queue-db', (t) => { | ||
t.plan(70) | ||
test(testName, (t) => { | ||
t.plan(75) | ||
const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
let readyEventCount = 0 | ||
q.on(enums.status.ready, function readyEventHandler (qid) { | ||
readyEventCount++ | ||
t.pass(`Event: Queue ready [${qid}]`) | ||
t.equal(qid, q.id, `Event: Queue ready id is valid`) | ||
if (readyEventCount >= 6) { | ||
this.removeListener(enums.status.ready, readyEventHandler) | ||
} | ||
}) | ||
// ---------- Event Handler Setup ---------- | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 0, | ||
processing: 0, | ||
progress: 0, | ||
pausing: 0, | ||
paused: 0, | ||
resumed: 0, | ||
removed: 0, | ||
reset: 0, | ||
error: 0, | ||
reviewed: 0, | ||
detached: 1, | ||
stopping: 0, | ||
stopped: 0, | ||
dropped: 0, | ||
added: 0, | ||
waiting: 0, | ||
active: 0, | ||
completed: 0, | ||
cancelled: 0, | ||
failed: 0, | ||
terminated: 0, | ||
reanimated: 0, | ||
log: 0, | ||
updated: 0 | ||
} | ||
let detachEventCount = 0 | ||
q.on(enums.status.detached, function detachedEventHandler (qid) { | ||
detachEventCount++ | ||
t.pass(`Event: Queue detached [${qid}]`) | ||
t.equal(qid, q.id, `Event: Queue detached id valid`) | ||
if (detachEventCount >= 6) { | ||
this.removeListener(enums.status.detached, detachedEventHandler) | ||
} | ||
}) | ||
return q.reset().then((resetResult) => { | ||
t.ok(is.integer(resetResult), 'Queue reset') | ||
eventHandlers.add(t, q, state) | ||
q._masterInterval = 300 | ||
@@ -52,4 +64,6 @@ q._changeFeed = true | ||
t.comment('queue-db: Detach with Drain') | ||
return queueDb.detach(q, true) | ||
return queueDb.detach(q) | ||
}).then(() => { | ||
return queueDb.drain(q) | ||
}).then(() => { | ||
t.pass('Pass: Queue detached with pool drain') | ||
@@ -68,2 +82,3 @@ t.notOk(dbReview.isEnabled(q), 'Review is disabled') | ||
}).then((ready) => { | ||
eventHandlers.add(t, q, state) | ||
t.ok(ready, 'Queue in a ready state') | ||
@@ -75,4 +90,6 @@ t.ok(dbReview.isEnabled(q), 'Review is enabled') | ||
t.comment('queue-db: Detach with Drain') | ||
return queueDb.detach(q, true) | ||
return queueDb.detach(q) | ||
}).then(() => { | ||
return queueDb.drain(q) | ||
}).then(() => { | ||
t.pass('Pass: Queue detached with pool drain') | ||
@@ -93,2 +110,3 @@ t.notOk(dbReview.isEnabled(q), 'Review is disabled') | ||
}).then((ready) => { | ||
eventHandlers.add(t, q, state) | ||
t.ok(ready, 'Queue in a ready state') | ||
@@ -100,4 +118,6 @@ t.notOk(dbReview.isEnabled(q), 'Review is disabled') | ||
t.comment('queue-db: Detach with Drain') | ||
return queueDb.detach(q, true) | ||
return queueDb.detach(q) | ||
}).then(() => { | ||
return queueDb.drain(q) | ||
}).then(() => { | ||
t.pass('Pass: Queue detached with pool drain') | ||
@@ -118,2 +138,3 @@ t.notOk(dbReview.isEnabled(q), 'Review is disabled') | ||
}).then((ready) => { | ||
eventHandlers.add(t, q, state) | ||
t.ok(ready, 'Queue in a ready state') | ||
@@ -125,4 +146,6 @@ t.ok(dbReview.isEnabled(q), 'Review is enabled') | ||
t.comment('queue-db: Detach with Drain') | ||
return queueDb.detach(q, true) | ||
return queueDb.detach(q) | ||
}).then(() => { | ||
return queueDb.drain(q) | ||
}).then(() => { | ||
t.pass('Pass: Queue detached with pool drain') | ||
@@ -143,2 +166,3 @@ t.notOk(dbReview.isEnabled(q), 'Review is disabled') | ||
}).then((ready) => { | ||
eventHandlers.add(t, q, state) | ||
t.ok(ready, 'Queue in a ready state') | ||
@@ -150,4 +174,6 @@ t.notOk(dbReview.isEnabled(q), 'Review is disabled') | ||
t.comment('queue-db: Detach with Drain') | ||
return queueDb.detach(q, true) | ||
return queueDb.detach(q) | ||
}).then(() => { | ||
return queueDb.drain(q) | ||
}).then(() => { | ||
t.pass('Pass: Queue detached with pool drain') | ||
@@ -168,2 +194,3 @@ t.notOk(dbReview.isEnabled(q), 'Review is disabled') | ||
}).then((ready) => { | ||
eventHandlers.add(t, q, state) | ||
t.ok(ready, 'Queue in a ready state') | ||
@@ -173,5 +200,5 @@ t.ok(dbReview.isEnabled(q), 'Review is enabled') | ||
// ---------- Detach with Drain ---------- | ||
t.comment('queue-db: Detach with Drain') | ||
return queueDb.detach(q, false) | ||
// ---------- Detach without Drain ---------- | ||
t.comment('queue-db: Detach without Drain') | ||
return queueDb.detach(q) | ||
}).then(() => { | ||
@@ -187,4 +214,6 @@ t.pass('Pass: Queue detached without pool drain') | ||
t.comment('queue-db: Detach with Drain') | ||
return queueDb.detach(q, true) | ||
return queueDb.detach(q) | ||
}).then(() => { | ||
return queueDb.drain(q) | ||
}).then(() => { | ||
q._masterInterval = false | ||
@@ -202,2 +231,5 @@ q._changeFeed = true | ||
t.ok(q._changeFeedCursor.connection.open, 'Change feed is connected') | ||
// ---------- Event Summary ---------- | ||
eventHandlers.remove(t, q, state) | ||
q.stop() | ||
@@ -204,0 +236,0 @@ return resolve(t.end()) |
@@ -1,5 +0,4 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
const is = require('../src/is') | ||
const enums = require('../src/enums') | ||
const tError = require('./test-error') | ||
@@ -10,48 +9,48 @@ const queueDrop = require('../src/queue-drop') | ||
const rethinkdbdash = require('rethinkdbdash') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'queue-drop' | ||
module.exports = function () { | ||
const mockQueue = { | ||
r: rethinkdbdash(Object.assign(tOpts.cxn(), { silent: true })), | ||
db: tOpts.dbName, | ||
name: tOpts.queueName, | ||
id: 'mock:queue:id' | ||
} | ||
return new Promise((resolve, reject) => { | ||
test('queue-drop', (t) => { | ||
t.plan(10) | ||
test(testName, (t) => { | ||
t.plan(33) | ||
const mockQueue = { | ||
r: rethinkdbdash(Object.assign(tOpts.cxn(), { silent: true })), | ||
db: tOpts.dbName, | ||
name: tOpts.queueName, | ||
id: 'mock:queue:id' | ||
} | ||
let q = new Queue(tOpts.cxn(), tOpts.default()) | ||
let testEvents = false | ||
function stoppingEventHandler (qid) { | ||
if (testEvents) { | ||
t.pass(`Event: Queue stopping [${qid}]`) | ||
t.equal(qid, q.id, `Event: Queue stopping id is valid`) | ||
} | ||
// ---------- Event Handler Setup ---------- | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 0, | ||
processing: 0, | ||
progress: 0, | ||
pausing: 1, | ||
paused: 1, | ||
resumed: 0, | ||
removed: 0, | ||
reset: 0, | ||
error: 0, | ||
reviewed: 0, | ||
detached: 1, | ||
stopping: 1, | ||
stopped: 1, | ||
dropped: 1, | ||
added: 0, | ||
waiting: 0, | ||
active: 0, | ||
completed: 0, | ||
cancelled: 0, | ||
failed: 0, | ||
terminated: 0, | ||
reanimated: 0, | ||
log: 0, | ||
updated: 0 | ||
} | ||
function stoppedEventHandler (qid) { | ||
if (testEvents) { | ||
t.pass(`Event: Queue stopped [${qid}]`) | ||
t.equal(qid, q.id, `Event: Queue stopped id is valid`) | ||
} | ||
} | ||
function droppedEventHandler (qid) { | ||
if (testEvents) { | ||
t.pass(`Event: Queue dropped [${qid}]`) | ||
t.equal(qid, q.id, `Event: Queue dropped id is valid`) | ||
} | ||
} | ||
function addEventHandlers () { | ||
testEvents = true | ||
q.on(enums.status.stopping, stoppingEventHandler) | ||
q.on(enums.status.stopped, stoppedEventHandler) | ||
q.on(enums.status.dropped, droppedEventHandler) | ||
} | ||
function removeEventHandlers () { | ||
testEvents = false | ||
q.removeListener(enums.status.stopping, stoppingEventHandler) | ||
q.removeListener(enums.status.stopped, stoppedEventHandler) | ||
q.removeListener(enums.status.dropped, droppedEventHandler) | ||
} | ||
@@ -70,3 +69,3 @@ function simulateJobProcessing () { | ||
t.comment('queue-drop: Drop Queue') | ||
addEventHandlers() | ||
eventHandlers.add(t, q, state) | ||
simulateJobProcessing() | ||
@@ -83,3 +82,4 @@ return queueDrop(q) | ||
removeEventHandlers() | ||
// ---------- Event Summary ---------- | ||
eventHandlers.remove(t, q, state) | ||
mockQueue.r.getPoolMaster().drain() | ||
@@ -86,0 +86,0 @@ return resolve(t.end()) |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -3,0 +3,0 @@ const tError = require('./test-error') |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -3,0 +3,0 @@ const is = require('../src/is') |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -11,7 +11,9 @@ const datetime = require('../src/datetime') | ||
const tOpts = require('./test-options') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'queue-get-next-job' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('queue-get-next-job', (t) => { | ||
t.plan(105) | ||
test(testName, (t) => { | ||
t.plan(152) | ||
@@ -21,8 +23,2 @@ // ---------- Creating Priority Test Jobs ---------- | ||
q._concurrency = 1 | ||
let activeCount = 0 | ||
function activeEventHandler (jobId) { | ||
activeCount++ | ||
t.ok(is.uuid(jobId), `Event: Job Active [${activeCount}] [${jobId}]`) | ||
} | ||
q.on(enums.status.active, activeEventHandler) | ||
@@ -79,2 +75,32 @@ const jobLowest = q.createJob().setPriority('lowest') | ||
// ---------- Event Handler Setup ---------- | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 0, | ||
processing: 0, | ||
progress: 0, | ||
pausing: 0, | ||
paused: 0, | ||
resumed: 0, | ||
removed: 0, | ||
reset: 0, | ||
error: 0, | ||
reviewed: 0, | ||
detached: 0, | ||
stopping: 0, | ||
stopped: 0, | ||
dropped: 0, | ||
added: 24, | ||
waiting: 0, | ||
active: 19, | ||
completed: 0, | ||
cancelled: 0, | ||
failed: 0, | ||
terminated: 0, | ||
reanimated: 0, | ||
log: 0, | ||
updated: 0 | ||
} | ||
// Uncomment below for debugging | ||
@@ -88,2 +114,3 @@ // allCreatedJobs.map((j) => { | ||
t.ok(is.integer(resetResult), 'Queue reset') | ||
eventHandlers.add(t, q, state) | ||
return queueAddJob(q, allCreatedJobs, true) | ||
@@ -223,4 +250,4 @@ }).then((savedJobs) => { | ||
// ---------- Testing dateEnable with retryCount ---------- | ||
t.comment('queue-get-next-job: dateEnable with retryCount') | ||
// ---------- Testing dateEnable, retryCount, and processCount ---------- | ||
t.comment('queue-get-next-job: dateEnable, retryCount, and processCount') | ||
retryJobs = new Array(4) | ||
@@ -251,2 +278,3 @@ retryJobs[0] = q.createJob().setDateEnable(datetime.add.sec(new Date(), -100)) | ||
t.ok(is.dateBefore(retryGet2[1].dateEnable, retryGet2[2].dateEnable), 'dateEnable for third job is valid') | ||
t.equal(retryGet2[0].processCount, 1, 'processCount is valid') | ||
return queueGetNextJob(q) | ||
@@ -256,4 +284,6 @@ }).then((retryGet3) => { | ||
t.equal(retryGet3[0].id, retryJobs[0].id, 'Last job is valid') | ||
t.equal(activeCount, 19, 'Active event count valid') | ||
q.removeListener(enums.status.active, activeEventHandler) | ||
// ---------- Event Summary ---------- | ||
eventHandlers.remove(t, q, state) | ||
return q.reset() | ||
@@ -260,0 +290,0 @@ }).then((resetResult) => { |
@@ -1,5 +0,3 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
const enums = require('../src/enums') | ||
const is = require('../src/is') | ||
const tError = require('./test-error') | ||
@@ -12,7 +10,9 @@ const Queue = require('../src/queue') | ||
{ './queue-process': processStub }) | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'queue-interruption' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('queue-interruption', (t) => { | ||
t.plan(19) | ||
test(testName, (t) => { | ||
t.plan(33) | ||
@@ -25,48 +25,33 @@ const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
// ---------- Event Handler Setup ---------- | ||
let testEvents = false | ||
let pausingEventCount = 0 | ||
let pausingEventTotal = 1 | ||
function pausingEventHandler (global, queueId) { | ||
pausingEventCount++ | ||
if (testEvents) { | ||
t.pass(`Event: pausing [${pausingEventCount} of ${pausingEventTotal}] `) | ||
t.ok(is.boolean(global), `Event: pausing [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: pausing [queueId: ${queueId}]`) | ||
} | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 0, | ||
processing: 0, | ||
progress: 0, | ||
pausing: 1, | ||
paused: 1, | ||
resumed: 1, | ||
removed: 0, | ||
reset: 0, | ||
error: 0, | ||
reviewed: 0, | ||
detached: 0, | ||
stopping: 0, | ||
stopped: 0, | ||
dropped: 0, | ||
added: 0, | ||
waiting: 0, | ||
active: 0, | ||
completed: 0, | ||
cancelled: 0, | ||
failed: 0, | ||
terminated: 0, | ||
reanimated: 0, | ||
log: 0, | ||
updated: 0 | ||
} | ||
let pausedEventCount = 0 | ||
let pausedEventTotal = 1 | ||
function pausedEventHandler (global, queueId) { | ||
pausedEventCount++ | ||
if (testEvents) { | ||
t.pass(`Event: paused [${pausedEventCount} of ${pausedEventTotal}] `) | ||
t.ok(is.boolean(global), `Event: paused [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: paused [queueId: ${queueId}]`) | ||
} | ||
} | ||
let resumedEventCount = 0 | ||
let resumedEventTotal = 1 | ||
function resumedEventHandler (global, queueId) { | ||
resumedEventCount++ | ||
if (testEvents) { | ||
t.pass(`Event: resumed [${resumedEventCount} of ${resumedEventTotal}] `) | ||
t.ok(is.boolean(global), `Event: resumed [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: resumed [queueId: ${queueId}]`) | ||
} | ||
} | ||
function addEventHandlers () { | ||
testEvents = true | ||
q.on(enums.status.paused, pausingEventHandler) | ||
q.on(enums.status.paused, pausedEventHandler) | ||
q.on(enums.status.resumed, resumedEventHandler) | ||
} | ||
function removeEventHandlers () { | ||
testEvents = false | ||
q.removeListener(enums.status.pausing, pausedEventHandler) | ||
q.removeListener(enums.status.paused, pausedEventHandler) | ||
q.removeListener(enums.status.resumed, resumedEventHandler) | ||
} | ||
return q.ready().then((ready) => { | ||
addEventHandlers() | ||
eventHandlers.add(t, q, state) | ||
t.ok(ready, 'Queue is ready') | ||
@@ -93,9 +78,5 @@ | ||
t.notOk(q.paused, 'Queue is not paused') | ||
removeEventHandlers() | ||
// ---------- Event summary Test ---------- | ||
t.comment('queue-interruption: Event Summary') | ||
t.equal(pausingEventCount, pausingEventTotal, 'Pausing event count valid') | ||
t.equal(pausedEventCount, pausedEventTotal, 'Paused event count valid') | ||
t.equal(resumedEventCount, resumedEventTotal, 'Resumed event count valid') | ||
// ---------- Event Summary ---------- | ||
eventHandlers.remove(t, q, state) | ||
@@ -102,0 +83,0 @@ q.stop() |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -13,7 +13,8 @@ const datetime = require('../src/datetime') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'queue-process' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('queue-process', (t) => { | ||
t.plan(304) | ||
test(testName, { timeout: 200000 }, (t) => { | ||
t.plan(409) | ||
@@ -26,14 +27,24 @@ // ---------- Test Setup ---------- | ||
let jobDelay = 200 | ||
let repeatDelay = 300 | ||
const noOfJobsToCreate = 10 | ||
const allJobsDelay = jobDelay * (noOfJobsToCreate + 2) | ||
function resumeProcessPauseGet () { | ||
return q.resume().delay(jobDelay * 0.6).then(() => { | ||
return q.pause() | ||
}).delay(jobDelay).then(() => { | ||
return q.getJob(jobs) | ||
}).delay(repeatDelay * 2) | ||
} | ||
// ---------- Event Handler Setup ---------- | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 0, | ||
processing: 38, | ||
processing: 48, | ||
progress: 1, | ||
pausing: 2, | ||
paused: 2, | ||
resumed: 3, | ||
pausing: 14, | ||
paused: 14, | ||
resumed: 14, | ||
removed: 0, | ||
@@ -48,9 +59,10 @@ idle: 12, | ||
dropped: 0, | ||
added: 35, | ||
added: 37, | ||
waiting: 0, | ||
active: 38, | ||
completed: 32, | ||
cancelled: 2, | ||
active: 48, | ||
completed: 42, | ||
cancelled: 3, | ||
failed: 3, | ||
terminated: 1, | ||
reanimated: 0, | ||
log: 0, | ||
@@ -60,5 +72,11 @@ updated: 0 | ||
let completedEventCount = 0 | ||
const summaryCompleted = 32 | ||
const summaryCancelled = 2 | ||
// idle event testing is not part of the test-event-handlers module | ||
// because it is too difficult to measure. This handler and one | ||
// test below is to ensure idle is being called. | ||
function idleEventHandler (qid) { | ||
t.ok(is.string(qid), `Event: idle`) | ||
} | ||
const summaryCompleted = 33 | ||
const summaryCancelled = 3 | ||
const summaryTerminated = 1 | ||
@@ -88,3 +106,3 @@ | ||
setTimeout(function () { | ||
return job.setProgress(50).then((result) => { | ||
return job.updateProgress(50).then((result) => { | ||
t.ok(result, 'Job progress updated: ' + job.id) | ||
@@ -97,3 +115,3 @@ }) | ||
jobProcessTimeoutId = false | ||
next('Job Completed: ' + job.id) | ||
next(null, 'Job Completed: ' + job.id) | ||
.then((runningJobs) => { | ||
@@ -145,2 +163,3 @@ t.ok(is.integer(runningJobs), `Next call returns running jobs [${runningJobs}]`) | ||
}).then(() => { | ||
q.on(enums.status.idle, idleEventHandler) | ||
return q.resume() | ||
@@ -150,6 +169,78 @@ }).delay(jobDelay / 2).then(() => { | ||
}).delay(jobDelay * 8).then(() => { | ||
completedEventCount = state.count.get(enums.status.completed) | ||
t.equal(state.count.get(enums.status.completed), noOfJobsToCreate, `Queue has completed ${completedEventCount} jobs`) | ||
q.removeListener(enums.status.idle, idleEventHandler) | ||
t.ok(q.idle, 'Queue is idle') | ||
// ---------- Repeat True Test ---------- | ||
t.comment('queue-process: Repeat True') | ||
jobs = q.createJob().setRepeat(true).setRepeatDelay(repeatDelay) | ||
t.ok(jobs.repeat, 'Job repeat is true') | ||
t.equal(jobs.repeatDelay, repeatDelay, 'Job repeat delay is valid') | ||
return q.pause() | ||
}).then(() => { | ||
return q.addJob(jobs) | ||
}).then((savedJobs) => { | ||
t.equal(savedJobs.length, 1, `Jobs saved successfully: [${savedJobs.length}]`) | ||
return resumeProcessPauseGet() | ||
}).then((repeatJob) => { | ||
t.equal(repeatJob[0].processCount, 1, 'Repeat job processed once') | ||
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting') | ||
return resumeProcessPauseGet() | ||
}).then((repeatJob) => { | ||
t.equal(repeatJob[0].processCount, 2, 'Repeat job processed twice') | ||
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting') | ||
return resumeProcessPauseGet() | ||
}).then((repeatJob) => { | ||
t.equal(repeatJob[0].processCount, 3, 'Repeat job processed three times') | ||
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting') | ||
return resumeProcessPauseGet() | ||
}).then((repeatJob) => { | ||
t.equal(repeatJob[0].processCount, 4, 'Repeat job processed four times') | ||
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting') | ||
return resumeProcessPauseGet() | ||
}).then((repeatJob) => { | ||
t.equal(repeatJob[0].processCount, 5, 'Repeat job processed five times') | ||
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting') | ||
return q.cancelJob(jobs) | ||
}).then((cancelledJobs) => { | ||
t.ok(is.uuid(cancelledJobs[0]), 'repeat job cancelled') | ||
return q.getJob(jobs) | ||
}).then((cancelledJobs) => { | ||
t.equal(cancelledJobs[0].processCount, 5, `Job repeated 5 times before cancel`) | ||
t.equal(cancelledJobs[0].log.length, 12, 'repeat job log count valid') | ||
// ---------- Repeat Number Test ---------- | ||
t.comment('queue-process: Repeat Number') | ||
jobs = q.createJob().setRepeat(4).setRepeatDelay(repeatDelay) | ||
t.equal(jobs.repeat, 4, `Job repeat is 4`) | ||
t.equal(jobs.repeatDelay, repeatDelay, 'Job repeat delay is valid') | ||
return q.pause() | ||
}).then(() => { | ||
return q.addJob(jobs) | ||
}).then((savedJobs) => { | ||
t.equal(savedJobs.length, 1, `Jobs saved successfully: [${savedJobs.length}]`) | ||
return resumeProcessPauseGet() | ||
}).then((repeatJob) => { | ||
t.equal(repeatJob[0].processCount, 1, 'Repeat job processed once') | ||
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting') | ||
return resumeProcessPauseGet() | ||
}).then((repeatJob) => { | ||
t.equal(repeatJob[0].processCount, 2, 'Repeat job processed twice') | ||
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting') | ||
return resumeProcessPauseGet() | ||
}).then((repeatJob) => { | ||
t.equal(repeatJob[0].processCount, 3, 'Repeat job processed three times') | ||
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting') | ||
return resumeProcessPauseGet() | ||
}).then((repeatJob) => { | ||
t.equal(repeatJob[0].processCount, 4, 'Repeat job processed four times') | ||
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting') | ||
return resumeProcessPauseGet() | ||
}).then((completedJobs) => { | ||
t.equal(completedJobs[0].processCount, 5, 'Repeat job processed five times') | ||
t.equal(completedJobs[0].status, enums.status.completed, 'Repeat job is completed') | ||
t.equal(completedJobs[0].log.length, 11, 'repeat job log count valid') | ||
return q.resume() | ||
}).then(() => { | ||
// // TODO - Remove below to run all tests. | ||
// return q.stop().then(() => Promise.reject()) | ||
// ---------- Processing Restart on Job Add Test ---------- | ||
@@ -166,4 +257,2 @@ t.comment('queue-process: Process Restart on Job Add') | ||
}).delay(allJobsDelay).then(() => { | ||
completedEventCount = state.count.get(enums.status.completed) | ||
t.equal(completedEventCount, noOfJobsToCreate * 2, `Queue has completed ${completedEventCount} jobs`) | ||
t.ok(q.idle, 'Queue is idle') | ||
@@ -186,4 +275,2 @@ return q.pause() | ||
}).delay(allJobsDelay).then(() => { | ||
completedEventCount = state.count.get(enums.status.completed) | ||
t.equal(completedEventCount, noOfJobsToCreate * 3, `Queue has completed ${completedEventCount} jobs`) | ||
t.pass('Restart processing succeeded') | ||
@@ -291,3 +378,2 @@ t.ok(q.idle, 'Queue is idle') | ||
// ---------- Event Summary ---------- | ||
t.comment('queue-process: Event Summary') | ||
eventHandlers.remove(t, q, state) | ||
@@ -294,0 +380,0 @@ |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -9,7 +9,9 @@ const is = require('../src/is') | ||
const tOpts = require('./test-options') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'queue-remove-job' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('queue-remove-job', (t) => { | ||
t.plan(24) | ||
test(testName, (t) => { | ||
t.plan(55) | ||
@@ -22,20 +24,35 @@ const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
let testEvents = false | ||
function removedEventHandler (jobId) { | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: removed [${jobId}]`) | ||
} | ||
// ---------- Event Handler Setup ---------- | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 0, | ||
processing: 0, | ||
progress: 0, | ||
pausing: 0, | ||
paused: 0, | ||
resumed: 0, | ||
removed: 8, | ||
reset: 0, | ||
error: 0, | ||
reviewed: 0, | ||
detached: 0, | ||
stopping: 0, | ||
stopped: 0, | ||
dropped: 0, | ||
added: 8, | ||
waiting: 0, | ||
active: 0, | ||
completed: 0, | ||
cancelled: 0, | ||
failed: 0, | ||
terminated: 0, | ||
reanimated: 0, | ||
log: 0, | ||
updated: 0 | ||
} | ||
function addEventHandlers () { | ||
testEvents = true | ||
q.on(enums.status.removed, removedEventHandler) | ||
} | ||
function removeEventHandlers () { | ||
testEvents = false | ||
q.removeListener(enums.status.removed, removedEventHandler) | ||
} | ||
return q.reset().then((resetResult) => { | ||
t.ok(is.integer(resetResult), 'Queue reset') | ||
addEventHandlers() | ||
eventHandlers.add(t, q, state) | ||
return q.addJob(jobs) | ||
@@ -110,3 +127,5 @@ }).then((savedJobs) => { | ||
}).then(() => { | ||
removeEventHandlers() | ||
// | ||
// ---------- Event Summary ---------- | ||
eventHandlers.remove(t, q, state) | ||
return q.reset() | ||
@@ -113,0 +132,0 @@ }).then((resetResult) => { |
@@ -1,5 +0,4 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
const is = require('../src/is') | ||
const enums = require('../src/enums') | ||
const tError = require('./test-error') | ||
@@ -9,7 +8,9 @@ const queueReset = require('../src/queue-reset') | ||
const tOpts = require('./test-options') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'queue-reset' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('queue-reset', (t) => { | ||
t.plan(7) | ||
test(testName, (t) => { | ||
t.plan(32) | ||
@@ -22,19 +23,36 @@ const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
] | ||
let eventCount = 0 | ||
function resetEventHandler (total) { | ||
eventCount++ | ||
t.pass('Event: Queue reset') | ||
if (eventCount < 2) { return } | ||
t.equal(total, 3, 'Queue reset removed valid number of jobs') | ||
return q.summary().then((afterSummary) => { | ||
t.equal(afterSummary.waiting, 0, 'Status summary contains no added jobs') | ||
q.removeListener(enums.status.reset, resetEventHandler) | ||
q.stop() | ||
return resolve(t.end()) | ||
}).catch(err => tError(err, module, t)) | ||
// ---------- Event Handler Setup ---------- | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 0, | ||
processing: 0, | ||
progress: 0, | ||
pausing: 0, | ||
paused: 0, | ||
resumed: 0, | ||
removed: 0, | ||
reset: 1, | ||
error: 0, | ||
reviewed: 0, | ||
detached: 0, | ||
stopping: 0, | ||
stopped: 0, | ||
dropped: 0, | ||
added: 3, | ||
waiting: 0, | ||
active: 0, | ||
completed: 0, | ||
cancelled: 0, | ||
failed: 0, | ||
terminated: 0, | ||
reanimated: 0, | ||
log: 0, | ||
updated: 0 | ||
} | ||
q.on(enums.status.reset, resetEventHandler) | ||
return q.reset().then((removed) => { | ||
t.ok(is.integer(removed), 'Initial reset succeeded') | ||
eventHandlers.add(t, q, state) | ||
return q.addJob(jobs) | ||
@@ -47,2 +65,13 @@ }).then((savedJobs) => { | ||
return queueReset(q) | ||
}).then((total) => { | ||
t.equal(total, 3, 'Queue reset removed valid number of jobs') | ||
return q.summary() | ||
}).then((afterSummary) => { | ||
t.equal(afterSummary.waiting, 0, 'Status summary contains no added jobs') | ||
// ---------- Event Summary ---------- | ||
eventHandlers.remove(t, q, state) | ||
q.stop() | ||
return resolve(t.end()) | ||
}).catch(err => tError(err, module, t)) | ||
@@ -49,0 +78,0 @@ }) |
@@ -1,6 +0,4 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
// const datetime = require('../src/datetime') | ||
const enums = require('../src/enums') | ||
const is = require('../src/is') | ||
const tError = require('./test-error') | ||
@@ -11,7 +9,9 @@ const tData = require('./test-options').tData | ||
const queueState = require('../src/queue-state') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'queue-state' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('queue-state', (t) => { | ||
t.plan(15) | ||
test(testName, (t) => { | ||
t.plan(29) | ||
@@ -24,49 +24,34 @@ const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
// ---------- Event Handler Setup ---------- | ||
let testEvents = false | ||
let pausingEventCount = 0 | ||
let pausingEventTotal = 1 | ||
function pausingEventHandler (global, queueId) { | ||
pausingEventCount++ | ||
if (testEvents) { | ||
t.pass(`Event: pausing [${pausingEventCount} of ${pausingEventTotal}] `) | ||
t.ok(is.boolean(global), `Event: pausing [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: pausing [queueId: ${queueId}]`) | ||
} | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 0, | ||
processing: 0, | ||
progress: 0, | ||
pausing: 1, | ||
paused: 1, | ||
resumed: 1, | ||
removed: 0, | ||
reset: 0, | ||
error: 0, | ||
reviewed: 0, | ||
detached: 0, | ||
stopping: 0, | ||
stopped: 0, | ||
dropped: 0, | ||
added: 0, | ||
waiting: 0, | ||
active: 0, | ||
completed: 0, | ||
cancelled: 0, | ||
failed: 0, | ||
terminated: 0, | ||
reanimated: 0, | ||
log: 0, | ||
updated: 0 | ||
} | ||
let pausedEventCount = 0 | ||
let pausedEventTotal = 1 | ||
function pausedEventHandler (global, queueId) { | ||
pausedEventCount++ | ||
if (testEvents) { | ||
t.pass(`Event: paused [${pausedEventCount} of ${pausedEventTotal}] `) | ||
t.ok(is.boolean(global), `Event: paused [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: paused [queueId: ${queueId}]`) | ||
} | ||
} | ||
let resumedEventCount = 0 | ||
let resumedEventTotal = 1 | ||
function resumedEventHandler (global, queueId) { | ||
resumedEventCount++ | ||
if (testEvents) { | ||
t.pass(`Event: resumed [${resumedEventCount} of ${resumedEventTotal}] `) | ||
t.ok(is.boolean(global), `Event: resumed [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: resumed [queueId: ${queueId}]`) | ||
} | ||
} | ||
function addEventHandlers () { | ||
testEvents = true | ||
q.on(enums.status.paused, pausingEventHandler) | ||
q.on(enums.status.paused, pausedEventHandler) | ||
q.on(enums.status.resumed, resumedEventHandler) | ||
} | ||
function removeEventHandlers () { | ||
testEvents = false | ||
q.removeListener(enums.status.paused, pausingEventHandler) | ||
q.removeListener(enums.status.paused, pausedEventHandler) | ||
q.removeListener(enums.status.resumed, resumedEventHandler) | ||
} | ||
q.reset().then((resetResult) => { | ||
t.ok(resetResult >= 0, 'Queue reset') | ||
addEventHandlers() | ||
eventHandlers.add(t, q, state) | ||
@@ -85,9 +70,5 @@ // ---------- Global Pause Test ---------- | ||
// ---------- Event summary Test ---------- | ||
t.comment('queue-state: Event Summary') | ||
t.equal(pausingEventCount, pausingEventTotal, 'Pausing event count valid') | ||
t.equal(pausedEventCount, pausedEventTotal, 'Paused event count valid') | ||
t.equal(resumedEventCount, resumedEventTotal, 'Resumed event count valid') | ||
// ---------- Event Summary ---------- | ||
eventHandlers.remove(t, q, state) | ||
removeEventHandlers() | ||
q.stop() | ||
@@ -94,0 +75,0 @@ q2.stop() |
@@ -1,5 +0,4 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
const is = require('../src/is') | ||
const enums = require('../src/enums') | ||
const tError = require('./test-error') | ||
@@ -11,33 +10,41 @@ const queueStop = require('../src/queue-stop') | ||
const tOpts = require('./test-options') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'queue-stop' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('queue-stop', (t) => { | ||
t.plan(31) | ||
test(testName, (t) => { | ||
t.plan(79) | ||
const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
let q = new Queue(tOpts.cxn(), tOpts.master(999999)) | ||
let testEvents = false | ||
function stoppingEventHandler (qid) { | ||
if (testEvents) { | ||
t.pass(`Event: Queue stopping [${qid}]`) | ||
t.equal(qid, q.id, `Event: Queue stopping id is valid`) | ||
} | ||
// ---------- Event Handler Setup ---------- | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 0, | ||
processing: 0, | ||
progress: 0, | ||
pausing: 1, | ||
paused: 1, | ||
resumed: 0, | ||
removed: 0, | ||
reset: 0, | ||
error: 0, | ||
reviewed: 0, | ||
detached: 1, | ||
stopping: 1, | ||
stopped: 1, | ||
dropped: 0, | ||
added: 0, | ||
waiting: 0, | ||
active: 0, | ||
completed: 0, | ||
cancelled: 0, | ||
failed: 0, | ||
terminated: 0, | ||
reanimated: 0, | ||
log: 0, | ||
updated: 0 | ||
} | ||
function stoppedEventHandler (qid) { | ||
if (testEvents) { | ||
t.pass(`Event: Queue stopped [${qid}]`) | ||
t.equal(qid, q.id, `Event: Queue stopped id is valid`) | ||
} | ||
} | ||
function addEventHandlers () { | ||
testEvents = true | ||
q.on(enums.status.stopping, stoppingEventHandler) | ||
q.on(enums.status.stopped, stoppedEventHandler) | ||
} | ||
function removeEventHandlers () { | ||
testEvents = false | ||
q.removeListener(enums.status.stopping, stoppingEventHandler) | ||
q.removeListener(enums.status.stopped, stoppedEventHandler) | ||
} | ||
@@ -53,5 +60,2 @@ function simulateJobProcessing () { | ||
t.ok(is.integer(resetResult), 'Queue reset') | ||
q._running = 1 | ||
q._masterInterval = 300 | ||
dbReview.enable(q) | ||
return q.ready() | ||
@@ -63,3 +67,3 @@ }).then((ready) => { | ||
t.notOk(q.paused, 'Queue is not paused') | ||
addEventHandlers() | ||
eventHandlers.add(t, q, state) | ||
@@ -69,4 +73,6 @@ // ---------- Stop with Drain ---------- | ||
simulateJobProcessing() | ||
return queueStop(q, true) | ||
return queueStop(q) | ||
}).then((stopped) => { | ||
return queueDb.drain(q) | ||
}).then((stopped) => { | ||
t.ok(stopped, 'Queue stopped with pool drain') | ||
@@ -80,11 +86,12 @@ t.notOk(dbReview.isEnabled(q), 'Review is disabled') | ||
// ---------- Event Summary ---------- | ||
eventHandlers.remove(t, q, state) | ||
// ---------- Stop without Drain ---------- | ||
t.comment('queue-stop: Stop without Drain') | ||
return queueDb.attach(q, tOpts.cxn()) | ||
}).then(() => { | ||
q = new Queue(tOpts.cxn(), tOpts.master(999999)) | ||
return q.ready() | ||
}).then((ready) => { | ||
t.ok(ready, 'Queue in a ready state') | ||
return q.resume() | ||
}).then(() => { | ||
eventHandlers.add(t, q, state) | ||
t.ok(dbReview.isEnabled(q), 'Review is enabled') | ||
@@ -94,3 +101,3 @@ t.ok(q._changeFeedCursor.connection.open, 'Change feed is connected') | ||
simulateJobProcessing() | ||
return queueStop(q, false) | ||
return queueStop(q) | ||
}).then((stopped2) => { | ||
@@ -105,4 +112,6 @@ t.ok(stopped2, 'Queue stopped without pool drain') | ||
// detaching with drain or node will not exit gracefully | ||
return queueDb.detach(q, true) | ||
return queueDb.detach(q) | ||
}).then(() => { | ||
return queueDb.drain(q) | ||
}).then(() => { | ||
return queueDb.attach(q, tOpts.cxn()) | ||
@@ -119,4 +128,4 @@ }).then(() => { | ||
// ---------- Clean Up ---------- | ||
removeEventHandlers() | ||
// ---------- Event Summary ---------- | ||
eventHandlers.remove(t, q, state) | ||
q.stop() | ||
@@ -123,0 +132,0 @@ return resolve(t.end()) |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -18,3 +18,3 @@ const is = require('../src/is') | ||
let jobs = [] | ||
for (let i = 0; i < 7; i++) { | ||
for (let i = 0; i < 6; i++) { | ||
jobs.push(q.createJob()) | ||
@@ -26,8 +26,8 @@ } | ||
jobs[3].status = enums.status.cancelled | ||
jobs[5].status = enums.status.failed | ||
jobs[6].status = enums.status.terminated | ||
jobs[4].status = enums.status.failed | ||
jobs[5].status = enums.status.terminated | ||
return q.reset().then((resetResult) => { | ||
t.ok(is.integer(resetResult), 'Queue reset') | ||
return queueAddJob(q, jobs, true) | ||
return queueAddJob(q, jobs) | ||
}).then(() => { | ||
@@ -42,3 +42,3 @@ return queueSummary(q) | ||
t.equal(summary.terminated, 1, 'Queue status summary includes terminated') | ||
t.equal(summary.total, 7, 'Queue status summary includes total') | ||
t.equal(summary.total, 6, 'Queue status summary includes total') | ||
return q.reset() | ||
@@ -45,0 +45,0 @@ }).then((resetResult) => { |
@@ -1,2 +0,2 @@ | ||
const test = require('tape') | ||
const test = require('tap').test | ||
const Promise = require('bluebird') | ||
@@ -8,10 +8,20 @@ const enums = require('../src/enums') | ||
const Queue = require('../src/queue') | ||
const jobOptions = require('../src/job-options') | ||
const eventHandlers = require('./test-event-handlers') | ||
const testName = 'queue' | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('queue', (t) => { | ||
t.plan(157) | ||
test(testName, (t) => { | ||
// t.plan(135) | ||
t.plan(142) | ||
let q = new Queue(tOpts.cxn(), tOpts.queueNameOnly()) | ||
let q2 | ||
// ---------- Test Setup ---------- | ||
let qReady = new Queue(tOpts.cxn(), tOpts.queueNameOnly()) | ||
let qNotMaster | ||
let qNumMaster | ||
let qMain | ||
let qDrop | ||
let qSub | ||
let qPub | ||
@@ -23,7 +33,9 @@ let job | ||
retryMax: 5, | ||
retryDelay: 400 | ||
retryDelay: 400, | ||
repeat: 5, | ||
repeatDelay: 5000 | ||
} | ||
function processHandler (job, next) { | ||
setTimeout(function finishJob () { | ||
next(`Job completed [${job.id}]`) | ||
next(null, `Job completed [${job.id}]`) | ||
}, 100) | ||
@@ -33,326 +45,107 @@ } | ||
// ---------- Event Handler Setup ---------- | ||
let testEvents = false | ||
let readyEventCount = 0 | ||
let readyEventTotal = 0 | ||
function readyEventHandler (queueId) { | ||
readyEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: ready [${readyEventCount} of ${readyEventTotal}] [${queueId}]`) | ||
} | ||
let state = { | ||
testName, | ||
enabled: false, | ||
ready: 0, | ||
processing: 2, | ||
progress: 0, | ||
pausing: 2, | ||
paused: 2, | ||
resumed: 1, | ||
removed: 1, | ||
idle: 12, | ||
reset: 1, | ||
error: 3, | ||
reviewed: 0, | ||
detached: 1, | ||
stopping: 1, | ||
stopped: 1, | ||
dropped: 0, | ||
added: 3, | ||
waiting: 0, | ||
active: 2, | ||
completed: 2, | ||
cancelled: 1, | ||
failed: 0, | ||
terminated: 0, | ||
reanimated: 0, | ||
log: 0, | ||
updated: 0 | ||
} | ||
let addedEventCount = 0 | ||
let addedEventTotal = 4 | ||
function addedEventHandler (jobId) { | ||
addedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: added [${addedEventCount} of ${addedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let activeEventCount = 0 | ||
let activeEventTotal = 3 | ||
function activeEventHandler (jobId) { | ||
activeEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: active [${activeEventCount} of ${activeEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let processingEventCount = 0 | ||
let processingEventTotal = 3 | ||
function processingEventHandler (jobId) { | ||
processingEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: processing [${processingEventCount} of ${processingEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let progressEventCount = 0 | ||
let progressEventTotal = 0 | ||
function progressEventHandler (jobId, percent) { | ||
progressEventCount++ | ||
if (testEvents) { | ||
t.pass(`Event: progress [${progressEventCount} of ${progressEventTotal}]`) | ||
t.ok(is.uuid(jobId), `Event: progress [${jobId}]`) | ||
t.ok(is.number(percent), `Event: progress [${percent}%]`) | ||
} | ||
} | ||
let completedEventCount = 0 | ||
let completedEventTotal = 3 | ||
function completedEventHandler (jobId) { | ||
completedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: completed [${completedEventCount} of ${completedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let cancelledEventCount = 0 | ||
let cancelledEventTotal = 1 | ||
function cancelledEventHandler (jobId) { | ||
cancelledEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: cancelled [${cancelledEventCount} of ${cancelledEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let failedEventCount = 0 | ||
let failedEventTotal = 0 | ||
function failedEventHandler (jobId) { | ||
failedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: failed [${failedEventCount} of ${failedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let terminatedEventCount = 0 | ||
let terminatedEventTotal = 0 | ||
function terminatedEventHandler (jobId) { | ||
terminatedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: terminated [${terminatedEventCount} of ${terminatedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let logEventCount = 0 | ||
let logEventTotal = 0 | ||
function logEventHandler (jobId) { | ||
logEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: log [${logEventCount} of ${logEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let updatedEventCount = 0 | ||
let updatedEventTotal = 0 | ||
function updatedEventHandler (jobId) { | ||
updatedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: updated [${updatedEventCount} of ${updatedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let pausingEventCount = 0 | ||
let pausingEventTotal = 2 | ||
function pausingEventHandler (global, queueId) { | ||
pausingEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: pausing [${pausingEventCount} of ${pausingEventTotal}]`) | ||
t.ok(is.boolean(global), `Event: pausing [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: pausing [queueId: ${queueId}]`) | ||
} | ||
} | ||
let pausedEventCount = 0 | ||
let pausedEventTotal = 2 | ||
function pausedEventHandler (global, queueId) { | ||
pausedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: paused [${pausedEventCount} of ${pausedEventTotal}]`) | ||
t.ok(is.boolean(global), `Event: paused [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: paused [queueId: ${queueId}]`) | ||
} | ||
} | ||
let resumedEventCount = 0 | ||
let resumedEventTotal = 1 | ||
function resumedEventHandler (global, queueId) { | ||
resumedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: resumed [${resumedEventCount} of ${resumedEventTotal}]`) | ||
t.ok(is.boolean(global), `Event: resumed [global: ${global}]`) | ||
t.ok(is.string(queueId), `Event: resumed [queueId: ${queueId}]`) | ||
} | ||
} | ||
let removedEventCount = 0 | ||
let removedEventTotal = 1 | ||
function removedEventHandler (jobId) { | ||
removedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.uuid(jobId), `Event: removed [${removedEventCount} of ${removedEventTotal}] [${jobId}]`) | ||
} | ||
} | ||
let idleEventCount = 0 | ||
let idleEventTotal = 4 | ||
function idleEventHandler (queueId) { | ||
if (idleEventCount < 4) { | ||
idleEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: idle [${idleEventCount} of ${idleEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
} | ||
let reviewedEventCount = 0 | ||
let reviewedEventTotal = 0 | ||
function reviewedEventHandler (queueId) { | ||
reviewedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: reviewed [${reviewedEventCount} of ${reviewedEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
let resetEventCount = 0 | ||
let resetEventTotal = 1 | ||
function resetEventHandler (total) { | ||
resetEventCount++ | ||
if (testEvents) { | ||
t.ok(is.integer(total), `Event: reset [${resetEventCount} of ${resetEventTotal}] [${total}]`) | ||
} | ||
} | ||
let errorEventCount = 0 | ||
let errorEventTotal = 3 | ||
function errorEventHandler (err) { | ||
errorEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(err.message), `Event: error [${errorEventCount} of ${errorEventTotal}] [${err.message}]`) | ||
} | ||
} | ||
let detachedEventCount = 0 | ||
let detachedEventTotal = 1 | ||
function detachedEventHandler (queueId) { | ||
detachedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: detached [${detachedEventCount} of ${detachedEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
let stoppingEventCount = 0 | ||
let stoppingEventTotal = 1 | ||
function stoppingEventHandler (queueId) { | ||
stoppingEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: stopping [${stoppingEventCount} of ${stoppingEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
let stoppedEventCount = 0 | ||
let stoppedEventTotal = 1 | ||
function stoppedEventHandler (queueId) { | ||
stoppedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: stopped [${stoppedEventCount} of ${stoppedEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
let droppedEventCount = 0 | ||
let droppedEventTotal = 1 | ||
function droppedEventHandler (queueId) { | ||
droppedEventCount++ | ||
if (testEvents) { | ||
t.ok(is.string(queueId), `Event: dropped [${droppedEventCount} of ${droppedEventTotal}] [${queueId}]`) | ||
} | ||
} | ||
function addEventHandlers () { | ||
testEvents = true | ||
q.on(enums.status.ready, readyEventHandler) | ||
q.on(enums.status.added, addedEventHandler) | ||
q.on(enums.status.active, activeEventHandler) | ||
q.on(enums.status.processing, processingEventHandler) | ||
q.on(enums.status.progress, progressEventHandler) | ||
q.on(enums.status.completed, completedEventHandler) | ||
q.on(enums.status.cancelled, cancelledEventHandler) | ||
q.on(enums.status.failed, failedEventHandler) | ||
q.on(enums.status.terminated, terminatedEventHandler) | ||
q.on(enums.status.paused, pausedEventHandler) | ||
q.on(enums.status.pausing, pausingEventHandler) | ||
q.on(enums.status.resumed, resumedEventHandler) | ||
q.on(enums.status.removed, removedEventHandler) | ||
q.on(enums.status.log, logEventHandler) | ||
q.on(enums.status.updated, updatedEventHandler) | ||
q.on(enums.status.idle, idleEventHandler) | ||
q.on(enums.status.reviewed, reviewedEventHandler) | ||
q.on(enums.status.reset, resetEventHandler) | ||
q.on(enums.status.error, errorEventHandler) | ||
q.on(enums.status.detached, detachedEventHandler) | ||
q.on(enums.status.stopping, stoppingEventHandler) | ||
q.on(enums.status.stopped, stoppedEventHandler) | ||
q.on(enums.status.dropped, droppedEventHandler) | ||
} | ||
function removeEventHandlers () { | ||
testEvents = false | ||
q.removeListener(enums.status.ready, readyEventHandler) | ||
q.removeListener(enums.status.added, addedEventHandler) | ||
q.removeListener(enums.status.active, activeEventHandler) | ||
q.removeListener(enums.status.processing, processingEventHandler) | ||
q.removeListener(enums.status.progress, progressEventHandler) | ||
q.removeListener(enums.status.completed, completedEventHandler) | ||
q.removeListener(enums.status.cancelled, cancelledEventHandler) | ||
q.removeListener(enums.status.failed, failedEventHandler) | ||
q.removeListener(enums.status.terminated, terminatedEventHandler) | ||
q.removeListener(enums.status.paused, pausedEventHandler) | ||
q.removeListener(enums.status.pausing, pausingEventHandler) | ||
q.removeListener(enums.status.resumed, resumedEventHandler) | ||
q.removeListener(enums.status.removed, removedEventHandler) | ||
q.removeListener(enums.status.log, logEventHandler) | ||
q.removeListener(enums.status.updated, updatedEventHandler) | ||
q.removeListener(enums.status.idle, idleEventHandler) | ||
q.removeListener(enums.status.reviewed, reviewedEventHandler) | ||
q.removeListener(enums.status.reset, resetEventHandler) | ||
q.removeListener(enums.status.error, errorEventHandler) | ||
q.removeListener(enums.status.detached, detachedEventHandler) | ||
q.removeListener(enums.status.stopping, stoppingEventHandler) | ||
q.removeListener(enums.status.stopped, stoppedEventHandler) | ||
q.removeListener(enums.status.dropped, droppedEventHandler) | ||
} | ||
const stopDelay = 1000 | ||
return q.ready().then((ready) => { | ||
return qReady.ready().then((ready) => { | ||
t.ok(ready, 'Queue ready returns true') | ||
return qReady.reset() | ||
}).delay(stopDelay).then(() => { | ||
t.pass('Queue reset') | ||
return qReady.stop() | ||
}).then(() => { | ||
// ---------- masterInterval Options Tests ---------- | ||
t.comment('queue: masterInterval Options') | ||
return q.stop() | ||
qNotMaster = new Queue(tOpts.cxn(), Object.assign(tOpts.queueNameOnly(), { masterInterval: false })) | ||
t.ok(is.false(qNotMaster.masterInterval), 'False masterInterval is false') | ||
return qNotMaster.ready() | ||
}).delay(stopDelay).then(() => { | ||
return qNotMaster.stop() | ||
}).then(() => { | ||
q = new Queue(tOpts.cxn(), Object.assign(tOpts.queueNameOnly(), { masterInterval: true })) | ||
t.ok(is.true(q.masterInterval), 'True masterInterval is true') | ||
return q.ready() | ||
qNumMaster = new Queue(tOpts.cxn(), Object.assign(tOpts.queueNameOnly(), { masterInterval: 12345 })) | ||
t.equal(qNumMaster.masterInterval, 12345, 'Number masterInterval is number') | ||
return qNumMaster.ready() | ||
}).delay(stopDelay).then(() => { | ||
return qNumMaster.stop() | ||
}).then(() => { | ||
return q.stop() | ||
}).then(() => { | ||
q = new Queue(tOpts.cxn(), Object.assign(tOpts.queueNameOnly(), { masterInterval: false })) | ||
t.ok(is.false(q.masterInterval), 'False masterInterval is false') | ||
return q.ready() | ||
}).then(() => { | ||
return q.stop() | ||
}).then(() => { | ||
q = new Queue(tOpts.cxn(), Object.assign(tOpts.queueNameOnly(), { masterInterval: 12345 })) | ||
t.equal(q.masterInterval, 12345, 'Number masterInterval is number') | ||
return q.ready() | ||
}).then(() => { | ||
return q.stop() | ||
}).then(() => { | ||
q = new Queue(tOpts.cxn(), tOpts.queueNameOnly()) | ||
qMain = new Queue(tOpts.cxn(), tOpts.queueNameOnly()) | ||
return q.ready() | ||
return qMain.ready() | ||
}).then(() => { | ||
return q.reset() | ||
return qMain.reset() | ||
}).then((totalRemoved) => { | ||
t.ok(is.integer(totalRemoved), 'Queue has been reset') | ||
addEventHandlers() | ||
eventHandlers.add(t, qMain, state) | ||
// ---------- Constructor with Default Options Tests ---------- | ||
t.comment('queue: Constructor with Default Options') | ||
t.ok(q, 'Queue created with default options') | ||
t.equal(q.name, tOpts.queueName, 'Default queue name valid') | ||
t.ok(is.string(q.id), 'Queue id is valid') | ||
t.equal(q.host, enums.options.host, 'Default host name is valid') | ||
t.equal(q.port, enums.options.port, 'Default port is valid') | ||
t.equal(q.db, tOpts.dbName, 'Default db name is valid') | ||
t.ok(is.function(q.r), 'Queue r valid') | ||
t.ok(q.changeFeed, 'Queue change feed is enabled') | ||
t.ok(q.master, 'Queue is master queue') | ||
t.equal(q.masterInterval, enums.options.masterInterval, 'Queue masterInterval is valid') | ||
t.ok(is.object(q.jobOptions), 'Queue jobOptions is an object') | ||
t.equal(q.jobOptions.priority, enums.priorityFromValue(40), 'Default job priority is normal') | ||
t.equal(q.jobOptions.timeout, enums.options.timeout, 'Default job timeout is valid') | ||
t.equal(q.jobOptions.retryMax, enums.options.retryMax, 'Default job retryMax is valid') | ||
t.equal(q.jobOptions.retryDelay, enums.options.retryDelay, 'Default job retryDelay is valid') | ||
t.equal(q.removeFinishedJobs, enums.options.removeFinishedJobs, 'Default removeFinishedJobs is valid') | ||
t.equal(q.running, 0, 'Running jobs is zero') | ||
t.equal(q.concurrency, enums.options.concurrency, 'Default concurrency is valid') | ||
t.notOk(q.paused, 'Queue is not paused') | ||
t.ok(q.idle, 'Queue is idle') | ||
t.ok(qMain, 'Queue created with default options') | ||
t.equal(qMain.name, tOpts.queueName, 'Default queue name valid') | ||
t.ok(is.string(qMain.id), 'Queue id is valid') | ||
t.equal(qMain.host, enums.options.host, 'Default host name is valid') | ||
t.equal(qMain.port, enums.options.port, 'Default port is valid') | ||
t.equal(qMain.db, tOpts.dbName, 'Default db name is valid') | ||
t.ok(is.function(qMain.r), 'Queue r valid') | ||
t.ok(qMain.changeFeed, 'Queue change feed is enabled') | ||
t.ok(qMain.master, 'Queue is master queue') | ||
t.equal(qMain.masterInterval, enums.options.masterInterval, 'Queue masterInterval is valid') | ||
t.ok(is.object(qMain.jobOptions), 'Queue jobOptions is an object') | ||
t.equal(qMain.jobOptions.priority, enums.priorityFromValue(40), 'Default job priority is normal') | ||
t.equal(qMain.jobOptions.timeout, enums.options.timeout, 'Default job timeout is valid') | ||
t.equal(qMain.jobOptions.retryMax, enums.options.retryMax, 'Default job retryMax is valid') | ||
t.equal(qMain.jobOptions.retryDelay, enums.options.retryDelay, 'Default job retryDelay is valid') | ||
t.equal(qMain.jobOptions.repeat, enums.options.repeat, 'Default job repeat is valid') | ||
t.equal(qMain.jobOptions.repeatDelay, enums.options.repeatDelay, 'Default job repeatDelay is valid') | ||
t.equal(qMain.removeFinishedJobs, enums.options.removeFinishedJobs, 'Default removeFinishedJobs is valid') | ||
t.equal(qMain.running, 0, 'Running jobs is zero') | ||
t.equal(qMain.concurrency, enums.options.concurrency, 'Default concurrency is valid') | ||
t.notOk(qMain.paused, 'Queue is not paused') | ||
t.ok(qMain.idle, 'Queue is idle') | ||
// ---------- Set Properties Tests ---------- | ||
t.comment('queue: Set Properties') | ||
q.jobOptions = customJobOptions | ||
t.deepEqual(q.jobOptions, customJobOptions, 'Job options set successfully') | ||
q.jobOptions = undefined | ||
t.deepEqual(q.jobOptions, customJobOptions, 'Job options restored to default on invalid value') | ||
q.concurrency = 100 | ||
t.equal(q.concurrency, 100, 'Queue concurrency set with valid value successfully') | ||
q.concurrency = -50 | ||
t.equal(q.concurrency, 100, 'Queue concurrency unchanged with invalid value') | ||
q.concurrency = 1.5 | ||
t.equal(q.concurrency, 100, 'Queue concurrency unchanged with invalid value') | ||
q.concurrency = 'string' | ||
t.equal(q.concurrency, 100, 'Queue concurrency unchanged with invalid value') | ||
qMain.jobOptions = customJobOptions | ||
t.deepEqual(qMain.jobOptions, customJobOptions, 'Job options set successfully') | ||
qMain.jobOptions = undefined | ||
t.deepEqual(qMain.jobOptions, customJobOptions, 'Job options restored to default on invalid value') | ||
qMain.concurrency = 100 | ||
t.equal(qMain.concurrency, 100, 'Queue concurrency set with valid value successfully') | ||
qMain.concurrency = -50 | ||
t.equal(qMain.concurrency, 100, 'Queue concurrency unchanged with invalid value') | ||
qMain.concurrency = 1.5 | ||
t.equal(qMain.concurrency, 100, 'Queue concurrency unchanged with invalid value') | ||
qMain.concurrency = 'string' | ||
t.equal(qMain.concurrency, 100, 'Queue concurrency unchanged with invalid value') | ||
// ---------- Create Job Tests ---------- | ||
t.comment('queue: Create Job') | ||
job = q.createJob() | ||
job = qMain.createJob() | ||
t.ok(is.job(job), 'Queue createJob created a job object') | ||
@@ -368,5 +161,6 @@ t.equal(job.priority, customJobOptions.priority, 'Queue created job with new default priority') | ||
retryMax: 2, | ||
retryDelay: 900 | ||
retryDelay: 900, | ||
repeat: 0 | ||
} | ||
job = q.createJob().setPriority('low').setTimeout(400).setRetryMax(2).setRetryDelay(900) | ||
job = qMain.createJob().setPriority('low').setTimeout(400).setRetryMax(2).setRetryDelay(900) | ||
t.ok(is.job(job), 'Queue createJob created a job object') | ||
@@ -378,7 +172,8 @@ t.equal(job.priority, customJobOptions.priority, 'Queue created job with custom priority') | ||
// ---------- Create Job Tests ---------- | ||
// ---------- Add Job Tests ---------- | ||
t.comment('queue: Add Job') | ||
job = q.createJob() | ||
qMain.jobOptions = jobOptions() // Resetting job options | ||
job = qMain.createJob() | ||
job.data = tOpts.tData | ||
return q.addJob(job) | ||
return qMain.addJob(job) | ||
}).then((savedJobs) => { | ||
@@ -392,3 +187,3 @@ t.ok(is.array(savedJobs), 'Add job returns an array') | ||
t.comment('queue: Get Job') | ||
return q.getJob(savedJobs[0].id) | ||
return qMain.getJob(savedJobs[0].id) | ||
}).then((savedJobs2) => { | ||
@@ -402,3 +197,3 @@ t.ok(is.array(savedJobs2), 'Get job returns an array') | ||
t.comment('queue: Find Job') | ||
return q.findJob({ data: tOpts.tData }) | ||
return qMain.findJob({ data: tOpts.tData }) | ||
}).then((savedJobs3) => { | ||
@@ -412,7 +207,7 @@ t.ok(is.array(savedJobs3), 'Find job returns an array') | ||
t.comment('queue: Cancel Job') | ||
return q.cancelJob(savedJobs3[0].id) | ||
return qMain.cancelJob(savedJobs3[0].id) | ||
}).then((cancelledJobs) => { | ||
t.ok(is.array(cancelledJobs), 'Cancel job returns an array') | ||
t.ok(is.uuid(cancelledJobs[0]), 'Cancel job returns ids') | ||
return q.getJob(cancelledJobs[0]) | ||
return qMain.getJob(cancelledJobs[0]) | ||
}).then((cancelledJobs2) => { | ||
@@ -424,7 +219,7 @@ t.ok(is.array(cancelledJobs2), 'Get job returns an array') | ||
t.comment('queue: Remove Job') | ||
return q.removeJob(cancelledJobs2[0].id) | ||
return qMain.removeJob(cancelledJobs2[0].id) | ||
}).then((removedCount) => { | ||
t.ok(is.array(removedCount), 'Remove job returns an array') | ||
t.equal(removedCount.length, 1, 'Removed count is valid') | ||
return q.getJob(job.id) | ||
return qMain.getJob(job.id) | ||
}).then((noJobs) => { | ||
@@ -436,8 +231,8 @@ t.ok(is.array(noJobs), 'Get job returns an array') | ||
t.comment('queue: Process Job') | ||
return q.process(processHandler) | ||
return qMain.process(processHandler) | ||
}).then(() => { | ||
job = q.createJob() | ||
return q.addJob(job) | ||
job = qMain.createJob() | ||
return qMain.addJob(job) | ||
}).delay(400).then((addedJob) => { | ||
return q.getJob(addedJob[0].id) | ||
return qMain.getJob(addedJob[0].id) | ||
}).then((finishedJobs) => { | ||
@@ -449,10 +244,10 @@ t.ok(is.array(finishedJobs), 'Job is in queue') | ||
t.comment('queue: Pause') | ||
return q.pause() | ||
return qMain.pause() | ||
}).then((isPaused) => { | ||
t.ok(isPaused, 'Queue pause returns true') | ||
t.ok(q.paused, 'Queue is paused') | ||
job = q.createJob() | ||
return q.addJob(job) | ||
t.ok(qMain.paused, 'Queue is paused') | ||
job = qMain.createJob() | ||
return qMain.addJob(job) | ||
}).delay(200).then((addedJob) => { | ||
return q.getJob(addedJob[0].id) | ||
return qMain.getJob(addedJob[0].id) | ||
}).then((addedJobs) => { | ||
@@ -464,7 +259,7 @@ t.ok(is.array(addedJobs), 'Job is in queue') | ||
t.comment('queue: Resume') | ||
return q.resume() | ||
return qMain.resume() | ||
}).delay(200).then((isResumed) => { | ||
t.ok(isResumed, 'Queue resume returns true') | ||
t.notOk(q.paused, 'Queue is not paused') | ||
return q.getJob(job.id) | ||
t.notOk(qMain.paused, 'Queue is not paused') | ||
return qMain.getJob(job.id) | ||
}).then((finishedJobs2) => { | ||
@@ -476,3 +271,3 @@ t.ok(is.array(finishedJobs2), 'Job is in queue') | ||
t.comment('queue: Summary') | ||
return q.summary() | ||
return qMain.summary() | ||
}).then((summary) => { | ||
@@ -490,7 +285,7 @@ t.ok(is.object(summary), 'Queue summary returns an object') | ||
t.comment('queue: Reset') | ||
return q.reset() | ||
return qMain.reset() | ||
}).then((totalReset) => { | ||
t.ok(is.integer(totalReset), 'Queue reset returns integer') | ||
t.equal(totalReset, 2, 'Reset return value is valid') | ||
return q.summary() | ||
return qMain.summary() | ||
}).then((summary2) => { | ||
@@ -502,22 +297,21 @@ t.ok(is.object(summary2), 'Queue summary returns an object') | ||
t.comment('queue: Stop') | ||
return q.stop() | ||
return qMain.stop() | ||
}).then((stopped) => { | ||
t.ok(stopped, 'Queue stop returns true') | ||
return q.ready() | ||
return qMain.ready() | ||
}).then((ready) => { | ||
t.ok(is.false(ready), 'Queue ready returns false') | ||
removeEventHandlers() | ||
// ---------- Event Summary ---------- | ||
eventHandlers.remove(t, qMain, state) | ||
// ---------- Drop Tests ---------- | ||
t.comment('queue: Drop') | ||
q = new Queue(tOpts.cxn(), tOpts.queueNameOnly()) | ||
testEvents = true | ||
q.on(enums.status.dropped, droppedEventHandler) | ||
return q.drop() | ||
qDrop = new Queue(tOpts.cxn(), tOpts.queueNameOnly()) | ||
}).then(() => { | ||
return qDrop.drop() | ||
}).then((dropped) => { | ||
t.ok(dropped, 'Queue drop returns true') | ||
return q.ready() | ||
return qDrop.ready() | ||
}).then((ready) => { | ||
testEvents = false | ||
q.removeListener(enums.status.dropped, droppedEventHandler) | ||
t.ok(is.false(ready), 'Queue ready returns false') | ||
@@ -527,55 +321,26 @@ | ||
t.comment('queue: Multi-Queue') | ||
q = new Queue(tOpts.cxn(), tOpts.queueNameOnly()) | ||
return q.ready() | ||
}).then((ready) => { | ||
t.ok(ready, `First queue ready [${q.id}]`) | ||
addEventHandlers() | ||
q2 = new Queue(tOpts.cxn(), tOpts.queueNameOnly()) | ||
return q2.ready() | ||
}).then((ready2) => { | ||
t.ok(ready2, `Second queue ready [${q2.id}]`) | ||
q.process(processHandler) | ||
job = q2.createJob() | ||
return q2.addJob(job) | ||
}).then((jobOnQ2) => { | ||
t.equal(jobOnQ2[0].id, job.id, 'Job added to second queue') | ||
qSub = new Queue(tOpts.cxn(), tOpts.default()) | ||
return qSub.ready() | ||
}).then((subReady) => { | ||
t.ok(subReady, `Subscriber queue ready [${qSub.id}]`) | ||
qPub = new Queue(tOpts.cxn(), tOpts.default()) | ||
return qPub.ready() | ||
}).then((pubReady) => { | ||
t.ok(pubReady, `Publisher queue ready [${qPub.id}]`) | ||
qSub.process(processHandler) | ||
job = qPub.createJob() | ||
return qPub.addJob(job) | ||
}).then((jobOnQPub) => { | ||
t.equal(jobOnQPub[0].id, job.id, 'Job added to publisher queue') | ||
}).delay(1000).then(() => { | ||
return q.getJob(job.id) | ||
return qSub.getJob(job.id) | ||
}).then((jobCheck) => { | ||
t.ok(is.array(jobCheck), 'Job is in queue') | ||
t.equal(jobCheck[0].status, enums.status.completed, 'Job is completed') | ||
removeEventHandlers() | ||
return Promise.all([ | ||
q.stop(), | ||
q2.stop() | ||
]) | ||
}).then(() => { | ||
// | ||
// ---------- Event summary Test ---------- | ||
t.comment('queue: Event Summary') | ||
t.equal(readyEventCount, readyEventTotal, 'Ready event count valid') | ||
t.equal(addedEventCount, addedEventTotal, 'Added event count valid') | ||
t.equal(activeEventCount, activeEventTotal, 'Active event count valid') | ||
t.equal(processingEventCount, processingEventTotal, 'Processing event count valid') | ||
t.equal(progressEventCount, progressEventTotal, 'Progress event count valid') | ||
t.equal(completedEventCount, completedEventTotal, 'Completed event count valid') | ||
t.equal(cancelledEventCount, cancelledEventTotal, 'Cancelled event count valid') | ||
t.equal(failedEventCount, failedEventTotal, 'Failed event count valid') | ||
t.equal(terminatedEventCount, terminatedEventTotal, 'Terminated event count valid') | ||
t.equal(pausingEventCount, pausingEventTotal, 'Pausing event count valid') | ||
t.equal(pausedEventCount, pausedEventTotal, 'Paused event count valid') | ||
t.equal(resumedEventCount, resumedEventTotal, 'Resumed event count valid') | ||
t.equal(removedEventCount, removedEventTotal, 'Removed event count valid') | ||
t.equal(logEventCount, logEventTotal, 'Log event count valid') | ||
t.equal(updatedEventCount, updatedEventTotal, 'Updated event count valid') | ||
t.equal(idleEventCount, idleEventTotal, 'Idle event count valid') | ||
t.equal(reviewedEventCount, reviewedEventTotal, 'Reviewed event count valid') | ||
t.equal(resetEventCount, resetEventTotal, 'Reset event count valid') | ||
t.equal(errorEventCount, errorEventTotal, 'Error event count valid') | ||
t.equal(detachedEventCount, detachedEventTotal, 'Detached event count valid') | ||
t.equal(stoppingEventCount, stoppingEventTotal, 'Stopping event count valid') | ||
t.equal(stoppedEventCount, stoppedEventTotal, 'Stopped event count valid') | ||
t.equal(droppedEventCount, droppedEventTotal, 'Dropped event count valid') | ||
return qSub.stop() | ||
}).delay(stopDelay).then(() => { | ||
t.pass('Subscriber Queue Stopped') | ||
return qPub.stop() | ||
}).delay(stopDelay).then(() => { | ||
t.pass('Publisher Queue Stopped') | ||
return resolve(t.end()) | ||
@@ -582,0 +347,0 @@ }).catch(err => tError(err, module, t)) |
@@ -1,40 +0,54 @@ | ||
const Promise = require('bluebird') | ||
const enums = require('./enums.spec') | ||
const is = require('./is.spec') | ||
const datetime = require('./datetime.spec') | ||
const dbAssertDatabase = require('./db-assert-database.spec') | ||
const dbAssertTable = require('./db-assert-table.spec') | ||
const dbAssertIndex = require('./db-assert-index.spec') | ||
const dbAssert = require('./db-assert.spec') | ||
const dbDriver = require('./db-driver.spec') | ||
const dbResult = require('./db-result.spec') | ||
const dbReview = require('./db-review.spec') | ||
const job = require('./job.spec') | ||
const jobOptions = require('./job-options.spec') | ||
const jobParse = require('./job-parse.spec') | ||
const jobProgress = require('./job-progress.spec') | ||
const jobCompleted = require('./job-completed.spec') | ||
const jobUpdate = require('./job-update.spec') | ||
const jobFailed = require('./job-failed.spec') | ||
const jobLog = require('./job-log.spec') | ||
const queue = require('./queue.spec') | ||
const queueDb = require('./queue-db.spec') | ||
const queueState = require('./queue-state.spec') | ||
const queueAddJob = require('./queue-add-job.spec') | ||
const queueGetJob = require('./queue-get-job.spec') | ||
const queueFindJob = require('./queue-find-job.spec') | ||
const queueGetNextJob = require('./queue-get-next-job.spec') | ||
const queueProcess = require('./queue-process.spec') | ||
const queueChange = require('./queue-change.spec') | ||
const queueInterruption = require('./queue-interruption.spec') | ||
const queueCancelJob = require('./queue-cancel-job.spec') | ||
const queueRemoveJob = require('./queue-remove-job.spec') | ||
const queueReset = require('./queue-reset.spec') | ||
const queueStop = require('./queue-stop.spec') | ||
const queueDrop = require('./queue-drop.spec') | ||
const queueSummary = require('./queue-summary.spec') | ||
const tests = new Map() | ||
tests.set('enums', require('./enums.spec')) | ||
tests.set('is', require('./is.spec')) | ||
tests.set('datetime', require('./datetime.spec')) | ||
tests.set('dbAssertDatabase', require('./db-assert-database.spec')) | ||
tests.set('dbAssertTable', require('./db-assert-table.spec')) | ||
tests.set('dbAssertIndex', require('./db-assert-index.spec')) | ||
tests.set('dbAssert', require('./db-assert.spec')) | ||
tests.set('dbDriver', require('./db-driver.spec')) | ||
tests.set('dbResult', require('./db-result.spec')) | ||
tests.set('dbReview', require('./db-review.spec')) | ||
tests.set('job', require('./job.spec')) | ||
tests.set('jobOptions', require('./job-options.spec')) | ||
tests.set('jobParse', require('./job-parse.spec')) | ||
tests.set('jobProgress', require('./job-progress.spec')) | ||
tests.set('jobCompleted', require('./job-completed.spec')) | ||
tests.set('jobUpdate', require('./job-update.spec')) | ||
tests.set('jobFailed', require('./job-failed.spec')) | ||
tests.set('jobLog', require('./job-log.spec')) | ||
tests.set('queue', require('./queue.spec')) | ||
tests.set('queueDb', require('./queue-db.spec')) | ||
tests.set('queueState', require('./queue-state.spec')) | ||
tests.set('queueAddJob', require('./queue-add-job.spec')) | ||
tests.set('queueGetJob', require('./queue-get-job.spec')) | ||
tests.set('queueFindJob', require('./queue-find-job.spec')) | ||
tests.set('queueGetNextJob', require('./queue-get-next-job.spec')) | ||
tests.set('queueReanimateJob', require('./queue-reanimate-job.spec')) | ||
tests.set('queueProcess', require('./queue-process.spec')) | ||
tests.set('queueChange', require('./queue-change.spec')) | ||
tests.set('queueInterruption', require('./queue-interruption.spec')) | ||
tests.set('queueCancelJob', require('./queue-cancel-job.spec')) | ||
tests.set('queueRemoveJob', require('./queue-remove-job.spec')) | ||
tests.set('queueReset', require('./queue-reset.spec')) | ||
tests.set('queueStop', require('./queue-stop.spec')) | ||
tests.set('queueDrop', require('./queue-drop.spec')) | ||
tests.set('queueSummary', require('./queue-summary.spec')) | ||
return dbAssert().then(() => { | ||
}).then(() => { | ||
return queueChange() | ||
}) | ||
if (tests.has(process.argv[2])) { | ||
tests.get('dbAssert')().then(() => { | ||
return tests.get(process.argv[2])() | ||
}) | ||
} else { | ||
const line = '==============================================' | ||
console.log('\x1b[32m', line, '\x1b[0m') | ||
console.log('\x1b[36m', ' INVALID TEST NAME!', '\x1b[0m') | ||
console.log('\x1b[36m', ' Use one of the test names below:', '\x1b[0m') | ||
console.log('\x1b[32m', line, '\x1b[0m') | ||
for (let test of tests.keys()) { | ||
console.log('\x1b[34m', ` ${test}`, '\x1b[0m') | ||
} | ||
console.log('\x1b[32m', line, '\x1b[0m') | ||
console.log('\x1b[36m', ' Example: npm run tc jobOptions', '\x1b[0m') | ||
console.log('\x1b[32m', line, '\x1b[0m') | ||
} |
@@ -16,6 +16,6 @@ const is = require('../src/is') | ||
function readyEventHandler () { | ||
function readyEventHandler (qid) { | ||
if (state.enabled) { | ||
let total = incCount(enums.status.ready) | ||
t.pass(`Event: ready [${total} of ${state.ready}]`) | ||
t.ok(is.string(qid), `Event: ready [${total} of ${state.ready}]`) | ||
} | ||
@@ -25,6 +25,6 @@ } | ||
function processingEventHandler (jobId) { | ||
function processingEventHandler (qid, jobId) { | ||
if (state.enabled) { | ||
let total = incCount(enums.status.processing) | ||
t.ok(is.uuid(jobId), | ||
t.ok(is.string(qid) && is.uuid(jobId), | ||
`Event: processing [${total} of ${state.processing}] [${jobId}]`) | ||
@@ -35,6 +35,7 @@ } | ||
function progressEventHandler (jobId, percent) { | ||
function progressEventHandler (qid, jobId, percent) { | ||
if (state.enabled) { | ||
let total = incCount(enums.status.progress) | ||
t.ok(is.uuid(jobId), | ||
let percentTest = is.integer(percent) && percent >= 0 && percent <= 100 | ||
t.ok(is.string(qid) && is.uuid(jobId) && percentTest, | ||
`Event: progress ${percent} [${total} of ${state.progress}] [${jobId}]`) | ||
@@ -48,3 +49,3 @@ } | ||
let total = incCount(enums.status.pausing) | ||
t.pass(`Event: pausing [${total} of ${state.pausing}] [${qid}]`) | ||
t.ok(is.string(qid), `Event: pausing [${total} of ${state.pausing}] [${qid}]`) | ||
} | ||
@@ -57,3 +58,3 @@ } | ||
let total = incCount(enums.status.paused) | ||
t.pass(`Event: paused [${total} of ${state.paused}] [${qid}]`) | ||
t.ok(is.string(qid), `Event: paused [${total} of ${state.paused}] [${qid}]`) | ||
} | ||
@@ -66,3 +67,3 @@ } | ||
let total = incCount(enums.status.resumed) | ||
t.pass(`Event: resumed [${total} of ${state.resumed}] [${qid}]`) | ||
t.ok(is.string(qid), `Event: resumed [${total} of ${state.resumed}] [${qid}]`) | ||
} | ||
@@ -72,6 +73,6 @@ } | ||
function removedEventHandler (qid) { | ||
function removedEventHandler (qid, jobId) { | ||
if (state.enabled) { | ||
let total = incCount(enums.status.removed) | ||
t.pass(`Event: removed [${total} of ${state.removed}] [${qid}]`) | ||
t.ok(is.string(qid) && is.uuid(jobId), `Event: removed [${total} of ${state.removed}] [${qid}]`) | ||
} | ||
@@ -81,14 +82,7 @@ } | ||
function idleEventHandler (qid) { | ||
function resetEventHandler (qid, totalRemoved) { | ||
if (state.enabled) { | ||
let total = incCount(enums.status.idle) | ||
t.pass(`Event: idle [${total}] [${qid}]`) | ||
} | ||
} | ||
state.handler.set(enums.status.idle, idleEventHandler) | ||
function resetEventHandler (qid) { | ||
if (state.enabled) { | ||
let total = incCount(enums.status.reset) | ||
t.pass(`Event: reset [${total} of ${state.reset}] [${qid}]`) | ||
t.ok(is.string(qid) && is.integer(totalRemoved) && totalRemoved >= 0, | ||
`Event: reset [${total} of ${state.reset}] [${qid}]`) | ||
} | ||
@@ -98,6 +92,17 @@ } | ||
function reviewedEventHandler (replaceCount) { | ||
function reviewedEventHandler (qid, reviewResult) { | ||
if (state.enabled) { | ||
let total = incCount(enums.status.reviewed) | ||
t.pass(`Event: reviewed [${total} of ${state.reviewed}] [${replaceCount}]`) | ||
let result | ||
if (is.object(reviewResult) && reviewResult.local) { | ||
result = is.integer(reviewResult.reviewed) && | ||
is.integer(reviewResult.removed) && | ||
reviewResult.reviewed >= 0 && | ||
reviewResult.removed >= 0 | ||
} else { | ||
// global events will contain null in reviewed and removed. | ||
result = is.object(reviewResult) && !reviewResult.local | ||
} | ||
t.ok(result, | ||
`Event: reviewed [${total} of ${state.reviewed}] [${reviewResult}]`) | ||
} | ||
@@ -110,3 +115,4 @@ } | ||
let total = incCount(enums.status.detached) | ||
t.pass(`Event: detached [${total} of ${state.detached}] [${qid}]`) | ||
t.ok(is.string(qid), | ||
`Event: detached [${total} of ${state.detached}] [${qid}]`) | ||
} | ||
@@ -119,3 +125,4 @@ } | ||
let total = incCount(enums.status.stopping) | ||
t.pass(`Event: stopping [${total} of ${state.stopping}] [${qid}]`) | ||
t.ok(is.string(qid), | ||
`Event: stopping [${total} of ${state.stopping}] [${qid}]`) | ||
} | ||
@@ -128,3 +135,4 @@ } | ||
let total = incCount(enums.status.stopped) | ||
t.pass(`Event: stopped [${total} of ${state.stopped}] [${qid}]`) | ||
t.ok(is.string(qid), | ||
`Event: stopped [${total} of ${state.stopped}] [${qid}]`) | ||
} | ||
@@ -137,3 +145,4 @@ } | ||
let total = incCount(enums.status.dropped) | ||
t.pass(`Event: dropped [${total} of ${state.dropped}] [${qid}]`) | ||
t.ok(is.string(qid), | ||
`Event: dropped [${total} of ${state.dropped}] [${qid}]`) | ||
} | ||
@@ -143,6 +152,7 @@ } | ||
function addedEventHandler (jobId) { | ||
function addedEventHandler (qid, jobId) { | ||
if (state.enabled) { | ||
let total = incCount(enums.status.added) | ||
t.pass(`Event: added [${total} of ${state.added}] [${jobId}]`) | ||
t.ok(is.string(qid) && is.uuid(jobId), | ||
`Event: added [${total} of ${state.added}] [${jobId}]`) | ||
} | ||
@@ -152,6 +162,7 @@ } | ||
function activeEventHandler (jobId) { | ||
function activeEventHandler (qid, jobId) { | ||
if (state.enabled) { | ||
let total = incCount(enums.status.active) | ||
t.pass(`Event: active [${total} of ${state.active}] [${jobId}]`) | ||
t.ok(is.string(qid) && is.uuid(jobId), | ||
`Event: active [${total} of ${state.active}] [${jobId}]`) | ||
} | ||
@@ -161,7 +172,7 @@ } | ||
function completedEventHandler (jobId) { | ||
function completedEventHandler (qid, jobId, isRepeating) { | ||
if (state.enabled) { | ||
let total = incCount(enums.status.completed) | ||
t.ok(is.uuid(jobId), | ||
`Event: completed [${total} of ${state.completed}] [${jobId}]`) | ||
t.ok(is.string(qid) && is.uuid(jobId) && is.boolean(isRepeating), | ||
`Event: completed [${total} of ${state.completed}] [${jobId}] isRepeating: [${isRepeating}]`) | ||
} | ||
@@ -171,6 +182,6 @@ } | ||
function cancelledEventHandler (jobId) { | ||
function cancelledEventHandler (qid, jobId) { | ||
if (state.enabled) { | ||
let total = incCount(enums.status.cancelled) | ||
t.ok(is.uuid(jobId), | ||
t.ok(is.string(qid) && is.uuid(jobId), | ||
`Event: cancelled [${total} of ${state.cancelled}] [${jobId}]`) | ||
@@ -181,6 +192,6 @@ } | ||
function failedEventHandler (jobId) { | ||
function failedEventHandler (qid, jobId) { | ||
if (state.enabled) { | ||
let total = incCount(enums.status.failed) | ||
t.ok(is.uuid(jobId), | ||
t.ok(is.string(qid) && is.uuid(jobId), | ||
`Event: failed [${total} of ${state.failed}] [${jobId}]`) | ||
@@ -191,6 +202,6 @@ } | ||
function terminatedEventHandler (jobId) { | ||
function terminatedEventHandler (qid, jobId) { | ||
if (state.enabled) { | ||
let total = incCount(enums.status.terminated) | ||
t.ok(is.uuid(jobId), | ||
t.ok(is.string(qid) && is.uuid(jobId), | ||
`Event: terminated [${total} of ${state.terminated}] [${jobId}]`) | ||
@@ -201,6 +212,15 @@ } | ||
function logEventHandler (jobId) { | ||
function reanimatedEventHandler (qid, jobId) { | ||
if (state.enabled) { | ||
let total = incCount(enums.status.reanimated) | ||
t.ok(is.string(qid) && is.uuid(jobId), | ||
`Event: reanimated [${total} of ${state.reanimated}] [${jobId}]`) | ||
} | ||
} | ||
state.handler.set(enums.status.reanimated, reanimatedEventHandler) | ||
function logEventHandler (qid, jobId) { | ||
if (state.enabled) { | ||
let total = incCount(enums.status.log) | ||
t.ok(is.uuid(jobId), | ||
t.ok(is.string(qid) && is.uuid(jobId), | ||
`Event: log [${total} of ${state.log}] [${jobId}]`) | ||
@@ -211,6 +231,6 @@ } | ||
function updatedEventHandler (jobId) { | ||
function updatedEventHandler (qid, jobId) { | ||
if (state.enabled) { | ||
let total = incCount(enums.status.updated) | ||
t.ok(is.uuid(jobId), | ||
t.ok(is.string(qid) && is.uuid(jobId), | ||
`Event: updated [${total} of ${state.updated}] [${jobId}]`) | ||
@@ -221,2 +241,11 @@ } | ||
function errorEventHandler (qid, err) { | ||
if (state.enabled) { | ||
let total = incCount(enums.status.error) | ||
t.ok(is.string(qid) && is.error(err), | ||
`Event: error [${total} of ${state.error}] Error: [${err.message}]`) | ||
} | ||
} | ||
state.handler.set(enums.status.error, errorEventHandler) | ||
state.enabled = true | ||
@@ -226,2 +255,3 @@ state.handler.forEach((value, key) => { | ||
}) | ||
t.comment(state.testName + ': Event Handlers Added') | ||
} | ||
@@ -231,2 +261,4 @@ | ||
state.enabled = false | ||
t.comment(state.testName + ': Event Summary') | ||
state.handler.forEach((fn, status) => { | ||
@@ -236,2 +268,3 @@ t.equal(state.count.get(status), state[status], `Total ${status} events: [${state[status]}]`) | ||
}) | ||
t.comment(state.testName + ': Event Handlers Removed') | ||
} |
@@ -27,2 +27,3 @@ const Promise = require('bluebird') | ||
const queueGetNextJob = require('./queue-get-next-job.spec') | ||
const queueReanimateJob = require('./queue-reanimate-job.spec') | ||
const queueProcess = require('./queue-process.spec') | ||
@@ -38,3 +39,3 @@ const queueChange = require('./queue-change.spec') | ||
return dbAssertDatabase().then(() => { | ||
dbAssertDatabase().then(() => { | ||
}).then(() => { | ||
@@ -47,2 +48,4 @@ return dbAssertTable() | ||
}).then(() => { | ||
return queueReset() | ||
}).then(() => { | ||
return Promise.all([ | ||
@@ -78,2 +81,4 @@ dbDriver(), | ||
}).then(() => { | ||
return queueReanimateJob() | ||
}).then(() => { | ||
return queueCancelJob() | ||
@@ -87,8 +92,4 @@ }).then(() => { | ||
}).then(() => { | ||
// This test has bee removed for now due to this issue on Tape | ||
// https://github.com/substack/tape/issues/223 | ||
// To test queueChange, use the test-current.js file by running | ||
// npm run tc | ||
// return queueChange() | ||
// }).then(() => { | ||
return queueChange() | ||
}).then(() => { | ||
return queue() | ||
@@ -95,0 +96,0 @@ }).then(() => { |
@@ -1,51 +0,90 @@ | ||
const test = require('tape') | ||
const Promise = require('bluebird') | ||
// const test = require('tap').test | ||
// const Promise = require('bluebird') | ||
// const datetime = require('../src/datetime') | ||
const enums = require('../src/enums') | ||
// const is = require('../src/is') | ||
const tError = require('./test-error') | ||
const tData = require('./test-options').tData | ||
const tOpts = require('./test-options') | ||
const Queue = require('../src/queue') | ||
module.exports = function () { | ||
return new Promise((resolve, reject) => { | ||
test('XXXXXXXX', (t) => { | ||
t.plan(2) | ||
const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
const job = q.createJob() | ||
job.data = tData | ||
// ---------- Event Handler Setup ---------- | ||
let testEvents = false | ||
function completedEventHandler (jobId) { | ||
if (testEvents) { | ||
t.equal(jobId, job.id, `Event: Job completed [${jobId}]`) | ||
} | ||
} | ||
function addEventHandlers () { | ||
testEvents = true | ||
q.on(enums.status.completed, completedEventHandler) | ||
} | ||
function removeEventHandlers () { | ||
testEvents = false | ||
q.removeListener(enums.status.completed, completedEventHandler) | ||
} | ||
q.addJob(job).then((savedJob) => { | ||
t.equal(savedJob[0].id, job.id, 'Job saved successfully') | ||
return q.reset() | ||
}).then((resetResult) => { | ||
t.ok(resetResult >= 0, 'Queue reset') | ||
addEventHandlers() | ||
// ---------- First Test ---------- | ||
removeEventHandlers() | ||
q.stop() | ||
return resolve(t.end()) | ||
}).catch(err => tError(err, module, t)) | ||
}) | ||
}) | ||
} | ||
// const enums = require('../src/enums') | ||
// const tError = require('./test-error') | ||
// const tOpts = require('./test-options') | ||
// const tData = require('./test-options').tData | ||
// const queueProcess = require('../src/queue-process') | ||
// const dbReview = require('../src/db-review') | ||
// const Queue = require('../src/queue') | ||
// const eventHandlers = require('./test-event-handlers') | ||
// const testName = 'XXXXXXXXXXXX' | ||
// | ||
// module.exports = function () { | ||
// return new Promise((resolve, reject) => { | ||
// test(testName, (t) => { | ||
// t.plan(1000) | ||
// | ||
// // ---------- Test Setup ---------- | ||
// const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
// | ||
// let jobs | ||
// let jobDelay = 200 | ||
// const noOfJobsToCreate = 10 | ||
// | ||
// // ---------- Event Handler Setup ---------- | ||
// let state = { | ||
// testName, | ||
// enabled: false, | ||
// ready: 0, | ||
// processing: 0, | ||
// progress: 0, | ||
// pausing: 0, | ||
// paused: 0, | ||
// resumed: 0, | ||
// removed: 0, | ||
// reset: 0, | ||
// error: 0, | ||
// reviewed: 0, | ||
// detached: 0, | ||
// stopping: 0, | ||
// stopped: 0, | ||
// dropped: 0, | ||
// added: 0, | ||
// waiting: 0, | ||
// active: 0, | ||
// completed: 0, | ||
// cancelled: 0, | ||
// failed: 0, | ||
// terminated: 0, | ||
// reanimated: 0, | ||
// log: 0, | ||
// updated: 0 | ||
// } | ||
// | ||
// // ---------- Test Setup ---------- | ||
// jobs = [] | ||
// for (let i = 0; i < noOfJobsToCreate; i++) { | ||
// jobs.push(q.createJob()) | ||
// } | ||
// return q.reset().then((resetResult) => { | ||
// t.ok(is.integer(resetResult), 'Queue reset') | ||
// return q.pause() | ||
// }).then(() => { | ||
// eventHandlers.add(t, q, state) | ||
// | ||
// // ---------- Processing, Pause, and Concurrency Test ---------- | ||
// t.comment('queue-process: Process, Pause, and Concurrency') | ||
// return q.addJob(jobs) | ||
// }).then((savedJobs) => { | ||
// t.equal(savedJobs.length, noOfJobsToCreate, `Jobs saved successfully: [${savedJobs.length}]`) | ||
// | ||
// // ---------- Queue Summary ---------- | ||
// t.comment('queue-process: Queue Summary') | ||
// return q.summary() | ||
// }).then((queueSummary) => { | ||
// | ||
// // ---------- Event Summary ---------- | ||
// eventHandlers.remove(t, q, state) | ||
// | ||
// return q.reset() | ||
// }).then((resetResult) => { | ||
// t.ok(resetResult >= 0, 'Queue reset') | ||
// q.stop() | ||
// return resolve(t.end()) | ||
// }).catch(err => tError(err, module, t)) | ||
// }) | ||
// }) | ||
// } |
# WORKLOG | ||
## Working On | ||
## TODO | ||
* global queue reset, reviewed, error event? | ||
* Add failedCount? | ||
* Add global rate limiter? |
544442
95
7633
8
252
+ Addeduuid@3.4.0(transitive)
- Removeduuid@2.0.3(transitive)
Updateddebug@^2.3.2
Updatedrethinkdbdash@^2.3.26
Updateduuid@^3.0.0