Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

pg-boss

Package Overview
Dependencies
Maintainers
0
Versions
205
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pg-boss - npm Package Compare versions

Comparing version 10.0.0-beta9 to 10.0.0

4

package.json
{
"name": "pg-boss",
"version": "10.0.0-beta9",
"version": "10.0.0",
"description": "Queueing jobs in Postgres from Node.js like a boss",

@@ -11,4 +11,2 @@ "main": "./src/index.js",

"cron-parser": "^4.0.0",
"lodash.debounce": "^4.0.8",
"p-map": "^4.0.0",
"pg": "^8.5.1",

@@ -15,0 +13,0 @@ "serialize-error": "^8.1.0"

@@ -22,3 +22,3 @@ Queueing jobs in Postgres from Node.js like a boss.

await boss.work(queue, async job => {
await boss.work(queue, async ([ job ]) => {
console.log(`received job ${job.id} with data ${JSON.stringify(job.data)}`)

@@ -31,15 +31,17 @@ })

pg-boss relies on [SKIP LOCKED](https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/), a feature added to postgres specifically for message queues, in order to resolve record locking challenges inherent with relational databases. This brings the safety of guaranteed atomic commits of a relational database to your asynchronous job processing.
pg-boss relies on [SKIP LOCKED](https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/), a feature built specifically for message queues to resolve record locking challenges inherent with relational databases. This provides exactly-once delivery and the safety of guaranteed atomic commits to asynchronous job processing.
This will likely cater the most to teams already familiar with the simplicity of relational database semantics and operations (SQL, querying, and backups). It will be especially useful to those already relying on PostgreSQL that want to limit how many systems are required to monitor and support in their architecture.
## Features
## Summary
* Exactly-once job delivery
* Backpressure-compatible polling workers
* Cron scheduling
* Queue storage policies to support a variety of rate limiting, debouncing, and concurrency use cases
* Priority queues, dead letter queues, job deferral, automatic retries with exponential backoff
* Pub/sub API for fan-out queue relationships
* Priority queues, deferral, retries (with exponential backoff), rate limiting, debouncing
* Table operations via SQL for bulk loads via COPY or INSERT
* Raw SQL support for non-Node.js runtimes via INSERT or COPY
* Serverless function compatible
* Multi-master compatible (for example, in a Kubernetes ReplicaSet)
* Dead letter queues

@@ -52,3 +54,3 @@ ## Requirements

``` bash
```bash
# npm

@@ -55,0 +57,0 @@ npm install pg-boss

@@ -125,22 +125,21 @@ const assert = require('assert')

applyNewJobCheckInterval(options, defaults)
applyPollingInterval(options, defaults)
assert(!('teamConcurrency' in options) ||
(Number.isInteger(options.teamConcurrency) && options.teamConcurrency >= 1 && options.teamConcurrency <= 1000),
'teamConcurrency must be an integer between 1 and 1000')
assert(!('teamSize' in options) || (Number.isInteger(options.teamSize) && options.teamSize >= 1), 'teamSize must be an integer > 0')
assert(!('batchSize' in options) || (Number.isInteger(options.batchSize) && options.batchSize >= 1), 'batchSize must be an integer > 0')
assert(!('includeMetadata' in options) || typeof options.includeMetadata === 'boolean', 'includeMetadata must be a boolean')
assert(!('priority' in options) || typeof options.priority === 'boolean', 'priority must be a boolean')
options.batchSize = options.batchSize || 1
return { options, callback }
}
function checkFetchArgs (name, batchSize, options) {
function checkFetchArgs (name, options) {
assert(name, 'missing queue name')
assert(!batchSize || (Number.isInteger(batchSize) && batchSize >= 1), 'batchSize must be an integer > 0')
assert(!('batchSize' in options) || (Number.isInteger(options.batchSize) && options.batchSize >= 1), 'batchSize must be an integer > 0')
assert(!('includeMetadata' in options) || typeof options.includeMetadata === 'boolean', 'includeMetadata must be a boolean')
assert(!('priority' in options) || typeof options.priority === 'boolean', 'priority must be a boolean')
return { name }
options.batchSize = options.batchSize || 1
}

@@ -167,3 +166,3 @@

applyNewJobCheckInterval(config)
applyPollingInterval(config)
applyExpirationConfig(config)

@@ -191,4 +190,4 @@ applyRetentionConfig(config)

function assertQueueName (name) {
assert(name, 'Name is required')
assert(typeof name === 'string', 'Name must be a string')
assert(name.length <= 50, 'Name cannot exceed 50 characters')
assert(/[\w-]/.test(name), 'Name can only contain alphanumeric characters, underscores, or hyphens')

@@ -285,14 +284,9 @@ }

function applyNewJobCheckInterval (config, defaults) {
assert(!('newJobCheckInterval' in config) || config.newJobCheckInterval >= 500,
'configuration assert: newJobCheckInterval must be at least every 500ms')
function applyPollingInterval (config, defaults) {
assert(!('pollingIntervalSeconds' in config) || config.pollingIntervalSeconds >= 0.5,
'configuration assert: pollingIntervalSeconds must be at least every 500ms')
assert(!('newJobCheckIntervalSeconds' in config) || config.newJobCheckIntervalSeconds >= 1,
'configuration assert: newJobCheckIntervalSeconds must be at least every second')
config.newJobCheckInterval = ('newJobCheckIntervalSeconds' in config)
? config.newJobCheckIntervalSeconds * 1000
: ('newJobCheckInterval' in config)
? config.newJobCheckInterval
: defaults?.newJobCheckInterval || 2000
config.pollingInterval = ('pollingIntervalSeconds' in config)
? config.pollingIntervalSeconds * 1000
: defaults?.pollingInterval || 2000
}

@@ -299,0 +293,0 @@

@@ -174,32 +174,4 @@ const EventEmitter = require('events')

}
async setMaintenanceTime () {
await this.db.executeSql(this.setMaintenanceTimeCommand)
}
async getMaintenanceTime () {
const { rows } = await this.db.executeSql(this.getMaintenanceTimeCommand)
let { maintained_on: maintainedOn, seconds_ago: secondsAgo } = rows[0]
secondsAgo = secondsAgo !== null ? parseFloat(secondsAgo) : 999_999_999
return { maintainedOn, secondsAgo }
}
async setMonitorTime () {
await this.db.executeSql(this.setMonitorTimeCommand)
}
async getMonitorTime () {
const { rows } = await this.db.executeSql(this.getMonitorTimeCommand)
let { monitored_on: monitoredOn, seconds_ago: secondsAgo } = rows[0]
secondsAgo = secondsAgo !== null ? parseFloat(secondsAgo) : 999_999_999
return { monitoredOn, secondsAgo }
}
}
module.exports = Boss

@@ -89,13 +89,13 @@ const assert = require('assert')

async next (version) {
const commands = migrationStore.next(this.config.schema, version, this.migrations)
await this.db.executeSql(commands)
}
// async next (version) {
// const commands = migrationStore.next(this.config.schema, version, this.migrations)
// await this.db.executeSql(commands)
// }
async rollback (version) {
const commands = migrationStore.rollback(this.config.schema, version, this.migrations)
await this.db.executeSql(commands)
}
// async rollback (version) {
// const commands = migrationStore.rollback(this.config.schema, version, this.migrations)
// await this.db.executeSql(commands)
// }
}
module.exports = Contractor

@@ -32,11 +32,11 @@ const EventEmitter = require('events')

if (this.opened) {
if (this.config.debug === true) {
console.log(`${new Date().toISOString()}: DEBUG SQL`)
console.log(text)
// if (this.config.debug === true) {
// console.log(`${new Date().toISOString()}: DEBUG SQL`)
// console.log(text)
if (values) {
console.log(`${new Date().toISOString()}: DEBUG VALUES`)
console.log(values)
}
}
// if (values) {
// console.log(`${new Date().toISOString()}: DEBUG VALUES`)
// console.log(values)
// }
// }

@@ -43,0 +43,0 @@ return await this.pool.query(text, values)

@@ -152,3 +152,3 @@ const EventEmitter = require('events')

let { destroy = false, graceful = true, timeout = 30000, wait = true } = options
let { close = true, graceful = true, timeout = 30000, wait = true } = options

@@ -172,3 +172,3 @@ timeout = Math.max(timeout, 1000)

if (this.#db.isOurs && this.#db.opened && destroy) {
if (this.#db.isOurs && this.#db.opened && close) {
await this.#db.close()

@@ -175,0 +175,0 @@ }

const assert = require('assert')
const EventEmitter = require('events')
const { randomUUID } = require('crypto')
const debounce = require('lodash.debounce')
const { serializeError: stringify } = require('serialize-error')
const pMap = require('p-map')
const { delay } = require('./tools')

@@ -17,5 +15,2 @@ const Attorney = require('./attorney')

const WIP_EVENT_INTERVAL = 2000
const WIP_DEBOUNCE_OPTIONS = { leading: true, trailing: true, maxWait: WIP_EVENT_INTERVAL }
const events = {

@@ -49,2 +44,3 @@ error: 'error',

this.events = events
this.wipTs = Date.now()
this.workers = new Map()

@@ -58,2 +54,3 @@

this.resumeJobsCommand = plans.resumeJobs(config.schema)
this.deleteJobsCommand = plans.deleteJobs(config.schema)
this.failJobsByIdCommand = plans.failJobsById(config.schema)

@@ -64,3 +61,10 @@ this.getJobByIdCommand = plans.getJobById(config.schema)

this.unsubscribeCommand = plans.unsubscribe(config.schema)
this.getQueuesCommand = plans.getQueues(config.schema)
this.getQueueByNameCommand = plans.getQueueByName(config.schema)
this.getQueuesForEventCommand = plans.getQueuesForEvent(config.schema)
this.createQueueCommand = plans.createQueue(config.schema)
this.updateQueueCommand = plans.updateQueue(config.schema)
this.purgeQueueCommand = plans.purgeQueue(config.schema)
this.deleteQueueCommand = plans.deleteQueue(config.schema)
this.clearStorageCommand = plans.clearStorage(config.schema)

@@ -72,2 +76,3 @@ // exported api to index

this.resume,
this.deleteJob,
this.fail,

@@ -88,11 +93,10 @@ this.fetch,

this.updateQueue,
this.getQueue,
this.deleteQueue,
this.purgeQueue,
this.getQueueSize,
this.getQueue,
this.getQueues,
this.clearStorage,
this.getJobById
]
this.emitWipThrottled = debounce(() => this.emit(events.wip, this.getWipData()), WIP_EVENT_INTERVAL, WIP_DEBOUNCE_OPTIONS)
}

@@ -142,3 +146,8 @@

if (!INTERNAL_QUEUES[name]) {
this.emitWipThrottled()
const now = Date.now()
if (now - this.wipTs > 2000) {
this.emit(events.wip, this.getWipData())
this.wipTs = now
}
}

@@ -187,7 +196,4 @@ }

const {
newJobCheckInterval: interval = this.config.newJobCheckInterval,
pollingInterval: interval = this.config.pollingInterval,
batchSize,
teamSize = 1,
teamConcurrency = 1,
teamRefill: refill = false,
includeMetadata = false,

@@ -199,24 +205,9 @@ priority = true

let queueSize = 0
const fetch = () => this.fetch(name, { batchSize, includeMetadata, priority })
let refillTeamPromise
let resolveRefillTeam
const onFetch = async (jobs) => {
if (!jobs.length) {
return
}
// Setup a promise that onFetch can await for when at least one
// job is finished and so the team is ready to be topped up
const createTeamRefillPromise = () => {
refillTeamPromise = new Promise((resolve) => { resolveRefillTeam = resolve })
}
createTeamRefillPromise()
const onRefill = () => {
queueSize--
resolveRefillTeam()
createTeamRefillPromise()
}
const fetch = () => this.fetch(name, batchSize || (teamSize - queueSize), { includeMetadata, priority })
const onFetch = async (jobs) => {
if (this.config.__test__throw_worker) {

@@ -228,30 +219,10 @@ throw new Error('__test__throw_worker')

if (batchSize) {
const maxExpiration = jobs.reduce((acc, i) => Math.max(acc, i.expireInSeconds), 0)
const maxExpiration = jobs.reduce((acc, i) => Math.max(acc, i.expireInSeconds), 0)
const jobIds = jobs.map(job => job.id)
await resolveWithinSeconds(Promise.all([callback(jobs)]), maxExpiration)
.then(() => this.complete(name, jobs.map(job => job.id)))
.catch(err => this.fail(name, jobs.map(job => job.id), err))
} else {
if (refill) {
queueSize += jobs.length || 1
}
const allTeamPromise = pMap(jobs, job =>
resolveWithinSeconds(callback(job), job.expireInSeconds)
.then(result => this.complete(name, job.id, result))
.catch(err => this.fail(name, job.id, err))
.then(() => refill ? onRefill() : null)
, { concurrency: teamConcurrency }
).catch(() => {}) // allow promises & non-promises to live together in harmony
if (refill) {
if (queueSize < teamSize) {
return
} else {
await refillTeamPromise
}
} else {
await allTeamPromise
}
try {
const result = await resolveWithinSeconds(callback(jobs), maxExpiration)
this.complete(name, jobIds, jobIds.length === 1 ? result : undefined)
} catch (err) {
this.fail(name, jobIds, err)
}

@@ -332,3 +303,3 @@

return await Promise.all(rows.map(({ name }) => this.send(name, ...args)))
await Promise.allSettled(rows.map(({ name }) => this.send(name, ...args)))
}

@@ -471,7 +442,6 @@

async fetch (name, batchSize, options = {}) {
const values = Attorney.checkFetchArgs(name, batchSize, options)
async fetch (name, options = {}) {
Attorney.checkFetchArgs(name, options)
const db = options.db || this.db
const nextJobSql = this.nextJobCommand({ ...options })
const statementValues = [values.name, batchSize || 1]

@@ -481,3 +451,3 @@ let result

try {
result = await db.executeSql(nextJobSql, statementValues)
result = await db.executeSql(nextJobSql, [name, options.batchSize])
} catch (err) {

@@ -487,7 +457,3 @@ // errors from fetchquery should only be unique constraint violations

if (!result || result.rows.length === 0) {
return null
}
return result.rows.length === 1 && !batchSize ? result.rows[0] : result.rows
return result?.rows || []
}

@@ -517,7 +483,7 @@

mapCompletionResponse (ids, result) {
mapCommandResponse (ids, result) {
return {
jobs: ids,
requested: ids.length,
updated: result && result.rows ? parseInt(result.rows[0].count) : 0
affected: result && result.rows ? parseInt(result.rows[0].count) : 0
}

@@ -527,35 +493,43 @@ }

async complete (name, id, data, options = {}) {
assert(name, 'Missing queue name argument')
Attorney.assertQueueName(name)
const db = options.db || this.db
const ids = this.mapCompletionIdArg(id, 'complete')
const result = await db.executeSql(this.completeJobsCommand, [name, ids, this.mapCompletionDataArg(data)])
return this.mapCompletionResponse(ids, result)
return this.mapCommandResponse(ids, result)
}
async fail (name, id, data, options = {}) {
assert(name, 'Missing queue name argument')
Attorney.assertQueueName(name)
const db = options.db || this.db
const ids = this.mapCompletionIdArg(id, 'fail')
const result = await db.executeSql(this.failJobsByIdCommand, [name, ids, this.mapCompletionDataArg(data)])
return this.mapCompletionResponse(ids, result)
return this.mapCommandResponse(ids, result)
}
async cancel (name, id, options = {}) {
assert(name, 'Missing queue name argument')
Attorney.assertQueueName(name)
const db = options.db || this.db
const ids = this.mapCompletionIdArg(id, 'cancel')
const result = await db.executeSql(this.cancelJobsCommand, [name, ids])
return this.mapCompletionResponse(ids, result)
return this.mapCommandResponse(ids, result)
}
async deleteJob (name, id, options = {}) {
Attorney.assertQueueName(name)
const db = options.db || this.db
const ids = this.mapCompletionIdArg(id, 'deleteJob')
const result = await db.executeSql(this.deleteJobsCommand, [name, ids])
return this.mapCommandResponse(ids, result)
}
async resume (name, id, options = {}) {
assert(name, 'Missing queue name argument')
Attorney.assertQueueName(name)
const db = options.db || this.db
const ids = this.mapCompletionIdArg(id, 'resume')
const result = await db.executeSql(this.resumeJobsCommand, [name, ids])
return this.mapCompletionResponse(ids, result)
return this.mapCommandResponse(ids, result)
}
async createQueue (name, options = {}) {
assert(name, 'Missing queue name argument')
name = name || options.name

@@ -577,10 +551,8 @@ Attorney.assertQueueName(name)

const paritionSql = plans.createPartition(this.config.schema, name)
if (deadLetter) {
Attorney.assertQueueName(deadLetter)
}
await this.db.executeSql(paritionSql)
const sql = plans.createQueue(this.config.schema, name)
const params = [
name,
// todo: pull in defaults from constructor config
const data = {
policy,

@@ -593,10 +565,19 @@ retryLimit,

deadLetter
]
}
await this.db.executeSql(sql, params)
await this.db.executeSql(this.createQueueCommand, [name, data])
}
async getQueues () {
const { rows } = await this.db.executeSql(this.getQueuesCommand)
return rows
}
async updateQueue (name, options = {}) {
assert(name, 'Missing queue name argument')
Attorney.assertQueueName(name)
const { policy = QUEUE_POLICIES.standard } = options
assert(policy in QUEUE_POLICIES, `${policy} is not a valid queue policy`)
const {

@@ -611,6 +592,5 @@ retryLimit,

const sql = plans.updateQueue(this.config.schema)
const params = [
name,
policy,
retryLimit,

@@ -624,70 +604,38 @@ retryDelay,

await this.db.executeSql(sql, params)
await this.db.executeSql(this.updateQueueCommand, params)
}
async getQueue (name) {
assert(name, 'Missing queue name argument')
Attorney.assertQueueName(name)
const sql = plans.getQueueByName(this.config.schema)
const result = await this.db.executeSql(sql, [name])
const { rows } = await this.db.executeSql(this.getQueueByNameCommand, [name])
if (result.rows.length === 0) {
return null
}
const {
policy,
retry_limit: retryLimit,
retry_delay: retryDelay,
retry_backoff: retryBackoff,
expire_seconds: expireInSeconds,
retention_minutes: retentionMinutes,
dead_letter: deadLetter
} = result.rows[0]
return {
name,
policy,
retryLimit,
retryDelay,
retryBackoff,
expireInSeconds,
retentionMinutes,
deadLetter
}
return rows[0] || null
}
async deleteQueue (name) {
assert(name, 'Missing queue name argument')
Attorney.assertQueueName(name)
const queueSql = plans.getQueueByName(this.config.schema)
const { rows } = await this.db.executeSql(queueSql, [name])
const { rows } = await this.db.executeSql(this.getQueueByNameCommand, [name])
if (rows.length) {
Attorney.assertQueueName(name)
const sql = plans.dropPartition(this.config.schema, name)
await this.db.executeSql(sql)
if (rows.length === 1) {
await this.db.executeSql(this.deleteQueueCommand, [name])
}
const sql = plans.deleteQueueRecords(this.config.schema)
await this.db.executeSql(sql, [name])
}
async purgeQueue (queue) {
assert(queue, 'Missing queue name argument')
const sql = plans.purgeQueue(this.config.schema)
await this.db.executeSql(sql, [queue])
async purgeQueue (name) {
Attorney.assertQueueName(name)
await this.db.executeSql(this.purgeQueueCommand, [name])
}
async clearStorage () {
const sql = plans.clearStorage(this.config.schema)
await this.db.executeSql(sql)
await this.db.executeSql(this.clearStorageCommand)
}
async getQueueSize (queue, options) {
assert(queue, 'Missing queue name argument')
async getQueueSize (name, options) {
Attorney.assertQueueName(name)
const sql = plans.getQueueSize(this.config.schema, options)
const result = await this.db.executeSql(sql, [queue])
const result = await this.db.executeSql(sql, [name])

@@ -697,17 +645,17 @@ return result ? parseFloat(result.rows[0].count) : null

async getJobById (queue, id, options = {}) {
async getJobById (name, id, options = {}) {
Attorney.assertQueueName(name)
const db = options.db || this.db
const result1 = await db.executeSql(this.getJobByIdCommand, [queue, id])
if (result1 && result1.rows && result1.rows.length === 1) {
const result1 = await db.executeSql(this.getJobByIdCommand, [name, id])
if (result1?.rows?.length === 1) {
return result1.rows[0]
} else if (options.includeArchive) {
const result2 = await db.executeSql(this.getArchivedJobByIdCommand, [name, id])
return result2?.rows[0] || null
} else {
return null
}
const result2 = await db.executeSql(this.getArchivedJobByIdCommand, [queue, id])
if (result2 && result2.rows && result2.rows.length === 1) {
return result2.rows[0]
}
return null
}

@@ -714,0 +662,0 @@ }

@@ -31,2 +31,3 @@ const DEFAULT_SCHEMA = 'pgboss'

resumeJobs,
deleteJobs,
failJobsById,

@@ -46,7 +47,6 @@ failJobsByTimeout,

countStates,
updateQueue,
createQueue,
updateQueue,
createPartition,
dropPartition,
deleteQueueRecords,
deleteQueue,
getQueues,
getQueueByName,

@@ -77,9 +77,8 @@ getQueueSize,

createTableVersion(schema),
createTableQueue(schema),
createTableSchedule(schema),
createTableSubscription(schema),
createTableJob(schema),
createIndexJobFetch(schema),
createIndexJobPolicyStately(schema),
createIndexJobPolicyShort(schema),
createIndexJobPolicySingleton(schema),
createIndexJobThrottleOn(schema),
createIndexJobThrottleKey(schema),

@@ -91,11 +90,5 @@ createTableArchive(schema),

createTableVersion(schema),
createTableQueue(schema),
createTableSchedule(schema),
createTableSubscription(schema),
createQueueFunction(schema),
deleteQueueFunction(schema),
getPartitionFunction(schema),
createPartitionFunction(schema),
dropPartitionFunction(schema),
insertVersion(schema, version)

@@ -113,2 +106,17 @@ ]

function createEnumJobState (schema) {
// ENUM definition order is important
// base type is numeric and first values are less than last values
return `
CREATE TYPE ${schema}.job_state AS ENUM (
'${JOB_STATES.created}',
'${JOB_STATES.retry}',
'${JOB_STATES.active}',
'${JOB_STATES.completed}',
'${JOB_STATES.cancelled}',
'${JOB_STATES.failed}'
)
`
}
function createTableVersion (schema) {

@@ -125,13 +133,17 @@ return `

function createEnumJobState (schema) {
// ENUM definition order is important
// base type is numeric and first values are less than last values
function createTableQueue (schema) {
return `
CREATE TYPE ${schema}.job_state AS ENUM (
'${JOB_STATES.created}',
'${JOB_STATES.retry}',
'${JOB_STATES.active}',
'${JOB_STATES.completed}',
'${JOB_STATES.cancelled}',
'${JOB_STATES.failed}'
CREATE TABLE ${schema}.queue (
name text,
policy text,
retry_limit int,
retry_delay int,
retry_backoff bool,
expire_seconds int,
retention_minutes int,
dead_letter text REFERENCES ${schema}.queue (name),
partition_name text,
created_on timestamp with time zone not null default now(),
updated_on timestamp with time zone not null default now(),
PRIMARY KEY (name)
)

@@ -141,2 +153,29 @@ `

function createTableSchedule (schema) {
return `
CREATE TABLE ${schema}.schedule (
name text REFERENCES ${schema}.queue ON DELETE CASCADE,
cron text not null,
timezone text,
data jsonb,
options jsonb,
created_on timestamp with time zone not null default now(),
updated_on timestamp with time zone not null default now(),
PRIMARY KEY (name)
)
`
}
function createTableSubscription (schema) {
return `
CREATE TABLE ${schema}.subscription (
event text not null,
name text not null REFERENCES ${schema}.queue ON DELETE CASCADE,
created_on timestamp with time zone not null default now(),
updated_on timestamp with time zone not null default now(),
PRIMARY KEY(event, name)
)
`
}
function createTableJob (schema) {

@@ -164,4 +203,3 @@ return `

dead_letter text,
policy text,
CONSTRAINT job_pkey PRIMARY KEY (name, id)
policy text
) PARTITION BY LIST (name)

@@ -172,3 +210,6 @@ `

const baseJobColumns = 'id, name, data, EXTRACT(epoch FROM expire_in) as "expireInSeconds"'
const allJobColumns = `${baseJobColumns}, policy, state, priority,
const allJobColumns = `${baseJobColumns},
policy,
state,
priority,
retry_limit as "retryLimit",

@@ -190,27 +231,46 @@ retry_count as "retryCount",

function createPartition (schema, name) {
return `SELECT ${schema}.create_partition('${name}');`
}
function getPartitionFunction (schema) {
function createQueueFunction (schema) {
return `
CREATE FUNCTION ${schema}.get_partition(queue_name text, out name text) AS
$$
SELECT 'job_' || encode(sha224(queue_name::bytea), 'hex');
$$
LANGUAGE SQL
IMMUTABLE
`
}
function createPartitionFunction (schema) {
return `
CREATE FUNCTION ${schema}.create_partition(queue_name text)
CREATE FUNCTION ${schema}.create_queue(queue_name text, options json)
RETURNS VOID AS
$$
DECLARE
table_name varchar := ${schema}.get_partition(queue_name);
table_name varchar := 'j' || encode(sha224(queue_name::bytea), 'hex');
BEGIN
EXECUTE format('CREATE TABLE ${schema}.%I (LIKE ${schema}.job INCLUDING DEFAULTS INCLUDING CONSTRAINTS)', table_name);
EXECUTE format('ALTER TABLE ${schema}.%I ADD CHECK (name=%L)', table_name, queue_name);
INSERT INTO ${schema}.queue (
name,
policy,
retry_limit,
retry_delay,
retry_backoff,
expire_seconds,
retention_minutes,
dead_letter,
partition_name
)
VALUES (
queue_name,
options->>'policy',
(options->>'retryLimit')::int,
(options->>'retryDelay')::int,
(options->>'retryBackoff')::bool,
(options->>'expireInSeconds')::int,
(options->>'retentionMinutes')::int,
options->>'deadLetter',
table_name
);
EXECUTE format('CREATE TABLE ${schema}.%I (LIKE ${schema}.job INCLUDING DEFAULTS)', table_name);
EXECUTE format('${formatPartitionCommand(createPrimaryKeyJob(schema))}', table_name);
EXECUTE format('${formatPartitionCommand(createQueueForeignKeyJob(schema))}', table_name);
EXECUTE format('${formatPartitionCommand(createQueueForeignKeyJobDeadLetter(schema))}', table_name);
EXECUTE format('${formatPartitionCommand(createIndexJobPolicyShort(schema))}', table_name);
EXECUTE format('${formatPartitionCommand(createIndexJobPolicySingleton(schema))}', table_name);
EXECUTE format('${formatPartitionCommand(createIndexJobPolicyStately(schema))}', table_name);
EXECUTE format('${formatPartitionCommand(createIndexJobThrottle(schema))}', table_name);
EXECUTE format('${formatPartitionCommand(createIndexJobFetch(schema))}', table_name);
EXECUTE format('ALTER TABLE ${schema}.%I ADD CONSTRAINT cjc CHECK (name=%L)', table_name, queue_name);
EXECUTE format('ALTER TABLE ${schema}.job ATTACH PARTITION ${schema}.%I FOR VALUES IN (%L)', table_name, queue_name);

@@ -223,9 +283,22 @@ END;

function dropPartitionFunction (schema) {
function formatPartitionCommand (command) {
return command.replace('.job', '.%1$I').replace('job_i', '%1$s_i').replaceAll('\'', '\'\'')
}
function deleteQueueFunction (schema) {
return `
CREATE FUNCTION ${schema}.drop_partition(queue_name text)
CREATE FUNCTION ${schema}.delete_queue(queue_name text)
RETURNS VOID AS
$$
DECLARE
table_name varchar;
BEGIN
EXECUTE format('DROP TABLE IF EXISTS %I', ${schema}.get_partition(queue_name));
WITH deleted as (
DELETE FROM ${schema}.queue
WHERE name = queue_name
RETURNING partition_name
)
SELECT partition_name from deleted INTO table_name;
EXECUTE format('DROP TABLE IF EXISTS ${schema}.%I', table_name);
END;

@@ -237,32 +310,44 @@ $$

function dropPartition (schema, name) {
return `SELECT ${schema}.drop_partition('${name}');`
function createQueue (schema) {
return `SELECT ${schema}.create_queue($1, $2)`
}
function deleteQueue (schema) {
return `SELECT ${schema}.delete_queue($1)`
}
function createPrimaryKeyJob (schema) {
return `ALTER TABLE ${schema}.job ADD PRIMARY KEY (name, id)`
}
function createQueueForeignKeyJob (schema) {
return `ALTER TABLE ${schema}.job ADD CONSTRAINT q_fkey FOREIGN KEY (name) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED`
}
function createQueueForeignKeyJobDeadLetter (schema) {
return `ALTER TABLE ${schema}.job ADD CONSTRAINT dlq_fkey FOREIGN KEY (dead_letter) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED`
}
function createPrimaryKeyArchive (schema) {
return `ALTER TABLE ${schema}.archive ADD CONSTRAINT archive_pkey PRIMARY KEY (name, id)`
return `ALTER TABLE ${schema}.archive ADD PRIMARY KEY (name, id)`
}
function createIndexJobPolicyShort (schema) {
return `CREATE UNIQUE INDEX job_policy_short ON ${schema}.job (name) WHERE state = '${JOB_STATES.created}' AND policy = '${QUEUE_POLICIES.short}'`
return `CREATE UNIQUE INDEX job_i1 ON ${schema}.job (name, COALESCE(singleton_key, '')) WHERE state = '${JOB_STATES.created}' AND policy = '${QUEUE_POLICIES.short}';`
}
function createIndexJobPolicySingleton (schema) {
return `CREATE UNIQUE INDEX job_policy_singleton ON ${schema}.job (name) WHERE state = '${JOB_STATES.active}' AND policy = '${QUEUE_POLICIES.singleton}'`
return `CREATE UNIQUE INDEX job_i2 ON ${schema}.job (name, COALESCE(singleton_key, '')) WHERE state = '${JOB_STATES.active}' AND policy = '${QUEUE_POLICIES.singleton}'`
}
function createIndexJobPolicyStately (schema) {
return `CREATE UNIQUE INDEX job_policy_stately ON ${schema}.job (name, state) WHERE state <= '${JOB_STATES.active}' AND policy = '${QUEUE_POLICIES.stately}'`
return `CREATE UNIQUE INDEX job_i3 ON ${schema}.job (name, state, COALESCE(singleton_key, '')) WHERE state <= '${JOB_STATES.active}' AND policy = '${QUEUE_POLICIES.stately}'`
}
function createIndexJobThrottleOn (schema) {
return `CREATE UNIQUE INDEX job_throttle_on ON ${schema}.job (name, singleton_on, COALESCE(singleton_key, '')) WHERE state <= '${JOB_STATES.completed}' AND singleton_on IS NOT NULL`
function createIndexJobThrottle (schema) {
return `CREATE UNIQUE INDEX job_i4 ON ${schema}.job (name, singleton_on, COALESCE(singleton_key, '')) WHERE state <> '${JOB_STATES.cancelled}' AND singleton_on IS NOT NULL`
}
function createIndexJobThrottleKey (schema) {
return `CREATE UNIQUE INDEX job_throttle_key ON ${schema}.job (name, singleton_key) WHERE state <= '${JOB_STATES.completed}' AND singleton_on IS NULL`
}
function createIndexJobFetch (schema) {
return `CREATE INDEX job_fetch ON ${schema}.job (name, start_after) INCLUDE (priority, created_on, id) WHERE state < '${JOB_STATES.active}'`
return `CREATE INDEX job_i5 ON ${schema}.job (name, start_after) INCLUDE (priority, created_on, id) WHERE state < '${JOB_STATES.active}'`
}

@@ -279,3 +364,3 @@

function createIndexArchiveArchivedOn (schema) {
return `CREATE INDEX archive_archived_on_idx ON ${schema}.archive(archived_on)`
return `CREATE INDEX archive_i1 ON ${schema}.archive(archived_on)`
}

@@ -303,18 +388,13 @@

function createQueue (schema) {
return `
INSERT INTO ${schema}.queue (name, policy, retry_limit, retry_delay, retry_backoff, expire_seconds, retention_minutes, dead_letter)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`
}
function updateQueue (schema) {
return `
UPDATE ${schema}.queue SET
retry_limit = COALESCE($2, retry_limit),
retry_delay = COALESCE($3, retry_delay),
retry_backoff = COALESCE($4, retry_backoff),
expire_seconds = COALESCE($5, expire_seconds),
retention_minutes = COALESCE($6, retention_minutes),
dead_letter = COALESCE($7, dead_letter)
policy = COALESCE($2, policy),
retry_limit = COALESCE($3, retry_limit),
retry_delay = COALESCE($4, retry_delay),
retry_backoff = COALESCE($5, retry_backoff),
expire_seconds = COALESCE($6, expire_seconds),
retention_minutes = COALESCE($7, retention_minutes),
dead_letter = COALESCE($8, dead_letter),
updated_on = now()
WHERE name = $1

@@ -324,14 +404,21 @@ `

function getQueueByName (schema) {
return `SELECT * FROM ${schema}.queue WHERE name = $1`
function getQueues (schema) {
return `
SELECT
name,
policy,
retry_limit as "retryLimit",
retry_delay as "retryDelay",
retry_backoff as "retryBackoff",
expire_seconds as "expireInSeconds",
retention_minutes as "retentionMinutes",
dead_letter as "deadLetter",
created_on as "createdOn",
updated_on as "updatedOn"
FROM ${schema}.queue
`
}
function deleteQueueRecords (schema) {
return `WITH dq AS (
DELETE FROM ${schema}.queue WHERE name = $1
), ds AS (
DELETE FROM ${schema}.schedule WHERE name = $1
)
DELETE FROM ${schema}.job WHERE name = $1
`
function getQueueByName (schema) {
return `${getQueues(schema)} WHERE name = $1`
}

@@ -353,50 +440,4 @@

function createTableQueue (schema) {
return `
CREATE TABLE ${schema}.queue (
name text primary key,
policy text,
retry_limit int,
retry_delay int,
retry_backoff bool,
expire_seconds int,
retention_minutes int,
dead_letter text,
created_on timestamp with time zone not null default now()
)
`
}
function createTableSchedule (schema) {
return `
CREATE TABLE ${schema}.schedule (
name text primary key,
cron text not null,
timezone text,
data jsonb,
options jsonb,
created_on timestamp with time zone not null default now(),
updated_on timestamp with time zone not null default now()
)
`
}
function createTableSubscription (schema) {
return `
CREATE TABLE ${schema}.subscription (
event text not null,
name text not null,
created_on timestamp with time zone not null default now(),
updated_on timestamp with time zone not null default now(),
PRIMARY KEY(event, name)
)
`
}
function getSchedules (schema) {
return `
SELECT s.*
FROM ${schema}.schedule s
JOIN ${schema}.queue q on s.name = q.name
`
return `SELECT * FROM ${schema}.schedule`
}

@@ -583,2 +624,3 @@

AND id IN (SELECT UNNEST($2::uuid[]))
AND state = '${JOB_STATES.cancelled}'
RETURNING 1

@@ -590,2 +632,14 @@ )

function deleteJobs (schema) {
return `
with results as (
DELETE FROM ${schema}.job
WHERE name = $1
AND id IN (SELECT UNNEST($2::uuid[]))
RETURNING 1
)
SELECT COUNT(*) from results
`
}
function insertJob (schema) {

@@ -662,3 +716,3 @@ return `

$19::bool as retry_backoff_default
) j LEFT JOIN ${schema}.queue q ON j.name = q.name
) j JOIN ${schema}.queue q ON j.name = q.name
ON CONFLICT DO NOTHING

@@ -698,6 +752,6 @@ RETURNING id

data,
COALESCE(priority, 0),
COALESCE("startAfter", now()),
"singletonKey",
COALESCE("deadLetter", q.dead_letter),
COALESCE(priority, 0) as priority,
j.start_after,
"singletonKey" as singleton_key,
COALESCE("deadLetter", q.dead_letter) as dead_letter,
CASE

@@ -711,3 +765,3 @@ WHEN "expireInSeconds" IS NOT NULL THEN "expireInSeconds" * interval '1s'

WHEN "keepUntil" IS NOT NULL THEN "keepUntil"
ELSE COALESCE("startAfter", now()) + CAST(COALESCE((q.retention_minutes * 60)::text, defaults.keep_until, '14 days') as interval)
ELSE COALESCE(j.start_after, now()) + CAST(COALESCE((q.retention_minutes * 60)::text, defaults.keep_until, '14 days') as interval)
END as keep_until,

@@ -722,17 +776,25 @@ COALESCE("retryLimit", q.retry_limit, defaults.retry_limit, 2),

q.policy
FROM json_to_recordset($1) as j (
id uuid,
name text,
priority integer,
data jsonb,
"startAfter" timestamp with time zone,
"retryLimit" integer,
"retryDelay" integer,
"retryBackoff" boolean,
"singletonKey" text,
"expireInSeconds" integer,
"keepUntil" timestamp with time zone,
"deadLetter" text
)
LEFT JOIN ${schema}.queue q ON j.name = q.name,
FROM (
SELECT *,
CASE
WHEN right("startAfter", 1) = 'Z' THEN CAST("startAfter" as timestamp with time zone)
ELSE now() + CAST(COALESCE("startAfter",'0') as interval)
END as start_after
FROM json_to_recordset($1) as x (
id uuid,
name text,
priority integer,
data jsonb,
"startAfter" text,
"retryLimit" integer,
"retryDelay" integer,
"retryBackoff" boolean,
"singletonKey" text,
"singletonOn" text,
"expireInSeconds" integer,
"keepUntil" timestamp with time zone,
"deadLetter" text
)
) j
JOIN ${schema}.queue q ON j.name = q.name,
defaults

@@ -764,2 +826,3 @@ ON CONFLICT DO NOTHING

FROM archived_rows
ON CONFLICT DO NOTHING
`

@@ -792,3 +855,3 @@ }

function advisoryLock (schema, key) {
return `SELECT pg_advisory_xact_lock(
return `SELECT pg_advisory_xact_lock(
('x' || encode(sha224((current_database() || '.pgboss.${schema}${key || ''}')::bytea), 'hex'))::bit(64)::bigint

@@ -795,0 +858,0 @@ )`

@@ -5,9 +5,8 @@ const EventEmitter = require('events')

const Attorney = require('./attorney')
const pMap = require('p-map')
const queues = {
const QUEUES = {
SEND_IT: '__pgboss__send-it'
}
const events = {
const EVENTS = {
error: 'error',

@@ -28,3 +27,3 @@ schedule: 'schedule'

this.events = events
this.events = EVENTS

@@ -58,12 +57,11 @@ this.getTimeCommand = plans.getTime(config.schema)

try {
await this.manager.createQueue(queues.SEND_IT)
await this.manager.createQueue(QUEUES.SEND_IT)
} catch {}
const options = {
newJobCheckIntervalSeconds: this.config.cronWorkerIntervalSeconds,
teamSize: 50,
teamConcurrency: 5
pollingIntervalSeconds: this.config.cronWorkerIntervalSeconds,
batchSize: 50
}
await this.manager.work(queues.SEND_IT, options, (job) => this.onSendIt(job))
await this.manager.work(QUEUES.SEND_IT, options, (jobs) => this.manager.insert(jobs.map(i => i.data)))

@@ -83,3 +81,3 @@ setImmediate(() => this.onCron())

await this.manager.offWork(queues.SEND_IT)
await this.manager.offWork(QUEUES.SEND_IT)

@@ -148,8 +146,11 @@ if (this.skewMonitorInterval) {

async cron () {
const items = await this.getSchedules()
const schedules = await this.getSchedules()
const sending = items.filter(i => this.shouldSendIt(i.cron, i.timezone))
const scheduled = schedules
.filter(i => this.shouldSendIt(i.cron, i.timezone))
.map(({ name, data, options }) =>
({ name: QUEUES.SEND_IT, data: { name, data, options }, options: { singletonKey: name, singletonSeconds: 60 } }))
if (sending.length && !this.stopped) {
await pMap(sending, it => this.send(it), { concurrency: 5 })
if (scheduled.length > 0 && !this.stopped) {
await this.manager.insert(scheduled)
}

@@ -170,12 +171,2 @@ }

async send (job) {
await this.manager.send(queues.SEND_IT, job, { singletonKey: job.name, singletonSeconds: 60 })
}
async onSendIt (job) {
if (this.stopped) return
const { name, data, options } = job.data
await this.manager.send(name, data, options)
}
async getSchedules () {

@@ -191,15 +182,15 @@ const { rows } = await this.db.executeSql(this.getSchedulesCommand)

// validation pre-check
Attorney.checkSendArgs([name, data, options], this.config)
// make sure queue exists before scheduling
const queue = await this.db.executeSql(this.getQueueCommand, [name])
const values = [name, cron, tz, data, options]
if (!queue.rows.length === 0) {
throw new Error(`Queue '${name}' not found`)
try {
await this.db.executeSql(this.scheduleCommand, values)
} catch (err) {
if (err.message.includes('foreign key')) {
err.message = `Queue ${name} not found`
}
throw err
}
const values = [name, cron, tz, data, options]
await this.db.executeSql(this.scheduleCommand, values)
}

@@ -213,2 +204,2 @@

module.exports = Timekeeper
module.exports.QUEUES = queues
module.exports.QUEUES = QUEUES

@@ -116,39 +116,21 @@ import { EventEmitter } from 'events'

type QueuePolicy = 'standard' | 'short' | 'singleton' | 'stately'
type Queue = ExpirationOptions & RetentionOptions & RetryOptions & { policy: QueuePolicy }
type QueueUpdateOptions = ExpirationOptions & RetentionOptions & RetryOptions
type Queue = RetryOptions & ExpirationOptions & RetentionOptions & { name: string, policy?: QueuePolicy, deadLetter?: string }
type QueueResult = Queue & { createdOn: Date, updatedOn: Date }
type ScheduleOptions = SendOptions & { tz?: string }
interface JobPollingOptions {
newJobCheckInterval?: number;
newJobCheckIntervalSeconds?: number;
pollingIntervalSeconds?: number;
}
interface CommonJobFetchOptions {
interface JobFetchOptions {
includeMetadata?: boolean;
priority?: boolean;
batchSize?: number;
}
type JobFetchOptions = CommonJobFetchOptions & {
teamSize?: number;
teamConcurrency?: number;
teamRefill?: boolean;
}
type BatchJobFetchOptions = CommonJobFetchOptions & {
batchSize: number;
}
type WorkOptions = JobFetchOptions & JobPollingOptions
type BatchWorkOptions = BatchJobFetchOptions & JobPollingOptions
type FetchOptions = JobFetchOptions & ConnectionOptions;
type FetchOptions = {
includeMetadata?: boolean;
} & ConnectionOptions;
interface WorkHandler<ReqData> {
(job: PgBoss.Job<ReqData>): Promise<any>;
}
interface BatchWorkHandler<ReqData> {
(job: PgBoss.Job<ReqData>[]): Promise<any>;

@@ -158,6 +140,2 @@ }

interface WorkWithMetadataHandler<ReqData> {
(job: PgBoss.JobWithMetadata<ReqData>): Promise<any>;
}
interface BatchWorkWithMetadataHandler<ReqData> {
(job: PgBoss.JobWithMetadata<ReqData>[]): Promise<any>;

@@ -269,3 +247,3 @@ }

interface StopOptions {
destroy?: boolean,
close?: boolean,
graceful?: boolean,

@@ -335,2 +313,6 @@ timeout?: number,

fetch<T>(name: string): Promise<PgBoss.Job<T>[]>;
fetch<T>(name: string, options: PgBoss.FetchOptions & { includeMetadata: true }): Promise<PgBoss.JobWithMetadata<T>[]>;
fetch<T>(name: string, options: PgBoss.FetchOptions): Promise<PgBoss.Job<T>[]>;
work<ReqData>(name: string, handler: PgBoss.WorkHandler<ReqData>): Promise<string>;

@@ -340,5 +322,2 @@ work<ReqData>(name: string, options: PgBoss.WorkOptions & { includeMetadata: true }, handler: PgBoss.WorkWithMetadataHandler<ReqData>): Promise<string>;

work<ReqData>(name: string, options: PgBoss.BatchWorkOptions & { includeMetadata: true }, handler: PgBoss.BatchWorkWithMetadataHandler<ReqData>): Promise<string>;
work<ReqData>(name: string, options: PgBoss.BatchWorkOptions, handler: PgBoss.BatchWorkHandler<ReqData>): Promise<string>;
offWork(name: string): Promise<void>;

@@ -351,11 +330,6 @@ offWork(options: PgBoss.OffWorkOptions): Promise<void>;

unsubscribe(event: string, name: string): Promise<void>;
publish(event: string): Promise<string[]>;
publish(event: string, data: object): Promise<string[]>;
publish(event: string, data: object, options: PgBoss.SendOptions): Promise<string[]>;
publish(event: string): Promise<void>;
publish(event: string, data: object): Promise<void>;
publish(event: string, data: object, options: PgBoss.SendOptions): Promise<void>;
fetch<T>(name: string): Promise<PgBoss.Job<T> | null>;
fetch<T>(name: string, batchSize: number): Promise<PgBoss.Job<T>[] | null>;
fetch<T>(name: string, batchSize: number, options: PgBoss.FetchOptions & { includeMetadata: true }): Promise<PgBoss.JobWithMetadata<T>[] | null>;
fetch<T>(name: string, batchSize: number, options: PgBoss.FetchOptions): Promise<PgBoss.Job<T>[] | null>;
cancel(name: string, id: string, options?: PgBoss.ConnectionOptions): Promise<void>;

@@ -367,2 +341,5 @@ cancel(name: string, ids: string[], options?: PgBoss.ConnectionOptions): Promise<void>;

deleteJob(name: string, id: string, options?: PgBoss.ConnectionOptions): Promise<void>;
deleteJob(name: string, ids: string[], options?: PgBoss.ConnectionOptions): Promise<void>;
complete(name: string, id: string, options?: PgBoss.ConnectionOptions): Promise<void>;

@@ -376,10 +353,11 @@ complete(name: string, id: string, data: object, options?: PgBoss.ConnectionOptions): Promise<void>;

getQueueSize(name: string, options?: object): Promise<number>;
getJobById(name: string, id: string, options?: PgBoss.ConnectionOptions): Promise<PgBoss.JobWithMetadata | null>;
getJobById(name: string, id: string, options?: PgBoss.ConnectionOptions & { includeArchive: bool }): Promise<PgBoss.JobWithMetadata | null>;
createQueue(name: string, options?: PgBoss.Queue): Promise<void>;
getQueue(name: string): Promise<PgBoss.Queue | null>;
updateQueue(name: string, options?: PgBoss.QueueUpdateOptions): Promise<void>;
updateQueue(name: string, options?: PgBoss.Queue): Promise<void>;
deleteQueue(name: string): Promise<void>;
purgeQueue(name: string): Promise<void>;
getQueues(): Promise<PgBoss.QueueResult[]>;
getQueue(name: string): Promise<PgBoss.QueueResult | null>;
getQueueSize(name: string, options?: { before: 'retry' | 'active' | 'completed' | 'cancelled' | 'failed' }): Promise<number>;

@@ -386,0 +364,0 @@ clearStorage(): Promise<void>;

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc