harcon-amqp
Advanced tools
Comparing version 0.5.0 to 0.5.5
@@ -0,1 +1,3 @@ | ||
'use strict'; | ||
var rabbit = require('rabbit.js'); | ||
@@ -58,2 +60,3 @@ var Harcon = require('harcon'); | ||
self.quiteMode = self.socketType === 'PUBSUB'; | ||
self.timeout = config.timeout || 0; | ||
self.ctx = rabbit.createContext( self.connectURL ); | ||
@@ -69,4 +72,23 @@ self.ctx.on('ready', function() { | ||
self.ctx.on('error', self.logger.error ); | ||
if( self.timeout > 0 ){ | ||
self.cleaner = setInterval( function(){ | ||
self.cleanupMessages(); | ||
}, self.timeout ); | ||
} | ||
}; | ||
amqpbarrel.cleanupMessages = function(){ | ||
var self = this; | ||
var time = Date.now(); | ||
for ( let key of Object.keys( self.messages ) ){ | ||
if( time - self.messages[key].timestamp > self.timeout ){ | ||
var callbackFn = self.messages[key].callback; | ||
delete self.messages[ key ]; | ||
callbackFn( new Error('Response timeout') ); | ||
} | ||
} | ||
}; | ||
amqpbarrel.newDivision = function( division, callback ){ | ||
@@ -101,3 +123,3 @@ if( this.outs[division] ){ | ||
if( self.messages[ comm.id ] ){ | ||
realComm.callback = self.messages[ comm.id ]; | ||
realComm.callback = self.messages[ comm.id ].callback; | ||
delete self.messages[ comm.id ]; | ||
@@ -136,6 +158,6 @@ } | ||
if( self.messages[ comm.id ] ) | ||
self.logger.harconlog( new Error('Duplicate message delivery!'), comm.id ); | ||
return self.logger.harconlog( new Error('Duplicate message delivery!'), comm.id ); | ||
if( comm.callback ) | ||
self.messages[ comm.id ] = comm.callback; | ||
self.messages[ comm.id ] = { callback: comm.callback, timestamp: Date.now() }; | ||
var packet = JSON.stringify( { id: comm.id, comm: comm, callback: !!comm.callback } ); | ||
@@ -146,2 +168,4 @@ self.outs[ comm.division ].write(packet, 'utf8'); | ||
amqpbarrel.extendedClose = function( callback ){ | ||
if( this.cleaner ) | ||
clearInterval( this.cleaner ); | ||
if( this.ctx ) | ||
@@ -148,0 +172,0 @@ this.ctx.close( callback ); |
{ | ||
"name": "harcon-amqp", | ||
"version": "0.5.0", | ||
"version": "0.5.5", | ||
"description": "AMQP plugin for the harcon messaging/service bus of node-based enterprise entities.", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
@@ -6,7 +6,8 @@ # harcon-amqp | ||
Note: In PUB/SUB model, [harcon](https://github.com/imrefazekas/harcon) won't send back notification about possible misaddressing... | ||
## Installation | ||
```javascript | ||
npm install harcon harcon-amqp --save | ||
``` | ||
@@ -22,3 +23,4 @@ | ||
connectURL: 'amqp://localhost', | ||
socketType: 'PUBSUB' // 'PUSHPULL' to be used for PUSH/PULL socket type | ||
socketType: 'PUBSUB', // 'PUSHPULL' to be used for PUSH/PULL socket type | ||
timeout: 0 | ||
}; | ||
@@ -28,1 +30,3 @@ var harcon = new Harcon( { Barrel: Amqp.Barrel, barrel: amqpConfig }, function(err){ | ||
``` | ||
Should the recipients be not available or fail to meet the timeframe defined by the attribute 'timeout', [harcon](https://github.com/imrefazekas/harcon) will call the callbacks of the given messages with an error. |
39762
22
638
30