Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

pg-scheduler

Package Overview
Dependencies
Maintainers
1
Versions
28
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pg-scheduler - npm Package Compare versions

Comparing version 4.0.0-beta1 to 4.0.0-beta10

14

dst/Models.js
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const EventEmitter = require("events");
const sequelize_1 = require("sequelize");
const debugLog = require("debug");

@@ -103,9 +104,8 @@ const debug = debugLog('pg-scheduler:models');

this.emit('touch');
const foundLocks = await this.getLocks();
debug(`${process.pid} '.touch()' found ${foundLocks.length} locks for task ${this.name} (${this.id})`);
await Promise.all(foundLocks.map((Lock) => {
Lock.updatedAt = new Date();
Lock.changed('updatedAt');
return Lock.save();
}));
await this.sequelize.query(`UPDATE "Locks" SET "updatedAt" = NOW() WHERE "TaskId" = :task_id`, {
replacements: {
task_id: this.id,
},
type: sequelize_1.QueryTypes.UPDATE,
});
};

@@ -112,0 +112,0 @@ return Task;

@@ -20,3 +20,3 @@ "use strict";

const taskName = task.name, processor = this.processors[taskName];
if (!processor || !processor.processors || !processor.processors.length) {
if (!processor || !processor.active || !processor.processors || !processor.processors.length) {
debug(`no processors for task ${taskName} found`);

@@ -23,0 +23,0 @@ return null;

@@ -12,2 +12,4 @@ /// <reference types="node" />

shift(): TaskInstance | undefined;
getTasksIds(): number[];
readonly length: number;
}

@@ -42,3 +42,9 @@ "use strict";

}
getTasksIds() {
return this.queue.map(t => t.id);
}
get length() {
return this.queue.length;
}
}
exports.Queue = Queue;
/// <reference types="sequelize" />
/// <reference types="node" />
/// <reference types="bluebird" />
import { Options, Sequelize as SequelizeType } from 'sequelize';
import { Options, Sequelize as SequelizeType, Transaction } from 'sequelize';
import * as Bluebird from 'bluebird';

@@ -30,2 +30,3 @@ import { ProcessorsStorage } from './ProcessorsStorage';

errorHandler: (err: Error) => void;
maxQueueCapacity: number;
}

@@ -40,2 +41,3 @@ export interface TaskOptions {

repeatOnError?: boolean;
transaction?: Transaction;
}

@@ -75,2 +77,3 @@ export declare class Scheduler {

private startLocksChecking();
private delayTaskHandling(task);
private recursiveQueueAddHandler(task?);

@@ -77,0 +80,0 @@ private queueAddedHandler();

@@ -26,2 +26,3 @@ "use strict";

maxConcurrency: 20,
maxQueueCapacity: 50,
};

@@ -67,3 +68,3 @@ class Scheduler {

let nextRunAt = RunAt_1.RunAt.calcNextRunAt(interval);
const { startAt, endAt, concurrency, priority, timeout, now } = options;
const { startAt, endAt, concurrency, priority, timeout, now, transaction } = options;
if (now) {

@@ -82,3 +83,3 @@ nextRunAt = new Date();

timeout,
});
}, { transaction });
}

@@ -88,3 +89,3 @@ everyDayAt(runAtTime, taskName, data = {}, options = {}) {

let nextRunAt = RunAt_1.RunAt.calcNextRunAt(null, runAtTime);
const { startAt, endAt, concurrency, priority, timeout, now } = options;
const { startAt, endAt, concurrency, priority, timeout, now, transaction } = options;
if (now) {

@@ -103,3 +104,3 @@ nextRunAt = new Date();

timeout,
});
}, { transaction });
}

@@ -109,3 +110,3 @@ once(date, taskName, data = {}, options = {}) {

let nextRunAt = new Date(date instanceof Date ? date.getTime() : date);
const { startAt, endAt, concurrency, priority, timeout, now, repeatOnError } = options;
const { startAt, endAt, concurrency, priority, timeout, now, repeatOnError, transaction } = options;
if (now) {

@@ -124,3 +125,3 @@ nextRunAt = new Date();

repeatOnError,
});
}, { transaction });
}

