Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

pg-boss

Package Overview
Dependencies
Maintainers
0
Versions
205
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pg-boss - npm Package Compare versions

Comparing version 10.0.0-beta2 to 10.0.0-beta3

2

package.json
{
"name": "pg-boss",
"version": "10.0.0-beta2",
"version": "10.0.0-beta3",
"description": "Queueing jobs in Postgres from Node.js like a boss",

@@ -5,0 +5,0 @@ "main": "./src/index.js",

@@ -11,3 +11,4 @@ const assert = require('assert')

warnClockSkew,
assertPostgresObjectName
assertPostgresObjectName,
assertQueueName
}

@@ -33,4 +34,2 @@

function checkQueueArgs (name, options = {}) {
assertPostgresObjectName(name)
assert(!('deadLetter' in options) || (typeof options.deadLetter === 'string'), 'deadLetter must be a string')

@@ -183,6 +182,12 @@

assert(name.length <= 50, 'Name cannot exceed 50 characters')
assert(!/\W/.test(name), 'Name can only contain alphanumeric characters and underscores')
assert(!/^d/.test(name), 'Name cannot start with a number')
assert(!/\W/.test(name), 'Name can only contain alphanumeric characters or underscores')
assert(!/^\d/.test(name), 'Name cannot start with a number')
}
function assertQueueName (name) {
assert(typeof name === 'string', 'Name must be a string')
assert(name.length <= 50, 'Name cannot exceed 50 characters')
assert(/[\w-]/.test(name), 'Name can only contain alphanumeric characters, underscores, or hyphens')
}
function applyArchiveConfig (config) {

@@ -189,0 +194,0 @@ const ARCHIVE_DEFAULT = 60 * 60 * 12

@@ -24,2 +24,8 @@ const assert = require('assert')

this.migrations = this.config.migrations || migrationStore.getAll(this.config.schema)
// exported api to index
this.functions = [
this.version,
this.isInstalled
]
}

@@ -26,0 +32,0 @@

@@ -14,2 +14,6 @@ const EventEmitter = require('events')

events = {
error: 'error'
}
async open () {

@@ -16,0 +20,0 @@ this.pool = new pg.Pool(this.config)

@@ -16,2 +16,11 @@ const EventEmitter = require('events')

class PgBoss extends EventEmitter {
#stoppingOn
#stopped
#config
#db
#boss
#contractor
#manager
#timekeeper
static getConstructionPlans (schema) {

@@ -30,52 +39,58 @@ return Contractor.constructionPlans(schema)

constructor (value) {
super()
this.#stoppingOn = null
this.#stopped = true
const config = Attorney.getConfig(value)
this.#config = config
super()
const db = this.getDb()
this.#db = db
const db = getDb(config)
if (db.isOurs) {
promoteEvent.call(this, db, 'error')
this.#promoteEvents(db)
}
const contractor = new Contractor(db, config)
const manager = new Manager(db, config)
Object.keys(manager.events).forEach(event => promoteEvent.call(this, manager, manager.events[event]))
manager.functions.forEach(func => promoteFunction.call(this, manager, func))
const bossConfig = { ...config, manager }
const boss = new Boss(db, bossConfig)
Object.keys(boss.events).forEach(event => promoteEvent.call(this, boss, boss.events[event]))
boss.functions.forEach(func => promoteFunction.call(this, boss, func))
const timekeeper = new Timekeeper(db, bossConfig)
Object.keys(timekeeper.events).forEach(event => promoteEvent.call(this, timekeeper, timekeeper.events[event]))
timekeeper.functions.forEach(func => promoteFunction.call(this, timekeeper, func))
manager.timekeeper = timekeeper
this.stoppingOn = null
this.stopped = true
this.config = config
this.db = db
this.boss = boss
this.contractor = new Contractor(db, config)
this.manager = manager
this.timekeeper = timekeeper
this.#promoteEvents(manager)
this.#promoteEvents(boss)
this.#promoteEvents(timekeeper)
function getDb (config) {
if (config.db) {
return config.db
}
this.#promoteFunctions(boss)
this.#promoteFunctions(contractor)
this.#promoteFunctions(manager)
this.#promoteFunctions(timekeeper)
const db = new Db(config)
db.isOurs = true
return db
this.#boss = boss
this.#contractor = contractor
this.#manager = manager
this.#timekeeper = timekeeper
}
getDb () {
if (this.#db) {
return this.#db
}
function promoteFunction (obj, func) {
this[func.name] = (...args) => func.apply(obj, args)
if (this.#config.db) {
return this.#config.db
}
function promoteEvent (emitter, event) {
const db = new Db(this.#config)
db.isOurs = true
return db
}
#promoteEvents (emitter) {
for (const event of Object.values(emitter?.events)) {
emitter.on(event, arg => this.emit(event, arg))

@@ -85,2 +100,8 @@ }

#promoteFunctions (obj) {
for (const func of obj?.functions) {
this[func.name] = (...args) => func.apply(obj, args)
}
}
async start () {

@@ -93,24 +114,24 @@ if (this.starting || this.started) {

if (this.db.isOurs && !this.db.opened) {
await this.db.open()
if (this.#db.isOurs && !this.#db.opened) {
await this.#db.open()
}
if (this.config.migrate) {
await this.contractor.start()
if (this.#config.migrate) {
await this.#contractor.start()
} else {
await this.contractor.check()
await this.#contractor.check()
}
this.manager.start()
this.#manager.start()
if (this.config.supervise) {
await this.boss.supervise()
if (this.#config.supervise) {
await this.#boss.supervise()
}
if (this.config.monitorStateIntervalSeconds) {
await this.boss.monitor()
if (this.#config.monitorStateIntervalSeconds) {
await this.#boss.monitor()
}
if (this.config.schedule) {
await this.timekeeper.start()
if (this.#config.schedule) {
await this.#timekeeper.start()
}

@@ -120,3 +141,3 @@

this.started = true
this.stopped = false
this.#stopped = false

@@ -127,3 +148,3 @@ return this

async stop (options = {}) {
if (this.stoppingOn || this.stopped) {
if (this.#stoppingOn || this.#stopped) {
return

@@ -136,7 +157,7 @@ }

this.stoppingOn = Date.now()
this.#stoppingOn = Date.now()
await this.manager.stop()
await this.timekeeper.stop()
await this.boss.stop()
await this.#manager.stop()
await this.#timekeeper.stop()
await this.#boss.stop()

@@ -146,14 +167,14 @@ await new Promise((resolve, reject) => {

try {
if (this.config.__test__throw_shutdown) {
throw new Error(this.config.__test__throw_shutdown)
if (this.#config.__test__throw_shutdown) {
throw new Error(this.#config.__test__throw_shutdown)
}
await this.manager.failWip()
await this.#manager.failWip()
if (this.db.isOurs && this.db.opened && destroy) {
await this.db.close()
if (this.#db.isOurs && this.#db.opened && destroy) {
await this.#db.close()
}
this.stopped = true
this.stoppingOn = null
this.#stopped = true
this.#stoppingOn = null
this.started = false

@@ -179,9 +200,9 @@

try {
if (this.config.__test__throw_stop_monitor) {
throw new Error(this.config.__test__throw_stop_monitor)
if (this.#config.__test__throw_stop_monitor) {
throw new Error(this.#config.__test__throw_stop_monitor)
}
const isWip = () => this.manager.getWipData({ includeInternal: false }).length > 0
const isWip = () => this.#manager.getWipData({ includeInternal: false }).length > 0
while ((Date.now() - this.stoppingOn) < timeout && isWip()) {
while ((Date.now() - this.#stoppingOn) < timeout && isWip()) {
await delay(500)

@@ -188,0 +209,0 @@ }

@@ -550,2 +550,4 @@ const assert = require('assert')

Attorney.assertQueueName(name)
const { policy = QUEUE_POLICY.standard } = options

@@ -564,3 +566,3 @@

const paritionSql = plans.partitionCreateJobName(this.config.schema, name)
const paritionSql = plans.createPartition(this.config.schema, name)

@@ -651,4 +653,4 @@ await this.db.executeSql(paritionSql)

if (result?.rows?.length) {
Attorney.assertPostgresObjectName(name)
const sql = plans.dropJobTablePartition(this.config.schema, name)
Attorney.assertQueueName(name)
const sql = plans.dropPartition(this.config.schema, name)
await this.db.executeSql(sql)

@@ -655,0 +657,0 @@ }

@@ -49,4 +49,4 @@ const assert = require('assert')

updateQueue,
partitionCreateJobName,
dropJobTablePartition,
createPartition,
dropPartition,
deleteQueueRecords,

@@ -100,2 +100,6 @@ getQueueByName,

getPartitionFunction(schema),
createPartitionFunction(schema),
dropPartitionFunction(schema),
insertVersion(schema, version)

@@ -167,14 +171,51 @@ ]

function partitionCreateJobName (schema, name) {
function createPartition (schema, name) {
return `SELECT ${schema}.create_partition('${name}');`
}
function getPartitionFunction (schema) {
return `
CREATE TABLE ${schema}.job_${name} (LIKE ${schema}.job INCLUDING DEFAULTS INCLUDING CONSTRAINTS);
ALTER TABLE ${schema}.job_${name} ADD CONSTRAINT job_check_${name} CHECK (name='${name}');
ALTER TABLE ${schema}.job ATTACH PARTITION ${schema}.job_${name} FOR VALUES IN ('${name}');
CREATE FUNCTION ${schema}.get_partition(queue_name text, out name text) AS
$$
SELECT '${schema}.j' || encode(digest(queue_name, 'sha1'), 'hex');
$$
LANGUAGE SQL
IMMUTABLE
`
}
function dropJobTablePartition (schema, name) {
return `DROP TABLE IF EXISTS ${schema}.job_${name}`
function createPartitionFunction (schema) {
return `
CREATE FUNCTION ${schema}.create_partition(queue_name text)
RETURNS VOID AS
$$
DECLARE
table_name varchar := ${schema}.get_partition(queue_name);
BEGIN
EXECUTE format('CREATE TABLE %I (LIKE ${schema}.job INCLUDING DEFAULTS INCLUDING CONSTRAINTS)', table_name);
EXECUTE format('ALTER TABLE %I ADD CHECK (name=%L)', table_name, queue_name);
EXECUTE format('ALTER TABLE ${schema}.job ATTACH PARTITION %I FOR VALUES IN (%L)', table_name, queue_name);
END;
$$
LANGUAGE plpgsql;
`
}
function dropPartitionFunction (schema) {
return `
CREATE FUNCTION ${schema}.drop_partition(queue_name text)
RETURNS VOID AS
$$
BEGIN
EXECUTE format('DROP TABLE IF EXISTS %I', ${schema}.get_partition(queue_name));
END;
$$
LANGUAGE plpgsql;
`
}
function dropPartition (schema, name) {
return `SELECT ${schema}.drop_partition('${name}');`
}
function createPrimaryKeyArchive (schema) {

@@ -729,4 +770,4 @@ return `ALTER TABLE ${schema}.archive ADD CONSTRAINT archive_pkey PRIMARY KEY (name, id)`

function advisoryLock (schema, key) {
return `SELECT pg_advisory_xact_lock(
('x' || md5(current_database() || '.pgboss.${schema}${key || ''}'))::bit(64)::bigint
return `SELECT pg_advisory_xact_lock(
('x' || encode(digest(current_database() || '.pgboss.${schema}${key || ''}', 'sha256'), 'hex'))::bit(64)::bigint
)`

@@ -733,0 +774,0 @@ }

@@ -9,3 +9,3 @@ const EventEmitter = require('events')

CRON: '__pgboss__cron',
SEND_IT: '__pgboss__send_it'
SEND_IT: '__pgboss__send-it'
}

@@ -12,0 +12,0 @@

@@ -351,3 +351,3 @@ import { EventEmitter } from 'events'

createQueue(name: string, policy: 'standard' | 'short' | 'singleton' | 'stately'): Promise<void>;
createQueue(name: string, options?: { policy: 'standard' | 'short' | 'singleton' | 'stately' }): Promise<void>;
deleteQueue(name: string): Promise<void>;

@@ -354,0 +354,0 @@ purgeQueue(name: string): Promise<void>;

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc