Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

pg-boss

Package Overview
Dependencies
Maintainers
1
Versions
205
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pg-boss - npm Package Compare versions

Comparing version 2.0.0-beta5 to 2.0.0-rc1

.nyc_output/3d49807107a54205b146d13d942ddd09.json

19

lib/attorney.js

@@ -12,3 +12,5 @@ 'use strict';

checkPublishArgs: checkPublishArgs,
checkSubscribeArgs: checkSubscribeArgs
checkSubscribeArgs: checkSubscribeArgs,
checkFetchArgs: checkFetchArgs,
assertAsync: assertAsync
};

@@ -79,2 +81,17 @@

function checkFetchArgs(name, batchSize) {
return assertAsync(name, 'missing job name').then(function () {
return assert(!batchSize || batchSize >= 1, 'fetch() assert: optional batchSize arg must be at least 1');
});
}
function assertAsync(arg, errorMessage) {
try {
assert(arg, errorMessage);
return Promise.resolve(arg);
} catch (e) {
return Promise.reject(e);
}
}
function applyConfig(config) {

@@ -81,0 +98,0 @@

193

lib/manager.js

@@ -47,6 +47,30 @@ 'use strict';

_this.completeJobCommand = plans.completeJob(config.schema);
_this.completeJobsCommand = plans.completeJobs(config.schema);
_this.cancelJobCommand = plans.cancelJob(config.schema);
_this.cancelJobsCommand = plans.cancelJobs(config.schema);
_this.failJobCommand = plans.failJob(config.schema);
_this.failJobsCommand = plans.failJobs(config.schema);
_this.functions = [_this.fetch, _this.complete, _this.cancel, _this.fail, _this.publish, _this.subscribe, _this.unsubscribe, _this.onComplete, _this.offComplete, _this.onExpire, _this.offExpire, _this.onFail, _this.offFail];
_this.offFail = function (name) {
return _this.unsubscribe(name + failedJobSuffix);
};
_this.offExpire = function (name) {
return _this.unsubscribe(name + expiredJobSuffix);
};
_this.offComplete = function (name) {
return _this.unsubscribe(name + completedJobSuffix);
};
_this.fetchFailed = function (name, batchSize) {
return _this.fetch(name + failedJobSuffix, batchSize);
};
_this.fetchExpired = function (name, batchSize) {
return _this.fetch(name + expiredJobSuffix, batchSize);
};
_this.fetchCompleted = function (name, batchSize) {
return _this.fetch(name + completedJobSuffix, batchSize);
};
// exported api to index
_this.functions = [_this.fetch, _this.complete, _this.cancel, _this.fail, _this.publish, _this.subscribe, _this.unsubscribe, _this.onComplete, _this.offComplete, _this.onExpire, _this.offExpire, _this.onFail, _this.offFail, _this.fetchFailed, _this.fetchExpired, _this.fetchCompleted];
return _this;

@@ -71,5 +95,3 @@ }

this.subscriptions[name].workers.forEach(function (worker) {
return worker.stop();
});
this.subscriptions[name].worker.stop();
delete this.subscriptions[name];

@@ -107,4 +129,4 @@

