🚨 Active Supply Chain Attack:node-ipc Package Compromised.Learn More
Socket
Book a DemoSign in
Socket

@emartech/program-executor

Package Overview
Dependencies
Maintainers
297
Versions
57
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@emartech/program-executor - npm Package Compare versions

Comparing version
3.18.0
to
3.19.0
+6
-6
package.json

@@ -26,3 +26,3 @@ {

"camelcase-keys": "^6.2.2",
"graphql": "^16.12.0"
"graphql": "^16.13.0"
},

@@ -33,13 +33,13 @@ "engines": {

"devDependencies": {
"dotenv": "^17.2.4",
"eslint": "^10.0.0",
"dotenv": "^17.3.1",
"eslint": "^10.0.2",
"eslint-config-emarsys": "5.1.0",
"eslint-plugin-no-only-tests": "^3.3.0",
"eslint-plugin-security": "^3.0.1",
"eslint-plugin-security": "^4.0.0",
"jest": "29.0.0",
"knex": "^3.1.0",
"pg": "^8.18.0",
"pg": "^8.19.0",
"semantic-release": "^25.0.3"
},
"version": "3.18.0"
"version": "3.19.0"
}

@@ -8,3 +8,3 @@ # Program Executor JS [![npm version](https://badge.fury.io/js/%40emartech%2Fprogram-executor.svg)](https://badge.fury.io/js/%40emartech%2Fprogram-executor) [![Codeship Status for emartech/program-executor-js](https://app.codeship.com/projects/54a48ad0-585e-0137-fd0d-7a4a3edc3b8c/status?branch=master)](https://app.codeship.com/projects/341917)

- NodeJS 7.10.1+
- RabbitMQ
- RabbitMQ / Google PubSub
- Database (Postgres, MSSQL, MySQL, MariaDB, SQLite3, Oracle, and Amazon Redshift) and a connected [knex](https://knexjs.org/#Installation-client) instance

@@ -11,0 +11,0 @@

@@ -34,2 +34,22 @@ 'use strict';

class AnotherSuccessfulJob {
static get name() {
return 'another_successful_job';
}
static create(programData) {
return new AnotherSuccessfulJob(programData);
}
// eslint-disable-next-line no-unused-vars
constructor(programData) {
/// ... initialize member variables based on programData if needed
}
async execute(message, jobDataHandler) {
// eslint-disable-next-line no-unused-vars
const jobSpecificData = await jobDataHandler.get();
}
}
class ProblematicJob {

@@ -59,3 +79,4 @@ static get name() {

successful_job: SuccessfulJob,
problematic_job: ProblematicJob
problematic_job: ProblematicJob,
another_successful_job: AnotherSuccessfulJob
};

@@ -208,3 +229,29 @@

});
it('should cancel the duplicate job', async function () {
const program = {
programData: {
customerId: customerId
},
jobs: ['successful_job', 'another_successful_job']
}
const runId = await ProgramExecutor.create(config).createProgram(program);
program.runId = runId;
await ProgramExecutor.create(config)._queueManager.queueProgram(program);
await ProgramExecutor.create(config).processPrograms(testJobLibrary);
await e2eTestReuse.waitForMessages(1000);
const result = await dbConnection('programs').where({ run_id: runId }).first();
expect(result.run_id).toBe(runId);
expect(result.jobs).toEqual(['successful_job', 'another_successful_job']);
expect(result.step).toEqual(1);
expect(result.errored_at).toEqual(null);
expect(result.finished_at).not.toEqual(null);
expect(result.program_data).toEqual({ customerId: 1234 });
})
});
});

@@ -58,2 +58,8 @@ 'use strict';

const { currentJobInDb, currentStep } = await this._programHandler.getCurrentJob(runId);
if (currentJob !== currentJobInDb && this._jobLibrary[currentJob]) {
this.log('duplicate-cancelled', { ...message });
return;
}
this.log('executing-job', { ...message, currentJob });

@@ -72,4 +78,8 @@

devLogger.info('queue update remainingJobs', { remainingJobs, run_id: runId, OSPID: process.pid });
await this._queueManager.queueProgram({ ...message, jobs: remainingJobs });
await this._programHandler.incrementStep(runId);
const res = await this._programHandler.incrementStep(runId, currentStep);
if (res) {
await this._queueManager.queueProgram({ ...message, jobs: remainingJobs });
} else{
this.log('discard-duplicate', { ...message, currentJob })
}
} else {

@@ -76,0 +86,0 @@ devLogger.info('finished program', { run_id: runId, OSPID: process.pid });

@@ -25,2 +25,3 @@ 'use strict';

let mockQueueProgram;
let getCurrentJobStub;

@@ -75,2 +76,3 @@ beforeEach(async function () {

setProgramToErrorStub = jest.fn().mockResolvedValue(true);
getCurrentJobStub = jest.fn();

@@ -84,3 +86,4 @@ mockProgramHandler = {

isProgramFinishedWithError: programHandlerIsProgramFinishedWithErrorStub,
setProgramToError: setProgramToErrorStub
setProgramToError: setProgramToErrorStub,
getCurrentJob: getCurrentJobStub
};

@@ -100,2 +103,3 @@ ProgramHandler.create = jest.fn().mockReturnValue(mockProgramHandler);

it('should execute job from program library when its the next job', async function () {
getCurrentJobStub.mockReturnValue({ currentJobInDb: 'testJob', currentStep: 0 });
const programData = {

@@ -121,2 +125,3 @@ customerId: 123,

it('should pass job data handler for the given job and runId pair', async function () {
getCurrentJobStub.mockReturnValue({ currentJobInDb: 'testJob', currentStep: 0 });
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({

@@ -133,2 +138,3 @@ jobs: ['testJob'],

it('should cancel execution if program already encountered an error', async function () {
getCurrentJobStub.mockReturnValue({ currentJobInDb: 'testJob', currentStep: 0 });
programHandlerIsProgramFinishedWithErrorStub.mockResolvedValue(true)(true);

@@ -145,3 +151,21 @@ await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({

it('should cancel execution if current job doesnt match current job in DB', async function () {
getCurrentJobStub.mockReturnValue({ currentJobInDb: 'nextJob', currentStep: 0 });
const programData = {
customerId: 123,
hostname: 'yolo.myshopify.com'
};
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({
programData,
jobs: ['testJob'],
runId: '1'
});
expect(testJobExecuteStub).not.toHaveBeenCalled();
expect(mockQueueProgram).not.toHaveBeenCalled();
});
it('should throw an error if a job fails with non-ignorable error', async function () {
getCurrentJobStub.mockReturnValue({ currentJobInDb: 'failingJob', currentStep: 0 });
let caughtError;

@@ -163,2 +187,3 @@ try {

it('should throw a retryable error if a job fails with ExecutionTimeExceededError', async function () {
getCurrentJobStub.mockReturnValue({ currentJobInDb: 'failingJobWithExecutionTimeExceededError', currentStep: 0 });
let caughtError;

@@ -179,2 +204,3 @@ try {

it('should not throw an error if a job fails with an ignorable error', async function () {
getCurrentJobStub.mockReturnValue({ currentJobInDb: 'failingJobWithIgnorableError', currentStep: 0 });
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({

@@ -192,2 +218,3 @@ jobs: ['failingJobWithIgnorableError'],

it('should set program to error if job fails with non retriable error', async function () {
getCurrentJobStub.mockReturnValue({ currentJobInDb: 'failingJob', currentStep: 0 });
try {

@@ -207,2 +234,3 @@ await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({

it('should increment try count if job fails with retriable error', async function () {
getCurrentJobStub.mockReturnValue({ currentJobInDb: 'failingJobWithRetryableError', currentStep: 0 });
try {

@@ -221,2 +249,3 @@ await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({

it('should update error message only if job fails with retriable error', async function () {
getCurrentJobStub.mockReturnValue({ currentJobInDb: 'failingJobWithRetryableError', currentStep: 0 });
try {

@@ -239,2 +268,3 @@ await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({

it('should requeue with the next program', async function () {
getCurrentJobStub.mockReturnValue({ currentJobInDb: 'currentJob', currentStep: 0 });
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({

@@ -245,3 +275,4 @@ jobs: ['currentJob', 'nextJob'],

hostname: 'yolo.myshopify.com'
}
},
runId: '1'
});

@@ -254,3 +285,4 @@

hostname: 'yolo.myshopify.com'
}
},
runId: '1'
});

@@ -260,2 +292,3 @@ });

it('should call increment on requeue', async function () {
getCurrentJobStub.mockReturnValue({ currentJobInDb: 'currentJob', currentStep: 0 });
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({

@@ -267,6 +300,7 @@ jobs: ['currentJob', 'nextJob'],

expect(mockProgramHandler.incrementStep).toHaveBeenCalledWith('1');
expect(mockProgramHandler.incrementStep).toHaveBeenCalledWith('1', 0);
});
it('should not requeue when it was the last job', async function () {
getCurrentJobStub.mockReturnValue({ currentJobInDb: 'currentJob', currentStep: 0 });
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({

@@ -282,2 +316,3 @@ jobs: ['currentJob'],

it('should set program to finished when it was the last job', async function () {
getCurrentJobStub.mockReturnValue({ currentJobInDb: 'currentJob', currentStep: 0 });
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({

@@ -293,2 +328,3 @@ jobs: ['currentJob'],

it('should not set job to finished when it was not the last job', async function () {
getCurrentJobStub.mockReturnValue({ currentJobInDb: 'currentJob', currentStep: 0 });
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({

@@ -295,0 +331,0 @@ jobs: ['currentJob', 'nextJob'],

@@ -29,2 +29,10 @@ 'use strict';

async getCurrentJob(runId) {
const program = await this._programsRepository.getProgramByRunId(runId);
return {
currentJobInDb: program.jobs[program.step],
currentStep: program.step
}
}
finishProgram(runId) {

@@ -47,4 +55,4 @@ return this._programsRepository.finishProgram(runId);

incrementStep(runId) {
return this._programsRepository.incrementStep(runId);
incrementStep(runId, currentStep) {
return this._programsRepository.incrementStep(runId, currentStep);
}

@@ -51,0 +59,0 @@

@@ -145,2 +145,14 @@ 'use strict';

describe('#getCurrentJob', function () {
it('should get the current job of the program', async function () {
const programHandler = ProgramHandler.create(programsRepository, queueManager);
const runId = await programHandler.createProgram({
jobs: ['current_program', 'next_program']
});
const currentState = await programHandler.getCurrentJob(runId);
expect(currentState.currentJobInDb).toEqual('current_program');
});
});
describe('#setProgramToError', function () {

@@ -147,0 +159,0 @@ it('should set job to errored in db', async function () {

@@ -24,3 +24,3 @@ 'use strict';

await pubsub.insert(queueData);
await pubsub.insert(queueData, queueData.runId);

@@ -27,0 +27,0 @@ } catch (error) {

@@ -19,3 +19,3 @@ 'use strict';

const queueManager = new QueueManager(testTopicName, testProjectId, testGcpKeyFileName);
const queueData = { test_data: 123 };
const queueData = { test_data: 123, runId: 123 };

@@ -27,3 +27,3 @@ await queueManager.queueProgram(queueData);

);
expect(insertMock).toHaveBeenCalledWith(queueData);
expect(insertMock).toHaveBeenCalledWith(queueData, queueData.runId);
});

@@ -30,0 +30,0 @@

@@ -42,5 +42,13 @@ 'use strict';

incrementStep(runId) {
incrementStep(runId, currentStep) {
const conditions = {
run_id: runId
};
if (currentStep !== undefined) {
conditions.step = currentStep
}
return this._db(this._tableName)
.where({ run_id: runId })
.where(conditions)
.update({

@@ -47,0 +55,0 @@ updated_at: new Date(),