Comparing version 8.4.2 to 9.0.0
{ | ||
"name": "pg-boss", | ||
"version": "8.4.2", | ||
"version": "9.0.0", | ||
"description": "Queueing jobs in Node.js using PostgreSQL like a boss", | ||
"main": "./src/index.js", | ||
"engines": { | ||
"node": ">=14" | ||
"node": ">=16" | ||
}, | ||
@@ -9,0 +9,0 @@ "dependencies": { |
Queueing jobs in Node.js using PostgreSQL like a boss. | ||
[![PostgreSql Version](https://img.shields.io/badge/PostgreSQL-9.5+-blue.svg?maxAge=2592000)](http://www.postgresql.org) | ||
[![PostgreSql Version](https://img.shields.io/badge/PostgreSQL-11+-blue.svg?maxAge=2592000)](http://www.postgresql.org) | ||
[![npm version](https://badge.fury.io/js/pg-boss.svg)](https://badge.fury.io/js/pg-boss) | ||
@@ -53,4 +53,4 @@ [![Build Status](https://app.travis-ci.com/timgit/pg-boss.svg?branch=master)](https://app.travis-ci.com/github/timgit/pg-boss) | ||
## Requirements | ||
* Node 14 or higher | ||
* PostgreSQL 9.5 or higher | ||
* Node 16 or higher | ||
* PostgreSQL 11 or higher | ||
@@ -57,0 +57,0 @@ ## Installation |
@@ -132,2 +132,3 @@ const assert = require('assert') | ||
assert(!('includeMetadata' in options) || typeof options.includeMetadata === 'boolean', 'includeMetadata must be a boolean') | ||
assert(!('enforceSingletonQueueActiveLimit' in options) || typeof options.enforceSingletonQueueActiveLimit === 'boolean', 'enforceSingletonQueueActiveLimit must be a boolean') | ||
@@ -144,2 +145,3 @@ return { options, callback } | ||
assert(!('includeMetadata' in options) || typeof options.includeMetadata === 'boolean', 'includeMetadata must be a boolean') | ||
assert(!('enforceSingletonQueueActiveLimit' in options) || typeof options.enforceSingletonQueueActiveLimit === 'boolean', 'enforceSingletonQueueActiveLimit must be a boolean') | ||
@@ -146,0 +148,0 @@ return { name } |
@@ -143,3 +143,3 @@ const EventEmitter = require('events') | ||
if (!this.stopped) { | ||
await job.done() // pre-complete to bypass throttling | ||
await this.manager.complete(job.id) // pre-complete to bypass throttling | ||
await this.maintenanceAsync({ startAfter: this.maintenanceIntervalSeconds }) | ||
@@ -163,3 +163,3 @@ } | ||
if (!this.stopped && this.monitorStates) { | ||
await job.done() // pre-complete to bypass throttling | ||
await this.manager.complete(job.id) // pre-complete to bypass throttling | ||
await this.monitorStatesAsync({ startAfter: this.monitorIntervalSeconds }) | ||
@@ -166,0 +166,0 @@ } |
@@ -31,4 +31,12 @@ const EventEmitter = require('events') | ||
} | ||
static quotePostgresStr (str) { | ||
const delimeter = '$sanitize$' | ||
if (str.includes(delimeter)) { | ||
throw new Error(`Attempted to quote string that contains reserved Postgres delimeter: ${str}`) | ||
} | ||
return `${delimeter}${str}${delimeter}` | ||
} | ||
} | ||
module.exports = Db |
@@ -9,2 +9,3 @@ const assert = require('assert') | ||
const Worker = require('./worker') | ||
const Db = require('./db') | ||
const pMap = require('p-map') | ||
@@ -188,3 +189,4 @@ | ||
teamRefill: refill = false, | ||
includeMetadata = false | ||
includeMetadata = false, | ||
enforceSingletonQueueActiveLimit = false | ||
} = options | ||
@@ -213,3 +215,3 @@ | ||
const fetch = () => this.fetch(name, batchSize || (teamSize - queueSize), { includeMetadata }) | ||
const fetch = () => this.fetch(name, batchSize || (teamSize - queueSize), { includeMetadata, enforceSingletonQueueActiveLimit }) | ||
@@ -226,4 +228,4 @@ const onFetch = async (jobs) => { | ||
// Failing will fail all fetched jobs | ||
await resolveWithinSeconds(Promise.all([callback(jobs)]), maxExpiration) | ||
.then(() => this.complete(jobs.map(job => job.id))) | ||
.catch(err => this.fail(jobs.map(job => job.id), err)) | ||
@@ -481,7 +483,23 @@ } else { | ||
const db = options.db || this.db | ||
const result = await db.executeSql( | ||
this.nextJobCommand(options.includeMetadata || false), | ||
[values.name, batchSize || 1] | ||
) | ||
const preparedStatement = this.nextJobCommand(options.includeMetadata || false, options.enforceSingletonQueueActiveLimit || false) | ||
const statementValues = [values.name, batchSize || 1] | ||
let result | ||
if (options.enforceSingletonQueueActiveLimit && !options.db) { | ||
// Prepare/format now and send multi-statement transaction | ||
const fetchQuery = preparedStatement | ||
.replace('$1', Db.quotePostgresStr(statementValues[0])) | ||
.replace('$2', statementValues[1].toString()) | ||
// eslint-disable-next-line no-unused-vars | ||
const [_begin, _setLocal, fetchResult, _commit] = await db.executeSql([ | ||
'BEGIN', | ||
'SET LOCAL jit = OFF', // JIT can slow things down significantly | ||
fetchQuery, | ||
'COMMIT' | ||
].join(';\n')) | ||
result = fetchResult | ||
} else { | ||
result = await db.executeSql(preparedStatement, statementValues) | ||
} | ||
if (!result || result.rows.length === 0) { | ||
@@ -491,14 +509,3 @@ return null | ||
const jobs = result.rows.map(job => { | ||
job.done = async (error, response) => { | ||
if (error) { | ||
await this.fail(job.id, error) | ||
} else { | ||
await this.complete(job.id, response) | ||
} | ||
} | ||
return job | ||
}) | ||
return jobs.length === 1 && !batchSize ? jobs[0] : jobs | ||
return result.rows.length === 1 && !batchSize ? result.rows[0] : result.rows | ||
} | ||
@@ -505,0 +512,0 @@ |
@@ -354,9 +354,27 @@ const assert = require('assert') | ||
function fetchNextJob (schema) { | ||
return (includeMetadata) => ` | ||
return (includeMetadata, enforceSingletonQueueActiveLimit) => ` | ||
WITH nextJob as ( | ||
SELECT id | ||
FROM ${schema}.job | ||
FROM ${schema}.job j | ||
WHERE state < '${states.active}' | ||
AND name LIKE $1 | ||
AND startAfter < now() | ||
${enforceSingletonQueueActiveLimit | ||
? `AND ( | ||
CASE | ||
WHEN singletonKey IS NOT NULL | ||
AND singletonKey LIKE '${SINGLETON_QUEUE_KEY_ESCAPED}%' | ||
THEN NOT EXISTS ( | ||
SELECT 1 | ||
FROM ${schema}.job active_job | ||
WHERE active_job.state = '${states.active}' | ||
AND active_job.name = j.name | ||
AND active_job.singletonKey = j.singletonKey | ||
LIMIT 1 | ||
) | ||
ELSE | ||
true | ||
END | ||
)` | ||
: ''} | ||
ORDER BY priority desc, createdOn, id | ||
@@ -363,0 +381,0 @@ LIMIT $2 |
@@ -116,2 +116,3 @@ import { EventEmitter } from 'events' | ||
includeMetadata?: boolean; | ||
enforceSingletonQueueActiveLimit?: boolean; | ||
} | ||
@@ -123,10 +124,11 @@ | ||
includeMetadata?: boolean; | ||
enforceSingletonQueueActiveLimit?: boolean; | ||
} & ConnectionOptions; | ||
interface WorkHandler<ReqData, ResData> { | ||
(job: PgBoss.JobWithDoneCallback<ReqData, ResData>): Promise<ResData> | void; | ||
interface WorkHandler<ReqData> { | ||
(job: PgBoss.Job<ReqData>): void; | ||
} | ||
interface WorkWithMetadataHandler<ReqData, ResData> { | ||
(job: PgBoss.JobWithMetadataDoneCallback<ReqData, ResData>): Promise<ResData> | void; | ||
interface WorkWithMetadataHandler<ReqData> { | ||
(job: PgBoss.JobWithMetadata<ReqData>): void; | ||
} | ||
@@ -147,6 +149,2 @@ | ||
interface JobDoneCallback<T> { | ||
(err?: Error | null, data?: T): void; | ||
} | ||
// source (for now): https://github.com/bendrucker/postgres-interval/blob/master/index.d.ts | ||
@@ -210,10 +208,2 @@ interface PostgresInterval { | ||
interface JobWithDoneCallback<ReqData, ResData> extends Job<ReqData> { | ||
done: JobDoneCallback<ResData>; | ||
} | ||
interface JobWithMetadataDoneCallback<ReqData, ResData> extends JobWithMetadata<ReqData> { | ||
done: JobDoneCallback<ResData>; | ||
} | ||
interface MonitorState { | ||
@@ -315,5 +305,5 @@ all: number; | ||
work<ReqData, ResData>(name: string, handler: PgBoss.WorkHandler<ReqData, ResData>): Promise<string>; | ||
work<ReqData, ResData>(name: string, options: PgBoss.WorkOptions & { includeMetadata: true }, handler: PgBoss.WorkWithMetadataHandler<ReqData, ResData>): Promise<string>; | ||
work<ReqData, ResData>(name: string, options: PgBoss.WorkOptions, handler: PgBoss.WorkHandler<ReqData, ResData>): Promise<string>; | ||
work<ReqData>(name: string, handler: PgBoss.WorkHandler<ReqData>): Promise<string>; | ||
work<ReqData>(name: string, options: PgBoss.WorkOptions & { includeMetadata: true }, handler: PgBoss.WorkWithMetadataHandler<ReqData>): Promise<string>; | ||
work<ReqData>(name: string, options: PgBoss.WorkOptions, handler: PgBoss.WorkHandler<ReqData>): Promise<string>; | ||
@@ -320,0 +310,0 @@ onComplete(name: string, handler: Function): Promise<string>; |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
100019
2595