Comparing version 2.0.0-beta5 to 2.0.0-rc1
@@ -12,3 +12,5 @@ 'use strict'; | ||
checkPublishArgs: checkPublishArgs, | ||
checkSubscribeArgs: checkSubscribeArgs | ||
checkSubscribeArgs: checkSubscribeArgs, | ||
checkFetchArgs: checkFetchArgs, | ||
assertAsync: assertAsync | ||
}; | ||
@@ -79,2 +81,17 @@ | ||
function checkFetchArgs(name, batchSize) { | ||
return assertAsync(name, 'missing job name').then(function () { | ||
return assert(!batchSize || batchSize >= 1, 'fetch() assert: optional batchSize arg must be at least 1'); | ||
}); | ||
} | ||
function assertAsync(arg, errorMessage) { | ||
try { | ||
assert(arg, errorMessage); | ||
return Promise.resolve(arg); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
} | ||
function applyConfig(config) { | ||
@@ -81,0 +98,0 @@ |
@@ -47,6 +47,30 @@ 'use strict'; | ||
_this.completeJobCommand = plans.completeJob(config.schema); | ||
_this.completeJobsCommand = plans.completeJobs(config.schema); | ||
_this.cancelJobCommand = plans.cancelJob(config.schema); | ||
_this.cancelJobsCommand = plans.cancelJobs(config.schema); | ||
_this.failJobCommand = plans.failJob(config.schema); | ||
_this.failJobsCommand = plans.failJobs(config.schema); | ||
_this.functions = [_this.fetch, _this.complete, _this.cancel, _this.fail, _this.publish, _this.subscribe, _this.unsubscribe, _this.onComplete, _this.offComplete, _this.onExpire, _this.offExpire, _this.onFail, _this.offFail]; | ||
_this.offFail = function (name) { | ||
return _this.unsubscribe(name + failedJobSuffix); | ||
}; | ||
_this.offExpire = function (name) { | ||
return _this.unsubscribe(name + expiredJobSuffix); | ||
}; | ||
_this.offComplete = function (name) { | ||
return _this.unsubscribe(name + completedJobSuffix); | ||
}; | ||
_this.fetchFailed = function (name, batchSize) { | ||
return _this.fetch(name + failedJobSuffix, batchSize); | ||
}; | ||
_this.fetchExpired = function (name, batchSize) { | ||
return _this.fetch(name + expiredJobSuffix, batchSize); | ||
}; | ||
_this.fetchCompleted = function (name, batchSize) { | ||
return _this.fetch(name + completedJobSuffix, batchSize); | ||
}; | ||
// exported api to index | ||
_this.functions = [_this.fetch, _this.complete, _this.cancel, _this.fail, _this.publish, _this.subscribe, _this.unsubscribe, _this.onComplete, _this.offComplete, _this.onExpire, _this.offExpire, _this.onFail, _this.offFail, _this.fetchFailed, _this.fetchExpired, _this.fetchCompleted]; | ||
return _this; | ||
@@ -71,5 +95,3 @@ } | ||
this.subscriptions[name].workers.forEach(function (worker) { | ||
return worker.stop(); | ||
}); | ||
this.subscriptions[name].worker.stop(); | ||
delete this.subscriptions[name]; | ||
@@ -107,4 +129,4 @@ | ||
callback = _ref2.callback; | ||
return _this4.watch(name + expiredJobSuffix, options, function (job, done) { | ||
return callback(job.data, done); | ||
return _this4.watch(name + expiredJobSuffix, options, function (job) { | ||
return callback(job.data); | ||
}); | ||
@@ -114,17 +136,2 @@ }); | ||
}, { | ||
key: 'offExpire', | ||
value: function offExpire(name) { | ||
return this.unsubscribe(name + expiredJobSuffix); | ||
} | ||
}, { | ||
key: 'offComplete', | ||
value: function offComplete(name) { | ||
return this.unsubscribe(name + completedJobSuffix); | ||
} | ||
}, { | ||
key: 'offFail', | ||
value: function offFail(name) { | ||
return this.unsubscribe(name + failedJobSuffix); | ||
} | ||
}, { | ||
key: 'onComplete', | ||
@@ -166,7 +173,7 @@ value: function onComplete(name) { | ||
options.teamSize = options.teamSize || 1; | ||
options.batchSize = options.batchSize || 1; | ||
if ('newJobCheckInterval' in options || 'newJobCheckIntervalSeconds' in options) options = Attorney.applyNewJobCheckInterval(options);else options.newJobCheckInterval = this.config.newJobCheckInterval; | ||
var subscription = this.subscriptions[name] = { workers: [] }; | ||
var subscription = this.subscriptions[name] = { worker: null }; | ||
@@ -185,14 +192,21 @@ var onError = function onError(error) { | ||
var respond = function respond(job) { | ||
if (!job) return; | ||
var respond = function respond(jobs) { | ||
if (!jobs) return; | ||
_this7.emit(events.job, job); | ||
if (!Array.isArray(jobs)) jobs = [jobs]; | ||
jobs.forEach(function (job) { | ||
_this7.emit(events.job, job); | ||
job.done = function (error) { | ||
return complete(error, job); | ||
}; | ||
}); | ||
setImmediate(function () { | ||
try { | ||
callback(job, function (error) { | ||
return complete(error, job); | ||
if (jobs.length === 1) callback(jobs[0], jobs[0].done);else callback(jobs); | ||
} catch (error) { | ||
jobs.forEach(function (job) { | ||
return _this7.emit(events.failed, { job: job, error: error }); | ||
}); | ||
} catch (error) { | ||
_this7.emit(events.failed, { job: job, error: error }); | ||
} | ||
@@ -203,3 +217,3 @@ }); | ||
var fetch = function fetch() { | ||
return _this7.fetch(name); | ||
return _this7.fetch(name, options.batchSize); | ||
}; | ||
@@ -215,7 +229,5 @@ | ||
for (var w = 0; w < options.teamSize; w++) { | ||
var worker = new Worker(workerConfig); | ||
worker.start(); | ||
subscription.workers.push(worker); | ||
} | ||
var worker = new Worker(workerConfig); | ||
worker.start(); | ||
subscription.worker = worker; | ||
} | ||
@@ -280,60 +292,97 @@ }, { | ||
key: 'fetch', | ||
value: function fetch(name) { | ||
return this.db.executeSql(this.nextJobCommand, name).then(function (result) { | ||
if (result.rows.length === 0) return null; | ||
value: function fetch(name, batchSize) { | ||
var _this10 = this; | ||
var job = result.rows[0]; | ||
job.name = name; | ||
return job; | ||
return Attorney.checkFetchArgs(name, batchSize).then(function () { | ||
return _this10.db.executeSql(_this10.nextJobCommand, [name, batchSize || 1]); | ||
}).then(function (result) { | ||
return result.rows.length === 0 ? null : result.rows.length === 1 ? result.rows[0] : result.rows; | ||
}); | ||
} | ||
}, { | ||
key: 'complete', | ||
value: function complete(id, data) { | ||
var _this10 = this; | ||
key: 'setStateForJob', | ||
value: function setStateForJob(id, data, actionName, command, stateSuffix, bypassNotify) { | ||
var _this11 = this; | ||
return this.db.executeSql(this.completeJobCommand, [id]).then(function (result) { | ||
assert(result.rowCount === 1, 'Job ' + id + ' could not be completed.'); | ||
var job = void 0; | ||
var job = result.rows[0]; | ||
return this.db.executeSql(command, [id]).then(function (result) { | ||
assert(result.rowCount === 1, actionName + '(): Job ' + id + ' could not be updated.'); | ||
return _this10.respond(job, completedJobSuffix, data).then(function () { | ||
return job; | ||
}); | ||
job = result.rows[0]; | ||
return bypassNotify ? null : _this11.publish(job.name + stateSuffix, { request: job, response: data || null }); | ||
}).then(function () { | ||
return job; | ||
}); | ||
} | ||
}, { | ||
key: 'fail', | ||
value: function fail(id, data) { | ||
var _this11 = this; | ||
key: 'setStateForJobs', | ||
value: function setStateForJobs(ids, actionName, command) { | ||
return this.db.executeSql(command, [ids]).then(function (result) { | ||
assert(result.rowCount === ids.length, actionName + '(): ' + ids.length + ' jobs submitted, ' + result.rowCount + ' updated'); | ||
}); | ||
} | ||
}, { | ||
key: 'setState', | ||
value: function setState(config) { | ||
var _this12 = this; | ||
return this.db.executeSql(this.failJobCommand, [id]).then(function (result) { | ||
assert(result.rowCount === 1, 'Job ' + id + ' could not be marked as failed.'); | ||
var id = config.id, | ||
data = config.data, | ||
actionName = config.actionName, | ||
command = config.command, | ||
batchCommand = config.batchCommand, | ||
stateSuffix = config.stateSuffix, | ||
bypassNotify = config.bypassNotify; | ||
var job = result.rows[0]; | ||
return _this11.respond(job, failedJobSuffix, data).then(function () { | ||
return job; | ||
}); | ||
return Attorney.assertAsync(id, actionName + '() requires id argument').then(function () { | ||
var ids = Array.isArray(id) ? id : [id]; | ||
assert(ids.length, actionName + '() requires at least 1 item in an array argument'); | ||
return ids.length === 1 ? _this12.setStateForJob(ids[0], data, actionName, command, stateSuffix, bypassNotify) : _this12.setStateForJobs(ids, actionName, batchCommand); | ||
}); | ||
} | ||
}, { | ||
key: 'respond', | ||
value: function respond(job, suffix, data) { | ||
var payload = { | ||
request: job, | ||
response: data || null | ||
key: 'complete', | ||
value: function complete(id, data) { | ||
var config = { | ||
id: id, | ||
data: data, | ||
actionName: 'complete', | ||
command: this.completeJobCommand, | ||
batchCommand: this.completeJobsCommand, | ||
stateSuffix: completedJobSuffix | ||
}; | ||
return this.publish(job.name + suffix, payload); | ||
return this.setState(config); | ||
} | ||
}, { | ||
key: 'fail', | ||
value: function fail(id, data) { | ||
var config = { | ||
id: id, | ||
data: data, | ||
actionName: 'fail', | ||
command: this.failJobCommand, | ||
batchCommand: this.failJobsCommand, | ||
stateSuffix: failedJobSuffix | ||
}; | ||
return this.setState(config); | ||
} | ||
}, { | ||
key: 'cancel', | ||
value: function cancel(id) { | ||
return this.db.executeSql(this.cancelJobCommand, [id]).then(function (result) { | ||
assert(result.rowCount === 1, 'Job ' + id + ' could not be cancelled.'); | ||
return result.rows[0]; | ||
}); | ||
var config = { | ||
id: id, | ||
actionName: 'cancel', | ||
command: this.cancelJobCommand, | ||
batchCommand: this.cancelJobsCommand, | ||
bypassNotify: true | ||
}; | ||
return this.setState(config); | ||
} | ||
@@ -340,0 +389,0 @@ }]); |
@@ -71,5 +71,10 @@ 'use strict'; | ||
previous: '4', | ||
install: ['ALTER TABLE ' + schema + '.job ALTER COLUMN startIn SET DEFAULT (interval \'0\')', 'ALTER TABLE ' + schema + '.job ALTER COLUMN state SET DEFAULT (\'created\')'], | ||
uninstall: ['ALTER TABLE ' + schema + '.job ALTER COLUMN startIn DROP DEFAULT', 'ALTER TABLE ' + schema + '.job ALTER COLUMN state DROP DEFAULT'] | ||
install: ['ALTER TABLE ' + schema + '.job ALTER COLUMN startIn SET DEFAULT (interval \'0\')', 'ALTER TABLE ' + schema + '.job ALTER COLUMN state SET DEFAULT (\'created\')', 'UPDATE ' + schema + '.job SET name = left(name, -9) || \'__state__expired\' WHERE name LIKE \'%__expired\''], | ||
uninstall: ['ALTER TABLE ' + schema + '.job ALTER COLUMN startIn DROP DEFAULT', 'ALTER TABLE ' + schema + '.job ALTER COLUMN state DROP DEFAULT', 'UPDATE ' + schema + '.job SET name = left(name, -16) || \'__expired\' WHERE name LIKE \'%__state__expired\''] | ||
}, { | ||
version: '6', | ||
previous: '5', | ||
install: ['CREATE INDEX job_fetch ON ' + schema + '.job (priority desc, createdOn, id) WHERE state < \'active\''], | ||
uninstall: ['DROP INDEX ' + schema + '.job_fetch'] | ||
}]; | ||
} |
@@ -25,4 +25,7 @@ 'use strict'; | ||
completeJob: completeJob, | ||
completeJobs: completeJobs, | ||
cancelJob: cancelJob, | ||
cancelJobs: cancelJobs, | ||
failJob: failJob, | ||
failJobs: failJobs, | ||
insertJob: insertJob, | ||
@@ -39,3 +42,3 @@ expire: expire, | ||
function create(schema) { | ||
return [createSchema(schema), createVersionTable(schema), createJobStateEnum(schema), createJobTable(schema), createIndexSingletonOn(schema), createIndexSingletonKeyOn(schema), createIndexSingletonKey(schema)]; | ||
return [createSchema(schema), createVersionTable(schema), createJobStateEnum(schema), createJobTable(schema), createIndexJobFetch(schema), createIndexSingletonOn(schema), createIndexSingletonKeyOn(schema), createIndexSingletonKey(schema)]; | ||
} | ||
@@ -76,2 +79,6 @@ | ||
function createIndexJobFetch(schema) { | ||
return '\n CREATE INDEX job_fetch ON ' + schema + '.job (priority desc, createdOn, id) WHERE state < \'' + states.active + '\'\n '; | ||
} | ||
function getVersion(schema) { | ||
@@ -90,3 +97,3 @@ return '\n SELECT version from ' + schema + '.version\n '; | ||
function fetchNextJob(schema) { | ||
return '\n WITH nextJob as (\n SELECT id\n FROM ' + schema + '.job\n WHERE state < \'' + states.active + '\'\n AND name = $1\n AND (createdOn + startIn) < now()\n ORDER BY priority desc, createdOn, id\n LIMIT 1\n FOR UPDATE SKIP LOCKED\n )\n UPDATE ' + schema + '.job SET\n state = \'' + states.active + '\',\n startedOn = now(),\n retryCount = CASE WHEN state = \'' + states.retry + '\' THEN retryCount + 1 ELSE retryCount END\n FROM nextJob\n WHERE ' + schema + '.job.id = nextJob.id\n RETURNING ' + schema + '.job.id, ' + schema + '.job.data\n '; | ||
return '\n WITH nextJob as (\n SELECT id\n FROM ' + schema + '.job\n WHERE state < \'' + states.active + '\'\n AND name = $1\n AND (createdOn + startIn) < now()\n ORDER BY priority desc, createdOn, id\n LIMIT $2\n FOR UPDATE SKIP LOCKED\n )\n UPDATE ' + schema + '.job SET\n state = \'' + states.active + '\',\n startedOn = now(),\n retryCount = CASE WHEN state = \'' + states.retry + '\' THEN retryCount + 1 ELSE retryCount END\n FROM nextJob\n WHERE ' + schema + '.job.id = nextJob.id\n RETURNING ' + schema + '.job.id, $1 as name, ' + schema + '.job.data\n '; | ||
} | ||
@@ -98,2 +105,6 @@ | ||
function completeJobs(schema) { | ||
return '\n UPDATE ' + schema + '.job\n SET completedOn = now(),\n state = \'' + states.complete + '\'\n WHERE id = ANY($1)\n AND state = \'' + states.active + '\'\n '; | ||
} | ||
function cancelJob(schema) { | ||
@@ -103,2 +114,6 @@ return '\n UPDATE ' + schema + '.job\n SET completedOn = now(),\n state = \'' + states.cancelled + '\'\n WHERE id = $1\n AND state < \'' + states.complete + '\'\n RETURNING id, name, data\n '; | ||
function cancelJobs(schema) { | ||
return '\n UPDATE ' + schema + '.job\n SET completedOn = now(),\n state = \'' + states.cancelled + '\'\n WHERE id = ANY($1)\n AND state < \'' + states.complete + '\'\n '; | ||
} | ||
function failJob(schema) { | ||
@@ -108,2 +123,6 @@ return '\n UPDATE ' + schema + '.job\n SET completedOn = now(),\n state = \'' + states.failed + '\'\n WHERE id = $1\n AND state < \'' + states.complete + '\'\n RETURNING id, name, data\n '; | ||
function failJobs(schema) { | ||
return '\n UPDATE ' + schema + '.job\n SET completedOn = now(),\n state = \'' + states.failed + '\'\n WHERE id = ANY($1)\n AND state < \'' + states.complete + '\' \n '; | ||
} | ||
function insertJob(schema) { | ||
@@ -110,0 +129,0 @@ return '\n INSERT INTO ' + schema + '.job (id, name, priority, state, retryLimit, startIn, expireIn, data, singletonKey, singletonOn)\n VALUES (\n $1, $2, $3, \'' + states.created + '\', $4, CAST($5 as interval), CAST($6 as interval), $7, $8,\n CASE WHEN $9::integer IS NOT NULL THEN \'epoch\'::timestamp + \'1 second\'::interval * ($9 * floor((date_part(\'epoch\', now()) + $10) / $9)) ELSE NULL END\n )\n ON CONFLICT DO NOTHING\n '; |
{ | ||
"name": "pg-boss", | ||
"version": "2.0.0-beta5", | ||
"version": "2.0.0-rc1", | ||
"description": "Queueing jobs in Node.js using PostgreSQL like a boss", | ||
@@ -5,0 +5,0 @@ "main": "./lib/index.js", |
@@ -29,7 +29,7 @@ Queueing jobs in Node.js using PostgreSQL like a boss. | ||
function someJobHandler(job, done) { | ||
function someJobHandler(job) { | ||
console.log(`received ${job.name} ${job.id}`); | ||
console.log(`data: ${JSON.stringify(job.data)}`); | ||
done() | ||
job.done() | ||
.then(() => console.log(`some-job ${job.id} completed`)) | ||
@@ -36,0 +36,0 @@ .catch(onError); |
{ | ||
"schema": "5" | ||
"schema": "6" | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
244029
1095