Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

harcon-amqp

Package Overview
Dependencies
Maintainers
1
Versions
197
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

harcon-amqp - npm Package Compare versions

Comparing version 0.1.0 to 0.2.0

test/cleanRunner.js

83

lib/AmqpBarrel.js
var rabbit = require('rabbit.js');
var Harcon = require('harcon');

@@ -11,3 +10,3 @@ var Barrel = Harcon.Barrel;

amqpbarrel.extendedInit = function( systemName, config, callback ){
amqpbarrel.extendedInit = function( config, callback ){
var self = this;

@@ -17,15 +16,26 @@

self.ctx = rabbit.createContext( config.connectURL || 'amqp://localhost' );
self.pushs = {};
self.pull = self.ctx.socket('PULL');
self.pull.setEncoding('utf8');
console.log('>>>>>>>>>>>', config.division, systemName);
self.pull.connect( config.division || systemName, function() {
self.pull.on('readable', self.processAmqp );
var connectURL = config.connectURL || 'amqp://localhost';
var handlerFn = self.processAmqp.bind(self);
self.ctx = rabbit.createContext( connectURL );
self.ctx.on('ready', function() {
self.logger.harconlog( null, 'AMQP connection is made.', connectURL, 'info' );
self.pushs = {};
self.pull = self.ctx.socket('PULL');
self.pull.setEncoding('utf8');
self.pull.on('readable', handlerFn );
self.pull.connect( self.division, function( ) {
self.logger.harconlog( null, 'AMQP pull queue is made.', self.division, 'info' );
self.newDivision( systemName, callback );
if( callback )
callback( );
} );
self.pull.on('error', self.logger.error );
} );
self.ctx.on('error', self.logger.error );
};
amqpbarrel.newDivision = function( division, callback ){
if( this.pushs[division] ){
return callback ? callback() : division;
}
var self = this;

@@ -35,6 +45,10 @@ var push = self.ctx.socket('PUSH');

push.connect( division, function() {
self.logger.harconlog( null, 'AMQP push queue is made.', division, 'info' );
self.pushs[division] = push;
if( callback )
callback();
} );
push.on('error', self.logger.error );
};

@@ -44,12 +58,23 @@

var self = this;
if( self.messages[ comm.id ] ){
comm.comm.callback = self.messages[ comm.comm.id ];
delete self.messages[ comm.id ];
self.logger.harconlog( null, 'Received from bus...', comm, 'silly' );
var realComm = Communication.importCommunication( comm.comm );
if( !comm.response ){
if( comm.callback )
realComm.callback = function(err, res){ };
self.logger.harconlog( null, 'Request received from bus...', realComm, 'silly' );
self.parentIntoxicate( realComm );
} else {
if( self.messages[ comm.id ] ){
realComm.callback = self.messages[ comm.id ];
delete self.messages[ comm.id ];
}
var responses = comm.responseComms.map(function(c){ return Communication.importCommunication( c ); });
self.parentAppease( realComm, comm.err ? new Error(comm.err) : null, responses );
}
if( comm.response ){
self.parentAppease( Communication.importCommunication( comm.comm ), comm.err, comm.responseComms.map(function(c){ return Communication.importCommunication( c ); }) );
} else{
self.parentIntoxicate( Communication.importCommunication( comm.comm ) );
}
};
amqpbarrel.processAmqp = function( message ){

@@ -69,6 +94,6 @@ var self = this;

if( comm.callback )
self.messages[ comm.id ] = comm.callback;
var packet = JSON.stringify( { id: comm.id, comm: comm, err: err, response: true, responseComms: responseComms || [] } );
var packet = JSON.stringify( { id: comm.id, comm: comm, err: err ? err.message : null, response: true, responseComms: responseComms || [] } );
self.logger.harconlog( null, 'Appeasing...', {comm: comm, err: err ? err.message : null, responseComms: responseComms}, 'silly' );
self.pushs[ comm.division ].write(packet, 'utf8');

@@ -82,18 +107,18 @@ };

self.logger.harconlog( null, 'Intoxicating to bus...', comm, 'silly' );
if( self.messages[ comm.id ] )
self.logger.harconlog( new Error('Duplicate message delivery!'), comm.id );
if( comm.callback )
self.messages[ comm.id ] = comm.callback;
var packet = JSON.stringify( { id: comm.id, comm: comm } );
var packet = JSON.stringify( { id: comm.id, comm: comm, callback: !!comm.callback } );
self.pushs[ comm.division ].write(packet, 'utf8');
};
amqpbarrel.extendedClose = function( ){
amqpbarrel.extendedClose = function( callback ){
if( this.ctx )
this.ctx.close();
if( this.pull )
this.pull.close();
if( this.pushs )
for( var division in this.pushs )
this.pushs[ division ].close();
this.ctx.close( callback );
};
module.exports = AmqpBarrel;
{
"name": "harcon-amqp",
"version": "0.1.0",
"version": "0.2.0",
"description": "ZeroMQ plugin for the harcon messaging/service bus of node-based enterprise entities.",

@@ -29,3 +29,3 @@ "keywords": [

"dependencies": {
"harcon": ">=1.4.2",
"harcon": "~2",
"rabbit.js": "^0.4.2"

@@ -41,2 +41,4 @@ },

"gulp-util": "~3",
"mkdirp": "latest",
"watch": "latest",
"winston": "latest"

@@ -43,0 +45,0 @@ },

@@ -1,48 +0,103 @@

var chai = require('chai'),
should = chai.should(),
expect = chai.expect;
var CleanTester = require('./CleanTest');
var Harcon = require('harcon');
var Amqp = require('../lib/Amqp');
var Logger = require('./WinstonLogger');
var logger = Logger.createWinstonLogger( { console: true } );
var harcon, Julie;
describe("harcon-amqp", function () {
before(function(done){
harcon = new Harcon( { Barrel: Amqp.Barrel, logger: logger, idLength: 32, marie: {greetings: 'Hi!'} }, function(){
Julie = {
name: 'Julie',
context: 'morning',
wakeup: function( greetings, ignite, callback ){
callback( null, 'Thanks. ' + greetings );
}
};
harcon.addicts( Julie );
done();
CleanTester.init( function(){
CleanTester.addVivian( done );
} );
});
describe("Test Harcon status calls", function () {
it('Patient...', function(done){
CleanTester.checkHealth( done );
});
});
describe("Test Amqp calls", function () {
it('Simple message', function(done){
console.log( harcon.listeners(), harcon.divisions() );
console.error( 'Sending...' );
harcon.simpleIgnite( 'Julie.wakeup', 'whatsup?', function(err, res){
CleanTester.checkVivian( done );
});
});
describe("Harcon workflow over AMQP", function () {
it('Simple greetings by name is', function(done){
CleanTester.checkMarie( done );
});
it('Simple greetings is', function(done){
CleanTester.checkGreetings( done );
});
it('Morning greetings is', function(done){
CleanTester.checkMorningGreetings( done );
});
/*
it('General dormir', function(done){
harcon.ignite( '0', '', 'morning.dormir', function(err, res){
//console.log( err, res );
expect(err).to.be.a('null');
expect(res).to.eql( [ 'Non, non, non!', 'Non, Mais non!' ] );
done( );
} );
});
it('Specific dormir', function(done){
harcon.ignite( '0', '', 'morning.girls.dormir', function(err, res){
//console.log( err, res );
expect(err).to.be.a('null');
expect(res).to.eql( [ 'Non, non, non!', 'Non, Mais non!' ] );
done( );
} );
});
it('No answer', function(done){
// Sending a morning message and waiting for the proper answer
harcon.ignite( '0', '', 'cave.echo', function(err, res){
//console.log( err, res );
expect(err).to.be.an.instanceof( Error );
expect(res).to.be.a('null');
done( );
} );
});
it('Division test', function(done){
// Sending a morning message and waiting for the proper answer
harcon.ignite( '0', 'Inflicter.click', 'greet.simple', 'Hi', 'Ca vas?', function(err, res){
//console.log( err, res );
should.not.exist(err); should.exist(res);
expect( res ).to.include( 'Thanks. whatsup?' );
expect( res ).to.include( 'Hi there!' );
expect( res ).to.include( 'My pleasure!' );
expect( res ).to.include( 'Bonjour!' );
expect( res ).to.include( 'Pas du tout!' );
done( );
} );
});
it('Deactivate', function(done){
// Sending a morning message and waiting for the proper answer
harcon.deactivate('Claire');
harcon.ignite( '0', 'click', 'greet.simple', 'Hi', 'Ca vas?', function(err, res){
//console.log( err, res );
should.not.exist(err); should.exist(res);
expect( res ).to.not.include( 'Pas du tout!' );
done( );
} );
});
*/
});
after(function(done){
if( harcon )
harcon.close();
done();
CleanTester.close( done );
});
});

@@ -6,3 +6,3 @@ var winston = require('winston');

if( options.console ){
return new (winston.Logger)({ transports: [ new (winston.transports.Console)({ /*level: 'debug',*/ colorize: 'true' }) ] });
return new (winston.Logger)({ transports: [ new (winston.transports.Console)({ level: options.level || 'debug', colorize: 'true' }) ] });
}

@@ -24,2 +24,2 @@

return new (winston.Logger)({ transports: transports });
};
};
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc