pg-scheduler
Advanced tools
Comparing version 4.0.0-beta5 to 4.0.0-beta6
@@ -75,2 +75,3 @@ /// <reference types="sequelize" /> | ||
private startLocksChecking(); | ||
private delayTaskHandling(task); | ||
private recursiveQueueAddHandler(task?); | ||
@@ -77,0 +78,0 @@ private queueAddedHandler(); |
@@ -244,2 +244,6 @@ "use strict"; | ||
} | ||
delayTaskHandling(task) { | ||
this.noProcessors.push(task); | ||
this.recursiveQueueAddHandler(this.queue.shift()); | ||
} | ||
async recursiveQueueAddHandler(task) { | ||
@@ -252,4 +256,3 @@ if (this.stopping || !task) { | ||
debug(`${process.pid} maxConcurrency (${this.options.maxConcurrency}) limit reached (${workerRunningCount})`); | ||
this.noProcessors.push(task); | ||
this.recursiveQueueAddHandler(this.queue.shift()); | ||
this.delayTaskHandling(task); | ||
return; | ||
@@ -262,4 +265,3 @@ } | ||
debug(`${process.pid} task concurrency limit reached (max: ${task.concurrency})`); | ||
this.noProcessors.push(task); | ||
this.recursiveQueueAddHandler(this.queue.shift()); | ||
this.delayTaskHandling(task); | ||
return; | ||
@@ -271,4 +273,3 @@ } | ||
debug(`${process.pid} no processors for task "${task.name} (${task.id})"`); | ||
this.noProcessors.push(task); | ||
this.recursiveQueueAddHandler(this.queue.shift()); | ||
this.delayTaskHandling(task); | ||
return; | ||
@@ -297,3 +298,20 @@ } | ||
} | ||
const createdLock = await task.createLock({ workerName: this.options.workerName }, { transaction: t }); | ||
let createdLock; | ||
try { | ||
createdLock = await task.createLock({ workerName: this.options.workerName }, { transaction: t }); | ||
} | ||
catch (err) { | ||
if (err instanceof sequelize_1.ForeignKeyConstraintError) { | ||
// remove task from queue and go to next | ||
await t.rollback(); | ||
this.recursiveQueueAddHandler(this.queue.shift()); | ||
return; | ||
} | ||
else if (err instanceof sequelize_1.DatabaseError && err.message.indexOf('could not serialize access') !== -1) { | ||
// re-queue task and go to next | ||
this.delayTaskHandling(task); | ||
return; | ||
} | ||
throw err; | ||
} | ||
if (processor.isLocked) { | ||
@@ -311,3 +329,2 @@ debug(`${process.pid} processor already locked`); | ||
debug(`${process.pid} lock ${createdLock.id} created for task ${task.name} (${task.id}). start processor`); | ||
await t.commit(); | ||
try { | ||
@@ -322,4 +339,6 @@ processor.start(task, (err) => { | ||
this.recursiveQueueAddHandler(this.queue.shift()); | ||
await t.commit(); | ||
} | ||
catch (err) { | ||
console.error(`${process.pid} recursiveQueueAddHandler: caught error`, err); | ||
await t.rollback(); | ||
@@ -326,0 +345,0 @@ this.errorHandler(err); |
{ | ||
"name": "pg-scheduler", | ||
"version": "4.0.0-beta5", | ||
"version": "4.0.0-beta6", | ||
"scripts": { | ||
@@ -5,0 +5,0 @@ "test": "NODE_ENV=testing mocha test/**/*.ts", |
36590
923