Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

pg-boss

Package Overview
Dependencies
Maintainers
1
Versions
205
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pg-boss - npm Package Compare versions

Comparing version 8.4.2 to 9.0.0

4

package.json
{
"name": "pg-boss",
"version": "8.4.2",
"version": "9.0.0",
"description": "Queueing jobs in Node.js using PostgreSQL like a boss",
"main": "./src/index.js",
"engines": {
"node": ">=14"
"node": ">=16"
},

@@ -9,0 +9,0 @@ "dependencies": {

Queueing jobs in Node.js using PostgreSQL like a boss.
[![PostgreSql Version](https://img.shields.io/badge/PostgreSQL-9.5+-blue.svg?maxAge=2592000)](http://www.postgresql.org)
[![PostgreSql Version](https://img.shields.io/badge/PostgreSQL-11+-blue.svg?maxAge=2592000)](http://www.postgresql.org)
[![npm version](https://badge.fury.io/js/pg-boss.svg)](https://badge.fury.io/js/pg-boss)

@@ -53,4 +53,4 @@ [![Build Status](https://app.travis-ci.com/timgit/pg-boss.svg?branch=master)](https://app.travis-ci.com/github/timgit/pg-boss)

## Requirements
* Node 14 or higher
* PostgreSQL 9.5 or higher
* Node 16 or higher
* PostgreSQL 11 or higher

@@ -57,0 +57,0 @@ ## Installation

@@ -132,2 +132,3 @@ const assert = require('assert')

assert(!('includeMetadata' in options) || typeof options.includeMetadata === 'boolean', 'includeMetadata must be a boolean')
assert(!('enforceSingletonQueueActiveLimit' in options) || typeof options.enforceSingletonQueueActiveLimit === 'boolean', 'enforceSingletonQueueActiveLimit must be a boolean')

@@ -144,2 +145,3 @@ return { options, callback }

assert(!('includeMetadata' in options) || typeof options.includeMetadata === 'boolean', 'includeMetadata must be a boolean')
assert(!('enforceSingletonQueueActiveLimit' in options) || typeof options.enforceSingletonQueueActiveLimit === 'boolean', 'enforceSingletonQueueActiveLimit must be a boolean')

@@ -146,0 +148,0 @@ return { name }

@@ -143,3 +143,3 @@ const EventEmitter = require('events')

if (!this.stopped) {
await job.done() // pre-complete to bypass throttling
await this.manager.complete(job.id) // pre-complete to bypass throttling
await this.maintenanceAsync({ startAfter: this.maintenanceIntervalSeconds })

@@ -163,3 +163,3 @@ }

if (!this.stopped && this.monitorStates) {
await job.done() // pre-complete to bypass throttling
await this.manager.complete(job.id) // pre-complete to bypass throttling
await this.monitorStatesAsync({ startAfter: this.monitorIntervalSeconds })

@@ -166,0 +166,0 @@ }

@@ -31,4 +31,12 @@ const EventEmitter = require('events')

}
static quotePostgresStr (str) {
const delimeter = '$sanitize$'
if (str.includes(delimeter)) {
throw new Error(`Attempted to quote string that contains reserved Postgres delimeter: ${str}`)
}
return `${delimeter}${str}${delimeter}`
}
}
module.exports = Db

@@ -9,2 +9,3 @@ const assert = require('assert')

const Worker = require('./worker')
const Db = require('./db')
const pMap = require('p-map')

@@ -188,3 +189,4 @@

teamRefill: refill = false,
includeMetadata = false
includeMetadata = false,
enforceSingletonQueueActiveLimit = false
} = options

@@ -213,3 +215,3 @@

const fetch = () => this.fetch(name, batchSize || (teamSize - queueSize), { includeMetadata })
const fetch = () => this.fetch(name, batchSize || (teamSize - queueSize), { includeMetadata, enforceSingletonQueueActiveLimit })

@@ -226,4 +228,4 @@ const onFetch = async (jobs) => {

// Failing will fail all fetched jobs
await resolveWithinSeconds(Promise.all([callback(jobs)]), maxExpiration)
.then(() => this.complete(jobs.map(job => job.id)))
.catch(err => this.fail(jobs.map(job => job.id), err))

@@ -481,7 +483,23 @@ } else {

const db = options.db || this.db
const result = await db.executeSql(
this.nextJobCommand(options.includeMetadata || false),
[values.name, batchSize || 1]
)
const preparedStatement = this.nextJobCommand(options.includeMetadata || false, options.enforceSingletonQueueActiveLimit || false)
const statementValues = [values.name, batchSize || 1]
let result
if (options.enforceSingletonQueueActiveLimit && !options.db) {
// Prepare/format now and send multi-statement transaction
const fetchQuery = preparedStatement
.replace('$1', Db.quotePostgresStr(statementValues[0]))
.replace('$2', statementValues[1].toString())
// eslint-disable-next-line no-unused-vars
const [_begin, _setLocal, fetchResult, _commit] = await db.executeSql([
'BEGIN',
'SET LOCAL jit = OFF', // JIT can slow things down significantly
fetchQuery,
'COMMIT'
].join(';\n'))
result = fetchResult
} else {
result = await db.executeSql(preparedStatement, statementValues)
}
if (!result || result.rows.length === 0) {

@@ -491,14 +509,3 @@ return null

const jobs = result.rows.map(job => {
job.done = async (error, response) => {
if (error) {
await this.fail(job.id, error)
} else {
await this.complete(job.id, response)
}
}
return job
})
return jobs.length === 1 && !batchSize ? jobs[0] : jobs
return result.rows.length === 1 && !batchSize ? result.rows[0] : result.rows
}

@@ -505,0 +512,0 @@

@@ -354,9 +354,27 @@ const assert = require('assert')

function fetchNextJob (schema) {
return (includeMetadata) => `
return (includeMetadata, enforceSingletonQueueActiveLimit) => `
WITH nextJob as (
SELECT id
FROM ${schema}.job
FROM ${schema}.job j
WHERE state < '${states.active}'
AND name LIKE $1
AND startAfter < now()
${enforceSingletonQueueActiveLimit
? `AND (
CASE
WHEN singletonKey IS NOT NULL
AND singletonKey LIKE '${SINGLETON_QUEUE_KEY_ESCAPED}%'
THEN NOT EXISTS (
SELECT 1
FROM ${schema}.job active_job
WHERE active_job.state = '${states.active}'
AND active_job.name = j.name
AND active_job.singletonKey = j.singletonKey
LIMIT 1
)
ELSE
true
END
)`
: ''}
ORDER BY priority desc, createdOn, id

@@ -363,0 +381,0 @@ LIMIT $2

@@ -116,2 +116,3 @@ import { EventEmitter } from 'events'

includeMetadata?: boolean;
enforceSingletonQueueActiveLimit?: boolean;
}

@@ -123,10 +124,11 @@

includeMetadata?: boolean;
enforceSingletonQueueActiveLimit?: boolean;
} & ConnectionOptions;
interface WorkHandler<ReqData, ResData> {
(job: PgBoss.JobWithDoneCallback<ReqData, ResData>): Promise<ResData> | void;
interface WorkHandler<ReqData> {
(job: PgBoss.Job<ReqData>): void;
}
interface WorkWithMetadataHandler<ReqData, ResData> {
(job: PgBoss.JobWithMetadataDoneCallback<ReqData, ResData>): Promise<ResData> | void;
interface WorkWithMetadataHandler<ReqData> {
(job: PgBoss.JobWithMetadata<ReqData>): void;
}

@@ -147,6 +149,2 @@

interface JobDoneCallback<T> {
(err?: Error | null, data?: T): void;
}
// source (for now): https://github.com/bendrucker/postgres-interval/blob/master/index.d.ts

@@ -210,10 +208,2 @@ interface PostgresInterval {

interface JobWithDoneCallback<ReqData, ResData> extends Job<ReqData> {
done: JobDoneCallback<ResData>;
}
interface JobWithMetadataDoneCallback<ReqData, ResData> extends JobWithMetadata<ReqData> {
done: JobDoneCallback<ResData>;
}
interface MonitorState {

@@ -315,5 +305,5 @@ all: number;

work<ReqData, ResData>(name: string, handler: PgBoss.WorkHandler<ReqData, ResData>): Promise<string>;
work<ReqData, ResData>(name: string, options: PgBoss.WorkOptions & { includeMetadata: true }, handler: PgBoss.WorkWithMetadataHandler<ReqData, ResData>): Promise<string>;
work<ReqData, ResData>(name: string, options: PgBoss.WorkOptions, handler: PgBoss.WorkHandler<ReqData, ResData>): Promise<string>;
work<ReqData>(name: string, handler: PgBoss.WorkHandler<ReqData>): Promise<string>;
work<ReqData>(name: string, options: PgBoss.WorkOptions & { includeMetadata: true }, handler: PgBoss.WorkWithMetadataHandler<ReqData>): Promise<string>;
work<ReqData>(name: string, options: PgBoss.WorkOptions, handler: PgBoss.WorkHandler<ReqData>): Promise<string>;

@@ -320,0 +310,0 @@ onComplete(name: string, handler: Function): Promise<string>;

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc