@cheep/transport-rabbitmq
Advanced tools
Comparing version 1.0.5 to 1.1.0
{ | ||
"name": "@cheep/transport-rabbitmq", | ||
"version": "1.0.5", | ||
"version": "1.1.0", | ||
"contributors": [ | ||
@@ -25,3 +25,3 @@ "Ezeki", | ||
"amqplib": "0.7.1", | ||
"@cheep/transport": "1.0.4" | ||
"@cheep/transport": "1.1.0" | ||
}, | ||
@@ -28,0 +28,0 @@ "main": "src/index.js", |
@@ -1,2 +0,2 @@ | ||
import { SendMessageProps, SendReplyMessageProps, TransportBase, TransportOptions, TransportUtils } from '@cheep/transport'; | ||
import { FailedMessage, SendMessageProps, SendReplyMessageProps, TransportBase, TransportOptions, TransportUtils } from '@cheep/transport'; | ||
export declare class RabbitMQTransport extends TransportBase { | ||
@@ -15,2 +15,3 @@ protected options: TransportOptions & { | ||
private bindingSetup; | ||
private failedMessagesSetup; | ||
private i; | ||
@@ -39,4 +40,5 @@ constructor(options: TransportOptions & { | ||
dispose(): Promise<void>; | ||
subscribeFailedMessages(action: (failedMessage: FailedMessage) => Promise<void> | void): Promise<void>; | ||
protected sendMessage(props: SendMessageProps): Promise<void>; | ||
protected sendReplyMessage(props: SendReplyMessageProps): Promise<void>; | ||
} |
@@ -139,2 +139,9 @@ "use strict"; | ||
}); | ||
if (this.failedMessagesSetup) { | ||
yield this.channel.removeSetup(this.failedMessagesSetup, (c, cb) => { | ||
c.removeAllListeners(); | ||
cb(null); | ||
}); | ||
this.failedMessagesSetup = null; | ||
} | ||
yield _super.stop.call(this); | ||
@@ -155,2 +162,41 @@ }); | ||
} | ||
subscribeFailedMessages(action) { | ||
return tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
this.failedMessagesSetup = (x) => tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
x.consume(this.options.failedMessagesQueueName, (msg) => tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
var _a, _b; | ||
if (!msg) { | ||
return; | ||
} | ||
const route = (_b = (_a = msg.properties) === null || _a === void 0 ? void 0 : _a.headers) === null || _b === void 0 ? void 0 : _b.route; | ||
const correlationId = msg.properties.correlationId; | ||
const replyTo = msg.properties.replyTo; | ||
const messageString = msg.content | ||
? msg.content.toString() | ||
: null; | ||
// try to parse it as a JSON | ||
let message; | ||
if (messageString) { | ||
try { | ||
message = JSON.parse(messageString); | ||
} | ||
catch (err) { } | ||
} | ||
try { | ||
yield action({ | ||
route, | ||
correlationId, | ||
replyTo, | ||
message, | ||
}); | ||
x.ack(msg); | ||
} | ||
catch (err) { | ||
console.log('Error on processing failled message', err); | ||
} | ||
})); | ||
}); | ||
yield this.channel.addSetup(this.failedMessagesSetup); | ||
}); | ||
} | ||
sendMessage(props) { | ||
@@ -157,0 +203,0 @@ return tslib_1.__awaiter(this, void 0, void 0, function* () { |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
18254
267