mototaxi
Advanced tools
Comparing version 1.1.55 to 1.1.56
@@ -12,2 +12,3 @@ import { IEventEmitter } from '../IEventEmitter'; | ||
private listeners; | ||
private polling; | ||
constructor(sqs: AWS.SQS, config: ISqsConfig, logger?: ILogger); | ||
@@ -17,5 +18,6 @@ emit(transactionId: string, payload: any): void; | ||
removeListener(listener: any): void; | ||
private subscribeToRelevantTransactions(transactionId, source, action); | ||
private startPolling(); | ||
private subscribeToRelevantTransactions(source); | ||
private removeFromEventQueue(receiptHandle); | ||
private log(message); | ||
} |
@@ -11,2 +11,3 @@ 'use strict'; | ||
this.listeners = {}; | ||
this.polling = false; | ||
} | ||
@@ -31,7 +32,11 @@ AwsEventEmitter.prototype.emit = function (transactionId, payload) { | ||
AwsEventEmitter.prototype.addListener = function (transactionId, action) { | ||
this.listeners[transactionId] = action; | ||
this.startPolling(); | ||
}; | ||
AwsEventEmitter.prototype.removeListener = function (listener) { | ||
// nothing | ||
}; | ||
AwsEventEmitter.prototype.startPolling = function () { | ||
var _this = this; | ||
if (this.listeners[transactionId]) { | ||
throw new Error("Listener already started for " + transactionId + "."); | ||
} | ||
var execute = function () { | ||
var pollOnce = function () { | ||
var incomingParams = { | ||
@@ -47,14 +52,14 @@ QueueUrl: _this.config.eventQueueUrl, | ||
var source = Rx.Observable.from(eventQueueData.Messages || []); | ||
_this.subscribeToRelevantTransactions(transactionId, source, action); | ||
_this.subscribeToRelevantTransactions(source); | ||
}); | ||
}; | ||
execute(); | ||
this.listeners[transactionId] = setInterval(function () { | ||
execute(); | ||
}, this.config.pollingInterval || 5000); | ||
pollOnce(); | ||
if (!this.polling) { | ||
setInterval(function () { | ||
_this.polling = true; | ||
pollOnce(); | ||
}, this.config.pollingInterval || 5000); | ||
} | ||
}; | ||
AwsEventEmitter.prototype.removeListener = function (listener) { | ||
// not used | ||
}; | ||
AwsEventEmitter.prototype.subscribeToRelevantTransactions = function (transactionId, source, action) { | ||
AwsEventEmitter.prototype.subscribeToRelevantTransactions = function (source) { | ||
var _this = this; | ||
@@ -69,10 +74,11 @@ source | ||
.filter(function (message) { | ||
return message.transaction.transactionId === transactionId; | ||
return message.transaction.transactionId; | ||
}) | ||
.subscribe(function (message) { | ||
_this.log("AwsEventEmitter: Data received from event queue: " + transactionId); | ||
_this.log("AwsEventEmitter: Data received from event queue: " + message.transaction.transactionId); | ||
try { | ||
var action = _this.listeners[message.transaction.transactionId]; | ||
action(message.transaction.payload); | ||
_this.removeFromEventQueue(message.receiptHandle); | ||
clearInterval(_this.listeners[transactionId]); | ||
delete _this.listeners[message.transaction.transactionId]; | ||
} | ||
@@ -79,0 +85,0 @@ catch (err) { |
@@ -12,2 +12,3 @@ import { IEventEmitter } from '../IEventEmitter'; | ||
private listeners; | ||
private polling; | ||
constructor(sqs: AWS.SQS, config: ISqsConfig, logger?: ILogger); | ||
@@ -17,5 +18,6 @@ emit(transactionId: string, payload: any): void; | ||
removeListener(listener: any): void; | ||
private subscribeToRelevantTransactions(transactionId, source, action); | ||
private startPolling(); | ||
private subscribeToRelevantTransactions(source); | ||
private removeFromEventQueue(receiptHandle); | ||
private log(message); | ||
} |
@@ -12,2 +12,3 @@ 'use strict'; | ||
this.listeners = {}; | ||
this.polling = false; | ||
} | ||
@@ -32,7 +33,11 @@ AwsEventEmitter.prototype.emit = function (transactionId, payload) { | ||
AwsEventEmitter.prototype.addListener = function (transactionId, action) { | ||
this.listeners[transactionId] = action; | ||
this.startPolling(); | ||
}; | ||
AwsEventEmitter.prototype.removeListener = function (listener) { | ||
// nothing | ||
}; | ||
AwsEventEmitter.prototype.startPolling = function () { | ||
var _this = this; | ||
if (this.listeners[transactionId]) { | ||
throw new Error("Listener already started for " + transactionId + "."); | ||
} | ||
var execute = function () { | ||
var pollOnce = function () { | ||
var incomingParams = { | ||
@@ -48,14 +53,14 @@ QueueUrl: _this.config.eventQueueUrl, | ||
var source = Rx.Observable.from(eventQueueData.Messages || []); | ||
_this.subscribeToRelevantTransactions(transactionId, source, action); | ||
_this.subscribeToRelevantTransactions(source); | ||
}); | ||
}; | ||
execute(); | ||
this.listeners[transactionId] = setInterval(function () { | ||
execute(); | ||
}, this.config.pollingInterval || 5000); | ||
pollOnce(); | ||
if (!this.polling) { | ||
setInterval(function () { | ||
_this.polling = true; | ||
pollOnce(); | ||
}, this.config.pollingInterval || 5000); | ||
} | ||
}; | ||
AwsEventEmitter.prototype.removeListener = function (listener) { | ||
// not used | ||
}; | ||
AwsEventEmitter.prototype.subscribeToRelevantTransactions = function (transactionId, source, action) { | ||
AwsEventEmitter.prototype.subscribeToRelevantTransactions = function (source) { | ||
var _this = this; | ||
@@ -70,10 +75,11 @@ source | ||
.filter(function (message) { | ||
return message.transaction.transactionId === transactionId; | ||
return message.transaction.transactionId; | ||
}) | ||
.subscribe(function (message) { | ||
_this.log("AwsEventEmitter: Data received from event queue: " + transactionId); | ||
_this.log("AwsEventEmitter: Data received from event queue: " + message.transaction.transactionId); | ||
try { | ||
var action = _this.listeners[message.transaction.transactionId]; | ||
action(message.transaction.payload); | ||
_this.removeFromEventQueue(message.receiptHandle); | ||
clearInterval(_this.listeners[transactionId]); | ||
delete _this.listeners[message.transaction.transactionId]; | ||
} | ||
@@ -80,0 +86,0 @@ catch (err) { |
{ | ||
"name": "mototaxi", | ||
"version": "1.1.55", | ||
"version": "1.1.56", | ||
"description": "A support library for commands and handlers in existing or 'could be' CQRS systems.", | ||
@@ -5,0 +5,0 @@ "author": "Byron Sommardahl <byron@acklenavenue.com>", |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
1160329
1348