rethinkdb-job-queue
Advanced tools
Comparing version 2.2.0 to 2.3.0
@@ -52,2 +52,15 @@ 'use strict'; | ||
function createIndexName(q) { | ||
logger('createIndexName'); | ||
var indexName = enums.index.indexName; | ||
return Promise.resolve().then(function () { | ||
return q.r.db(q.db).table(q.name).indexList().contains(indexName).run(q.queryRunOptions); | ||
}).then(function (exists) { | ||
if (exists) { | ||
return exists; | ||
} | ||
return q.r.db(q.db).table(q.name).indexCreate(indexName).run(q.queryRunOptions); | ||
}); | ||
} | ||
function createIndexStatus(q) { | ||
@@ -68,3 +81,3 @@ logger('createIndexStatus'); | ||
logger('assertIndex'); | ||
return Promise.all([createIndexActiveDateEnable(q), createIndexInactivePriorityDateCreated(q), createIndexFinishedDateFinished(q), createIndexStatus(q)]).then(function (indexCreateResult) { | ||
return Promise.all([createIndexActiveDateEnable(q), createIndexInactivePriorityDateCreated(q), createIndexFinishedDateFinished(q), createIndexName(q), createIndexStatus(q)]).then(function (indexCreateResult) { | ||
logger('Waiting for index...'); | ||
@@ -71,0 +84,0 @@ return q.r.db(q.db).table(q.name).indexWait().run(q.queryRunOptions); |
@@ -68,2 +68,3 @@ 'use strict'; | ||
indexFinishedDateFinished: 'indexFinishedDateFinished', | ||
indexName: 'name', | ||
indexStatus: 'status' | ||
@@ -100,2 +101,3 @@ }); | ||
idInvalid: 'The job id is invalid', | ||
nameInvalid: 'The job name must be a string', | ||
priorityInvalid: 'The job priority value is invalid', | ||
@@ -102,0 +104,0 @@ timeoutInvalid: 'The job timeout value is invalid', |
@@ -14,2 +14,3 @@ 'use strict'; | ||
var finalOptions = {}; | ||
finalOptions.name = null; | ||
finalOptions.priority = enums.options.priority; | ||
@@ -22,2 +23,6 @@ finalOptions.timeout = enums.options.timeout; | ||
if (is.string(oldOptions.name)) { | ||
finalOptions.name = oldOptions.name; | ||
} | ||
if (Object.keys(enums.priority).includes(oldOptions.priority)) { | ||
@@ -47,2 +52,6 @@ finalOptions.priority = oldOptions.priority; | ||
if (is.string(newOptions.name)) { | ||
finalOptions.name = newOptions.name; | ||
} | ||
if (Object.keys(enums.priority).includes(newOptions.priority)) { | ||
@@ -49,0 +58,0 @@ finalOptions.priority = newOptions.priority; |
@@ -34,3 +34,5 @@ 'use strict'; | ||
var now = new Date(); | ||
this.id = uuid.v4(); | ||
var newId = uuid.v4(); | ||
this.id = newId; | ||
this.name = options.name || newId; | ||
this.priority = options.priority; | ||
@@ -65,2 +67,11 @@ this.timeout = options.timeout; | ||
_createClass(Job, [{ | ||
key: 'setName', | ||
value: function setName(newName) { | ||
if (is.string(newName)) { | ||
this.name = newName; | ||
return this; | ||
} | ||
throw new Error(enums.message.nameInvalid); | ||
} | ||
}, { | ||
key: 'setPriority', | ||
@@ -67,0 +78,0 @@ value: function setPriority(newPriority) { |
@@ -10,3 +10,3 @@ 'use strict'; | ||
return Promise.resolve().then(function () { | ||
return q.r.db(q.db).table(q.name).filter(predicate).run(q.queryRunOptions); | ||
return q.r.db(q.db).table(q.name).filter(predicate).orderBy('dateCreated').run(q.queryRunOptions); | ||
}).then(function (jobsData) { | ||
@@ -13,0 +13,0 @@ logger('jobsData', jobsData); |
@@ -23,2 +23,3 @@ 'use strict'; | ||
var queueFindJob = require('./queue-find-job'); | ||
var queueFindJobByName = require('./queue-find-job-by-name'); | ||
var queueInterruption = require('./queue-interruption'); | ||
@@ -111,3 +112,3 @@ var queueCancelJob = require('./queue-cancel-job'); | ||
logger('findJob', predicate); | ||
logger('findJob', predicate, raw); | ||
return this.ready().then(function () { | ||
@@ -122,12 +123,42 @@ return queueFindJob(_this4, predicate, raw); | ||
}, { | ||
key: 'findJobByName', | ||
value: function findJobByName(name, raw) { | ||
var _this5 = this; | ||
logger('findJobByName', name, raw); | ||
return this.ready().then(function () { | ||
return queueFindJobByName(_this5, name, raw); | ||
}).catch(function (err) { | ||
logger('Event: findJobByName error', _this5.id, err); | ||
_this5.emit(enums.status.error, _this5.id, err); | ||
return Promise.reject(err); | ||
}); | ||
} | ||
}, { | ||
key: 'containsJobByName', | ||
value: function containsJobByName(name) { | ||
var _this6 = this; | ||
logger('containsJobByName', name); | ||
return this.ready().then(function () { | ||
return queueFindJobByName(_this6, name, true); | ||
}).then(function (namedJobs) { | ||
return namedJobs.length > 0; | ||
}).catch(function (err) { | ||
logger('Event: containsJobByName error', _this6.id, err); | ||
_this6.emit(enums.status.error, _this6.id, err); | ||
return Promise.reject(err); | ||
}); | ||
} | ||
}, { | ||
key: 'cancelJob', | ||
value: function cancelJob(jobOrId, reason) { | ||
var _this5 = this; | ||
var _this7 = this; | ||
logger('cancelJob', jobOrId, reason); | ||
return this.ready().then(function () { | ||
return queueCancelJob(_this5, jobOrId, reason); | ||
return queueCancelJob(_this7, jobOrId, reason); | ||
}).catch(function (err) { | ||
logger('Event: cancelJob error', _this5.id, err); | ||
_this5.emit(enums.status.error, _this5.id, err); | ||
logger('Event: cancelJob error', _this7.id, err); | ||
_this7.emit(enums.status.error, _this7.id, err); | ||
return Promise.reject(err); | ||
@@ -139,10 +170,10 @@ }); | ||
value: function reanimateJob(jobOrId, dateEnable) { | ||
var _this6 = this; | ||
var _this8 = this; | ||
logger('reanimateJob', jobOrId, dateEnable); | ||
return this.ready().then(function () { | ||
return queueReanimateJob(_this6, jobOrId, dateEnable); | ||
return queueReanimateJob(_this8, jobOrId, dateEnable); | ||
}).catch(function (err) { | ||
logger('Event: reanimateJob error', _this6.id, err); | ||
_this6.emit(enums.status.error, _this6.id, err); | ||
logger('Event: reanimateJob error', _this8.id, err); | ||
_this8.emit(enums.status.error, _this8.id, err); | ||
return Promise.reject(err); | ||
@@ -154,10 +185,10 @@ }); | ||
value: function removeJob(jobOrId) { | ||
var _this7 = this; | ||
var _this9 = this; | ||
logger('removeJob', jobOrId); | ||
return this.ready().then(function () { | ||
return queueRemoveJob(_this7, jobOrId); | ||
return queueRemoveJob(_this9, jobOrId); | ||
}).catch(function (err) { | ||
logger('Event: removeJob error', _this7.id, err); | ||
_this7.emit(enums.status.error, _this7.id, err); | ||
logger('Event: removeJob error', _this9.id, err); | ||
_this9.emit(enums.status.error, _this9.id, err); | ||
return Promise.reject(err); | ||
@@ -169,10 +200,10 @@ }); | ||
value: function process(handler) { | ||
var _this8 = this; | ||
var _this10 = this; | ||
logger('process', handler); | ||
return this.ready().then(function () { | ||
return queueProcess.addHandler(_this8, handler); | ||
return queueProcess.addHandler(_this10, handler); | ||
}).catch(function (err) { | ||
logger('Event: process error', _this8.id, err); | ||
_this8.emit(enums.status.error, _this8.id, err); | ||
logger('Event: process error', _this10.id, err); | ||
_this10.emit(enums.status.error, _this10.id, err); | ||
return Promise.reject(err); | ||
@@ -184,10 +215,10 @@ }); | ||
value: function review() { | ||
var _this9 = this; | ||
var _this11 = this; | ||
logger('review'); | ||
return this.ready().then(function () { | ||
return dbReview.runOnce(_this9); | ||
return dbReview.runOnce(_this11); | ||
}).catch(function (err) { | ||
logger('Event: review error', _this9.id, err); | ||
_this9.emit(enums.status.error, _this9.id, err); | ||
logger('Event: review error', _this11.id, err); | ||
_this11.emit(enums.status.error, _this11.id, err); | ||
return Promise.reject(err); | ||
@@ -199,10 +230,10 @@ }); | ||
value: function summary() { | ||
var _this10 = this; | ||
var _this12 = this; | ||
logger('summary'); | ||
return this.ready().then(function () { | ||
return queueSummary(_this10); | ||
return queueSummary(_this12); | ||
}).catch(function (err) { | ||
logger('Event: summary error', _this10.id, err); | ||
_this10.emit(enums.status.error, _this10.id, err); | ||
logger('Event: summary error', _this12.id, err); | ||
_this12.emit(enums.status.error, _this12.id, err); | ||
return Promise.reject(err); | ||
@@ -220,10 +251,10 @@ }); | ||
value: function pause(global) { | ||
var _this11 = this; | ||
var _this13 = this; | ||
logger('pause'); | ||
return this.ready().then(function () { | ||
return queueInterruption.pause(_this11, global); | ||
return queueInterruption.pause(_this13, global); | ||
}).catch(function (err) { | ||
logger('Event: pause error', _this11.id, err); | ||
_this11.emit(enums.status.error, _this11.id, err); | ||
logger('Event: pause error', _this13.id, err); | ||
_this13.emit(enums.status.error, _this13.id, err); | ||
return Promise.reject(err); | ||
@@ -235,10 +266,10 @@ }); | ||
value: function resume(global) { | ||
var _this12 = this; | ||
var _this14 = this; | ||
logger('resume'); | ||
return this.ready().then(function () { | ||
return queueInterruption.resume(_this12, global); | ||
return queueInterruption.resume(_this14, global); | ||
}).catch(function (err) { | ||
logger('Event: resume error', _this12.id, err); | ||
_this12.emit(enums.status.error, _this12.id, err); | ||
logger('Event: resume error', _this14.id, err); | ||
_this14.emit(enums.status.error, _this14.id, err); | ||
return Promise.reject(err); | ||
@@ -250,10 +281,10 @@ }); | ||
value: function reset() { | ||
var _this13 = this; | ||
var _this15 = this; | ||
logger('reset'); | ||
return this.ready().then(function () { | ||
return queueReset(_this13); | ||
return queueReset(_this15); | ||
}).catch(function (err) { | ||
logger('Event: reset error', _this13.id, err); | ||
_this13.emit(enums.status.error, _this13.id, err); | ||
logger('Event: reset error', _this15.id, err); | ||
_this15.emit(enums.status.error, _this15.id, err); | ||
return Promise.reject(err); | ||
@@ -265,10 +296,10 @@ }); | ||
value: function stop() { | ||
var _this14 = this; | ||
var _this16 = this; | ||
logger('stop'); | ||
return queueStop(this).then(function () { | ||
return queueDb.drain(_this14); | ||
return queueDb.drain(_this16); | ||
}).catch(function (err) { | ||
logger('Event: stop error', _this14.id, err); | ||
_this14.emit(enums.status.error, _this14.id, err); | ||
logger('Event: stop error', _this16.id, err); | ||
_this16.emit(enums.status.error, _this16.id, err); | ||
return Promise.reject(err); | ||
@@ -280,8 +311,8 @@ }); | ||
value: function drop() { | ||
var _this15 = this; | ||
var _this17 = this; | ||
logger('drop'); | ||
return queueDrop(this).catch(function (err) { | ||
logger('Event: drop error', _this15.id, err); | ||
_this15.emit(enums.status.error, _this15.id, err); | ||
logger('Event: drop error', _this17.id, err); | ||
_this17.emit(enums.status.error, _this17.id, err); | ||
return Promise.reject(err); | ||
@@ -288,0 +319,0 @@ }); |
{ | ||
"name": "rethinkdb-job-queue", | ||
"version": "2.2.0", | ||
"version": "2.3.0", | ||
"description": "A persistent job or task queue backed by RethinkDB.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -202,2 +202,4 @@ # Introduction | ||
See my [other projects on NPM](https://www.npmjs.com/~grantcarthew). | ||
## Contributing | ||
@@ -204,0 +206,0 @@ |
@@ -20,5 +20,5 @@ const test = require('tap').test | ||
t.equal(Object.keys(enums.options).length, 15, 'Enums options has correct number of keys') | ||
t.equal(Object.keys(enums.index).length, 4, 'Enums index has correct number of keys') | ||
t.equal(Object.keys(enums.index).length, 5, 'Enums index has correct number of keys') | ||
t.equal(Object.keys(enums.log).length, 3, 'Enums log has correct number of keys') | ||
t.equal(Object.keys(enums.message).length, 28, 'Enums message has correct number of keys') | ||
t.equal(Object.keys(enums.message).length, 29, 'Enums message has correct number of keys') | ||
} catch (err) { | ||
@@ -25,0 +25,0 @@ tError(err, module, t) |
@@ -8,6 +8,7 @@ const test = require('tap').test | ||
test('job-options', (t) => { | ||
t.plan(54) | ||
t.plan(70) | ||
try { | ||
let to = jobOptions() | ||
t.equal(to.name, null, 'Job default name option is null') | ||
t.equal(to.priority, 'normal', 'Job default priority option is normal') | ||
@@ -20,2 +21,3 @@ t.equal(to.timeout, enums.options.timeout, 'Job default timeout option is valid') | ||
to = jobOptions({ | ||
name: 'one', | ||
priority: 'high', | ||
@@ -28,2 +30,3 @@ timeout: 100000, | ||
}, to) | ||
t.equal(to.name, 'one', 'Job custom name option is valid') | ||
t.equal(to.priority, 'high', 'Job custom priority option is valid') | ||
@@ -36,3 +39,13 @@ t.equal(to.timeout, 100000, 'Job custom timeout option is valid') | ||
to = jobOptions({ name: 'two' }, to) | ||
t.equal(to.name, 'two', 'Job name custom name option is valid') | ||
t.equal(to.priority, 'high', 'Job priority custom priority option is valid') | ||
t.equal(to.timeout, 100000, 'Job priority custom timeout option is valid') | ||
t.equal(to.retryMax, 8, 'Job priority custom retryMax option is valid') | ||
t.equal(to.retryDelay, 200000, 'Job priority custom retryDelay option is valid') | ||
t.equal(to.repeat, 4, 'Job priority custom repeat option is valid') | ||
t.equal(to.repeatDelay, 1000, 'Job priority custom repeatDelay option is valid') | ||
to = jobOptions({ priority: 'lowest' }, to) | ||
t.equal(to.name, 'two', 'Job name custom name option is valid') | ||
t.equal(to.priority, 'lowest', 'Job priority custom priority option is valid') | ||
@@ -46,2 +59,3 @@ t.equal(to.timeout, 100000, 'Job priority custom timeout option is valid') | ||
to = jobOptions({ timeout: 700000 }, to) | ||
t.equal(to.name, 'two', 'Job name custom name option is valid') | ||
t.equal(to.priority, 'lowest', 'Job timeout custom priority option is valid') | ||
@@ -55,2 +69,3 @@ t.equal(to.timeout, 700000, 'Job timeout custom timeout option is valid') | ||
to = jobOptions({ retryMax: 2 }, to) | ||
t.equal(to.name, 'two', 'Job name custom name option is valid') | ||
t.equal(to.priority, 'lowest', 'Job retryMax custom priority option is valid') | ||
@@ -64,2 +79,3 @@ t.equal(to.timeout, 700000, 'Job retryMax custom timeout option is valid') | ||
to = jobOptions({ retryDelay: 800000 }, to) | ||
t.equal(to.name, 'two', 'Job name custom name option is valid') | ||
t.equal(to.priority, 'lowest', 'Job retryDelay custom priority option is valid') | ||
@@ -73,2 +89,3 @@ t.equal(to.timeout, 700000, 'Job retryDelay custom timeout option is valid') | ||
to = jobOptions({ repeat: false }, to) | ||
t.equal(to.name, 'two', 'Job name custom name option is valid') | ||
t.equal(to.priority, 'lowest', 'Job repeat custom priority option is valid') | ||
@@ -82,2 +99,3 @@ t.equal(to.timeout, 700000, 'Job repeat custom timeout option is valid') | ||
to = jobOptions({ repeatDelay: 2000 }, to) | ||
t.equal(to.name, 'two', 'Job name custom name option is valid') | ||
t.equal(to.priority, 'lowest', 'Job repeatDelay custom priority option is valid') | ||
@@ -91,2 +109,3 @@ t.equal(to.timeout, 700000, 'Job repeatDelay custom timeout option is valid') | ||
to = jobOptions({ | ||
name: true, | ||
priority: 'oops', | ||
@@ -99,2 +118,3 @@ timeout: -20, | ||
}, to) | ||
t.equal(to.name, 'two', 'Job invalid name option is reverted') | ||
t.equal(to.priority, 'lowest', 'Job invalid priority option is reverted') | ||
@@ -101,0 +121,0 @@ t.equal(to.timeout, 700000, 'Job invalid timeout option is reverted') |
@@ -16,3 +16,3 @@ const test = require('tap').test | ||
test(testName, (t) => { | ||
t.plan(114) | ||
t.plan(117) | ||
@@ -76,3 +76,7 @@ const q = new Queue(tOpts.cxn(), tOpts.default()) | ||
t.comment('job: Change Options') | ||
t.throws(() => { newJob.setPriority('not valid') }, 'Job setPriority thows if invalid') | ||
t.equal(newJob.name, newJob.id, 'New job name is the same as job id') | ||
t.throws(() => { newJob.setName([]) }, 'Job setName throws if invalid') | ||
newJob.setName('test name') | ||
t.equal(newJob.name, 'test name', 'Job setName successfully changed value') | ||
t.throws(() => { newJob.setPriority('not valid') }, 'Job setPriority throws if invalid') | ||
newJob.setPriority('highest') | ||
@@ -109,3 +113,3 @@ t.equal(newJob.priority, 'highest', 'Job setPriority successfully changed value') | ||
let cleanJob = newJob.getCleanCopy() | ||
t.equal(Object.keys(cleanJob).length, 16, 'Clean job has valid number of properties') | ||
t.equal(Object.keys(cleanJob).length, 17, 'Clean job has valid number of properties') | ||
t.equal(cleanJob.id, newJob.id, 'Clean job has valid id') | ||
@@ -112,0 +116,0 @@ t.equal(cleanJob.data, newJob.data, 'Clean job data is valid') |
@@ -15,3 +15,3 @@ const test = require('tap').test | ||
test(testName, (t) => { | ||
t.plan(147) | ||
t.plan(156) | ||
@@ -28,3 +28,5 @@ // ---------- Test Setup ---------- | ||
let job | ||
let jobName = 'rjqTestJob' | ||
let customJobOptions = { | ||
name: jobName, | ||
priority: 'high', | ||
@@ -150,2 +152,3 @@ timeout: 200, | ||
t.ok(is.job(job), 'Queue createJob created a job object') | ||
t.equal(job.name, customJobOptions.name, 'Queue created job with new default name') | ||
t.equal(job.priority, customJobOptions.priority, 'Queue created job with new default priority') | ||
@@ -157,2 +160,3 @@ t.equal(job.timeout, customJobOptions.timeout, 'Queue created job with new default timeout') | ||
customJobOptions = { | ||
name: 'aaa', | ||
priority: 'low', | ||
@@ -164,4 +168,5 @@ timeout: 400, | ||
} | ||
job = qMain.createJob().setPriority('low').setTimeout(400).setRetryMax(2).setRetryDelay(900) | ||
job = qMain.createJob().setName('aaa').setPriority('low').setTimeout(400).setRetryMax(2).setRetryDelay(900) | ||
t.ok(is.job(job), 'Queue createJob created a job object') | ||
t.equal(job.name, customJobOptions.name, 'Queue created job with custom name') | ||
t.equal(job.priority, customJobOptions.priority, 'Queue created job with custom priority') | ||
@@ -202,5 +207,24 @@ t.equal(job.timeout, customJobOptions.timeout, 'Queue created job with custom timeout') | ||
// ---------- Contains Job By Name Tests ---------- | ||
t.comment('queue: Contains Job By Name') | ||
return qMain.containsJobByName(jobName) | ||
}).then((exists) => { | ||
t.ok(is.true(exists), 'Contains job by name returns true') | ||
return qMain.containsJobByName('not a valid name!') | ||
}).then((exists) => { | ||
t.ok(is.false(exists), 'Contains job by name returns false') | ||
// ---------- Find Job By Name Tests ---------- | ||
t.comment('queue: Find Job By Name') | ||
return qMain.findJobByName(jobName) | ||
}).then((savedJobs4) => { | ||
t.ok(is.array(savedJobs4), 'Find job by name returns an array') | ||
t.ok(is.job(savedJobs4[0]), 'Job retrieved successfully') | ||
t.equal(savedJobs4[0].id, job.id, 'Job id is valid') | ||
t.equal(savedJobs4[0].name, jobName, 'Job name is valid') | ||
t.equal(savedJobs4[0].status, enums.status.waiting, 'Job status is valid') | ||
// ---------- Cancel Job Tests ---------- | ||
t.comment('queue: Cancel Job') | ||
return qMain.cancelJob(savedJobs3[0].id) | ||
return qMain.cancelJob(savedJobs4[0].id) | ||
}).then((cancelledJobs) => { | ||
@@ -207,0 +231,0 @@ t.ok(is.array(cancelledJobs), 'Cancel job returns an array') |
@@ -26,2 +26,3 @@ const tests = new Map() | ||
tests.set('queueFindJob', require('./queue-find-job.spec')) | ||
tests.set('queueFindJobByName', require('./queue-find-job-by-name.spec')) | ||
tests.set('queueGetNextJob', require('./queue-get-next-job.spec')) | ||
@@ -28,0 +29,0 @@ tests.set('queueReanimateJob', require('./queue-reanimate-job.spec')) |
@@ -26,2 +26,3 @@ const Promise = require('bluebird') | ||
const queueFindJob = require('./queue-find-job.spec') | ||
const queueFindJobByName = require('./queue-find-job-by-name.spec') | ||
const queueGetNextJob = require('./queue-get-next-job.spec') | ||
@@ -62,2 +63,3 @@ const queueReanimateJob = require('./queue-reanimate-job.spec') | ||
queueFindJob(), | ||
queueFindJobByName(), | ||
dbResult(), | ||
@@ -64,0 +66,0 @@ queueAddJob(), |
@@ -5,7 +5,3 @@ # WORKLOG | ||
## TODO | ||
* global queue reset, reviewed, error event? | ||
* Add failedCount? | ||
* Add global rate limiter? |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
566826
78
97
7881
254