callback = _ref2.callback;
return _this4.watch(name + expiredJobSuffix, options, function (job, done) {
return callback(job.data, done);
return _this4.watch(name + expiredJobSuffix, options, function (job) {
return callback(job.data);
});

@@ -114,17 +136,2 @@ });

}, {
key: 'offExpire',
value: function offExpire(name) {
return this.unsubscribe(name + expiredJobSuffix);
}
}, {
key: 'offComplete',
value: function offComplete(name) {
return this.unsubscribe(name + completedJobSuffix);
}
}, {
key: 'offFail',
value: function offFail(name) {
return this.unsubscribe(name + failedJobSuffix);
}
}, {
key: 'onComplete',

@@ -166,7 +173,7 @@ value: function onComplete(name) {

options.teamSize = options.teamSize || 1;
options.batchSize = options.batchSize || 1;
if ('newJobCheckInterval' in options || 'newJobCheckIntervalSeconds' in options) options = Attorney.applyNewJobCheckInterval(options);else options.newJobCheckInterval = this.config.newJobCheckInterval;
var subscription = this.subscriptions[name] = { workers: [] };
var subscription = this.subscriptions[name] = { worker: null };

@@ -185,14 +192,21 @@ var onError = function onError(error) {

var respond = function respond(job) {
if (!job) return;
var respond = function respond(jobs) {
if (!jobs) return;
_this7.emit(events.job, job);
if (!Array.isArray(jobs)) jobs = [jobs];
jobs.forEach(function (job) {
_this7.emit(events.job, job);
job.done = function (error) {
return complete(error, job);
};
});
setImmediate(function () {
try {
callback(job, function (error) {
return complete(error, job);
if (jobs.length === 1) callback(jobs[0], jobs[0].done);else callback(jobs);
} catch (error) {
jobs.forEach(function (job) {
return _this7.emit(events.failed, { job: job, error: error });
});
} catch (error) {
_this7.emit(events.failed, { job: job, error: error });
}

@@ -203,3 +217,3 @@ });

var fetch = function fetch() {
return _this7.fetch(name);
return _this7.fetch(name, options.batchSize);
};

@@ -215,7 +229,5 @@

for (var w = 0; w < options.teamSize; w++) {
var worker = new Worker(workerConfig);
worker.start();
subscription.workers.push(worker);
}
var worker = new Worker(workerConfig);
worker.start();
subscription.worker = worker;
}

@@ -280,60 +292,97 @@ }, {

key: 'fetch',
value: function fetch(name) {
return this.db.executeSql(this.nextJobCommand, name).then(function (result) {
if (result.rows.length === 0) return null;
value: function fetch(name, batchSize) {
var _this10 = this;
var job = result.rows[0];
job.name = name;
return job;
return Attorney.checkFetchArgs(name, batchSize).then(function () {
return _this10.db.executeSql(_this10.nextJobCommand, [name, batchSize || 1]);
}).then(function (result) {
return result.rows.length === 0 ? null : result.rows.length === 1 ? result.rows[0] : result.rows;
});
}
}, {
key: 'complete',
value: function complete(id, data) {
var _this10 = this;
key: 'setStateForJob',
value: function setStateForJob(id, data, actionName, command, stateSuffix, bypassNotify) {
var _this11 = this;
return this.db.executeSql(this.completeJobCommand, [id]).then(function (result) {
assert(result.rowCount === 1, 'Job ' + id + ' could not be completed.');
var job = void 0;
var job = result.rows[0];
return this.db.executeSql(command, [id]).then(function (result) {
assert(result.rowCount === 1, actionName + '(): Job ' + id + ' could not be updated.');
return _this10.respond(job, completedJobSuffix, data).then(function () {
return job;
});
job = result.rows[0];
return bypassNotify ? null : _this11.publish(job.name + stateSuffix, { request: job, response: data || null });
}).then(function () {
return job;
});
}
}, {
key: 'fail',
value: function fail(id, data) {
var _this11 = this;
key: 'setStateForJobs',
value: function setStateForJobs(ids, actionName, command) {
return this.db.executeSql(command, [ids]).then(function (result) {
assert(result.rowCount === ids.length, actionName + '(): ' + ids.length + ' jobs submitted, ' + result.rowCount + ' updated');
});
}
}, {
key: 'setState',
value: function setState(config) {
var _this12 = this;
return this.db.executeSql(this.failJobCommand, [id]).then(function (result) {
assert(result.rowCount === 1, 'Job ' + id + ' could not be marked as failed.');
var id = config.id,
data = config.data,
actionName = config.actionName,
command = config.command,
batchCommand = config.batchCommand,
stateSuffix = config.stateSuffix,
bypassNotify = config.bypassNotify;
var job = result.rows[0];
return _this11.respond(job, failedJobSuffix, data).then(function () {
return job;
});
return Attorney.assertAsync(id, actionName + '() requires id argument').then(function () {
var ids = Array.isArray(id) ? id : [id];
assert(ids.length, actionName + '() requires at least 1 item in an array argument');
return ids.length === 1 ? _this12.setStateForJob(ids[0], data, actionName, command, stateSuffix, bypassNotify) : _this12.setStateForJobs(ids, actionName, batchCommand);
});
}
}, {
key: 'respond',
value: function respond(job, suffix, data) {
var payload = {
request: job,
response: data || null
key: 'complete',
value: function complete(id, data) {
var config = {
id: id,
data: data,
actionName: 'complete',
command: this.completeJobCommand,
batchCommand: this.completeJobsCommand,
stateSuffix: completedJobSuffix
};
return this.publish(job.name + suffix, payload);
return this.setState(config);
}
}, {
key: 'fail',
value: function fail(id, data) {
var config = {
id: id,
data: data,
actionName: 'fail',
command: this.failJobCommand,
batchCommand: this.failJobsCommand,
stateSuffix: failedJobSuffix
};
return this.setState(config);
}
}, {
key: 'cancel',
value: function cancel(id) {
return this.db.executeSql(this.cancelJobCommand, [id]).then(function (result) {
assert(result.rowCount === 1, 'Job ' + id + ' could not be cancelled.');
return result.rows[0];
});
var config = {
id: id,
actionName: 'cancel',
command: this.cancelJobCommand,
batchCommand: this.cancelJobsCommand,
bypassNotify: true
};
return this.setState(config);
}

@@ -340,0 +389,0 @@ }]);

@@ -71,5 +71,10 @@ 'use strict';

previous: '4',
install: ['ALTER TABLE ' + schema + '.job ALTER COLUMN startIn SET DEFAULT (interval \'0\')', 'ALTER TABLE ' + schema + '.job ALTER COLUMN state SET DEFAULT (\'created\')'],
uninstall: ['ALTER TABLE ' + schema + '.job ALTER COLUMN startIn DROP DEFAULT', 'ALTER TABLE ' + schema + '.job ALTER COLUMN state DROP DEFAULT']
install: ['ALTER TABLE ' + schema + '.job ALTER COLUMN startIn SET DEFAULT (interval \'0\')', 'ALTER TABLE ' + schema + '.job ALTER COLUMN state SET DEFAULT (\'created\')', 'UPDATE ' + schema + '.job SET name = left(name, -9) || \'__state__expired\' WHERE name LIKE \'%__expired\''],
uninstall: ['ALTER TABLE ' + schema + '.job ALTER COLUMN startIn DROP DEFAULT', 'ALTER TABLE ' + schema + '.job ALTER COLUMN state DROP DEFAULT', 'UPDATE ' + schema + '.job SET name = left(name, -16) || \'__expired\' WHERE name LIKE \'%__state__expired\'']
}, {
version: '6',
previous: '5',
install: ['CREATE INDEX job_fetch ON ' + schema + '.job (priority desc, createdOn, id) WHERE state < \'active\''],
uninstall: ['DROP INDEX ' + schema + '.job_fetch']
}];
}

@@ -25,4 +25,7 @@ 'use strict';

completeJob: completeJob,
completeJobs: completeJobs,
cancelJob: cancelJob,
cancelJobs: cancelJobs,
failJob: failJob,
failJobs: failJobs,
insertJob: insertJob,

@@ -39,3 +42,3 @@ expire: expire,

function create(schema) {
return [createSchema(schema), createVersionTable(schema), createJobStateEnum(schema), createJobTable(schema), createIndexSingletonOn(schema), createIndexSingletonKeyOn(schema), createIndexSingletonKey(schema)];
return [createSchema(schema), createVersionTable(schema), createJobStateEnum(schema), createJobTable(schema), createIndexJobFetch(schema), createIndexSingletonOn(schema), createIndexSingletonKeyOn(schema), createIndexSingletonKey(schema)];
}

@@ -76,2 +79,6 @@

function createIndexJobFetch(schema) {
return '\n CREATE INDEX job_fetch ON ' + schema + '.job (priority desc, createdOn, id) WHERE state < \'' + states.active + '\'\n ';
}
function getVersion(schema) {

@@ -90,3 +97,3 @@ return '\n SELECT version from ' + schema + '.version\n ';

function fetchNextJob(schema) {
return '\n WITH nextJob as (\n SELECT id\n FROM ' + schema + '.job\n WHERE state < \'' + states.active + '\'\n AND name = $1\n AND (createdOn + startIn) < now()\n ORDER BY priority desc, createdOn, id\n LIMIT 1\n FOR UPDATE SKIP LOCKED\n )\n UPDATE ' + schema + '.job SET\n state = \'' + states.active + '\',\n startedOn = now(),\n retryCount = CASE WHEN state = \'' + states.retry + '\' THEN retryCount + 1 ELSE retryCount END\n FROM nextJob\n WHERE ' + schema + '.job.id = nextJob.id\n RETURNING ' + schema + '.job.id, ' + schema + '.job.data\n ';
return '\n WITH nextJob as (\n SELECT id\n FROM ' + schema + '.job\n WHERE state < \'' + states.active + '\'\n AND name = $1\n AND (createdOn + startIn) < now()\n ORDER BY priority desc, createdOn, id\n LIMIT $2\n FOR UPDATE SKIP LOCKED\n )\n UPDATE ' + schema + '.job SET\n state = \'' + states.active + '\',\n startedOn = now(),\n retryCount = CASE WHEN state = \'' + states.retry + '\' THEN retryCount + 1 ELSE retryCount END\n FROM nextJob\n WHERE ' + schema + '.job.id = nextJob.id\n RETURNING ' + schema + '.job.id, $1 as name, ' + schema + '.job.data\n ';
}

@@ -98,2 +105,6 @@

function completeJobs(schema) {
return '\n UPDATE ' + schema + '.job\n SET completedOn = now(),\n state = \'' + states.complete + '\'\n WHERE id = ANY($1)\n AND state = \'' + states.active + '\'\n ';
}
function cancelJob(schema) {

@@ -103,2 +114,6 @@ return '\n UPDATE ' + schema + '.job\n SET completedOn = now(),\n state = \'' + states.cancelled + '\'\n WHERE id = $1\n AND state < \'' + states.complete + '\'\n RETURNING id, name, data\n ';

function cancelJobs(schema) {
return '\n UPDATE ' + schema + '.job\n SET completedOn = now(),\n state = \'' + states.cancelled + '\'\n WHERE id = ANY($1)\n AND state < \'' + states.complete + '\'\n ';
}
function failJob(schema) {

@@ -108,2 +123,6 @@ return '\n UPDATE ' + schema + '.job\n SET completedOn = now(),\n state = \'' + states.failed + '\'\n WHERE id = $1\n AND state < \'' + states.complete + '\'\n RETURNING id, name, data\n ';

function failJobs(schema) {
return '\n UPDATE ' + schema + '.job\n SET completedOn = now(),\n state = \'' + states.failed + '\'\n WHERE id = ANY($1)\n AND state < \'' + states.complete + '\' \n ';
}
function insertJob(schema) {

@@ -110,0 +129,0 @@ return '\n INSERT INTO ' + schema + '.job (id, name, priority, state, retryLimit, startIn, expireIn, data, singletonKey, singletonOn)\n VALUES (\n $1, $2, $3, \'' + states.created + '\', $4, CAST($5 as interval), CAST($6 as interval), $7, $8,\n CASE WHEN $9::integer IS NOT NULL THEN \'epoch\'::timestamp + \'1 second\'::interval * ($9 * floor((date_part(\'epoch\', now()) + $10) / $9)) ELSE NULL END\n )\n ON CONFLICT DO NOTHING\n ';

{
"name": "pg-boss",
"version": "2.0.0-beta5",
"version": "2.0.0-rc1",
"description": "Queueing jobs in Node.js using PostgreSQL like a boss",

@@ -5,0 +5,0 @@ "main": "./lib/index.js",

@@ -29,7 +29,7 @@ Queueing jobs in Node.js using PostgreSQL like a boss.

function someJobHandler(job, done) {
function someJobHandler(job) {
console.log(`received ${job.name} ${job.id}`);
console.log(`data: ${JSON.stringify(job.data)}`);
done()
job.done()
.then(() => console.log(`some-job ${job.id} completed`))

@@ -36,0 +36,0 @@ .catch(onError);

{
"schema": "5"
"schema": "6"
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc