@emartech/program-executor
Advanced tools
+6
-6
@@ -26,3 +26,3 @@ { | ||
| "camelcase-keys": "^6.2.2", | ||
| "graphql": "^16.12.0" | ||
| "graphql": "^16.13.0" | ||
| }, | ||
@@ -33,13 +33,13 @@ "engines": { | ||
| "devDependencies": { | ||
| "dotenv": "^17.2.4", | ||
| "eslint": "^10.0.0", | ||
| "dotenv": "^17.3.1", | ||
| "eslint": "^10.0.2", | ||
| "eslint-config-emarsys": "5.1.0", | ||
| "eslint-plugin-no-only-tests": "^3.3.0", | ||
| "eslint-plugin-security": "^3.0.1", | ||
| "eslint-plugin-security": "^4.0.0", | ||
| "jest": "29.0.0", | ||
| "knex": "^3.1.0", | ||
| "pg": "^8.18.0", | ||
| "pg": "^8.19.0", | ||
| "semantic-release": "^25.0.3" | ||
| }, | ||
| "version": "3.18.0" | ||
| "version": "3.19.0" | ||
| } |
+1
-1
@@ -8,3 +8,3 @@ # Program Executor JS [](https://badge.fury.io/js/%40emartech%2Fprogram-executor) [](https://app.codeship.com/projects/341917) | ||
| - NodeJS 7.10.1+ | ||
| - RabbitMQ | ||
| - RabbitMQ / Google PubSub | ||
| - Database (Postgres, MSSQL, MySQL, MariaDB, SQLite3, Oracle, and Amazon Redshift) and a connected [knex](https://knexjs.org/#Installation-client) instance | ||
@@ -11,0 +11,0 @@ |
@@ -34,2 +34,22 @@ 'use strict'; | ||
| class AnotherSuccessfulJob { | ||
| static get name() { | ||
| return 'another_successful_job'; | ||
| } | ||
| static create(programData) { | ||
| return new AnotherSuccessfulJob(programData); | ||
| } | ||
| // eslint-disable-next-line no-unused-vars | ||
| constructor(programData) { | ||
| /// ... initialize member variables based on programData if needed | ||
| } | ||
| async execute(message, jobDataHandler) { | ||
| // eslint-disable-next-line no-unused-vars | ||
| const jobSpecificData = await jobDataHandler.get(); | ||
| } | ||
| } | ||
| class ProblematicJob { | ||
@@ -59,3 +79,4 @@ static get name() { | ||
| successful_job: SuccessfulJob, | ||
| problematic_job: ProblematicJob | ||
| problematic_job: ProblematicJob, | ||
| another_successful_job: AnotherSuccessfulJob | ||
| }; | ||
@@ -208,3 +229,29 @@ | ||
| }); | ||
| it('should cancel the duplicate job', async function () { | ||
| const program = { | ||
| programData: { | ||
| customerId: customerId | ||
| }, | ||
| jobs: ['successful_job', 'another_successful_job'] | ||
| } | ||
| const runId = await ProgramExecutor.create(config).createProgram(program); | ||
| program.runId = runId; | ||
| await ProgramExecutor.create(config)._queueManager.queueProgram(program); | ||
| await ProgramExecutor.create(config).processPrograms(testJobLibrary); | ||
| await e2eTestReuse.waitForMessages(1000); | ||
| const result = await dbConnection('programs').where({ run_id: runId }).first(); | ||
| expect(result.run_id).toBe(runId); | ||
| expect(result.jobs).toEqual(['successful_job', 'another_successful_job']); | ||
| expect(result.step).toEqual(1); | ||
| expect(result.errored_at).toEqual(null); | ||
| expect(result.finished_at).not.toEqual(null); | ||
| expect(result.program_data).toEqual({ customerId: 1234 }); | ||
| }) | ||
| }); | ||
| }); |
@@ -58,2 +58,8 @@ 'use strict'; | ||
| const { currentJobInDb, currentStep } = await this._programHandler.getCurrentJob(runId); | ||
| if (currentJob !== currentJobInDb && this._jobLibrary[currentJob]) { | ||
| this.log('duplicate-cancelled', { ...message }); | ||
| return; | ||
| } | ||
| this.log('executing-job', { ...message, currentJob }); | ||
@@ -72,4 +78,8 @@ | ||
| devLogger.info('queue update remainingJobs', { remainingJobs, run_id: runId, OSPID: process.pid }); | ||
| await this._queueManager.queueProgram({ ...message, jobs: remainingJobs }); | ||
| await this._programHandler.incrementStep(runId); | ||
| const res = await this._programHandler.incrementStep(runId, currentStep); | ||
| if (res) { | ||
| await this._queueManager.queueProgram({ ...message, jobs: remainingJobs }); | ||
| } else{ | ||
| this.log('discard-duplicate', { ...message, currentJob }) | ||
| } | ||
| } else { | ||
@@ -76,0 +86,0 @@ devLogger.info('finished program', { run_id: runId, OSPID: process.pid }); |
@@ -25,2 +25,3 @@ 'use strict'; | ||
| let mockQueueProgram; | ||
| let getCurrentJobStub; | ||
@@ -75,2 +76,3 @@ beforeEach(async function () { | ||
| setProgramToErrorStub = jest.fn().mockResolvedValue(true); | ||
| getCurrentJobStub = jest.fn(); | ||
@@ -84,3 +86,4 @@ mockProgramHandler = { | ||
| isProgramFinishedWithError: programHandlerIsProgramFinishedWithErrorStub, | ||
| setProgramToError: setProgramToErrorStub | ||
| setProgramToError: setProgramToErrorStub, | ||
| getCurrentJob: getCurrentJobStub | ||
| }; | ||
@@ -100,2 +103,3 @@ ProgramHandler.create = jest.fn().mockReturnValue(mockProgramHandler); | ||
| it('should execute job from program library when its the next job', async function () { | ||
| getCurrentJobStub.mockReturnValue({ currentJobInDb: 'testJob', currentStep: 0 }); | ||
| const programData = { | ||
@@ -121,2 +125,3 @@ customerId: 123, | ||
| it('should pass job data handler for the given job and runId pair', async function () { | ||
| getCurrentJobStub.mockReturnValue({ currentJobInDb: 'testJob', currentStep: 0 }); | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
@@ -133,2 +138,3 @@ jobs: ['testJob'], | ||
| it('should cancel execution if program already encountered an error', async function () { | ||
| getCurrentJobStub.mockReturnValue({ currentJobInDb: 'testJob', currentStep: 0 }); | ||
| programHandlerIsProgramFinishedWithErrorStub.mockResolvedValue(true)(true); | ||
@@ -145,3 +151,21 @@ await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| it('should cancel execution if current job doesnt match current job in DB', async function () { | ||
| getCurrentJobStub.mockReturnValue({ currentJobInDb: 'nextJob', currentStep: 0 }); | ||
| const programData = { | ||
| customerId: 123, | ||
| hostname: 'yolo.myshopify.com' | ||
| }; | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| programData, | ||
| jobs: ['testJob'], | ||
| runId: '1' | ||
| }); | ||
| expect(testJobExecuteStub).not.toHaveBeenCalled(); | ||
| expect(mockQueueProgram).not.toHaveBeenCalled(); | ||
| }); | ||
| it('should throw an error if a job fails with non-ignorable error', async function () { | ||
| getCurrentJobStub.mockReturnValue({ currentJobInDb: 'failingJob', currentStep: 0 }); | ||
| let caughtError; | ||
@@ -163,2 +187,3 @@ try { | ||
| it('should throw a retryable error if a job fails with ExecutionTimeExceededError', async function () { | ||
| getCurrentJobStub.mockReturnValue({ currentJobInDb: 'failingJobWithExecutionTimeExceededError', currentStep: 0 }); | ||
| let caughtError; | ||
@@ -179,2 +204,3 @@ try { | ||
| it('should not throw an error if a job fails with an ignorable error', async function () { | ||
| getCurrentJobStub.mockReturnValue({ currentJobInDb: 'failingJobWithIgnorableError', currentStep: 0 }); | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
@@ -192,2 +218,3 @@ jobs: ['failingJobWithIgnorableError'], | ||
| it('should set program to error if job fails with non retriable error', async function () { | ||
| getCurrentJobStub.mockReturnValue({ currentJobInDb: 'failingJob', currentStep: 0 }); | ||
| try { | ||
@@ -207,2 +234,3 @@ await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| it('should increment try count if job fails with retriable error', async function () { | ||
| getCurrentJobStub.mockReturnValue({ currentJobInDb: 'failingJobWithRetryableError', currentStep: 0 }); | ||
| try { | ||
@@ -221,2 +249,3 @@ await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| it('should update error message only if job fails with retriable error', async function () { | ||
| getCurrentJobStub.mockReturnValue({ currentJobInDb: 'failingJobWithRetryableError', currentStep: 0 }); | ||
| try { | ||
@@ -239,2 +268,3 @@ await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
| it('should requeue with the next program', async function () { | ||
| getCurrentJobStub.mockReturnValue({ currentJobInDb: 'currentJob', currentStep: 0 }); | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
@@ -245,3 +275,4 @@ jobs: ['currentJob', 'nextJob'], | ||
| hostname: 'yolo.myshopify.com' | ||
| } | ||
| }, | ||
| runId: '1' | ||
| }); | ||
@@ -254,3 +285,4 @@ | ||
| hostname: 'yolo.myshopify.com' | ||
| } | ||
| }, | ||
| runId: '1' | ||
| }); | ||
@@ -260,2 +292,3 @@ }); | ||
| it('should call increment on requeue', async function () { | ||
| getCurrentJobStub.mockReturnValue({ currentJobInDb: 'currentJob', currentStep: 0 }); | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
@@ -267,6 +300,7 @@ jobs: ['currentJob', 'nextJob'], | ||
| expect(mockProgramHandler.incrementStep).toHaveBeenCalledWith('1'); | ||
| expect(mockProgramHandler.incrementStep).toHaveBeenCalledWith('1', 0); | ||
| }); | ||
| it('should not requeue when it was the last job', async function () { | ||
| getCurrentJobStub.mockReturnValue({ currentJobInDb: 'currentJob', currentStep: 0 }); | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
@@ -282,2 +316,3 @@ jobs: ['currentJob'], | ||
| it('should set program to finished when it was the last job', async function () { | ||
| getCurrentJobStub.mockReturnValue({ currentJobInDb: 'currentJob', currentStep: 0 }); | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
@@ -293,2 +328,3 @@ jobs: ['currentJob'], | ||
| it('should not set job to finished when it was not the last job', async function () { | ||
| getCurrentJobStub.mockReturnValue({ currentJobInDb: 'currentJob', currentStep: 0 }); | ||
| await ProgramExecutorProcessor.create(programHandler, queueManager, jobLibrary).process({ | ||
@@ -295,0 +331,0 @@ jobs: ['currentJob', 'nextJob'], |
@@ -29,2 +29,10 @@ 'use strict'; | ||
| async getCurrentJob(runId) { | ||
| const program = await this._programsRepository.getProgramByRunId(runId); | ||
| return { | ||
| currentJobInDb: program.jobs[program.step], | ||
| currentStep: program.step | ||
| } | ||
| } | ||
| finishProgram(runId) { | ||
@@ -47,4 +55,4 @@ return this._programsRepository.finishProgram(runId); | ||
| incrementStep(runId) { | ||
| return this._programsRepository.incrementStep(runId); | ||
| incrementStep(runId, currentStep) { | ||
| return this._programsRepository.incrementStep(runId, currentStep); | ||
| } | ||
@@ -51,0 +59,0 @@ |
@@ -145,2 +145,14 @@ 'use strict'; | ||
| describe('#getCurrentJob', function () { | ||
| it('should get the current job of the program', async function () { | ||
| const programHandler = ProgramHandler.create(programsRepository, queueManager); | ||
| const runId = await programHandler.createProgram({ | ||
| jobs: ['current_program', 'next_program'] | ||
| }); | ||
| const currentState = await programHandler.getCurrentJob(runId); | ||
| expect(currentState.currentJobInDb).toEqual('current_program'); | ||
| }); | ||
| }); | ||
| describe('#setProgramToError', function () { | ||
@@ -147,0 +159,0 @@ it('should set job to errored in db', async function () { |
@@ -24,3 +24,3 @@ 'use strict'; | ||
| await pubsub.insert(queueData); | ||
| await pubsub.insert(queueData, queueData.runId); | ||
@@ -27,0 +27,0 @@ } catch (error) { |
@@ -19,3 +19,3 @@ 'use strict'; | ||
| const queueManager = new QueueManager(testTopicName, testProjectId, testGcpKeyFileName); | ||
| const queueData = { test_data: 123 }; | ||
| const queueData = { test_data: 123, runId: 123 }; | ||
@@ -27,3 +27,3 @@ await queueManager.queueProgram(queueData); | ||
| ); | ||
| expect(insertMock).toHaveBeenCalledWith(queueData); | ||
| expect(insertMock).toHaveBeenCalledWith(queueData, queueData.runId); | ||
| }); | ||
@@ -30,0 +30,0 @@ |
@@ -42,5 +42,13 @@ 'use strict'; | ||
| incrementStep(runId) { | ||
| incrementStep(runId, currentStep) { | ||
| const conditions = { | ||
| run_id: runId | ||
| }; | ||
| if (currentStep !== undefined) { | ||
| conditions.step = currentStep | ||
| } | ||
| return this._db(this._tableName) | ||
| .where({ run_id: runId }) | ||
| .where(conditions) | ||
| .update({ | ||
@@ -47,0 +55,0 @@ updated_at: new Date(), |
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 7 instances in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 7 instances in 1 package
112456
4.51%2484
4.28%Updated