harcon-amqp
Advanced tools
Comparing version 2.0.5 to 2.1.0
@@ -23,2 +23,3 @@ 'use strict' | ||
let comm = JSON.parse( message ) | ||
let reComm = Communication.importCommunication( comm.comm ) | ||
@@ -28,2 +29,3 @@ let reResComm = comm.response ? (comm.responseComms.length > 0 ? Communication.importCommunication( comm.responseComms[0] ) : reComm.twist( self.systemFirestarter.name, comm.err ) ) : null | ||
let interested = (!reResComm && self.matching( reComm ).length !== 0) || (reResComm && self.matchingResponse( reResComm ).length !== 0) | ||
// console.log('<<<<<<<<<<<', division, entityName, interested, comm ) | ||
if ( !interested ) return false | ||
@@ -33,2 +35,3 @@ self.innerProcessAmqp( comm ) | ||
// console.log('<><><>', division, entityName ) | ||
socket.connect( division, entityName + '.*', function ( ) { | ||
@@ -72,3 +75,2 @@ self.logger.harconlog( null, 'AMQP SUBSCRIBE socket is made.', { division: division, entity: entityName }, 'info' ) | ||
self.expiration = config.expiration || 0 | ||
self.prefetch = config.prefetch || 0 | ||
self.ctx = rabbit.createContext( self.connectURL ) | ||
@@ -105,4 +107,2 @@ self.ctx.on('ready', function () { | ||
amqpbarrel.newDivision = function ( division, callback ) { | ||
@@ -138,5 +138,7 @@ if ( this.outs[division] ) return callback() | ||
let realComm = Communication.importCommunication( comm.comm ) | ||
if ( !comm.response ) { | ||
// console.log( comm.callback ) | ||
if ( comm.callback ) | ||
realComm.callback = function ( ) { } | ||
realComm.callback = function () { } | ||
self.logger.harconlog( null, 'Request received from bus...', realComm, 'silly' ) | ||
@@ -158,12 +160,13 @@ self.parentIntoxicate( realComm ) | ||
let self = this | ||
if ( !comm.expose && self.isSystemEvent( comm.event ) ) return this.parentAppease( comm, err, responseComms ) | ||
if ( !comm.expose && self.isSystemEvent( comm.event ) ) | ||
return this.parentAppease( comm, err, responseComms ) | ||
let entityName = comm.event.substring(0, comm.event.indexOf('.') ) | ||
let packet = JSON.stringify( { id: comm.id, comm: comm, err: err ? err.message : null, response: true, responseComms: responseComms || [] } ) | ||
if ( !self.outs[ comm.division ] ) | ||
return self.logger.harconlog( new Error('Division is not ready yet...', comm.division) ) | ||
let entityName = comm.source // event.substring(0, comm.event.indexOf('.') ) | ||
let packet = JSON.stringify( { id: comm.id, comm: comm, err: err ? err.message : null, response: true, responseComms: responseComms || [] } ) | ||
self.logger.harconlog( null, 'Appeasing...', {comm: comm, err: err ? err.message : null, responseComms: responseComms}, 'silly' ) | ||
self.outs[ comm.division ].publish( entityName + '.1', packet, 'utf8') | ||
self.outs[ comm.sourceDivision ].publish( entityName + '.1', packet, 'utf8') | ||
} | ||
@@ -184,4 +187,7 @@ | ||
// console.log( '\n\n', comm.event, self.messages ) | ||
if ( comm.callback ) | ||
self.messages[ comm.id ] = { callback: comm.callback, timestamp: Date.now() } | ||
// console.log( '\n\n', comm.event, comm.division, self.messages ) | ||
let entityName = comm.event.substring(0, comm.event.indexOf('.') ) | ||
@@ -188,0 +194,0 @@ let packet = JSON.stringify( { id: comm.id, comm: comm, callback: !!comm.callback } ) |
{ | ||
"name": "harcon-amqp", | ||
"version": "2.0.5", | ||
"version": "2.1.0", | ||
"description": "AMQP plugin for the harcon messaging/service bus of node-based enterprise entities.", | ||
@@ -27,3 +27,3 @@ "keywords": [ | ||
"async": "^1.5.2", | ||
"harcon": "^3.6.5", | ||
"harcon": "^3.8.0", | ||
"rabbit.js": "^0.4.4" | ||
@@ -30,0 +30,0 @@ }, |
'use strict' | ||
let CleanTester = require('./CleanTest') | ||
var chai = require('chai') | ||
var should = chai.should() | ||
var expect = chai.expect | ||
describe('harcon-amqp', function () { | ||
var path = require('path') | ||
// Requires harcon. In your app the form 'require('harcon')' should be used | ||
var Harcon = require('harcon') | ||
let Amqp = require('../lib/Amqp') | ||
var Logger = require('./WinstonLogger') | ||
var Publisher = require('./Publisher') | ||
var Clerobee = require('clerobee') | ||
var clerobee = new Clerobee(16) | ||
var harconName = 'HarconTopic' | ||
describe('harcon', function () { | ||
var inflicter | ||
before(function (done) { | ||
CleanTester.init( function () { | ||
CleanTester.activatePublisher( function () { | ||
CleanTester.addVivian( done ) | ||
var logger = Logger.createWinstonLogger( { file: 'mochatest.log', level: 'debug' } ) | ||
// Initializes the Harcon system | ||
// also initialize the deployer component which will automaticall publish every component found in folder './test/components' | ||
inflicter = new Harcon( { | ||
name: harconName, | ||
Barrel: Amqp.Barrel, | ||
logger: logger, idLength: 32, | ||
blower: { commTimeout: 2000, tolerates: ['Alizee.superFlegme'] }, | ||
Marie: {greetings: 'Hi!'} | ||
}, function (err) { | ||
if (err) return done(err) | ||
inflicter.addicts( Publisher, function (err, res) { | ||
if (err) return done(err) | ||
Publisher.watch( path.join( process.cwd(), 'test', 'components' ) ) | ||
// Publishes an event listener function: Peter. It just sends a simple greetings in return | ||
inflicter.addict( null, 'peter', 'greet.*', function (greetings1, greetings2, callback) { | ||
callback(null, 'Hi there!') | ||
} ) | ||
// Publishes another function listening all messages which name starts with 'greet'. It just sends a simple greetings in return | ||
inflicter.addict( null, 'walter', 'greet.*', function (greetings1, greetings2, callback) { | ||
callback(null, 'My pleasure!') | ||
} ) | ||
done() | ||
} ) | ||
@@ -16,34 +59,207 @@ } ) | ||
describe('Test Harcon status calls', function () { | ||
it('Patient...', function (done) { | ||
CleanTester.checkHealth( done ) | ||
it('Retrieve divisions...', function (done) { | ||
setTimeout( function () { | ||
inflicter.divisions().then( function (divisions) { | ||
expect( divisions ).to.eql( [ harconName, harconName + '.click' ] ) | ||
done() | ||
} ).catch(function (error) { | ||
done(error) | ||
}) | ||
}, 500 ) | ||
}) | ||
}) | ||
describe('Test Amqp calls', function () { | ||
it('Simple message', function (done) { | ||
CleanTester.checkVivian( done ) | ||
it('Retrieve listeners...', function (done) { | ||
inflicter.listeners( function (err, listeners) { | ||
expect( listeners ).to.eql( [ 'Inflicter', 'Publisher', 'peter', 'walter', 'Alizee', 'Domina', 'Julie', 'Claire', 'Marie' ] ) | ||
done(err) | ||
} ) | ||
}) | ||
/* | ||
it('Send for divisions...', function (done) { | ||
inflicter.ignite( clerobee.generate(), '', 'Inflicter.divisions', function (err, res) { | ||
should.not.exist(err) | ||
should.exist(res) | ||
expect( res[0] ).to.include( harconName, harconName + '.click' ) | ||
done() | ||
} ) | ||
}) | ||
*/ | ||
it('Clean internals', function (done) { | ||
inflicter.pendingComms( function (err, comms) { | ||
comms.forEach( function (comm) { | ||
expect( Object.keys(comm) ).to.have.lengthOf( 0 ) | ||
} ) | ||
done(err) | ||
} ) | ||
}) | ||
}) | ||
describe('Harcon workflow over AMQP', function () { | ||
describe('Harcon workflow', function () { | ||
it('Simple greetings by name is', function (done) { | ||
CleanTester.checkMarie( done ) | ||
// Sending a greetings message with 2 parameters and waiting for the proper answer | ||
inflicter.ignite( clerobee.generate(), '', 'Marie.simple', 'whatsup?', 'how do you do?', function (err, res) { | ||
should.not.exist(err) | ||
should.exist(res) | ||
expect( res ).to.include( 'Bonjour!' ) | ||
done( ) | ||
} ) | ||
}) | ||
it('Simple greetings is', function (done) { | ||
CleanTester.checkGreetings( done ) | ||
// Sending a greetings message with 2 parameters and waiting for the proper answer | ||
inflicter.ignite( clerobee.generate(), '', 'greet.simple', 'whatsup?', 'how do you do?', function (err, res) { | ||
// console.log( err, res ) | ||
should.not.exist(err) | ||
should.exist(res) | ||
expect( res ).to.include( 'Hi there!' ) | ||
expect( res ).to.include( 'My pleasure!' ) | ||
expect( res ).to.include( 'Bonjour!' ) | ||
done( ) | ||
} ) | ||
}) | ||
it('Morning greetings is', function (done) { | ||
CleanTester.checkMorningGreetings( done ) | ||
// Sending a morning message and waiting for the proper answer | ||
inflicter.ignite( clerobee.generate(), '', 'morning.wakeup', function (err, res) { | ||
// console.log( err, res ) | ||
expect(err).to.be.a('null') | ||
expect(res[0]).to.eql( [ 'Hi there!', 'My pleasure!' ] ) | ||
done( ) | ||
} ) | ||
}) | ||
it('Domina forcing is', function (done) { | ||
CleanTester.checkDomina( done ) | ||
it('General dormir', function (done) { | ||
inflicter.ignite( clerobee.generate(), '', 'morning.dormir', function (err, res) { | ||
// console.log( err, res ) | ||
expect(err).to.be.a('null') | ||
expect(res).to.eql( [ 'Non, non, non!', 'Non, Mais non!' ] ) | ||
done( ) | ||
} ) | ||
}) | ||
it('Specific dormir', function (done) { | ||
inflicter.ignite( clerobee.generate(), '', 'morning.girls.dormir', function (err, res) { | ||
// console.log( err, res ) | ||
expect(err).to.be.a('null') | ||
expect(res).to.eql( [ 'Non, non, non!', 'Non, Mais non!' ] ) | ||
done( ) | ||
} ) | ||
}) | ||
it('No answer', function (done) { | ||
// Sending a morning message and waiting for the proper answer | ||
this.timeout(5000) | ||
inflicter.ignite( clerobee.generate(), '', 'cave.echo', function (err, res) { | ||
expect(err).to.be.an.instanceof( Error ) | ||
expect(res).to.be.a('null') | ||
done() | ||
} ) | ||
}) | ||
it('Timeout test', function (done) { | ||
this.timeout(5000) | ||
inflicter.simpleIgnite( 'Alizee.flegme', function (err, res) { | ||
expect(err).to.be.an.instanceof( Error ) | ||
expect(res).to.be.a('null') | ||
done( ) | ||
} ) | ||
}) | ||
it('Tolerated messages test', function (done) { | ||
this.timeout(5000) | ||
inflicter.simpleIgnite( 'Alizee.superFlegme', function (err, res) { | ||
expect(err).to.be.a('null') | ||
expect(res).to.eql( [ 'Quoi???' ] ) | ||
done( err ) | ||
} ) | ||
}) | ||
it('Division Promise test', function (done) { | ||
inflicter.ignite( clerobee.generate(), harconName + '.click', 'greet.simple', 'Hi', 'Ca vas?' ) | ||
.then( function ( res ) { | ||
should.exist(res) | ||
expect( res ).to.include( 'Hi there!' ) | ||
expect( res ).to.include( 'My pleasure!' ) | ||
expect( res ).to.include( 'Bonjour!' ) | ||
expect( res ).to.include( 'Pas du tout!' ) | ||
done() | ||
}) | ||
.catch( function ( reason ) { | ||
done( reason ) | ||
} ) | ||
}) | ||
it('Division test', function (done) { | ||
// Sending a morning message and waiting for the proper answer | ||
inflicter.ignite( clerobee.generate(), harconName + '.click', 'greet.simple', 'Hi', 'Ca vas?', function (err, res) { | ||
// console.log( err, res ) | ||
should.not.exist(err) | ||
should.exist(res) | ||
expect( res ).to.include( 'Hi there!' ) | ||
expect( res ).to.include( 'My pleasure!' ) | ||
expect( res ).to.include( 'Bonjour!' ) | ||
expect( res ).to.include( 'Pas du tout!' ) | ||
done( ) | ||
} ) | ||
}) | ||
it('Domina', function (done) { | ||
// Sending a morning message and waiting for the proper answer | ||
inflicter.simpleIgnite( 'Domina.force', function (err, res) { | ||
should.not.exist(err) | ||
should.exist(res) | ||
expect( res[0][0] ).to.eql( [ 'Non, Mais non!' ] ) | ||
expect( res[0][1] ).to.eql( [ 'Hi there!', 'My pleasure!' ] ) | ||
expect( res[0][2] ).to.eql( [ 'Pas du tout!' ] ) | ||
done( ) | ||
} ) | ||
}) | ||
it('Deactivate', function (done) { | ||
// Sending a morning message and waiting for the proper answer | ||
inflicter.deactivate('Claire') | ||
inflicter.ignite( clerobee.generate(), harconName + '.click', 'greet.simple', 'Hi', 'Ca vas?', function (err, res) { | ||
// console.log( err, res ) | ||
should.not.exist(err) | ||
should.exist(res) | ||
expect( res ).to.not.include( 'Pas du tout!' ) | ||
done( ) | ||
} ) | ||
}) | ||
}) | ||
describe('Post health tests', function () { | ||
it('Clean internals', function (done) { | ||
inflicter.pendingComms( function (err, comms) { | ||
comms.forEach( function (comm) { | ||
expect( Object.keys(comm) ).to.have.lengthOf( 0 ) | ||
} ) | ||
done(err) | ||
} ) | ||
}) | ||
}) | ||
after(function (done) { | ||
CleanTester.close( done ) | ||
// Shuts down Harcon when it is not needed anymore | ||
inflicter.close() | ||
done() | ||
}) | ||
}) |
@@ -1,3 +0,1 @@ | ||
'use strict' | ||
module.exports = { | ||
@@ -8,3 +6,13 @@ name: 'Alizee', | ||
callback( null, 'Non, non, non!' ) | ||
}, | ||
flegme: function ( callback ) { | ||
setTimeout( function () { | ||
callback( null, 'Quoi?') | ||
}, 3000 ) | ||
}, | ||
superFlegme: function ( callback ) { | ||
setTimeout( function () { | ||
callback( null, 'Quoi???') | ||
}, 4500 ) | ||
} | ||
} |
@@ -1,3 +0,1 @@ | ||
'use strict' | ||
module.exports = { | ||
@@ -4,0 +2,0 @@ name: 'Claire', |
@@ -1,5 +0,3 @@ | ||
'use strict' | ||
var async = require('async') | ||
let async = require('async') | ||
module.exports = { | ||
@@ -10,8 +8,12 @@ name: 'Domina', | ||
force: function ( ignite, callback ) { | ||
var self = this | ||
async.series([ | ||
function (cb) { | ||
ignite( 0, '', 'greet.simple', 'It is morning!', 'Time to wake up!', cb ) | ||
ignite( 0, '', 'Julie.dormir', cb ) | ||
}, | ||
function (cb) { | ||
ignite( 1, 'Inflicter.click', 'Claire.simple', 'It is morning!', 'Time to wake up!', cb ) | ||
ignite( 0, '', 'greet.gentle', 'It is morning!', 'Time to wake up!', cb ) | ||
}, | ||
function (cb) { | ||
ignite( 1, self.division + '.click', 'Claire.simple', 'It is morning!', 'Time to wake up!', cb ) | ||
} | ||
@@ -18,0 +20,0 @@ ], callback ) |
@@ -1,10 +0,8 @@ | ||
'use strict' | ||
module.exports = { | ||
name: 'Julie', | ||
context: 'dawn', | ||
context: 'morning', | ||
// When Julie is woken up, send a gentle message to everyone listening to such messages... Walter and Pater namely | ||
wakeup: function ( ignite, callback ) { | ||
this.harconlog( null, 'Simple logging test', {}, 'warn' ) | ||
ignite( 'greet.simple', 'It is morning!', 'Time to wake up!', function (err, res) { | ||
this.harconlog( null, 'Simple logging test', {}, 'info' ) | ||
ignite( 'greet.gentle', 'It is morning!', 'Time to wake up!', function (err, res) { | ||
callback(err, res) | ||
@@ -11,0 +9,0 @@ } ) |
@@ -1,3 +0,1 @@ | ||
'use strict' | ||
module.exports = { | ||
@@ -4,0 +2,0 @@ name: 'Marie', |
@@ -13,9 +13,27 @@ 'use strict' | ||
name: 'Publisher', | ||
context: 'Inflicter', | ||
files: [], | ||
init: function (options) { | ||
this.configs = {} | ||
if ( !this.configs ) | ||
this.configs = {} | ||
if ( !this.globalConfig ) | ||
this.globalConfig = {} | ||
this.watchMonitors = [] | ||
return this | ||
}, | ||
addGlobalConfig: function ( config ) { | ||
this.init() | ||
this.globalConfig = config | ||
return this | ||
}, | ||
addConfig: function ( name, config ) { | ||
this.init() | ||
this.configs[name] = config | ||
return this | ||
}, | ||
@@ -26,6 +44,10 @@ scheduleFile: function ( folder, fileName ) { | ||
this.files.push( path ) | ||
return this | ||
}, | ||
igniteFiles: function ( ) { | ||
let self = this | ||
self.files.forEach( function (newFile) { | ||
let newFiles = self.files.slice() | ||
self.files.length = 0 | ||
newFiles.forEach( function (newFile) { | ||
let fn = function (err, res) { | ||
@@ -38,9 +60,11 @@ if ( err ) { | ||
if ( fs.existsSync( newFile ) ) { | ||
console.log( newFile ) | ||
let component = require( newFile.substring( 0, newFile.length - 3 ) ) | ||
if ( !component.adequate || component.adequate() ) | ||
self.ignite( 'Inflicter.addicts', component, self.configs[component.name], fn ) | ||
if ( !component.name ) return | ||
if ( !component.adequate || component.adequate() ) { | ||
self.ignite( 'Inflicter.addicts', component, self.configs[component.name] || self.globalConfig[component.name], fn ) | ||
} | ||
} else | ||
self.ignite( 'Inflicter.detracts', path.basename( newFile, '.js'), fn ) | ||
} ) | ||
self.files = [] | ||
}, | ||
@@ -50,11 +74,6 @@ readFiles: function ( folder, matcher, callback ) { | ||
fs.readdir(folder, function (err, files) { | ||
if (err) | ||
console.error( err ) | ||
else { | ||
for (let i = 0; i < files.length; i += 1) { | ||
if ( matcher(files[i]) ) { | ||
self.scheduleFile( folder, files[i] ) | ||
} | ||
} | ||
} | ||
if (err) return callback( err ) | ||
for (let i = 0; i < files.length; i += 1) | ||
if ( matcher(files[i]) ) | ||
self.scheduleFile( folder, files[i] ) | ||
if ( callback ) | ||
@@ -64,3 +83,3 @@ callback() | ||
}, | ||
watch: function ( folder, timeout, pattern ) { | ||
watch: function ( folder, pattern, timeout, callback ) { | ||
let self = this | ||
@@ -72,4 +91,2 @@ let extension = '.js' | ||
folder = path.resolve( folder ) | ||
if ( !fs.existsSync( folder ) ) | ||
@@ -83,5 +100,11 @@ mkdirp.sync( folder ) | ||
} | ||
self.readFiles( folder, matcher, function () { | ||
if ( timeout > 0 && !self.intervalObject ) | ||
self.intervalObject = setInterval( function () { self.igniteFiles( ) }, timeout ) | ||
self.readFiles( folder, matcher, function (err) { | ||
if (err) return callback(err) | ||
watch.createMonitor( folder, function (monitor) { | ||
self.monitor = monitor | ||
self.watchMonitors.push( monitor ) | ||
let handler = function (f, stat) { | ||
@@ -95,11 +118,14 @@ if ( isComponent( f, stat ) ) | ||
}) | ||
if ( timeout && timeout > 0 ) | ||
self.intervalObject = setInterval( function () { self.igniteFiles( ) }, timeout ) | ||
else | ||
self.igniteFiles( ) | ||
self.igniteFiles( ) | ||
if ( callback ) | ||
callback() | ||
}) | ||
}, | ||
close: function ( callback ) { | ||
if ( this.monitor ) | ||
this.monitor.stop() | ||
this.watchMonitors.forEach( function ( monitor ) { | ||
monitor.stop() | ||
} ) | ||
this.watchMonitors.length = 0 | ||
@@ -106,0 +132,0 @@ if ( this.intervalObject ) |
@@ -1,5 +0,3 @@ | ||
'use strict' | ||
var winston = require('winston') | ||
let winston = require('winston') | ||
exports.createWinstonLogger = function ( options ) { | ||
@@ -15,3 +13,3 @@ options = options || {} | ||
winston.handleExceptions( new (winston.transports.Console)({ level: 'error', colorize: 'true' }) ) | ||
let transports = [ | ||
var transports = [ | ||
new (winston.transports.Console)({ level: 'error', colorize: 'true' }), | ||
@@ -18,0 +16,0 @@ new (winston.transports.File)( { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
30459
801
18
Updatedharcon@^3.8.0