New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

rethinkdb-job-queue

Package Overview
Dependencies
Maintainers
1
Versions
46
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rethinkdb-job-queue - npm Package Compare versions

Comparing version 1.1.2 to 2.0.0

.istanbul.yml

34

dist/datetime.js

@@ -5,30 +5,42 @@ 'use strict';

// Logging has been removed due to circular dependencies.
var enums = require('./enums');
function isDate(value) {
// logger(`isDate`, value)
return value instanceof Date || Object.prototype.toString.call(value) === '[object Date]';
}
function isInteger(value) {
return Object.prototype.toString.call(value) === '[object Number]' && !Number.isNaN(value) && value % 1 === 0;
}
function addMs(dateObject, value) {
var multiplier = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : 0;
if (isInteger(dateObject)) {
value = dateObject;
dateObject = new Date();
}
if (isDate(dateObject) && isInteger(value)) {
return new Date(dateObject.getTime() + value * multiplier);
}
throw new Error(enums.message.datetimeInvalid);
}
function addMilliseconds(dateObject, ms) {
dateObject = isDate(dateObject) ? dateObject : new Date(dateObject);
return new Date(dateObject.getTime() + ms);
return addMs(dateObject, ms, 1);
}
function addSeconds(dateObject, sec) {
dateObject = isDate(dateObject) ? dateObject : new Date(dateObject);
return addMilliseconds(dateObject, sec * 1000);
return addMs(dateObject, sec, 1000);
}
function addMinutes(dateObject, min) {
dateObject = isDate(dateObject) ? dateObject : new Date(dateObject);
return addMilliseconds(dateObject, min * 60000);
return addMs(dateObject, min, 60000);
}
function addHours(dateObject, hours) {
dateObject = isDate(dateObject) ? dateObject : new Date(dateObject);
return addMilliseconds(dateObject, hours * 3600000);
return addMs(dateObject, hours, 3600000);
}
function addDays(dateObject, days) {
dateObject = isDate(dateObject) ? dateObject : new Date(dateObject);
return addMilliseconds(dateObject, days * 86400000);
return addMs(dateObject, days, 86400000);
}

@@ -35,0 +47,0 @@

@@ -52,5 +52,18 @@ 'use strict';

function createIndexStatus(q) {
logger('createIndexStatus');
var indexName = enums.index.indexStatus;
return Promise.resolve().then(function () {
return q.r.db(q.db).table(q.name).indexList().contains(indexName).run();
}).then(function (exists) {
if (exists) {
return exists;
}
return q.r.db(q.db).table(q.name).indexCreate(indexName).run();
});
}
module.exports = function assertIndex(q) {
logger('assertIndex');
return Promise.all([createIndexActiveDateEnable(q), createIndexInactivePriorityDateCreated(q), createIndexFinishedDateFinished(q)]).then(function (indexCreateResult) {
return Promise.all([createIndexActiveDateEnable(q), createIndexInactivePriorityDateCreated(q), createIndexFinishedDateFinished(q), createIndexStatus(q)]).then(function (indexCreateResult) {
logger('Waiting for index...');

@@ -57,0 +70,0 @@ return q.r.db(q.db).table(q.name).indexWait().run();

'use strict';
var logger = require('./logger')(module);
var Promise = require('bluebird');
var dbAssertDatabase = require('./db-assert-database');

@@ -10,3 +11,15 @@ var dbAssertTable = require('./db-assert-table');

logger('dbAssert');
return dbAssertDatabase(q).then(function () {
// The delay algorithm below is to prevent multiple Queue objects
// attempting to create the database/table/indexes at the same time.
// Before the delay was introduced it was possible to end up with two
// databases in RethinkDB with the same name.
var randomDelay = Math.floor(Math.random() * 1000);
if (!q.master) {
randomDelay += q._databaseInitDelay;
}
return Promise.delay(randomDelay).then(function () {
return dbAssertDatabase(q);
}).then(function () {
return dbAssertTable(q);

@@ -13,0 +26,0 @@ }).then(function () {

@@ -27,2 +27,3 @@ 'use strict';

retryCount: q.r.row('retryCount'),
processCount: q.r.row('processCount'),
message: 'Master: ' + enums.message.failed,

@@ -75,3 +76,3 @@ dateEnable: q.r.row('dateEnable')

logger('Event: reviewed', runReviewTasksResult);
q.emit(enums.status.reviewed, runReviewTasksResult);
q.emit(enums.status.reviewed, q.id, runReviewTasksResult);
queueProcess.restart(q);

@@ -78,0 +79,0 @@ return Promise.props({

@@ -43,2 +43,3 @@ 'use strict';

terminated: 'terminated',
reanimated: 'reanimated',
log: 'log', // Event only, not a job status

@@ -52,2 +53,3 @@ updated: 'updated' // Event only, not a job status

db: 'rjqJobQueue',
databaseInitDelay: 1000,
masterInterval: 310000, // 5 minutes and 10 seconds

@@ -58,2 +60,4 @@ priority: 'normal',

retryDelay: 600000, // 10 minutes
repeat: false,
repeatDelay: 300000, // 5 minutes
concurrency: 1,

@@ -65,3 +69,4 @@ removeFinishedJobs: 15552000000 // 180 days

indexInactivePriorityDateCreated: 'indexInactivePriorityDateCreated',
indexFinishedDateFinished: 'indexFinishedDateFinished'
indexFinishedDateFinished: 'indexFinishedDateFinished',
indexStatus: 'status'
});

@@ -91,6 +96,7 @@ var dbResult = Object.freeze({

jobProgress: 'Job progress updated. Old value in log data',
jobNotActive: 'Job is not at an active status',
jobNotAdded: 'Job not added to the queue',
jobAlreadyAdded: 'Job is already in the queue',
jobDataInvalid: 'Job data can not be a function',
jobInvalid: 'Job object is invalid',
jobReanimated: 'Job has been reanimated',
processTwice: 'Cannot call queue process twice',

@@ -102,2 +108,4 @@ idInvalid: 'The job id is invalid',

retryDelayIvalid: 'The job retryDelay value is invalid',
repeatInvalid: 'The job repeat value is invalid',
repeatDelayInvalid: 'The job repeatDelay value is invalid',
dateEnableIvalid: 'The job dateEnable value is invalid',

@@ -108,2 +116,3 @@ dbError: 'RethinkDB returned an error',

globalStateError: 'The global state document change feed is invalid',
datetimeInvalid: 'Invalid datetime arguments',
noErrorStack: 'The error has no stack detail',

@@ -110,0 +119,0 @@ noErrorMessage: 'The error has no message'

@@ -57,3 +57,3 @@ 'use strict';

logger('ensureDate', value);
return isDate(value) ? value : new Date(value);
return isDate(value) ? value : new Date();
}

@@ -111,2 +111,8 @@

function isError(value) {
logger('isError', value);
return value instanceof Error;
}
module.exports.error = isError;
function isLog(value) {

@@ -171,2 +177,13 @@ logger('isLog', value);

module.exports.repeating = function isRepeating(job) {
logger('isRepeating', job);
if (isTrue(job.repeat)) {
return true;
}
if (isInteger(job.repeat) && job.repeat > 0 && job.processCount <= job.repeat) {
return true;
}
return false;
};
module.exports.active = function isActive(job) {

@@ -173,0 +190,0 @@ logger('isActive', job);

@@ -12,9 +12,10 @@ 'use strict';

logger('completed: [' + job.id + ']', result);
job.status = enums.status.completed;
var isRepeating = is.repeating(job);
job.status = isRepeating ? enums.status.waiting : enums.status.completed;
job.dateFinished = new Date();
job.progress = 100;
job.progress = isRepeating ? 0 : 100;
var duration = job.dateFinished - job.dateStarted;
duration = duration >= 0 ? duration : 0;
var log = jobLog.createLogObject(job, result, enums.message.completed);
var log = jobLog.createLogObject(job, result, enums.status.completed);
log.duration = duration;

@@ -25,2 +26,3 @@

status: job.status,
dateEnable: job.q.r.branch(isRepeating, job.q.r.now().add(job.q.r.row('repeatDelay').div(1000)), job.q.r.row('dateEnable')),
dateFinished: job.dateFinished,

@@ -35,5 +37,5 @@ progress: job.progress,

}).then(function (jobIds) {
logger('Event: completed [' + jobIds[0] + ']');
job.q.emit(enums.status.completed, jobIds[0]);
if (is.true(job.q.removeFinishedJobs)) {
logger('Event: completed', jobIds[0], isRepeating);
job.q.emit(enums.status.completed, job.q.id, jobIds[0], isRepeating);
if (!isRepeating && is.true(job.q.removeFinishedJobs)) {
return job.q.removeJob(job).then(function (deleteResult) {

@@ -40,0 +42,0 @@ return jobIds;

@@ -6,2 +6,3 @@ 'use strict';

var is = require('./is');
var datetime = require('./datetime');
var enums = require('./enums');

@@ -18,9 +19,20 @@ var jobLog = require('./job-log');

var isRetry = job.retryCount < job.retryMax;
var isRepeating = is.repeating(job);
var dateEnable = new Date();
job.status = enums.status.terminated;
if (isRetry) {
job.status = enums.status.failed;
dateEnable = datetime.add.ms(job.retryDelay * job.retryCount);
job.retryCount++;
logType = enums.log.warning;
} else {
job.status = enums.status.terminated;
}
if (!isRetry && isRepeating) {
job.status = enums.status.waiting;
dateEnable = datetime.add.ms(job.repeatDelay);
job.retryCount = 0;
}
job.dateFinished = new Date();

@@ -44,3 +56,3 @@ job.progress = 0;

dateFinished: job.dateFinished,
dateEnable: job.q.r.now().add(job.q.r.row('retryDelay').div(1000).mul(job.q.r.row('retryCount'))),
dateEnable: dateEnable,
log: job.q.r.row('log').append(log),

@@ -53,10 +65,10 @@ queueId: job.q.id

}).then(function (jobIds) {
if (isRetry) {
logger('Event: failed [' + jobIds[0] + ']');
job.q.emit(enums.status.failed, jobIds[0]);
if (isRetry || isRepeating) {
logger('Event: failed', job.q.id, jobIds[0]);
job.q.emit(enums.status.failed, job.q.id, jobIds[0]);
} else {
logger('Event: terminated [' + jobIds[0] + ']');
job.q.emit(enums.status.terminated, jobIds[0]);
logger('Event: terminated', job.q.id, jobIds[0]);
job.q.emit(enums.status.terminated, job.q.id, jobIds[0]);
}
if (!isRetry && is.true(job.q.removeFinishedJobs)) {
if (!isRetry && !isRepeating && is.true(job.q.removeFinishedJobs)) {
return job.q.removeJob(job).then(function (deleteResult) {

@@ -63,0 +75,0 @@ return jobIds;

@@ -21,3 +21,4 @@ 'use strict';

status: status,
retryCount: job.retryCount
retryCount: job.retryCount,
processCount: job.processCount
};

@@ -47,4 +48,4 @@ }

job.log.push(newLog);
logger('Event: log [' + job.id + ']', updateResult);
job.q.emit(enums.status.log, job.id);
logger('Event: log', job.q.id, job.id);
job.q.emit(enums.status.log, job.q.id, job.id);
return true;

@@ -51,0 +52,0 @@ });

@@ -18,2 +18,4 @@ 'use strict';

finalOptions.retryDelay = enums.options.retryDelay;
finalOptions.repeat = enums.options.repeat;
finalOptions.repeatDelay = enums.options.repeatDelay;

@@ -36,2 +38,10 @@ if (Object.keys(enums.priority).includes(oldOptions.priority)) {

if (is.true(oldOptions.repeat) || is.false(oldOptions.repeat) || is.integer(oldOptions.repeat) && oldOptions.repeat >= 0) {
finalOptions.repeat = oldOptions.repeat;
}
if (is.integer(oldOptions.repeatDelay) && oldOptions.repeatDelay >= 0) {
finalOptions.repeatDelay = oldOptions.repeatDelay;
}
if (Object.keys(enums.priority).includes(newOptions.priority)) {

@@ -53,3 +63,11 @@ finalOptions.priority = newOptions.priority;

if (is.true(newOptions.repeat) || is.false(newOptions.repeat) || is.integer(newOptions.repeat) && newOptions.repeat >= 0) {
finalOptions.repeat = newOptions.repeat;
}
if (is.integer(newOptions.repeatDelay) && newOptions.repeatDelay >= 0) {
finalOptions.repeatDelay = newOptions.repeatDelay;
}
return finalOptions;
};

@@ -8,2 +8,3 @@ 'use strict';

var jobLog = require('./job-log');
var dbResult = require('./db-result');

@@ -14,3 +15,3 @@ module.exports = function jobProgress(job, percent) {

logger('Error: progress called on non-active job', job);
return Promise.resolve(false);
return Promise.reject(new Error(enums.message.jobNotActive));
}

@@ -34,8 +35,10 @@ if (!percent || !is.number(percent) || percent < 0) {

log: job.q.r.row('log').append(newLog)
}).run();
}, { returnChanges: true }).run();
}).then(function (updateResult) {
logger('Event: progress [' + job.id + '] [' + percent + ']');
job.q.emit(enums.status.progress, job.id, percent);
return true;
return dbResult.toJob(job.q, updateResult);
}).then(function (updateResult) {
logger('Event: progress', job.q.id, job.id, percent);
job.q.emit(enums.status.progress, job.q.id, job.id, percent);
return updateResult[0];
});
};

@@ -24,6 +24,6 @@ 'use strict';

logger('updateResult', updateResult);
logger('Event: updated [' + job.id + ']');
job.q.emit(enums.status.updated, job.id);
return true;
logger('Event: updated', job.q.id, job.id);
job.q.emit(enums.status.updated, job.q.id, job.id);
return job;
});
};

@@ -40,2 +40,5 @@ 'use strict';

this.retryCount = 0;
this.repeat = options.repeat;
this.repeatDelay = options.repeatDelay;
this.processCount = 0;
this.progress = 0;

@@ -98,2 +101,20 @@ this.status = enums.status.created;

}, {
key: 'setRepeat',
value: function setRepeat(newRepeat) {
if (is.boolean(newRepeat) || is.integer(newRepeat) && newRepeat >= 0) {
this.repeat = newRepeat;
return this;
}
throw new Error(enums.message.repeatInvalid);
}
}, {
key: 'setRepeatDelay',
value: function setRepeatDelay(newRepeatDelay) {
if (is.integer(newRepeatDelay) && newRepeatDelay >= 0) {
this.repeatDelay = newRepeatDelay;
return this;
}
throw new Error(enums.message.repeatDelayInvalid);
}
}, {
key: 'setDateEnable',

@@ -108,12 +129,12 @@ value: function setDateEnable(newDateEnable) {

}, {
key: 'setProgress',
value: function setProgress(percent) {
key: 'updateProgress',
value: function updateProgress(percent) {
var _this = this;
logger('setProgress [' + percent + ']');
logger('updateProgress [' + percent + ']');
return this.q.ready().then(function () {
return jobProgress(_this, percent);
}).catch(function (err) {
logger('setProgress Error:', err);
_this.q.emit(enums.status.error, err);
logger('Event: updateProgress error', err, _this.q.id);
_this.q.emit(enums.status.error, _this.q.id, err);
return Promise.reject(err);

@@ -131,4 +152,4 @@ });

}).catch(function (err) {
logger('update Error:', err);
_this2.q.emit(enums.status.error, err);
logger('Event: update error', err, _this2.q.id);
_this2.q.emit(enums.status.error, _this2.q.id, err);
return Promise.reject(err);

@@ -163,4 +184,4 @@ });

}).catch(function (err) {
logger('addLog Error:', err);
_this3.q.emit(enums.status.error, err);
logger('Event: addLog error', err, _this3.q.id);
_this3.q.emit(enums.status.error, _this3.q.id, err);
return Promise.reject(err);

@@ -167,0 +188,0 @@ });

@@ -11,3 +11,3 @@ 'use strict';

module.exports = function queueAddJob(q, job, skipStatusCheck) {
module.exports = function queueAddJob(q, job) {
logger('addJob', job);

@@ -17,6 +17,3 @@ return Promise.resolve().then(function () {

}).map(function (oneJob) {
if (!skipStatusCheck && oneJob.status !== enums.status.created) {
return Promise.reject(new Error(enums.message.jobAlreadyAdded));
}
if (!skipStatusCheck) {
if (oneJob.status === enums.status.created) {
oneJob.status = enums.status.waiting;

@@ -41,6 +38,6 @@ }

for (var _iterator = savedJobs[Symbol.iterator](), _step; !(_iteratorNormalCompletion = (_step = _iterator.next()).done); _iteratorNormalCompletion = true) {
var _job = _step.value;
var savedjob = _step.value;
logger('Event: added [' + _job.id + ']');
q.emit(enums.status.added, _job.id);
logger('Event: added [' + savedjob.id + ']');
q.emit(enums.status.added, q.id, savedjob.id);
}

@@ -47,0 +44,0 @@ } catch (err) {

@@ -29,2 +29,3 @@ 'use strict';

retryCount: q.r.row('retryCount'),
processCount: q.r.row('processCount'),
message: reason

@@ -39,4 +40,4 @@ }),

jobIds.forEach(function (jobId) {
logger('Event: cancelled [' + jobId + ']');
q.emit(enums.status.cancelled, jobId);
logger('Event: cancelled', q.id, jobId);
q.emit(enums.status.cancelled, q.id, jobId);
});

@@ -43,0 +44,0 @@ if (is.true(q.removeFinishedJobs)) {

@@ -85,3 +85,3 @@ 'use strict';

logger('Event: reviewed [local: false]');
q.emit(enums.status.reviewed, {
q.emit(enums.status.reviewed, newVal.queueId, {
local: false,

@@ -97,3 +97,5 @@ reviewed: null,

}
q.emit(enums.status.error, new Error(enums.message.globalStateError));
var _err = new Error(enums.message.globalStateError);
logger('Event: State document change error', _err, q.id);
q.emit(enums.status.error, q.id, _err);
return enums.status.error;

@@ -105,3 +107,3 @@ }

logger('Event: added [' + newVal.id + ']');
q.emit(enums.status.added, newVal.id);
q.emit(enums.status.added, newVal.queueId, newVal.id);
restartProcessing(q);

@@ -114,3 +116,3 @@ return enums.status.added;

logger('Event: active [' + newVal.id + ']');
q.emit(enums.status.active, newVal.id);
q.emit(enums.status.active, newVal.queueId, newVal.id);
return enums.status.active;

@@ -122,3 +124,3 @@ }

logger('Event: progress [' + newVal.progress + ']');
q.emit(enums.status.progress, newVal.id, newVal.progress);
q.emit(enums.status.progress, newVal.queueId, newVal.id, newVal.progress);
return enums.status.progress;

@@ -129,4 +131,5 @@ }

if (is.completed(newVal) && !is.completed(oldVal)) {
logger('Event: completed [' + newVal.id + ']');
q.emit(enums.status.completed, newVal.id);
var isRepeating = is.repeating(newVal);
logger('Event: completed', newVal.queueId, newVal.id, isRepeating);
q.emit(enums.status.completed, newVal.queueId, newVal.id, isRepeating);
return enums.status.completed;

@@ -137,4 +140,4 @@ }

if (is.cancelled(newVal) && !is.cancelled(oldVal)) {
logger('Event: cancelled [' + newVal.id + ']');
q.emit(enums.status.cancelled, newVal.id);
logger('Event: cancelled', newVal.queueId, newVal.id);
q.emit(enums.status.cancelled, newVal.queueId, newVal.id);
return enums.status.cancelled;

@@ -145,4 +148,4 @@ }

if (is.failed(newVal) && !is.failed(oldVal)) {
logger('Event: failed [' + newVal.id + ']');
q.emit(enums.status.failed, newVal.id);
logger('Event: failed', newVal.queueId, newVal.id);
q.emit(enums.status.failed, newVal.queueId, newVal.id);
return enums.status.failed;

@@ -153,4 +156,4 @@ }

if (is.terminated(newVal) && !is.terminated(oldVal)) {
logger('Event: terminated [' + newVal.id + ']');
q.emit(enums.status.terminated, newVal.id);
logger('Event: terminated', newVal.queueId, newVal.id);
q.emit(enums.status.terminated, newVal.queueId, newVal.id);
return enums.status.terminated;

@@ -161,4 +164,4 @@ }

if (!is.job(newVal) && is.job(oldVal)) {
logger('Event: removed [' + oldVal.id + ']');
q.emit(enums.status.removed, oldVal.id);
logger('Event: removed', oldVal.id);
q.emit(enums.status.removed, null, oldVal.id);
return enums.status.removed;

@@ -169,4 +172,4 @@ }

if (is.job(newVal) && is.job(oldVal) && is.array(newVal.log) && is.array(oldVal.log) && newVal.log.length > oldVal.log.length) {
logger('Event: log', newVal.log);
q.emit(enums.status.log, newVal.id);
logger('Event: log', newVal.queueId, newVal.log);
q.emit(enums.status.log, newVal.queueId, newVal.id);
return enums.status.log;

@@ -173,0 +176,0 @@ }

@@ -24,3 +24,3 @@ 'use strict';

q._changeFeedCursor = changeFeed;
q._changeFeedCursor.each(function (err, change) {
return q._changeFeedCursor.each(function (err, change) {
return queueChange(q, err, change);

@@ -46,3 +46,3 @@ });

module.exports.detach = function dbDetach(q, drainPool) {
module.exports.detach = function dbDetach(q) {
logger('detach');

@@ -56,3 +56,3 @@ return Promise.resolve().then(function () {

}
return null;
return true;
}).then(function () {

@@ -63,17 +63,20 @@ if (q.master) {

}
return null;
}).then(function () {
if (drainPool) {
q._ready = Promise.resolve(false);
logger('draining connection pool');
return q.r.getPoolMaster().drain();
}
return null;
}).then(function () {
if (drainPool) {
logger('Event: detached [' + q.id + ']');
q.emit(enums.status.detached, q.id);
}
return null;
return true;
});
};
module.exports.drain = function drain(q) {
return Promise.resolve().then(function () {
q._ready = Promise.resolve(false);
logger('draining connection pool');
return q.r.getPoolMaster().drain();
}).delay(1000).then(function () {
logger('Event: detached [' + q.id + ']');
q.emit(enums.status.detached, q.id);
}).delay(1000).then(function () {
q.eventNames().forEach(function (key) {
q.removeAllListeners(key);
});
return true;
});
};

@@ -11,14 +11,12 @@ 'use strict';

logger('queueDrop');
return queueStop(q, false).then(function () {
return queueStop(q).then(function () {
q._ready = Promise.resolve(false);
return queueDb.detach(q, false);
return queueDb.detach(q);
}).then(function () {
return q.r.db(q.db).tableDrop(q.name).run();
}).then(function () {
return queueDb.detach(q, true);
}).then(function () {
logger('Event: dropped [' + q.id + ']');
q.emit(enums.status.dropped, q.id);
return true;
return queueDb.drain(q);
});
};

@@ -22,2 +22,3 @@ 'use strict';

queueId: q.id,
processCount: q.r.row('processCount').add(1),
log: q.r.row('log').append({

@@ -29,2 +30,3 @@ date: q.r.now(),

retryCount: q.r.row('retryCount'),
processCount: q.r.row('processCount'),
message: enums.message.active

@@ -46,3 +48,3 @@ })

logger('Event: active [' + job.id + ']');
q.emit(enums.status.active, job.id);
q.emit(enums.status.active, q.id, job.id);
}

@@ -49,0 +51,0 @@ } catch (err) {

@@ -22,4 +22,4 @@ 'use strict';

return new Promise(function (resolve, reject) {
logger('Event: pausing [' + q.id + ']');
q.emit(enums.status.pausing, eventGlobal, q.id);
logger('Event: pausing', q.id, eventGlobal);
q.emit(enums.status.pausing, q.id, eventGlobal);
if (q.running < 1) {

@@ -37,4 +37,4 @@ return resolve();

}).then(function () {
logger('Event: paused [global:' + eventGlobal + '] [' + q.id + ']');
q.emit(enums.status.paused, eventGlobal, q.id);
logger('Event: paused', q.id, eventGlobal);
q.emit(enums.status.paused, q.id, eventGlobal);
return true;

@@ -56,6 +56,6 @@ });

queueProcess.restart(q);
logger('Event: resumed [global:' + eventGlobal + '] [' + q.id + ']');
q.emit(enums.status.resumed, eventGlobal, q.id);
logger('Event: resumed', q.id, eventGlobal);
q.emit(enums.status.resumed, q.id, eventGlobal);
return true;
});
};

@@ -32,4 +32,4 @@ 'use strict';

var err = new Error(enums.message.cancelCallbackInvalid);
logger('Event: error [' + err + ']');
job.q.emit(enums.status.error, err);
logger('Event: addOnCancelHandler error', err, job.q.id);
job.q.emit(enums.status.error, job.q.id, err);
throw err;

@@ -61,4 +61,4 @@ }

function restartJobTimeout(jobId) {
logger('resetJobTimeout', jobId);
function restartJobTimeout(queueId, jobId) {
logger('resetJobTimeout', queueId, jobId);
var jobTimeout = void 0;

@@ -76,4 +76,4 @@ if (jobTimeouts.has(jobId)) {

function nextHandler(jobResult) {
logger('nextHandler', 'Running: [' + job.q.running + ']', jobResult);
function nextHandler(err, jobResult) {
logger('nextHandler', 'Running: [' + job.q.running + ']', err, jobResult);
logger('handled', handled);

@@ -88,19 +88,16 @@ // Ignore mulpiple calls to next()

return new Promise(function (resolve, reject) {
logger('Promise resolving or rejecting jobResult');
if (jobResult instanceof Error) {
reject(jobResult);
}
return resolve(jobResult);
}).then(function (successResult) {
logger('jobResult resolved successfully');
return jobCompleted(job, successResult);
}).catch(function (err) {
var returnPromise = void 0;
if (err) {
logger('jobResult is an error');
if (err && err.cancelJob) {
return queueCancelJob(job.q, job, err.cancelJob);
} else if (err) {
return jobFailed(job, err);
if (err.cancelJob) {
returnPromise = queueCancelJob(job.q, job, err.cancelJob);
} else {
returnPromise = jobFailed(job, err);
}
}).then(function (finalResult) {
} else {
logger('jobResult processed successfully');
returnPromise = jobCompleted(job, jobResult);
}
return returnPromise.then(function (finalResult) {
job.q._running--;

@@ -110,4 +107,4 @@ setImmediate(jobTick, job.q);

}).catch(function (err) {
logger('next() Promise Error:', err);
job.q.emit(enums.status.error, err);
logger('Event: next() Promise error', err, job.q.id);
job.q.emit(enums.status.error, job.q.id, err);
return Promise.reject(err);

@@ -125,3 +122,3 @@ });

logger('Event: processing [' + job.id + ']');
job.q.emit(enums.status.processing, job.id);
job.q.emit(enums.status.processing, job.q.id, job.id);
logger('calling handler function');

@@ -180,5 +177,5 @@ job.q._handler(job, nextHandler, addOnCancelHandler);

}).catch(function (err) {
logger('queueGetNextJob Error:', err);
getNextJobCleanup(q._getNextJobCalled);
q.emit(enums.status.error, err);
logger('Event: queueGetNextJob error:', err, q.id);
q.emit(enums.status.error, q.id, err);
return Promise.reject(err);

@@ -198,3 +195,3 @@ });

q.on(enums.status.progress, restartJobTimeout);
q.on(enums.status.cancelled, function (jobId) {
q.on(enums.status.cancelled, function (queueId, jobId) {
return onCancelJob(jobId, q);

@@ -201,0 +198,0 @@ });

@@ -31,4 +31,4 @@ 'use strict';

logger('Event: removed [' + id + ']');
q.emit(enums.status.removed, id);
logger('Event: removed', q.id, id);
q.emit(enums.status.removed, q.id, id);
}

@@ -35,0 +35,0 @@ } catch (err) {

@@ -17,5 +17,5 @@ 'use strict';

logger('Event: reset [' + totalRemoved + ']');
q.emit(enums.status.reset, totalRemoved);
q.emit(enums.status.reset, q.id, totalRemoved);
return totalRemoved;
});
};

@@ -8,9 +8,7 @@ 'use strict';

module.exports = function queueStop(q) {
var drainPool = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : true;
logger('queueStop with drain:', drainPool);
logger('queueStop');
logger('Event: stopping [' + q.id + ']');
q.emit(enums.status.stopping, q.id);
return q.pause().then(function () {
return queueDb.detach(q, drainPool);
return queueDb.detach(q);
}).then(function () {

@@ -17,0 +15,0 @@ logger('Event: stopped [' + q.id + ']');

@@ -9,5 +9,3 @@ 'use strict';

return Promise.resolve().then(function () {
return q.r.db(q.db).table(q.name).hasFields('status').group(function (job) {
return job.pluck('status');
}).count();
return q.r.db(q.db).table(q.name).group({ index: 'status' }).count();
}).then(function (reduction) {

@@ -30,3 +28,5 @@ var summary = {

summary[stat.group.status] = stat.reduction;
if (stat.group) {
summary[stat.group] = stat.reduction;
}
}

@@ -33,0 +33,0 @@ } catch (err) {

@@ -55,2 +55,3 @@ 'use strict';

_this._masterInterval = options.masterInterval == null ? enums.options.masterInterval : options.masterInterval;
_this._databaseInitDelay = options.databaseInitDelay == null ? enums.options.databaseInitDelay : options.databaseInitDelay;
_this._jobOptions = jobOptions();

@@ -84,4 +85,4 @@ _this._changeFeedCursor = false;

}).catch(function (err) {
logger('addJob Error:', err);
_this2.emit(enums.status.error, err);
logger('Event: addJob error', _this2.id, err);
_this2.emit(enums.status.error, _this2.id, err);
return Promise.reject(err);

@@ -99,4 +100,4 @@ });

}).catch(function (err) {
logger('getJob Error:', err);
_this3.emit(enums.status.error, err);
logger('Event: getJob error', _this3.id, err);
_this3.emit(enums.status.error, _this3.id, err);
return Promise.reject(err);

@@ -114,4 +115,4 @@ });

}).catch(function (err) {
logger('findJob Error:', err);
_this4.emit(enums.status.error, err);
logger('Event: findJob error', _this4.id, err);
_this4.emit(enums.status.error, _this4.id, err);
return Promise.reject(err);

@@ -129,4 +130,4 @@ });

}).catch(function (err) {
logger('cancelJob Error:', err);
_this5.emit(enums.status.error, err);
logger('Event: cancelJob error', _this5.id, err);
_this5.emit(enums.status.error, _this5.id, err);
return Promise.reject(err);

@@ -144,4 +145,4 @@ });

}).catch(function (err) {
logger('removeJob Error:', err);
_this6.emit(enums.status.error, err);
logger('Event: removeJob error', _this6.id, err);
_this6.emit(enums.status.error, _this6.id, err);
return Promise.reject(err);

@@ -159,4 +160,4 @@ });

}).catch(function (err) {
logger('process Error:', err);
_this7.emit(enums.status.error, err);
logger('Event: process error', _this7.id, err);
_this7.emit(enums.status.error, _this7.id, err);
return Promise.reject(err);

@@ -174,4 +175,4 @@ });

}).catch(function (err) {
logger('review Error:', err);
_this8.emit(enums.status.error, err);
logger('Event: review error', _this8.id, err);
_this8.emit(enums.status.error, _this8.id, err);
return Promise.reject(err);

@@ -189,4 +190,4 @@ });

}).catch(function (err) {
logger('summary Error:', err);
_this9.emit(enums.status.error, err);
logger('Event: summary error', _this9.id, err);
_this9.emit(enums.status.error, _this9.id, err);
return Promise.reject(err);

@@ -210,4 +211,4 @@ });

}).catch(function (err) {
logger('pause Error:', err);
_this10.emit(enums.status.error, err);
logger('Event: pause error', _this10.id, err);
_this10.emit(enums.status.error, _this10.id, err);
return Promise.reject(err);

@@ -225,4 +226,4 @@ });

}).catch(function (err) {
logger('resume Error:', err);
_this11.emit(enums.status.error, err);
logger('Event: resume error', _this11.id, err);
_this11.emit(enums.status.error, _this11.id, err);
return Promise.reject(err);

@@ -240,4 +241,4 @@ });

}).catch(function (err) {
logger('reset Error:', err);
_this12.emit(enums.status.error, err);
logger('Event: reset error', _this12.id, err);
_this12.emit(enums.status.error, _this12.id, err);
return Promise.reject(err);

@@ -252,5 +253,7 @@ });

logger('stop');
return queueStop(this).catch(function (err) {
logger('stop Error:', err);
_this13.emit(enums.status.error, err);
return queueStop(this).then(function () {
return queueDb.drain(_this13);
}).catch(function (err) {
logger('Event: stop error', _this13.id, err);
_this13.emit(enums.status.error, _this13.id, err);
return Promise.reject(err);

@@ -266,4 +269,4 @@ });

return queueDrop(this).catch(function (err) {
logger('drop Error:', err);
_this14.emit(enums.status.error, err);
logger('Event: drop error', _this14.id, err);
_this14.emit(enums.status.error, _this14.id, err);
return Promise.reject(err);

@@ -344,3 +347,5 @@ });

if (!is.integer(value) || value < 1) {
this.emit(enums.status.error, new Error(enums.message.concurrencyInvalid), value);
var err = new Error(enums.message.concurrencyInvalid);
logger('Event: concurrency error', this.id, err);
this.emit(enums.status.error, this.id, err);
return;

@@ -347,0 +352,0 @@ }

{
"name": "rethinkdb-job-queue",
"version": "1.1.2",
"version": "2.0.0",
"description": "A persistent job or task queue backed by RethinkDB.",

@@ -36,6 +36,13 @@ "main": "index.js",

"clean": "rm -Rf dist",
"test": "tape ./tests/test-runner.js | tap-spec",
"tc": "tape ./tests/test-current.js | tap-spec",
"build": "npm run clean && babel src --presets babel-preset-latest --out-dir dist",
"test": "tap --timeout 1000 ./tests/test-runner.js",
"tv": "tap --timeout 1000 --reporter tap ./tests/test-runner.js",
"tc": "node ./tests/test-current.js",
"tcd": "node --inspect --debug-brk ./tests/test-current.js",
"lint": "standard",
"coverage": "npm run coverage:rm && npm run coverage:cover && npm run coverage:report && npm run coverage:check",
"coverage:cover": "istanbul cover ./tests/test-runner.js",
"coverage:rm": "rm -Rf coverage",
"coverage:report": "istanbul report",
"coverage:check": "istanbul check-coverage ./coverage/coverage.json",
"upgrade": "npm run upgrade:rm && npm run upgrade:ncu && npm run upgrade:install && npm run upgrade:finish",

@@ -52,6 +59,6 @@ "upgrade:rm": "rm -Rf node_modules",

"bluebird": "^3.4.6",
"debug": "^2.2.0",
"rethinkdbdash": "^2.3.23",
"debug": "^2.3.2",
"rethinkdbdash": "^2.3.26",
"serialize-error": "^2.0.0",
"uuid": "^2.0.3"
"uuid": "^3.0.0"
},

@@ -61,2 +68,3 @@ "devDependencies": {

"babel-preset-latest": "^6.16.0",
"istanbul": "0.4.5",
"npm-check-updates": "^2.8.6",

@@ -66,4 +74,4 @@ "proxyquire": "^1.7.10",

"tap-spec": "^4.1.1",
"tape": "^4.6.2"
"tap": "^8.0.1"
}
}

@@ -36,6 +36,7 @@ # Introduction

* [Retrying failed jobs][job-retry-url]
* [Repeatable jobs][job-repeat-url]
* [Job reanimation][job-reanimation-url]
* [Job Editing][job-editing-url]
* Rich job [history log][job-log-url]
* Over 1400 [integration tests][testing-url]
* Over 2000 [integration tests][testing-url]

@@ -47,3 +48,3 @@ [queue-connection-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Queue-Connection

[job-priority-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Job-Options#job-priority-option
[job-progress-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Job.setProgress
[job-progress-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Job.updateProgress
[job-delayed-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Delayed-Job

@@ -55,3 +56,4 @@ [find-job-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Queue.findJob

[job-retry-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Job-Retry
[job-reanimation-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Job-Editing#job-reanimation
[job-repeat-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Job.repeat
[job-reanimation-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Queue.reanimateJob
[job-editing-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Job-Editing

@@ -63,12 +65,6 @@ [job-log-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Job.log

For full documentation [please see the wiki][rjq-wiki-url]
* [Full Documentation][rjq-wiki-url]
* [Change Log][rjq-changelog-url]
* [Code Coverage Report][rjq-coverage-url] from more than 2000 Integration Tests
See the [Change Log][rjq-changelog-url] for recent changes.
## Project Status
* This `rethinkdb-job-queue` module is fully functional.
* There are over 1400 integration tests.
* Updated to v1.0.0 to support [SemVer](http://semver.org/).
## Quick Start

@@ -109,3 +105,3 @@

// Do something with your result
return next(result)
return next(null, result)
} catch (err) {

@@ -188,3 +184,3 @@ console.error(err)

console.dir(info)
return next(info)
return next(null, info)
}).catch((err) => {

@@ -244,2 +240,3 @@ // This catch is for nodemailer sendMail errors.

[rjq-changelog-url]: https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki/Change-Log
[rjq-coverage-url]: https://rawgit.com/grantcarthew/node-rethinkdb-job-queue/master/coverage/lcov-report/index.html
[thinker-image]: https://cdn.rawgit.com/grantcarthew/node-rethinkdb-job-queue/master/thinkerjoblist.png

@@ -246,0 +243,0 @@ [nodemailer-url]: https://www.npmjs.com/package/nodemailer

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const is = require('../src/is')

@@ -7,4 +7,4 @@ const tError = require('./test-error')

module.exports = function () {
test('chrono', (t) => {
t.plan(13)
test('datetime', (t) => {
t.plan(23)

@@ -21,10 +21,20 @@ try {

t.equal(datetime.add.ms(tDate, tValue) - tDate, tValue, 'Add ms is valid')
t.ok(is.date(datetime.add.ms(tValue)), 'Add ms only is a date object')
t.ok(is.dateAfter(datetime.add.ms(tValue)), 'Add ms only is valid')
t.ok(is.date(datetime.add.sec(tDate, tValue)), 'Add sec is a date object')
t.equal(datetime.add.sec(tDate, tValue) - tDate, tValue * 1000, 'Add sec is valid')
t.ok(is.date(datetime.add.sec(tValue)), 'Add sec only is a date object')
t.ok(is.dateAfter(datetime.add.sec(tValue)), 'Add sec only is valid')
t.ok(is.date(datetime.add.min(tDate, tValue)), 'Add min is a date object')
t.equal(datetime.add.min(tDate, tValue) - tDate, tValue * 60000, 'Add min is valid')
t.ok(is.date(datetime.add.min(tValue)), 'Add min only is a date object')
t.ok(is.dateAfter(datetime.add.min(tValue)), 'Add min only is valid')
t.ok(is.date(datetime.add.hours(tDate, tValue)), 'Add hours is a date object')
t.equal(datetime.add.hours(tDate, tValue) - tDate, tValue * 3600000, 'Add hours is valid')
t.ok(is.date(datetime.add.hours(tValue)), 'Add hours only is a date object')
t.ok(is.dateAfter(datetime.add.hours(tValue)), 'Add hours only is valid')
t.ok(is.date(datetime.add.days(tDate, tValue)), 'Add days is a date object')
t.equal(datetime.add.days(tDate, tValue) - tDate, tValue * 86400000, 'Add days is valid')
t.ok(is.date(datetime.add.days(tValue)), 'Add days only is a date object')
t.ok(is.dateAfter(datetime.add.days(tValue)), 'Add days only is valid')
t.equal(datetime.formatDate(tDate), dateStings.date, 'formatDate is valid')

@@ -31,0 +41,0 @@ t.equal(datetime.formatTime(tDate), dateStings.time, 'formatTime is valid')

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -3,0 +3,0 @@ const tError = require('./test-error')

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -3,0 +3,0 @@ const tError = require('./test-error')

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -3,0 +3,0 @@ const tError = require('./test-error')

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -3,0 +3,0 @@ const tError = require('./test-error')

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const tError = require('./test-error')

@@ -3,0 +3,0 @@ const tOpts = require('./test-options')

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -3,0 +3,0 @@ const tError = require('./test-error')

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -16,7 +16,8 @@ const is = require('../src/is')

const eventHandlers = require('./test-event-handlers')
const testName = 'db-review'
module.exports = function () {
return new Promise((resolve, reject) => {
test('db-review', (t) => {
t.plan(85)
test(testName, (t) => {
t.plan(86)

@@ -33,2 +34,3 @@ let processRestart = 0

let state = {
testName,
enabled: false,

@@ -57,2 +59,3 @@ ready: 0,

terminated: 0,
reanimated: 0,
log: 0,

@@ -246,3 +249,2 @@ updated: 0

t.comment('db-review: event summary')
eventHandlers.remove(t, q, state)

@@ -249,0 +251,0 @@ return q.reset()

@@ -1,4 +0,4 @@

const test = require('tape')
const test = require('tap').test
const tError = require('./test-error')
const enums = require('../dist/enums')
const enums = require('../src/enums')

@@ -18,7 +18,7 @@ module.exports = function () {

t.equal(Object.keys(enums.priority).length, 6, 'Enums priority has correct number of keys')
t.equal(Object.keys(enums.status).length, 25, 'Enums status has correct number of keys')
t.equal(Object.keys(enums.options).length, 11, 'Enums options has correct number of keys')
t.equal(Object.keys(enums.index).length, 3, 'Enums index has correct number of keys')
t.equal(Object.keys(enums.status).length, 26, 'Enums status has correct number of keys')
t.equal(Object.keys(enums.options).length, 14, 'Enums options has correct number of keys')
t.equal(Object.keys(enums.index).length, 4, 'Enums index has correct number of keys')
t.equal(Object.keys(enums.log).length, 3, 'Enums log has correct number of keys')
t.equal(Object.keys(enums.message).length, 25, 'Enums message has correct number of keys')
t.equal(Object.keys(enums.message).length, 29, 'Enums message has correct number of keys')
} catch (err) {

@@ -25,0 +25,0 @@ tError(err, module, t)

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const uuid = require('uuid')

@@ -8,3 +8,3 @@ const is = require('../src/is')

test('is', (t) => {
t.plan(62)
t.plan(71)

@@ -20,3 +20,5 @@ const ms = 5000

priority: enums.priority.normal,
status: enums.status.created
status: enums.status.created,
repeat: false,
processCount: 0
}

@@ -62,2 +64,5 @@ const log = {

t.notOk(is.array({}), 'Is array false with object')
t.ok(is.error(new Error()), 'Is error true with new Error')
t.ok(is.error(Error()), 'Is error true with Error')
t.notOk(is.error('not an error'), 'Is error false with string')
t.ok(is.job(job), 'Is job true with mock job')

@@ -81,2 +86,14 @@ t.notOk(is.job(), 'Is job false with null')

t.notOk(is.job(job), 'Is job false with invalid status')
job.repeat = true
t.ok(is.repeating(job), 'Is job repeating true when repeat is true')
job.repeat = 5
t.ok(is.repeating(job), 'Is job repeating true when repeat is integer')
job.processCount = 5
t.ok(is.repeating(job), 'Is job repeating true when processCount equals repeat')
job.processCount = 6
t.notOk(is.repeating(job), 'Is job repeating false when processCount > repeat')
job.repeat = 0
t.notOk(is.repeating(job), 'Is job repeating false when repeat is 0')
job.repeat = false
t.notOk(is.repeating(job), 'Is job repeating false when repeat is false')
job.status = enums.status.created

@@ -83,0 +100,0 @@ t.notOk(is.active(job), 'Is active false with invalid status')

@@ -1,4 +0,5 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')
const is = require('../src/is')
const datetime = require('../src/datetime')
const tError = require('./test-error')

@@ -10,7 +11,9 @@ const enums = require('../src/enums')

const tOpts = require('./test-options')
const eventHandlers = require('./test-event-handlers')
const testName = 'job-completed'
module.exports = function () {
return new Promise((resolve, reject) => {
test('job-completed', (t) => {
t.plan(23)
test(testName, (t) => {
t.plan(137)

@@ -20,25 +23,34 @@ const q = new Queue(tOpts.cxn(), tOpts.default())

job.data = tData
let beforeDate
let afterDate
// ---------- Event Handler Setup ----------
let testEvents = false
function completedEventHandler (jobId) {
if (testEvents) {
t.equal(jobId, job.id, `Event: Job completed [${jobId}]`)
}
let state = {
testName,
enabled: false,
ready: 0,
processing: 0,
progress: 0,
pausing: 0,
paused: 0,
resumed: 0,
removed: 1,
reset: 1,
error: 0,
reviewed: 0,
detached: 0,
stopping: 0,
stopped: 0,
dropped: 0,
added: 3,
waiting: 0,
active: 0,
completed: 7,
cancelled: 0,
failed: 0,
terminated: 0,
reanimated: 0,
log: 0,
updated: 5
}
function removedEventHandler (jobId) {
if (testEvents) {
t.equal(jobId, job.id, `Event: Job removed [${jobId}]`)
}
}
function addEventHandlers () {
testEvents = true
q.on(enums.status.completed, completedEventHandler)
q.on(enums.status.removed, removedEventHandler)
}
function removeEventHandlers () {
testEvents = false
q.removeListener(enums.status.completed, completedEventHandler)
q.removeListener(enums.status.removed, removedEventHandler)
}

@@ -52,3 +64,3 @@ return q.reset().then((resetResult) => {

// ---------- Job Completed Test ----------
addEventHandlers()
eventHandlers.add(t, q, state)
t.comment('job-completed: Job Completed')

@@ -74,2 +86,143 @@ return jobCompleted(savedJob[0], tData)

// ---------- Job Repeat True Test ----------
t.comment('job-completed: Job Repeat True')
job = q.createJob().setRepeat(true)
job.data = tData
return q.addJob(job)
}).then((savedJob) => {
t.equal(savedJob[0].id, job.id, 'Job saved successfully')
savedJob[0].processCount = 1
return savedJob[0].update()
}).then((updatedJob) => {
t.ok(is.job(updatedJob), 'Job updated successfully')
return jobCompleted(updatedJob, tData)
}).then((completedIds) => {
t.ok(is.uuid(completedIds[0]), 'Job completed returned job ids')
return q.getJob(completedIds[0])
}).then((repeatedJobs) => {
beforeDate = datetime.add.min(-1)
afterDate = datetime.add.min(6)
t.equal(repeatedJobs[0].status, enums.status.waiting, 'Repeat job completed status is waiting')
t.ok(is.dateBetween(repeatedJobs[0].dateEnable, beforeDate, afterDate), 'Repeat job completed dateEnable is set for five minutes')
t.ok(is.dateBefore(repeatedJobs[0].dateFinished), 'Repeat job completed dateFinished is before now')
t.ok(repeatedJobs[0].progress === 0, 'Repeat job completed progress is 0 ')
t.equal(repeatedJobs[0].processCount, 1, 'Repeat job completed processCount is 1 ')
t.equal(repeatedJobs[0].log.length, 3, 'Repeat job completed log count is valid')
let log = repeatedJobs[0].getLastLog()
t.ok(is.date(log.date), 'Repeat log date is a date')
t.equal(log.data, tData, 'Repeat log data is valid')
t.equal(log.message, enums.status.completed, 'Repeat log message is valid')
t.equal(log.queueId, q.id, 'Repeat log queueId is valid')
t.equal(log.processCount, 1, 'Repeat log processCount is 1')
t.equal(log.status, enums.status.waiting, 'Repeat log status is valid')
t.equal(log.type, enums.log.information, 'Repeat log type is valid')
repeatedJobs[0].processCount++
return repeatedJobs[0].update()
}).then((updatedJob) => {
t.ok(is.job(updatedJob), 'Job updated successfully')
return jobCompleted(updatedJob, tData)
}).then((completedIds) => {
t.ok(is.uuid(completedIds[0]), 'Job completed returned job ids')
return q.getJob(completedIds[0])
}).then((repeatedJobs) => {
beforeDate = datetime.add.min(-1)
afterDate = datetime.add.min(6)
t.equal(repeatedJobs[0].status, enums.status.waiting, 'Repeat job completed status is waiting')
t.ok(is.dateBetween(repeatedJobs[0].dateEnable, beforeDate, afterDate), 'Repeat job completed dateEnable is set for five minutes')
t.ok(is.dateBefore(repeatedJobs[0].dateFinished), 'Repeat job completed dateFinished is before now')
t.ok(repeatedJobs[0].progress === 0, 'Repeat job completed progress is 0 ')
t.equal(repeatedJobs[0].processCount, 2, 'Repeat job completed processCount is 2 ')
t.equal(repeatedJobs[0].log.length, 5, 'Repeat job completed log count is valid')
let log = repeatedJobs[0].getLastLog()
t.ok(is.date(log.date), 'Repeat log date is a date')
t.equal(log.data, tData, 'Repeat log data is valid')
t.equal(log.message, enums.status.completed, 'Repeat log message is valid')
t.equal(log.queueId, q.id, 'Repeat log queueId is valid')
t.equal(log.processCount, 2, 'Repeat log processCount is 2')
t.equal(log.status, enums.status.waiting, 'Repeat log status is valid')
t.equal(log.type, enums.log.information, 'Repeat log type is valid')
// ---------- Job Repeat Number Test ----------
t.comment('job-completed: Job Repeat Number')
job = q.createJob().setRepeat(2)
job.data = tData
return q.addJob(job)
}).then((savedJob) => {
t.equal(savedJob[0].id, job.id, 'Job saved successfully')
savedJob[0].processCount++
return savedJob[0].update()
}).then((updatedJob) => {
t.ok(is.job(updatedJob), 'Job updated successfully')
return jobCompleted(updatedJob, tData)
}).then((completedIds) => {
t.ok(is.uuid(completedIds[0]), 'Job completed returned job ids')
return q.getJob(completedIds[0])
}).then((repeatedJobs) => {
beforeDate = datetime.add.min(-1)
afterDate = datetime.add.min(6)
t.equal(repeatedJobs[0].status, enums.status.waiting, 'Repeat job completed status is waiting')
t.ok(is.dateBetween(repeatedJobs[0].dateEnable, beforeDate, afterDate), 'Repeat job completed dateEnable is set for five minutes')
t.ok(is.dateBefore(repeatedJobs[0].dateFinished), 'Repeat job completed dateFinished is before now')
t.ok(repeatedJobs[0].progress === 0, 'Repeat job completed progress is 0 ')
t.equal(repeatedJobs[0].processCount, 1, 'Repeat job completed processCount is 1 ')
t.equal(repeatedJobs[0].log.length, 3, 'Repeat job completed log count is valid')
let log = repeatedJobs[0].getLastLog()
t.ok(is.date(log.date), 'Repeat log date is a date')
t.equal(log.data, tData, 'Repeat log data is valid')
t.equal(log.message, enums.status.completed, 'Repeat log message is valid')
t.equal(log.queueId, q.id, 'Repeat log queueId is valid')
t.equal(log.processCount, 1, 'Repeat log processCount is 1')
t.equal(log.status, enums.status.waiting, 'Repeat log status is valid')
t.equal(log.type, enums.log.information, 'Repeat log type is valid')
repeatedJobs[0].processCount++
return repeatedJobs[0].update()
}).then((updatedJob) => {
t.ok(is.job(updatedJob), 'Job updated successfully')
return jobCompleted(updatedJob, tData)
}).then((completedIds) => {
t.ok(is.uuid(completedIds[0]), 'Job completed returned job ids')
return q.getJob(completedIds[0])
}).then((repeatedJobs) => {
beforeDate = datetime.add.min(-1)
afterDate = datetime.add.min(6)
t.equal(repeatedJobs[0].status, enums.status.waiting, 'Repeat job completed status is waiting')
t.ok(is.dateBetween(repeatedJobs[0].dateEnable, beforeDate, afterDate), 'Repeat job completed dateEnable is set for five minutes')
t.ok(is.dateBefore(repeatedJobs[0].dateFinished), 'Repeat job completed dateFinished is before now')
t.ok(repeatedJobs[0].progress === 0, 'Repeat job completed progress is 0 ')
t.equal(repeatedJobs[0].processCount, 2, 'Repeat job completed processCount is 2 ')
t.equal(repeatedJobs[0].log.length, 5, 'Repeat job completed log count is valid')
let log = repeatedJobs[0].getLastLog()
t.ok(is.date(log.date), 'Repeat log date is a date')
t.equal(log.data, tData, 'Repeat log data is valid')
t.equal(log.message, enums.status.completed, 'Repeat log message is valid')
t.equal(log.queueId, q.id, 'Repeat log queueId is valid')
t.equal(log.processCount, 2, 'Repeat log processCount is 2')
t.equal(log.status, enums.status.waiting, 'Repeat log status is valid')
t.equal(log.type, enums.log.information, 'Repeat log type is valid')
repeatedJobs[0].processCount++
return repeatedJobs[0].update()
}).then((updatedJob) => {
t.ok(is.job(updatedJob), 'Job updated successfully')
return jobCompleted(updatedJob, tData)
}).then((completedIds) => {
t.ok(is.uuid(completedIds[0]), 'Job completed returned job ids')
return q.getJob(completedIds[0])
}).then((repeatedJobs) => {
beforeDate = datetime.add.min(-1)
afterDate = datetime.add.min(6)
t.equal(repeatedJobs[0].status, enums.status.completed, 'Repeat job completed status is completed')
t.ok(is.dateBetween(repeatedJobs[0].dateEnable, beforeDate, afterDate), 'Repeat job completed dateEnable is set for five minutes')
t.ok(is.dateBefore(repeatedJobs[0].dateFinished), 'Repeat job completed dateFinished is before now')
t.ok(repeatedJobs[0].progress === 100, 'Repeat job completed progress is 100 ')
t.equal(repeatedJobs[0].processCount, 3, 'Repeat job completed processCount is 3 ')
t.equal(repeatedJobs[0].log.length, 7, 'Repeat job completed log count is valid')
let log = repeatedJobs[0].getLastLog()
t.ok(is.date(log.date), 'Repeat log date is a date')
t.equal(log.data, tData, 'Repeat log data is valid')
t.equal(log.message, enums.status.completed, 'Repeat log message is valid')
t.equal(log.queueId, q.id, 'Repeat log queueId is valid')
t.equal(log.processCount, 3, 'Repeat log processCount is 3')
t.equal(log.status, enums.status.completed, 'Repeat log status is valid')
t.equal(log.type, enums.log.information, 'Repeat log type is valid')
// ---------- Job Completed with Remove Test ----------

@@ -92,3 +245,5 @@ t.comment('job-completed: Job Completed with Remove')

t.ok(resetResult >= 0, 'Queue reset')
removeEventHandlers()
// ---------- Event Summary ----------
eventHandlers.remove(t, q, state)
q.stop()

@@ -95,0 +250,0 @@ return resolve(t.end())

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -10,7 +10,9 @@ const is = require('../src/is')

const tOpts = require('./test-options')
const eventHandlers = require('./test-event-handlers')
const testName = 'job-failed'
module.exports = function () {
return new Promise((resolve, reject) => {
test('job-failed', (t) => {
t.plan(88)
test(testName, (t) => {
t.plan(155)

@@ -20,33 +22,30 @@ const q = new Queue(tOpts.cxn(), tOpts.default())

// ---------- Event Handler Setup ----------
let testEvents = false
function failedEventHandler (jobId) {
if (testEvents) {
t.equal(jobId, job.id,
`Event: Job failed [${jobId}]`)
}
let state = {
testName,
enabled: false,
ready: 0,
processing: 0,
progress: 0,
pausing: 0,
paused: 0,
resumed: 0,
removed: 1,
reset: 1,
error: 0,
reviewed: 0,
detached: 0,
stopping: 0,
stopped: 0,
dropped: 0,
added: 2,
waiting: 0,
active: 0,
completed: 0,
cancelled: 0,
failed: 5,
terminated: 2,
reanimated: 0,
log: 0,
updated: 0
}
function terminatedEventHandler (jobId) {
if (testEvents) {
t.equal(jobId, job.id,
`Event: Job terminated [${jobId}]`)
}
}
function removedEventHandler (jobId) {
if (testEvents) {
t.equal(jobId, job.id,
`Event: Job removed [${jobId}]`)
}
}
function addEventHandlers () {
testEvents = true
q.on(enums.status.failed, failedEventHandler)
q.on(enums.status.terminated, terminatedEventHandler)
q.on(enums.status.removed, removedEventHandler)
}
function removeEventHandlers () {
testEvents = false
q.removeListener(enums.status.failed, failedEventHandler)
q.removeListener(enums.status.terminated, terminatedEventHandler)
q.removeListener(enums.status.removed, removedEventHandler)
}

@@ -64,3 +63,3 @@ let job = q.createJob()

// ---------- Job Failed Retry 0 Test ----------
addEventHandlers()
eventHandlers.add(t, q, state)
t.comment('job-failed: Original Job Failure')

@@ -184,6 +183,70 @@ return jobFailed(savedJob[0], err)

t.equal(exist.length, 0, 'Job not in database')
// ---------- Job Failed with Repeat Test ----------
t.comment('job-failed: Job Failed with Repeat')
job = q.createJob()
job.data = tData
job.retryMax = 1
job.retryDelay = 200
job.repeat = 1
job.repeatDelay = 500
q._removeFinishedJobs = false
return q.addJob(job)
}).then((savedJob) => {
t.equal(savedJob[0].id, job.id, 'Job saved successfully')
return jobFailed(savedJob[0], err)
}).then((retry1id) => {
t.equal(retry1id.length, 1, 'Job failed successfully')
t.equal(retry1id[0], job.id, 'Job failed returned job id')
return q.getJob(retry1id[0])
}).then((retry1) => {
t.equal(retry1[0].status, enums.status.failed, 'Job status is failed')
t.equal(retry1[0].retryCount, 1, 'Job retryCount is 1')
t.equal(retry1[0].progress, 0, 'Job progress is 0')
t.equal(retry1[0].queueId, q.id, 'Job queueId is valid')
t.ok(is.date(retry1[0].dateFinished), 'Job dateFinished is a date')
t.ok(is.date(retry1[0].dateEnable), 'Job dateEnable is a date')
t.equal(retry1[0].log.length, 2, 'Job has 1 log entry')
t.ok(is.date(retry1[0].log[1].date), 'Log date is a date')
t.equal(retry1[0].log[1].queueId, q.id, 'Log queueId is valid')
t.equal(retry1[0].log[1].type, enums.log.warning, 'Log type is warning')
t.equal(retry1[0].log[1].status, enums.status.failed, 'Log status is failed')
t.ok(retry1[0].log[1].retryCount = 1, 'Log retryCount is valid')
t.ok(retry1[0].log[1].message, 'Log message exists')
t.equal(retry1[0].log[1].message, enums.message.failed, 'Log message is valid')
t.equal(retry1[0].log[1].errorMessage, err.message, 'Log error message is valid')
t.equal(retry1[0].log[1].errorStack, err.stack, 'Log stack is valid')
t.ok(retry1[0].log[1].duration >= 0, 'Log duration is >= 0')
// ---------- Final Retry Job Repeat Failure Test ----------
t.comment('job-failed: Final Retry Job Repeat Failure')
return jobFailed(retry1[0], err)
}).then((retry2id) => {
t.equal(retry2id.length, 1, 'Job failed successfully')
t.equal(retry2id[0], job.id, 'Job failed returned job id')
return q.getJob(retry2id[0])
}).then((retry2) => {
t.equal(retry2[0].status, enums.status.waiting, 'Job status is waiting')
t.equal(retry2[0].retryCount, 0, 'Job retryCount is 0')
t.equal(retry2[0].progress, 0, 'Job progress is 0')
t.equal(retry2[0].queueId, q.id, 'Job queueId is valid')
t.ok(is.date(retry2[0].dateFinished), 'Job dateFinished is a date')
t.ok(is.date(retry2[0].dateEnable), 'Job dateEnable is a date')
t.equal(retry2[0].log.length, 3, 'Job has 2 log entries')
t.ok(is.date(retry2[0].log[2].date), 'Log date is a date')
t.equal(retry2[0].log[2].queueId, q.id, 'Log queueId is valid')
t.equal(retry2[0].log[2].type, enums.log.error, 'Log type is error')
t.equal(retry2[0].log[2].status, enums.status.waiting, 'Log status is waiting')
t.ok(retry2[0].log[2].retryCount = 2, 'Log retryCount is valid')
t.ok(retry2[0].log[2].message, 'Log message exists')
t.equal(retry2[0].log[2].message, enums.message.failed, 'Log message is valid')
t.equal(retry2[0].log[2].errorMessage, err.message, 'Log error message is valid')
t.equal(retry2[0].log[2].errorStack, err.stack, 'Log stack is valid')
t.ok(retry2[0].log[2].duration >= 0, 'Log duration is >= 0')
return q.reset()
}).then((resetResult) => {
t.ok(resetResult >= 0, 'Queue reset')
removeEventHandlers()
// ---------- Event Summary ----------
eventHandlers.remove(t, q, state)
q.stop()

@@ -190,0 +253,0 @@ return resolve(t.end())

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -10,7 +10,9 @@ const is = require('../src/is')

const tOpts = require('./test-options')
const eventHandlers = require('./test-event-handlers')
const testName = 'job-log'
module.exports = function () {
return new Promise((resolve, reject) => {
test('job-log', (t) => {
t.plan(47)
test(testName, (t) => {
t.plan(76)

@@ -22,20 +24,35 @@ const q = new Queue(tOpts.cxn(), tOpts.default())

let testEvents = false
function logEventHandler (jobId) {
if (testEvents) {
t.equal(jobId, job.id, `Event: log [${jobId}]`)
}
// ---------- Event Handler Setup ----------
let state = {
testName,
enabled: false,
ready: 0,
processing: 0,
progress: 0,
pausing: 0,
paused: 0,
resumed: 0,
removed: 0,
reset: 1,
error: 0,
reviewed: 0,
detached: 0,
stopping: 0,
stopped: 0,
dropped: 0,
added: 1,
waiting: 0,
active: 0,
completed: 0,
cancelled: 0,
failed: 0,
terminated: 0,
reanimated: 0,
log: 4,
updated: 0
}
function addEventHandlers () {
testEvents = true
q.on(enums.status.log, logEventHandler)
}
function removeEventHandlers () {
testEvents = false
q.removeListener(enums.status.log, logEventHandler)
}
return q.reset().then((resetResult) => {
t.ok(is.integer(resetResult), 'Queue reset')
addEventHandlers()
eventHandlers.add(t, q, state)
return q.addJob(job)

@@ -59,2 +76,3 @@ }).then((newJob) => {

t.ok(jobWithLog1[0].log[1].retryCount >= 0, 'Log retryCount is valid')
t.ok(jobWithLog1[0].log[1].processCount >= 0, 'Log processCount is valid')
t.equal(jobWithLog1[0].log[1].message, tData, 'Log 1 message is valid')

@@ -77,2 +95,3 @@ t.equal(jobWithLog1[0].log[1].data, tData, 'Log 1 detail is valid')

t.ok(jobWithLog2[0].log[2].retryCount >= 0, 'Log retryCount is valid')
t.ok(jobWithLog2[0].log[2].processCount >= 0, 'Log processCount is valid')
t.equal(jobWithLog2[0].log[2].message, tData, 'Log 2 message is valid')

@@ -95,2 +114,3 @@ t.equal(jobWithLog2[0].log[2].data, tData, 'Log 2 data is valid')

t.ok(jobWithLog3[0].log[3].retryCount >= 0, 'Log retryCount is valid')
t.ok(jobWithLog3[0].log[3].processCount >= 0, 'Log processCount is valid')
t.equal(jobWithLog3[0].log[3].message, enums.message.seeLogData, 'Log 3 message is valid')

@@ -113,2 +133,3 @@ t.ok(is.object(jobWithLog3[0].log[3].data), 'Log 3 data is valid')

t.ok(jobWithLog4[0].log[4].retryCount >= 0, 'Log retryCount is valid')
t.ok(jobWithLog4[0].log[4].processCount >= 0, 'Log processCount is valid')
t.equal(jobWithLog4[0].log[4].message, enums.message.seeLogData, 'Log 4 message is valid')

@@ -121,3 +142,5 @@ t.equal(jobWithLog4[0].log[4].data.foo, 'bar', 'Log 4 data object is valid')

t.ok(resetResult >= 0, 'Queue reset')
removeEventHandlers()
// ---------- Event Summary ----------
eventHandlers.remove(t, q, state)
q.stop()

@@ -124,0 +147,0 @@ return resolve(t.end())

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const tError = require('./test-error')

@@ -8,3 +8,3 @@ const enums = require('../src/enums')

test('job-options', (t) => {
t.plan(28)
t.plan(54)

@@ -15,5 +15,6 @@ try {

t.equal(to.timeout, enums.options.timeout, 'Job default timeout option is valid')
t.equal(to.retryMax, enums.options.retryMax, 'Job default retryMax option is 3')
t.equal(to.retryDelay, enums.options.retryDelay, 'Job default retryDelay option is 600')
t.equal(to.retryMax, enums.options.retryMax, 'Job default retryMax option is valid')
t.equal(to.retryDelay, enums.options.retryDelay, 'Job default retryDelay option is valid')
t.equal(to.repeat, enums.options.repeat, 'Job default repeat option is valid')
t.equal(to.repeatDelay, enums.options.repeatDelay, 'Job default repeatDelay option is valid')
to = jobOptions({

@@ -23,33 +24,61 @@ priority: 'high',

retryMax: 8,
retryDelay: 200000
retryDelay: 200000,
repeat: 4,
repeatDelay: 1000
}, to)
t.equal(to.priority, 'high', 'Job custom priority option is correct')
t.equal(to.timeout, 100000, 'Job custom timeout option is correct')
t.equal(to.retryMax, 8, 'Job custom retryMax option is correct')
t.equal(to.retryDelay, 200000, 'Job custom retryDelay option is correct')
t.equal(to.priority, 'high', 'Job custom priority option is valid')
t.equal(to.timeout, 100000, 'Job custom timeout option is valid')
t.equal(to.retryMax, 8, 'Job custom retryMax option is valid')
t.equal(to.retryDelay, 200000, 'Job custom retryDelay option is valid')
t.equal(to.repeat, 4, 'Job custom repeat option is valid')
t.equal(to.repeatDelay, 1000, 'Job custom repeatDelay option is valid')
to = jobOptions({ priority: 'lowest' }, to)
t.equal(to.priority, 'lowest', 'Job priority custom priority option is correct')
t.equal(to.timeout, 100000, 'Job priority custom timeout option is correct')
t.equal(to.retryMax, 8, 'Job priority custom retryMax option is correct')
t.equal(to.retryDelay, 200000, 'Job priority custom retryDelay option is correct')
t.equal(to.priority, 'lowest', 'Job priority custom priority option is valid')
t.equal(to.timeout, 100000, 'Job priority custom timeout option is valid')
t.equal(to.retryMax, 8, 'Job priority custom retryMax option is valid')
t.equal(to.retryDelay, 200000, 'Job priority custom retryDelay option is valid')
t.equal(to.repeat, 4, 'Job priority custom repeat option is valid')
t.equal(to.repeatDelay, 1000, 'Job priority custom repeatDelay option is valid')
to = jobOptions({ timeout: 700000 }, to)
t.equal(to.priority, 'lowest', 'Job timeout custom priority option is correct')
t.equal(to.timeout, 700000, 'Job timeout custom timeout option is correct')
t.equal(to.retryMax, 8, 'Job timeout custom retryMax option is correct')
t.equal(to.retryDelay, 200000, 'Job timeout custom retryDelay option is correct')
t.equal(to.priority, 'lowest', 'Job timeout custom priority option is valid')
t.equal(to.timeout, 700000, 'Job timeout custom timeout option is valid')
t.equal(to.retryMax, 8, 'Job timeout custom retryMax option is valid')
t.equal(to.retryDelay, 200000, 'Job timeout custom retryDelay option is valid')
t.equal(to.repeat, 4, 'Job timeout custom repeat option is valid')
t.equal(to.repeatDelay, 1000, 'Job timeout custom repeatDelay option is valid')
to = jobOptions({ retryMax: 2 }, to)
t.equal(to.priority, 'lowest', 'Job retryMax custom priority option is correct')
t.equal(to.timeout, 700000, 'Job retryMax custom timeout option is correct')
t.equal(to.retryMax, 2, 'Job retryMax custom retryMax option is correct')
t.equal(to.retryDelay, 200000, 'Job retryMax custom retryDelay option is correct')
t.equal(to.priority, 'lowest', 'Job retryMax custom priority option is valid')
t.equal(to.timeout, 700000, 'Job retryMax custom timeout option is valid')
t.equal(to.retryMax, 2, 'Job retryMax custom retryMax option is valid')
t.equal(to.retryDelay, 200000, 'Job retryMax custom retryDelay option is valid')
t.equal(to.repeat, 4, 'Job retryMax custom repeat option is valid')
t.equal(to.repeatDelay, 1000, 'Job retryMax custom repeatDelay option is valid')
to = jobOptions({ retryDelay: 800000 }, to)
t.equal(to.priority, 'lowest', 'Job retryDelay custom priority option is correct')
t.equal(to.timeout, 700000, 'Job retryDelay custom timeout option is correct')
t.equal(to.retryMax, 2, 'Job retryDelay custom retryMax option is correct')
t.equal(to.retryDelay, 800000, 'Job retryDelay custom retryDelay option is correct')
t.equal(to.priority, 'lowest', 'Job retryDelay custom priority option is valid')
t.equal(to.timeout, 700000, 'Job retryDelay custom timeout option is valid')
t.equal(to.retryMax, 2, 'Job retryDelay custom retryMax option is valid')
t.equal(to.retryDelay, 800000, 'Job retryDelay custom retryDelay option is valid')
t.equal(to.repeat, 4, 'Job retryDelay custom repeat option is valid')
t.equal(to.repeatDelay, 1000, 'Job retryDelay custom repeatDelay option is valid')
to = jobOptions({ repeat: false }, to)
t.equal(to.priority, 'lowest', 'Job repeat custom priority option is valid')
t.equal(to.timeout, 700000, 'Job repeat custom timeout option is valid')
t.equal(to.retryMax, 2, 'Job repeat custom retryMax option is valid')
t.equal(to.retryDelay, 800000, 'Job repeat custom retryDelay option is valid')
t.equal(to.repeat, false, 'Job repeat custom repeat option is valid')
t.equal(to.repeatDelay, 1000, 'Job repeat custom repeatDelay option is valid')
to = jobOptions({ repeatDelay: 2000 }, to)
t.equal(to.priority, 'lowest', 'Job repeatDelay custom priority option is valid')
t.equal(to.timeout, 700000, 'Job repeatDelay custom timeout option is valid')
t.equal(to.retryMax, 2, 'Job repeatDelay custom retryMax option is valid')
t.equal(to.retryDelay, 800000, 'Job repeatDelay custom retryDelay option is valid')
t.equal(to.repeat, false, 'Job repeatDelay custom repeat option is valid')
t.equal(to.repeatDelay, 2000, 'Job repeatDelay custom repeatDelay option is valid')
to = jobOptions({

@@ -59,8 +88,12 @@ priority: 'oops',

retryMax: -30,
retryDelay: -40
retryDelay: -40,
repeat: -50,
repeatDelay: -60
}, to)
t.equal(to.priority, 'lowest', 'Job invalid priority option is correct')
t.equal(to.timeout, 700000, 'Job invalid timeout option is correct')
t.equal(to.retryMax, 2, 'Job invalid retryMax option is correct')
t.equal(to.retryDelay, 800000, 'Job invalid retryDelay option is correct')
t.equal(to.priority, 'lowest', 'Job invalid priority option is reverted')
t.equal(to.timeout, 700000, 'Job invalid timeout option is reverted')
t.equal(to.retryMax, 2, 'Job invalid retryMax option is reverted')
t.equal(to.retryDelay, 800000, 'Job invalid retryDelay option is reverted')
t.equal(to.repeat, false, 'Job invalid repeat option is reverted')
t.equal(to.repeatDelay, 2000, 'Job invalid repeatDelay option is reverted')
} catch (err) {

@@ -67,0 +100,0 @@ tError(err, module, t)

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const is = require('../src/is')

@@ -3,0 +3,0 @@ const jobParse = require('../src/job-parse')

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -11,7 +11,8 @@ const datetime = require('../src/datetime')

const eventHandlers = require('./test-event-handlers')
const testName = 'job-progress'
module.exports = function () {
return new Promise((resolve, reject) => {
test('job-progress', (t) => {
t.plan(46)
test(testName, (t) => {
t.plan(47)

@@ -23,6 +24,6 @@ const q = new Queue(tOpts.cxn(), tOpts.default())

job.retryCount = 0
let oldPercent
// ---------- Event Handler Setup ----------
let state = {
testName,
enabled: false,

@@ -51,2 +52,3 @@ ready: 0,

terminated: 0,
reanimated: 0,
log: 0,

@@ -64,3 +66,3 @@ updated: 0

}).then((updatedJob) => {
t.ok(updatedJob, 'Job updated successfully')
t.ok(is.job(updatedJob), 'Job updated successfully')
return job.q.r.db(job.q.db).table(job.q.name).get(job.id).update({

@@ -82,3 +84,3 @@ retryCount: 1

}).then((updatedJob) => {
t.ok(updatedJob, 'Job updated successfully')
t.ok(is.job(updatedJob), 'Job updated successfully')
return q.getJob(job.id)

@@ -96,3 +98,3 @@ }).then((updatedJob) => {

}).then((updatedJob) => {
t.ok(updatedJob, 'Job updated successfully')
t.ok(is.job(updatedJob), 'Job updated successfully')
return q.getJob(job.id)

@@ -104,3 +106,3 @@ }).then((updatedJob) => {

}).then((updatedJob) => {
t.ok(updatedJob, 'Job updated successfully')
t.ok(is.job(updatedJob), 'Job updated successfully')
return q.getJob(job.id)

@@ -112,3 +114,3 @@ }).then((updatedJob) => {

}).then((updatedJob) => {
t.ok(updatedJob, 'Job updated successfully')
t.ok(is.job(updatedJob), 'Job updated successfully')
return q.getJob(job.id)

@@ -121,3 +123,3 @@ }).then((updatedJob) => {

}).then((updatedJob) => {
t.ok(updatedJob, 'Job updated successfully')
t.ok(is.job(updatedJob), 'Job updated successfully')
return q.getJob(job.id)

@@ -127,8 +129,8 @@ }).then((updatedJob) => {

updatedJob[0].status = enums.status.failed
return jobProgress(updatedJob[0], 101)
}).then((inactiveResult) => {
t.notOk(inactiveResult, 'Inactive job returns false')
return jobProgress(updatedJob[0], 101).catch((err) => {
t.ok(is.error(err), 'Inactive job rejects Promise')
})
}).then(() => {
//
// ---------- Event Summary ----------
t.comment('job-progress: Event Summary')
eventHandlers.remove(t, q, state)

@@ -135,0 +137,0 @@ return q.reset()

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -10,7 +10,9 @@ const is = require('../src/is')

const tOpts = require('./test-options')
const eventHandlers = require('./test-event-handlers')
const testName = 'job-update'
module.exports = function () {
return new Promise((resolve, reject) => {
test('job-update', (t) => {
t.plan(31)
test(testName, (t) => {
t.plan(55)

@@ -23,16 +25,30 @@ const q = new Queue(tOpts.cxn(), tOpts.default())

// ---------- Event Handler Setup ----------
let testEvents = false
function updatedEventHandler (jobId) {
if (testEvents) {
t.equal(jobId, job.id, `Event: updated [${jobId}]`)
}
let state = {
testName,
enabled: false,
ready: 0,
processing: 0,
progress: 0,
pausing: 0,
paused: 0,
resumed: 0,
removed: 0,
reset: 1,
error: 0,
reviewed: 0,
detached: 0,
stopping: 0,
stopped: 0,
dropped: 0,
added: 0,
waiting: 0,
active: 0,
completed: 0,
cancelled: 0,
failed: 0,
terminated: 0,
reanimated: 0,
log: 0,
updated: 1
}
function addEventHandlers () {
testEvents = true
q.on(enums.status.updated, updatedEventHandler)
}
function removeEventHandlers () {
testEvents = false
q.removeListener(enums.status.updated, updatedEventHandler)
}

@@ -51,3 +67,3 @@ return q.reset().then((resetResult) => {

// ---------- Job Update Test ----------
addEventHandlers()
eventHandlers.add(t, q, state)
t.comment('job-update: Update')

@@ -59,3 +75,3 @@ job = savedJobs1[0]

}).then((updateResult) => {
t.ok(updateResult, 'Job updated successfully')
t.ok(is.job(updateResult), 'Job updated successfully')
return q.getJob(job.id)

@@ -90,3 +106,5 @@ }).then((updatedJob) => {

t.ok(resetResult >= 0, 'Queue reset')
removeEventHandlers()
// ---------- Event Summary ----------
eventHandlers.remove(t, q, state)
q.stop()

@@ -93,0 +111,0 @@ return resolve(t.end())

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -10,11 +10,13 @@ const is = require('../src/is')

const tOpts = require('./test-options')
const eventHandlers = require('./test-event-handlers')
const testName = 'job'
module.exports = function () {
return new Promise((resolve, reject) => {
test('job', (t) => {
t.plan(81)
test(testName, (t) => {
t.plan(114)
const q = new Queue(tOpts.cxn(), tOpts.default())
const newJob = new Job(q)
let newJob = new Job(q)
newJob.data = tData

@@ -24,38 +26,31 @@ let savedJob

// ---------- Event Handler Setup ----------
let testEvents = false
function addedEventHandler (jobId) {
if (testEvents) {
t.ok(is.uuid(jobId), `Event: added [${jobId}]`)
}
let state = {
testName,
enabled: false,
ready: 1,
processing: 0,
progress: 1,
pausing: 0,
paused: 0,
resumed: 0,
removed: 0,
reset: 1,
error: 0,
reviewed: 0,
detached: 0,
stopping: 0,
stopped: 0,
dropped: 0,
added: 1,
waiting: 0,
active: 0,
completed: 0,
cancelled: 0,
failed: 0,
terminated: 0,
reanimated: 0,
log: 1,
updated: 0
}
function logEventHandler (jobId) {
if (testEvents) {
t.ok(is.uuid(jobId), `Event: log [${jobId}]`)
}
}
function progressEventHandler (jobId) {
if (testEvents) {
t.ok(is.uuid(jobId), `Event: progress [${jobId}]`)
}
}
function updatedEventHandler (jobId) {
if (testEvents) {
t.ok(is.uuid(jobId), `Event: updated [${jobId}]`)
}
}
function addEventHandlers () {
testEvents = true
q.on(enums.status.added, addedEventHandler)
q.on(enums.status.log, logEventHandler)
q.on(enums.status.progress, progressEventHandler)
q.on(enums.status.updated, updatedEventHandler)
}
function removeEventHandlers () {
testEvents = false
q.removeListener(enums.status.added, addedEventHandler)
q.removeListener(enums.status.log, logEventHandler)
q.removeListener(enums.status.progress, progressEventHandler)
q.removeListener(enums.status.updated, updatedEventHandler)
}
addEventHandlers()
eventHandlers.add(t, q, state)

@@ -94,2 +89,14 @@ // ---------- New Job Tests ----------

t.equal(newJob.retryDelay, 100, 'Job setRetryDelay successfully changed value')
t.throws(() => { newJob.setRepeat('not valid') }, 'Job setRepeat thows if invalid')
newJob.setRepeat(100)
t.equal(newJob.repeat, 100, 'Job setRepeat successfully changed value')
t.throws(() => { newJob.setRepeat(-10) }, 'Job setRepeat thows if negative')
newJob.setRepeat(true)
t.ok(is.true(newJob.repeat), 'Job setRepeat successfully changed value to true')
newJob.setRepeat(false)
t.ok(is.false(newJob.repeat), 'Job setRepeat successfully changed value to false')
t.throws(() => { newJob.setRepeatDelay('not valid') }, 'Job setRepeatDelay thows if invalid')
t.throws(() => { newJob.setRepeatDelay(-10) }, 'Job setRepeatDelay thows if negative')
newJob.setRepeatDelay(100)
t.equal(newJob.repeatDelay, 100, 'Job setRetryDelay successfully changed value')
t.throws(() => { newJob.setDateEnable('not valid') }, 'Job setDateEnable thows if invalid')

@@ -103,3 +110,3 @@ const testDate = new Date()

let cleanJob = newJob.getCleanCopy()
t.equal(Object.keys(cleanJob).length, 13, 'Clean job has valid number of properties')
t.equal(Object.keys(cleanJob).length, 16, 'Clean job has valid number of properties')
t.equal(cleanJob.id, newJob.id, 'Clean job has valid id')

@@ -165,3 +172,3 @@ t.equal(cleanJob.data, newJob.data, 'Clean job data is valid')

t.equal(custJob.priority, 'high', 'New job with new priority created successfully')
t.throws(() => { new Job(q, () => { }) }, 'New job with function throws error')
t.throws(() => { newJob = new Job(q, () => { }) }, 'New job with function throws error')

@@ -188,5 +195,5 @@ // ---------- Add Job Log ----------

savedJob.status = enums.status.active
return savedJob.setProgress(50)
return savedJob.updateProgress(50)
}).then((progressResult) => {
t.ok(progressResult, 'Job setProgress returned true')
t.ok(progressResult, 'Job updateProgress returned true')
return q.getJob(savedJob.id)

@@ -197,3 +204,3 @@ }).then((jobsFromDb) => {

removeEventHandlers()
eventHandlers.remove(t, q, state)
return q.reset()

@@ -200,0 +207,0 @@ }).then((resetResult) => {

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -9,15 +9,41 @@ const is = require('../src/is')

const tOpts = require('./test-options')
const eventHandlers = require('./test-event-handlers')
const testName = 'queue-add-job'
module.exports = function () {
return new Promise((resolve, reject) => {
test('queue-add-job', (t) => {
t.plan(30)
test(testName, (t) => {
t.plan(51)
const q = new Queue(tOpts.cxn(), tOpts.default())
let addedCount = 0
function addedEventHandler (jobId) {
addedCount++
t.ok(is.uuid(jobId), `Event: added [${addedCount}] [${jobId}]`)
// ---------- Event Handler Setup ----------
let state = {
testName,
enabled: false,
ready: 0,
processing: 0,
progress: 0,
pausing: 0,
paused: 0,
resumed: 0,
removed: 0,
reset: 0,
error: 0,
reviewed: 0,
detached: 0,
stopping: 0,
stopped: 0,
dropped: 0,
added: 3,
waiting: 0,
active: 0,
completed: 0,
cancelled: 0,
failed: 0,
terminated: 0,
reanimated: 0,
log: 0,
updated: 0
}
q.on(enums.status.added, addedEventHandler)

@@ -32,2 +58,3 @@ const job = q.createJob()

t.ok(is.integer(resetResult), 'Queue reset')
eventHandlers.add(t, q, state)

@@ -88,14 +115,5 @@ // ---------- Add Single Job Tests ----------

}).then(() => {
job.status = enums.status.waiting
// ---------- Add Invalid Status Job Tests ----------
t.comment('queue-add-job: Add Invalid Status Job')
return queueAddJob(q, job).then(() => {
t.fail('Promise is not being rejected when job status is invalid')
}).catch((err) => {
t.equal(err.message, enums.message.jobAlreadyAdded, 'Job with status not equal to created returns a rejected promise')
})
}).then(() => {
t.equal(addedCount, 3, 'Jobs added event count is valid')
q.removeListener(enums.status.added, addedEventHandler)
//
// ---------- Event Summary ----------
eventHandlers.remove(t, q, state)
return q.reset()

@@ -102,0 +120,0 @@ }).then((resetResult) => {

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -10,7 +10,9 @@ const is = require('../src/is')

const tOpts = require('./test-options')
const eventHandlers = require('./test-event-handlers')
const testName = 'queue-cancel-job'
module.exports = function () {
return new Promise((resolve, reject) => {
test('queue-cancel-job', (t) => {
t.plan(40)
test(testName, (t) => {
t.plan(70)

@@ -20,23 +22,30 @@ const q = new Queue(tOpts.cxn(), tOpts.default())

// ---------- Event Handler Setup ----------
let testEvents = false
function cancelledEventHandler (jobId) {
if (testEvents) {
t.ok(is.uuid(jobId), `Event: Job cancelled [${jobId}]`)
}
let state = {
testName,
enabled: false,
ready: 0,
processing: 0,
progress: 0,
pausing: 0,
paused: 0,
resumed: 0,
removed: 5,
reset: 1,
error: 0,
reviewed: 0,
detached: 0,
stopping: 0,
stopped: 0,
dropped: 0,
added: 6,
waiting: 0,
active: 0,
completed: 0,
cancelled: 11,
failed: 0,
terminated: 0,
reanimated: 0,
log: 0,
updated: 0
}
function removedEventHandler (jobId) {
if (testEvents) {
t.ok(is.uuid(jobId), `Event: Job removed [${jobId}]`)
}
}
function addEventHandlers () {
testEvents = true
q.on(enums.status.cancelled, cancelledEventHandler)
q.on(enums.status.removed, removedEventHandler)
}
function removeEventHandlers () {
testEvents = false
q.removeListener(enums.status.cancelled, cancelledEventHandler)
q.removeListener(enums.status.removed, removedEventHandler)
}

@@ -55,3 +64,3 @@ const jobsToCreate = 5

// ---------- Cancel Multiple Jobs Tests ----------
addEventHandlers()
eventHandlers.add(t, q, state)
t.comment('queue-cancel-job: Cancel Multiple Jobs')

@@ -107,3 +116,6 @@ return queueCancelJob(q, savedJobs, tData)

t.ok(resetResult >= 0, 'Queue reset')
removeEventHandlers()
// ---------- Event Summary ----------
eventHandlers.remove(t, q, state)
q.stop()

@@ -110,0 +122,0 @@ return resolve(t.end())

@@ -1,14 +0,15 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')
const is = require('../src/is')
const tError = require('./test-error')
const enums = require('../src/enums')
const Queue = require('../src/queue')
const dbReview = require('../src/db-review')
const tOpts = require('./test-options')
const eventHandlers = require('./test-event-handlers')
const testName = 'queue-change'
module.exports = function () {
return new Promise((resolve, reject) => {
test('queue-change', (t) => {
t.plan(84)
test(testName, (t) => {
t.plan(61)

@@ -19,256 +20,35 @@ const q = new Queue(tOpts.cxn(), tOpts.default())

// ---------- Event Handler Setup ----------
let testEvents = false
let readyEventCount = 0
let readyEventTotal = 1
function readyEventHandler (queueId) {
readyEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: ready [${readyEventCount} of ${readyEventTotal}] [${queueId}]`)
}
let state = {
testName,
enabled: false,
ready: 1,
processing: 3,
progress: 3,
pausing: 3,
paused: 3,
resumed: 2,
removed: 1,
reset: 0,
error: 0,
reviewed: 2,
detached: 0,
stopping: 0,
stopped: 0,
dropped: 0,
added: 3,
waiting: 0,
active: 3,
completed: 1,
cancelled: 1,
failed: 1,
terminated: 1,
reanimated: 0,
log: 1,
updated: 0
}
let addedEventCount = 0
let addedEventTotal = 3
function addedEventHandler (jobId) {
addedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: added [${addedEventCount} of ${addedEventTotal}] [${jobId}]`)
}
}
let activeEventCount = 0
let activeEventTotal = 3
function activeEventHandler (jobId) {
activeEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: active [${activeEventCount} of ${activeEventTotal}] [${jobId}]`)
}
}
let processingEventCount = 0
let processingEventTotal = 3
function processingEventHandler (jobId) {
processingEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: processing [${processingEventCount} of ${processingEventTotal}] [${jobId}]`)
}
}
let progressEventCount = 0
let progressEventTotal = 3
function progressEventHandler (jobId, percent) {
progressEventCount++
if (testEvents) {
t.pass(`Event: progress [${progressEventCount} of ${progressEventTotal}]`)
t.ok(is.uuid(jobId), `Event: progress [${jobId}]`)
t.ok(is.number(percent), `Event: progress [${percent}%]`)
}
}
let completedEventCount = 0
let completedEventTotal = 1
function completedEventHandler (jobId) {
completedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: completed [${completedEventCount} of ${completedEventTotal}] [${jobId}]`)
}
}
let cancelledEventCount = 0
let cancelledEventTotal = 1
function cancelledEventHandler (jobId) {
cancelledEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: cancelled [${cancelledEventCount} of ${cancelledEventTotal}] [${jobId}]`)
}
}
let failedEventCount = 0
let failedEventTotal = 1
function failedEventHandler (jobId) {
failedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: failed [${failedEventCount} of ${failedEventTotal}] [${jobId}]`)
}
}
let terminatedEventCount = 0
let terminatedEventTotal = 1
function terminatedEventHandler (jobId) {
terminatedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: terminated [${terminatedEventCount} of ${terminatedEventTotal}] [${jobId}]`)
}
}
let logEventCount = 0
let logEventTotal = 1
function logEventHandler (jobId) {
logEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: log [${logEventCount} of ${logEventTotal}] [${jobId}]`)
}
}
let updatedEventCount = 0
let updatedEventTotal = 0
function updatedEventHandler (jobId) {
updatedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: updated [${updatedEventCount} of ${updatedEventTotal}] [${jobId}]`)
}
}
let pausingEventCount = 0
let pausingEventTotal = 3
function pausingEventHandler (global, queueId) {
pausingEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: pausing [${pausingEventCount} of ${pausingEventTotal}]`)
t.ok(is.boolean(global), `Event: pausing [global: ${global}]`)
t.ok(is.string(queueId), `Event: pausing [queueId: ${queueId}]`)
}
}
let pausedEventCount = 0
let pausedEventTotal = 3
function pausedEventHandler (global, queueId) {
pausedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: paused [${pausedEventCount} of ${pausedEventTotal}]`)
t.ok(is.boolean(global), `Event: paused [global: ${global}]`)
t.ok(is.string(queueId), `Event: paused [queueId: ${queueId}]`)
}
}
let resumedEventCount = 0
let resumedEventTotal = 2
function resumedEventHandler (global, queueId) {
resumedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: resumed [${resumedEventCount} of ${resumedEventTotal}]`)
t.ok(is.boolean(global), `Event: resumed [global: ${global}]`)
t.ok(is.string(queueId), `Event: resumed [queueId: ${queueId}]`)
}
}
let removedEventCount = 0
let removedEventTotal = 1
function removedEventHandler (jobId) {
removedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: removed [${removedEventCount} of ${removedEventTotal}] [${jobId}]`)
}
}
let idleEventCount = 0
let idleEventTotal = 1
function idleEventHandler (queueId) {
if (idleEventCount < 1) {
idleEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: idle [${idleEventCount} of ${idleEventTotal}] [${queueId}]`)
}
}
}
let reviewedEventCount = 0
let reviewedEventTotal = 2
function reviewedEventHandler (result) {
reviewedEventCount++
if (testEvents) {
t.ok(is.object(result), `Event: reviewed [${reviewedEventCount} of ${reviewedEventTotal}]`)
}
}
let resetEventCount = 0
let resetEventTotal = 0
function resetEventHandler (total) {
resetEventCount++
if (testEvents) {
t.ok(is.integer(total), `Event: reset [${resetEventCount} of ${resetEventTotal}] [${total}]`)
}
}
let errorEventCount = 0
let errorEventTotal = 0
function errorEventHandler (err) {
errorEventCount++
if (testEvents) {
t.ok(is.string(err.message), `Event: error [${errorEventCount} of ${errorEventTotal}] [${err.message}]`)
}
}
let detachedEventCount = 0
let detachedEventTotal = 0
function detachedEventHandler (queueId) {
detachedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: detached [${detachedEventCount} of ${detachedEventTotal}] [${queueId}]`)
}
}
let stoppingEventCount = 0
let stoppingEventTotal = 0
function stoppingEventHandler (queueId) {
stoppingEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: stopping [${stoppingEventCount} of ${stoppingEventTotal}] [${queueId}]`)
}
}
let stoppedEventCount = 0
let stoppedEventTotal = 0
function stoppedEventHandler (queueId) {
stoppedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: stopped [${stoppedEventCount} of ${stoppedEventTotal}] [${queueId}]`)
}
}
let droppedEventCount = 0
let droppedEventTotal = 0
function droppedEventHandler (queueId) {
droppedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: dropped [${droppedEventCount} of ${droppedEventTotal}] [${queueId}]`)
}
}
function addEventHandlers () {
testEvents = true
q.on(enums.status.ready, readyEventHandler)
q.on(enums.status.added, addedEventHandler)
q.on(enums.status.active, activeEventHandler)
q.on(enums.status.processing, processingEventHandler)
q.on(enums.status.progress, progressEventHandler)
q.on(enums.status.completed, completedEventHandler)
q.on(enums.status.cancelled, cancelledEventHandler)
q.on(enums.status.failed, failedEventHandler)
q.on(enums.status.terminated, terminatedEventHandler)
q.on(enums.status.paused, pausedEventHandler)
q.on(enums.status.pausing, pausingEventHandler)
q.on(enums.status.resumed, resumedEventHandler)
q.on(enums.status.removed, removedEventHandler)
q.on(enums.status.log, logEventHandler)
q.on(enums.status.updated, updatedEventHandler)
q.on(enums.status.idle, idleEventHandler)
q.on(enums.status.reviewed, reviewedEventHandler)
q.on(enums.status.reset, resetEventHandler)
q.on(enums.status.error, errorEventHandler)
q.on(enums.status.detached, detachedEventHandler)
q.on(enums.status.stopping, stoppingEventHandler)
q.on(enums.status.stopped, stoppedEventHandler)
q.on(enums.status.dropped, droppedEventHandler)
}
function removeEventHandlers () {
testEvents = false
q.removeListener(enums.status.ready, readyEventHandler)
q.removeListener(enums.status.added, addedEventHandler)
q.removeListener(enums.status.active, activeEventHandler)
q.removeListener(enums.status.processing, processingEventHandler)
q.removeListener(enums.status.progress, progressEventHandler)
q.removeListener(enums.status.completed, completedEventHandler)
q.removeListener(enums.status.cancelled, cancelledEventHandler)
q.removeListener(enums.status.failed, failedEventHandler)
q.removeListener(enums.status.terminated, terminatedEventHandler)
q.removeListener(enums.status.paused, pausedEventHandler)
q.removeListener(enums.status.pausing, pausingEventHandler)
q.removeListener(enums.status.resumed, resumedEventHandler)
q.removeListener(enums.status.removed, removedEventHandler)
q.removeListener(enums.status.log, logEventHandler)
q.removeListener(enums.status.updated, updatedEventHandler)
q.removeListener(enums.status.idle, idleEventHandler)
q.removeListener(enums.status.reviewed, reviewedEventHandler)
q.removeListener(enums.status.reset, resetEventHandler)
q.removeListener(enums.status.error, errorEventHandler)
q.removeListener(enums.status.detached, detachedEventHandler)
q.removeListener(enums.status.stopping, stoppingEventHandler)
q.removeListener(enums.status.stopped, stoppedEventHandler)
q.removeListener(enums.status.dropped, droppedEventHandler)
return q.resume()
}
let job = qPub.createJob()
let processDelay = 500
addEventHandlers()
eventHandlers.add(t, q, state)

@@ -282,5 +62,5 @@ return qPub.reset().then((resetResult) => {

t.equal(j.id, job.id, `Job Processed [${j.id}]`)
next('queue-change')
next(null, 'queue-change')
}, processDelay)
return j.setProgress(50)
return j.updateProgress(50)
})

@@ -335,30 +115,6 @@

}).delay(processDelay).then(() => {
removeEventHandlers()
//
// ---------- Event Summary ----------
eventHandlers.remove(t, q, state)
// ---------- Event summary Test ----------
t.comment('queue-change: Event Summary')
t.equal(readyEventCount, readyEventTotal, 'Ready event count valid')
t.equal(addedEventCount, addedEventTotal, 'Added event count valid')
t.equal(activeEventCount, activeEventTotal, 'Active event count valid')
t.equal(processingEventCount, processingEventTotal, 'Processing event count valid')
t.equal(progressEventCount, progressEventTotal, 'Progress event count valid')
t.equal(completedEventCount, completedEventTotal, 'Completed event count valid')
t.equal(cancelledEventCount, cancelledEventTotal, 'Cancelled event count valid')
t.equal(failedEventCount, failedEventTotal, 'Failed event count valid')
t.equal(terminatedEventCount, terminatedEventTotal, 'Terminated event count valid')
t.equal(pausingEventCount, pausingEventTotal, 'Pausing event count valid')
t.equal(pausedEventCount, pausedEventTotal, 'Paused event count valid')
t.equal(resumedEventCount, resumedEventTotal, 'Resumed event count valid')
t.equal(removedEventCount, removedEventTotal, 'Removed event count valid')
t.equal(logEventCount, logEventTotal, 'Log event count valid')
t.equal(updatedEventCount, updatedEventTotal, 'Updated event count valid')
t.equal(idleEventCount, idleEventTotal, 'Idle event count valid')
t.equal(reviewedEventCount, reviewedEventTotal, 'Reviewed event count valid')
t.equal(resetEventCount, resetEventTotal, 'Reset event count valid')
t.equal(errorEventCount, errorEventTotal, 'Error event count valid')
t.equal(detachedEventCount, detachedEventTotal, 'Detached event count valid')
t.equal(stoppingEventCount, stoppingEventTotal, 'Stopping event count valid')
t.equal(stoppedEventCount, stoppedEventTotal, 'Stopped event count valid')
t.equal(droppedEventCount, droppedEventTotal, 'Dropped event count valid')
return q.reset()

@@ -365,0 +121,0 @@ }).then((resetResult) => {

@@ -1,5 +0,4 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')
const is = require('../src/is')
const enums = require('../src/enums')
const tError = require('./test-error')

@@ -10,32 +9,45 @@ const queueDb = require('../src/queue-db')

const tOpts = require('./test-options')
const eventHandlers = require('./test-event-handlers')
const testName = 'queue-db'
module.exports = function () {
return new Promise((resolve, reject) => {
test('queue-db', (t) => {
t.plan(70)
test(testName, (t) => {
t.plan(75)
const q = new Queue(tOpts.cxn(), tOpts.default())
let readyEventCount = 0
q.on(enums.status.ready, function readyEventHandler (qid) {
readyEventCount++
t.pass(`Event: Queue ready [${qid}]`)
t.equal(qid, q.id, `Event: Queue ready id is valid`)
if (readyEventCount >= 6) {
this.removeListener(enums.status.ready, readyEventHandler)
}
})
// ---------- Event Handler Setup ----------
let state = {
testName,
enabled: false,
ready: 0,
processing: 0,
progress: 0,
pausing: 0,
paused: 0,
resumed: 0,
removed: 0,
reset: 0,
error: 0,
reviewed: 0,
detached: 1,
stopping: 0,
stopped: 0,
dropped: 0,
added: 0,
waiting: 0,
active: 0,
completed: 0,
cancelled: 0,
failed: 0,
terminated: 0,
reanimated: 0,
log: 0,
updated: 0
}
let detachEventCount = 0
q.on(enums.status.detached, function detachedEventHandler (qid) {
detachEventCount++
t.pass(`Event: Queue detached [${qid}]`)
t.equal(qid, q.id, `Event: Queue detached id valid`)
if (detachEventCount >= 6) {
this.removeListener(enums.status.detached, detachedEventHandler)
}
})
return q.reset().then((resetResult) => {
t.ok(is.integer(resetResult), 'Queue reset')
eventHandlers.add(t, q, state)
q._masterInterval = 300

@@ -52,4 +64,6 @@ q._changeFeed = true

t.comment('queue-db: Detach with Drain')
return queueDb.detach(q, true)
return queueDb.detach(q)
}).then(() => {
return queueDb.drain(q)
}).then(() => {
t.pass('Pass: Queue detached with pool drain')

@@ -68,2 +82,3 @@ t.notOk(dbReview.isEnabled(q), 'Review is disabled')

}).then((ready) => {
eventHandlers.add(t, q, state)
t.ok(ready, 'Queue in a ready state')

@@ -75,4 +90,6 @@ t.ok(dbReview.isEnabled(q), 'Review is enabled')

t.comment('queue-db: Detach with Drain')
return queueDb.detach(q, true)
return queueDb.detach(q)
}).then(() => {
return queueDb.drain(q)
}).then(() => {
t.pass('Pass: Queue detached with pool drain')

@@ -93,2 +110,3 @@ t.notOk(dbReview.isEnabled(q), 'Review is disabled')

}).then((ready) => {
eventHandlers.add(t, q, state)
t.ok(ready, 'Queue in a ready state')

@@ -100,4 +118,6 @@ t.notOk(dbReview.isEnabled(q), 'Review is disabled')

t.comment('queue-db: Detach with Drain')
return queueDb.detach(q, true)
return queueDb.detach(q)
}).then(() => {
return queueDb.drain(q)
}).then(() => {
t.pass('Pass: Queue detached with pool drain')

@@ -118,2 +138,3 @@ t.notOk(dbReview.isEnabled(q), 'Review is disabled')

}).then((ready) => {
eventHandlers.add(t, q, state)
t.ok(ready, 'Queue in a ready state')

@@ -125,4 +146,6 @@ t.ok(dbReview.isEnabled(q), 'Review is enabled')

t.comment('queue-db: Detach with Drain')
return queueDb.detach(q, true)
return queueDb.detach(q)
}).then(() => {
return queueDb.drain(q)
}).then(() => {
t.pass('Pass: Queue detached with pool drain')

@@ -143,2 +166,3 @@ t.notOk(dbReview.isEnabled(q), 'Review is disabled')

}).then((ready) => {
eventHandlers.add(t, q, state)
t.ok(ready, 'Queue in a ready state')

@@ -150,4 +174,6 @@ t.notOk(dbReview.isEnabled(q), 'Review is disabled')

t.comment('queue-db: Detach with Drain')
return queueDb.detach(q, true)
return queueDb.detach(q)
}).then(() => {
return queueDb.drain(q)
}).then(() => {
t.pass('Pass: Queue detached with pool drain')

@@ -168,2 +194,3 @@ t.notOk(dbReview.isEnabled(q), 'Review is disabled')

}).then((ready) => {
eventHandlers.add(t, q, state)
t.ok(ready, 'Queue in a ready state')

@@ -173,5 +200,5 @@ t.ok(dbReview.isEnabled(q), 'Review is enabled')

// ---------- Detach with Drain ----------
t.comment('queue-db: Detach with Drain')
return queueDb.detach(q, false)
// ---------- Detach without Drain ----------
t.comment('queue-db: Detach without Drain')
return queueDb.detach(q)
}).then(() => {

@@ -187,4 +214,6 @@ t.pass('Pass: Queue detached without pool drain')

t.comment('queue-db: Detach with Drain')
return queueDb.detach(q, true)
return queueDb.detach(q)
}).then(() => {
return queueDb.drain(q)
}).then(() => {
q._masterInterval = false

@@ -202,2 +231,5 @@ q._changeFeed = true

t.ok(q._changeFeedCursor.connection.open, 'Change feed is connected')
// ---------- Event Summary ----------
eventHandlers.remove(t, q, state)
q.stop()

@@ -204,0 +236,0 @@ return resolve(t.end())

@@ -1,5 +0,4 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')
const is = require('../src/is')
const enums = require('../src/enums')
const tError = require('./test-error')

@@ -10,48 +9,48 @@ const queueDrop = require('../src/queue-drop')

const rethinkdbdash = require('rethinkdbdash')
const eventHandlers = require('./test-event-handlers')
const testName = 'queue-drop'
module.exports = function () {
const mockQueue = {
r: rethinkdbdash(Object.assign(tOpts.cxn(), { silent: true })),
db: tOpts.dbName,
name: tOpts.queueName,
id: 'mock:queue:id'
}
return new Promise((resolve, reject) => {
test('queue-drop', (t) => {
t.plan(10)
test(testName, (t) => {
t.plan(33)
const mockQueue = {
r: rethinkdbdash(Object.assign(tOpts.cxn(), { silent: true })),
db: tOpts.dbName,
name: tOpts.queueName,
id: 'mock:queue:id'
}
let q = new Queue(tOpts.cxn(), tOpts.default())
let testEvents = false
function stoppingEventHandler (qid) {
if (testEvents) {
t.pass(`Event: Queue stopping [${qid}]`)
t.equal(qid, q.id, `Event: Queue stopping id is valid`)
}
// ---------- Event Handler Setup ----------
let state = {
testName,
enabled: false,
ready: 0,
processing: 0,
progress: 0,
pausing: 1,
paused: 1,
resumed: 0,
removed: 0,
reset: 0,
error: 0,
reviewed: 0,
detached: 1,
stopping: 1,
stopped: 1,
dropped: 1,
added: 0,
waiting: 0,
active: 0,
completed: 0,
cancelled: 0,
failed: 0,
terminated: 0,
reanimated: 0,
log: 0,
updated: 0
}
function stoppedEventHandler (qid) {
if (testEvents) {
t.pass(`Event: Queue stopped [${qid}]`)
t.equal(qid, q.id, `Event: Queue stopped id is valid`)
}
}
function droppedEventHandler (qid) {
if (testEvents) {
t.pass(`Event: Queue dropped [${qid}]`)
t.equal(qid, q.id, `Event: Queue dropped id is valid`)
}
}
function addEventHandlers () {
testEvents = true
q.on(enums.status.stopping, stoppingEventHandler)
q.on(enums.status.stopped, stoppedEventHandler)
q.on(enums.status.dropped, droppedEventHandler)
}
function removeEventHandlers () {
testEvents = false
q.removeListener(enums.status.stopping, stoppingEventHandler)
q.removeListener(enums.status.stopped, stoppedEventHandler)
q.removeListener(enums.status.dropped, droppedEventHandler)
}

@@ -70,3 +69,3 @@ function simulateJobProcessing () {

t.comment('queue-drop: Drop Queue')
addEventHandlers()
eventHandlers.add(t, q, state)
simulateJobProcessing()

@@ -83,3 +82,4 @@ return queueDrop(q)

removeEventHandlers()
// ---------- Event Summary ----------
eventHandlers.remove(t, q, state)
mockQueue.r.getPoolMaster().drain()

@@ -86,0 +86,0 @@ return resolve(t.end())

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -3,0 +3,0 @@ const tError = require('./test-error')

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -3,0 +3,0 @@ const is = require('../src/is')

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -11,7 +11,9 @@ const datetime = require('../src/datetime')

const tOpts = require('./test-options')
const eventHandlers = require('./test-event-handlers')
const testName = 'queue-get-next-job'
module.exports = function () {
return new Promise((resolve, reject) => {
test('queue-get-next-job', (t) => {
t.plan(105)
test(testName, (t) => {
t.plan(152)

@@ -21,8 +23,2 @@ // ---------- Creating Priority Test Jobs ----------

q._concurrency = 1
let activeCount = 0
function activeEventHandler (jobId) {
activeCount++
t.ok(is.uuid(jobId), `Event: Job Active [${activeCount}] [${jobId}]`)
}
q.on(enums.status.active, activeEventHandler)

@@ -79,2 +75,32 @@ const jobLowest = q.createJob().setPriority('lowest')

// ---------- Event Handler Setup ----------
let state = {
testName,
enabled: false,
ready: 0,
processing: 0,
progress: 0,
pausing: 0,
paused: 0,
resumed: 0,
removed: 0,
reset: 0,
error: 0,
reviewed: 0,
detached: 0,
stopping: 0,
stopped: 0,
dropped: 0,
added: 24,
waiting: 0,
active: 19,
completed: 0,
cancelled: 0,
failed: 0,
terminated: 0,
reanimated: 0,
log: 0,
updated: 0
}
// Uncomment below for debugging

@@ -88,2 +114,3 @@ // allCreatedJobs.map((j) => {

t.ok(is.integer(resetResult), 'Queue reset')
eventHandlers.add(t, q, state)
return queueAddJob(q, allCreatedJobs, true)

@@ -223,4 +250,4 @@ }).then((savedJobs) => {

// ---------- Testing dateEnable with retryCount ----------
t.comment('queue-get-next-job: dateEnable with retryCount')
// ---------- Testing dateEnable, retryCount, and processCount ----------
t.comment('queue-get-next-job: dateEnable, retryCount, and processCount')
retryJobs = new Array(4)

@@ -251,2 +278,3 @@ retryJobs[0] = q.createJob().setDateEnable(datetime.add.sec(new Date(), -100))

t.ok(is.dateBefore(retryGet2[1].dateEnable, retryGet2[2].dateEnable), 'dateEnable for third job is valid')
t.equal(retryGet2[0].processCount, 1, 'processCount is valid')
return queueGetNextJob(q)

@@ -256,4 +284,6 @@ }).then((retryGet3) => {

t.equal(retryGet3[0].id, retryJobs[0].id, 'Last job is valid')
t.equal(activeCount, 19, 'Active event count valid')
q.removeListener(enums.status.active, activeEventHandler)
// ---------- Event Summary ----------
eventHandlers.remove(t, q, state)
return q.reset()

@@ -260,0 +290,0 @@ }).then((resetResult) => {

@@ -1,5 +0,3 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')
const enums = require('../src/enums')
const is = require('../src/is')
const tError = require('./test-error')

@@ -12,7 +10,9 @@ const Queue = require('../src/queue')

{ './queue-process': processStub })
const eventHandlers = require('./test-event-handlers')
const testName = 'queue-interruption'
module.exports = function () {
return new Promise((resolve, reject) => {
test('queue-interruption', (t) => {
t.plan(19)
test(testName, (t) => {
t.plan(33)

@@ -25,48 +25,33 @@ const q = new Queue(tOpts.cxn(), tOpts.default())

// ---------- Event Handler Setup ----------
let testEvents = false
let pausingEventCount = 0
let pausingEventTotal = 1
function pausingEventHandler (global, queueId) {
pausingEventCount++
if (testEvents) {
t.pass(`Event: pausing [${pausingEventCount} of ${pausingEventTotal}] `)
t.ok(is.boolean(global), `Event: pausing [global: ${global}]`)
t.ok(is.string(queueId), `Event: pausing [queueId: ${queueId}]`)
}
let state = {
testName,
enabled: false,
ready: 0,
processing: 0,
progress: 0,
pausing: 1,
paused: 1,
resumed: 1,
removed: 0,
reset: 0,
error: 0,
reviewed: 0,
detached: 0,
stopping: 0,
stopped: 0,
dropped: 0,
added: 0,
waiting: 0,
active: 0,
completed: 0,
cancelled: 0,
failed: 0,
terminated: 0,
reanimated: 0,
log: 0,
updated: 0
}
let pausedEventCount = 0
let pausedEventTotal = 1
function pausedEventHandler (global, queueId) {
pausedEventCount++
if (testEvents) {
t.pass(`Event: paused [${pausedEventCount} of ${pausedEventTotal}] `)
t.ok(is.boolean(global), `Event: paused [global: ${global}]`)
t.ok(is.string(queueId), `Event: paused [queueId: ${queueId}]`)
}
}
let resumedEventCount = 0
let resumedEventTotal = 1
function resumedEventHandler (global, queueId) {
resumedEventCount++
if (testEvents) {
t.pass(`Event: resumed [${resumedEventCount} of ${resumedEventTotal}] `)
t.ok(is.boolean(global), `Event: resumed [global: ${global}]`)
t.ok(is.string(queueId), `Event: resumed [queueId: ${queueId}]`)
}
}
function addEventHandlers () {
testEvents = true
q.on(enums.status.paused, pausingEventHandler)
q.on(enums.status.paused, pausedEventHandler)
q.on(enums.status.resumed, resumedEventHandler)
}
function removeEventHandlers () {
testEvents = false
q.removeListener(enums.status.pausing, pausedEventHandler)
q.removeListener(enums.status.paused, pausedEventHandler)
q.removeListener(enums.status.resumed, resumedEventHandler)
}
return q.ready().then((ready) => {
addEventHandlers()
eventHandlers.add(t, q, state)
t.ok(ready, 'Queue is ready')

@@ -93,9 +78,5 @@

t.notOk(q.paused, 'Queue is not paused')
removeEventHandlers()
// ---------- Event summary Test ----------
t.comment('queue-interruption: Event Summary')
t.equal(pausingEventCount, pausingEventTotal, 'Pausing event count valid')
t.equal(pausedEventCount, pausedEventTotal, 'Paused event count valid')
t.equal(resumedEventCount, resumedEventTotal, 'Resumed event count valid')
// ---------- Event Summary ----------
eventHandlers.remove(t, q, state)

@@ -102,0 +83,0 @@ q.stop()

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -13,7 +13,8 @@ const datetime = require('../src/datetime')

const eventHandlers = require('./test-event-handlers')
const testName = 'queue-process'
module.exports = function () {
return new Promise((resolve, reject) => {
test('queue-process', (t) => {
t.plan(304)
test(testName, { timeout: 200000 }, (t) => {
t.plan(409)

@@ -26,14 +27,24 @@ // ---------- Test Setup ----------

let jobDelay = 200
let repeatDelay = 300
const noOfJobsToCreate = 10
const allJobsDelay = jobDelay * (noOfJobsToCreate + 2)
function resumeProcessPauseGet () {
return q.resume().delay(jobDelay * 0.6).then(() => {
return q.pause()
}).delay(jobDelay).then(() => {
return q.getJob(jobs)
}).delay(repeatDelay * 2)
}
// ---------- Event Handler Setup ----------
let state = {
testName,
enabled: false,
ready: 0,
processing: 38,
processing: 48,
progress: 1,
pausing: 2,
paused: 2,
resumed: 3,
pausing: 14,
paused: 14,
resumed: 14,
removed: 0,

@@ -48,9 +59,10 @@ idle: 12,

dropped: 0,
added: 35,
added: 37,
waiting: 0,
active: 38,
completed: 32,
cancelled: 2,
active: 48,
completed: 42,
cancelled: 3,
failed: 3,
terminated: 1,
reanimated: 0,
log: 0,

@@ -60,5 +72,11 @@ updated: 0

let completedEventCount = 0
const summaryCompleted = 32
const summaryCancelled = 2
// idle event testing is not part of the test-event-handlers module
// because it is too difficult to measure. This handler and one
// test below is to ensure idle is being called.
function idleEventHandler (qid) {
t.ok(is.string(qid), `Event: idle`)
}
const summaryCompleted = 33
const summaryCancelled = 3
const summaryTerminated = 1

@@ -88,3 +106,3 @@

setTimeout(function () {
return job.setProgress(50).then((result) => {
return job.updateProgress(50).then((result) => {
t.ok(result, 'Job progress updated: ' + job.id)

@@ -97,3 +115,3 @@ })

jobProcessTimeoutId = false
next('Job Completed: ' + job.id)
next(null, 'Job Completed: ' + job.id)
.then((runningJobs) => {

@@ -145,2 +163,3 @@ t.ok(is.integer(runningJobs), `Next call returns running jobs [${runningJobs}]`)

}).then(() => {
q.on(enums.status.idle, idleEventHandler)
return q.resume()

@@ -150,6 +169,78 @@ }).delay(jobDelay / 2).then(() => {

}).delay(jobDelay * 8).then(() => {
completedEventCount = state.count.get(enums.status.completed)
t.equal(state.count.get(enums.status.completed), noOfJobsToCreate, `Queue has completed ${completedEventCount} jobs`)
q.removeListener(enums.status.idle, idleEventHandler)
t.ok(q.idle, 'Queue is idle')
// ---------- Repeat True Test ----------
t.comment('queue-process: Repeat True')
jobs = q.createJob().setRepeat(true).setRepeatDelay(repeatDelay)
t.ok(jobs.repeat, 'Job repeat is true')
t.equal(jobs.repeatDelay, repeatDelay, 'Job repeat delay is valid')
return q.pause()
}).then(() => {
return q.addJob(jobs)
}).then((savedJobs) => {
t.equal(savedJobs.length, 1, `Jobs saved successfully: [${savedJobs.length}]`)
return resumeProcessPauseGet()
}).then((repeatJob) => {
t.equal(repeatJob[0].processCount, 1, 'Repeat job processed once')
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting')
return resumeProcessPauseGet()
}).then((repeatJob) => {
t.equal(repeatJob[0].processCount, 2, 'Repeat job processed twice')
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting')
return resumeProcessPauseGet()
}).then((repeatJob) => {
t.equal(repeatJob[0].processCount, 3, 'Repeat job processed three times')
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting')
return resumeProcessPauseGet()
}).then((repeatJob) => {
t.equal(repeatJob[0].processCount, 4, 'Repeat job processed four times')
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting')
return resumeProcessPauseGet()
}).then((repeatJob) => {
t.equal(repeatJob[0].processCount, 5, 'Repeat job processed five times')
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting')
return q.cancelJob(jobs)
}).then((cancelledJobs) => {
t.ok(is.uuid(cancelledJobs[0]), 'repeat job cancelled')
return q.getJob(jobs)
}).then((cancelledJobs) => {
t.equal(cancelledJobs[0].processCount, 5, `Job repeated 5 times before cancel`)
t.equal(cancelledJobs[0].log.length, 12, 'repeat job log count valid')
// ---------- Repeat Number Test ----------
t.comment('queue-process: Repeat Number')
jobs = q.createJob().setRepeat(4).setRepeatDelay(repeatDelay)
t.equal(jobs.repeat, 4, `Job repeat is 4`)
t.equal(jobs.repeatDelay, repeatDelay, 'Job repeat delay is valid')
return q.pause()
}).then(() => {
return q.addJob(jobs)
}).then((savedJobs) => {
t.equal(savedJobs.length, 1, `Jobs saved successfully: [${savedJobs.length}]`)
return resumeProcessPauseGet()
}).then((repeatJob) => {
t.equal(repeatJob[0].processCount, 1, 'Repeat job processed once')
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting')
return resumeProcessPauseGet()
}).then((repeatJob) => {
t.equal(repeatJob[0].processCount, 2, 'Repeat job processed twice')
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting')
return resumeProcessPauseGet()
}).then((repeatJob) => {
t.equal(repeatJob[0].processCount, 3, 'Repeat job processed three times')
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting')
return resumeProcessPauseGet()
}).then((repeatJob) => {
t.equal(repeatJob[0].processCount, 4, 'Repeat job processed four times')
t.equal(repeatJob[0].status, enums.status.waiting, 'Repeat job is waiting')
return resumeProcessPauseGet()
}).then((completedJobs) => {
t.equal(completedJobs[0].processCount, 5, 'Repeat job processed five times')
t.equal(completedJobs[0].status, enums.status.completed, 'Repeat job is completed')
t.equal(completedJobs[0].log.length, 11, 'repeat job log count valid')
return q.resume()
}).then(() => {
// // TODO - Remove below to run all tests.
// return q.stop().then(() => Promise.reject())
// ---------- Processing Restart on Job Add Test ----------

@@ -166,4 +257,2 @@ t.comment('queue-process: Process Restart on Job Add')

}).delay(allJobsDelay).then(() => {
completedEventCount = state.count.get(enums.status.completed)
t.equal(completedEventCount, noOfJobsToCreate * 2, `Queue has completed ${completedEventCount} jobs`)
t.ok(q.idle, 'Queue is idle')

@@ -186,4 +275,2 @@ return q.pause()

}).delay(allJobsDelay).then(() => {
completedEventCount = state.count.get(enums.status.completed)
t.equal(completedEventCount, noOfJobsToCreate * 3, `Queue has completed ${completedEventCount} jobs`)
t.pass('Restart processing succeeded')

@@ -291,3 +378,2 @@ t.ok(q.idle, 'Queue is idle')

// ---------- Event Summary ----------
t.comment('queue-process: Event Summary')
eventHandlers.remove(t, q, state)

@@ -294,0 +380,0 @@

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -9,7 +9,9 @@ const is = require('../src/is')

const tOpts = require('./test-options')
const eventHandlers = require('./test-event-handlers')
const testName = 'queue-remove-job'
module.exports = function () {
return new Promise((resolve, reject) => {
test('queue-remove-job', (t) => {
t.plan(24)
test(testName, (t) => {
t.plan(55)

@@ -22,20 +24,35 @@ const q = new Queue(tOpts.cxn(), tOpts.default())

let testEvents = false
function removedEventHandler (jobId) {
if (testEvents) {
t.ok(is.uuid(jobId), `Event: removed [${jobId}]`)
}
// ---------- Event Handler Setup ----------
let state = {
testName,
enabled: false,
ready: 0,
processing: 0,
progress: 0,
pausing: 0,
paused: 0,
resumed: 0,
removed: 8,
reset: 0,
error: 0,
reviewed: 0,
detached: 0,
stopping: 0,
stopped: 0,
dropped: 0,
added: 8,
waiting: 0,
active: 0,
completed: 0,
cancelled: 0,
failed: 0,
terminated: 0,
reanimated: 0,
log: 0,
updated: 0
}
function addEventHandlers () {
testEvents = true
q.on(enums.status.removed, removedEventHandler)
}
function removeEventHandlers () {
testEvents = false
q.removeListener(enums.status.removed, removedEventHandler)
}
return q.reset().then((resetResult) => {
t.ok(is.integer(resetResult), 'Queue reset')
addEventHandlers()
eventHandlers.add(t, q, state)
return q.addJob(jobs)

@@ -110,3 +127,5 @@ }).then((savedJobs) => {

}).then(() => {
removeEventHandlers()
//
// ---------- Event Summary ----------
eventHandlers.remove(t, q, state)
return q.reset()

@@ -113,0 +132,0 @@ }).then((resetResult) => {

@@ -1,5 +0,4 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')
const is = require('../src/is')
const enums = require('../src/enums')
const tError = require('./test-error')

@@ -9,7 +8,9 @@ const queueReset = require('../src/queue-reset')

const tOpts = require('./test-options')
const eventHandlers = require('./test-event-handlers')
const testName = 'queue-reset'
module.exports = function () {
return new Promise((resolve, reject) => {
test('queue-reset', (t) => {
t.plan(7)
test(testName, (t) => {
t.plan(32)

@@ -22,19 +23,36 @@ const q = new Queue(tOpts.cxn(), tOpts.default())

]
let eventCount = 0
function resetEventHandler (total) {
eventCount++
t.pass('Event: Queue reset')
if (eventCount < 2) { return }
t.equal(total, 3, 'Queue reset removed valid number of jobs')
return q.summary().then((afterSummary) => {
t.equal(afterSummary.waiting, 0, 'Status summary contains no added jobs')
q.removeListener(enums.status.reset, resetEventHandler)
q.stop()
return resolve(t.end())
}).catch(err => tError(err, module, t))
// ---------- Event Handler Setup ----------
let state = {
testName,
enabled: false,
ready: 0,
processing: 0,
progress: 0,
pausing: 0,
paused: 0,
resumed: 0,
removed: 0,
reset: 1,
error: 0,
reviewed: 0,
detached: 0,
stopping: 0,
stopped: 0,
dropped: 0,
added: 3,
waiting: 0,
active: 0,
completed: 0,
cancelled: 0,
failed: 0,
terminated: 0,
reanimated: 0,
log: 0,
updated: 0
}
q.on(enums.status.reset, resetEventHandler)
return q.reset().then((removed) => {
t.ok(is.integer(removed), 'Initial reset succeeded')
eventHandlers.add(t, q, state)
return q.addJob(jobs)

@@ -47,2 +65,13 @@ }).then((savedJobs) => {

return queueReset(q)
}).then((total) => {
t.equal(total, 3, 'Queue reset removed valid number of jobs')
return q.summary()
}).then((afterSummary) => {
t.equal(afterSummary.waiting, 0, 'Status summary contains no added jobs')
// ---------- Event Summary ----------
eventHandlers.remove(t, q, state)
q.stop()
return resolve(t.end())
}).catch(err => tError(err, module, t))

@@ -49,0 +78,0 @@ })

@@ -1,6 +0,4 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')
// const datetime = require('../src/datetime')
const enums = require('../src/enums')
const is = require('../src/is')
const tError = require('./test-error')

@@ -11,7 +9,9 @@ const tData = require('./test-options').tData

const queueState = require('../src/queue-state')
const eventHandlers = require('./test-event-handlers')
const testName = 'queue-state'
module.exports = function () {
return new Promise((resolve, reject) => {
test('queue-state', (t) => {
t.plan(15)
test(testName, (t) => {
t.plan(29)

@@ -24,49 +24,34 @@ const q = new Queue(tOpts.cxn(), tOpts.default())

// ---------- Event Handler Setup ----------
let testEvents = false
let pausingEventCount = 0
let pausingEventTotal = 1
function pausingEventHandler (global, queueId) {
pausingEventCount++
if (testEvents) {
t.pass(`Event: pausing [${pausingEventCount} of ${pausingEventTotal}] `)
t.ok(is.boolean(global), `Event: pausing [global: ${global}]`)
t.ok(is.string(queueId), `Event: pausing [queueId: ${queueId}]`)
}
let state = {
testName,
enabled: false,
ready: 0,
processing: 0,
progress: 0,
pausing: 1,
paused: 1,
resumed: 1,
removed: 0,
reset: 0,
error: 0,
reviewed: 0,
detached: 0,
stopping: 0,
stopped: 0,
dropped: 0,
added: 0,
waiting: 0,
active: 0,
completed: 0,
cancelled: 0,
failed: 0,
terminated: 0,
reanimated: 0,
log: 0,
updated: 0
}
let pausedEventCount = 0
let pausedEventTotal = 1
function pausedEventHandler (global, queueId) {
pausedEventCount++
if (testEvents) {
t.pass(`Event: paused [${pausedEventCount} of ${pausedEventTotal}] `)
t.ok(is.boolean(global), `Event: paused [global: ${global}]`)
t.ok(is.string(queueId), `Event: paused [queueId: ${queueId}]`)
}
}
let resumedEventCount = 0
let resumedEventTotal = 1
function resumedEventHandler (global, queueId) {
resumedEventCount++
if (testEvents) {
t.pass(`Event: resumed [${resumedEventCount} of ${resumedEventTotal}] `)
t.ok(is.boolean(global), `Event: resumed [global: ${global}]`)
t.ok(is.string(queueId), `Event: resumed [queueId: ${queueId}]`)
}
}
function addEventHandlers () {
testEvents = true
q.on(enums.status.paused, pausingEventHandler)
q.on(enums.status.paused, pausedEventHandler)
q.on(enums.status.resumed, resumedEventHandler)
}
function removeEventHandlers () {
testEvents = false
q.removeListener(enums.status.paused, pausingEventHandler)
q.removeListener(enums.status.paused, pausedEventHandler)
q.removeListener(enums.status.resumed, resumedEventHandler)
}
q.reset().then((resetResult) => {
t.ok(resetResult >= 0, 'Queue reset')
addEventHandlers()
eventHandlers.add(t, q, state)

@@ -85,9 +70,5 @@ // ---------- Global Pause Test ----------

// ---------- Event summary Test ----------
t.comment('queue-state: Event Summary')
t.equal(pausingEventCount, pausingEventTotal, 'Pausing event count valid')
t.equal(pausedEventCount, pausedEventTotal, 'Paused event count valid')
t.equal(resumedEventCount, resumedEventTotal, 'Resumed event count valid')
// ---------- Event Summary ----------
eventHandlers.remove(t, q, state)
removeEventHandlers()
q.stop()

@@ -94,0 +75,0 @@ q2.stop()

@@ -1,5 +0,4 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')
const is = require('../src/is')
const enums = require('../src/enums')
const tError = require('./test-error')

@@ -11,33 +10,41 @@ const queueStop = require('../src/queue-stop')

const tOpts = require('./test-options')
const eventHandlers = require('./test-event-handlers')
const testName = 'queue-stop'
module.exports = function () {
return new Promise((resolve, reject) => {
test('queue-stop', (t) => {
t.plan(31)
test(testName, (t) => {
t.plan(79)
const q = new Queue(tOpts.cxn(), tOpts.default())
let q = new Queue(tOpts.cxn(), tOpts.master(999999))
let testEvents = false
function stoppingEventHandler (qid) {
if (testEvents) {
t.pass(`Event: Queue stopping [${qid}]`)
t.equal(qid, q.id, `Event: Queue stopping id is valid`)
}
// ---------- Event Handler Setup ----------
let state = {
testName,
enabled: false,
ready: 0,
processing: 0,
progress: 0,
pausing: 1,
paused: 1,
resumed: 0,
removed: 0,
reset: 0,
error: 0,
reviewed: 0,
detached: 1,
stopping: 1,
stopped: 1,
dropped: 0,
added: 0,
waiting: 0,
active: 0,
completed: 0,
cancelled: 0,
failed: 0,
terminated: 0,
reanimated: 0,
log: 0,
updated: 0
}
function stoppedEventHandler (qid) {
if (testEvents) {
t.pass(`Event: Queue stopped [${qid}]`)
t.equal(qid, q.id, `Event: Queue stopped id is valid`)
}
}
function addEventHandlers () {
testEvents = true
q.on(enums.status.stopping, stoppingEventHandler)
q.on(enums.status.stopped, stoppedEventHandler)
}
function removeEventHandlers () {
testEvents = false
q.removeListener(enums.status.stopping, stoppingEventHandler)
q.removeListener(enums.status.stopped, stoppedEventHandler)
}

@@ -53,5 +60,2 @@ function simulateJobProcessing () {

t.ok(is.integer(resetResult), 'Queue reset')
q._running = 1
q._masterInterval = 300
dbReview.enable(q)
return q.ready()

@@ -63,3 +67,3 @@ }).then((ready) => {

t.notOk(q.paused, 'Queue is not paused')
addEventHandlers()
eventHandlers.add(t, q, state)

@@ -69,4 +73,6 @@ // ---------- Stop with Drain ----------

simulateJobProcessing()
return queueStop(q, true)
return queueStop(q)
}).then((stopped) => {
return queueDb.drain(q)
}).then((stopped) => {
t.ok(stopped, 'Queue stopped with pool drain')

@@ -80,11 +86,12 @@ t.notOk(dbReview.isEnabled(q), 'Review is disabled')

// ---------- Event Summary ----------
eventHandlers.remove(t, q, state)
// ---------- Stop without Drain ----------
t.comment('queue-stop: Stop without Drain')
return queueDb.attach(q, tOpts.cxn())
}).then(() => {
q = new Queue(tOpts.cxn(), tOpts.master(999999))
return q.ready()
}).then((ready) => {
t.ok(ready, 'Queue in a ready state')
return q.resume()
}).then(() => {
eventHandlers.add(t, q, state)
t.ok(dbReview.isEnabled(q), 'Review is enabled')

@@ -94,3 +101,3 @@ t.ok(q._changeFeedCursor.connection.open, 'Change feed is connected')

simulateJobProcessing()
return queueStop(q, false)
return queueStop(q)
}).then((stopped2) => {

@@ -105,4 +112,6 @@ t.ok(stopped2, 'Queue stopped without pool drain')

// detaching with drain or node will not exit gracefully
return queueDb.detach(q, true)
return queueDb.detach(q)
}).then(() => {
return queueDb.drain(q)
}).then(() => {
return queueDb.attach(q, tOpts.cxn())

@@ -119,4 +128,4 @@ }).then(() => {

// ---------- Clean Up ----------
removeEventHandlers()
// ---------- Event Summary ----------
eventHandlers.remove(t, q, state)
q.stop()

@@ -123,0 +132,0 @@ return resolve(t.end())

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -18,3 +18,3 @@ const is = require('../src/is')

let jobs = []
for (let i = 0; i < 7; i++) {
for (let i = 0; i < 6; i++) {
jobs.push(q.createJob())

@@ -26,8 +26,8 @@ }

jobs[3].status = enums.status.cancelled
jobs[5].status = enums.status.failed
jobs[6].status = enums.status.terminated
jobs[4].status = enums.status.failed
jobs[5].status = enums.status.terminated
return q.reset().then((resetResult) => {
t.ok(is.integer(resetResult), 'Queue reset')
return queueAddJob(q, jobs, true)
return queueAddJob(q, jobs)
}).then(() => {

@@ -42,3 +42,3 @@ return queueSummary(q)

t.equal(summary.terminated, 1, 'Queue status summary includes terminated')
t.equal(summary.total, 7, 'Queue status summary includes total')
t.equal(summary.total, 6, 'Queue status summary includes total')
return q.reset()

@@ -45,0 +45,0 @@ }).then((resetResult) => {

@@ -1,2 +0,2 @@

const test = require('tape')
const test = require('tap').test
const Promise = require('bluebird')

@@ -8,10 +8,20 @@ const enums = require('../src/enums')

const Queue = require('../src/queue')
const jobOptions = require('../src/job-options')
const eventHandlers = require('./test-event-handlers')
const testName = 'queue'
module.exports = function () {
return new Promise((resolve, reject) => {
test('queue', (t) => {
t.plan(157)
test(testName, (t) => {
// t.plan(135)
t.plan(142)
let q = new Queue(tOpts.cxn(), tOpts.queueNameOnly())
let q2
// ---------- Test Setup ----------
let qReady = new Queue(tOpts.cxn(), tOpts.queueNameOnly())
let qNotMaster
let qNumMaster
let qMain
let qDrop
let qSub
let qPub

@@ -23,7 +33,9 @@ let job

retryMax: 5,
retryDelay: 400
retryDelay: 400,
repeat: 5,
repeatDelay: 5000
}
function processHandler (job, next) {
setTimeout(function finishJob () {
next(`Job completed [${job.id}]`)
next(null, `Job completed [${job.id}]`)
}, 100)

@@ -33,326 +45,107 @@ }

// ---------- Event Handler Setup ----------
let testEvents = false
let readyEventCount = 0
let readyEventTotal = 0
function readyEventHandler (queueId) {
readyEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: ready [${readyEventCount} of ${readyEventTotal}] [${queueId}]`)
}
let state = {
testName,
enabled: false,
ready: 0,
processing: 2,
progress: 0,
pausing: 2,
paused: 2,
resumed: 1,
removed: 1,
idle: 12,
reset: 1,
error: 3,
reviewed: 0,
detached: 1,
stopping: 1,
stopped: 1,
dropped: 0,
added: 3,
waiting: 0,
active: 2,
completed: 2,
cancelled: 1,
failed: 0,
terminated: 0,
reanimated: 0,
log: 0,
updated: 0
}
let addedEventCount = 0
let addedEventTotal = 4
function addedEventHandler (jobId) {
addedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: added [${addedEventCount} of ${addedEventTotal}] [${jobId}]`)
}
}
let activeEventCount = 0
let activeEventTotal = 3
function activeEventHandler (jobId) {
activeEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: active [${activeEventCount} of ${activeEventTotal}] [${jobId}]`)
}
}
let processingEventCount = 0
let processingEventTotal = 3
function processingEventHandler (jobId) {
processingEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: processing [${processingEventCount} of ${processingEventTotal}] [${jobId}]`)
}
}
let progressEventCount = 0
let progressEventTotal = 0
function progressEventHandler (jobId, percent) {
progressEventCount++
if (testEvents) {
t.pass(`Event: progress [${progressEventCount} of ${progressEventTotal}]`)
t.ok(is.uuid(jobId), `Event: progress [${jobId}]`)
t.ok(is.number(percent), `Event: progress [${percent}%]`)
}
}
let completedEventCount = 0
let completedEventTotal = 3
function completedEventHandler (jobId) {
completedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: completed [${completedEventCount} of ${completedEventTotal}] [${jobId}]`)
}
}
let cancelledEventCount = 0
let cancelledEventTotal = 1
function cancelledEventHandler (jobId) {
cancelledEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: cancelled [${cancelledEventCount} of ${cancelledEventTotal}] [${jobId}]`)
}
}
let failedEventCount = 0
let failedEventTotal = 0
function failedEventHandler (jobId) {
failedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: failed [${failedEventCount} of ${failedEventTotal}] [${jobId}]`)
}
}
let terminatedEventCount = 0
let terminatedEventTotal = 0
function terminatedEventHandler (jobId) {
terminatedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: terminated [${terminatedEventCount} of ${terminatedEventTotal}] [${jobId}]`)
}
}
let logEventCount = 0
let logEventTotal = 0
function logEventHandler (jobId) {
logEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: log [${logEventCount} of ${logEventTotal}] [${jobId}]`)
}
}
let updatedEventCount = 0
let updatedEventTotal = 0
function updatedEventHandler (jobId) {
updatedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: updated [${updatedEventCount} of ${updatedEventTotal}] [${jobId}]`)
}
}
let pausingEventCount = 0
let pausingEventTotal = 2
function pausingEventHandler (global, queueId) {
pausingEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: pausing [${pausingEventCount} of ${pausingEventTotal}]`)
t.ok(is.boolean(global), `Event: pausing [global: ${global}]`)
t.ok(is.string(queueId), `Event: pausing [queueId: ${queueId}]`)
}
}
let pausedEventCount = 0
let pausedEventTotal = 2
function pausedEventHandler (global, queueId) {
pausedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: paused [${pausedEventCount} of ${pausedEventTotal}]`)
t.ok(is.boolean(global), `Event: paused [global: ${global}]`)
t.ok(is.string(queueId), `Event: paused [queueId: ${queueId}]`)
}
}
let resumedEventCount = 0
let resumedEventTotal = 1
function resumedEventHandler (global, queueId) {
resumedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: resumed [${resumedEventCount} of ${resumedEventTotal}]`)
t.ok(is.boolean(global), `Event: resumed [global: ${global}]`)
t.ok(is.string(queueId), `Event: resumed [queueId: ${queueId}]`)
}
}
let removedEventCount = 0
let removedEventTotal = 1
function removedEventHandler (jobId) {
removedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: removed [${removedEventCount} of ${removedEventTotal}] [${jobId}]`)
}
}
let idleEventCount = 0
let idleEventTotal = 4
function idleEventHandler (queueId) {
if (idleEventCount < 4) {
idleEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: idle [${idleEventCount} of ${idleEventTotal}] [${queueId}]`)
}
}
}
let reviewedEventCount = 0
let reviewedEventTotal = 0
function reviewedEventHandler (queueId) {
reviewedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: reviewed [${reviewedEventCount} of ${reviewedEventTotal}] [${queueId}]`)
}
}
let resetEventCount = 0
let resetEventTotal = 1
function resetEventHandler (total) {
resetEventCount++
if (testEvents) {
t.ok(is.integer(total), `Event: reset [${resetEventCount} of ${resetEventTotal}] [${total}]`)
}
}
let errorEventCount = 0
let errorEventTotal = 3
function errorEventHandler (err) {
errorEventCount++
if (testEvents) {
t.ok(is.string(err.message), `Event: error [${errorEventCount} of ${errorEventTotal}] [${err.message}]`)
}
}
let detachedEventCount = 0
let detachedEventTotal = 1
function detachedEventHandler (queueId) {
detachedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: detached [${detachedEventCount} of ${detachedEventTotal}] [${queueId}]`)
}
}
let stoppingEventCount = 0
let stoppingEventTotal = 1
function stoppingEventHandler (queueId) {
stoppingEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: stopping [${stoppingEventCount} of ${stoppingEventTotal}] [${queueId}]`)
}
}
let stoppedEventCount = 0
let stoppedEventTotal = 1
function stoppedEventHandler (queueId) {
stoppedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: stopped [${stoppedEventCount} of ${stoppedEventTotal}] [${queueId}]`)
}
}
let droppedEventCount = 0
let droppedEventTotal = 1
function droppedEventHandler (queueId) {
droppedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: dropped [${droppedEventCount} of ${droppedEventTotal}] [${queueId}]`)
}
}
function addEventHandlers () {
testEvents = true
q.on(enums.status.ready, readyEventHandler)
q.on(enums.status.added, addedEventHandler)
q.on(enums.status.active, activeEventHandler)
q.on(enums.status.processing, processingEventHandler)
q.on(enums.status.progress, progressEventHandler)
q.on(enums.status.completed, completedEventHandler)
q.on(enums.status.cancelled, cancelledEventHandler)
q.on(enums.status.failed, failedEventHandler)
q.on(enums.status.terminated, terminatedEventHandler)
q.on(enums.status.paused, pausedEventHandler)
q.on(enums.status.pausing, pausingEventHandler)
q.on(enums.status.resumed, resumedEventHandler)
q.on(enums.status.removed, removedEventHandler)
q.on(enums.status.log, logEventHandler)
q.on(enums.status.updated, updatedEventHandler)
q.on(enums.status.idle, idleEventHandler)
q.on(enums.status.reviewed, reviewedEventHandler)
q.on(enums.status.reset, resetEventHandler)
q.on(enums.status.error, errorEventHandler)
q.on(enums.status.detached, detachedEventHandler)
q.on(enums.status.stopping, stoppingEventHandler)
q.on(enums.status.stopped, stoppedEventHandler)
q.on(enums.status.dropped, droppedEventHandler)
}
function removeEventHandlers () {
testEvents = false
q.removeListener(enums.status.ready, readyEventHandler)
q.removeListener(enums.status.added, addedEventHandler)
q.removeListener(enums.status.active, activeEventHandler)
q.removeListener(enums.status.processing, processingEventHandler)
q.removeListener(enums.status.progress, progressEventHandler)
q.removeListener(enums.status.completed, completedEventHandler)
q.removeListener(enums.status.cancelled, cancelledEventHandler)
q.removeListener(enums.status.failed, failedEventHandler)
q.removeListener(enums.status.terminated, terminatedEventHandler)
q.removeListener(enums.status.paused, pausedEventHandler)
q.removeListener(enums.status.pausing, pausingEventHandler)
q.removeListener(enums.status.resumed, resumedEventHandler)
q.removeListener(enums.status.removed, removedEventHandler)
q.removeListener(enums.status.log, logEventHandler)
q.removeListener(enums.status.updated, updatedEventHandler)
q.removeListener(enums.status.idle, idleEventHandler)
q.removeListener(enums.status.reviewed, reviewedEventHandler)
q.removeListener(enums.status.reset, resetEventHandler)
q.removeListener(enums.status.error, errorEventHandler)
q.removeListener(enums.status.detached, detachedEventHandler)
q.removeListener(enums.status.stopping, stoppingEventHandler)
q.removeListener(enums.status.stopped, stoppedEventHandler)
q.removeListener(enums.status.dropped, droppedEventHandler)
}
const stopDelay = 1000
return q.ready().then((ready) => {
return qReady.ready().then((ready) => {
t.ok(ready, 'Queue ready returns true')
return qReady.reset()
}).delay(stopDelay).then(() => {
t.pass('Queue reset')
return qReady.stop()
}).then(() => {
// ---------- masterInterval Options Tests ----------
t.comment('queue: masterInterval Options')
return q.stop()
qNotMaster = new Queue(tOpts.cxn(), Object.assign(tOpts.queueNameOnly(), { masterInterval: false }))
t.ok(is.false(qNotMaster.masterInterval), 'False masterInterval is false')
return qNotMaster.ready()
}).delay(stopDelay).then(() => {
return qNotMaster.stop()
}).then(() => {
q = new Queue(tOpts.cxn(), Object.assign(tOpts.queueNameOnly(), { masterInterval: true }))
t.ok(is.true(q.masterInterval), 'True masterInterval is true')
return q.ready()
qNumMaster = new Queue(tOpts.cxn(), Object.assign(tOpts.queueNameOnly(), { masterInterval: 12345 }))
t.equal(qNumMaster.masterInterval, 12345, 'Number masterInterval is number')
return qNumMaster.ready()
}).delay(stopDelay).then(() => {
return qNumMaster.stop()
}).then(() => {
return q.stop()
}).then(() => {
q = new Queue(tOpts.cxn(), Object.assign(tOpts.queueNameOnly(), { masterInterval: false }))
t.ok(is.false(q.masterInterval), 'False masterInterval is false')
return q.ready()
}).then(() => {
return q.stop()
}).then(() => {
q = new Queue(tOpts.cxn(), Object.assign(tOpts.queueNameOnly(), { masterInterval: 12345 }))
t.equal(q.masterInterval, 12345, 'Number masterInterval is number')
return q.ready()
}).then(() => {
return q.stop()
}).then(() => {
q = new Queue(tOpts.cxn(), tOpts.queueNameOnly())
qMain = new Queue(tOpts.cxn(), tOpts.queueNameOnly())
return q.ready()
return qMain.ready()
}).then(() => {
return q.reset()
return qMain.reset()
}).then((totalRemoved) => {
t.ok(is.integer(totalRemoved), 'Queue has been reset')
addEventHandlers()
eventHandlers.add(t, qMain, state)
// ---------- Constructor with Default Options Tests ----------
t.comment('queue: Constructor with Default Options')
t.ok(q, 'Queue created with default options')
t.equal(q.name, tOpts.queueName, 'Default queue name valid')
t.ok(is.string(q.id), 'Queue id is valid')
t.equal(q.host, enums.options.host, 'Default host name is valid')
t.equal(q.port, enums.options.port, 'Default port is valid')
t.equal(q.db, tOpts.dbName, 'Default db name is valid')
t.ok(is.function(q.r), 'Queue r valid')
t.ok(q.changeFeed, 'Queue change feed is enabled')
t.ok(q.master, 'Queue is master queue')
t.equal(q.masterInterval, enums.options.masterInterval, 'Queue masterInterval is valid')
t.ok(is.object(q.jobOptions), 'Queue jobOptions is an object')
t.equal(q.jobOptions.priority, enums.priorityFromValue(40), 'Default job priority is normal')
t.equal(q.jobOptions.timeout, enums.options.timeout, 'Default job timeout is valid')
t.equal(q.jobOptions.retryMax, enums.options.retryMax, 'Default job retryMax is valid')
t.equal(q.jobOptions.retryDelay, enums.options.retryDelay, 'Default job retryDelay is valid')
t.equal(q.removeFinishedJobs, enums.options.removeFinishedJobs, 'Default removeFinishedJobs is valid')
t.equal(q.running, 0, 'Running jobs is zero')
t.equal(q.concurrency, enums.options.concurrency, 'Default concurrency is valid')
t.notOk(q.paused, 'Queue is not paused')
t.ok(q.idle, 'Queue is idle')
t.ok(qMain, 'Queue created with default options')
t.equal(qMain.name, tOpts.queueName, 'Default queue name valid')
t.ok(is.string(qMain.id), 'Queue id is valid')
t.equal(qMain.host, enums.options.host, 'Default host name is valid')
t.equal(qMain.port, enums.options.port, 'Default port is valid')
t.equal(qMain.db, tOpts.dbName, 'Default db name is valid')
t.ok(is.function(qMain.r), 'Queue r valid')
t.ok(qMain.changeFeed, 'Queue change feed is enabled')
t.ok(qMain.master, 'Queue is master queue')
t.equal(qMain.masterInterval, enums.options.masterInterval, 'Queue masterInterval is valid')
t.ok(is.object(qMain.jobOptions), 'Queue jobOptions is an object')
t.equal(qMain.jobOptions.priority, enums.priorityFromValue(40), 'Default job priority is normal')
t.equal(qMain.jobOptions.timeout, enums.options.timeout, 'Default job timeout is valid')
t.equal(qMain.jobOptions.retryMax, enums.options.retryMax, 'Default job retryMax is valid')
t.equal(qMain.jobOptions.retryDelay, enums.options.retryDelay, 'Default job retryDelay is valid')
t.equal(qMain.jobOptions.repeat, enums.options.repeat, 'Default job repeat is valid')
t.equal(qMain.jobOptions.repeatDelay, enums.options.repeatDelay, 'Default job repeatDelay is valid')
t.equal(qMain.removeFinishedJobs, enums.options.removeFinishedJobs, 'Default removeFinishedJobs is valid')
t.equal(qMain.running, 0, 'Running jobs is zero')
t.equal(qMain.concurrency, enums.options.concurrency, 'Default concurrency is valid')
t.notOk(qMain.paused, 'Queue is not paused')
t.ok(qMain.idle, 'Queue is idle')
// ---------- Set Properties Tests ----------
t.comment('queue: Set Properties')
q.jobOptions = customJobOptions
t.deepEqual(q.jobOptions, customJobOptions, 'Job options set successfully')
q.jobOptions = undefined
t.deepEqual(q.jobOptions, customJobOptions, 'Job options restored to default on invalid value')
q.concurrency = 100
t.equal(q.concurrency, 100, 'Queue concurrency set with valid value successfully')
q.concurrency = -50
t.equal(q.concurrency, 100, 'Queue concurrency unchanged with invalid value')
q.concurrency = 1.5
t.equal(q.concurrency, 100, 'Queue concurrency unchanged with invalid value')
q.concurrency = 'string'
t.equal(q.concurrency, 100, 'Queue concurrency unchanged with invalid value')
qMain.jobOptions = customJobOptions
t.deepEqual(qMain.jobOptions, customJobOptions, 'Job options set successfully')
qMain.jobOptions = undefined
t.deepEqual(qMain.jobOptions, customJobOptions, 'Job options restored to default on invalid value')
qMain.concurrency = 100
t.equal(qMain.concurrency, 100, 'Queue concurrency set with valid value successfully')
qMain.concurrency = -50
t.equal(qMain.concurrency, 100, 'Queue concurrency unchanged with invalid value')
qMain.concurrency = 1.5
t.equal(qMain.concurrency, 100, 'Queue concurrency unchanged with invalid value')
qMain.concurrency = 'string'
t.equal(qMain.concurrency, 100, 'Queue concurrency unchanged with invalid value')
// ---------- Create Job Tests ----------
t.comment('queue: Create Job')
job = q.createJob()
job = qMain.createJob()
t.ok(is.job(job), 'Queue createJob created a job object')

@@ -368,5 +161,6 @@ t.equal(job.priority, customJobOptions.priority, 'Queue created job with new default priority')

retryMax: 2,
retryDelay: 900
retryDelay: 900,
repeat: 0
}
job = q.createJob().setPriority('low').setTimeout(400).setRetryMax(2).setRetryDelay(900)
job = qMain.createJob().setPriority('low').setTimeout(400).setRetryMax(2).setRetryDelay(900)
t.ok(is.job(job), 'Queue createJob created a job object')

@@ -378,7 +172,8 @@ t.equal(job.priority, customJobOptions.priority, 'Queue created job with custom priority')

// ---------- Create Job Tests ----------
// ---------- Add Job Tests ----------
t.comment('queue: Add Job')
job = q.createJob()
qMain.jobOptions = jobOptions() // Resetting job options
job = qMain.createJob()
job.data = tOpts.tData
return q.addJob(job)
return qMain.addJob(job)
}).then((savedJobs) => {

@@ -392,3 +187,3 @@ t.ok(is.array(savedJobs), 'Add job returns an array')

t.comment('queue: Get Job')
return q.getJob(savedJobs[0].id)
return qMain.getJob(savedJobs[0].id)
}).then((savedJobs2) => {

@@ -402,3 +197,3 @@ t.ok(is.array(savedJobs2), 'Get job returns an array')

t.comment('queue: Find Job')
return q.findJob({ data: tOpts.tData })
return qMain.findJob({ data: tOpts.tData })
}).then((savedJobs3) => {

@@ -412,7 +207,7 @@ t.ok(is.array(savedJobs3), 'Find job returns an array')

t.comment('queue: Cancel Job')
return q.cancelJob(savedJobs3[0].id)
return qMain.cancelJob(savedJobs3[0].id)
}).then((cancelledJobs) => {
t.ok(is.array(cancelledJobs), 'Cancel job returns an array')
t.ok(is.uuid(cancelledJobs[0]), 'Cancel job returns ids')
return q.getJob(cancelledJobs[0])
return qMain.getJob(cancelledJobs[0])
}).then((cancelledJobs2) => {

@@ -424,7 +219,7 @@ t.ok(is.array(cancelledJobs2), 'Get job returns an array')

t.comment('queue: Remove Job')
return q.removeJob(cancelledJobs2[0].id)
return qMain.removeJob(cancelledJobs2[0].id)
}).then((removedCount) => {
t.ok(is.array(removedCount), 'Remove job returns an array')
t.equal(removedCount.length, 1, 'Removed count is valid')
return q.getJob(job.id)
return qMain.getJob(job.id)
}).then((noJobs) => {

@@ -436,8 +231,8 @@ t.ok(is.array(noJobs), 'Get job returns an array')

t.comment('queue: Process Job')
return q.process(processHandler)
return qMain.process(processHandler)
}).then(() => {
job = q.createJob()
return q.addJob(job)
job = qMain.createJob()
return qMain.addJob(job)
}).delay(400).then((addedJob) => {
return q.getJob(addedJob[0].id)
return qMain.getJob(addedJob[0].id)
}).then((finishedJobs) => {

@@ -449,10 +244,10 @@ t.ok(is.array(finishedJobs), 'Job is in queue')

t.comment('queue: Pause')
return q.pause()
return qMain.pause()
}).then((isPaused) => {
t.ok(isPaused, 'Queue pause returns true')
t.ok(q.paused, 'Queue is paused')
job = q.createJob()
return q.addJob(job)
t.ok(qMain.paused, 'Queue is paused')
job = qMain.createJob()
return qMain.addJob(job)
}).delay(200).then((addedJob) => {
return q.getJob(addedJob[0].id)
return qMain.getJob(addedJob[0].id)
}).then((addedJobs) => {

@@ -464,7 +259,7 @@ t.ok(is.array(addedJobs), 'Job is in queue')

t.comment('queue: Resume')
return q.resume()
return qMain.resume()
}).delay(200).then((isResumed) => {
t.ok(isResumed, 'Queue resume returns true')
t.notOk(q.paused, 'Queue is not paused')
return q.getJob(job.id)
t.notOk(qMain.paused, 'Queue is not paused')
return qMain.getJob(job.id)
}).then((finishedJobs2) => {

@@ -476,3 +271,3 @@ t.ok(is.array(finishedJobs2), 'Job is in queue')

t.comment('queue: Summary')
return q.summary()
return qMain.summary()
}).then((summary) => {

@@ -490,7 +285,7 @@ t.ok(is.object(summary), 'Queue summary returns an object')

t.comment('queue: Reset')
return q.reset()
return qMain.reset()
}).then((totalReset) => {
t.ok(is.integer(totalReset), 'Queue reset returns integer')
t.equal(totalReset, 2, 'Reset return value is valid')
return q.summary()
return qMain.summary()
}).then((summary2) => {

@@ -502,22 +297,21 @@ t.ok(is.object(summary2), 'Queue summary returns an object')

t.comment('queue: Stop')
return q.stop()
return qMain.stop()
}).then((stopped) => {
t.ok(stopped, 'Queue stop returns true')
return q.ready()
return qMain.ready()
}).then((ready) => {
t.ok(is.false(ready), 'Queue ready returns false')
removeEventHandlers()
// ---------- Event Summary ----------
eventHandlers.remove(t, qMain, state)
// ---------- Drop Tests ----------
t.comment('queue: Drop')
q = new Queue(tOpts.cxn(), tOpts.queueNameOnly())
testEvents = true
q.on(enums.status.dropped, droppedEventHandler)
return q.drop()
qDrop = new Queue(tOpts.cxn(), tOpts.queueNameOnly())
}).then(() => {
return qDrop.drop()
}).then((dropped) => {
t.ok(dropped, 'Queue drop returns true')
return q.ready()
return qDrop.ready()
}).then((ready) => {
testEvents = false
q.removeListener(enums.status.dropped, droppedEventHandler)
t.ok(is.false(ready), 'Queue ready returns false')

@@ -527,55 +321,26 @@

t.comment('queue: Multi-Queue')
q = new Queue(tOpts.cxn(), tOpts.queueNameOnly())
return q.ready()
}).then((ready) => {
t.ok(ready, `First queue ready [${q.id}]`)
addEventHandlers()
q2 = new Queue(tOpts.cxn(), tOpts.queueNameOnly())
return q2.ready()
}).then((ready2) => {
t.ok(ready2, `Second queue ready [${q2.id}]`)
q.process(processHandler)
job = q2.createJob()
return q2.addJob(job)
}).then((jobOnQ2) => {
t.equal(jobOnQ2[0].id, job.id, 'Job added to second queue')
qSub = new Queue(tOpts.cxn(), tOpts.default())
return qSub.ready()
}).then((subReady) => {
t.ok(subReady, `Subscriber queue ready [${qSub.id}]`)
qPub = new Queue(tOpts.cxn(), tOpts.default())
return qPub.ready()
}).then((pubReady) => {
t.ok(pubReady, `Publisher queue ready [${qPub.id}]`)
qSub.process(processHandler)
job = qPub.createJob()
return qPub.addJob(job)
}).then((jobOnQPub) => {
t.equal(jobOnQPub[0].id, job.id, 'Job added to publisher queue')
}).delay(1000).then(() => {
return q.getJob(job.id)
return qSub.getJob(job.id)
}).then((jobCheck) => {
t.ok(is.array(jobCheck), 'Job is in queue')
t.equal(jobCheck[0].status, enums.status.completed, 'Job is completed')
removeEventHandlers()
return Promise.all([
q.stop(),
q2.stop()
])
}).then(() => {
//
// ---------- Event summary Test ----------
t.comment('queue: Event Summary')
t.equal(readyEventCount, readyEventTotal, 'Ready event count valid')
t.equal(addedEventCount, addedEventTotal, 'Added event count valid')
t.equal(activeEventCount, activeEventTotal, 'Active event count valid')
t.equal(processingEventCount, processingEventTotal, 'Processing event count valid')
t.equal(progressEventCount, progressEventTotal, 'Progress event count valid')
t.equal(completedEventCount, completedEventTotal, 'Completed event count valid')
t.equal(cancelledEventCount, cancelledEventTotal, 'Cancelled event count valid')
t.equal(failedEventCount, failedEventTotal, 'Failed event count valid')
t.equal(terminatedEventCount, terminatedEventTotal, 'Terminated event count valid')
t.equal(pausingEventCount, pausingEventTotal, 'Pausing event count valid')
t.equal(pausedEventCount, pausedEventTotal, 'Paused event count valid')
t.equal(resumedEventCount, resumedEventTotal, 'Resumed event count valid')
t.equal(removedEventCount, removedEventTotal, 'Removed event count valid')
t.equal(logEventCount, logEventTotal, 'Log event count valid')
t.equal(updatedEventCount, updatedEventTotal, 'Updated event count valid')
t.equal(idleEventCount, idleEventTotal, 'Idle event count valid')
t.equal(reviewedEventCount, reviewedEventTotal, 'Reviewed event count valid')
t.equal(resetEventCount, resetEventTotal, 'Reset event count valid')
t.equal(errorEventCount, errorEventTotal, 'Error event count valid')
t.equal(detachedEventCount, detachedEventTotal, 'Detached event count valid')
t.equal(stoppingEventCount, stoppingEventTotal, 'Stopping event count valid')
t.equal(stoppedEventCount, stoppedEventTotal, 'Stopped event count valid')
t.equal(droppedEventCount, droppedEventTotal, 'Dropped event count valid')
return qSub.stop()
}).delay(stopDelay).then(() => {
t.pass('Subscriber Queue Stopped')
return qPub.stop()
}).delay(stopDelay).then(() => {
t.pass('Publisher Queue Stopped')
return resolve(t.end())

@@ -582,0 +347,0 @@ }).catch(err => tError(err, module, t))

@@ -1,40 +0,54 @@

const Promise = require('bluebird')
const enums = require('./enums.spec')
const is = require('./is.spec')
const datetime = require('./datetime.spec')
const dbAssertDatabase = require('./db-assert-database.spec')
const dbAssertTable = require('./db-assert-table.spec')
const dbAssertIndex = require('./db-assert-index.spec')
const dbAssert = require('./db-assert.spec')
const dbDriver = require('./db-driver.spec')
const dbResult = require('./db-result.spec')
const dbReview = require('./db-review.spec')
const job = require('./job.spec')
const jobOptions = require('./job-options.spec')
const jobParse = require('./job-parse.spec')
const jobProgress = require('./job-progress.spec')
const jobCompleted = require('./job-completed.spec')
const jobUpdate = require('./job-update.spec')
const jobFailed = require('./job-failed.spec')
const jobLog = require('./job-log.spec')
const queue = require('./queue.spec')
const queueDb = require('./queue-db.spec')
const queueState = require('./queue-state.spec')
const queueAddJob = require('./queue-add-job.spec')
const queueGetJob = require('./queue-get-job.spec')
const queueFindJob = require('./queue-find-job.spec')
const queueGetNextJob = require('./queue-get-next-job.spec')
const queueProcess = require('./queue-process.spec')
const queueChange = require('./queue-change.spec')
const queueInterruption = require('./queue-interruption.spec')
const queueCancelJob = require('./queue-cancel-job.spec')
const queueRemoveJob = require('./queue-remove-job.spec')
const queueReset = require('./queue-reset.spec')
const queueStop = require('./queue-stop.spec')
const queueDrop = require('./queue-drop.spec')
const queueSummary = require('./queue-summary.spec')
const tests = new Map()
tests.set('enums', require('./enums.spec'))
tests.set('is', require('./is.spec'))
tests.set('datetime', require('./datetime.spec'))
tests.set('dbAssertDatabase', require('./db-assert-database.spec'))
tests.set('dbAssertTable', require('./db-assert-table.spec'))
tests.set('dbAssertIndex', require('./db-assert-index.spec'))
tests.set('dbAssert', require('./db-assert.spec'))
tests.set('dbDriver', require('./db-driver.spec'))
tests.set('dbResult', require('./db-result.spec'))
tests.set('dbReview', require('./db-review.spec'))
tests.set('job', require('./job.spec'))
tests.set('jobOptions', require('./job-options.spec'))
tests.set('jobParse', require('./job-parse.spec'))
tests.set('jobProgress', require('./job-progress.spec'))
tests.set('jobCompleted', require('./job-completed.spec'))
tests.set('jobUpdate', require('./job-update.spec'))
tests.set('jobFailed', require('./job-failed.spec'))
tests.set('jobLog', require('./job-log.spec'))
tests.set('queue', require('./queue.spec'))
tests.set('queueDb', require('./queue-db.spec'))
tests.set('queueState', require('./queue-state.spec'))
tests.set('queueAddJob', require('./queue-add-job.spec'))
tests.set('queueGetJob', require('./queue-get-job.spec'))
tests.set('queueFindJob', require('./queue-find-job.spec'))
tests.set('queueGetNextJob', require('./queue-get-next-job.spec'))
tests.set('queueReanimateJob', require('./queue-reanimate-job.spec'))
tests.set('queueProcess', require('./queue-process.spec'))
tests.set('queueChange', require('./queue-change.spec'))
tests.set('queueInterruption', require('./queue-interruption.spec'))
tests.set('queueCancelJob', require('./queue-cancel-job.spec'))
tests.set('queueRemoveJob', require('./queue-remove-job.spec'))
tests.set('queueReset', require('./queue-reset.spec'))
tests.set('queueStop', require('./queue-stop.spec'))
tests.set('queueDrop', require('./queue-drop.spec'))
tests.set('queueSummary', require('./queue-summary.spec'))
return dbAssert().then(() => {
}).then(() => {
return queueChange()
})
if (tests.has(process.argv[2])) {
tests.get('dbAssert')().then(() => {
return tests.get(process.argv[2])()
})
} else {
const line = '=============================================='
console.log('\x1b[32m', line, '\x1b[0m')
console.log('\x1b[36m', ' INVALID TEST NAME!', '\x1b[0m')
console.log('\x1b[36m', ' Use one of the test names below:', '\x1b[0m')
console.log('\x1b[32m', line, '\x1b[0m')
for (let test of tests.keys()) {
console.log('\x1b[34m', ` ${test}`, '\x1b[0m')
}
console.log('\x1b[32m', line, '\x1b[0m')
console.log('\x1b[36m', ' Example: npm run tc jobOptions', '\x1b[0m')
console.log('\x1b[32m', line, '\x1b[0m')
}

@@ -16,6 +16,6 @@ const is = require('../src/is')

function readyEventHandler () {
function readyEventHandler (qid) {
if (state.enabled) {
let total = incCount(enums.status.ready)
t.pass(`Event: ready [${total} of ${state.ready}]`)
t.ok(is.string(qid), `Event: ready [${total} of ${state.ready}]`)
}

@@ -25,6 +25,6 @@ }

function processingEventHandler (jobId) {
function processingEventHandler (qid, jobId) {
if (state.enabled) {
let total = incCount(enums.status.processing)
t.ok(is.uuid(jobId),
t.ok(is.string(qid) && is.uuid(jobId),
`Event: processing [${total} of ${state.processing}] [${jobId}]`)

@@ -35,6 +35,7 @@ }

function progressEventHandler (jobId, percent) {
function progressEventHandler (qid, jobId, percent) {
if (state.enabled) {
let total = incCount(enums.status.progress)
t.ok(is.uuid(jobId),
let percentTest = is.integer(percent) && percent >= 0 && percent <= 100
t.ok(is.string(qid) && is.uuid(jobId) && percentTest,
`Event: progress ${percent} [${total} of ${state.progress}] [${jobId}]`)

@@ -48,3 +49,3 @@ }

let total = incCount(enums.status.pausing)
t.pass(`Event: pausing [${total} of ${state.pausing}] [${qid}]`)
t.ok(is.string(qid), `Event: pausing [${total} of ${state.pausing}] [${qid}]`)
}

@@ -57,3 +58,3 @@ }

let total = incCount(enums.status.paused)
t.pass(`Event: paused [${total} of ${state.paused}] [${qid}]`)
t.ok(is.string(qid), `Event: paused [${total} of ${state.paused}] [${qid}]`)
}

@@ -66,3 +67,3 @@ }

let total = incCount(enums.status.resumed)
t.pass(`Event: resumed [${total} of ${state.resumed}] [${qid}]`)
t.ok(is.string(qid), `Event: resumed [${total} of ${state.resumed}] [${qid}]`)
}

@@ -72,6 +73,6 @@ }

function removedEventHandler (qid) {
function removedEventHandler (qid, jobId) {
if (state.enabled) {
let total = incCount(enums.status.removed)
t.pass(`Event: removed [${total} of ${state.removed}] [${qid}]`)
t.ok(is.string(qid) && is.uuid(jobId), `Event: removed [${total} of ${state.removed}] [${qid}]`)
}

@@ -81,14 +82,7 @@ }

function idleEventHandler (qid) {
function resetEventHandler (qid, totalRemoved) {
if (state.enabled) {
let total = incCount(enums.status.idle)
t.pass(`Event: idle [${total}] [${qid}]`)
}
}
state.handler.set(enums.status.idle, idleEventHandler)
function resetEventHandler (qid) {
if (state.enabled) {
let total = incCount(enums.status.reset)
t.pass(`Event: reset [${total} of ${state.reset}] [${qid}]`)
t.ok(is.string(qid) && is.integer(totalRemoved) && totalRemoved >= 0,
`Event: reset [${total} of ${state.reset}] [${qid}]`)
}

@@ -98,6 +92,17 @@ }

function reviewedEventHandler (replaceCount) {
function reviewedEventHandler (qid, reviewResult) {
if (state.enabled) {
let total = incCount(enums.status.reviewed)
t.pass(`Event: reviewed [${total} of ${state.reviewed}] [${replaceCount}]`)
let result
if (is.object(reviewResult) && reviewResult.local) {
result = is.integer(reviewResult.reviewed) &&
is.integer(reviewResult.removed) &&
reviewResult.reviewed >= 0 &&
reviewResult.removed >= 0
} else {
// global events will contain null in reviewed and removed.
result = is.object(reviewResult) && !reviewResult.local
}
t.ok(result,
`Event: reviewed [${total} of ${state.reviewed}] [${reviewResult}]`)
}

@@ -110,3 +115,4 @@ }

let total = incCount(enums.status.detached)
t.pass(`Event: detached [${total} of ${state.detached}] [${qid}]`)
t.ok(is.string(qid),
`Event: detached [${total} of ${state.detached}] [${qid}]`)
}

@@ -119,3 +125,4 @@ }

let total = incCount(enums.status.stopping)
t.pass(`Event: stopping [${total} of ${state.stopping}] [${qid}]`)
t.ok(is.string(qid),
`Event: stopping [${total} of ${state.stopping}] [${qid}]`)
}

@@ -128,3 +135,4 @@ }

let total = incCount(enums.status.stopped)
t.pass(`Event: stopped [${total} of ${state.stopped}] [${qid}]`)
t.ok(is.string(qid),
`Event: stopped [${total} of ${state.stopped}] [${qid}]`)
}

@@ -137,3 +145,4 @@ }

let total = incCount(enums.status.dropped)
t.pass(`Event: dropped [${total} of ${state.dropped}] [${qid}]`)
t.ok(is.string(qid),
`Event: dropped [${total} of ${state.dropped}] [${qid}]`)
}

@@ -143,6 +152,7 @@ }

function addedEventHandler (jobId) {
function addedEventHandler (qid, jobId) {
if (state.enabled) {
let total = incCount(enums.status.added)
t.pass(`Event: added [${total} of ${state.added}] [${jobId}]`)
t.ok(is.string(qid) && is.uuid(jobId),
`Event: added [${total} of ${state.added}] [${jobId}]`)
}

@@ -152,6 +162,7 @@ }

function activeEventHandler (jobId) {
function activeEventHandler (qid, jobId) {
if (state.enabled) {
let total = incCount(enums.status.active)
t.pass(`Event: active [${total} of ${state.active}] [${jobId}]`)
t.ok(is.string(qid) && is.uuid(jobId),
`Event: active [${total} of ${state.active}] [${jobId}]`)
}

@@ -161,7 +172,7 @@ }

function completedEventHandler (jobId) {
function completedEventHandler (qid, jobId, isRepeating) {
if (state.enabled) {
let total = incCount(enums.status.completed)
t.ok(is.uuid(jobId),
`Event: completed [${total} of ${state.completed}] [${jobId}]`)
t.ok(is.string(qid) && is.uuid(jobId) && is.boolean(isRepeating),
`Event: completed [${total} of ${state.completed}] [${jobId}] isRepeating: [${isRepeating}]`)
}

@@ -171,6 +182,6 @@ }

function cancelledEventHandler (jobId) {
function cancelledEventHandler (qid, jobId) {
if (state.enabled) {
let total = incCount(enums.status.cancelled)
t.ok(is.uuid(jobId),
t.ok(is.string(qid) && is.uuid(jobId),
`Event: cancelled [${total} of ${state.cancelled}] [${jobId}]`)

@@ -181,6 +192,6 @@ }

function failedEventHandler (jobId) {
function failedEventHandler (qid, jobId) {
if (state.enabled) {
let total = incCount(enums.status.failed)
t.ok(is.uuid(jobId),
t.ok(is.string(qid) && is.uuid(jobId),
`Event: failed [${total} of ${state.failed}] [${jobId}]`)

@@ -191,6 +202,6 @@ }

function terminatedEventHandler (jobId) {
function terminatedEventHandler (qid, jobId) {
if (state.enabled) {
let total = incCount(enums.status.terminated)
t.ok(is.uuid(jobId),
t.ok(is.string(qid) && is.uuid(jobId),
`Event: terminated [${total} of ${state.terminated}] [${jobId}]`)

@@ -201,6 +212,15 @@ }

function logEventHandler (jobId) {
function reanimatedEventHandler (qid, jobId) {
if (state.enabled) {
let total = incCount(enums.status.reanimated)
t.ok(is.string(qid) && is.uuid(jobId),
`Event: reanimated [${total} of ${state.reanimated}] [${jobId}]`)
}
}
state.handler.set(enums.status.reanimated, reanimatedEventHandler)
function logEventHandler (qid, jobId) {
if (state.enabled) {
let total = incCount(enums.status.log)
t.ok(is.uuid(jobId),
t.ok(is.string(qid) && is.uuid(jobId),
`Event: log [${total} of ${state.log}] [${jobId}]`)

@@ -211,6 +231,6 @@ }

function updatedEventHandler (jobId) {
function updatedEventHandler (qid, jobId) {
if (state.enabled) {
let total = incCount(enums.status.updated)
t.ok(is.uuid(jobId),
t.ok(is.string(qid) && is.uuid(jobId),
`Event: updated [${total} of ${state.updated}] [${jobId}]`)

@@ -221,2 +241,11 @@ }

function errorEventHandler (qid, err) {
if (state.enabled) {
let total = incCount(enums.status.error)
t.ok(is.string(qid) && is.error(err),
`Event: error [${total} of ${state.error}] Error: [${err.message}]`)
}
}
state.handler.set(enums.status.error, errorEventHandler)
state.enabled = true

@@ -226,2 +255,3 @@ state.handler.forEach((value, key) => {

})
t.comment(state.testName + ': Event Handlers Added')
}

@@ -231,2 +261,4 @@

state.enabled = false
t.comment(state.testName + ': Event Summary')
state.handler.forEach((fn, status) => {

@@ -236,2 +268,3 @@ t.equal(state.count.get(status), state[status], `Total ${status} events: [${state[status]}]`)

})
t.comment(state.testName + ': Event Handlers Removed')
}

@@ -27,2 +27,3 @@ const Promise = require('bluebird')

const queueGetNextJob = require('./queue-get-next-job.spec')
const queueReanimateJob = require('./queue-reanimate-job.spec')
const queueProcess = require('./queue-process.spec')

@@ -38,3 +39,3 @@ const queueChange = require('./queue-change.spec')

return dbAssertDatabase().then(() => {
dbAssertDatabase().then(() => {
}).then(() => {

@@ -47,2 +48,4 @@ return dbAssertTable()

}).then(() => {
return queueReset()
}).then(() => {
return Promise.all([

@@ -78,2 +81,4 @@ dbDriver(),

}).then(() => {
return queueReanimateJob()
}).then(() => {
return queueCancelJob()

@@ -87,8 +92,4 @@ }).then(() => {

}).then(() => {
// This test has bee removed for now due to this issue on Tape
// https://github.com/substack/tape/issues/223
// To test queueChange, use the test-current.js file by running
// npm run tc
// return queueChange()
// }).then(() => {
return queueChange()
}).then(() => {
return queue()

@@ -95,0 +96,0 @@ }).then(() => {

@@ -1,51 +0,90 @@

const test = require('tape')
const Promise = require('bluebird')
// const test = require('tap').test
// const Promise = require('bluebird')
// const datetime = require('../src/datetime')
const enums = require('../src/enums')
// const is = require('../src/is')
const tError = require('./test-error')
const tData = require('./test-options').tData
const tOpts = require('./test-options')
const Queue = require('../src/queue')
module.exports = function () {
return new Promise((resolve, reject) => {
test('XXXXXXXX', (t) => {
t.plan(2)
const q = new Queue(tOpts.cxn(), tOpts.default())
const job = q.createJob()
job.data = tData
// ---------- Event Handler Setup ----------
let testEvents = false
function completedEventHandler (jobId) {
if (testEvents) {
t.equal(jobId, job.id, `Event: Job completed [${jobId}]`)
}
}
function addEventHandlers () {
testEvents = true
q.on(enums.status.completed, completedEventHandler)
}
function removeEventHandlers () {
testEvents = false
q.removeListener(enums.status.completed, completedEventHandler)
}
q.addJob(job).then((savedJob) => {
t.equal(savedJob[0].id, job.id, 'Job saved successfully')
return q.reset()
}).then((resetResult) => {
t.ok(resetResult >= 0, 'Queue reset')
addEventHandlers()
// ---------- First Test ----------
removeEventHandlers()
q.stop()
return resolve(t.end())
}).catch(err => tError(err, module, t))
})
})
}
// const enums = require('../src/enums')
// const tError = require('./test-error')
// const tOpts = require('./test-options')
// const tData = require('./test-options').tData
// const queueProcess = require('../src/queue-process')
// const dbReview = require('../src/db-review')
// const Queue = require('../src/queue')
// const eventHandlers = require('./test-event-handlers')
// const testName = 'XXXXXXXXXXXX'
//
// module.exports = function () {
// return new Promise((resolve, reject) => {
// test(testName, (t) => {
// t.plan(1000)
//
// // ---------- Test Setup ----------
// const q = new Queue(tOpts.cxn(), tOpts.default())
//
// let jobs
// let jobDelay = 200
// const noOfJobsToCreate = 10
//
// // ---------- Event Handler Setup ----------
// let state = {
// testName,
// enabled: false,
// ready: 0,
// processing: 0,
// progress: 0,
// pausing: 0,
// paused: 0,
// resumed: 0,
// removed: 0,
// reset: 0,
// error: 0,
// reviewed: 0,
// detached: 0,
// stopping: 0,
// stopped: 0,
// dropped: 0,
// added: 0,
// waiting: 0,
// active: 0,
// completed: 0,
// cancelled: 0,
// failed: 0,
// terminated: 0,
// reanimated: 0,
// log: 0,
// updated: 0
// }
//
// // ---------- Test Setup ----------
// jobs = []
// for (let i = 0; i < noOfJobsToCreate; i++) {
// jobs.push(q.createJob())
// }
// return q.reset().then((resetResult) => {
// t.ok(is.integer(resetResult), 'Queue reset')
// return q.pause()
// }).then(() => {
// eventHandlers.add(t, q, state)
//
// // ---------- Processing, Pause, and Concurrency Test ----------
// t.comment('queue-process: Process, Pause, and Concurrency')
// return q.addJob(jobs)
// }).then((savedJobs) => {
// t.equal(savedJobs.length, noOfJobsToCreate, `Jobs saved successfully: [${savedJobs.length}]`)
//
// // ---------- Queue Summary ----------
// t.comment('queue-process: Queue Summary')
// return q.summary()
// }).then((queueSummary) => {
//
// // ---------- Event Summary ----------
// eventHandlers.remove(t, q, state)
//
// return q.reset()
// }).then((resetResult) => {
// t.ok(resetResult >= 0, 'Queue reset')
// q.stop()
// return resolve(t.end())
// }).catch(err => tError(err, module, t))
// })
// })
// }
# WORKLOG
## Working On
## TODO
* global queue reset, reviewed, error event?
* Add failedCount?
* Add global rate limiter?
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc