Socket
Socket
Sign inDemoInstall

@tngtech/momo-scheduler

Package Overview
Dependencies
227
Maintainers
6
Versions
16
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 2.0.0-beta.1 to 2.0.0-beta.2

1

CHANGELOG.md

@@ -13,2 +13,3 @@ # Change Log

- Breaking: Schedules need a name now
- Breaking: Removed `startJob`, `stopJob`, `cancelJob`, and `removeJob` from the schedule

@@ -15,0 +16,0 @@ ## v1.1.1 (2023-01-11)

2

dist/Connection.d.ts

@@ -18,3 +18,3 @@ import { SchedulesRepository } from './repository/SchedulesRepository';

private constructor();
static create({ url, collectionsPrefix }: MomoConnectionOptions, pingIntervalMs: number, scheduleId: string): Promise<Connection>;
static create({ url, collectionsPrefix }: MomoConnectionOptions, pingIntervalMs: number, scheduleId: string, scheduleName: string): Promise<Connection>;
getSchedulesRepository(): SchedulesRepository;

@@ -21,0 +21,0 @@ getJobRepository(): JobRepository;

@@ -14,7 +14,7 @@ "use strict";

}
static create({ url, collectionsPrefix }, pingIntervalMs, scheduleId) {
static create({ url, collectionsPrefix }, pingIntervalMs, scheduleId, scheduleName) {
return tslib_1.__awaiter(this, void 0, void 0, function* () {
const mongoClient = new mongodb_1.MongoClient(url);
yield mongoClient.connect();
const schedulesRepository = new SchedulesRepository_1.SchedulesRepository(mongoClient, 2 * pingIntervalMs, scheduleId, collectionsPrefix);
const schedulesRepository = new SchedulesRepository_1.SchedulesRepository(mongoClient, 2 * pingIntervalMs, scheduleId, scheduleName, collectionsPrefix);
yield schedulesRepository.createIndex();

@@ -21,0 +21,0 @@ const jobRepository = new JobRepository_1.JobRepository(mongoClient, collectionsPrefix);

@@ -15,4 +15,4 @@ import { JobResult } from '../job/ExecutionInfo';

stop(): void;
execute(jobEntity: JobEntity, parameters?: JobParameters, force?: boolean): Promise<JobResult>;
execute(jobEntity: JobEntity, parameters?: JobParameters): Promise<JobResult>;
private executeHandler;
}

@@ -19,15 +19,13 @@ "use strict";

}
execute(jobEntity, parameters, force = false) {
execute(jobEntity, parameters) {
return tslib_1.__awaiter(this, void 0, void 0, function* () {
if (!force) {
const { added, running } = yield this.schedulesRepository.addExecution(jobEntity.name, jobEntity.maxRunning);
if (!added) {
this.logger.debug('maxRunning reached, skip', {
name: jobEntity.name,
running,
});
return {
status: ExecutionInfo_1.ExecutionStatus.maxRunningReached,
};
}
const { added, running } = yield this.schedulesRepository.addExecution(jobEntity.name, jobEntity.maxRunning);
if (!added) {
this.logger.debug('maxRunning reached, skip', {
name: jobEntity.name,
running,
});
return {
status: ExecutionInfo_1.ExecutionStatus.maxRunningReached,
};
}

@@ -47,3 +45,3 @@ const { started, result } = yield this.executeHandler(jobEntity, parameters);

});
if (!force && !this.stopped) {
if (!this.stopped) {
yield this.schedulesRepository.removeExecution(jobEntity.name);

@@ -50,0 +48,0 @@ }

@@ -9,4 +9,5 @@ import { MongoClient } from 'mongodb';

private readonly scheduleId;
private readonly name;
private logger;
constructor(mongoClient: MongoClient, deadScheduleThreshold: number, scheduleId: string, collectionPrefix?: string);
constructor(mongoClient: MongoClient, deadScheduleThreshold: number, scheduleId: string, name: string, collectionPrefix?: string);
setLogger(logger: Logger): void;

@@ -13,0 +14,0 @@ /**

@@ -10,6 +10,7 @@ "use strict";

class SchedulesRepository extends Repository_1.Repository {
constructor(mongoClient, deadScheduleThreshold, scheduleId, collectionPrefix) {
constructor(mongoClient, deadScheduleThreshold, scheduleId, name, collectionPrefix) {
super(mongoClient, exports.SCHEDULES_COLLECTION_NAME, collectionPrefix);
this.deadScheduleThreshold = deadScheduleThreshold;
this.scheduleId = scheduleId;
this.name = name;
}

@@ -79,9 +80,13 @@ setLogger(logger) {

addExecution(name, maxRunning) {
var _a, _b, _c, _d;
return tslib_1.__awaiter(this, void 0, void 0, function* () {
const running = yield this.countRunningExecutions(name);
if (maxRunning > 0 && running >= maxRunning) {
return { added: false, running };
if (maxRunning < 1) {
const schedule = yield this.collection.findOneAndUpdate({ name: this.name }, { $inc: { [`executions.${name}`]: 1 } }, { returnDocument: 'after' });
return { added: schedule.value !== null, running: (_b = (_a = schedule.value) === null || _a === void 0 ? void 0 : _a.executions[name]) !== null && _b !== void 0 ? _b : 0 };
}
yield this.updateOne({ scheduleId: this.scheduleId }, { $inc: { [`executions.${name}`]: 1 } });
return { added: true, running };
const schedule = yield this.collection.findOneAndUpdate({
name: this.name,
$or: [{ [`executions.${name}`]: { $lt: maxRunning } }, { [`executions.${name}`]: { $exists: false } }],
}, { $inc: { [`executions.${name}`]: 1 } }, { returnDocument: 'after' });
return { added: schedule.value !== null, running: (_d = (_c = schedule.value) === null || _c === void 0 ? void 0 : _c.executions[name]) !== null && _d !== void 0 ? _d : maxRunning };
});

@@ -91,3 +96,3 @@ }

return tslib_1.__awaiter(this, void 0, void 0, function* () {
yield this.updateOne({ scheduleId: this.scheduleId }, { $inc: { [`executions.${name}`]: -1 } });
yield this.updateOne({ name: this.name }, { $inc: { [`executions.${name}`]: -1 } });
});

@@ -94,0 +99,0 @@ }

@@ -31,3 +31,3 @@ "use strict";

const scheduleId = (0, uuid_1.v4)();
const connection = yield Connection_1.Connection.create(connectionOptions, pingIntervalMs, scheduleId);
const connection = yield Connection_1.Connection.create(connectionOptions, pingIntervalMs, scheduleId, scheduleName);
return new MongoSchedule(scheduleId, connection, pingIntervalMs, scheduleName);

@@ -34,0 +34,0 @@ });

@@ -56,10 +56,2 @@ import { ExecutionInfo, JobResult } from '../job/ExecutionInfo';

/**
* Stops a scheduled job without removing it from neither the schedule nor the database.
* Does nothing if no job with the given name exists.
* Jobs can be started again using start().
*
* @param name the job to stop
*/
stopJob(name: string): Promise<void>;
/**
* Stops all scheduled jobs without removing them from neither the schedule nor the database.

@@ -70,9 +62,2 @@ * Jobs can be started again using start().

/**
* Stops a scheduled job and removes it from the schedule (but not from the database).
* Does nothing if no job with the given name exists.
*
* @param name the job to cancel
*/
cancelJob(name: string): Promise<void>;
/**
* Stops all scheduled jobs and removes them from the schedule (but not from the database).

@@ -82,11 +67,2 @@ */

/**
* Stops a scheduled job and removes it from the schedule and the database.
* Does nothing if no job with the given name exists.
*
* @param name the job to remove
*
* @throws if the database throws
*/
removeJob(name: string): Promise<void>;
/**
* Stops all scheduled jobs and removes them from the schedule and the database.

@@ -93,0 +69,0 @@ *

@@ -54,2 +54,3 @@ "use strict";

define(momoJob) {
var _a;
return tslib_1.__awaiter(this, void 0, void 0, function* () {

@@ -63,3 +64,8 @@ const jobResult = (0, Job_1.tryToJob)(momoJob);

const job = jobResult.value;
yield this.stopJob(job.name);
try {
yield ((_a = this.jobSchedulers[job.name]) === null || _a === void 0 ? void 0 : _a.stop());
}
catch (error) {
this.logger.error('message failed to stop job', MomoErrorType_1.MomoErrorType.stopJob, { name: job.name }, error);
}
yield this.jobRepository.define(job);

@@ -96,21 +102,2 @@ this.jobSchedulers[job.name] = JobScheduler_1.JobScheduler.forJob(this.scheduleId, job, this.logger, this.schedulesRepository, this.jobRepository);

/**
* Stops a scheduled job without removing it from neither the schedule nor the database.
* Does nothing if no job with the given name exists.
* Jobs can be started again using start().
*
* @param name the job to stop
*/
stopJob(name) {
var _a;
return tslib_1.__awaiter(this, void 0, void 0, function* () {
this.logger.debug('stop', { name });
try {
yield ((_a = this.jobSchedulers[name]) === null || _a === void 0 ? void 0 : _a.stop());
}
catch (error) {
this.logger.error('message failed to stop job', MomoErrorType_1.MomoErrorType.stopJob, { name }, error);
}
});
}
/**
* Stops all scheduled jobs without removing them from neither the schedule nor the database.

@@ -131,15 +118,2 @@ * Jobs can be started again using start().

/**
* Stops a scheduled job and removes it from the schedule (but not from the database).
* Does nothing if no job with the given name exists.
*
* @param name the job to cancel
*/
cancelJob(name) {
return tslib_1.__awaiter(this, void 0, void 0, function* () {
yield this.stopJob(name);
this.logger.debug('cancel', { name });
delete this.jobSchedulers[name];
});
}
/**
* Stops all scheduled jobs and removes them from the schedule (but not from the database).

@@ -155,17 +129,2 @@ */

/**
* Stops a scheduled job and removes it from the schedule and the database.
* Does nothing if no job with the given name exists.
*
* @param name the job to remove
*
* @throws if the database throws
*/
removeJob(name) {
return tslib_1.__awaiter(this, void 0, void 0, function* () {
yield this.cancelJob(name);
this.logger.debug('remove', { name });
yield this.jobRepository.deleteOne({ name });
});
}
/**
* Stops all scheduled jobs and removes them from the schedule and the database.

@@ -172,0 +131,0 @@ *

@@ -88,3 +88,3 @@ "use strict";

}
return this.jobExecutor.execute(jobEntity, parameters, true);
return this.jobExecutor.execute(jobEntity, parameters);
}

@@ -91,0 +91,0 @@ catch (e) {

{
"name": "@tngtech/momo-scheduler",
"version": "2.0.0-beta.1",
"version": "2.0.0-beta.2",
"description": "momo is a scheduler that persists jobs in mongodb",

@@ -5,0 +5,0 @@ "main": "dist/index.js",

@@ -27,3 +27,4 @@ import { MongoClient } from 'mongodb';

pingIntervalMs: number,
scheduleId: string
scheduleId: string,
scheduleName: string
): Promise<Connection> {

@@ -33,3 +34,9 @@ const mongoClient = new MongoClient(url);

const schedulesRepository = new SchedulesRepository(mongoClient, 2 * pingIntervalMs, scheduleId, collectionsPrefix);
const schedulesRepository = new SchedulesRepository(
mongoClient,
2 * pingIntervalMs,
scheduleId,
scheduleName,
collectionsPrefix
);
await schedulesRepository.createIndex();

@@ -36,0 +43,0 @@ const jobRepository = new JobRepository(mongoClient, collectionsPrefix);

@@ -25,14 +25,12 @@ import { DateTime } from 'luxon';

async execute(jobEntity: JobEntity, parameters?: JobParameters, force: boolean = false): Promise<JobResult> {
if (!force) {
const { added, running } = await this.schedulesRepository.addExecution(jobEntity.name, jobEntity.maxRunning);
if (!added) {
this.logger.debug('maxRunning reached, skip', {
name: jobEntity.name,
running,
});
return {
status: ExecutionStatus.maxRunningReached,
};
}
async execute(jobEntity: JobEntity, parameters?: JobParameters): Promise<JobResult> {
const { added, running } = await this.schedulesRepository.addExecution(jobEntity.name, jobEntity.maxRunning);
if (!added) {
this.logger.debug('maxRunning reached, skip', {
name: jobEntity.name,
running,
});
return {
status: ExecutionStatus.maxRunningReached,
};
}

@@ -56,3 +54,3 @@

if (!force && !this.stopped) {
if (!this.stopped) {
await this.schedulesRepository.removeExecution(jobEntity.name);

@@ -59,0 +57,0 @@ }

@@ -18,2 +18,3 @@ import { DateTime } from 'luxon';

private readonly scheduleId: string,
private readonly name: string,
collectionPrefix?: string

@@ -93,14 +94,25 @@ ) {

async addExecution(name: string, maxRunning: number): Promise<{ added: boolean; running: number }> {
const running = await this.countRunningExecutions(name);
if (maxRunning > 0 && running >= maxRunning) {
return { added: false, running };
if (maxRunning < 1) {
const schedule = await this.collection.findOneAndUpdate(
{ name: this.name },
{ $inc: { [`executions.${name}`]: 1 } },
{ returnDocument: 'after' }
);
return { added: schedule.value !== null, running: schedule.value?.executions[name] ?? 0 };
}
await this.updateOne({ scheduleId: this.scheduleId }, { $inc: { [`executions.${name}`]: 1 } });
const schedule = await this.collection.findOneAndUpdate(
{
name: this.name,
$or: [{ [`executions.${name}`]: { $lt: maxRunning } }, { [`executions.${name}`]: { $exists: false } }],
},
{ $inc: { [`executions.${name}`]: 1 } },
{ returnDocument: 'after' }
);
return { added: true, running };
return { added: schedule.value !== null, running: schedule.value?.executions[name] ?? maxRunning };
}
async removeExecution(name: string): Promise<void> {
await this.updateOne({ scheduleId: this.scheduleId }, { $inc: { [`executions.${name}`]: -1 } });
await this.updateOne({ name: this.name }, { $inc: { [`executions.${name}`]: -1 } });
}

@@ -107,0 +119,0 @@

@@ -62,3 +62,3 @@ import { v4 as uuid } from 'uuid';

const scheduleId = uuid();
const connection = await Connection.create(connectionOptions, pingIntervalMs, scheduleId);
const connection = await Connection.create(connectionOptions, pingIntervalMs, scheduleId, scheduleName);

@@ -65,0 +65,0 @@ return new MongoSchedule(scheduleId, connection, pingIntervalMs, scheduleName);

@@ -72,3 +72,7 @@ import { sum } from 'lodash';

await this.stopJob(job.name);
try {
await this.jobSchedulers[job.name]?.stop();
} catch (error) {
this.logger.error('message failed to stop job', MomoErrorType.stopJob, { name: job.name }, error);
}

@@ -123,18 +127,2 @@ await this.jobRepository.define(job);

/**
* Stops a scheduled job without removing it from neither the schedule nor the database.
* Does nothing if no job with the given name exists.
* Jobs can be started again using start().
*
* @param name the job to stop
*/
public async stopJob(name: string): Promise<void> {
this.logger.debug('stop', { name });
try {
await this.jobSchedulers[name]?.stop();
} catch (error) {
this.logger.error('message failed to stop job', MomoErrorType.stopJob, { name }, error);
}
}
/**
* Stops all scheduled jobs without removing them from neither the schedule nor the database.

@@ -153,14 +141,2 @@ * Jobs can be started again using start().

/**
* Stops a scheduled job and removes it from the schedule (but not from the database).
* Does nothing if no job with the given name exists.
*
* @param name the job to cancel
*/
public async cancelJob(name: string): Promise<void> {
await this.stopJob(name);
this.logger.debug('cancel', { name });
delete this.jobSchedulers[name];
}
/**
* Stops all scheduled jobs and removes them from the schedule (but not from the database).

@@ -175,16 +151,2 @@ */

/**
* Stops a scheduled job and removes it from the schedule and the database.
* Does nothing if no job with the given name exists.
*
* @param name the job to remove
*
* @throws if the database throws
*/
public async removeJob(name: string): Promise<void> {
await this.cancelJob(name);
this.logger.debug('remove', { name });
await this.jobRepository.deleteOne({ name });
}
/**
* Stops all scheduled jobs and removes them from the schedule and the database.

@@ -191,0 +153,0 @@ *

@@ -125,3 +125,3 @@ import { min } from 'lodash';

return this.jobExecutor.execute(jobEntity, parameters, true);
return this.jobExecutor.execute(jobEntity, parameters);
} catch (e) {

@@ -128,0 +128,0 @@ this.handleUnexpectedError(e);

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc