Comparing version 10.0.0-beta2 to 10.0.0-beta3
{ | ||
"name": "pg-boss", | ||
"version": "10.0.0-beta2", | ||
"version": "10.0.0-beta3", | ||
"description": "Queueing jobs in Postgres from Node.js like a boss", | ||
@@ -5,0 +5,0 @@ "main": "./src/index.js", |
@@ -11,3 +11,4 @@ const assert = require('assert') | ||
warnClockSkew, | ||
assertPostgresObjectName | ||
assertPostgresObjectName, | ||
assertQueueName | ||
} | ||
@@ -33,4 +34,2 @@ | ||
function checkQueueArgs (name, options = {}) { | ||
assertPostgresObjectName(name) | ||
assert(!('deadLetter' in options) || (typeof options.deadLetter === 'string'), 'deadLetter must be a string') | ||
@@ -183,6 +182,12 @@ | ||
assert(name.length <= 50, 'Name cannot exceed 50 characters') | ||
assert(!/\W/.test(name), 'Name can only contain alphanumeric characters and underscores') | ||
assert(!/^d/.test(name), 'Name cannot start with a number') | ||
assert(!/\W/.test(name), 'Name can only contain alphanumeric characters or underscores') | ||
assert(!/^\d/.test(name), 'Name cannot start with a number') | ||
} | ||
function assertQueueName (name) { | ||
assert(typeof name === 'string', 'Name must be a string') | ||
assert(name.length <= 50, 'Name cannot exceed 50 characters') | ||
assert(/[\w-]/.test(name), 'Name can only contain alphanumeric characters, underscores, or hyphens') | ||
} | ||
function applyArchiveConfig (config) { | ||
@@ -189,0 +194,0 @@ const ARCHIVE_DEFAULT = 60 * 60 * 12 |
@@ -24,2 +24,8 @@ const assert = require('assert') | ||
this.migrations = this.config.migrations || migrationStore.getAll(this.config.schema) | ||
// exported api to index | ||
this.functions = [ | ||
this.version, | ||
this.isInstalled | ||
] | ||
} | ||
@@ -26,0 +32,0 @@ |
@@ -14,2 +14,6 @@ const EventEmitter = require('events') | ||
events = { | ||
error: 'error' | ||
} | ||
async open () { | ||
@@ -16,0 +20,0 @@ this.pool = new pg.Pool(this.config) |
139
src/index.js
@@ -16,2 +16,11 @@ const EventEmitter = require('events') | ||
class PgBoss extends EventEmitter { | ||
#stoppingOn | ||
#stopped | ||
#config | ||
#db | ||
#boss | ||
#contractor | ||
#manager | ||
#timekeeper | ||
static getConstructionPlans (schema) { | ||
@@ -30,52 +39,58 @@ return Contractor.constructionPlans(schema) | ||
constructor (value) { | ||
super() | ||
this.#stoppingOn = null | ||
this.#stopped = true | ||
const config = Attorney.getConfig(value) | ||
this.#config = config | ||
super() | ||
const db = this.getDb() | ||
this.#db = db | ||
const db = getDb(config) | ||
if (db.isOurs) { | ||
promoteEvent.call(this, db, 'error') | ||
this.#promoteEvents(db) | ||
} | ||
const contractor = new Contractor(db, config) | ||
const manager = new Manager(db, config) | ||
Object.keys(manager.events).forEach(event => promoteEvent.call(this, manager, manager.events[event])) | ||
manager.functions.forEach(func => promoteFunction.call(this, manager, func)) | ||
const bossConfig = { ...config, manager } | ||
const boss = new Boss(db, bossConfig) | ||
Object.keys(boss.events).forEach(event => promoteEvent.call(this, boss, boss.events[event])) | ||
boss.functions.forEach(func => promoteFunction.call(this, boss, func)) | ||
const timekeeper = new Timekeeper(db, bossConfig) | ||
Object.keys(timekeeper.events).forEach(event => promoteEvent.call(this, timekeeper, timekeeper.events[event])) | ||
timekeeper.functions.forEach(func => promoteFunction.call(this, timekeeper, func)) | ||
manager.timekeeper = timekeeper | ||
this.stoppingOn = null | ||
this.stopped = true | ||
this.config = config | ||
this.db = db | ||
this.boss = boss | ||
this.contractor = new Contractor(db, config) | ||
this.manager = manager | ||
this.timekeeper = timekeeper | ||
this.#promoteEvents(manager) | ||
this.#promoteEvents(boss) | ||
this.#promoteEvents(timekeeper) | ||
function getDb (config) { | ||
if (config.db) { | ||
return config.db | ||
} | ||
this.#promoteFunctions(boss) | ||
this.#promoteFunctions(contractor) | ||
this.#promoteFunctions(manager) | ||
this.#promoteFunctions(timekeeper) | ||
const db = new Db(config) | ||
db.isOurs = true | ||
return db | ||
this.#boss = boss | ||
this.#contractor = contractor | ||
this.#manager = manager | ||
this.#timekeeper = timekeeper | ||
} | ||
getDb () { | ||
if (this.#db) { | ||
return this.#db | ||
} | ||
function promoteFunction (obj, func) { | ||
this[func.name] = (...args) => func.apply(obj, args) | ||
if (this.#config.db) { | ||
return this.#config.db | ||
} | ||
function promoteEvent (emitter, event) { | ||
const db = new Db(this.#config) | ||
db.isOurs = true | ||
return db | ||
} | ||
#promoteEvents (emitter) { | ||
for (const event of Object.values(emitter?.events)) { | ||
emitter.on(event, arg => this.emit(event, arg)) | ||
@@ -85,2 +100,8 @@ } | ||
#promoteFunctions (obj) { | ||
for (const func of obj?.functions) { | ||
this[func.name] = (...args) => func.apply(obj, args) | ||
} | ||
} | ||
async start () { | ||
@@ -93,24 +114,24 @@ if (this.starting || this.started) { | ||
if (this.db.isOurs && !this.db.opened) { | ||
await this.db.open() | ||
if (this.#db.isOurs && !this.#db.opened) { | ||
await this.#db.open() | ||
} | ||
if (this.config.migrate) { | ||
await this.contractor.start() | ||
if (this.#config.migrate) { | ||
await this.#contractor.start() | ||
} else { | ||
await this.contractor.check() | ||
await this.#contractor.check() | ||
} | ||
this.manager.start() | ||
this.#manager.start() | ||
if (this.config.supervise) { | ||
await this.boss.supervise() | ||
if (this.#config.supervise) { | ||
await this.#boss.supervise() | ||
} | ||
if (this.config.monitorStateIntervalSeconds) { | ||
await this.boss.monitor() | ||
if (this.#config.monitorStateIntervalSeconds) { | ||
await this.#boss.monitor() | ||
} | ||
if (this.config.schedule) { | ||
await this.timekeeper.start() | ||
if (this.#config.schedule) { | ||
await this.#timekeeper.start() | ||
} | ||
@@ -120,3 +141,3 @@ | ||
this.started = true | ||
this.stopped = false | ||
this.#stopped = false | ||
@@ -127,3 +148,3 @@ return this | ||
async stop (options = {}) { | ||
if (this.stoppingOn || this.stopped) { | ||
if (this.#stoppingOn || this.#stopped) { | ||
return | ||
@@ -136,7 +157,7 @@ } | ||
this.stoppingOn = Date.now() | ||
this.#stoppingOn = Date.now() | ||
await this.manager.stop() | ||
await this.timekeeper.stop() | ||
await this.boss.stop() | ||
await this.#manager.stop() | ||
await this.#timekeeper.stop() | ||
await this.#boss.stop() | ||
@@ -146,14 +167,14 @@ await new Promise((resolve, reject) => { | ||
try { | ||
if (this.config.__test__throw_shutdown) { | ||
throw new Error(this.config.__test__throw_shutdown) | ||
if (this.#config.__test__throw_shutdown) { | ||
throw new Error(this.#config.__test__throw_shutdown) | ||
} | ||
await this.manager.failWip() | ||
await this.#manager.failWip() | ||
if (this.db.isOurs && this.db.opened && destroy) { | ||
await this.db.close() | ||
if (this.#db.isOurs && this.#db.opened && destroy) { | ||
await this.#db.close() | ||
} | ||
this.stopped = true | ||
this.stoppingOn = null | ||
this.#stopped = true | ||
this.#stoppingOn = null | ||
this.started = false | ||
@@ -179,9 +200,9 @@ | ||
try { | ||
if (this.config.__test__throw_stop_monitor) { | ||
throw new Error(this.config.__test__throw_stop_monitor) | ||
if (this.#config.__test__throw_stop_monitor) { | ||
throw new Error(this.#config.__test__throw_stop_monitor) | ||
} | ||
const isWip = () => this.manager.getWipData({ includeInternal: false }).length > 0 | ||
const isWip = () => this.#manager.getWipData({ includeInternal: false }).length > 0 | ||
while ((Date.now() - this.stoppingOn) < timeout && isWip()) { | ||
while ((Date.now() - this.#stoppingOn) < timeout && isWip()) { | ||
await delay(500) | ||
@@ -188,0 +209,0 @@ } |
@@ -550,2 +550,4 @@ const assert = require('assert') | ||
Attorney.assertQueueName(name) | ||
const { policy = QUEUE_POLICY.standard } = options | ||
@@ -564,3 +566,3 @@ | ||
const paritionSql = plans.partitionCreateJobName(this.config.schema, name) | ||
const paritionSql = plans.createPartition(this.config.schema, name) | ||
@@ -651,4 +653,4 @@ await this.db.executeSql(paritionSql) | ||
if (result?.rows?.length) { | ||
Attorney.assertPostgresObjectName(name) | ||
const sql = plans.dropJobTablePartition(this.config.schema, name) | ||
Attorney.assertQueueName(name) | ||
const sql = plans.dropPartition(this.config.schema, name) | ||
await this.db.executeSql(sql) | ||
@@ -655,0 +657,0 @@ } |
@@ -49,4 +49,4 @@ const assert = require('assert') | ||
updateQueue, | ||
partitionCreateJobName, | ||
dropJobTablePartition, | ||
createPartition, | ||
dropPartition, | ||
deleteQueueRecords, | ||
@@ -100,2 +100,6 @@ getQueueByName, | ||
getPartitionFunction(schema), | ||
createPartitionFunction(schema), | ||
dropPartitionFunction(schema), | ||
insertVersion(schema, version) | ||
@@ -167,14 +171,51 @@ ] | ||
function partitionCreateJobName (schema, name) { | ||
function createPartition (schema, name) { | ||
return `SELECT ${schema}.create_partition('${name}');` | ||
} | ||
function getPartitionFunction (schema) { | ||
return ` | ||
CREATE TABLE ${schema}.job_${name} (LIKE ${schema}.job INCLUDING DEFAULTS INCLUDING CONSTRAINTS); | ||
ALTER TABLE ${schema}.job_${name} ADD CONSTRAINT job_check_${name} CHECK (name='${name}'); | ||
ALTER TABLE ${schema}.job ATTACH PARTITION ${schema}.job_${name} FOR VALUES IN ('${name}'); | ||
CREATE FUNCTION ${schema}.get_partition(queue_name text, out name text) AS | ||
$$ | ||
SELECT '${schema}.j' || encode(digest(queue_name, 'sha1'), 'hex'); | ||
$$ | ||
LANGUAGE SQL | ||
IMMUTABLE | ||
` | ||
} | ||
function dropJobTablePartition (schema, name) { | ||
return `DROP TABLE IF EXISTS ${schema}.job_${name}` | ||
function createPartitionFunction (schema) { | ||
return ` | ||
CREATE FUNCTION ${schema}.create_partition(queue_name text) | ||
RETURNS VOID AS | ||
$$ | ||
DECLARE | ||
table_name varchar := ${schema}.get_partition(queue_name); | ||
BEGIN | ||
EXECUTE format('CREATE TABLE %I (LIKE ${schema}.job INCLUDING DEFAULTS INCLUDING CONSTRAINTS)', table_name); | ||
EXECUTE format('ALTER TABLE %I ADD CHECK (name=%L)', table_name, queue_name); | ||
EXECUTE format('ALTER TABLE ${schema}.job ATTACH PARTITION %I FOR VALUES IN (%L)', table_name, queue_name); | ||
END; | ||
$$ | ||
LANGUAGE plpgsql; | ||
` | ||
} | ||
function dropPartitionFunction (schema) { | ||
return ` | ||
CREATE FUNCTION ${schema}.drop_partition(queue_name text) | ||
RETURNS VOID AS | ||
$$ | ||
BEGIN | ||
EXECUTE format('DROP TABLE IF EXISTS %I', ${schema}.get_partition(queue_name)); | ||
END; | ||
$$ | ||
LANGUAGE plpgsql; | ||
` | ||
} | ||
function dropPartition (schema, name) { | ||
return `SELECT ${schema}.drop_partition('${name}');` | ||
} | ||
function createPrimaryKeyArchive (schema) { | ||
@@ -729,4 +770,4 @@ return `ALTER TABLE ${schema}.archive ADD CONSTRAINT archive_pkey PRIMARY KEY (name, id)` | ||
function advisoryLock (schema, key) { | ||
return `SELECT pg_advisory_xact_lock( | ||
('x' || md5(current_database() || '.pgboss.${schema}${key || ''}'))::bit(64)::bigint | ||
return `SELECT pg_advisory_xact_lock( | ||
('x' || encode(digest(current_database() || '.pgboss.${schema}${key || ''}', 'sha256'), 'hex'))::bit(64)::bigint | ||
)` | ||
@@ -733,0 +774,0 @@ } |
@@ -9,3 +9,3 @@ const EventEmitter = require('events') | ||
CRON: '__pgboss__cron', | ||
SEND_IT: '__pgboss__send_it' | ||
SEND_IT: '__pgboss__send-it' | ||
} | ||
@@ -12,0 +12,0 @@ |
@@ -351,3 +351,3 @@ import { EventEmitter } from 'events' | ||
createQueue(name: string, policy: 'standard' | 'short' | 'singleton' | 'stately'): Promise<void>; | ||
createQueue(name: string, options?: { policy: 'standard' | 'short' | 'singleton' | 'stately' }): Promise<void>; | ||
deleteQueue(name: string): Promise<void>; | ||
@@ -354,0 +354,0 @@ purgeQueue(name: string): Promise<void>; |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
98464
2664