@emartech/program-executor
Advanced tools
| 'use strict'; | ||
| const logger = require('@emartech/json-logger')('program-executor'); | ||
| const JobDataHandler = require('../job-data-handler'); | ||
| class ProgramExecutorProcessor { | ||
| constructor(programHandler, queueManager, jobLibrary) { | ||
| this._jobLibrary = jobLibrary; | ||
| this._programHandler = programHandler; | ||
| this._queueManager = queueManager; | ||
| } | ||
| async process(message) { | ||
| const { runId } = message; | ||
| try { | ||
| await this._programHandler.incrementStepRetryCount(runId); | ||
| await this._executeNextJob(message); | ||
| } catch (error) { | ||
| if (error.retryable) { | ||
| await this._programHandler.setJobRetriableErrorMessage(runId, error.message); | ||
| } else { | ||
| await this._programHandler.setProgramToError(runId, error.message); | ||
| } | ||
| if (!error.ignorable) { | ||
| throw error; | ||
| } | ||
| } | ||
| } | ||
| async _executeNextJob(message) { | ||
| const { programData, jobs, runId } = message; | ||
| const currentJob = jobs[0]; | ||
| const isProgramFinishedWithError = await this._programHandler.isProgramFinishedWithError(runId); | ||
| if (isProgramFinishedWithError) { | ||
| this.log('execution-cancelled', { ...message }); | ||
| return; | ||
| } | ||
| this.log('executing-job', { ...message, currentJob }); | ||
| if (this._jobLibrary[currentJob]) { | ||
| const jobDataHandler = JobDataHandler.create(this._programHandler, runId, currentJob); | ||
| await this._jobLibrary[currentJob].create(programData).execute(message, jobDataHandler); | ||
| this.log('job-finished', { ...message, currentJob }); | ||
| } else { | ||
| this.log('job-skipped', { ...message, currentJob, level: 'warn' }); | ||
| } | ||
| const remainingJobs = jobs.slice(1); | ||
| if (remainingJobs.length > 0) { | ||
| this._queueManager.queueProgram({ ...message, jobs: remainingJobs }); | ||
| await this._programHandler.incrementStep(runId); | ||
| } else { | ||
| await this._programHandler.finishProgram(runId); | ||
| } | ||
| } | ||
| log(action, { programData, jobs, runId, currentJob, level = 'info' }) { | ||
| logger[level](action, { executor: { current_job: currentJob, program_data: programData, jobs, run_id: runId } }); | ||
| } | ||
| static create(programHandler, queueManager, jobLibrary) { | ||
| return new ProgramExecutorProcessor(programHandler, queueManager, jobLibrary); | ||
| } | ||
| } | ||
| module.exports = ProgramExecutorProcessor; |
| 'use strict'; | ||
| const ProgramExecutorProcessor = require('./'); | ||
| const ProgramHandler = require('../program-handler'); | ||
| const JobDataHandler = require('../job-data-handler'); | ||
| const QueueManager = require('../queue-manager'); | ||
| describe('ProgramExecutorProcessor', function() { | ||
| let programHandler; | ||
| let queueManager; | ||
| let jobLibrary; | ||
| let testJobExecuteStub; | ||
| let failingJobExecuteStub; | ||
| let failingJobExecuteWithIgnorableErrorStub; | ||
| let failingJobExecuteWithRetryableErrorStub; | ||
| let programHandlerIsProgramFinishedWithErrorStub; | ||
| beforeEach(async function() { | ||
| const ignorableError = new Error('Something wrong happened, but ignore this!'); | ||
| ignorableError.ignorable = true; | ||
| const retryableError = new Error('Something wrong happened, but please retry!'); | ||
| retryableError.retryable = true; | ||
| testJobExecuteStub = this.sandbox.stub(); | ||
| failingJobExecuteStub = this.sandbox.stub().rejects(new Error('Something wrong happened!')); | ||
| failingJobExecuteWithIgnorableErrorStub = this.sandbox.stub().rejects(ignorableError); | ||
| failingJobExecuteWithRetryableErrorStub = this.sandbox.stub().rejects(retryableError); | ||
| jobLibrary = { | ||
| testJob: { | ||
| create: this.sandbox.stub().returns({ | ||
| execute: testJobExecuteStub | ||
| }) | ||
| }, | ||
| currentJob: { | ||
| create: this.sandbox.stub().returns({ execute() {} }) | ||
| }, | ||
| nextJob: { | ||
| create: this.sandbox.stub().returns({ execute() {} }) | ||
| }, | ||
| failingJob: { | ||
| create: this.sandbox.stub().returns({ | ||
| execute: failingJobExecuteStub | ||
| }) | ||
| }, | ||
| failingJobWithIgnorableError: { | ||
| create: this.sandbox.stub().returns({ | ||
| execute: failingJobExecuteWithIgnorableErrorStub | ||
| }) | ||
| }, | ||
| failingJobWithRetryableError: { | ||
| create: this.sandbox.stub().returns({ | ||
| execute: failingJobExecuteWithRetryableErrorStub | ||
| }) | ||
| } | ||
| }; | ||
| this.sandbox.spy(JobDataHandler, 'create'); | ||
| this.sandbox.stub(ProgramHandler.prototype, 'finishProgram').resolves(true); | ||
| this.sandbox.stub(ProgramHandler.prototype, 'setProgramToError').resolves(true); | ||
| this.sandbox.stub(ProgramHandler.prototype, 'setJobRetriableErrorMessage').resolves(true); | ||
| this.sandbox.stub(ProgramHandler.prototype, 'incrementStep').resolves(true); | ||
| this.sandbox.stub(ProgramHandler.prototype, 'incrementStepRetryCount').resolves(true); | ||
| programHandlerIsProgramFinishedWithErrorStub = this.sandbox | ||
| .stub(ProgramHandler.prototype, 'isProgramFinishedWithError') | ||
| .resolves(false); | ||
| this.sandbox.stub(QueueManager.prototype, 'queueProgram').resolves(true); | ||
| programHandler = ProgramHandler.create(); | ||
| queueManager = QueueManager.create(); | ||
| }); | ||
| describe('process', function() { | ||
| it('should execute job from program library when its the next job', async function() { | ||
| const programData = { | ||
| customerId: 123, | ||
| hostname: 'yolo.myshopify.com' | ||
| }; | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| programData, | ||
| jobs: ['testJob'], | ||
| runId: '1' | ||
| }); | ||
| expect(jobLibrary.testJob.create).to.be.calledWithExactly(programData); | ||
| expect(testJobExecuteStub).to.be.calledWith({ | ||
| programData, | ||
| jobs: ['testJob'], | ||
| runId: '1' | ||
| }); | ||
| }); | ||
| it('should pass job data handler for the given job and runId pair', async function() { | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| jobs: ['testJob'], | ||
| programData: {}, | ||
| runId: '1' | ||
| }); | ||
| const jobDataHandler = testJobExecuteStub.getCall(0).args[1]; | ||
| expect(JobDataHandler.create).to.be.calledWith(this.sinon.match.instanceOf(ProgramHandler), '1', 'testJob'); | ||
| expect(jobDataHandler).to.be.an.instanceOf(JobDataHandler); | ||
| }); | ||
| it('should increment try count on process', async function() { | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| jobs: ['testJob'], | ||
| programData: {}, | ||
| runId: '1' | ||
| }); | ||
| expect(ProgramHandler.prototype.incrementStepRetryCount).to.have.been.calledWith('1'); | ||
| }); | ||
| it('should cancel execution if program already encountered an error', async function() { | ||
| programHandlerIsProgramFinishedWithErrorStub.resolves(true); | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| jobs: ['testJob'], | ||
| programData: {}, | ||
| runId: '1' | ||
| }); | ||
| expect(testJobExecuteStub).not.to.have.been.called; | ||
| expect(QueueManager.prototype.queueProgram).not.to.have.been.called; | ||
| }); | ||
| it('should throw an error if a job fails with non-ignorable error', async function() { | ||
| let caughtError; | ||
| try { | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| jobs: ['failingJob'], | ||
| programData: {}, | ||
| runId: '1' | ||
| }); | ||
| } catch (error) { | ||
| caughtError = error; | ||
| } | ||
| expect(caughtError).not.to.be.undefined; | ||
| expect(failingJobExecuteStub).to.be.called; | ||
| }); | ||
| it('should not throw an error if a job fails with an ignorable error', async function() { | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| jobs: ['failingJobWithIgnorableError'], | ||
| programData: {}, | ||
| runId: '1' | ||
| }); | ||
| expect(ProgramHandler.prototype.setProgramToError).to.be.calledWith( | ||
| '1', | ||
| 'Something wrong happened, but ignore this!' | ||
| ); | ||
| }); | ||
| it('should set program to error if job fails with non retriable error', async function() { | ||
| try { | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| jobs: ['failingJob'], | ||
| programData: {}, | ||
| runId: '1' | ||
| }); | ||
| } catch (error) { | ||
| expect(ProgramHandler.prototype.setProgramToError).to.be.calledWith('1', 'Something wrong happened!'); | ||
| expect(ProgramHandler.prototype.setJobRetriableErrorMessage).not.have.been.called; | ||
| } | ||
| }); | ||
| it('should update error message only if job fails with retriable error', async function() { | ||
| try { | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| jobs: ['failingJobWithRetryableError'], | ||
| programData: {}, | ||
| runId: '1' | ||
| }); | ||
| } catch (error) { | ||
| expect(ProgramHandler.prototype.setProgramToError).not.have.been.called; | ||
| expect(ProgramHandler.prototype.setJobRetriableErrorMessage).to.be.calledWith( | ||
| '1', | ||
| 'Something wrong happened, but please retry!' | ||
| ); | ||
| } | ||
| }); | ||
| it('should requeue with the next program', async function() { | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| jobs: ['currentJob', 'nextJob'], | ||
| programData: { | ||
| customerId: 123, | ||
| hostname: 'yolo.myshopify.com' | ||
| } | ||
| }); | ||
| expect(QueueManager.prototype.queueProgram).to.have.been.calledWith({ | ||
| jobs: ['nextJob'], | ||
| programData: { | ||
| customerId: 123, | ||
| hostname: 'yolo.myshopify.com' | ||
| } | ||
| }); | ||
| }); | ||
| it('should call increment on requeue', async function() { | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| jobs: ['currentJob', 'nextJob'], | ||
| programData: {}, | ||
| runId: '1' | ||
| }); | ||
| expect(ProgramHandler.prototype.incrementStep).to.have.been.calledWith('1'); | ||
| }); | ||
| it('should not requeue when it was the last job', async function() { | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| jobs: ['currentJob'], | ||
| programData: {}, | ||
| runId: '1' | ||
| }); | ||
| expect(QueueManager.prototype.queueProgram).not.to.have.been.called; | ||
| }); | ||
| it('should set program to finished when it was the last job', async function() { | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| jobs: ['currentJob'], | ||
| programData: {}, | ||
| runId: '1' | ||
| }); | ||
| expect(ProgramHandler.prototype.finishProgram).to.have.been.calledWith('1'); | ||
| }); | ||
| it('should not set job to finished when it was not the last job', async function() { | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| jobs: ['currentJob', 'nextJob'], | ||
| programData: {}, | ||
| runId: '1' | ||
| }); | ||
| expect(ProgramHandler.prototype.finishProgram).not.to.have.been.called; | ||
| }); | ||
| }); | ||
| }); |
+1
-1
@@ -45,3 +45,3 @@ { | ||
| }, | ||
| "version": "1.0.1" | ||
| "version": "1.1.0" | ||
| } |
+5
-1
| 'use strict'; | ||
| const ProgramHandler = require('./program-handler'); | ||
| const ProgramsRepository = require('./repositories/programs'); | ||
| const QueueManager = require('./queue-manager'); | ||
@@ -15,2 +17,4 @@ class ProgramExecutor { | ||
| this._config = config; | ||
| this._programsRepository = ProgramsRepository.create(config.knex, config.tableName); | ||
| this._queueManager = QueueManager.create(config.amqpUrl, config.queueName); | ||
| } | ||
@@ -25,3 +29,3 @@ | ||
| createProgram(data) { | ||
| return ProgramHandler.create(this._config).createProgram(data); | ||
| return ProgramHandler.create(this._programsRepository, this._queueManager).createProgram(data); | ||
| } | ||
@@ -28,0 +32,0 @@ |
+12
-2
@@ -5,2 +5,4 @@ 'use strict'; | ||
| const ProgramHandler = require('./program-handler'); | ||
| const ProgramsRepository = require('./repositories/programs'); | ||
| const QueueManager = require('./queue-manager'); | ||
@@ -20,6 +22,9 @@ describe('ProgramExecutor', function() { | ||
| this.sandbox.stub(ProgramHandler.prototype, 'createProgram'); | ||
| this.sandbox.spy(ProgramsRepository, 'create'); | ||
| this.sandbox.spy(QueueManager, 'create'); | ||
| }); | ||
| describe('#createProgram', async function() { | ||
| it('should create program handler with config and call createProgram with given data', async function() { | ||
| it('should create program handler and call createProgram with given data', async function() { | ||
| await ProgramExecutor.create(config).createProgram({ | ||
@@ -29,3 +34,8 @@ jobs: ['current_program', 'next_program'] | ||
| expect(ProgramHandler.create).to.have.been.calledWith(config); | ||
| expect(ProgramHandler.create).to.have.been.calledWith( | ||
| this.sinon.match.instanceOf(ProgramsRepository), | ||
| this.sinon.match.instanceOf(QueueManager) | ||
| ); | ||
| expect(ProgramsRepository.create).to.have.been.calledWith(config.knex, config.tableName); | ||
| expect(QueueManager.create).to.have.been.calledWith(config.amqpUrl, config.queueName); | ||
| expect(ProgramHandler.prototype.createProgram).to.have.been.calledWith({ | ||
@@ -32,0 +42,0 @@ jobs: ['current_program', 'next_program'] |
| 'use strict'; | ||
| const RunIdGenerator = require('../runid-generator'); | ||
| const QueueManager = require('../queue-manager'); | ||
| const ProgramsRepository = require('../repositories/programs'); | ||
| class ProgramHandler { | ||
| constructor({ knex, amqpUrl, tableName, queueName }) { | ||
| this._programsRepository = ProgramsRepository.create(knex, tableName); | ||
| this._queueManager = QueueManager.create(amqpUrl, queueName); | ||
| constructor(programsRepository, queueManager) { | ||
| this._programsRepository = programsRepository; | ||
| this._queueManager = queueManager; | ||
| } | ||
@@ -58,4 +56,4 @@ | ||
| static create(config) { | ||
| return new ProgramHandler(config); | ||
| static create(programsRepository, queueManager) { | ||
| return new ProgramHandler(programsRepository, queueManager); | ||
| } | ||
@@ -62,0 +60,0 @@ } |
@@ -11,31 +11,17 @@ 'use strict'; | ||
| describe('ProgramHandler', function() { | ||
| let programsRepository; | ||
| let queueManager; | ||
| let queueProgramStub; | ||
| let programsRepository; | ||
| let programHandlerConfig; | ||
| beforeEach(async function() { | ||
| programsRepository = ProgramsRepository.create(this.db, 'programs'); | ||
| this.sandbox.spy(QueueManager, 'create'); | ||
| queueManager = QueueManager.create('amqp://guest:guest@localhost:9999', 'program-executor'); | ||
| queueProgramStub = this.sandbox.stub(QueueManager.prototype, 'queueProgram').resolves(true); | ||
| programHandlerConfig = { | ||
| knex: this.db, | ||
| tableName: 'programs', | ||
| queueName: 'program-executor', | ||
| amqpUrl: 'amqp://guest:guest@localhost:9999' | ||
| }; | ||
| }); | ||
| describe('#createProgram', async function() { | ||
| it('should use the given rabbitMq and channel', async function() { | ||
| await ProgramHandler.create(programHandlerConfig).createProgram({ | ||
| jobs: ['current_program', 'next_program'] | ||
| }); | ||
| expect(QueueManager.create).to.have.been.calledWith(programHandlerConfig.amqpUrl, programHandlerConfig.queueName); | ||
| }); | ||
| it('should create a program with the given jobs and program data', async function() { | ||
| const programData = { test: 'data' }; | ||
| await ProgramHandler.create(programHandlerConfig).createProgram({ | ||
| await ProgramHandler.create(programsRepository, queueManager).createProgram({ | ||
| programData, | ||
@@ -52,3 +38,3 @@ jobs: ['current_program', 'next_program'] | ||
| it('should save program in db', async function() { | ||
| const runId = await ProgramHandler.create(programHandlerConfig).createProgram({ | ||
| const runId = await ProgramHandler.create(programsRepository, queueManager).createProgram({ | ||
| jobs: ['current_program', 'next_program'] | ||
@@ -62,3 +48,3 @@ }); | ||
| it('should save program data in db', async function() { | ||
| const runId = await ProgramHandler.create(programHandlerConfig).createProgram({ | ||
| const runId = await ProgramHandler.create(programsRepository, queueManager).createProgram({ | ||
| programData: { | ||
@@ -83,3 +69,3 @@ hostname, | ||
| const runId = await ProgramHandler.create(programHandlerConfig).createProgram({ | ||
| const runId = await ProgramHandler.create(programsRepository, queueManager).createProgram({ | ||
| jobs: ['current_program', 'next_program'], | ||
@@ -94,3 +80,5 @@ jobData | ||
| it('should generate a runid for the program', async function() { | ||
| await ProgramHandler.create(programHandlerConfig).createProgram({ jobs: ['current_program', 'next_program'] }); | ||
| await ProgramHandler.create(programsRepository, queueManager).createProgram({ | ||
| jobs: ['current_program', 'next_program'] | ||
| }); | ||
@@ -101,3 +89,3 @@ expect(queueProgramStub.lastCall.lastArg.runId).not.to.be.undefined; | ||
| it('should return the generated runid', async function() { | ||
| const result = await ProgramHandler.create(programHandlerConfig).createProgram({ | ||
| const result = await ProgramHandler.create(programsRepository, queueManager).createProgram({ | ||
| jobs: ['current_program', 'next_program'] | ||
@@ -113,3 +101,3 @@ }); | ||
| it('should set job to finished in db', async function() { | ||
| const programHandler = ProgramHandler.create(programHandlerConfig); | ||
| const programHandler = ProgramHandler.create(programsRepository, queueManager); | ||
@@ -129,3 +117,3 @@ const runId = await programHandler.createProgram({ | ||
| it('should get program data for runid and program name', async function() { | ||
| const programHandler = ProgramHandler.create(programHandlerConfig); | ||
| const programHandler = ProgramHandler.create(programsRepository, queueManager); | ||
| const runId = await programHandler.createProgram({ | ||
@@ -143,3 +131,3 @@ jobs: ['current_program', 'next_program'] | ||
| it('should set job to errored in db', async function() { | ||
| const programHandler = ProgramHandler.create(programHandlerConfig); | ||
| const programHandler = ProgramHandler.create(programsRepository, queueManager); | ||
@@ -159,3 +147,3 @@ const runId = await programHandler.createProgram({ | ||
| it('should return true if erroredAt is not null', async function() { | ||
| const programHandler = ProgramHandler.create(programHandlerConfig); | ||
| const programHandler = ProgramHandler.create(programsRepository, queueManager); | ||
@@ -172,3 +160,3 @@ const runId = await programHandler.createProgram({ | ||
| it('should return false if erroredAt is null', async function() { | ||
| const programHandler = ProgramHandler.create(programHandlerConfig); | ||
| const programHandler = ProgramHandler.create(programsRepository, queueManager); | ||
@@ -186,3 +174,3 @@ const runId = await programHandler.createProgram({ | ||
| it('should set program to errored in db', async function() { | ||
| const programHandler = ProgramHandler.create(programHandlerConfig); | ||
| const programHandler = ProgramHandler.create(programsRepository, queueManager); | ||
@@ -203,3 +191,3 @@ const runId = await programHandler.createProgram({ | ||
| it('should increment step', async function() { | ||
| const programHandler = ProgramHandler.create(programHandlerConfig); | ||
| const programHandler = ProgramHandler.create(programsRepository, queueManager); | ||
@@ -218,3 +206,3 @@ const runId = await programHandler.createProgram({ | ||
| it('should increment StepRetryCount', async function() { | ||
| const programHandler = ProgramHandler.create(programHandlerConfig); | ||
| const programHandler = ProgramHandler.create(programsRepository, queueManager); | ||
@@ -233,3 +221,3 @@ const runId = await programHandler.createProgram({ | ||
| it('should update job data related to jobName', async function() { | ||
| const programHandler = ProgramHandler.create(programHandlerConfig); | ||
| const programHandler = ProgramHandler.create(programsRepository, queueManager); | ||
| const runId = await programHandler.createProgram({ | ||
@@ -236,0 +224,0 @@ jobs: ['current_program', 'next_program'] |
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 2 instances in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 2 instances in 1 package
42327
37.07%26
8.33%980
38.03%