@@ -165,2 +166,11 @@ get totalProcessedCount() {

clearTimeout(this.pollingTimeout);
if (this.stopping) {
return;
}
if (this.queue.length + this.noProcessors.length > this.options.maxQueueCapacity) {
debug(`${process.pid} maxQueueCapacity overflow: ${this.queue.length} + ${this.noProcessors.length} > ${this.options.maxQueueCapacity}`);
this.queue.emit('added');
this.pollingRepeat();
return;
}
const currDate = new Date(), defaultWhere = {

@@ -171,5 +181,16 @@ nextRunAt: { $lte: currDate },

name: { $in: Object.keys(this.processorsStorage.processors) },
id: {
$and: [
{
$notIn: this.sequelize.literal('(SELECT "TaskId" FROM "Locks")'),
},
{
$notIn: this.queue.getTasksIds(),
},
],
},
}, where = lodash_1.defaultsDeep({}, this.options.pollingWhereClause, defaultWhere), findOptions = {
where,
include: [this.models.Lock],
order: [['priority', 'ASC']],
};

@@ -233,2 +254,6 @@ if (this.options.maxConcurrency) {

}
delayTaskHandling(task) {
this.noProcessors.push(task);
this.recursiveQueueAddHandler(this.queue.shift());
}
async recursiveQueueAddHandler(task) {

@@ -238,7 +263,7 @@ if (this.stopping || !task) {

}
// FIXME: sometimes `taskRunningCount` has negative value
const taskRunningCount = this.processorsStorage.runningCount(task.name), workerRunningCount = this.processorsStorage.runningCount();
if (workerRunningCount >= this.options.maxConcurrency) {
debug(`${process.pid} maxConcurrency (${this.options.maxConcurrency}) limit reached (${workerRunningCount})`);
this.noProcessors.push(task);
this.recursiveQueueAddHandler(this.queue.shift());
this.delayTaskHandling(task);
return;

@@ -251,4 +276,3 @@ }

debug(`${process.pid} task concurrency limit reached (max: ${task.concurrency})`);
this.noProcessors.push(task);
this.recursiveQueueAddHandler(this.queue.shift());
this.delayTaskHandling(task);
return;

@@ -260,4 +284,3 @@ }

debug(`${process.pid} no processors for task "${task.name} (${task.id})"`);
this.noProcessors.push(task);
this.recursiveQueueAddHandler(this.queue.shift());
this.delayTaskHandling(task);
return;

@@ -286,3 +309,21 @@ }

}
const createdLock = await task.createLock({ workerName: this.options.workerName }, { transaction: t });
let createdLock;
try {
createdLock = await task.createLock({ workerName: this.options.workerName }, { transaction: t });
}
catch (err) {
if (err instanceof sequelize_1.ForeignKeyConstraintError) {
// remove task from queue and go to next
await t.rollback();
this.recursiveQueueAddHandler(this.queue.shift());
return;
}
else if (err instanceof sequelize_1.DatabaseError && err.message.indexOf('could not serialize access') !== -1) {
// re-queue task and go to next
await t.rollback();
this.delayTaskHandling(task);
return;
}
throw err;
}
if (processor.isLocked) {

@@ -300,3 +341,2 @@ debug(`${process.pid} processor already locked`);

debug(`${process.pid} lock ${createdLock.id} created for task ${task.name} (${task.id}). start processor`);
await t.commit();
try {

@@ -311,4 +351,6 @@ processor.start(task, (err) => {

this.recursiveQueueAddHandler(this.queue.shift());
await t.commit();
}
catch (err) {
console.error(`${process.pid} recursiveQueueAddHandler: caught error`, err);
await t.rollback();

@@ -331,3 +373,4 @@ this.errorHandler(err);

if (err) {
console.error('processor completes with error', err);
console.error('processor completes with error', err.stack || err);
// TODO: make optional saving error to "TasksErrors" table
task.failsCount++;

@@ -334,0 +377,0 @@ }

{
"name": "pg-scheduler",
"version": "4.0.0-beta1",
"version": "4.0.0-beta10",
"scripts": {

@@ -5,0 +5,0 @@ "test": "NODE_ENV=testing mocha test/**/*.ts",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc