Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rhea-promise

Package Overview
Dependencies
Maintainers
1
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rhea-promise - npm Package Compare versions

Comparing version 0.2.0 to 1.0.0

.DS_Store

12

changelog.md

@@ -0,1 +1,13 @@

### 1.0.0 - 2019-06-27
- Updated minimum version of `rhea` to `^1.0.8`.
- Added a read only property `id` to the `Session` object. The id property is created by concatenating session's local channel, remote channel and the connection id `"local-<number>_remote-<number>_<connection-id>"`, thus making it unique for that connection.
- Improved log statements by adding the session `id` and the sender, receiver `name` to help while debugging applications.
- Added `options` to `Link.close({closeSession: true | false})`, thus the user can specify whether the underlying session should be closed while closing the `Sender|Receiver`. Default is `true`.
- Improved `open` and `close` operations on `Connection`, `Session` and `Link` by creating timer in case the connection gets disconnected. Fixes [#41](https://github.com/amqp/rhea-promise/issues/41).
- The current `Sender` does not have a provision of **"awaiting"** on sending a message. The user needs to add handlers on the `Sender` for `accepted`, `rejected`, `released`, `modified` to ensure whether the message was successfully sent.
Now, we have added a new `AwaitableSender` which adds the handlers internally and provides an **awaitable** `send()` operation to the customer. Fixes [#45](https://github.com/amqp/rhea-promise/issues/45).
- Exporting new Errors:
- `InsufficientCreditError`: Defines the error that occurs when the Sender does not have enough credit.
- `SendOperationFailedError`: Defines the error that occurs when the Sender fails to send a message.
### 0.2.0 - 2019-05-17

@@ -2,0 +14,0 @@ - Updated `OperationTimeoutError` to be a non-AMQP Error as pointed out in [#42](https://github.com/amqp/rhea-promise/issues/42). Fixed in [PR](https://github.com/amqp/rhea-promise/pull/43).

54

dist/lib/connection.js

@@ -13,3 +13,3 @@ "use strict";

const entity_1 = require("./entity");
const operationTimeoutError_1 = require("./operationTimeoutError");
const errorDefinitions_1 = require("./errorDefinitions");
// Determines whether the given object is a CreatedRheConnectionOptions object.

@@ -167,2 +167,3 @@ function isCreatedRheaConnectionOptions(obj) {

let onError;
let onDisconnected;
let waitTimer;

@@ -174,2 +175,3 @@ const removeListeners = () => {

this._connection.removeListener(rhea_1.ConnectionEvents.connectionClose, onClose);
this._connection.removeListener(rhea_1.ConnectionEvents.disconnected, onDisconnected);
};

@@ -186,2 +188,9 @@ onClose = (context) => {

};
onDisconnected = (context) => {
removeListeners();
const error = context.connection && context.connection.error
? context.connection.error
: context.error;
log.error("[%s] Connection got disconnected while closing itself: %O.", this.id, error);
};
const actionAfterTimeout = () => {

@@ -196,2 +205,3 @@ removeListeners();

this._connection.once(rhea_1.ConnectionEvents.connectionError, onError);
this._connection.once(rhea_1.ConnectionEvents.disconnected, onDisconnected);
waitTimer = setTimeout(actionAfterTimeout, this.options.operationTimeoutInSeconds * 1000);

@@ -277,2 +287,3 @@ this._connection.close();

let onClose;
let onDisconnected;
let waitTimer;

@@ -284,6 +295,7 @@ const removeListeners = () => {

rheaSession.removeListener(rhea_1.SessionEvents.sessionClose, onClose);
rheaSession.connection.removeListener(rhea_1.ConnectionEvents.disconnected, onDisconnected);
};
onOpen = (context) => {
removeListeners();
log.session("[%s] Resolving the promise with amqp session.", this.id);
log.session("[%s] Resolving the promise with amqp session '%s'.", this.id, session.id);
return resolve(session);

@@ -296,2 +308,10 @@ };

};
onDisconnected = (context) => {
removeListeners();
const error = context.connection && context.connection.error
? context.connection.error
: context.error;
log.error("[%s] Connection got disconnected while creating amqp session '%s': %O.", this.id, session.id, error);
return reject(error);
};
const actionAfterTimeout = () => {

@@ -301,3 +321,3 @@ removeListeners();

log.error("[%s] %s", this.id, msg);
return reject(new operationTimeoutError_1.OperationTimeoutError(msg));
return reject(new errorDefinitions_1.OperationTimeoutError(msg));
};

@@ -307,2 +327,3 @@ // listeners that we add for completing the operation are added directly to rhea's objects.

rheaSession.once(rhea_1.SessionEvents.sessionClose, onClose);
rheaSession.connection.once(rhea_1.ConnectionEvents.disconnected, onDisconnected);
log.session("[%s] Calling amqp session.begin().", this.id);

@@ -328,2 +349,23 @@ waitTimer = setTimeout(actionAfterTimeout, this.options.operationTimeoutInSeconds * 1000);

/**
* Creates an awaitable amqp sender. It either uses the provided session or creates a new one.
* @param options Optional parameters to create an awaitable sender link.
* - If `onError` and `onSessionError` handlers are not provided then the `AwaitableSender` will
* clear the timer and reject the Promise for all the entries of inflight send operation in its
* `deliveryDispositionMap`.
* - If the user is handling the reconnection of sender link or the underlying connection in it's
* app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he
* shall be responsible of clearing the `deliveryDispotionMap` of inflight `send()` operation.
*
* @return Promise<AwaitableSender>.
*/
createAwaitableSender(options) {
return tslib_1.__awaiter(this, void 0, void 0, function* () {
if (options && options.session && options.session.createAwaitableSender) {
return options.session.createAwaitableSender(options);
}
const session = yield this.createSession();
return session.createAwaitableSender(options);
});
}
/**
* Creates an amqp receiver link. It either uses the provided session or creates a new one.

@@ -365,3 +407,4 @@ * @param {ReceiverOptionsWithSession} options Optional parameters to create a receiver link.

]);
log.connection("[%s] Successfully created the sender and receiver links on the same session.", this.id);
log.connection("[%s] Successfully created the sender '%s' and receiver '%s' on the same " +
"amqp session '%s'.", this.id, sender.name, receiver.name, session.id);
return {

@@ -390,2 +433,5 @@ session: session,

};
if (eventName === rhea_1.ConnectionEvents.protocolError) {
log.connection("[%s] ProtocolError is: %O.", this.id, context);
}
utils_1.emitEvent(params);

@@ -392,0 +438,0 @@ });

7

dist/lib/eventContext.js

@@ -20,9 +20,8 @@ "use strict";

function translate(rheaContext, emitter, eventName) {
const connectionId = (rheaContext.connection && rheaContext.connection.options) ? rheaContext.connection.options.id : "";
log.contextTranslator("[%s] Translating the context for event: '%s'.", connectionId, eventName);
// initialize the result
const result = Object.assign({ _context: rheaContext }, rheaContext);
const connection = emitter instanceof connection_1.Connection
? emitter
: emitter.connection;
log.contextTranslator("[%s] Translating the context for event: '%s'.", connection.id, eventName);
// initialize the result
const result = Object.assign({ _context: rheaContext }, rheaContext);
// set rhea-promise connection and container

@@ -29,0 +28,0 @@ result.connection = connection;

@@ -29,2 +29,4 @@ "use strict";

exports.Sender = sender_1.Sender;
var awaitableSender_1 = require("./awaitableSender");
exports.AwaitableSender = awaitableSender_1.AwaitableSender;
var utils_1 = require("./util/utils");

@@ -37,2 +39,6 @@ exports.AmqpResponseStatusCode = utils_1.AmqpResponseStatusCode;

exports.parseConnectionString = utils_1.parseConnectionString;
var errorDefinitions_1 = require("./errorDefinitions");
exports.InsufficientCreditError = errorDefinitions_1.InsufficientCreditError;
exports.OperationTimeoutError = errorDefinitions_1.OperationTimeoutError;
exports.SendOperationFailedError = errorDefinitions_1.SendOperationFailedError;
//# sourceMappingURL=index.js.map

@@ -10,3 +10,3 @@ "use strict";

const entity_1 = require("./entity");
const operationTimeoutError_1 = require("./operationTimeoutError");
const errorDefinitions_1 = require("./errorDefinitions");
var LinkType;

@@ -177,5 +177,6 @@ (function (LinkType) {

/**
* Closes the underlying amqp link and session in rhea if open. Also removes all the event
* handlers added in the rhea-promise library on the link and it's session
* @return {Promise<void>} Promise<void>
* Closes the underlying amqp link and optionally the session as well in rhea if open.
* Also removes all the event handlers added in the rhea-promise library on the link
* and optionally it's session.
* @returns Promise<void>
* - **Resolves** the promise when rhea emits the "sender_close" | "receiver_close" event.

@@ -185,7 +186,11 @@ * - **Rejects** the promise with an AmqpError when rhea emits the

*/
close() {
close(options) {
return tslib_1.__awaiter(this, void 0, void 0, function* () {
if (!options)
options = {};
if (options.closeSession == undefined)
options.closeSession = true;
this.removeAllListeners();
yield new Promise((resolve, reject) => {
log.error("[%s] The %s is open ? -> %s", this.connection.id, this.type, this.isOpen());
log.error("[%s] The %s '%s' on amqp session '%s' is open ? -> %s", this.connection.id, this.type, this.name, this.session.id, this.isOpen());
if (this.isOpen()) {

@@ -200,2 +205,3 @@ const errorEvent = this.type === LinkType.sender

let onClose;
let onDisconnected;
let waitTimer;

@@ -207,6 +213,8 @@ const removeListeners = () => {

this._link.removeListener(closeEvent, onClose);
this._link.connection.removeListener(rhea_1.ConnectionEvents.disconnected, onDisconnected);
};
onClose = (context) => {
removeListeners();
log[this.type]("[%s] Resolving the promise as the amqp %s has been closed.", this.connection.id, this.type);
log[this.type]("[%s] Resolving the promise as the %s '%s' on amqp session '%s' " +
"has been closed.", this.connection.id, this.type, this.name, this.session.id);
return resolve();

@@ -216,10 +224,19 @@ };

removeListeners();
log.error("[%s] Error occurred while closing amqp %s: %O.", this.connection.id, this.type, context.session.error);
log.error("[%s] Error occurred while closing %s '%s' on amqp session '%s': %O.", this.connection.id, this.type, this.name, this.session.id, context.session.error);
return reject(context.session.error);
};
onDisconnected = (context) => {
removeListeners();
const error = context.connection && context.connection.error
? context.connection.error
: context.error;
log.error("[%s] Connection got disconnected while closing amqp %s '%s' on amqp " +
"session '%s': %O.", this.connection.id, this.type, this.name, this.session.id, error);
};
const actionAfterTimeout = () => {
removeListeners();
const msg = `Unable to close the amqp %s ${this.name} due to operation timeout.`;
log.error("[%s] %s", this.connection.id, this.type, msg);
return reject(new operationTimeoutError_1.OperationTimeoutError(msg));
const msg = `Unable to close the ${this.type} '${this.name}' ` +
`on amqp session '${this.session.id}' due to operation timeout.`;
log.error("[%s] %s", this.connection.id, msg);
return reject(new errorDefinitions_1.OperationTimeoutError(msg));
};

@@ -229,2 +246,3 @@ // listeners that we add for completing the operation are added directly to rhea's objects.

this._link.once(errorEvent, onError);
this._link.connection.once(rhea_1.ConnectionEvents.disconnected, onDisconnected);
waitTimer = setTimeout(actionAfterTimeout, this.connection.options.operationTimeoutInSeconds * 1000);

@@ -238,4 +256,6 @@ this._link.close();

});
log[this.type]("[%s] %s has been closed, now closing it's session.", this.connection.id, this.type);
return this._session.close();
if (options.closeSession) {
log[this.type]("[%s] %s '%s' has been closed, now closing it's amqp session '%s'.", this.connection.id, this.type, this.name, this.session.id);
return this._session.close();
}
});

@@ -242,0 +262,0 @@ }

@@ -7,6 +7,6 @@ "use strict";

/**
* Describes the sender that wraps the rhea sender.
* @class Sender
* Describes the base sender that wraps the rhea sender.
* @class BaseSender
*/
class Sender extends link_1.Link {
class BaseSender extends link_1.Link {
constructor(session, sender, options) {

@@ -25,2 +25,12 @@ super(link_1.LinkType.sender, session, sender, options);

}
}
exports.BaseSender = BaseSender;
/**
* Describes the AMQP Sender.
* @class Sender
*/
class Sender extends BaseSender {
constructor(session, sender, options) {
super(session, sender, options);
}
/**

@@ -27,0 +37,0 @@ * Sends the message

@@ -11,4 +11,13 @@ "use strict";

const entity_1 = require("./entity");
const operationTimeoutError_1 = require("./operationTimeoutError");
const errorDefinitions_1 = require("./errorDefinitions");
const awaitableSender_1 = require("./awaitableSender");
/**
* @internal
*/
var SenderType;
(function (SenderType) {
SenderType["sender"] = "sender";
SenderType["AwaitableSender"] = "AwaitableSender";
})(SenderType || (SenderType = {}));
/**
* Describes the session that wraps the rhea session.

@@ -38,2 +47,21 @@ * @class Session

/**
* Returns the unique identifier for the session in the format:
* "local_<number>-remote_<number>-<connection-id>" or an empty string if the local channel or
* remote channel are not yet defined.
*/
get id() {
let result = "";
const session = this._session;
if (session.local) {
result += `local-${session.local.channel}_`;
}
if (session.remote) {
result += `remote-${session.remote.channel}_`;
}
if (result) {
result += `${this._connection.id}`;
}
return result;
}
/**
* Determines whether the session and the underlying connection is open.

@@ -95,6 +123,7 @@ * @returns {boolean} result `true` - is open; `false` otherwise.

return new Promise((resolve, reject) => {
log.error("[%s] The session is open ? -> %s", this.connection.id, this.isOpen());
log.error("[%s] The amqp session '%s' is open ? -> %s", this.connection.id, this.id, this.isOpen());
if (this.isOpen()) {
let onError;
let onClose;
let onDisconnected;
let waitTimer;

@@ -106,6 +135,7 @@ const removeListeners = () => {

this._session.removeListener(rhea_1.SessionEvents.sessionClose, onClose);
this._session.connection.removeListener(rhea_1.ConnectionEvents.disconnected, onDisconnected);
};
onClose = (context) => {
removeListeners();
log.session("[%s] Resolving the promise as the amqp session has been closed.", this.connection.id);
log.session("[%s] Resolving the promise as the amqp session '%s' has been closed.", this.connection.id, this.id);
return resolve();

@@ -115,10 +145,17 @@ };

removeListeners();
log.error("[%s] Error occurred while closing amqp session.", this.connection.id, context.session.error);
log.error("[%s] Error occurred while closing amqp session '%s'.", this.connection.id, this.id, context.session.error);
reject(context.session.error);
};
onDisconnected = (context) => {
removeListeners();
const error = context.connection && context.connection.error
? context.connection.error
: context.error;
log.error("[%s] Connection got disconnected while closing amqp session '%s': %O.", this.connection.id, this.id, error);
};
const actionAfterTimeout = () => {
removeListeners();
const msg = `Unable to close the amqp session due to operation timeout.`;
const msg = `Unable to close the amqp session ${this.id} due to operation timeout.`;
log.error("[%s] %s", this.connection.id, msg);
reject(new operationTimeoutError_1.OperationTimeoutError(msg));
reject(new errorDefinitions_1.OperationTimeoutError(msg));
};

@@ -128,3 +165,4 @@ // listeners that we add for completing the operation are added directly to rhea's objects.

this._session.once(rhea_1.SessionEvents.sessionError, onError);
log.session("[%s] Calling session.close()", this.connection.id);
this._session.connection.once(rhea_1.ConnectionEvents.disconnected, onDisconnected);
log.session("[%s] Calling session.close() for amqp session '%s'.", this.connection.id, this.id);
waitTimer = setTimeout(actionAfterTimeout, this.connection.options.operationTimeoutInSeconds * 1000);

@@ -141,5 +179,5 @@ this._session.close();

* Creates an amqp receiver on this session.
* @param {Session} session The amqp session object on which the receiver link needs to be established.
* @param {ReceiverOptions} [options] Options that can be provided while creating an amqp receiver.
* @return {Promise<Receiver>} Promise<Receiver>
* @param session The amqp session object on which the receiver link needs to be established.
* @param options Options that can be provided while creating an amqp receiver.
* @return Promise<Receiver>
* - **Resolves** the promise with the Receiver object when rhea emits the "receiver_open" event.

@@ -174,9 +212,9 @@ * - **Rejects** the promise with an AmqpError when rhea emits the "receiver_close" event while trying

this.on(rhea_1.SessionEvents.sessionError, options.onSessionError);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " +
"while creating the 'receiver'.", this.connection.id, rhea_1.SessionEvents.sessionError);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session: %s', " +
"while creating the 'receiver'.", this.connection.id, rhea_1.SessionEvents.sessionError, this.id);
}
if (options && options.onSessionClose) {
this.on(rhea_1.SessionEvents.sessionClose, options.onSessionClose);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " +
" while creating the 'receiver'.", this.connection.id, rhea_1.SessionEvents.sessionClose);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session: %s', " +
" while creating the 'receiver'.", this.connection.id, rhea_1.SessionEvents.sessionClose, this.id);
}

@@ -188,2 +226,3 @@ const rheaReceiver = this._session.attach_receiver(options);

let onClose;
let onDisconnected;
let waitTimer;

@@ -211,6 +250,7 @@ if (options && options.onMessage) {

rheaReceiver.removeListener(rhea_1.ReceiverEvents.receiverClose, onClose);
rheaReceiver.session.connection.removeListener(rhea_1.ConnectionEvents.disconnected, onDisconnected);
};
onOpen = (context) => {
removeListeners();
log.receiver("[%s] Resolving the promise with amqp receiver '%s'.", this.connection.id, receiver.name);
log.receiver("[%s] Resolving the promise with amqp receiver '%s' on amqp session '%s'.", this.connection.id, receiver.name, this.id);
return resolve(receiver);

@@ -220,11 +260,21 @@ };

removeListeners();
log.error("[%s] Error occurred while creating a receiver over amqp connection: %O.", this.connection.id, context.receiver.error);
log.error("[%s] Error occurred while creating the amqp receiver '%s' on amqp session " +
"'%s' over amqp connection: %O.", this.connection.id, receiver.name, this.id, context.receiver.error);
return reject(context.receiver.error);
};
onDisconnected = (context) => {
removeListeners();
const error = context.connection && context.connection.error
? context.connection.error
: context.error;
log.error("[%s] Connection got disconnected while creating amqp receiver '%s' on amqp " +
"session '%s': %O.", this.connection.id, receiver.name, this.id, error);
return reject(error);
};
const actionAfterTimeout = () => {
removeListeners();
const msg = `Unable to create the amqp receiver ${receiver.name} due to ` +
`operation timeout.`;
const msg = `Unable to create the amqp receiver '${receiver.name}' on amqp ` +
`session '${this.id}' due to operation timeout.`;
log.error("[%s] %s", this.connection.id, msg);
return reject(new operationTimeoutError_1.OperationTimeoutError(msg));
return reject(new errorDefinitions_1.OperationTimeoutError(msg));
};

@@ -234,2 +284,3 @@ // listeners that we add for completing the operation are added directly to rhea's objects.

rheaReceiver.once(rhea_1.ReceiverEvents.receiverClose, onClose);
rheaReceiver.session.connection.on(rhea_1.ConnectionEvents.disconnected, onDisconnected);
waitTimer = setTimeout(actionAfterTimeout, this.connection.options.operationTimeoutInSeconds * 1000);

@@ -240,4 +291,4 @@ });

* Creates an amqp sender on this session.
* @param {SenderOptions} [options] Options that can be provided while creating an amqp sender.
* @return {Promise<Sender>} Promise<Sender>
* @param options Options that can be provided while creating an amqp sender.
* @return Promise<Sender>
* - **Resolves** the promise with the Sender object when rhea emits the "sender_open" event.

@@ -248,2 +299,29 @@ * - **Rejects** the promise with an AmqpError when rhea emits the "sender_close" event while trying

createSender(options) {
return this._createSender(SenderType.sender, options);
}
/**
* Creates an awaitable amqp sender on this session.
* @param options Options that can be provided while creating an async amqp sender.
* - If `onError` and `onSessionError` handlers are not provided then the `AwaitableSender` will
* clear the timer and reject the Promise for all the entries of inflight send operation in its
* `deliveryDispositionMap`.
* - If the user is handling the reconnection of sender link or the underlying connection in it's
* app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he
* shall be responsible of clearing the `deliveryDispotionMap` of inflight `send()` operation.
*
* @return Promise<AwaitableSender>
* - **Resolves** the promise with the Sender object when rhea emits the "sender_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "sender_close" event while trying
* to create an amqp sender or the operation timeout occurs.
*/
createAwaitableSender(options) {
return this._createSender(SenderType.AwaitableSender, options);
}
/**
* Creates the Sender based on the provided type.
* @internal
* @param type The type of sender
* @param options Options to be provided while creating the sender.
*/
_createSender(type, options) {
return new Promise((resolve, reject) => {

@@ -253,15 +331,22 @@ // Register session handlers for session_error and session_close if provided.

this.on(rhea_1.SessionEvents.sessionError, options.onSessionError);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " +
"while creating the sender.", this.connection.id, rhea_1.SessionEvents.sessionError);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session: %s', " +
"while creating the sender.", this.connection.id, rhea_1.SessionEvents.sessionError, this.id);
}
if (options && options.onSessionClose) {
this.on(rhea_1.SessionEvents.sessionClose, options.onSessionClose);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " +
"while creating the sender.", this.connection.id, rhea_1.SessionEvents.sessionClose);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session: %s', " +
"while creating the sender.", this.connection.id, rhea_1.SessionEvents.sessionClose, this.id);
}
const rheaSender = this._session.attach_sender(options);
const sender = new sender_1.Sender(this, rheaSender, options);
let sender;
if (type === SenderType.sender) {
sender = new sender_1.Sender(this, rheaSender, options);
}
else {
sender = new awaitableSender_1.AwaitableSender(this, rheaSender, options);
}
sender.actionInitiated++;
let onSendable;
let onClose;
let onDisconnected;
let waitTimer;

@@ -277,14 +362,16 @@ // listeners provided by the user in the options object should be added

}
if (options.onAccepted) {
sender.on(rhea_1.SenderEvents.accepted, options.onAccepted);
if (type === SenderType.sender) {
if (options.onAccepted) {
sender.on(rhea_1.SenderEvents.accepted, options.onAccepted);
}
if (options.onRejected) {
sender.on(rhea_1.SenderEvents.rejected, options.onRejected);
}
if (options.onReleased) {
sender.on(rhea_1.SenderEvents.released, options.onReleased);
}
if (options.onModified) {
sender.on(rhea_1.SenderEvents.modified, options.onModified);
}
}
if (options.onRejected) {
sender.on(rhea_1.SenderEvents.rejected, options.onRejected);
}
if (options.onReleased) {
sender.on(rhea_1.SenderEvents.released, options.onReleased);
}
if (options.onModified) {
sender.on(rhea_1.SenderEvents.modified, options.onModified);
}
}

@@ -296,6 +383,7 @@ const removeListeners = () => {

rheaSender.removeListener(rhea_1.SenderEvents.senderClose, onClose);
rheaSender.session.connection.removeListener(rhea_1.ConnectionEvents.disconnected, onDisconnected);
};
onSendable = (context) => {
removeListeners();
log.sender("[%s] Resolving the promise with amqp sender '%s'.", this.connection.id, sender.name);
log.sender("[%s] Resolving the promise with amqp sender '%s' on amqp session '%s'.", this.connection.id, sender.name, this.id);
return resolve(sender);

@@ -305,11 +393,21 @@ };

removeListeners();
log.error("[%s] Error occurred while creating a sender over amqp connection: %O.", this.connection.id, context.sender.error);
log.error("[%s] Error occurred while creating the amqp sender '%s' on amqp session '%s' " +
"over amqp connection: %O.", this.connection.id, sender.name, this.id, context.sender.error);
return reject(context.sender.error);
};
onDisconnected = (context) => {
removeListeners();
const error = context.connection && context.connection.error
? context.connection.error
: context.error;
log.error("[%s] Connection got disconnected while creating amqp sender '%s' on amqp " +
"session '%s': %O.", this.connection.id, sender.name, this.id, error);
return reject(error);
};
const actionAfterTimeout = () => {
removeListeners();
const msg = `Unable to create the amqp sender ${sender.name} due to ` +
`operation timeout.`;
const msg = `Unable to create the amqp sender '${sender.name}' on amqp session ` +
`'${this.id}' due to operation timeout.`;
log.error("[%s] %s", this.connection.id, msg);
return reject(new operationTimeoutError_1.OperationTimeoutError(msg));
return reject(new errorDefinitions_1.OperationTimeoutError(msg));
};

@@ -319,2 +417,3 @@ // listeners that we add for completing the operation are added directly to rhea's objects.

rheaSender.once(rhea_1.SenderEvents.senderClose, onClose);
rheaSender.session.connection.on(rhea_1.ConnectionEvents.disconnected, onDisconnected);
waitTimer = setTimeout(actionAfterTimeout, this.connection.options.operationTimeoutInSeconds * 1000);

@@ -387,4 +486,4 @@ });

if (typeof this._session.eventNames === "function") {
log.eventHandler("[%s] rhea-promise 'session' object is listening for events: %o " +
"emitted by rhea's 'session' object.", this.connection.id, this._session.eventNames());
log.eventHandler("[%s] rhea-promise 'session' object '%s' is listening for events: %o " +
"emitted by rhea's 'session' object.", this.connection.id, this.id, this._session.eventNames());
}

@@ -391,0 +490,0 @@ }

@@ -128,3 +128,5 @@ "use strict";

const emit = () => {
log[params.emitterType]("[%s] %s got event: '%s'. Re-emitting the translated context.", params.connectionId, params.emitterType, params.eventName);
const id = params.emitter &&
(params.emitter.id || params.emitter.name);
log[params.emitterType]("[%s] %s '%s' got event: '%s'. Re-emitting the translated context.", params.connectionId, params.emitterType, id, params.eventName);
params.emitter.emit(params.eventName, eventContext_1.EventContext.translate(params.rheaContext, params.emitter, params.eventName));

@@ -131,0 +133,0 @@ };

@@ -21,3 +21,4 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

import { Entity } from "./entity";
import { OperationTimeoutError } from "./operationTimeoutError";
import { OperationTimeoutError } from "./errorDefinitions";
import { AwaitableSender, AwaitableSenderOptions } from "./awaitableSender";

@@ -34,2 +35,11 @@ /**

/**
* Describes the options that can be provided while creating an Async AMQP sender.
* One can also provide a session if it was already created.
* @interface AwaitableSenderOptionsWithSession
*/
export interface AwaitableSenderOptionsWithSession extends AwaitableSenderOptions {
session?: Session;
}
/**
* Describes the options that can be provided while creating an AMQP receiver. One can also provide

@@ -329,2 +339,3 @@ * a session if it was already created.

let onError: Func<RheaEventContext, void>;
let onDisconnected: Func<RheaEventContext, void>;
let waitTimer: any;

@@ -336,2 +347,3 @@ const removeListeners = () => {

this._connection.removeListener(ConnectionEvents.connectionClose, onClose);
this._connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
};

@@ -353,2 +365,10 @@

onDisconnected = (context: RheaEventContext) => {
removeListeners();
const error = context.connection && context.connection.error
? context.connection.error
: context.error;
log.error("[%s] Connection got disconnected while closing itself: %O.", this.id, error);
};
const actionAfterTimeout = () => {

@@ -364,2 +384,3 @@ removeListeners();

this._connection.once(ConnectionEvents.connectionError, onError);
this._connection.once(ConnectionEvents.disconnected, onDisconnected);
waitTimer = setTimeout(actionAfterTimeout, this.options!.operationTimeoutInSeconds! * 1000);

@@ -452,2 +473,3 @@ this._connection.close();

let onClose: Func<RheaEventContext, void>;
let onDisconnected: Func<RheaEventContext, void>;
let waitTimer: any;

@@ -460,2 +482,3 @@

rheaSession.removeListener(SessionEvents.sessionClose, onClose);
rheaSession.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
};

@@ -465,3 +488,3 @@

removeListeners();
log.session("[%s] Resolving the promise with amqp session.", this.id);
log.session("[%s] Resolving the promise with amqp session '%s'.", this.id, session.id);
return resolve(session);

@@ -477,2 +500,12 @@ };

onDisconnected = (context: RheaEventContext) => {
removeListeners();
const error = context.connection && context.connection.error
? context.connection.error
: context.error;
log.error("[%s] Connection got disconnected while creating amqp session '%s': %O.",
this.id, session.id, error);
return reject(error);
};
const actionAfterTimeout = () => {

@@ -488,2 +521,3 @@ removeListeners();

rheaSession.once(SessionEvents.sessionClose, onClose);
rheaSession.connection.once(ConnectionEvents.disconnected, onDisconnected);
log.session("[%s] Calling amqp session.begin().", this.id);

@@ -509,2 +543,22 @@ waitTimer = setTimeout(actionAfterTimeout, this.options!.operationTimeoutInSeconds! * 1000);

/**
* Creates an awaitable amqp sender. It either uses the provided session or creates a new one.
* @param options Optional parameters to create an awaitable sender link.
* - If `onError` and `onSessionError` handlers are not provided then the `AwaitableSender` will
* clear the timer and reject the Promise for all the entries of inflight send operation in its
* `deliveryDispositionMap`.
* - If the user is handling the reconnection of sender link or the underlying connection in it's
* app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he
* shall be responsible of clearing the `deliveryDispotionMap` of inflight `send()` operation.
*
* @return Promise<AwaitableSender>.
*/
async createAwaitableSender(options?: AwaitableSenderOptionsWithSession): Promise<AwaitableSender> {
if (options && options.session && options.session.createAwaitableSender) {
return options.session.createAwaitableSender(options);
}
const session = await this.createSession();
return session.createAwaitableSender(options);
}
/**
* Creates an amqp receiver link. It either uses the provided session or creates a new one.

@@ -545,4 +599,4 @@ * @param {ReceiverOptionsWithSession} options Optional parameters to create a receiver link.

]);
log.connection("[%s] Successfully created the sender and receiver links on the same session.",
this.id);
log.connection("[%s] Successfully created the sender '%s' and receiver '%s' on the same " +
"amqp session '%s'.", this.id, sender.name, receiver.name, session.id);
return {

@@ -571,2 +625,5 @@ session: session,

};
if (eventName === ConnectionEvents.protocolError) {
log.connection("[%s] ProtocolError is: %O.", this.id, context);
}
emitEvent(params);

@@ -573,0 +630,0 @@ });

@@ -97,4 +97,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

eventName: string): EventContext {
const connectionId = (rheaContext.connection && rheaContext.connection.options) ? rheaContext.connection.options.id : "";
log.contextTranslator("[%s] Translating the context for event: '%s'.", connectionId, eventName);
const connection: Connection = emitter instanceof Connection
? emitter
: (emitter as Link | Session).connection;
log.contextTranslator("[%s] Translating the context for event: '%s'.", connection.id, eventName);
// initialize the result

@@ -106,6 +110,2 @@ const result: EventContext = {

const connection: Connection = emitter instanceof Connection
? emitter
: (emitter as Link | Session).connection;
// set rhea-promise connection and container

@@ -112,0 +112,0 @@ result.connection = connection;

@@ -10,3 +10,4 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

SessionEvents, ContainerOptions as ContainerOptionsBase, TerminusOptions, Types, Sasl,
EndpointOptions, MessageUtil, TypeError, SimpleError, Source, ConnectionError, Typed, WebSocketImpl, WebSocketInstance
EndpointOptions, MessageUtil, TypeError, SimpleError, Source, ConnectionError, Typed,
WebSocketImpl, WebSocketInstance, TargetTerminusOptions
} from "rhea";

@@ -22,2 +23,4 @@

export { Sender, SenderOptions } from "./sender";
export { AwaitableSenderOptions, AwaitableSender, PromiseLike } from "./awaitableSender";
export { LinkCloseOptions } from "./link";
export {

@@ -27,1 +30,4 @@ Func, AmqpResponseStatusCode, isAmqpError, ConnectionStringParseOptions, delay, messageHeader,

} from "./util/utils";
export {
InsufficientCreditError, OperationTimeoutError, SendOperationFailedError
} from "./errorDefinitions";

@@ -7,3 +7,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

link, LinkOptions, AmqpError, Dictionary, Source, TerminusOptions, SenderEvents, ReceiverEvents,
EventContext as RheaEventContext
EventContext as RheaEventContext, ConnectionEvents
} from "rhea";

@@ -14,3 +14,3 @@ import { Session } from "./session";

import { Entity } from "./entity";
import { OperationTimeoutError } from "./operationTimeoutError";
import { OperationTimeoutError } from "./errorDefinitions";

@@ -22,2 +22,15 @@ export enum LinkType {

/**
* @interface LinkCloseOptions
* Describes the options that can be provided while closing the link.
*/
export interface LinkCloseOptions {
/**
* Indicates whether the underlying amqp session should also be closed when the
* link is being closed.
* - **Default: `true`**.
*/
closeSession?: boolean;
}
export abstract class Link extends Entity {

@@ -214,5 +227,6 @@ linkOptions?: LinkOptions;

/**
* Closes the underlying amqp link and session in rhea if open. Also removes all the event
* handlers added in the rhea-promise library on the link and it's session
* @return {Promise<void>} Promise<void>
* Closes the underlying amqp link and optionally the session as well in rhea if open.
* Also removes all the event handlers added in the rhea-promise library on the link
* and optionally it's session.
* @returns Promise<void>
* - **Resolves** the promise when rhea emits the "sender_close" | "receiver_close" event.

@@ -222,6 +236,9 @@ * - **Rejects** the promise with an AmqpError when rhea emits the

*/
async close(): Promise<void> {
async close(options?: LinkCloseOptions): Promise<void> {
if (!options) options = {};
if (options.closeSession == undefined) options.closeSession = true;
this.removeAllListeners();
await new Promise<void>((resolve, reject) => {
log.error("[%s] The %s is open ? -> %s", this.connection.id, this.type, this.isOpen());
log.error("[%s] The %s '%s' on amqp session '%s' is open ? -> %s",
this.connection.id, this.type, this.name, this.session.id, this.isOpen());
if (this.isOpen()) {

@@ -236,2 +253,3 @@ const errorEvent = this.type === LinkType.sender

let onClose: Func<RheaEventContext, void>;
let onDisconnected: Func<RheaEventContext, void>;
let waitTimer: any;

@@ -244,2 +262,3 @@

this._link.removeListener(closeEvent, onClose);
this._link.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
};

@@ -249,4 +268,4 @@

removeListeners();
log[this.type]("[%s] Resolving the promise as the amqp %s has been closed.",
this.connection.id, this.type);
log[this.type]("[%s] Resolving the promise as the %s '%s' on amqp session '%s' " +
"has been closed.", this.connection.id, this.type, this.name, this.session.id);
return resolve();

@@ -257,11 +276,21 @@ };

removeListeners();
log.error("[%s] Error occurred while closing amqp %s: %O.",
this.connection.id, this.type, context.session!.error);
log.error("[%s] Error occurred while closing %s '%s' on amqp session '%s': %O.",
this.connection.id, this.type, this.name, this.session.id, context.session!.error);
return reject(context.session!.error);
};
onDisconnected = (context: RheaEventContext) => {
removeListeners();
const error = context.connection && context.connection.error
? context.connection.error
: context.error;
log.error("[%s] Connection got disconnected while closing amqp %s '%s' on amqp " +
"session '%s': %O.", this.connection.id, this.type, this.name, this.session.id, error);
};
const actionAfterTimeout = () => {
removeListeners();
const msg: string = `Unable to close the amqp %s ${this.name} due to operation timeout.`;
log.error("[%s] %s", this.connection.id, this.type, msg);
const msg: string = `Unable to close the ${this.type} '${this.name}' ` +
`on amqp session '${this.session.id}' due to operation timeout.`;
log.error("[%s] %s", this.connection.id, msg);
return reject(new OperationTimeoutError(msg));

@@ -273,2 +302,3 @@ };

this._link.once(errorEvent, onError);
this._link.connection.once(ConnectionEvents.disconnected, onDisconnected);
waitTimer = setTimeout(actionAfterTimeout,

@@ -282,5 +312,8 @@ this.connection.options!.operationTimeoutInSeconds! * 1000);

});
log[this.type]("[%s] %s has been closed, now closing it's session.",
this.connection.id, this.type);
return this._session.close();
if (options.closeSession) {
log[this.type]("[%s] %s '%s' has been closed, now closing it's amqp session '%s'.",
this.connection.id, this.type, this.name, this.session.id);
return this._session.close();
}
}

@@ -287,0 +320,0 @@

@@ -13,6 +13,33 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

/**
* Descibes the options that can be provided while creating an AMQP Basesender.
* @interface BaseSenderOptions
*/
export interface BaseSenderOptions extends RheaSenderOptions {
/**
* @property {OnAmqpEvent} [onError] The handler that can be provided for receiving any
* errors that occur on the "sender_error" event.
*/
onError?: OnAmqpEvent;
/**
* @property {OnAmqpEvent} [onClose] The handler that can be provided for receiving the
* "sender_close" event.
*/
onClose?: OnAmqpEvent;
/**
* @property {OnAmqpEvent} [onSessionError] The handler that can be provided for receiving
* the "session_error" event that occurs on the underlying session.
*/
onSessionError?: OnAmqpEvent;
/**
* @property {OnAmqpEvent} [onSessionClose] The handler that can be provided for receiving the
* "session_close" event that occurs on the underlying session.
*/
onSessionClose?: OnAmqpEvent;
}
/**
* Descibes the options that can be provided while creating an AMQP sender.
* @interface SenderOptions
*/
export interface SenderOptions extends RheaSenderOptions {
export interface SenderOptions extends BaseSenderOptions {
/**

@@ -38,22 +65,2 @@ * @property {OnAmqpEvent} [onAccepted] The handler that can be provided for receiving the

onModified?: OnAmqpEvent;
/**
* @property {OnAmqpEvent} [onError] The handler that can be provided for receiving any
* errors that occur on the "sender_error" event.
*/
onError?: OnAmqpEvent;
/**
* @property {OnAmqpEvent} [onClose] The handler that can be provided for receiving the
* "sender_close" event.
*/
onClose?: OnAmqpEvent;
/**
* @property {OnAmqpEvent} [onSessionError] The handler that can be provided for receiving
* the "session_error" event that occurs on the underlying session.
*/
onSessionError?: OnAmqpEvent;
/**
* @property {OnAmqpEvent} [onSessionClose] The handler that can be provided for receiving the
* "session_close" event that occurs on the underlying session.
*/
onSessionClose?: OnAmqpEvent;
}

@@ -70,9 +77,8 @@

/**
* Describes the sender that wraps the rhea sender.
* @class Sender
* Describes the base sender that wraps the rhea sender.
* @class BaseSender
*/
export class Sender extends Link {
senderOptions?: SenderOptions;
export class BaseSender extends Link {
constructor(session: Session, sender: RheaSender, options?: SenderOptions) {
constructor(session: Session, sender: RheaSender, options?: BaseSenderOptions) {
super(LinkType.sender, session, sender, options);

@@ -92,3 +98,14 @@ }

}
}
/**
* Describes the AMQP Sender.
* @class Sender
*/
export class Sender extends BaseSender {
constructor(session: Session, sender: RheaSender, options?: SenderOptions) {
super(session, sender, options);
}
/**

@@ -95,0 +112,0 @@ * Sends the message

@@ -10,3 +10,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

SenderEvents, ReceiverEvents, SessionEvents, AmqpError, Session as RheaSession,
EventContext as RheaEventContext
EventContext as RheaEventContext, ConnectionEvents
} from "rhea";

@@ -16,3 +16,4 @@ import { Func, EmitParameters, emitEvent } from "./util/utils";

import { Entity } from "./entity";
import { OperationTimeoutError } from "./operationTimeoutError";
import { OperationTimeoutError } from "./errorDefinitions";
import { AwaitableSender, AwaitableSenderOptions } from "./awaitableSender";

@@ -28,2 +29,10 @@ /**

/**
* @internal
*/
enum SenderType {
sender = "sender",
AwaitableSender = "AwaitableSender"
}
/**
* Describes the session that wraps the rhea session.

@@ -59,2 +68,25 @@ * @class Session

/**
* Returns the unique identifier for the session in the format:
* "local_<number>-remote_<number>-<connection-id>" or an empty string if the local channel or
* remote channel are not yet defined.
*/
get id(): string {
let result: string = "";
const session: any = this._session;
if (session.local) {
result += `local-${session.local.channel}_`;
}
if (session.remote) {
result += `remote-${session.remote.channel}_`;
}
if (result) {
result += `${this._connection.id}`;
}
return result;
}
/**
* Determines whether the session and the underlying connection is open.

@@ -121,6 +153,7 @@ * @returns {boolean} result `true` - is open; `false` otherwise.

return new Promise<void>((resolve, reject) => {
log.error("[%s] The session is open ? -> %s", this.connection.id, this.isOpen());
log.error("[%s] The amqp session '%s' is open ? -> %s", this.connection.id, this.id, this.isOpen());
if (this.isOpen()) {
let onError: Func<RheaEventContext, void>;
let onClose: Func<RheaEventContext, void>;
let onDisconnected: Func<RheaEventContext, void>;
let waitTimer: any;

@@ -133,2 +166,3 @@

this._session.removeListener(SessionEvents.sessionClose, onClose);
this._session.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
};

@@ -138,4 +172,4 @@

removeListeners();
log.session("[%s] Resolving the promise as the amqp session has been closed.",
this.connection.id);
log.session("[%s] Resolving the promise as the amqp session '%s' has been closed.",
this.connection.id, this.id);
return resolve();

@@ -146,10 +180,19 @@ };

removeListeners();
log.error("[%s] Error occurred while closing amqp session.",
this.connection.id, context.session!.error);
log.error("[%s] Error occurred while closing amqp session '%s'.",
this.connection.id, this.id, context.session!.error);
reject(context.session!.error);
};
onDisconnected = (context: RheaEventContext) => {
removeListeners();
const error = context.connection && context.connection.error
? context.connection.error
: context.error;
log.error("[%s] Connection got disconnected while closing amqp session '%s': %O.",
this.connection.id, this.id, error);
};
const actionAfterTimeout = () => {
removeListeners();
const msg: string = `Unable to close the amqp session due to operation timeout.`;
const msg: string = `Unable to close the amqp session ${this.id} due to operation timeout.`;
log.error("[%s] %s", this.connection.id, msg);

@@ -162,3 +205,4 @@ reject(new OperationTimeoutError(msg));

this._session.once(SessionEvents.sessionError, onError);
log.session("[%s] Calling session.close()", this.connection.id);
this._session.connection.once(ConnectionEvents.disconnected, onDisconnected);
log.session("[%s] Calling session.close() for amqp session '%s'.", this.connection.id, this.id);
waitTimer = setTimeout(actionAfterTimeout, this.connection.options!.operationTimeoutInSeconds! * 1000);

@@ -175,5 +219,5 @@ this._session.close();

* Creates an amqp receiver on this session.
* @param {Session} session The amqp session object on which the receiver link needs to be established.
* @param {ReceiverOptions} [options] Options that can be provided while creating an amqp receiver.
* @return {Promise<Receiver>} Promise<Receiver>
* @param session The amqp session object on which the receiver link needs to be established.
* @param options Options that can be provided while creating an amqp receiver.
* @return Promise<Receiver>
* - **Resolves** the promise with the Receiver object when rhea emits the "receiver_open" event.

@@ -209,4 +253,4 @@ * - **Rejects** the promise with an AmqpError when rhea emits the "receiver_close" event while trying

this.on(SessionEvents.sessionError, options.onSessionError);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " +
"while creating the 'receiver'.", this.connection.id, SessionEvents.sessionError);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session: %s', " +
"while creating the 'receiver'.", this.connection.id, SessionEvents.sessionError, this.id);
}

@@ -216,4 +260,4 @@

this.on(SessionEvents.sessionClose, options.onSessionClose);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " +
" while creating the 'receiver'.", this.connection.id, SessionEvents.sessionClose);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session: %s', " +
" while creating the 'receiver'.", this.connection.id, SessionEvents.sessionClose, this.id);
}

@@ -225,2 +269,3 @@ const rheaReceiver = this._session.attach_receiver(options);

let onClose: Func<RheaEventContext, void>;
let onDisconnected: Func<RheaEventContext, void>;
let waitTimer: any;

@@ -256,2 +301,3 @@

rheaReceiver.removeListener(ReceiverEvents.receiverClose, onClose);
rheaReceiver.session.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
};

@@ -261,4 +307,4 @@

removeListeners();
log.receiver("[%s] Resolving the promise with amqp receiver '%s'.",
this.connection.id, receiver.name);
log.receiver("[%s] Resolving the promise with amqp receiver '%s' on amqp session '%s'.",
this.connection.id, receiver.name, this.id);
return resolve(receiver);

@@ -269,11 +315,22 @@ };

removeListeners();
log.error("[%s] Error occurred while creating a receiver over amqp connection: %O.",
this.connection.id, context.receiver!.error);
log.error("[%s] Error occurred while creating the amqp receiver '%s' on amqp session " +
"'%s' over amqp connection: %O.",
this.connection.id, receiver.name, this.id, context.receiver!.error);
return reject(context.receiver!.error);
};
onDisconnected = (context: RheaEventContext) => {
removeListeners();
const error = context.connection && context.connection.error
? context.connection.error
: context.error;
log.error("[%s] Connection got disconnected while creating amqp receiver '%s' on amqp " +
"session '%s': %O.", this.connection.id, receiver.name, this.id, error);
return reject(error);
};
const actionAfterTimeout = () => {
removeListeners();
const msg: string = `Unable to create the amqp receiver ${receiver.name} due to ` +
`operation timeout.`;
const msg: string = `Unable to create the amqp receiver '${receiver.name}' on amqp ` +
`session '${this.id}' due to operation timeout.`;
log.error("[%s] %s", this.connection.id, msg);

@@ -286,2 +343,3 @@ return reject(new OperationTimeoutError(msg));

rheaReceiver.once(ReceiverEvents.receiverClose, onClose);
rheaReceiver.session.connection.on(ConnectionEvents.disconnected, onDisconnected);
waitTimer = setTimeout(actionAfterTimeout, this.connection.options!.operationTimeoutInSeconds! * 1000);

@@ -293,4 +351,4 @@ });

* Creates an amqp sender on this session.
* @param {SenderOptions} [options] Options that can be provided while creating an amqp sender.
* @return {Promise<Sender>} Promise<Sender>
* @param options Options that can be provided while creating an amqp sender.
* @return Promise<Sender>
* - **Resolves** the promise with the Sender object when rhea emits the "sender_open" event.

@@ -301,2 +359,33 @@ * - **Rejects** the promise with an AmqpError when rhea emits the "sender_close" event while trying

createSender(options?: SenderOptions): Promise<Sender> {
return this._createSender(SenderType.sender, options) as Promise<Sender>;
}
/**
* Creates an awaitable amqp sender on this session.
* @param options Options that can be provided while creating an async amqp sender.
* - If `onError` and `onSessionError` handlers are not provided then the `AwaitableSender` will
* clear the timer and reject the Promise for all the entries of inflight send operation in its
* `deliveryDispositionMap`.
* - If the user is handling the reconnection of sender link or the underlying connection in it's
* app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he
* shall be responsible of clearing the `deliveryDispotionMap` of inflight `send()` operation.
*
* @return Promise<AwaitableSender>
* - **Resolves** the promise with the Sender object when rhea emits the "sender_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "sender_close" event while trying
* to create an amqp sender or the operation timeout occurs.
*/
createAwaitableSender(options?: AwaitableSenderOptions): Promise<AwaitableSender> {
return this._createSender(SenderType.AwaitableSender, options) as Promise<AwaitableSender>;
}
/**
* Creates the Sender based on the provided type.
* @internal
* @param type The type of sender
* @param options Options to be provided while creating the sender.
*/
private _createSender(
type: SenderType,
options?: SenderOptions | AwaitableSenderOptions): Promise<Sender | AwaitableSender> {
return new Promise((resolve, reject) => {

@@ -306,4 +395,4 @@ // Register session handlers for session_error and session_close if provided.

this.on(SessionEvents.sessionError, options.onSessionError);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " +
"while creating the sender.", this.connection.id, SessionEvents.sessionError);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session: %s', " +
"while creating the sender.", this.connection.id, SessionEvents.sessionError, this.id);
}

@@ -313,11 +402,17 @@

this.on(SessionEvents.sessionClose, options.onSessionClose);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " +
"while creating the sender.", this.connection.id, SessionEvents.sessionClose);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session: %s', " +
"while creating the sender.", this.connection.id, SessionEvents.sessionClose, this.id);
}
const rheaSender = this._session.attach_sender(options);
const sender = new Sender(this, rheaSender, options);
let sender: Sender | AwaitableSender;
if (type === SenderType.sender) {
sender = new Sender(this, rheaSender, options);
} else {
sender = new AwaitableSender(this, rheaSender, options);
}
sender.actionInitiated++;
let onSendable: Func<RheaEventContext, void>;
let onClose: Func<RheaEventContext, void>;
let onDisconnected: Func<RheaEventContext, void>;
let waitTimer: any;

@@ -334,14 +429,16 @@

}
if (options.onAccepted) {
sender.on(SenderEvents.accepted, options.onAccepted);
if (type === SenderType.sender) {
if ((options as SenderOptions).onAccepted) {
sender.on(SenderEvents.accepted, (options as SenderOptions).onAccepted!);
}
if ((options as SenderOptions).onRejected) {
sender.on(SenderEvents.rejected, (options as SenderOptions).onRejected!);
}
if ((options as SenderOptions).onReleased) {
sender.on(SenderEvents.released, (options as SenderOptions).onReleased!);
}
if ((options as SenderOptions).onModified) {
sender.on(SenderEvents.modified, (options as SenderOptions).onModified!);
}
}
if (options.onRejected) {
sender.on(SenderEvents.rejected, options.onRejected);
}
if (options.onReleased) {
sender.on(SenderEvents.released, options.onReleased);
}
if (options.onModified) {
sender.on(SenderEvents.modified, options.onModified);
}
}

@@ -354,2 +451,3 @@

rheaSender.removeListener(SenderEvents.senderClose, onClose);
rheaSender.session.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
};

@@ -359,4 +457,4 @@

removeListeners();
log.sender("[%s] Resolving the promise with amqp sender '%s'.",
this.connection.id, sender.name);
log.sender("[%s] Resolving the promise with amqp sender '%s' on amqp session '%s'.",
this.connection.id, sender.name, this.id);
return resolve(sender);

@@ -367,11 +465,22 @@ };

removeListeners();
log.error("[%s] Error occurred while creating a sender over amqp connection: %O.",
this.connection.id, context.sender!.error);
log.error("[%s] Error occurred while creating the amqp sender '%s' on amqp session '%s' " +
"over amqp connection: %O.",
this.connection.id, sender.name, this.id, context.sender!.error);
return reject(context.sender!.error);
};
onDisconnected = (context: RheaEventContext) => {
removeListeners();
const error = context.connection && context.connection.error
? context.connection.error
: context.error;
log.error("[%s] Connection got disconnected while creating amqp sender '%s' on amqp " +
"session '%s': %O.", this.connection.id, sender.name, this.id, error);
return reject(error);
};
const actionAfterTimeout = () => {
removeListeners();
const msg: string = `Unable to create the amqp sender ${sender.name} due to ` +
`operation timeout.`;
const msg: string = `Unable to create the amqp sender '${sender.name}' on amqp session ` +
`'${this.id}' due to operation timeout.`;
log.error("[%s] %s", this.connection.id, msg);

@@ -384,2 +493,3 @@ return reject(new OperationTimeoutError(msg));

rheaSender.once(SenderEvents.senderClose, onClose);
rheaSender.session.connection.on(ConnectionEvents.disconnected, onDisconnected);
waitTimer = setTimeout(actionAfterTimeout, this.connection.options!.operationTimeoutInSeconds! * 1000);

@@ -458,6 +568,7 @@ });

if (typeof this._session.eventNames === "function") {
log.eventHandler("[%s] rhea-promise 'session' object is listening for events: %o " +
"emitted by rhea's 'session' object.", this.connection.id, this._session.eventNames());
log.eventHandler("[%s] rhea-promise 'session' object '%s' is listening for events: %o " +
"emitted by rhea's 'session' object.",
this.connection.id, this.id, this._session.eventNames());
}
}
}

@@ -181,4 +181,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

const emit = () => {
log[params.emitterType]("[%s] %s got event: '%s'. Re-emitting the translated context.",
params.connectionId, params.emitterType, params.eventName);
const id = params.emitter &&
((params.emitter as Connection | Session).id || (params.emitter as Link).name);
log[params.emitterType]("[%s] %s '%s' got event: '%s'. Re-emitting the translated context.",
params.connectionId, params.emitterType, id, params.eventName);
params.emitter.emit(params.eventName,

@@ -185,0 +187,0 @@ EventContext.translate(params.rheaContext, params.emitter, params.eventName));

{
"name": "rhea-promise",
"version": "0.2.0",
"version": "1.0.0",
"description": "A Promisified layer over rhea AMQP client",

@@ -10,4 +10,4 @@ "license": "Apache-2.0",

"debug": "^3.1.0",
"rhea": "^1.0.7",
"tslib": "^1.9.3"
"rhea": "^1.0.8",
"tslib": "^1.10.0"
},

@@ -26,5 +26,5 @@ "keywords": [

"rimraf": "^2.6.3",
"ts-node": "^8.1.0",
"tslint": "^5.16.0",
"typescript": "^3.4.5",
"ts-node": "^8.2.0",
"tslint": "^5.17.0",
"typescript": "^3.5.1",
"dotenv": "^8.0.0"

@@ -31,0 +31,0 @@ },

# rhea-promise
A Promisified layer over rhea AMQP client.
A Promisified layer over [rhea](https://githhub.com/amqp/rhea) AMQP client.

@@ -76,3 +76,3 @@ ## Pre-requisite ##

#### Sending a message.
#### Sending a message via `Sender`.
- Running the example from terminal: `> ts-node ./examples/send.ts`.

@@ -134,4 +134,9 @@

const delivery: Delivery = await sender.send(message);
console.log(">>>>>[%s] Delivery id: ", connection.id, delivery.id);
// Please, note that we are not awaiting on sender.send()
// You will notice that `delivery.settled` will be `false`.
const delivery: Delivery = sender.send(message);
console.log(">>>>>[%s] Delivery id: %d, settled: %s",
connection.id,
delivery.id,
delivery.settled);

@@ -145,2 +150,68 @@ await sender.close();

### Sending a message via `AwaitableSender`
- Running the example from terminal: `> ts-node ./examples/awaitableSend.ts`.
```typescript
import {
Connection, Message, ConnectionOptions, Delivery, AwaitableSenderOptions, AwaitableSender
} from "rhea-promise";
import * as dotenv from "dotenv"; // Optional for loading environment configuration from a .env (config) file
dotenv.config();
const host = process.env.AMQP_HOST || "host";
const username = process.env.AMQP_USERNAME || "sharedAccessKeyName";
const password = process.env.AMQP_PASSWORD || "sharedAccessKeyValue";
const port = parseInt(process.env.AMQP_PORT || "5671");
const senderAddress = process.env.SENDER_ADDRESS || "address";
async function main(): Promise<void> {
const connectionOptions: ConnectionOptions = {
transport: "tls",
host: host,
hostname: host,
username: username,
password: password,
port: port,
reconnect: false
};
const connection: Connection = new Connection(connectionOptions);
const senderName = "sender-1";
const awaitableSenderOptions: AwaitableSenderOptions = {
name: senderName,
target: {
address: senderAddress
},
sendTimeoutInSeconds: 10
};
await connection.open();
// Notice that we are awaiting on the message being sent.
const sender: AwaitableSender = await connection.createAwaitableSender(
awaitableSenderOptions
);
for (let i = 0; i < 10; i++) {
const message: Message = {
body: `Hello World - ${i}`,
message_id: i
};
// Note: Here we are awaiting for the send to complete.
// You will notice that `delivery.settled` will be `true`, irrespective of whether the promise resolves or rejects.
const delivery: Delivery = await sender.send(message);
console.log(
"[%s] await sendMessage -> Delivery id: %d, settled: %s",
connection.id,
delivery.id,
delivery.settled
);
}
await sender.close();
await connection.close();
}
main().catch((err) => console.log(err));
```
### Receiving a message

@@ -228,1 +299,5 @@ - Running the example from terminal: `> ts-node ./examples/receive.ts`.

```
## AMQP Protocol specification
Amqp protocol specification can be found [here](http://www.amqp.org/sites/amqp.org/files/amqp.pdf).

@@ -44,3 +44,2 @@ {

"no-unused-variable": false,
"no-use-before-declare": true,
"no-var-keyword": true,

@@ -47,0 +46,0 @@ "no-floating-promises": true,

@@ -11,2 +11,3 @@ /// <reference types="node" />

import { Entity } from "./entity";
import { AwaitableSender, AwaitableSenderOptions } from "./awaitableSender";
/**

@@ -21,2 +22,10 @@ * Describes the options that can be provided while creating an AMQP sender. One can also provide

/**
* Describes the options that can be provided while creating an Async AMQP sender.
* One can also provide a session if it was already created.
* @interface AwaitableSenderOptionsWithSession
*/
export interface AwaitableSenderOptionsWithSession extends AwaitableSenderOptions {
session?: Session;
}
/**
* Describes the options that can be provided while creating an AMQP receiver. One can also provide

@@ -249,2 +258,15 @@ * a session if it was already created.

/**
* Creates an awaitable amqp sender. It either uses the provided session or creates a new one.
* @param options Optional parameters to create an awaitable sender link.
* - If `onError` and `onSessionError` handlers are not provided then the `AwaitableSender` will
* clear the timer and reject the Promise for all the entries of inflight send operation in its
* `deliveryDispositionMap`.
* - If the user is handling the reconnection of sender link or the underlying connection in it's
* app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he
* shall be responsible of clearing the `deliveryDispotionMap` of inflight `send()` operation.
*
* @return Promise<AwaitableSender>.
*/
createAwaitableSender(options?: AwaitableSenderOptionsWithSession): Promise<AwaitableSender>;
/**
* Creates an amqp receiver link. It either uses the provided session or creates a new one.

@@ -251,0 +273,0 @@ * @param {ReceiverOptionsWithSession} options Optional parameters to create a receiver link.

@@ -1,2 +0,2 @@

export { Delivery, Message, MessageProperties, MessageHeader, EventContext as RheaEventContext, ConnectionOptions as ConnectionOptionsBase, AmqpError, Dictionary, types, message, filter, Filter, uuid_to_string, generate_uuid, string_to_uuid, LinkError, ProtocolError, LinkOptions, DeliveryAnnotations, MessageAnnotations, ReceiverEvents, SenderEvents, ConnectionEvents, SessionEvents, ContainerOptions as ContainerOptionsBase, TerminusOptions, Types, Sasl, EndpointOptions, MessageUtil, TypeError, SimpleError, Source, ConnectionError, Typed, WebSocketImpl, WebSocketInstance } from "rhea";
export { Delivery, Message, MessageProperties, MessageHeader, EventContext as RheaEventContext, ConnectionOptions as ConnectionOptionsBase, AmqpError, Dictionary, types, message, filter, Filter, uuid_to_string, generate_uuid, string_to_uuid, LinkError, ProtocolError, LinkOptions, DeliveryAnnotations, MessageAnnotations, ReceiverEvents, SenderEvents, ConnectionEvents, SessionEvents, ContainerOptions as ContainerOptionsBase, TerminusOptions, Types, Sasl, EndpointOptions, MessageUtil, TypeError, SimpleError, Source, ConnectionError, Typed, WebSocketImpl, WebSocketInstance, TargetTerminusOptions } from "rhea";
export { EventContext, OnAmqpEvent } from "./eventContext";

@@ -8,3 +8,6 @@ export { Container, ContainerOptions } from "./container";

export { Sender, SenderOptions } from "./sender";
export { AwaitableSenderOptions, AwaitableSender, PromiseLike } from "./awaitableSender";
export { LinkCloseOptions } from "./link";
export { Func, AmqpResponseStatusCode, isAmqpError, ConnectionStringParseOptions, delay, messageHeader, messageProperties, parseConnectionString, ParsedOutput } from "./util/utils";
export { InsufficientCreditError, OperationTimeoutError, SendOperationFailedError } from "./errorDefinitions";
//# sourceMappingURL=index.d.ts.map

@@ -9,2 +9,14 @@ import { link, LinkOptions, AmqpError, Dictionary, Source, TerminusOptions } from "rhea";

}
/**
* @interface LinkCloseOptions
* Describes the options that can be provided while closing the link.
*/
export interface LinkCloseOptions {
/**
* Indicates whether the underlying amqp session should also be closed when the
* link is being closed.
* - **Default: `true`**.
*/
closeSession?: boolean;
}
export declare abstract class Link extends Entity {

@@ -101,5 +113,6 @@ linkOptions?: LinkOptions;

/**
* Closes the underlying amqp link and session in rhea if open. Also removes all the event
* handlers added in the rhea-promise library on the link and it's session
* @return {Promise<void>} Promise<void>
* Closes the underlying amqp link and optionally the session as well in rhea if open.
* Also removes all the event handlers added in the rhea-promise library on the link
* and optionally it's session.
* @returns Promise<void>
* - **Resolves** the promise when rhea emits the "sender_close" | "receiver_close" event.

@@ -109,3 +122,3 @@ * - **Rejects** the promise with an AmqpError when rhea emits the

*/
close(): Promise<void>;
close(options?: LinkCloseOptions): Promise<void>;
/**

@@ -112,0 +125,0 @@ * Adds event listeners for the possible events that can occur on the link object and

@@ -8,6 +8,32 @@ /// <reference types="node" />

/**
* Descibes the options that can be provided while creating an AMQP Basesender.
* @interface BaseSenderOptions
*/
export interface BaseSenderOptions extends RheaSenderOptions {
/**
* @property {OnAmqpEvent} [onError] The handler that can be provided for receiving any
* errors that occur on the "sender_error" event.
*/
onError?: OnAmqpEvent;
/**
* @property {OnAmqpEvent} [onClose] The handler that can be provided for receiving the
* "sender_close" event.
*/
onClose?: OnAmqpEvent;
/**
* @property {OnAmqpEvent} [onSessionError] The handler that can be provided for receiving
* the "session_error" event that occurs on the underlying session.
*/
onSessionError?: OnAmqpEvent;
/**
* @property {OnAmqpEvent} [onSessionClose] The handler that can be provided for receiving the
* "session_close" event that occurs on the underlying session.
*/
onSessionClose?: OnAmqpEvent;
}
/**
* Descibes the options that can be provided while creating an AMQP sender.
* @interface SenderOptions
*/
export interface SenderOptions extends RheaSenderOptions {
export interface SenderOptions extends BaseSenderOptions {
/**

@@ -33,22 +59,2 @@ * @property {OnAmqpEvent} [onAccepted] The handler that can be provided for receiving the

onModified?: OnAmqpEvent;
/**
* @property {OnAmqpEvent} [onError] The handler that can be provided for receiving any
* errors that occur on the "sender_error" event.
*/
onError?: OnAmqpEvent;
/**
* @property {OnAmqpEvent} [onClose] The handler that can be provided for receiving the
* "sender_close" event.
*/
onClose?: OnAmqpEvent;
/**
* @property {OnAmqpEvent} [onSessionError] The handler that can be provided for receiving
* the "session_error" event that occurs on the underlying session.
*/
onSessionError?: OnAmqpEvent;
/**
* @property {OnAmqpEvent} [onSessionClose] The handler that can be provided for receiving the
* "session_close" event that occurs on the underlying session.
*/
onSessionClose?: OnAmqpEvent;
}

@@ -63,8 +69,7 @@ /**

/**
* Describes the sender that wraps the rhea sender.
* @class Sender
* Describes the base sender that wraps the rhea sender.
* @class BaseSender
*/
export declare class Sender extends Link {
senderOptions?: SenderOptions;
constructor(session: Session, sender: RheaSender, options?: SenderOptions);
export declare class BaseSender extends Link {
constructor(session: Session, sender: RheaSender, options?: BaseSenderOptions);
setDrained(drained: boolean): void;

@@ -76,2 +81,9 @@ /**

sendable(): boolean;
}
/**
* Describes the AMQP Sender.
* @class Sender
*/
export declare class Sender extends BaseSender {
constructor(session: Session, sender: RheaSender, options?: SenderOptions);
/**

@@ -78,0 +90,0 @@ * Sends the message

@@ -7,2 +7,3 @@ import { Connection } from "./connection";

import { Entity } from "./entity";
import { AwaitableSender, AwaitableSenderOptions } from "./awaitableSender";
/**

@@ -31,2 +32,8 @@ * Describes the event listeners that can be added to the Session.

/**
* Returns the unique identifier for the session in the format:
* "local_<number>-remote_<number>-<connection-id>" or an empty string if the local channel or
* remote channel are not yet defined.
*/
readonly id: string;
/**
* Determines whether the session and the underlying connection is open.

@@ -67,5 +74,5 @@ * @returns {boolean} result `true` - is open; `false` otherwise.

* Creates an amqp receiver on this session.
* @param {Session} session The amqp session object on which the receiver link needs to be established.
* @param {ReceiverOptions} [options] Options that can be provided while creating an amqp receiver.
* @return {Promise<Receiver>} Promise<Receiver>
* @param session The amqp session object on which the receiver link needs to be established.
* @param options Options that can be provided while creating an amqp receiver.
* @return Promise<Receiver>
* - **Resolves** the promise with the Receiver object when rhea emits the "receiver_open" event.

@@ -78,4 +85,4 @@ * - **Rejects** the promise with an AmqpError when rhea emits the "receiver_close" event while trying

* Creates an amqp sender on this session.
* @param {SenderOptions} [options] Options that can be provided while creating an amqp sender.
* @return {Promise<Sender>} Promise<Sender>
* @param options Options that can be provided while creating an amqp sender.
* @return Promise<Sender>
* - **Resolves** the promise with the Sender object when rhea emits the "sender_open" event.

@@ -87,2 +94,25 @@ * - **Rejects** the promise with an AmqpError when rhea emits the "sender_close" event while trying

/**
* Creates an awaitable amqp sender on this session.
* @param options Options that can be provided while creating an async amqp sender.
* - If `onError` and `onSessionError` handlers are not provided then the `AwaitableSender` will
* clear the timer and reject the Promise for all the entries of inflight send operation in its
* `deliveryDispositionMap`.
* - If the user is handling the reconnection of sender link or the underlying connection in it's
* app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he
* shall be responsible of clearing the `deliveryDispotionMap` of inflight `send()` operation.
*
* @return Promise<AwaitableSender>
* - **Resolves** the promise with the Sender object when rhea emits the "sender_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "sender_close" event while trying
* to create an amqp sender or the operation timeout occurs.
*/
createAwaitableSender(options?: AwaitableSenderOptions): Promise<AwaitableSender>;
/**
* Creates the Sender based on the provided type.
* @internal
* @param type The type of sender
* @param options Options to be provided while creating the sender.
*/
private _createSender;
/**
* Adds event listeners for the possible events that can occur on the session object and

@@ -89,0 +119,0 @@ * re-emits the same event back with the received arguments from rhea's event emitter.

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc