Big News: Socket raises $60M Series C at a $1B valuation to secure software supply chains for AI-driven development.Announcement
Sign In

@emartech/program-executor

Package Overview
Dependencies
Maintainers
95
Versions
58
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@emartech/program-executor - npm Package Compare versions

Comparing version
1.0.1
to
1.1.0
+70
src/program-executor-processor/index.js
'use strict';
const logger = require('@emartech/json-logger')('program-executor');
const JobDataHandler = require('../job-data-handler');
class ProgramExecutorProcessor {
constructor(programHandler, queueManager, jobLibrary) {
this._jobLibrary = jobLibrary;
this._programHandler = programHandler;
this._queueManager = queueManager;
}
async process(message) {
const { runId } = message;
try {
await this._programHandler.incrementStepRetryCount(runId);
await this._executeNextJob(message);
} catch (error) {
if (error.retryable) {
await this._programHandler.setJobRetriableErrorMessage(runId, error.message);
} else {
await this._programHandler.setProgramToError(runId, error.message);
}
if (!error.ignorable) {
throw error;
}
}
}
async _executeNextJob(message) {
const { programData, jobs, runId } = message;
const currentJob = jobs[0];
const isProgramFinishedWithError = await this._programHandler.isProgramFinishedWithError(runId);
if (isProgramFinishedWithError) {
this.log('execution-cancelled', { ...message });
return;
}
this.log('executing-job', { ...message, currentJob });
if (this._jobLibrary[currentJob]) {
const jobDataHandler = JobDataHandler.create(this._programHandler, runId, currentJob);
await this._jobLibrary[currentJob].create(programData).execute(message, jobDataHandler);
this.log('job-finished', { ...message, currentJob });
} else {
this.log('job-skipped', { ...message, currentJob, level: 'warn' });
}
const remainingJobs = jobs.slice(1);
if (remainingJobs.length > 0) {
this._queueManager.queueProgram({ ...message, jobs: remainingJobs });
await this._programHandler.incrementStep(runId);
} else {
await this._programHandler.finishProgram(runId);
}
}
log(action, { programData, jobs, runId, currentJob, level = 'info' }) {
logger[level](action, { executor: { current_job: currentJob, program_data: programData, jobs, run_id: runId } });
}
static create(programHandler, queueManager, jobLibrary) {
return new ProgramExecutorProcessor(programHandler, queueManager, jobLibrary);
}
}
module.exports = ProgramExecutorProcessor;
'use strict';
const ProgramExecutorProcessor = require('./');
const ProgramHandler = require('../program-handler');
const JobDataHandler = require('../job-data-handler');
const QueueManager = require('../queue-manager');
describe('ProgramExecutorProcessor', function() {
let programHandler;
let queueManager;
let jobLibrary;
let testJobExecuteStub;
let failingJobExecuteStub;
let failingJobExecuteWithIgnorableErrorStub;
let failingJobExecuteWithRetryableErrorStub;
let programHandlerIsProgramFinishedWithErrorStub;
beforeEach(async function() {
const ignorableError = new Error('Something wrong happened, but ignore this!');
ignorableError.ignorable = true;
const retryableError = new Error('Something wrong happened, but please retry!');
retryableError.retryable = true;
testJobExecuteStub = this.sandbox.stub();
failingJobExecuteStub = this.sandbox.stub().rejects(new Error('Something wrong happened!'));
failingJobExecuteWithIgnorableErrorStub = this.sandbox.stub().rejects(ignorableError);
failingJobExecuteWithRetryableErrorStub = this.sandbox.stub().rejects(retryableError);
jobLibrary = {
testJob: {
create: this.sandbox.stub().returns({
execute: testJobExecuteStub
})
},
currentJob: {
create: this.sandbox.stub().returns({ execute() {} })
},
nextJob: {
create: this.sandbox.stub().returns({ execute() {} })
},
failingJob: {
create: this.sandbox.stub().returns({
execute: failingJobExecuteStub
})
},
failingJobWithIgnorableError: {
create: this.sandbox.stub().returns({
execute: failingJobExecuteWithIgnorableErrorStub
})
},
failingJobWithRetryableError: {
create: this.sandbox.stub().returns({
execute: failingJobExecuteWithRetryableErrorStub
})
}
};
this.sandbox.spy(JobDataHandler, 'create');
this.sandbox.stub(ProgramHandler.prototype, 'finishProgram').resolves(true);
this.sandbox.stub(ProgramHandler.prototype, 'setProgramToError').resolves(true);
this.sandbox.stub(ProgramHandler.prototype, 'setJobRetriableErrorMessage').resolves(true);
this.sandbox.stub(ProgramHandler.prototype, 'incrementStep').resolves(true);
this.sandbox.stub(ProgramHandler.prototype, 'incrementStepRetryCount').resolves(true);
programHandlerIsProgramFinishedWithErrorStub = this.sandbox
.stub(ProgramHandler.prototype, 'isProgramFinishedWithError')
.resolves(false);
this.sandbox.stub(QueueManager.prototype, 'queueProgram').resolves(true);
programHandler = ProgramHandler.create();
queueManager = QueueManager.create();
});
describe('process', function() {
it('should execute job from program library when its the next job', async function() {
const programData = {
customerId: 123,
hostname: 'yolo.myshopify.com'
};
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({
programData,
jobs: ['testJob'],
runId: '1'
});
expect(jobLibrary.testJob.create).to.be.calledWithExactly(programData);
expect(testJobExecuteStub).to.be.calledWith({
programData,
jobs: ['testJob'],
runId: '1'
});
});
it('should pass job data handler for the given job and runId pair', async function() {
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({
jobs: ['testJob'],
programData: {},
runId: '1'
});
const jobDataHandler = testJobExecuteStub.getCall(0).args[1];
expect(JobDataHandler.create).to.be.calledWith(this.sinon.match.instanceOf(ProgramHandler), '1', 'testJob');
expect(jobDataHandler).to.be.an.instanceOf(JobDataHandler);
});
it('should increment try count on process', async function() {
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({
jobs: ['testJob'],
programData: {},
runId: '1'
});
expect(ProgramHandler.prototype.incrementStepRetryCount).to.have.been.calledWith('1');
});
it('should cancel execution if program already encountered an error', async function() {
programHandlerIsProgramFinishedWithErrorStub.resolves(true);
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({
jobs: ['testJob'],
programData: {},
runId: '1'
});
expect(testJobExecuteStub).not.to.have.been.called;
expect(QueueManager.prototype.queueProgram).not.to.have.been.called;
});
it('should throw an error if a job fails with non-ignorable error', async function() {
let caughtError;
try {
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({
jobs: ['failingJob'],
programData: {},
runId: '1'
});
} catch (error) {
caughtError = error;
}
expect(caughtError).not.to.be.undefined;
expect(failingJobExecuteStub).to.be.called;
});
it('should not throw an error if a job fails with an ignorable error', async function() {
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({
jobs: ['failingJobWithIgnorableError'],
programData: {},
runId: '1'
});
expect(ProgramHandler.prototype.setProgramToError).to.be.calledWith(
'1',
'Something wrong happened, but ignore this!'
);
});
it('should set program to error if job fails with non retriable error', async function() {
try {
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({
jobs: ['failingJob'],
programData: {},
runId: '1'
});
} catch (error) {
expect(ProgramHandler.prototype.setProgramToError).to.be.calledWith('1', 'Something wrong happened!');
expect(ProgramHandler.prototype.setJobRetriableErrorMessage).not.have.been.called;
}
});
it('should update error message only if job fails with retriable error', async function() {
try {
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({
jobs: ['failingJobWithRetryableError'],
programData: {},
runId: '1'
});
} catch (error) {
expect(ProgramHandler.prototype.setProgramToError).not.have.been.called;
expect(ProgramHandler.prototype.setJobRetriableErrorMessage).to.be.calledWith(
'1',
'Something wrong happened, but please retry!'
);
}
});
it('should requeue with the next program', async function() {
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({
jobs: ['currentJob', 'nextJob'],
programData: {
customerId: 123,
hostname: 'yolo.myshopify.com'
}
});
expect(QueueManager.prototype.queueProgram).to.have.been.calledWith({
jobs: ['nextJob'],
programData: {
customerId: 123,
hostname: 'yolo.myshopify.com'
}
});
});
it('should call increment on requeue', async function() {
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({
jobs: ['currentJob', 'nextJob'],
programData: {},
runId: '1'
});
expect(ProgramHandler.prototype.incrementStep).to.have.been.calledWith('1');
});
it('should not requeue when it was the last job', async function() {
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({
jobs: ['currentJob'],
programData: {},
runId: '1'
});
expect(QueueManager.prototype.queueProgram).not.to.have.been.called;
});
it('should set program to finished when it was the last job', async function() {
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({
jobs: ['currentJob'],
programData: {},
runId: '1'
});
expect(ProgramHandler.prototype.finishProgram).to.have.been.calledWith('1');
});
it('should not set job to finished when it was not the last job', async function() {
await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({
jobs: ['currentJob', 'nextJob'],
programData: {},
runId: '1'
});
expect(ProgramHandler.prototype.finishProgram).not.to.have.been.called;
});
});
});
+1
-1

@@ -45,3 +45,3 @@ {

},
"version": "1.0.1"
"version": "1.1.0"
}
'use strict';
const ProgramHandler = require('./program-handler');
const ProgramsRepository = require('./repositories/programs');
const QueueManager = require('./queue-manager');

@@ -15,2 +17,4 @@ class ProgramExecutor {

this._config = config;
this._programsRepository = ProgramsRepository.create(config.knex, config.tableName);
this._queueManager = QueueManager.create(config.amqpUrl, config.queueName);
}

@@ -25,3 +29,3 @@

createProgram(data) {
return ProgramHandler.create(this._config).createProgram(data);
return ProgramHandler.create(this._programsRepository, this._queueManager).createProgram(data);
}

@@ -28,0 +32,0 @@

@@ -5,2 +5,4 @@ 'use strict';

const ProgramHandler = require('./program-handler');
const ProgramsRepository = require('./repositories/programs');
const QueueManager = require('./queue-manager');

@@ -20,6 +22,9 @@ describe('ProgramExecutor', function() {

this.sandbox.stub(ProgramHandler.prototype, 'createProgram');
this.sandbox.spy(ProgramsRepository, 'create');
this.sandbox.spy(QueueManager, 'create');
});
describe('#createProgram', async function() {
it('should create program handler with config and call createProgram with given data', async function() {
it('should create program handler and call createProgram with given data', async function() {
await ProgramExecutor.create(config).createProgram({

@@ -29,3 +34,8 @@ jobs: ['current_program', 'next_program']

expect(ProgramHandler.create).to.have.been.calledWith(config);
expect(ProgramHandler.create).to.have.been.calledWith(
this.sinon.match.instanceOf(ProgramsRepository),
this.sinon.match.instanceOf(QueueManager)
);
expect(ProgramsRepository.create).to.have.been.calledWith(config.knex, config.tableName);
expect(QueueManager.create).to.have.been.calledWith(config.amqpUrl, config.queueName);
expect(ProgramHandler.prototype.createProgram).to.have.been.calledWith({

@@ -32,0 +42,0 @@ jobs: ['current_program', 'next_program']

'use strict';
const RunIdGenerator = require('../runid-generator');
const QueueManager = require('../queue-manager');
const ProgramsRepository = require('../repositories/programs');
class ProgramHandler {
constructor({ knex, amqpUrl, tableName, queueName }) {
this._programsRepository = ProgramsRepository.create(knex, tableName);
this._queueManager = QueueManager.create(amqpUrl, queueName);
constructor(programsRepository, queueManager) {
this._programsRepository = programsRepository;
this._queueManager = queueManager;
}

@@ -58,4 +56,4 @@

static create(config) {
return new ProgramHandler(config);
static create(programsRepository, queueManager) {
return new ProgramHandler(programsRepository, queueManager);
}

@@ -62,0 +60,0 @@ }

@@ -11,31 +11,17 @@ 'use strict';

describe('ProgramHandler', function() {
let programsRepository;
let queueManager;
let queueProgramStub;
let programsRepository;
let programHandlerConfig;
beforeEach(async function() {
programsRepository = ProgramsRepository.create(this.db, 'programs');
this.sandbox.spy(QueueManager, 'create');
queueManager = QueueManager.create('amqp://guest:guest@localhost:9999', 'program-executor');
queueProgramStub = this.sandbox.stub(QueueManager.prototype, 'queueProgram').resolves(true);
programHandlerConfig = {
knex: this.db,
tableName: 'programs',
queueName: 'program-executor',
amqpUrl: 'amqp://guest:guest@localhost:9999'
};
});
describe('#createProgram', async function() {
it('should use the given rabbitMq and channel', async function() {
await ProgramHandler.create(programHandlerConfig).createProgram({
jobs: ['current_program', 'next_program']
});
expect(QueueManager.create).to.have.been.calledWith(programHandlerConfig.amqpUrl, programHandlerConfig.queueName);
});
it('should create a program with the given jobs and program data', async function() {
const programData = { test: 'data' };
await ProgramHandler.create(programHandlerConfig).createProgram({
await ProgramHandler.create(programsRepository, queueManager).createProgram({
programData,

@@ -52,3 +38,3 @@ jobs: ['current_program', 'next_program']

it('should save program in db', async function() {
const runId = await ProgramHandler.create(programHandlerConfig).createProgram({
const runId = await ProgramHandler.create(programsRepository, queueManager).createProgram({
jobs: ['current_program', 'next_program']

@@ -62,3 +48,3 @@ });

it('should save program data in db', async function() {
const runId = await ProgramHandler.create(programHandlerConfig).createProgram({
const runId = await ProgramHandler.create(programsRepository, queueManager).createProgram({
programData: {

@@ -83,3 +69,3 @@ hostname,

const runId = await ProgramHandler.create(programHandlerConfig).createProgram({
const runId = await ProgramHandler.create(programsRepository, queueManager).createProgram({
jobs: ['current_program', 'next_program'],

@@ -94,3 +80,5 @@ jobData

it('should generate a runid for the program', async function() {
await ProgramHandler.create(programHandlerConfig).createProgram({ jobs: ['current_program', 'next_program'] });
await ProgramHandler.create(programsRepository, queueManager).createProgram({
jobs: ['current_program', 'next_program']
});

@@ -101,3 +89,3 @@ expect(queueProgramStub.lastCall.lastArg.runId).not.to.be.undefined;

it('should return the generated runid', async function() {
const result = await ProgramHandler.create(programHandlerConfig).createProgram({
const result = await ProgramHandler.create(programsRepository, queueManager).createProgram({
jobs: ['current_program', 'next_program']

@@ -113,3 +101,3 @@ });

it('should set job to finished in db', async function() {
const programHandler = ProgramHandler.create(programHandlerConfig);
const programHandler = ProgramHandler.create(programsRepository, queueManager);

@@ -129,3 +117,3 @@ const runId = await programHandler.createProgram({

it('should get program data for runid and program name', async function() {
const programHandler = ProgramHandler.create(programHandlerConfig);
const programHandler = ProgramHandler.create(programsRepository, queueManager);
const runId = await programHandler.createProgram({

@@ -143,3 +131,3 @@ jobs: ['current_program', 'next_program']

it('should set job to errored in db', async function() {
const programHandler = ProgramHandler.create(programHandlerConfig);
const programHandler = ProgramHandler.create(programsRepository, queueManager);

@@ -159,3 +147,3 @@ const runId = await programHandler.createProgram({

it('should return true if erroredAt is not null', async function() {
const programHandler = ProgramHandler.create(programHandlerConfig);
const programHandler = ProgramHandler.create(programsRepository, queueManager);

@@ -172,3 +160,3 @@ const runId = await programHandler.createProgram({

it('should return false if erroredAt is null', async function() {
const programHandler = ProgramHandler.create(programHandlerConfig);
const programHandler = ProgramHandler.create(programsRepository, queueManager);

@@ -186,3 +174,3 @@ const runId = await programHandler.createProgram({

it('should set program to errored in db', async function() {
const programHandler = ProgramHandler.create(programHandlerConfig);
const programHandler = ProgramHandler.create(programsRepository, queueManager);

@@ -203,3 +191,3 @@ const runId = await programHandler.createProgram({

it('should increment step', async function() {
const programHandler = ProgramHandler.create(programHandlerConfig);
const programHandler = ProgramHandler.create(programsRepository, queueManager);

@@ -218,3 +206,3 @@ const runId = await programHandler.createProgram({

it('should increment StepRetryCount', async function() {
const programHandler = ProgramHandler.create(programHandlerConfig);
const programHandler = ProgramHandler.create(programsRepository, queueManager);

@@ -233,3 +221,3 @@ const runId = await programHandler.createProgram({

it('should update job data related to jobName', async function() {
const programHandler = ProgramHandler.create(programHandlerConfig);
const programHandler = ProgramHandler.create(programsRepository, queueManager);
const runId = await programHandler.createProgram({

@@ -236,0 +224,0 @@ jobs: ['current_program', 'next_program']