Big News: Socket raises $60M Series C at a $1B valuation to secure software supply chains for AI-driven development.Announcement
Sign In

@emartech/program-executor

Package Overview
Dependencies
Maintainers
95
Versions
58
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@emartech/program-executor - npm Package Compare versions

Comparing version
1.2.0
to
1.3.0
+1
-1
package.json

@@ -45,3 +45,3 @@ {

},
"version": "1.2.0"
"version": "1.3.0"
}
'use strict';
const consumer = require('@emartech/rabbitmq-client').Consumer;
const ProgramHandler = require('./program-handler');

@@ -19,2 +21,3 @@ const ProgramsRepository = require('./repositories/programs');

this._queueManager = QueueManager.create(config.amqpUrl, config.queueName);
this._programHandler = ProgramHandler.create(this._programsRepository, this._queueManager);
}

@@ -29,5 +32,28 @@

createProgram(data) {
return ProgramHandler.create(this._programsRepository, this._queueManager).createProgram(data);
return this._programHandler.createProgram(data);
}
processPrograms(jobLibrary) {
const programExecutorProcessor = require('./program-executor-processor').create(
this._programsRepository,
this._queueManager,
jobLibrary
);
consumer
.create(
{ default: { url: this._config.amqpUrl } },
{
logger: `${this._config.queueName}-consumer`,
channel: this._config.queueName,
prefetchCount: 1,
retryTime: 60000,
onMessage: async message => {
await programExecutorProcessor.process(message);
}
}
)
.process();
}
static create(config) {

@@ -34,0 +60,0 @@ return new ProgramExecutor(config);

@@ -7,3 +7,10 @@ 'use strict';

const QueueManager = require('./queue-manager');
const ProgramExecutorProcessor = require('./program-executor-processor');
const Consumer = require('@emartech/rabbitmq-client').Consumer;
const testJobLibrary = {
firstJob: {},
secondJob: {}
};
describe('ProgramExecutor', function() {

@@ -25,2 +32,8 @@ let config;

this.sandbox.spy(QueueManager, 'create');
this.sandbox.spy(ProgramExecutorProcessor, 'create');
this.sandbox.stub(ProgramExecutorProcessor.prototype, 'process');
this.sandbox.spy(Consumer, 'create');
this.sandbox.stub(Consumer.prototype, 'process');
});

@@ -45,2 +58,57 @@

});
describe('#processPrograms', async function() {
it('should create program executor processor with job library', async function() {
await ProgramExecutor.create(config).processPrograms(testJobLibrary);
expect(ProgramExecutorProcessor.create).to.have.been.calledWith(
this.sinon.match.instanceOf(ProgramsRepository),
this.sinon.match.instanceOf(QueueManager),
testJobLibrary
);
expect(ProgramsRepository.create).to.have.been.calledWith(config.knex, config.tableName);
expect(QueueManager.create).to.have.been.calledWith(config.amqpUrl, config.queueName);
});
it('should create consumer with the given rabbitMq config', async function() {
await ProgramExecutor.create(config).processPrograms(testJobLibrary);
expect(Consumer.create.lastCall.args[0]).to.eql({ default: { url: config.amqpUrl } });
});
it('should create consumer to consume the given queue', async function() {
await ProgramExecutor.create(config).processPrograms(testJobLibrary);
expect(Consumer.create.lastCall.args[1]).to.containSubset({ channel: config.queueName });
});
it('should create consumer with a logger based on the given queue name', async function() {
await ProgramExecutor.create(config).processPrograms(testJobLibrary);
expect(Consumer.create.lastCall.args[1]).to.containSubset({ logger: `${config.queueName}-consumer` });
});
it('should config consumer with prefecth count and a retry time', async function() {
await ProgramExecutor.create(config).processPrograms(testJobLibrary);
expect(Consumer.create.lastCall.args[1]).to.containSubset({ prefetchCount: 1, retryTime: 60000 });
});
it('should call the created executor when message callback fires', async function() {
await ProgramExecutor.create(config).processPrograms(testJobLibrary);
const onMessageFunction = Consumer.create.lastCall.args[1].onMessage;
await onMessageFunction({ random: 'message' });
expect(ProgramExecutorProcessor.prototype.process).to.have.been.calledWith({ random: 'message' });
});
it('should start processing', async function() {
await ProgramExecutor.create(config).processPrograms(testJobLibrary);
expect(Consumer.prototype.process).to.have.been.called;
});
});
});