New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

harcon-amqp

Package Overview
Dependencies
Maintainers
1
Versions
197
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

harcon-amqp - npm Package Compare versions

Comparing version 1.3.0 to 2.0.0

test/AMQPQueueTest.js

19

Gulpfile.js
var gulp = require('gulp'),
plugins = require('gulp-load-plugins')( { scope: ['devDependencies'] } )
var runSequence = require('run-sequence')
gulp.task( 'eslint', function (callback) {

@@ -14,19 +11,5 @@ return gulp.src( './lib/*.js' )

gulp.task( 'mochaTestWorker', function (callback) {
process.env.SOCKET_TYPE = 'PUSHWORKER'
gulp.task( 'mochaTest', function (callback) {
return gulp.src( './test/amqp.mocha.js' ).pipe( plugins.mocha({reporter: 'nyan'}) )
} )
gulp.task( 'mochaTestSub', function (callback) {
process.env.SOCKET_TYPE = 'PUBSUB'
return gulp.src( './test/amqp.mocha.js' ).pipe( plugins.mocha({reporter: 'nyan'}) )
} )
gulp.task( 'mochaTestPull', function (callback) {
process.env.SOCKET_TYPE = 'PUSHPULL'
return gulp.src( './test/amqp.mocha.js' ).pipe( plugins.mocha({reporter: 'nyan'}) )
} )
gulp.task( 'mochaTest', function (callback) {
runSequence(
'mochaTestWorker', 'mochaTestSub', 'mochaTestPull', callback
)
} )

@@ -33,0 +16,0 @@ gulp.task( 'doc', function (callback) {

'use strict'
var rabbit = require('rabbit.js')
var Harcon = require('harcon')
var async = require('async')
var Barrel = Harcon.Barrel
var Communication = Harcon.Communication
let async = require('async')
let rabbit = require('rabbit.js')
let Harcon = require('harcon')
let Barrel = Harcon.Barrel
let Communication = Harcon.Communication
function AmqpBarrel ( ) { }
AmqpBarrel.prototype = new Barrel()
var amqpbarrel = AmqpBarrel.prototype
let amqpbarrel = AmqpBarrel.prototype
function socketOutContext ( ctx, socketType, options ) {
switch ( socketType ) {
case 'PUBSUB': return ctx.socket( 'PUBLISH' )
case 'PUSHWORKER': return ctx.socket( 'PUSH' )
case 'PUSHPULL': return ctx.socket( 'PUSH' )
}
}
function socketInContext ( ctx, socketType, options ) {
switch ( socketType ) {
case 'PUBSUB': return ctx.socket( 'SUBSCRIBE' )
case 'PUSHWORKER': return ctx.socket( 'WORKER', { prefetch: options.prefetch } )
case 'PUSHPULL': return ctx.socket( 'PULL' )
}
}
amqpbarrel.createIn = function ( division, entityName, callback ) {
let self = this
amqpbarrel.createIn = function ( division, callback ) {
var self = this
let socket = self.ctx.socket( 'SUBSCRIBE', { routing: 'topic' } )
var socket = socketInContext( self.ctx, self.socketType, self )
self.ins[division][entityName] = socket
self.ins[division] = socket
socket.setEncoding('utf8')
socket.on('data', function ( message ) {
var comm = JSON.parse( message )
let comm = JSON.parse( message )
let reComm = Communication.importCommunication( comm.comm )
let reResComm = comm.response ? (comm.responseComms.length > 0 ? Communication.importCommunication( comm.responseComms[0] ) : reComm.twist( self.systemFirestarter.name, comm.err ) ) : null
var reComm = Communication.importCommunication( comm.comm )
var reResComm = comm.response ? (comm.responseComms.length > 0 ? Communication.importCommunication( comm.responseComms[0] ) : reComm.twist( self.systemFirestarter.name, comm.err ) ) : null
var interested = (!reResComm && self.matching( reComm ).length !== 0) || (reResComm && self.matchingResponse( reResComm ).length !== 0)
if ( self.expAck )
socket.ack()
let interested = (!reResComm && self.matching( reComm ).length !== 0) || (reResComm && self.matchingResponse( reResComm ).length !== 0)
if ( !interested ) return false
self.innerProcessAmqp( comm )
} )
/*
socket.on('readable', function ( message ) {
var msg
while( (msg = self.ins[division].read()) ) {
var comm = JSON.parse( msg )
self.innerProcessAmqp( comm )
}
} )
*/
socket.connect( division, function ( ) {
self.logger.harconlog( null, 'AMQP ' + self.socketType + ' in socket is made.', division, 'info' )
if ( callback )
callback( )
socket.connect( division, entityName + '.*', function ( ) {
self.logger.harconlog( null, 'AMQP SUBSCRIBE socket is made.', { division: division, entity: entityName }, 'info' )
callback( )
} )
socket.on('error', self.logger.error )
}
amqpbarrel.createOut = function ( division, callback ) {
var self = this
let self = this
var socket = socketOutContext( self.ctx, self.socketType )
let socket = self.ctx.socket( 'PUBLISH', { routing: 'topic' } )
socket.setDefaultEncoding('utf8')

@@ -76,3 +49,3 @@ if ( self.expiration )

socket.connect( division, function () {
self.logger.harconlog( null, 'AMQP ' + self.socketType + ' out socket is made.', division, 'info' )
self.logger.harconlog( null, 'AMQP PUBLISH socket is made.', division, 'info' )

@@ -88,3 +61,3 @@ self.outs[division] = socket

amqpbarrel.extendedInit = function ( config, callback ) {
var self = this
let self = this

@@ -94,4 +67,3 @@ self.messages = {}

self.connectURL = config.connectURL || 'amqp://localhost'
self.socketType = config.socketType || 'PUBSUB' // PUSHWORKER || PUBSUB || PUSHPULL
self.expAck = self.socketType === 'PUSHWORKER'
self.socketType = 'PUBSUB' // PUSHWORKER || PUBSUB || PUSHPULL
self.quiteMode = self.socketType === 'PUBSUB'

@@ -120,8 +92,8 @@ self.timeout = config.timeout || 0

amqpbarrel.cleanupMessages = function () {
var self = this
let self = this
var time = Date.now()
let time = Date.now()
for ( let key of Object.keys( self.messages ) ) {
if ( time - self.messages[key].timestamp > self.timeout ) {
var callbackFn = self.messages[key].callback
let callbackFn = self.messages[key].callback
delete self.messages[ key ]

@@ -133,23 +105,34 @@ callbackFn( new Error('Response timeout') )

amqpbarrel.newDivision = function ( division, callback ) {
if ( this.outs[division] ) {
return callback ? callback() : division
}
var self = this
if ( this.outs[division] ) return callback()
this.createOut( division, callback )
}
amqpbarrel.removeEntity = function ( division, context, name, callback) {
callback()
}
amqpbarrel.newEntity = function ( division, context, name, callback) {
if ( this.ins[division] && this.ins[division][name] ) return callback()
async.series( [
function (cb) { self.createIn( division, cb ) },
function (cb) { self.createOut( division, cb ) }
], callback || function (err) {
if (err)
console.error( err )
} )
if ( !this.ins[division] ) this.ins[division] = []
let self = this
let fns = []
if (context)
fns.push(function (cb) {
self.createIn( division, context, cb )
})
fns.push(function (cb) {
self.createIn( division, name, cb )
})
async.series( fns, callback )
}
amqpbarrel.innerProcessAmqp = function ( comm ) {
var self = this
let self = this
self.logger.harconlog( null, 'Received from bus...', comm, 'silly' )
var realComm = Communication.importCommunication( comm.comm )
let realComm = Communication.importCommunication( comm.comm )
if ( !comm.response ) {

@@ -165,3 +148,3 @@ if ( comm.callback )

}
var responses = comm.responseComms.map(function (c) { return Communication.importCommunication( c ) })
let responses = comm.responseComms.map(function (c) { return Communication.importCommunication( c ) })

@@ -174,6 +157,7 @@ self.parentAppease( realComm, comm.err ? new Error(comm.err) : null, responses )

amqpbarrel.appease = function ( comm, err, responseComms ) {
var self = this
let self = this
if ( !comm.expose && self.isSystemEvent( comm.event ) ) return this.parentAppease( comm, err, responseComms )
var packet = JSON.stringify( { id: comm.id, comm: comm, err: err ? err.message : null, response: true, responseComms: responseComms || [] } )
let entityName = comm.event.substring(0, comm.event.indexOf('.') )
let packet = JSON.stringify( { id: comm.id, comm: comm, err: err ? err.message : null, response: true, responseComms: responseComms || [] } )

@@ -184,3 +168,3 @@ if ( !self.outs[ comm.division ] )

self.logger.harconlog( null, 'Appeasing...', {comm: comm, err: err ? err.message : null, responseComms: responseComms}, 'silly' )
self.outs[ comm.division ].write(packet, 'utf8')
self.outs[ comm.division ].publish( entityName + '.1', packet, 'utf8')
}

@@ -190,3 +174,3 @@

amqpbarrel.intoxicate = function ( comm ) {
var self = this
let self = this
if ( self.isSystemEvent( comm.event ) ) return this.parentIntoxicate( comm )

@@ -204,4 +188,5 @@

self.messages[ comm.id ] = { callback: comm.callback, timestamp: Date.now() }
var packet = JSON.stringify( { id: comm.id, comm: comm, callback: !!comm.callback } )
self.outs[ comm.division ].write(packet, 'utf8')
let entityName = comm.event.substring(0, comm.event.indexOf('.') )
let packet = JSON.stringify( { id: comm.id, comm: comm, callback: !!comm.callback } )
self.outs[ comm.division ].publish( entityName + '.1', packet, 'utf8')
}

@@ -208,0 +193,0 @@

{
"name": "harcon-amqp",
"version": "1.3.0",
"version": "2.0.0",
"description": "AMQP plugin for the harcon messaging/service bus of node-based enterprise entities.",

@@ -27,3 +27,3 @@ "keywords": [

"async": "^1.5.2",
"harcon": "^3.3.0",
"harcon": "^3.6.0",
"rabbit.js": "^0.4.4"

@@ -41,3 +41,2 @@ },

"mkdirp": "^0.5.1",
"run-sequence": "^1.1.5",
"watch": "^0.17.1",

@@ -44,0 +43,0 @@ "winston": "^2.2.0"

@@ -1,7 +0,9 @@

var CleanTester = require('./CleanTest')
'use strict'
let CleanTester = require('./CleanTest')
describe('harcon-amqp', function () {
before(function (done) {
CleanTester.init( process.env.SOCKET_TYPE || 'PUSHWORKER', function () {
CleanTester.init( function () {
CleanTester.activatePublisher( function () {

@@ -8,0 +10,0 @@ CleanTester.addVivian( done )

@@ -1,4 +0,6 @@

var CleanTester = require('./CleanTest')
var async = require('async')
'use strict'
let CleanTester = require('./CleanTest')
let async = require('async')
function error (err) { if (err) {

@@ -11,3 +13,3 @@ console.error(err)

var fnNames = [
let fnNames = [
'init',

@@ -25,3 +27,3 @@ 'activatePublisher',

var tasks = []
let tasks = []
fnNames.forEach(function ( fnName ) {

@@ -28,0 +30,0 @@ tasks.push(function (cb) {

@@ -1,21 +0,23 @@

var chai = require('chai'),
'use strict'
let chai = require('chai'),
should = chai.should(),
expect = chai.expect
var Harcon = require('harcon')
var Amqp = require('../lib/Amqp')
let Harcon = require('harcon')
let Amqp = require('../lib/Amqp')
var Logger = require('./WinstonLogger')
var logger = Logger.createWinstonLogger( { console: true, level: 'debug' } )
let Logger = require('./WinstonLogger')
let logger = Logger.createWinstonLogger( { console: true, level: 'debug' } )
var Publisher = require('./Publisher')
let Publisher = require('./Publisher')
module.exports = {
harcon: null,
init: function ( socketType, callback ) {
var self = this
self.harcon = new Harcon( { socketType: socketType, Barrel: Amqp.Barrel, logger: logger, idLength: 32, marie: {greetings: 'Hi!'} }, callback )
init: function ( callback ) {
let self = this
self.harcon = new Harcon( { Barrel: Amqp.Barrel, logger: logger, idLength: 32, marie: {greetings: 'Hi!'} }, callback )
},
activatePublisher: function ( callback ) {
var self = this
let self = this
self.harcon.addicts( Publisher )

@@ -26,4 +28,4 @@ Publisher.watch( './test/components', -1 )

addVivian: function (callback) {
var self = this
var Vivian = {
let self = this
let Vivian = {
name: 'Vivian',

@@ -40,3 +42,3 @@ context: 'morning',

checkHealth: function ( callback ) {
var self = this
let self = this
setTimeout( function () {

@@ -43,0 +45,0 @@ self.harcon.divisions( function (err, divisions) {

@@ -0,7 +1,9 @@

'use strict'
module.exports = {
name: 'Alizee',
context: 'morning.girls',
dormir: function( ignite, callback ){
callback( null, 'Non, non, non!' );
dormir: function ( ignite, callback ) {
callback( null, 'Non, non, non!' )
}
};
}

@@ -0,1 +1,3 @@

'use strict'
module.exports = {

@@ -6,11 +8,11 @@ name: 'Claire',

init: function (options) {
console.log('Init...', options);
console.log('Init...', options)
},
// Simple service function listening to the greet.usual message where greet comes from context and usual is identified by the name of the fuction.
usual: function (callback) {
callback(null, 'Enchanté, mon plaisir!');
callback(null, 'Enchanté, mon plaisir!')
},
simple: function (greetings1, greetings2, callback) {
callback( null, 'Pas du tout!' );
callback( null, 'Pas du tout!' )
}
};
}

@@ -1,3 +0,5 @@

var async = require('async');
'use strict'
let async = require('async')
module.exports = {

@@ -7,12 +9,12 @@ name: 'Domina',

// When Julie is woken up, send a gentle message to everyone listening to such messages... Walter and Pater namely
force: function( ignite, callback ){
force: function ( ignite, callback ) {
async.series([
function(cb){
ignite( 0, '', 'greet.simple', 'It is morning!', 'Time to wake up!', cb );
function (cb) {
ignite( 0, '', 'greet.simple', 'It is morning!', 'Time to wake up!', cb )
},
function(cb){
ignite( 1, 'Inflicter.click', 'Claire.simple', 'It is morning!', 'Time to wake up!', cb );
function (cb) {
ignite( 1, 'Inflicter.click', 'Claire.simple', 'It is morning!', 'Time to wake up!', cb )
}
], callback );
], callback )
}
};
}

@@ -0,1 +1,3 @@

'use strict'
module.exports = {

@@ -5,11 +7,11 @@ name: 'Julie',

// When Julie is woken up, send a gentle message to everyone listening to such messages... Walter and Pater namely
wakeup: function( ignite, callback ){
this.harconlog( null, 'Simple logging test', {}, 'warn' );
ignite( 'greet.simple', 'It is morning!', 'Time to wake up!', function(err, res){
callback(err, res);
} );
wakeup: function ( ignite, callback ) {
this.harconlog( null, 'Simple logging test', {}, 'warn' )
ignite( 'greet.simple', 'It is morning!', 'Time to wake up!', function (err, res) {
callback(err, res)
} )
},
dormir: function( ignite, callback ){
callback( null, 'Non, Mais non!' );
dormir: function ( ignite, callback ) {
callback( null, 'Non, Mais non!' )
}
};
}

@@ -0,1 +1,3 @@

'use strict'
module.exports = {

@@ -5,10 +7,10 @@ name: 'Marie',

init: function (options) {
console.log('Init...', options);
console.log('Init...', options)
},
// Simple service function listening to the greet.simple message where greet comes from context and simple is identified by the name of the fuction.
simple: function (greetings1, greetings2, callback) {
this.greetings = [greetings1, greetings2];
this.shifted( { data: 'content' } );
callback(null, 'Bonjour!');
this.greetings = [greetings1, greetings2]
this.shifted( { data: 'content' } )
callback(null, 'Bonjour!')
}
};
}

@@ -1,10 +0,10 @@

var _ = require('lodash')
'use strict'
var ctx = require('rabbit.js').createContext('amqp://localhost')
let ctx = require('rabbit.js').createContext('amqp://localhost')
console.log( ctx.close )
/*
var pub = ctx.socket('PUB', {routing: 'topic'})
var sub = ctx.socket('SUB', {routing: 'topic'})
var sub2 = ctx.socket('SUB', {routing: 'topic'})
let pub = ctx.socket('PUB', {routing: 'topic'})
let sub = ctx.socket('SUB', {routing: 'topic'})
let sub2 = ctx.socket('SUB', {routing: 'topic'})
sub.pipe(process.stdout)

@@ -24,5 +24,5 @@ sub2.pipe(process.stdout)

var pull = ctx.socket('PULL')
let pull = ctx.socket('PULL')
function process ( ) {
var msg
let msg
while ( (msg = pull.read()) ) {

@@ -33,3 +33,3 @@ console.log('>>>>>>>>', JSON.parse(msg).username )

var push = ctx.socket('PUSH')
let push = ctx.socket('PUSH')
push.setDefaultEncoding('utf8')

@@ -41,3 +41,2 @@ pull.setEncoding('utf8')

pull.on('readable', process )
console.log( '????????????', _.functions( push ), _.functions( pull ) )
push.write(JSON.stringify({username: 'Fiver'}), 'utf8')

@@ -44,0 +43,0 @@ // push.publish('user.create', JSON.stringify({username: "Fiver"}))

@@ -1,18 +0,20 @@

var Harcon = require('harcon');
var Amqp = require('../../lib/Amqp');
'use strict'
var Logger = require('../WinstonLogger');
var logger = Logger.createWinstonLogger( { console: true, level: 'debug' } );
let Harcon = require('harcon')
let Amqp = require('../../lib/Amqp')
var harcon = new Harcon( { Barrel: Amqp.Barrel, barrel: { socketType: 'PUBSUB' }, logger: logger }, function(err){
if( err ) return console.error( err );
let Logger = require('../WinstonLogger')
let logger = Logger.createWinstonLogger( { console: true, level: 'debug' } )
var Marie = {
let harcon = new Harcon( { Barrel: Amqp.Barrel, barrel: { }, logger: logger }, function (err) {
if ( err ) return console.error( err )
let Marie = {
name: 'Marie',
whiny: function (greetings1, greetings2, greetings3, greetings4, callback) {
console.log( 'Marie is whiny', greetings1, greetings2, greetings3, greetings4 );
callback( null, 'Pas du tout!' );
console.log( 'Marie is whiny', greetings1, greetings2, greetings3, greetings4 )
callback( null, 'Pas du tout!' )
}
};
harcon.addicts( Marie, {}, function(){ } );
} );
}
harcon.addicts( Marie, {}, function () { } )
} )

@@ -1,25 +0,27 @@

var Harcon = require('harcon');
var Amqp = require('../../lib/Amqp');
'use strict'
var Logger = require('../WinstonLogger');
var logger = Logger.createWinstonLogger( { console: true, level: 'debug' } );
let Harcon = require('harcon')
let Amqp = require('../../lib/Amqp')
var harcon = new Harcon( { Barrel: Amqp.Barrel, barrel: { socketType: 'PUBSUB' }, logger: logger }, function(err){
if( err ) return console.error( err );
let Logger = require('../WinstonLogger')
let logger = Logger.createWinstonLogger( { console: true, level: 'debug' } )
var Claire = {
let harcon = new Harcon( { Barrel: Amqp.Barrel, barrel: { }, logger: logger }, function (err) {
if ( err ) return console.error( err )
let Claire = {
name: 'Claire',
simple: function (greetings1, ignite, callback) {
console.log('Claire is simple...');
ignite( 'Marie.whiny', greetings1, 'Bon matin!', 'Bon jour!', 'Bon soleil!', callback );
console.log('Claire is simple...')
ignite( 'Marie.whiny', greetings1, 'Bon matin!', 'Bon jour!', 'Bon soleil!', callback )
}
};
harcon.addicts( Claire, {}, function(){ } );
}
harcon.addicts( Claire, {}, function () { } )
setTimeout( function(){
console.log('>>>>>> Sending ...');
harcon.simpleIgnite( 'Claire.simple', 'Hallo!', function(err, res){
console.log('::::::', err, res);
} );
}, 5000 );
} );
setTimeout( function () {
console.log('>>>>>> Sending ...')
harcon.simpleIgnite( 'Claire.simple', 'Hallo!', function (err, res) {
console.log('::::::', err, res)
} )
}, 5000 )
} )

@@ -1,9 +0,11 @@

var fs = require('fs')
var watch = require('watch')
var mkdirp = require('mkdirp')
'use strict'
var path = require('path')
let fs = require('fs')
let watch = require('watch')
let mkdirp = require('mkdirp')
var events = ['created', 'removed', 'changed']
let path = require('path')
let events = ['created', 'removed', 'changed']
module.exports = {

@@ -20,3 +22,3 @@ name: 'Publisher',

scheduleFile: function ( folder, fileName ) {
var path = folder ? folder + '/' + fileName : fileName
let path = folder ? folder + '/' + fileName : fileName
if ( this.files.indexOf( path ) === -1 )

@@ -26,5 +28,5 @@ this.files.push( path )

igniteFiles: function ( ) {
var self = this
let self = this
self.files.forEach( function (newFile) {
var fn = function (err, res) {
let fn = function (err, res) {
if ( err ) {

@@ -36,3 +38,3 @@ console.error( err, newFile )

if ( fs.existsSync( newFile ) ) {
var component = require( newFile.substring( 0, newFile.length - 3 ) )
let component = require( newFile.substring( 0, newFile.length - 3 ) )
if ( !component.adequate || component.adequate() )

@@ -46,3 +48,3 @@ self.ignite( 'Inflicter.addicts', component, self.configs[component.name], fn )

readFiles: function ( folder, matcher, callback ) {
var self = this
let self = this
fs.readdir(folder, function (err, files) {

@@ -52,3 +54,3 @@ if (err)

else {
for (var i = 0; i < files.length; i += 1) {
for (let i = 0; i < files.length; i += 1) {
if ( matcher(files[i]) ) {

@@ -64,5 +66,5 @@ self.scheduleFile( folder, files[i] )

watch: function ( folder, timeout, pattern ) {
var self = this
var extension = '.js'
var matcher = function (filePath) { return pattern ? pattern.test(filePath) : filePath.endsWith( extension ) }
let self = this
let extension = '.js'
let matcher = function (filePath) { return pattern ? pattern.test(filePath) : filePath.endsWith( extension ) }

@@ -78,3 +80,3 @@ self.close()

var isComponent = function (filePath, stat) {
let isComponent = function (filePath, stat) {
return !stat.isDirectory() && matcher(filePath)

@@ -85,3 +87,3 @@ }

self.monitor = monitor
var handler = function (f, stat) {
let handler = function (f, stat) {
if ( isComponent( f, stat ) )

@@ -88,0 +90,0 @@ self.scheduleFile( null, f )

@@ -1,11 +0,13 @@

var Harcon = require('harcon')
var Amqp = require('../lib/Amqp')
'use strict'
var Logger = require('./WinstonLogger')
var logger = Logger.createWinstonLogger( { console: true, level: 'debug' } )
let Harcon = require('harcon')
let Amqp = require('../lib/Amqp')
var harcon = new Harcon( { Barrel: Amqp.Barrel, barrel: { timeout: 1000 }, logger: logger }, function (err) {
let Logger = require('./WinstonLogger')
let logger = Logger.createWinstonLogger( { console: true, level: 'debug' } )
let harcon = new Harcon( { Barrel: Amqp.Barrel, barrel: { timeout: 1000 }, logger: logger }, function (err) {
if ( err ) return console.error( err )
var Marie = {
let Marie = {
name: 'Marie',

@@ -12,0 +14,0 @@ greet: function ( cb ) {

@@ -1,3 +0,5 @@

var winston = require('winston')
'use strict'
let winston = require('winston')
exports.createWinstonLogger = function ( options ) {

@@ -13,3 +15,3 @@ options = options || {}

winston.handleExceptions( new (winston.transports.Console)({ level: 'error', colorize: 'true' }) )
var transports = [
let transports = [
new (winston.transports.Console)({ level: 'error', colorize: 'true' }),

@@ -16,0 +18,0 @@ new (winston.transports.File)( {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc