Comparing version 10.0.0-beta9 to 10.0.0
{ | ||
"name": "pg-boss", | ||
"version": "10.0.0-beta9", | ||
"version": "10.0.0", | ||
"description": "Queueing jobs in Postgres from Node.js like a boss", | ||
@@ -11,4 +11,2 @@ "main": "./src/index.js", | ||
"cron-parser": "^4.0.0", | ||
"lodash.debounce": "^4.0.8", | ||
"p-map": "^4.0.0", | ||
"pg": "^8.5.1", | ||
@@ -15,0 +13,0 @@ "serialize-error": "^8.1.0" |
@@ -22,3 +22,3 @@ Queueing jobs in Postgres from Node.js like a boss. | ||
await boss.work(queue, async job => { | ||
await boss.work(queue, async ([ job ]) => { | ||
console.log(`received job ${job.id} with data ${JSON.stringify(job.data)}`) | ||
@@ -31,15 +31,17 @@ }) | ||
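The handler signature changed because v10 `work()` always passes an array of jobs; the upgraded example above destructures `[ job ]` to unwrap the first one. A minimal sketch of processing a whole batch under that assumption (handler body illustrative):

```js
// v10: the callback receives an array; batchSize caps how many jobs are fetched per poll
await boss.work(queue, { batchSize: 5 }, async (jobs) => {
  for (const job of jobs) {
    console.log(`received job ${job.id} with data ${JSON.stringify(job.data)}`)
  }
})
```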
pg-boss relies on [SKIP LOCKED](https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/), a feature added to postgres specifically for message queues, in order to resolve record locking challenges inherent with relational databases. This brings the safety of guaranteed atomic commits of a relational database to your asynchronous job processing. | ||
pg-boss relies on [SKIP LOCKED](https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/), a feature built into Postgres specifically for message queues to resolve the record locking challenges inherent in relational databases. This brings exactly-once delivery and the safety of guaranteed atomic commits to asynchronous job processing. | ||
This will likely appeal most to teams already familiar with the simplicity of relational database semantics and operations (SQL, querying, and backups). It will be especially useful to those already relying on PostgreSQL who want to limit the number of systems they have to monitor and support in their architecture. | ||
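The mechanics, reduced to a sketch (illustrative SQL, not the exact statement pg-boss generates; `pgboss.job` and the state names follow the schema shown later in this diff):

```js
const { Pool } = require('pg')
const pool = new Pool() // assumes PG* environment variables

// Two workers running this concurrently will each lock different rows:
// SKIP LOCKED makes the SELECT pass over rows another transaction holds.
const { rows } = await pool.query(`
  WITH next AS (
    SELECT id
    FROM pgboss.job
    WHERE name = $1 AND state < 'active' AND start_after < now()
    ORDER BY priority DESC, created_on, id
    LIMIT $2
    FOR UPDATE SKIP LOCKED
  )
  UPDATE pgboss.job j
  SET state = 'active'
  FROM next
  WHERE j.id = next.id
  RETURNING j.id, j.name, j.data
`, ['some-queue', 10])
```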
## Features | ||
## Summary | ||
* Exactly-once job delivery | ||
* Backpressure-compatible polling workers | ||
* Cron scheduling | ||
* Queue storage policies to support a variety of rate limiting, debouncing, and concurrency use cases | ||
* Priority queues, dead letter queues, job deferral, automatic retries with exponential backoff | ||
* Pub/sub API for fan-out queue relationships | ||
* Priority queues, deferral, retries (with exponential backoff), rate limiting, debouncing | ||
* Table operations via SQL for bulk loads via COPY or INSERT | ||
* Raw SQL support for non-Node.js runtimes via INSERT or COPY (see the sketch after this list) | ||
* Serverless function compatible | ||
* Multi-master compatible (for example, in a Kubernetes ReplicaSet) | ||
* Dead letter queues | ||
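A sketch of the raw-SQL path from the bullet above (assumes the target queue was already created via `createQueue`, and leans on the job table's column defaults for everything not listed — illustrative, not the package's documented loader):

```js
const { Pool } = require('pg')
const pool = new Pool()

// Bulk-enqueue 1000 jobs without going through the Node.js API
await pool.query(`
  INSERT INTO pgboss.job (name, data)
  SELECT 'bulk-queue', jsonb_build_object('n', n)
  FROM generate_series(1, 1000) n
`)
```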
@@ -52,3 +54,3 @@ ## Requirements | ||
``` bash | ||
```bash | ||
# npm | ||
@@ -55,0 +57,0 @@ npm install pg-boss |
@@ -125,22 +125,21 @@ const assert = require('assert') | ||
applyNewJobCheckInterval(options, defaults) | ||
applyPollingInterval(options, defaults) | ||
assert(!('teamConcurrency' in options) || | ||
(Number.isInteger(options.teamConcurrency) && options.teamConcurrency >= 1 && options.teamConcurrency <= 1000), | ||
'teamConcurrency must be an integer between 1 and 1000') | ||
assert(!('teamSize' in options) || (Number.isInteger(options.teamSize) && options.teamSize >= 1), 'teamSize must be an integer > 0') | ||
assert(!('batchSize' in options) || (Number.isInteger(options.batchSize) && options.batchSize >= 1), 'batchSize must be an integer > 0') | ||
assert(!('includeMetadata' in options) || typeof options.includeMetadata === 'boolean', 'includeMetadata must be a boolean') | ||
assert(!('priority' in options) || typeof options.priority === 'boolean', 'priority must be a boolean') | ||
options.batchSize = options.batchSize || 1 | ||
return { options, callback } | ||
} | ||
function checkFetchArgs (name, batchSize, options) { | ||
function checkFetchArgs (name, options) { | ||
assert(name, 'missing queue name') | ||
assert(!batchSize || (Number.isInteger(batchSize) && batchSize >= 1), 'batchSize must be an integer > 0') | ||
assert(!('batchSize' in options) || (Number.isInteger(options.batchSize) && options.batchSize >= 1), 'batchSize must be an integer > 0') | ||
assert(!('includeMetadata' in options) || typeof options.includeMetadata === 'boolean', 'includeMetadata must be a boolean') | ||
assert(!('priority' in options) || typeof options.priority === 'boolean', 'priority must be a boolean') | ||
return { name } | ||
options.batchSize = options.batchSize || 1 | ||
} | ||
@@ -167,3 +166,3 @@ | ||
applyNewJobCheckInterval(config) | ||
applyPollingInterval(config) | ||
applyExpirationConfig(config) | ||
@@ -191,4 +190,4 @@ applyRetentionConfig(config) | ||
function assertQueueName (name) { | ||
assert(name, 'Name is required') | ||
assert(typeof name === 'string', 'Name must be a string') | ||
assert(name.length <= 50, 'Name cannot exceed 50 characters') | ||
assert(/^[\w-]+$/.test(name), 'Name can only contain alphanumeric characters, underscores, or hyphens') | ||
@@ -285,14 +284,9 @@ } | ||
function applyNewJobCheckInterval (config, defaults) { | ||
assert(!('newJobCheckInterval' in config) || config.newJobCheckInterval >= 500, | ||
'configuration assert: newJobCheckInterval must be at least every 500ms') | ||
function applyPollingInterval (config, defaults) { | ||
assert(!('pollingIntervalSeconds' in config) || config.pollingIntervalSeconds >= 0.5, | ||
'configuration assert: pollingIntervalSeconds must be at least every 500ms') | ||
assert(!('newJobCheckIntervalSeconds' in config) || config.newJobCheckIntervalSeconds >= 1, | ||
'configuration assert: newJobCheckIntervalSeconds must be at least every second') | ||
config.newJobCheckInterval = ('newJobCheckIntervalSeconds' in config) | ||
? config.newJobCheckIntervalSeconds * 1000 | ||
: ('newJobCheckInterval' in config) | ||
? config.newJobCheckInterval | ||
: defaults?.newJobCheckInterval || 2000 | ||
config.pollingInterval = ('pollingIntervalSeconds' in config) | ||
? config.pollingIntervalSeconds * 1000 | ||
: defaults?.pollingInterval || 2000 | ||
} | ||
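For callers, the rename collapses the two v9 interval options into one seconds-based setting (a sketch; the 2000 ms fallback matches the default above):

```js
// v9 (removed): { newJobCheckInterval: 5000 } or { newJobCheckIntervalSeconds: 5 }
// v10: one option, minimum 0.5 per the assertion above
await boss.work(queue, { pollingIntervalSeconds: 5 }, handler)
```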
@@ -299,0 +293,0 @@ |
@@ -174,32 +174,4 @@ const EventEmitter = require('events') | ||
} | ||
async setMaintenanceTime () { | ||
await this.db.executeSql(this.setMaintenanceTimeCommand) | ||
} | ||
async getMaintenanceTime () { | ||
const { rows } = await this.db.executeSql(this.getMaintenanceTimeCommand) | ||
let { maintained_on: maintainedOn, seconds_ago: secondsAgo } = rows[0] | ||
secondsAgo = secondsAgo !== null ? parseFloat(secondsAgo) : 999_999_999 | ||
return { maintainedOn, secondsAgo } | ||
} | ||
async setMonitorTime () { | ||
await this.db.executeSql(this.setMonitorTimeCommand) | ||
} | ||
async getMonitorTime () { | ||
const { rows } = await this.db.executeSql(this.getMonitorTimeCommand) | ||
let { monitored_on: monitoredOn, seconds_ago: secondsAgo } = rows[0] | ||
secondsAgo = secondsAgo !== null ? parseFloat(secondsAgo) : 999_999_999 | ||
return { monitoredOn, secondsAgo } | ||
} | ||
} | ||
module.exports = Boss |
@@ -89,13 +89,13 @@ const assert = require('assert') | ||
async next (version) { | ||
const commands = migrationStore.next(this.config.schema, version, this.migrations) | ||
await this.db.executeSql(commands) | ||
} | ||
// async next (version) { | ||
// const commands = migrationStore.next(this.config.schema, version, this.migrations) | ||
// await this.db.executeSql(commands) | ||
// } | ||
async rollback (version) { | ||
const commands = migrationStore.rollback(this.config.schema, version, this.migrations) | ||
await this.db.executeSql(commands) | ||
} | ||
// async rollback (version) { | ||
// const commands = migrationStore.rollback(this.config.schema, version, this.migrations) | ||
// await this.db.executeSql(commands) | ||
// } | ||
} | ||
module.exports = Contractor |
@@ -32,11 +32,11 @@ const EventEmitter = require('events') | ||
if (this.opened) { | ||
if (this.config.debug === true) { | ||
console.log(`${new Date().toISOString()}: DEBUG SQL`) | ||
console.log(text) | ||
// if (this.config.debug === true) { | ||
// console.log(`${new Date().toISOString()}: DEBUG SQL`) | ||
// console.log(text) | ||
if (values) { | ||
console.log(`${new Date().toISOString()}: DEBUG VALUES`) | ||
console.log(values) | ||
} | ||
} | ||
// if (values) { | ||
// console.log(`${new Date().toISOString()}: DEBUG VALUES`) | ||
// console.log(values) | ||
// } | ||
// } | ||
@@ -43,0 +43,0 @@ return await this.pool.query(text, values) |
@@ -152,3 +152,3 @@ const EventEmitter = require('events') | ||
let { destroy = false, graceful = true, timeout = 30000, wait = true } = options | ||
let { close = true, graceful = true, timeout = 30000, wait = true } = options | ||
@@ -172,3 +172,3 @@ timeout = Math.max(timeout, 1000) | ||
if (this.#db.isOurs && this.#db.opened && destroy) { | ||
if (this.#db.isOurs && this.#db.opened && close) { | ||
await this.#db.close() | ||
@@ -175,0 +175,0 @@ } |
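Migration sketch for the rename (defaults per the destructuring above — note `close` defaults to true where `destroy` defaulted to false):

```js
// v9: await boss.stop({ destroy: true })
// v10: pg-boss closes an internally created pool unless told otherwise
await boss.stop({ close: true, graceful: true, timeout: 30000, wait: true })
```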
const assert = require('assert') | ||
const EventEmitter = require('events') | ||
const { randomUUID } = require('crypto') | ||
const debounce = require('lodash.debounce') | ||
const { serializeError: stringify } = require('serialize-error') | ||
const pMap = require('p-map') | ||
const { delay } = require('./tools') | ||
@@ -17,5 +15,2 @@ const Attorney = require('./attorney') | ||
const WIP_EVENT_INTERVAL = 2000 | ||
const WIP_DEBOUNCE_OPTIONS = { leading: true, trailing: true, maxWait: WIP_EVENT_INTERVAL } | ||
const events = { | ||
@@ -49,2 +44,3 @@ error: 'error', | ||
this.events = events | ||
this.wipTs = Date.now() | ||
this.workers = new Map() | ||
@@ -58,2 +54,3 @@ | ||
this.resumeJobsCommand = plans.resumeJobs(config.schema) | ||
this.deleteJobsCommand = plans.deleteJobs(config.schema) | ||
this.failJobsByIdCommand = plans.failJobsById(config.schema) | ||
@@ -64,3 +61,10 @@ this.getJobByIdCommand = plans.getJobById(config.schema) | ||
this.unsubscribeCommand = plans.unsubscribe(config.schema) | ||
this.getQueuesCommand = plans.getQueues(config.schema) | ||
this.getQueueByNameCommand = plans.getQueueByName(config.schema) | ||
this.getQueuesForEventCommand = plans.getQueuesForEvent(config.schema) | ||
this.createQueueCommand = plans.createQueue(config.schema) | ||
this.updateQueueCommand = plans.updateQueue(config.schema) | ||
this.purgeQueueCommand = plans.purgeQueue(config.schema) | ||
this.deleteQueueCommand = plans.deleteQueue(config.schema) | ||
this.clearStorageCommand = plans.clearStorage(config.schema) | ||
@@ -72,2 +76,3 @@ // exported api to index | ||
this.resume, | ||
this.deleteJob, | ||
this.fail, | ||
@@ -88,11 +93,10 @@ this.fetch, | ||
this.updateQueue, | ||
this.getQueue, | ||
this.deleteQueue, | ||
this.purgeQueue, | ||
this.getQueueSize, | ||
this.getQueue, | ||
this.getQueues, | ||
this.clearStorage, | ||
this.getJobById | ||
] | ||
this.emitWipThrottled = debounce(() => this.emit(events.wip, this.getWipData()), WIP_EVENT_INTERVAL, WIP_DEBOUNCE_OPTIONS) | ||
} | ||
@@ -142,3 +146,8 @@ | ||
if (!INTERNAL_QUEUES[name]) { | ||
this.emitWipThrottled() | ||
const now = Date.now() | ||
if (now - this.wipTs > 2000) { | ||
this.emit(events.wip, this.getWipData()) | ||
this.wipTs = now | ||
} | ||
} | ||
@@ -187,7 +196,4 @@ } | ||
const { | ||
newJobCheckInterval: interval = this.config.newJobCheckInterval, | ||
pollingInterval: interval = this.config.pollingInterval, | ||
batchSize, | ||
teamSize = 1, | ||
teamConcurrency = 1, | ||
teamRefill: refill = false, | ||
includeMetadata = false, | ||
@@ -199,24 +205,9 @@ priority = true | ||
let queueSize = 0 | ||
const fetch = () => this.fetch(name, { batchSize, includeMetadata, priority }) | ||
let refillTeamPromise | ||
let resolveRefillTeam | ||
const onFetch = async (jobs) => { | ||
if (!jobs.length) { | ||
return | ||
} | ||
// Setup a promise that onFetch can await for when at least one | ||
// job is finished and so the team is ready to be topped up | ||
const createTeamRefillPromise = () => { | ||
refillTeamPromise = new Promise((resolve) => { resolveRefillTeam = resolve }) | ||
} | ||
createTeamRefillPromise() | ||
const onRefill = () => { | ||
queueSize-- | ||
resolveRefillTeam() | ||
createTeamRefillPromise() | ||
} | ||
const fetch = () => this.fetch(name, batchSize || (teamSize - queueSize), { includeMetadata, priority }) | ||
const onFetch = async (jobs) => { | ||
if (this.config.__test__throw_worker) { | ||
@@ -228,30 +219,10 @@ throw new Error('__test__throw_worker') | ||
if (batchSize) { | ||
const maxExpiration = jobs.reduce((acc, i) => Math.max(acc, i.expireInSeconds), 0) | ||
const maxExpiration = jobs.reduce((acc, i) => Math.max(acc, i.expireInSeconds), 0) | ||
const jobIds = jobs.map(job => job.id) | ||
await resolveWithinSeconds(Promise.all([callback(jobs)]), maxExpiration) | ||
.then(() => this.complete(name, jobs.map(job => job.id))) | ||
.catch(err => this.fail(name, jobs.map(job => job.id), err)) | ||
} else { | ||
if (refill) { | ||
queueSize += jobs.length || 1 | ||
} | ||
const allTeamPromise = pMap(jobs, job => | ||
resolveWithinSeconds(callback(job), job.expireInSeconds) | ||
.then(result => this.complete(name, job.id, result)) | ||
.catch(err => this.fail(name, job.id, err)) | ||
.then(() => refill ? onRefill() : null) | ||
, { concurrency: teamConcurrency } | ||
).catch(() => {}) // allow promises & non-promises to live together in harmony | ||
if (refill) { | ||
if (queueSize < teamSize) { | ||
return | ||
} else { | ||
await refillTeamPromise | ||
} | ||
} else { | ||
await allTeamPromise | ||
} | ||
try { | ||
const result = await resolveWithinSeconds(callback(jobs), maxExpiration) | ||
this.complete(name, jobIds, jobIds.length === 1 ? result : undefined) | ||
} catch (err) { | ||
this.fail(name, jobIds, err) | ||
} | ||
@@ -332,3 +303,3 @@ | ||
return await Promise.all(rows.map(({ name }) => this.send(name, ...args))) | ||
await Promise.allSettled(rows.map(({ name }) => this.send(name, ...args))) | ||
} | ||
@@ -471,7 +442,6 @@ | ||
async fetch (name, batchSize, options = {}) { | ||
const values = Attorney.checkFetchArgs(name, batchSize, options) | ||
async fetch (name, options = {}) { | ||
Attorney.checkFetchArgs(name, options) | ||
const db = options.db || this.db | ||
const nextJobSql = this.nextJobCommand({ ...options }) | ||
const statementValues = [values.name, batchSize || 1] | ||
@@ -481,3 +451,3 @@ let result | ||
try { | ||
result = await db.executeSql(nextJobSql, statementValues) | ||
result = await db.executeSql(nextJobSql, [name, options.batchSize]) | ||
} catch (err) { | ||
@@ -487,7 +457,3 @@ // errors from fetchquery should only be unique constraint violations | ||
if (!result || result.rows.length === 0) { | ||
return null | ||
} | ||
return result.rows.length === 1 && !batchSize ? result.rows[0] : result.rows | ||
return result?.rows || [] | ||
} | ||
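The reworked `fetch()` in use (a sketch based on the signature above; it now always resolves to an array, empty rather than null when no jobs are available):

```js
// batchSize moves into options; no null return to special-case anymore
const jobs = await boss.fetch('some-queue', { batchSize: 10, includeMetadata: true })
for (const job of jobs) {
  console.log(job.id, job.state) // metadata fields present because includeMetadata is set
}
```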
@@ -517,7 +483,7 @@ | ||
mapCompletionResponse (ids, result) { | ||
mapCommandResponse (ids, result) { | ||
return { | ||
jobs: ids, | ||
requested: ids.length, | ||
updated: result && result.rows ? parseInt(result.rows[0].count) : 0 | ||
affected: result && result.rows ? parseInt(result.rows[0].count) : 0 | ||
} | ||
@@ -527,35 +493,43 @@ } | ||
async complete (name, id, data, options = {}) { | ||
assert(name, 'Missing queue name argument') | ||
Attorney.assertQueueName(name) | ||
const db = options.db || this.db | ||
const ids = this.mapCompletionIdArg(id, 'complete') | ||
const result = await db.executeSql(this.completeJobsCommand, [name, ids, this.mapCompletionDataArg(data)]) | ||
return this.mapCompletionResponse(ids, result) | ||
return this.mapCommandResponse(ids, result) | ||
} | ||
async fail (name, id, data, options = {}) { | ||
assert(name, 'Missing queue name argument') | ||
Attorney.assertQueueName(name) | ||
const db = options.db || this.db | ||
const ids = this.mapCompletionIdArg(id, 'fail') | ||
const result = await db.executeSql(this.failJobsByIdCommand, [name, ids, this.mapCompletionDataArg(data)]) | ||
return this.mapCompletionResponse(ids, result) | ||
return this.mapCommandResponse(ids, result) | ||
} | ||
async cancel (name, id, options = {}) { | ||
assert(name, 'Missing queue name argument') | ||
Attorney.assertQueueName(name) | ||
const db = options.db || this.db | ||
const ids = this.mapCompletionIdArg(id, 'cancel') | ||
const result = await db.executeSql(this.cancelJobsCommand, [name, ids]) | ||
return this.mapCompletionResponse(ids, result) | ||
return this.mapCommandResponse(ids, result) | ||
} | ||
async deleteJob (name, id, options = {}) { | ||
Attorney.assertQueueName(name) | ||
const db = options.db || this.db | ||
const ids = this.mapCompletionIdArg(id, 'deleteJob') | ||
const result = await db.executeSql(this.deleteJobsCommand, [name, ids]) | ||
return this.mapCommandResponse(ids, result) | ||
} | ||
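`deleteJob` is new public API in v10 alongside `cancel` and `resume`; usage sketch:

```js
// Remove job rows outright instead of cancelling them; single id or an array
await boss.deleteJob('some-queue', jobId)
await boss.deleteJob('some-queue', [jobId1, jobId2])
```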
async resume (name, id, options = {}) { | ||
assert(name, 'Missing queue name argument') | ||
Attorney.assertQueueName(name) | ||
const db = options.db || this.db | ||
const ids = this.mapCompletionIdArg(id, 'resume') | ||
const result = await db.executeSql(this.resumeJobsCommand, [name, ids]) | ||
return this.mapCompletionResponse(ids, result) | ||
return this.mapCommandResponse(ids, result) | ||
} | ||
async createQueue (name, options = {}) { | ||
assert(name, 'Missing queue name argument') | ||
name = name || options.name | ||
@@ -577,10 +551,8 @@ Attorney.assertQueueName(name) | ||
const partitionSql = plans.createPartition(this.config.schema, name) | ||
if (deadLetter) { | ||
Attorney.assertQueueName(deadLetter) | ||
} | ||
await this.db.executeSql(partitionSql) | ||
const sql = plans.createQueue(this.config.schema, name) | ||
const params = [ | ||
name, | ||
// todo: pull in defaults from constructor config | ||
const data = { | ||
policy, | ||
@@ -593,10 +565,19 @@ retryLimit, | ||
deadLetter | ||
] | ||
} | ||
await this.db.executeSql(sql, params) | ||
await this.db.executeSql(this.createQueueCommand, [name, data]) | ||
} | ||
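With jobs now joined to the queue table (see the `insertJob` change later in this diff), queues are created explicitly up front; a sketch with the options handled above (values illustrative):

```js
// A dead letter queue is itself a queue, so create it first
await boss.createQueue('email-dlq')
await boss.createQueue('email', {
  policy: 'standard', // standard | short | singleton | stately
  retryLimit: 3,
  retryDelay: 30,
  retryBackoff: true,
  expireInSeconds: 60,
  retentionMinutes: 60 * 24,
  deadLetter: 'email-dlq'
})
```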
async getQueues () { | ||
const { rows } = await this.db.executeSql(this.getQueuesCommand) | ||
return rows | ||
} | ||
async updateQueue (name, options = {}) { | ||
assert(name, 'Missing queue name argument') | ||
Attorney.assertQueueName(name) | ||
const { policy = QUEUE_POLICIES.standard } = options | ||
assert(policy in QUEUE_POLICIES, `${policy} is not a valid queue policy`) | ||
const { | ||
@@ -611,6 +592,5 @@ retryLimit, | ||
const sql = plans.updateQueue(this.config.schema) | ||
const params = [ | ||
name, | ||
policy, | ||
retryLimit, | ||
@@ -624,70 +604,38 @@ retryDelay, | ||
await this.db.executeSql(sql, params) | ||
await this.db.executeSql(this.updateQueueCommand, params) | ||
} | ||
async getQueue (name) { | ||
assert(name, 'Missing queue name argument') | ||
Attorney.assertQueueName(name) | ||
const sql = plans.getQueueByName(this.config.schema) | ||
const result = await this.db.executeSql(sql, [name]) | ||
const { rows } = await this.db.executeSql(this.getQueueByNameCommand, [name]) | ||
if (result.rows.length === 0) { | ||
return null | ||
} | ||
const { | ||
policy, | ||
retry_limit: retryLimit, | ||
retry_delay: retryDelay, | ||
retry_backoff: retryBackoff, | ||
expire_seconds: expireInSeconds, | ||
retention_minutes: retentionMinutes, | ||
dead_letter: deadLetter | ||
} = result.rows[0] | ||
return { | ||
name, | ||
policy, | ||
retryLimit, | ||
retryDelay, | ||
retryBackoff, | ||
expireInSeconds, | ||
retentionMinutes, | ||
deadLetter | ||
} | ||
return rows[0] || null | ||
} | ||
async deleteQueue (name) { | ||
assert(name, 'Missing queue name argument') | ||
Attorney.assertQueueName(name) | ||
const queueSql = plans.getQueueByName(this.config.schema) | ||
const { rows } = await this.db.executeSql(queueSql, [name]) | ||
const { rows } = await this.db.executeSql(this.getQueueByNameCommand, [name]) | ||
if (rows.length) { | ||
Attorney.assertQueueName(name) | ||
const sql = plans.dropPartition(this.config.schema, name) | ||
await this.db.executeSql(sql) | ||
if (rows.length === 1) { | ||
await this.db.executeSql(this.deleteQueueCommand, [name]) | ||
} | ||
const sql = plans.deleteQueueRecords(this.config.schema) | ||
await this.db.executeSql(sql, [name]) | ||
} | ||
async purgeQueue (queue) { | ||
assert(queue, 'Missing queue name argument') | ||
const sql = plans.purgeQueue(this.config.schema) | ||
await this.db.executeSql(sql, [queue]) | ||
async purgeQueue (name) { | ||
Attorney.assertQueueName(name) | ||
await this.db.executeSql(this.purgeQueueCommand, [name]) | ||
} | ||
async clearStorage () { | ||
const sql = plans.clearStorage(this.config.schema) | ||
await this.db.executeSql(sql) | ||
await this.db.executeSql(this.clearStorageCommand) | ||
} | ||
async getQueueSize (queue, options) { | ||
assert(queue, 'Missing queue name argument') | ||
async getQueueSize (name, options) { | ||
Attorney.assertQueueName(name) | ||
const sql = plans.getQueueSize(this.config.schema, options) | ||
const result = await this.db.executeSql(sql, [queue]) | ||
const result = await this.db.executeSql(sql, [name]) | ||
@@ -697,17 +645,17 @@ return result ? parseFloat(result.rows[0].count) : null | ||
async getJobById (queue, id, options = {}) { | ||
async getJobById (name, id, options = {}) { | ||
Attorney.assertQueueName(name) | ||
const db = options.db || this.db | ||
const result1 = await db.executeSql(this.getJobByIdCommand, [queue, id]) | ||
if (result1 && result1.rows && result1.rows.length === 1) { | ||
const result1 = await db.executeSql(this.getJobByIdCommand, [name, id]) | ||
if (result1?.rows?.length === 1) { | ||
return result1.rows[0] | ||
} else if (options.includeArchive) { | ||
const result2 = await db.executeSql(this.getArchivedJobByIdCommand, [name, id]) | ||
return result2?.rows[0] || null | ||
} else { | ||
return null | ||
} | ||
const result2 = await db.executeSql(this.getArchivedJobByIdCommand, [queue, id]) | ||
if (result2 && result2.rows && result2.rows.length === 1) { | ||
return result2.rows[0] | ||
} | ||
return null | ||
} | ||
@@ -714,0 +662,0 @@ } |
src/plans.js
@@ -31,2 +31,3 @@ const DEFAULT_SCHEMA = 'pgboss' | ||
resumeJobs, | ||
deleteJobs, | ||
failJobsById, | ||
@@ -46,7 +47,6 @@ failJobsByTimeout, | ||
countStates, | ||
updateQueue, | ||
createQueue, | ||
updateQueue, | ||
createPartition, | ||
dropPartition, | ||
deleteQueueRecords, | ||
deleteQueue, | ||
getQueues, | ||
getQueueByName, | ||
@@ -77,9 +77,8 @@ getQueueSize, | ||
createTableVersion(schema), | ||
createTableQueue(schema), | ||
createTableSchedule(schema), | ||
createTableSubscription(schema), | ||
createTableJob(schema), | ||
createIndexJobFetch(schema), | ||
createIndexJobPolicyStately(schema), | ||
createIndexJobPolicyShort(schema), | ||
createIndexJobPolicySingleton(schema), | ||
createIndexJobThrottleOn(schema), | ||
createIndexJobThrottleKey(schema), | ||
@@ -91,11 +90,5 @@ createTableArchive(schema), | ||
createTableVersion(schema), | ||
createTableQueue(schema), | ||
createTableSchedule(schema), | ||
createTableSubscription(schema), | ||
createQueueFunction(schema), | ||
deleteQueueFunction(schema), | ||
getPartitionFunction(schema), | ||
createPartitionFunction(schema), | ||
dropPartitionFunction(schema), | ||
insertVersion(schema, version) | ||
@@ -113,2 +106,17 @@ ] | ||
function createEnumJobState (schema) { | ||
// ENUM definition order is important | ||
// base type is numeric and first values are less than last values | ||
return ` | ||
CREATE TYPE ${schema}.job_state AS ENUM ( | ||
'${JOB_STATES.created}', | ||
'${JOB_STATES.retry}', | ||
'${JOB_STATES.active}', | ||
'${JOB_STATES.completed}', | ||
'${JOB_STATES.cancelled}', | ||
'${JOB_STATES.failed}' | ||
) | ||
` | ||
} | ||
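That ordering is what lets the partial indexes and fetch queries below use range predicates over states; a sketch:

```js
// 'created' < 'retry' < 'active' ... so one comparison covers "not yet active"
const countPending = `SELECT count(*) FROM pgboss.job WHERE name = $1 AND state < 'active'`
```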
function createTableVersion (schema) { | ||
@@ -125,13 +133,17 @@ return ` | ||
function createEnumJobState (schema) { | ||
// ENUM definition order is important | ||
// base type is numeric and first values are less than last values | ||
function createTableQueue (schema) { | ||
return ` | ||
CREATE TYPE ${schema}.job_state AS ENUM ( | ||
'${JOB_STATES.created}', | ||
'${JOB_STATES.retry}', | ||
'${JOB_STATES.active}', | ||
'${JOB_STATES.completed}', | ||
'${JOB_STATES.cancelled}', | ||
'${JOB_STATES.failed}' | ||
CREATE TABLE ${schema}.queue ( | ||
name text, | ||
policy text, | ||
retry_limit int, | ||
retry_delay int, | ||
retry_backoff bool, | ||
expire_seconds int, | ||
retention_minutes int, | ||
dead_letter text REFERENCES ${schema}.queue (name), | ||
partition_name text, | ||
created_on timestamp with time zone not null default now(), | ||
updated_on timestamp with time zone not null default now(), | ||
PRIMARY KEY (name) | ||
) | ||
@@ -141,2 +153,29 @@ ` | ||
function createTableSchedule (schema) { | ||
return ` | ||
CREATE TABLE ${schema}.schedule ( | ||
name text REFERENCES ${schema}.queue ON DELETE CASCADE, | ||
cron text not null, | ||
timezone text, | ||
data jsonb, | ||
options jsonb, | ||
created_on timestamp with time zone not null default now(), | ||
updated_on timestamp with time zone not null default now(), | ||
PRIMARY KEY (name) | ||
) | ||
` | ||
} | ||
function createTableSubscription (schema) { | ||
return ` | ||
CREATE TABLE ${schema}.subscription ( | ||
event text not null, | ||
name text not null REFERENCES ${schema}.queue ON DELETE CASCADE, | ||
created_on timestamp with time zone not null default now(), | ||
updated_on timestamp with time zone not null default now(), | ||
PRIMARY KEY(event, name) | ||
) | ||
` | ||
} | ||
function createTableJob (schema) { | ||
@@ -164,4 +203,3 @@ return ` | ||
dead_letter text, | ||
policy text, | ||
CONSTRAINT job_pkey PRIMARY KEY (name, id) | ||
policy text | ||
) PARTITION BY LIST (name) | ||
@@ -172,3 +210,6 @@ ` | ||
const baseJobColumns = 'id, name, data, EXTRACT(epoch FROM expire_in) as "expireInSeconds"' | ||
const allJobColumns = `${baseJobColumns}, policy, state, priority, | ||
const allJobColumns = `${baseJobColumns}, | ||
policy, | ||
state, | ||
priority, | ||
retry_limit as "retryLimit", | ||
@@ -190,27 +231,46 @@ retry_count as "retryCount", | ||
function createPartition (schema, name) { | ||
return `SELECT ${schema}.create_partition('${name}');` | ||
} | ||
function getPartitionFunction (schema) { | ||
function createQueueFunction (schema) { | ||
return ` | ||
CREATE FUNCTION ${schema}.get_partition(queue_name text, out name text) AS | ||
$$ | ||
SELECT 'job_' || encode(sha224(queue_name::bytea), 'hex'); | ||
$$ | ||
LANGUAGE SQL | ||
IMMUTABLE | ||
` | ||
} | ||
function createPartitionFunction (schema) { | ||
return ` | ||
CREATE FUNCTION ${schema}.create_partition(queue_name text) | ||
CREATE FUNCTION ${schema}.create_queue(queue_name text, options json) | ||
RETURNS VOID AS | ||
$$ | ||
DECLARE | ||
table_name varchar := ${schema}.get_partition(queue_name); | ||
table_name varchar := 'j' || encode(sha224(queue_name::bytea), 'hex'); | ||
BEGIN | ||
EXECUTE format('CREATE TABLE ${schema}.%I (LIKE ${schema}.job INCLUDING DEFAULTS INCLUDING CONSTRAINTS)', table_name); | ||
EXECUTE format('ALTER TABLE ${schema}.%I ADD CHECK (name=%L)', table_name, queue_name); | ||
INSERT INTO ${schema}.queue ( | ||
name, | ||
policy, | ||
retry_limit, | ||
retry_delay, | ||
retry_backoff, | ||
expire_seconds, | ||
retention_minutes, | ||
dead_letter, | ||
partition_name | ||
) | ||
VALUES ( | ||
queue_name, | ||
options->>'policy', | ||
(options->>'retryLimit')::int, | ||
(options->>'retryDelay')::int, | ||
(options->>'retryBackoff')::bool, | ||
(options->>'expireInSeconds')::int, | ||
(options->>'retentionMinutes')::int, | ||
options->>'deadLetter', | ||
table_name | ||
); | ||
EXECUTE format('CREATE TABLE ${schema}.%I (LIKE ${schema}.job INCLUDING DEFAULTS)', table_name); | ||
EXECUTE format('${formatPartitionCommand(createPrimaryKeyJob(schema))}', table_name); | ||
EXECUTE format('${formatPartitionCommand(createQueueForeignKeyJob(schema))}', table_name); | ||
EXECUTE format('${formatPartitionCommand(createQueueForeignKeyJobDeadLetter(schema))}', table_name); | ||
EXECUTE format('${formatPartitionCommand(createIndexJobPolicyShort(schema))}', table_name); | ||
EXECUTE format('${formatPartitionCommand(createIndexJobPolicySingleton(schema))}', table_name); | ||
EXECUTE format('${formatPartitionCommand(createIndexJobPolicyStately(schema))}', table_name); | ||
EXECUTE format('${formatPartitionCommand(createIndexJobThrottle(schema))}', table_name); | ||
EXECUTE format('${formatPartitionCommand(createIndexJobFetch(schema))}', table_name); | ||
EXECUTE format('ALTER TABLE ${schema}.%I ADD CONSTRAINT cjc CHECK (name=%L)', table_name, queue_name); | ||
EXECUTE format('ALTER TABLE ${schema}.job ATTACH PARTITION ${schema}.%I FOR VALUES IN (%L)', table_name, queue_name); | ||
@@ -223,9 +283,22 @@ END; | ||
function dropPartitionFunction (schema) { | ||
function formatPartitionCommand (command) { | ||
return command.replace('.job', '.%1$I').replace('job_i', '%1$s_i').replaceAll('\'', '\'\'') | ||
} | ||
function deleteQueueFunction (schema) { | ||
return ` | ||
CREATE FUNCTION ${schema}.drop_partition(queue_name text) | ||
CREATE FUNCTION ${schema}.delete_queue(queue_name text) | ||
RETURNS VOID AS | ||
$$ | ||
DECLARE | ||
table_name varchar; | ||
BEGIN | ||
EXECUTE format('DROP TABLE IF EXISTS %I', ${schema}.get_partition(queue_name)); | ||
WITH deleted as ( | ||
DELETE FROM ${schema}.queue | ||
WHERE name = queue_name | ||
RETURNING partition_name | ||
) | ||
SELECT partition_name from deleted INTO table_name; | ||
EXECUTE format('DROP TABLE IF EXISTS ${schema}.%I', table_name); | ||
END; | ||
@@ -237,32 +310,44 @@ $$ | ||
function dropPartition (schema, name) { | ||
return `SELECT ${schema}.drop_partition('${name}');` | ||
function createQueue (schema) { | ||
return `SELECT ${schema}.create_queue($1, $2)` | ||
} | ||
function deleteQueue (schema) { | ||
return `SELECT ${schema}.delete_queue($1)` | ||
} | ||
function createPrimaryKeyJob (schema) { | ||
return `ALTER TABLE ${schema}.job ADD PRIMARY KEY (name, id)` | ||
} | ||
function createQueueForeignKeyJob (schema) { | ||
return `ALTER TABLE ${schema}.job ADD CONSTRAINT q_fkey FOREIGN KEY (name) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED` | ||
} | ||
function createQueueForeignKeyJobDeadLetter (schema) { | ||
return `ALTER TABLE ${schema}.job ADD CONSTRAINT dlq_fkey FOREIGN KEY (dead_letter) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED` | ||
} | ||
function createPrimaryKeyArchive (schema) { | ||
return `ALTER TABLE ${schema}.archive ADD CONSTRAINT archive_pkey PRIMARY KEY (name, id)` | ||
return `ALTER TABLE ${schema}.archive ADD PRIMARY KEY (name, id)` | ||
} | ||
function createIndexJobPolicyShort (schema) { | ||
return `CREATE UNIQUE INDEX job_policy_short ON ${schema}.job (name) WHERE state = '${JOB_STATES.created}' AND policy = '${QUEUE_POLICIES.short}'` | ||
return `CREATE UNIQUE INDEX job_i1 ON ${schema}.job (name, COALESCE(singleton_key, '')) WHERE state = '${JOB_STATES.created}' AND policy = '${QUEUE_POLICIES.short}';` | ||
} | ||
function createIndexJobPolicySingleton (schema) { | ||
return `CREATE UNIQUE INDEX job_policy_singleton ON ${schema}.job (name) WHERE state = '${JOB_STATES.active}' AND policy = '${QUEUE_POLICIES.singleton}'` | ||
return `CREATE UNIQUE INDEX job_i2 ON ${schema}.job (name, COALESCE(singleton_key, '')) WHERE state = '${JOB_STATES.active}' AND policy = '${QUEUE_POLICIES.singleton}'` | ||
} | ||
function createIndexJobPolicyStately (schema) { | ||
return `CREATE UNIQUE INDEX job_policy_stately ON ${schema}.job (name, state) WHERE state <= '${JOB_STATES.active}' AND policy = '${QUEUE_POLICIES.stately}'` | ||
return `CREATE UNIQUE INDEX job_i3 ON ${schema}.job (name, state, COALESCE(singleton_key, '')) WHERE state <= '${JOB_STATES.active}' AND policy = '${QUEUE_POLICIES.stately}'` | ||
} | ||
function createIndexJobThrottleOn (schema) { | ||
return `CREATE UNIQUE INDEX job_throttle_on ON ${schema}.job (name, singleton_on, COALESCE(singleton_key, '')) WHERE state <= '${JOB_STATES.completed}' AND singleton_on IS NOT NULL` | ||
function createIndexJobThrottle (schema) { | ||
return `CREATE UNIQUE INDEX job_i4 ON ${schema}.job (name, singleton_on, COALESCE(singleton_key, '')) WHERE state <> '${JOB_STATES.cancelled}' AND singleton_on IS NOT NULL` | ||
} | ||
function createIndexJobThrottleKey (schema) { | ||
return `CREATE UNIQUE INDEX job_throttle_key ON ${schema}.job (name, singleton_key) WHERE state <= '${JOB_STATES.completed}' AND singleton_on IS NULL` | ||
} | ||
function createIndexJobFetch (schema) { | ||
return `CREATE INDEX job_fetch ON ${schema}.job (name, start_after) INCLUDE (priority, created_on, id) WHERE state < '${JOB_STATES.active}'` | ||
return `CREATE INDEX job_i5 ON ${schema}.job (name, start_after) INCLUDE (priority, created_on, id) WHERE state < '${JOB_STATES.active}'` | ||
} | ||
@@ -279,3 +364,3 @@ | ||
function createIndexArchiveArchivedOn (schema) { | ||
return `CREATE INDEX archive_archived_on_idx ON ${schema}.archive(archived_on)` | ||
return `CREATE INDEX archive_i1 ON ${schema}.archive(archived_on)` | ||
} | ||
@@ -303,18 +388,13 @@ | ||
function createQueue (schema) { | ||
return ` | ||
INSERT INTO ${schema}.queue (name, policy, retry_limit, retry_delay, retry_backoff, expire_seconds, retention_minutes, dead_letter) | ||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) | ||
` | ||
} | ||
function updateQueue (schema) { | ||
return ` | ||
UPDATE ${schema}.queue SET | ||
retry_limit = COALESCE($2, retry_limit), | ||
retry_delay = COALESCE($3, retry_delay), | ||
retry_backoff = COALESCE($4, retry_backoff), | ||
expire_seconds = COALESCE($5, expire_seconds), | ||
retention_minutes = COALESCE($6, retention_minutes), | ||
dead_letter = COALESCE($7, dead_letter) | ||
policy = COALESCE($2, policy), | ||
retry_limit = COALESCE($3, retry_limit), | ||
retry_delay = COALESCE($4, retry_delay), | ||
retry_backoff = COALESCE($5, retry_backoff), | ||
expire_seconds = COALESCE($6, expire_seconds), | ||
retention_minutes = COALESCE($7, retention_minutes), | ||
dead_letter = COALESCE($8, dead_letter), | ||
updated_on = now() | ||
WHERE name = $1 | ||
@@ -324,14 +404,21 @@ ` | ||
function getQueueByName (schema) { | ||
return `SELECT * FROM ${schema}.queue WHERE name = $1` | ||
function getQueues (schema) { | ||
return ` | ||
SELECT | ||
name, | ||
policy, | ||
retry_limit as "retryLimit", | ||
retry_delay as "retryDelay", | ||
retry_backoff as "retryBackoff", | ||
expire_seconds as "expireInSeconds", | ||
retention_minutes as "retentionMinutes", | ||
dead_letter as "deadLetter", | ||
created_on as "createdOn", | ||
updated_on as "updatedOn" | ||
FROM ${schema}.queue | ||
` | ||
} | ||
function deleteQueueRecords (schema) { | ||
return `WITH dq AS ( | ||
DELETE FROM ${schema}.queue WHERE name = $1 | ||
), ds AS ( | ||
DELETE FROM ${schema}.schedule WHERE name = $1 | ||
) | ||
DELETE FROM ${schema}.job WHERE name = $1 | ||
` | ||
function getQueueByName (schema) { | ||
return `${getQueues(schema)} WHERE name = $1` | ||
} | ||
@@ -353,50 +440,4 @@ | ||
function createTableQueue (schema) { | ||
return ` | ||
CREATE TABLE ${schema}.queue ( | ||
name text primary key, | ||
policy text, | ||
retry_limit int, | ||
retry_delay int, | ||
retry_backoff bool, | ||
expire_seconds int, | ||
retention_minutes int, | ||
dead_letter text, | ||
created_on timestamp with time zone not null default now() | ||
) | ||
` | ||
} | ||
function createTableSchedule (schema) { | ||
return ` | ||
CREATE TABLE ${schema}.schedule ( | ||
name text primary key, | ||
cron text not null, | ||
timezone text, | ||
data jsonb, | ||
options jsonb, | ||
created_on timestamp with time zone not null default now(), | ||
updated_on timestamp with time zone not null default now() | ||
) | ||
` | ||
} | ||
function createTableSubscription (schema) { | ||
return ` | ||
CREATE TABLE ${schema}.subscription ( | ||
event text not null, | ||
name text not null, | ||
created_on timestamp with time zone not null default now(), | ||
updated_on timestamp with time zone not null default now(), | ||
PRIMARY KEY(event, name) | ||
) | ||
` | ||
} | ||
function getSchedules (schema) { | ||
return ` | ||
SELECT s.* | ||
FROM ${schema}.schedule s | ||
JOIN ${schema}.queue q on s.name = q.name | ||
` | ||
return `SELECT * FROM ${schema}.schedule` | ||
} | ||
@@ -583,2 +624,3 @@ | ||
AND id IN (SELECT UNNEST($2::uuid[])) | ||
AND state = '${JOB_STATES.cancelled}' | ||
RETURNING 1 | ||
@@ -590,2 +632,14 @@ ) | ||
function deleteJobs (schema) { | ||
return ` | ||
with results as ( | ||
DELETE FROM ${schema}.job | ||
WHERE name = $1 | ||
AND id IN (SELECT UNNEST($2::uuid[])) | ||
RETURNING 1 | ||
) | ||
SELECT COUNT(*) from results | ||
` | ||
} | ||
function insertJob (schema) { | ||
@@ -662,3 +716,3 @@ return ` | ||
$19::bool as retry_backoff_default | ||
) j LEFT JOIN ${schema}.queue q ON j.name = q.name | ||
) j JOIN ${schema}.queue q ON j.name = q.name | ||
ON CONFLICT DO NOTHING | ||
@@ -698,6 +752,6 @@ RETURNING id | ||
data, | ||
COALESCE(priority, 0), | ||
COALESCE("startAfter", now()), | ||
"singletonKey", | ||
COALESCE("deadLetter", q.dead_letter), | ||
COALESCE(priority, 0) as priority, | ||
j.start_after, | ||
"singletonKey" as singleton_key, | ||
COALESCE("deadLetter", q.dead_letter) as dead_letter, | ||
CASE | ||
@@ -711,3 +765,3 @@ WHEN "expireInSeconds" IS NOT NULL THEN "expireInSeconds" * interval '1s' | ||
WHEN "keepUntil" IS NOT NULL THEN "keepUntil" | ||
ELSE COALESCE("startAfter", now()) + CAST(COALESCE((q.retention_minutes * 60)::text, defaults.keep_until, '14 days') as interval) | ||
ELSE COALESCE(j.start_after, now()) + CAST(COALESCE((q.retention_minutes * 60)::text, defaults.keep_until, '14 days') as interval) | ||
END as keep_until, | ||
@@ -722,17 +776,25 @@ COALESCE("retryLimit", q.retry_limit, defaults.retry_limit, 2), | ||
q.policy | ||
FROM json_to_recordset($1) as j ( | ||
id uuid, | ||
name text, | ||
priority integer, | ||
data jsonb, | ||
"startAfter" timestamp with time zone, | ||
"retryLimit" integer, | ||
"retryDelay" integer, | ||
"retryBackoff" boolean, | ||
"singletonKey" text, | ||
"expireInSeconds" integer, | ||
"keepUntil" timestamp with time zone, | ||
"deadLetter" text | ||
) | ||
LEFT JOIN ${schema}.queue q ON j.name = q.name, | ||
FROM ( | ||
SELECT *, | ||
CASE | ||
WHEN right("startAfter", 1) = 'Z' THEN CAST("startAfter" as timestamp with time zone) | ||
ELSE now() + CAST(COALESCE("startAfter",'0') as interval) | ||
END as start_after | ||
FROM json_to_recordset($1) as x ( | ||
id uuid, | ||
name text, | ||
priority integer, | ||
data jsonb, | ||
"startAfter" text, | ||
"retryLimit" integer, | ||
"retryDelay" integer, | ||
"retryBackoff" boolean, | ||
"singletonKey" text, | ||
"singletonOn" text, | ||
"expireInSeconds" integer, | ||
"keepUntil" timestamp with time zone, | ||
"deadLetter" text | ||
) | ||
) j | ||
JOIN ${schema}.queue q ON j.name = q.name, | ||
defaults | ||
@@ -764,2 +826,3 @@ ON CONFLICT DO NOTHING | ||
FROM archived_rows | ||
ON CONFLICT DO NOTHING | ||
` | ||
@@ -792,3 +855,3 @@ } | ||
function advisoryLock (schema, key) { | ||
return `SELECT pg_advisory_xact_lock( | ||
return `SELECT pg_advisory_xact_lock( | ||
('x' || encode(sha224((current_database() || '.pgboss.${schema}${key || ''}')::bytea), 'hex'))::bit(64)::bigint | ||
@@ -795,0 +858,0 @@ )` |
@@ -5,9 +5,8 @@ const EventEmitter = require('events') | ||
const Attorney = require('./attorney') | ||
const pMap = require('p-map') | ||
const queues = { | ||
const QUEUES = { | ||
SEND_IT: '__pgboss__send-it' | ||
} | ||
const events = { | ||
const EVENTS = { | ||
error: 'error', | ||
@@ -28,3 +27,3 @@ schedule: 'schedule' | ||
this.events = events | ||
this.events = EVENTS | ||
@@ -58,12 +57,11 @@ this.getTimeCommand = plans.getTime(config.schema) | ||
try { | ||
await this.manager.createQueue(queues.SEND_IT) | ||
await this.manager.createQueue(QUEUES.SEND_IT) | ||
} catch {} | ||
const options = { | ||
newJobCheckIntervalSeconds: this.config.cronWorkerIntervalSeconds, | ||
teamSize: 50, | ||
teamConcurrency: 5 | ||
pollingIntervalSeconds: this.config.cronWorkerIntervalSeconds, | ||
batchSize: 50 | ||
} | ||
await this.manager.work(queues.SEND_IT, options, (job) => this.onSendIt(job)) | ||
await this.manager.work(QUEUES.SEND_IT, options, (jobs) => this.manager.insert(jobs.map(i => i.data))) | ||
@@ -83,3 +81,3 @@ setImmediate(() => this.onCron()) | ||
await this.manager.offWork(queues.SEND_IT) | ||
await this.manager.offWork(QUEUES.SEND_IT) | ||
@@ -148,8 +146,11 @@ if (this.skewMonitorInterval) { | ||
async cron () { | ||
const items = await this.getSchedules() | ||
const schedules = await this.getSchedules() | ||
const sending = items.filter(i => this.shouldSendIt(i.cron, i.timezone)) | ||
const scheduled = schedules | ||
.filter(i => this.shouldSendIt(i.cron, i.timezone)) | ||
.map(({ name, data, options }) => | ||
({ name: QUEUES.SEND_IT, data: { name, data, options }, options: { singletonKey: name, singletonSeconds: 60 } })) | ||
if (sending.length && !this.stopped) { | ||
await pMap(sending, it => this.send(it), { concurrency: 5 }) | ||
if (scheduled.length > 0 && !this.stopped) { | ||
await this.manager.insert(scheduled) | ||
} | ||
@@ -170,12 +171,2 @@ } | ||
async send (job) { | ||
await this.manager.send(queues.SEND_IT, job, { singletonKey: job.name, singletonSeconds: 60 }) | ||
} | ||
async onSendIt (job) { | ||
if (this.stopped) return | ||
const { name, data, options } = job.data | ||
await this.manager.send(name, data, options) | ||
} | ||
async getSchedules () { | ||
@@ -191,15 +182,15 @@ const { rows } = await this.db.executeSql(this.getSchedulesCommand) | ||
// validation pre-check | ||
Attorney.checkSendArgs([name, data, options], this.config) | ||
// make sure queue exists before scheduling | ||
const queue = await this.db.executeSql(this.getQueueCommand, [name]) | ||
const values = [name, cron, tz, data, options] | ||
if (queue.rows.length === 0) { | ||
throw new Error(`Queue '${name}' not found`) | ||
try { | ||
await this.db.executeSql(this.scheduleCommand, values) | ||
} catch (err) { | ||
if (err.message.includes('foreign key')) { | ||
err.message = `Queue ${name} not found` | ||
} | ||
throw err | ||
} | ||
const values = [name, cron, tz, data, options] | ||
await this.db.executeSql(this.scheduleCommand, values) | ||
} | ||
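For callers the behavior is equivalent, but the queue check now rides on the foreign key instead of a separate lookup; usage sketch:

```js
// Rejects with "Queue my-queue not found" if the queue row does not exist
await boss.schedule('my-queue', '*/5 * * * *', { greeting: 'hello' }, { tz: 'UTC' })
```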
@@ -213,2 +204,2 @@ | ||
module.exports = Timekeeper | ||
module.exports.QUEUES = queues | ||
module.exports.QUEUES = QUEUES |
@@ -116,39 +116,21 @@ import { EventEmitter } from 'events' | ||
type QueuePolicy = 'standard' | 'short' | 'singleton' | 'stately' | ||
type Queue = ExpirationOptions & RetentionOptions & RetryOptions & { policy: QueuePolicy } | ||
type QueueUpdateOptions = ExpirationOptions & RetentionOptions & RetryOptions | ||
type Queue = RetryOptions & ExpirationOptions & RetentionOptions & { name: string, policy?: QueuePolicy, deadLetter?: string } | ||
type QueueResult = Queue & { createdOn: Date, updatedOn: Date } | ||
type ScheduleOptions = SendOptions & { tz?: string } | ||
interface JobPollingOptions { | ||
newJobCheckInterval?: number; | ||
newJobCheckIntervalSeconds?: number; | ||
pollingIntervalSeconds?: number; | ||
} | ||
interface CommonJobFetchOptions { | ||
interface JobFetchOptions { | ||
includeMetadata?: boolean; | ||
priority?: boolean; | ||
batchSize?: number; | ||
} | ||
type JobFetchOptions = CommonJobFetchOptions & { | ||
teamSize?: number; | ||
teamConcurrency?: number; | ||
teamRefill?: boolean; | ||
} | ||
type BatchJobFetchOptions = CommonJobFetchOptions & { | ||
batchSize: number; | ||
} | ||
type WorkOptions = JobFetchOptions & JobPollingOptions | ||
type BatchWorkOptions = BatchJobFetchOptions & JobPollingOptions | ||
type FetchOptions = JobFetchOptions & ConnectionOptions; | ||
type FetchOptions = { | ||
includeMetadata?: boolean; | ||
} & ConnectionOptions; | ||
interface WorkHandler<ReqData> { | ||
(job: PgBoss.Job<ReqData>): Promise<any>; | ||
} | ||
interface BatchWorkHandler<ReqData> { | ||
(job: PgBoss.Job<ReqData>[]): Promise<any>; | ||
@@ -158,6 +140,2 @@ } | ||
interface WorkWithMetadataHandler<ReqData> { | ||
(job: PgBoss.JobWithMetadata<ReqData>): Promise<any>; | ||
} | ||
interface BatchWorkWithMetadataHandler<ReqData> { | ||
(job: PgBoss.JobWithMetadata<ReqData>[]): Promise<any>; | ||
@@ -269,3 +247,3 @@ } | ||
interface StopOptions { | ||
destroy?: boolean, | ||
close?: boolean, | ||
graceful?: boolean, | ||
@@ -335,2 +313,6 @@ timeout?: number, | ||
fetch<T>(name: string): Promise<PgBoss.Job<T>[]>; | ||
fetch<T>(name: string, options: PgBoss.FetchOptions & { includeMetadata: true }): Promise<PgBoss.JobWithMetadata<T>[]>; | ||
fetch<T>(name: string, options: PgBoss.FetchOptions): Promise<PgBoss.Job<T>[]>; | ||
work<ReqData>(name: string, handler: PgBoss.WorkHandler<ReqData>): Promise<string>; | ||
@@ -340,5 +322,2 @@ work<ReqData>(name: string, options: PgBoss.WorkOptions & { includeMetadata: true }, handler: PgBoss.WorkWithMetadataHandler<ReqData>): Promise<string>; | ||
work<ReqData>(name: string, options: PgBoss.BatchWorkOptions & { includeMetadata: true }, handler: PgBoss.BatchWorkWithMetadataHandler<ReqData>): Promise<string>; | ||
work<ReqData>(name: string, options: PgBoss.BatchWorkOptions, handler: PgBoss.BatchWorkHandler<ReqData>): Promise<string>; | ||
offWork(name: string): Promise<void>; | ||
@@ -351,11 +330,6 @@ offWork(options: PgBoss.OffWorkOptions): Promise<void>; | ||
unsubscribe(event: string, name: string): Promise<void>; | ||
publish(event: string): Promise<string[]>; | ||
publish(event: string, data: object): Promise<string[]>; | ||
publish(event: string, data: object, options: PgBoss.SendOptions): Promise<string[]>; | ||
publish(event: string): Promise<void>; | ||
publish(event: string, data: object): Promise<void>; | ||
publish(event: string, data: object, options: PgBoss.SendOptions): Promise<void>; | ||
fetch<T>(name: string): Promise<PgBoss.Job<T> | null>; | ||
fetch<T>(name: string, batchSize: number): Promise<PgBoss.Job<T>[] | null>; | ||
fetch<T>(name: string, batchSize: number, options: PgBoss.FetchOptions & { includeMetadata: true }): Promise<PgBoss.JobWithMetadata<T>[] | null>; | ||
fetch<T>(name: string, batchSize: number, options: PgBoss.FetchOptions): Promise<PgBoss.Job<T>[] | null>; | ||
cancel(name: string, id: string, options?: PgBoss.ConnectionOptions): Promise<void>; | ||
@@ -367,2 +341,5 @@ cancel(name: string, ids: string[], options?: PgBoss.ConnectionOptions): Promise<void>; | ||
deleteJob(name: string, id: string, options?: PgBoss.ConnectionOptions): Promise<void>; | ||
deleteJob(name: string, ids: string[], options?: PgBoss.ConnectionOptions): Promise<void>; | ||
complete(name: string, id: string, options?: PgBoss.ConnectionOptions): Promise<void>; | ||
@@ -376,10 +353,11 @@ complete(name: string, id: string, data: object, options?: PgBoss.ConnectionOptions): Promise<void>; | ||
getQueueSize(name: string, options?: object): Promise<number>; | ||
getJobById(name: string, id: string, options?: PgBoss.ConnectionOptions): Promise<PgBoss.JobWithMetadata | null>; | ||
getJobById(name: string, id: string, options?: PgBoss.ConnectionOptions & { includeArchive: boolean }): Promise<PgBoss.JobWithMetadata | null>; | ||
createQueue(name: string, options?: PgBoss.Queue): Promise<void>; | ||
getQueue(name: string): Promise<PgBoss.Queue | null>; | ||
updateQueue(name: string, options?: PgBoss.QueueUpdateOptions): Promise<void>; | ||
updateQueue(name: string, options?: PgBoss.Queue): Promise<void>; | ||
deleteQueue(name: string): Promise<void>; | ||
purgeQueue(name: string): Promise<void>; | ||
getQueues(): Promise<PgBoss.QueueResult[]>; | ||
getQueue(name: string): Promise<PgBoss.QueueResult | null>; | ||
getQueueSize(name: string, options?: { before: 'retry' | 'active' | 'completed' | 'cancelled' | 'failed' }): Promise<number>; | ||
@@ -386,0 +364,0 @@ clearStorage(): Promise<void>; |
- Removed lodash.debounce@^4.0.8
- Removed p-map@^4.0.0
- Removed aggregate-error@3.1.0 (transitive)
- Removed clean-stack@2.2.0 (transitive)
- Removed indent-string@4.0.0 (transitive)
- Removed lodash.debounce@4.0.8 (transitive)
- Removed p-map@4.0.0 (transitive)