pg-scheduler
Advanced tools
Comparing version 4.0.0-beta1 to 4.0.0-beta10
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const EventEmitter = require("events"); | ||
const sequelize_1 = require("sequelize"); | ||
const debugLog = require("debug"); | ||
@@ -103,9 +104,8 @@ const debug = debugLog('pg-scheduler:models'); | ||
this.emit('touch'); | ||
const foundLocks = await this.getLocks(); | ||
debug(`${process.pid} '.touch()' found ${foundLocks.length} locks for task ${this.name} (${this.id})`); | ||
await Promise.all(foundLocks.map((Lock) => { | ||
Lock.updatedAt = new Date(); | ||
Lock.changed('updatedAt'); | ||
return Lock.save(); | ||
})); | ||
await this.sequelize.query(`UPDATE "Locks" SET "updatedAt" = NOW() WHERE "TaskId" = :task_id`, { | ||
replacements: { | ||
task_id: this.id, | ||
}, | ||
type: sequelize_1.QueryTypes.UPDATE, | ||
}); | ||
}; | ||
@@ -112,0 +112,0 @@ return Task; |
@@ -20,3 +20,3 @@ "use strict"; | ||
const taskName = task.name, processor = this.processors[taskName]; | ||
if (!processor || !processor.processors || !processor.processors.length) { | ||
if (!processor || !processor.active || !processor.processors || !processor.processors.length) { | ||
debug(`no processors for task ${taskName} found`); | ||
@@ -23,0 +23,0 @@ return null; |
@@ -12,2 +12,4 @@ /// <reference types="node" /> | ||
shift(): TaskInstance | undefined; | ||
getTasksIds(): number[]; | ||
readonly length: number; | ||
} |
@@ -42,3 +42,9 @@ "use strict"; | ||
} | ||
getTasksIds() { | ||
return this.queue.map(t => t.id); | ||
} | ||
get length() { | ||
return this.queue.length; | ||
} | ||
} | ||
exports.Queue = Queue; |
/// <reference types="sequelize" /> | ||
/// <reference types="node" /> | ||
/// <reference types="bluebird" /> | ||
import { Options, Sequelize as SequelizeType } from 'sequelize'; | ||
import { Options, Sequelize as SequelizeType, Transaction } from 'sequelize'; | ||
import * as Bluebird from 'bluebird'; | ||
@@ -30,2 +30,3 @@ import { ProcessorsStorage } from './ProcessorsStorage'; | ||
errorHandler: (err: Error) => void; | ||
maxQueueCapacity: number; | ||
} | ||
@@ -40,2 +41,3 @@ export interface TaskOptions { | ||
repeatOnError?: boolean; | ||
transaction?: Transaction; | ||
} | ||
@@ -75,2 +77,3 @@ export declare class Scheduler { | ||
private startLocksChecking(); | ||
private delayTaskHandling(task); | ||
private recursiveQueueAddHandler(task?); | ||
@@ -77,0 +80,0 @@ private queueAddedHandler(); |
@@ -26,2 +26,3 @@ "use strict"; | ||
maxConcurrency: 20, | ||
maxQueueCapacity: 50, | ||
}; | ||
@@ -67,3 +68,3 @@ class Scheduler { | ||
let nextRunAt = RunAt_1.RunAt.calcNextRunAt(interval); | ||
const { startAt, endAt, concurrency, priority, timeout, now } = options; | ||
const { startAt, endAt, concurrency, priority, timeout, now, transaction } = options; | ||
if (now) { | ||
@@ -82,3 +83,3 @@ nextRunAt = new Date(); | ||
timeout, | ||
}); | ||
}, { transaction }); | ||
} | ||
@@ -88,3 +89,3 @@ everyDayAt(runAtTime, taskName, data = {}, options = {}) { | ||
let nextRunAt = RunAt_1.RunAt.calcNextRunAt(null, runAtTime); | ||
const { startAt, endAt, concurrency, priority, timeout, now } = options; | ||
const { startAt, endAt, concurrency, priority, timeout, now, transaction } = options; | ||
if (now) { | ||
@@ -103,3 +104,3 @@ nextRunAt = new Date(); | ||
timeout, | ||
}); | ||
}, { transaction }); | ||
} | ||
@@ -109,3 +110,3 @@ once(date, taskName, data = {}, options = {}) { | ||
let nextRunAt = new Date(date instanceof Date ? date.getTime() : date); | ||
const { startAt, endAt, concurrency, priority, timeout, now, repeatOnError } = options; | ||
const { startAt, endAt, concurrency, priority, timeout, now, repeatOnError, transaction } = options; | ||
if (now) { | ||
@@ -124,3 +125,3 @@ nextRunAt = new Date(); | ||
repeatOnError, | ||
}); | ||
}, { transaction }); | ||
} | ||
@@ -165,2 +166,11 @@ get totalProcessedCount() { | ||
clearTimeout(this.pollingTimeout); | ||
if (this.stopping) { | ||
return; | ||
} | ||
if (this.queue.length + this.noProcessors.length > this.options.maxQueueCapacity) { | ||
debug(`${process.pid} maxQueueCapacity overflow: ${this.queue.length} + ${this.noProcessors.length} > ${this.options.maxQueueCapacity}`); | ||
this.queue.emit('added'); | ||
this.pollingRepeat(); | ||
return; | ||
} | ||
const currDate = new Date(), defaultWhere = { | ||
@@ -171,5 +181,16 @@ nextRunAt: { $lte: currDate }, | ||
name: { $in: Object.keys(this.processorsStorage.processors) }, | ||
id: { | ||
$and: [ | ||
{ | ||
$notIn: this.sequelize.literal('(SELECT "TaskId" FROM "Locks")'), | ||
}, | ||
{ | ||
$notIn: this.queue.getTasksIds(), | ||
}, | ||
], | ||
}, | ||
}, where = lodash_1.defaultsDeep({}, this.options.pollingWhereClause, defaultWhere), findOptions = { | ||
where, | ||
include: [this.models.Lock], | ||
order: [['priority', 'ASC']], | ||
}; | ||
@@ -233,2 +254,6 @@ if (this.options.maxConcurrency) { | ||
} | ||
delayTaskHandling(task) { | ||
this.noProcessors.push(task); | ||
this.recursiveQueueAddHandler(this.queue.shift()); | ||
} | ||
async recursiveQueueAddHandler(task) { | ||
@@ -238,7 +263,7 @@ if (this.stopping || !task) { | ||
} | ||
// FIXME: sometimes `taskRunningCount` has negative value | ||
const taskRunningCount = this.processorsStorage.runningCount(task.name), workerRunningCount = this.processorsStorage.runningCount(); | ||
if (workerRunningCount >= this.options.maxConcurrency) { | ||
debug(`${process.pid} maxConcurrency (${this.options.maxConcurrency}) limit reached (${workerRunningCount})`); | ||
this.noProcessors.push(task); | ||
this.recursiveQueueAddHandler(this.queue.shift()); | ||
this.delayTaskHandling(task); | ||
return; | ||
@@ -251,4 +276,3 @@ } | ||
debug(`${process.pid} task concurrency limit reached (max: ${task.concurrency})`); | ||
this.noProcessors.push(task); | ||
this.recursiveQueueAddHandler(this.queue.shift()); | ||
this.delayTaskHandling(task); | ||
return; | ||
@@ -260,4 +284,3 @@ } | ||
debug(`${process.pid} no processors for task "${task.name} (${task.id})"`); | ||
this.noProcessors.push(task); | ||
this.recursiveQueueAddHandler(this.queue.shift()); | ||
this.delayTaskHandling(task); | ||
return; | ||
@@ -286,3 +309,21 @@ } | ||
} | ||
const createdLock = await task.createLock({ workerName: this.options.workerName }, { transaction: t }); | ||
let createdLock; | ||
try { | ||
createdLock = await task.createLock({ workerName: this.options.workerName }, { transaction: t }); | ||
} | ||
catch (err) { | ||
if (err instanceof sequelize_1.ForeignKeyConstraintError) { | ||
// remove task from queue and go to next | ||
await t.rollback(); | ||
this.recursiveQueueAddHandler(this.queue.shift()); | ||
return; | ||
} | ||
else if (err instanceof sequelize_1.DatabaseError && err.message.indexOf('could not serialize access') !== -1) { | ||
// re-queue task and go to next | ||
await t.rollback(); | ||
this.delayTaskHandling(task); | ||
return; | ||
} | ||
throw err; | ||
} | ||
if (processor.isLocked) { | ||
@@ -300,3 +341,2 @@ debug(`${process.pid} processor already locked`); | ||
debug(`${process.pid} lock ${createdLock.id} created for task ${task.name} (${task.id}). start processor`); | ||
await t.commit(); | ||
try { | ||
@@ -311,4 +351,6 @@ processor.start(task, (err) => { | ||
this.recursiveQueueAddHandler(this.queue.shift()); | ||
await t.commit(); | ||
} | ||
catch (err) { | ||
console.error(`${process.pid} recursiveQueueAddHandler: caught error`, err); | ||
await t.rollback(); | ||
@@ -331,3 +373,4 @@ this.errorHandler(err); | ||
if (err) { | ||
console.error('processor completes with error', err); | ||
console.error('processor completes with error', err.stack || err); | ||
// TODO: make optional saving error to "TasksErrors" table | ||
task.failsCount++; | ||
@@ -334,0 +377,0 @@ } |
{ | ||
"name": "pg-scheduler", | ||
"version": "4.0.0-beta1", | ||
"version": "4.0.0-beta10", | ||
"scripts": { | ||
@@ -5,0 +5,0 @@ "test": "NODE_ENV=testing mocha test/**/*.ts", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
36714
926
0