@emartech/program-executor
Advanced tools
+1
-1
@@ -45,3 +45,3 @@ { | ||
| }, | ||
| "version": "1.2.0" | ||
| "version": "1.3.0" | ||
| } |
+27
-1
| 'use strict'; | ||
| const consumer = require('@emartech/rabbitmq-client').Consumer; | ||
| const ProgramHandler = require('./program-handler'); | ||
@@ -19,2 +21,3 @@ const ProgramsRepository = require('./repositories/programs'); | ||
| this._queueManager = QueueManager.create(config.amqpUrl, config.queueName); | ||
| this._programHandler = ProgramHandler.create(this._programsRepository, this._queueManager); | ||
| } | ||
@@ -29,5 +32,28 @@ | ||
| createProgram(data) { | ||
| return ProgramHandler.create(this._programsRepository, this._queueManager).createProgram(data); | ||
| return this._programHandler.createProgram(data); | ||
| } | ||
| processPrograms(jobLibrary) { | ||
| const programExecutorProcessor = require('./program-executor-processor').create( | ||
| this._programsRepository, | ||
| this._queueManager, | ||
| jobLibrary | ||
| ); | ||
| consumer | ||
| .create( | ||
| { default: { url: this._config.amqpUrl } }, | ||
| { | ||
| logger: `${this._config.queueName}-consumer`, | ||
| channel: this._config.queueName, | ||
| prefetchCount: 1, | ||
| retryTime: 60000, | ||
| onMessage: async message => { | ||
| await programExecutorProcessor.process(message); | ||
| } | ||
| } | ||
| ) | ||
| .process(); | ||
| } | ||
| static create(config) { | ||
@@ -34,0 +60,0 @@ return new ProgramExecutor(config); |
+68
-0
@@ -7,3 +7,10 @@ 'use strict'; | ||
| const QueueManager = require('./queue-manager'); | ||
| const ProgramExecutorProcessor = require('./program-executor-processor'); | ||
| const Consumer = require('@emartech/rabbitmq-client').Consumer; | ||
| const testJobLibrary = { | ||
| firstJob: {}, | ||
| secondJob: {} | ||
| }; | ||
| describe('ProgramExecutor', function() { | ||
@@ -25,2 +32,8 @@ let config; | ||
| this.sandbox.spy(QueueManager, 'create'); | ||
| this.sandbox.spy(ProgramExecutorProcessor, 'create'); | ||
| this.sandbox.stub(ProgramExecutorProcessor.prototype, 'process'); | ||
| this.sandbox.spy(Consumer, 'create'); | ||
| this.sandbox.stub(Consumer.prototype, 'process'); | ||
| }); | ||
@@ -45,2 +58,57 @@ | ||
| }); | ||
| describe('#processPrograms', async function() { | ||
| it('should create program executor processor with job library', async function() { | ||
| await ProgramExecutor.create(config).processPrograms(testJobLibrary); | ||
| expect(ProgramExecutorProcessor.create).to.have.been.calledWith( | ||
| this.sinon.match.instanceOf(ProgramsRepository), | ||
| this.sinon.match.instanceOf(QueueManager), | ||
| testJobLibrary | ||
| ); | ||
| expect(ProgramsRepository.create).to.have.been.calledWith(config.knex, config.tableName); | ||
| expect(QueueManager.create).to.have.been.calledWith(config.amqpUrl, config.queueName); | ||
| }); | ||
| it('should create consumer with the given rabbitMq config', async function() { | ||
| await ProgramExecutor.create(config).processPrograms(testJobLibrary); | ||
| expect(Consumer.create.lastCall.args[0]).to.eql({ default: { url: config.amqpUrl } }); | ||
| }); | ||
| it('should create consumer to consume the given queue', async function() { | ||
| await ProgramExecutor.create(config).processPrograms(testJobLibrary); | ||
| expect(Consumer.create.lastCall.args[1]).to.containSubset({ channel: config.queueName }); | ||
| }); | ||
| it('should create consumer with a logger based on the given queue name', async function() { | ||
| await ProgramExecutor.create(config).processPrograms(testJobLibrary); | ||
| expect(Consumer.create.lastCall.args[1]).to.containSubset({ logger: `${config.queueName}-consumer` }); | ||
| }); | ||
| it('should config consumer with prefecth count and a retry time', async function() { | ||
| await ProgramExecutor.create(config).processPrograms(testJobLibrary); | ||
| expect(Consumer.create.lastCall.args[1]).to.containSubset({ prefetchCount: 1, retryTime: 60000 }); | ||
| }); | ||
| it('should call the created executor when message callback fires', async function() { | ||
| await ProgramExecutor.create(config).processPrograms(testJobLibrary); | ||
| const onMessageFunction = Consumer.create.lastCall.args[1].onMessage; | ||
| await onMessageFunction({ random: 'message' }); | ||
| expect(ProgramExecutorProcessor.prototype.process).to.have.been.calledWith({ random: 'message' }); | ||
| }); | ||
| it('should start processing', async function() { | ||
| await ProgramExecutor.create(config).processPrograms(testJobLibrary); | ||
| expect(Consumer.prototype.process).to.have.been.called; | ||
| }); | ||
| }); | ||
| }); |
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 2 instances in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 2 instances in 1 package
46916
8%1094
6.94%