harcon-amqp
Advanced tools
Comparing version 0.4.1 to 0.5.0
@@ -11,13 +11,13 @@ var rabbit = require('rabbit.js'); | ||
amqpbarrel.createPull = function( division, callback ){ | ||
amqpbarrel.createIn = function( division, callback ){ | ||
var self = this; | ||
var pull = self.ctx.socket('PULL'); | ||
var socket = self.ctx.socket( (self.socketType === 'PUBSUB') ? 'SUBSCRIBE' : 'PULL' ); | ||
self.pulls[division] = pull; | ||
self.ins[division] = socket; | ||
pull.setEncoding('utf8'); | ||
pull.on('readable', function( message ){ | ||
socket.setEncoding('utf8'); | ||
socket.on('readable', function( message ){ | ||
var msg; | ||
while( (msg = self.pulls[division].read()) ) { | ||
while( (msg = self.ins[division].read()) ) { | ||
var comm = JSON.parse( msg ); | ||
@@ -27,4 +27,4 @@ self.innerProcessAmqp( comm ); | ||
} ); | ||
pull.connect( division, function( ) { | ||
self.logger.harconlog( null, 'AMQP pull queue is made.', division, 'info' ); | ||
socket.connect( division, function( ) { | ||
self.logger.harconlog( null, 'AMQP subscribe socket is made.', division, 'info' ); | ||
@@ -34,13 +34,13 @@ if( callback ) | ||
} ); | ||
pull.on('error', self.logger.error ); | ||
socket.on('error', self.logger.error ); | ||
}; | ||
amqpbarrel.createPush = function( division, callback ){ | ||
amqpbarrel.createOut = function( division, callback ){ | ||
var self = this; | ||
var push = self.ctx.socket('PUSH'); | ||
push.setDefaultEncoding('utf8'); | ||
push.connect( division, function() { | ||
self.logger.harconlog( null, 'AMQP push queue is made.', division, 'info' ); | ||
var socket = self.ctx.socket( (self.socketType === 'PUBSUB') ? 'PUBLISH' : 'PUSH' ); | ||
socket.setDefaultEncoding('utf8'); | ||
socket.connect( division, function() { | ||
self.logger.harconlog( null, 'AMQP publish socket is made.', division, 'info' ); | ||
self.pushs[division] = push; | ||
self.outs[division] = socket; | ||
@@ -50,3 +50,3 @@ if( callback ) | ||
} ); | ||
push.on('error', self.logger.error ); | ||
socket.on('error', self.logger.error ); | ||
}; | ||
@@ -59,8 +59,10 @@ | ||
var connectURL = config.connectURL || 'amqp://localhost'; | ||
self.ctx = rabbit.createContext( connectURL ); | ||
self.connectURL = config.connectURL || 'amqp://localhost'; | ||
self.socketType = config.socketType || 'PUBSUB'; | ||
self.quiteMode = self.socketType === 'PUBSUB'; | ||
self.ctx = rabbit.createContext( self.connectURL ); | ||
self.ctx.on('ready', function() { | ||
self.logger.harconlog( null, 'AMQP connection is made.', connectURL, 'info' ); | ||
self.pushs = {}; | ||
self.pulls = {}; | ||
self.logger.harconlog( null, 'AMQP connection is made.', self.connectURL, 'info' ); | ||
self.outs = {}; | ||
self.ins = {}; | ||
@@ -74,3 +76,3 @@ if( callback ) | ||
amqpbarrel.newDivision = function( division, callback ){ | ||
if( this.pushs[division] ){ | ||
if( this.outs[division] ){ | ||
return callback ? callback() : division; | ||
@@ -81,4 +83,4 @@ } | ||
async.series( [ | ||
function(cb){ self.createPull( division, cb ); }, | ||
function(cb){ self.createPush( division, cb ); } | ||
function(cb){ self.createIn( division, cb ); }, | ||
function(cb){ self.createOut( division, cb ); } | ||
], callback || function(err){ | ||
@@ -120,7 +122,7 @@ if(err) | ||
if( !self.pushs[ comm.division ] ) | ||
if( !self.outs[ comm.division ] ) | ||
return self.logger.harconlog( new Error('Division is not ready yet...', comm.division) ); | ||
self.logger.harconlog( null, 'Appeasing...', {comm: comm, err: err ? err.message : null, responseComms: responseComms}, 'silly' ); | ||
self.pushs[ comm.division ].write(packet, 'utf8'); | ||
self.outs[ comm.division ].write(packet, 'utf8'); | ||
}; | ||
@@ -133,3 +135,3 @@ | ||
if( !self.pushs[ comm.division ] ) | ||
if( !self.outs[ comm.division ] ) | ||
return self.logger.harconlog( new Error('Division is not ready yet...', comm.division) ); | ||
@@ -145,3 +147,3 @@ | ||
var packet = JSON.stringify( { id: comm.id, comm: comm, callback: !!comm.callback } ); | ||
self.pushs[ comm.division ].write(packet, 'utf8'); | ||
self.outs[ comm.division ].write(packet, 'utf8'); | ||
}; | ||
@@ -148,0 +150,0 @@ |
{ | ||
"name": "harcon-amqp", | ||
"version": "0.4.1", | ||
"version": "0.5.0", | ||
"description": "AMQP plugin for the harcon messaging/service bus of node-based enterprise entities.", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
# harcon-amqp | ||
AMQP plugin fpr harcon | ||
AMQP transport layer ("Barrel") plugin for [harcon](https://github.com/imrefazekas/harcon). | ||
Both PUB/SUB and PUSH/PULL socket types are supported. See socket types explained [here](http://www.squaremobius.net/rabbit.js/). | ||
Note: In PUB/SUB model, [harcon](https://github.com/imrefazekas/harcon) won't send back notification about possible misaddressing... | ||
## Installation | ||
npm install harcon harcon-amqp --save | ||
## Usage | ||
```javascript | ||
var Harcon = require('harcon'); | ||
var Amqp = require('harcon-amqp'); | ||
var amqpConfig = { | ||
connectURL: 'amqp://localhost', | ||
socketType: 'PUBSUB' // 'PUSHPULL' to be used for PUSH/PULL socket type | ||
}; | ||
var harcon = new Harcon( { Barrel: Amqp.Barrel, barrel: amqpConfig }, function(err){ | ||
} ); | ||
``` |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
38421
21
599
26