Socket
Socket
Sign inDemoInstall

rhea-promise

Package Overview
Dependencies
Maintainers
4
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rhea-promise - npm Package Compare versions

Comparing version 1.0.0 to 1.1.0

test/connection.spec.ts

6

changelog.md

@@ -0,1 +1,7 @@

### 1.1.0 - 2021-02-08
- All async methods now take a signal that can be used to cancel the operation. Fixes [#48](https://github.com/amqp/rhea-promise/issues/48)
- Added a `timeoutInSeconds` parameter to the `send` method on the `AwaitableSender` that overrides the timeout value for the send operation set when creating the sender.
- When the `error` event is fired when closing the sender/receiver link, surface errors occurring on the sender/receiver context if none are found on the session context. Details can be found in [PR #55](https://github.com/amqp/rhea-promise/pull/55)
- Updated minimum version of `rhea` to `^1.0.24`. Details can be found in [PR 68](https://github.com/amqp/rhea-promise/pull/68)
### 1.0.0 - 2019-06-27

@@ -2,0 +8,0 @@ - Updated minimum version of `rhea` to `^1.0.8`.

0

CODE_OF_CONDUCT.md

@@ -0,0 +0,0 @@ # Contributor Covenant Code of Conduct

@@ -10,2 +10,3 @@ "use strict";

const errorDefinitions_1 = require("./errorDefinitions");
const utils_1 = require("./util/utils");
/**

@@ -119,8 +120,20 @@ * Describes the sender where one can await on the message being sent.

* given message is assumed to be of type Message interface and encoded appropriately.
* @param {AwaitableSendOptions} [options] Options to configure the timeout and cancellation for
* the send operation.
* @returns {Promise<Delivery>} Promise<Delivery> The delivery information about the sent message.
*/
send(msg, tag, format) {
send(msg, tag, format, options) {
return new Promise((resolve, reject) => {
log.sender("[%s] Sender '%s' on amqp session '%s', credit: %d available: %d", this.connection.id, this.name, this.session.id, this.credit, this.session.outgoing.available());
const abortSignal = options && options.abortSignal;
const timeoutInSeconds = options && options.timeoutInSeconds;
if (abortSignal && abortSignal.aborted) {
const err = utils_1.createAbortError("Send request has been cancelled.");
log.error("[%s] %s", this.connection.id, err.message);
return reject(err);
}
if (this.sendable()) {
let sendTimeoutInSeconds = this.sendTimeoutInSeconds;
if (typeof timeoutInSeconds === "number" && timeoutInSeconds > 0)
sendTimeoutInSeconds = timeoutInSeconds;
const timer = setTimeout(() => {

@@ -133,9 +146,35 @@ this.deliveryDispositionMap.delete(delivery.id);

return reject(new errorDefinitions_1.OperationTimeoutError(message));
}, this.sendTimeoutInSeconds * 1000);
}, sendTimeoutInSeconds * 1000);
const onAbort = () => {
if (this.deliveryDispositionMap.has(delivery.id)) {
const promise = this.deliveryDispositionMap.get(delivery.id);
clearTimeout(promise.timer);
const deleteResult = this.deliveryDispositionMap.delete(delivery.id);
log.sender("[%s] Event: 'abort', Successfully deleted the delivery with id %d from the " +
" map of sender '%s' on amqp session '%s' and cleared the timer: %s.", this.connection.id, delivery.id, this.name, this.session.id, deleteResult);
const err = utils_1.createAbortError("Send request has been cancelled.");
log.error("[%s] %s", this.connection.id, err.message);
promise.reject(err);
}
};
const removeAbortListener = () => {
if (abortSignal) {
abortSignal.removeEventListener("abort", onAbort);
}
};
const delivery = this._link.send(msg, tag, format);
this.deliveryDispositionMap.set(delivery.id, {
resolve: resolve,
reject: reject,
resolve: (delivery) => {
resolve(delivery);
removeAbortListener();
},
reject: (reason) => {
reject(reason);
removeAbortListener();
},
timer: timer
});
if (abortSignal) {
abortSignal.addEventListener("abort", onAbort);
}
}

@@ -142,0 +181,0 @@ else {

@@ -19,3 +19,3 @@ "use strict";

/**
* Descibes the AQMP Connection.
* Describes the AMQP Connection.
* @class Connection

@@ -105,8 +105,9 @@ */

* Creates a new amqp connection.
* @param options A set of options including a signal used to cancel the operation.
* @return {Promise<Connection>} Promise<Connection>
* - **Resolves** the promise with the Connection object when rhea emits the "connection_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "connection_close" event
* while trying to establish an amqp connection.
* while trying to establish an amqp connection or with an AbortError if the operation was cancelled.
*/
open() {
open(options) {
return new Promise((resolve, reject) => {

@@ -116,2 +117,4 @@ if (!this.isOpen()) {

let onClose;
let onAbort;
const abortSignal = options && options.abortSignal;
let waitTimer;

@@ -124,2 +127,5 @@ const removeListeners = () => {

this._connection.removeListener(rhea_1.ConnectionEvents.disconnected, onClose);
if (abortSignal) {
abortSignal.removeEventListener("abort", onAbort);
}
};

@@ -137,2 +143,9 @@ onOpen = (context) => {

};
onAbort = () => {
removeListeners();
this._connection.close();
const err = utils_1.createAbortError("Connection open request has been cancelled.");
log.error("[%s] [%s]", this.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {

@@ -152,2 +165,10 @@ removeListeners();

this.actionInitiated++;
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
}
else {
abortSignal.addEventListener("abort", onAbort);
}
}
}

@@ -161,8 +182,12 @@ else {

* Closes the amqp connection.
* @param options A set of options including a signal used to cancel the operation.
* When the abort signal in the options is fired, the local endpoint is closed.
* This does not guarantee that the remote has closed as well. It only stops listening for
* an acknowledgement that the remote endpoint is closed as well.
* @return {Promise<void>} Promise<void>
* - **Resolves** the promise when rhea emits the "connection_close" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "connection_error" event while
* trying to close an amqp connection.
* trying to close an amqp connection or with an AbortError if the operation was cancelled.
*/
close() {
close(options) {
return new Promise((resolve, reject) => {

@@ -174,2 +199,4 @@ log.error("[%s] The connection is open ? -> %s", this.id, this.isOpen());

let onDisconnected;
let onAbort;
const abortSignal = options && options.abortSignal;
let waitTimer;

@@ -182,2 +209,5 @@ const removeListeners = () => {

this._connection.removeListener(rhea_1.ConnectionEvents.disconnected, onDisconnected);
if (abortSignal) {
abortSignal.removeEventListener("abort", onAbort);
}
};

@@ -201,2 +231,8 @@ onClose = (context) => {

};
onAbort = () => {
removeListeners();
const err = utils_1.createAbortError("Connection close request has been cancelled.");
log.error("[%s] [%s]", this.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {

@@ -215,2 +251,10 @@ removeListeners();

this.actionInitiated++;
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
}
else {
abortSignal.addEventListener("abort", onAbort);
}
}
}

@@ -281,8 +325,9 @@ else {

* Creates an amqp session on the provided amqp connection.
* @param options A set of options including a signal used to cancel the operation.
* @return {Promise<Session>} Promise<Session>
* - **Resolves** the promise with the Session object when rhea emits the "session_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "session_close" event while
* trying to create an amqp session.
* trying to create an amqp session or with an AbortError if the operation was cancelled.
*/
createSession() {
createSession(options) {
return new Promise((resolve, reject) => {

@@ -295,2 +340,4 @@ const rheaSession = this._connection.create_session();

let onDisconnected;
let onAbort;
const abortSignal = options && options.abortSignal;
let waitTimer;

@@ -303,2 +350,5 @@ const removeListeners = () => {

rheaSession.connection.removeListener(rhea_1.ConnectionEvents.disconnected, onDisconnected);
if (abortSignal) {
abortSignal.removeEventListener("abort", onAbort);
}
};

@@ -323,2 +373,9 @@ onOpen = (context) => {

};
onAbort = () => {
removeListeners();
rheaSession.close();
const err = utils_1.createAbortError("Create session request has been cancelled.");
log.error("[%s] [%s]", this.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {

@@ -337,2 +394,10 @@ removeListeners();

rheaSession.begin();
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
}
else {
abortSignal.addEventListener("abort", onAbort);
}
}
});

@@ -342,2 +407,5 @@ }

* Creates an amqp sender link. It either uses the provided session or creates a new one.
* - **Resolves** the promise with the Sender object when rhea emits the "sender_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "sender_close" event while
* trying to create an amqp session or with an AbortError if the operation was cancelled.
* @param {SenderOptionsWithSession} options Optional parameters to create a sender link.

@@ -351,3 +419,3 @@ * @return {Promise<Sender>} Promise<Sender>.

}
const session = yield this.createSession();
const session = yield this.createSession({ abortSignal: options && options.abortSignal });
return session.createSender(options);

@@ -364,3 +432,3 @@ });

* app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he
* shall be responsible of clearing the `deliveryDispotionMap` of inflight `send()` operation.
* shall be responsible of clearing the `deliveryDispositionMap` of inflight `send()` operation.
*

@@ -374,3 +442,3 @@ * @return Promise<AwaitableSender>.

}
const session = yield this.createSession();
const session = yield this.createSession({ abortSignal: options && options.abortSignal });
return session.createAwaitableSender(options);

@@ -381,2 +449,5 @@ });

* Creates an amqp receiver link. It either uses the provided session or creates a new one.
* - **Resolves** the promise with the Sender object when rhea emits the "receiver_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "receiver_close" event while
* trying to create an amqp session or with an AbortError if the operation was cancelled.
* @param {ReceiverOptionsWithSession} options Optional parameters to create a receiver link.

@@ -390,3 +461,3 @@ * @return {Promise<Receiver>} Promise<Receiver>.

}
const session = yield this.createSession();
const session = yield this.createSession({ abortSignal: options && options.abortSignal });
return session.createReceiver(options);

@@ -405,3 +476,3 @@ });

*/
createRequestResponseLink(senderOptions, receiverOptions, providedSession) {
createRequestResponseLink(senderOptions, receiverOptions, providedSession, abortSignal) {
return tslib_1.__awaiter(this, void 0, void 0, function* () {

@@ -414,6 +485,6 @@ if (!senderOptions) {

}
const session = providedSession || (yield this.createSession());
const session = providedSession || (yield this.createSession({ abortSignal }));
const [sender, receiver] = yield Promise.all([
session.createSender(senderOptions),
session.createReceiver(receiverOptions)
session.createSender(Object.assign({}, senderOptions, { abortSignal })),
session.createReceiver(Object.assign({}, receiverOptions, { abortSignal }))
]);

@@ -436,3 +507,3 @@ log.connection("[%s] Successfully created the sender '%s' and receiver '%s' on the same " +

_initializeEventListeners() {
for (const eventName in rhea_1.ConnectionEvents) {
for (const eventName of Object.keys(rhea_1.ConnectionEvents)) {
this._connection.on(rhea_1.ConnectionEvents[eventName], (context) => {

@@ -446,3 +517,3 @@ const params = {

};
if (eventName === rhea_1.ConnectionEvents.protocolError) {
if (rhea_1.ConnectionEvents[eventName] === rhea_1.ConnectionEvents.protocolError) {
log.connection("[%s] ProtocolError is: %O.", this.id, context);

@@ -453,3 +524,3 @@ }

}
// Add event handlers for *_error and *_close events that can be propogated to the connection
// Add event handlers for *_error and *_close events that can be propagated to the connection
// object, if they are not handled at their level. * denotes - Sender, Receiver, Session

@@ -456,0 +527,0 @@ // Sender

22

dist/lib/link.js

@@ -190,4 +190,3 @@ "use strict";

options.closeSession = true;
this.removeAllListeners();
yield new Promise((resolve, reject) => {
const closePromise = new Promise((resolve, reject) => {
log.error("[%s] The %s '%s' on amqp session '%s' is open ? -> %s", this.connection.id, this.type, this.name, this.session.id, this.isOpen());

@@ -220,4 +219,11 @@ if (this.isOpen()) {

removeListeners();
log.error("[%s] Error occurred while closing %s '%s' on amqp session '%s': %O.", this.connection.id, this.type, this.name, this.session.id, context.session.error);
return reject(context.session.error);
let error = context.session.error;
if (this.type === LinkType.sender && context.sender && context.sender.error) {
error = context.sender.error;
}
else if (this.type === LinkType.receiver && context.receiver && context.receiver.error) {
error = context.receiver.error;
}
log.error("[%s] Error occurred while closing %s '%s' on amqp session '%s': %O.", this.connection.id, this.type, this.name, this.session.id, error);
return reject(error);
};

@@ -251,2 +257,8 @@ onDisconnected = (context) => {

});
try {
yield closePromise;
}
finally {
this.removeAllListeners();
}
if (options.closeSession) {

@@ -266,3 +278,3 @@ log[this.type]("[%s] %s '%s' has been closed, now closing it's amqp session '%s'.", this.connection.id, this.type, this.name, this.session.id);

const events = this.type === LinkType.sender ? rhea_1.SenderEvents : rhea_1.ReceiverEvents;
for (const eventName in events) {
for (const eventName of Object.keys(events)) {
this._link.on(events[eventName], (context) => {

@@ -269,0 +281,0 @@ const params = {

@@ -5,2 +5,3 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
const tslib_1 = require("tslib");
const log = require("./log");

@@ -114,57 +115,87 @@ const receiver_1 = require("./receiver");

* handlers added in the rhea-promise library on the session
* @param options A set of options including a signal used to cancel the operation.
* When the abort signal in the options is fired, the local endpoint is closed.
* This does not guarantee that the remote has closed as well. It only stops listening for
* an acknowledgement that the remote endpoint is closed as well.
* @return {Promise<void>} Promise<void>
* - **Resolves** the promise when rhea emits the "session_close" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "session_error" event while trying
* to close an amqp session.
* to close an amqp session or with an AbortError if the operation was cancelled.
*/
close() {
this.removeAllListeners();
return new Promise((resolve, reject) => {
log.error("[%s] The amqp session '%s' is open ? -> %s", this.connection.id, this.id, this.isOpen());
if (this.isOpen()) {
let onError;
let onClose;
let onDisconnected;
let waitTimer;
const removeListeners = () => {
clearTimeout(waitTimer);
this.actionInitiated--;
this._session.removeListener(rhea_1.SessionEvents.sessionError, onError);
this._session.removeListener(rhea_1.SessionEvents.sessionClose, onClose);
this._session.connection.removeListener(rhea_1.ConnectionEvents.disconnected, onDisconnected);
};
onClose = (context) => {
removeListeners();
log.session("[%s] Resolving the promise as the amqp session '%s' has been closed.", this.connection.id, this.id);
close(options) {
return tslib_1.__awaiter(this, void 0, void 0, function* () {
const closePromise = new Promise((resolve, reject) => {
log.error("[%s] The amqp session '%s' is open ? -> %s", this.connection.id, this.id, this.isOpen());
if (this.isOpen()) {
let onError;
let onClose;
let onDisconnected;
let onAbort;
const abortSignal = options && options.abortSignal;
let waitTimer;
const removeListeners = () => {
clearTimeout(waitTimer);
this.actionInitiated--;
this._session.removeListener(rhea_1.SessionEvents.sessionError, onError);
this._session.removeListener(rhea_1.SessionEvents.sessionClose, onClose);
this._session.connection.removeListener(rhea_1.ConnectionEvents.disconnected, onDisconnected);
if (abortSignal) {
abortSignal.removeEventListener("abort", onAbort);
}
};
onClose = (context) => {
removeListeners();
log.session("[%s] Resolving the promise as the amqp session '%s' has been closed.", this.connection.id, this.id);
return resolve();
};
onError = (context) => {
removeListeners();
log.error("[%s] Error occurred while closing amqp session '%s'.", this.connection.id, this.id, context.session.error);
reject(context.session.error);
};
onDisconnected = (context) => {
removeListeners();
const error = context.connection && context.connection.error
? context.connection.error
: context.error;
log.error("[%s] Connection got disconnected while closing amqp session '%s': %O.", this.connection.id, this.id, error);
};
onAbort = () => {
removeListeners();
const err = utils_1.createAbortError("Session close request has been cancelled.");
log.error("[%s] [%s]", this.connection.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {
removeListeners();
const msg = `Unable to close the amqp session ${this.id} due to operation timeout.`;
log.error("[%s] %s", this.connection.id, msg);
reject(new errorDefinitions_1.OperationTimeoutError(msg));
};
// listeners that we add for completing the operation are added directly to rhea's objects.
this._session.once(rhea_1.SessionEvents.sessionClose, onClose);
this._session.once(rhea_1.SessionEvents.sessionError, onError);
this._session.connection.once(rhea_1.ConnectionEvents.disconnected, onDisconnected);
log.session("[%s] Calling session.close() for amqp session '%s'.", this.connection.id, this.id);
waitTimer = setTimeout(actionAfterTimeout, this.connection.options.operationTimeoutInSeconds * 1000);
this._session.close();
this.actionInitiated++;
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
}
else {
abortSignal.addEventListener("abort", onAbort);
}
}
}
else {
return resolve();
};
onError = (context) => {
removeListeners();
log.error("[%s] Error occurred while closing amqp session '%s'.", this.connection.id, this.id, context.session.error);
reject(context.session.error);
};
onDisconnected = (context) => {
removeListeners();
const error = context.connection && context.connection.error
? context.connection.error
: context.error;
log.error("[%s] Connection got disconnected while closing amqp session '%s': %O.", this.connection.id, this.id, error);
};
const actionAfterTimeout = () => {
removeListeners();
const msg = `Unable to close the amqp session ${this.id} due to operation timeout.`;
log.error("[%s] %s", this.connection.id, msg);
reject(new errorDefinitions_1.OperationTimeoutError(msg));
};
// listeners that we add for completing the operation are added directly to rhea's objects.
this._session.once(rhea_1.SessionEvents.sessionClose, onClose);
this._session.once(rhea_1.SessionEvents.sessionError, onError);
this._session.connection.once(rhea_1.ConnectionEvents.disconnected, onDisconnected);
log.session("[%s] Calling session.close() for amqp session '%s'.", this.connection.id, this.id);
waitTimer = setTimeout(actionAfterTimeout, this.connection.options.operationTimeoutInSeconds * 1000);
this._session.close();
this.actionInitiated++;
}
});
try {
yield closePromise;
}
else {
return resolve();
finally {
this.removeAllListeners();
}

@@ -221,2 +252,4 @@ });

let onDisconnected;
let onAbort;
const abortSignal = options && options.abortSignal;
let waitTimer;

@@ -245,2 +278,5 @@ if (options && options.onMessage) {

rheaReceiver.session.connection.removeListener(rhea_1.ConnectionEvents.disconnected, onDisconnected);
if (abortSignal) {
abortSignal.removeEventListener("abort", onAbort);
}
};

@@ -267,2 +303,9 @@ onOpen = (context) => {

};
onAbort = () => {
removeListeners();
rheaReceiver.close();
const err = utils_1.createAbortError("Create receiver request has been cancelled.");
log.error("[%s] [%s]", this.connection.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {

@@ -280,2 +323,10 @@ removeListeners();

waitTimer = setTimeout(actionAfterTimeout, this.connection.options.operationTimeoutInSeconds * 1000);
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
}
else {
abortSignal.addEventListener("abort", onAbort);
}
}
});

@@ -343,2 +394,4 @@ }

let onDisconnected;
let onAbort;
const abortSignal = options && options.abortSignal;
let waitTimer;

@@ -375,2 +428,5 @@ // listeners provided by the user in the options object should be added

rheaSender.session.connection.removeListener(rhea_1.ConnectionEvents.disconnected, onDisconnected);
if (abortSignal) {
abortSignal.removeEventListener("abort", onAbort);
}
};

@@ -397,2 +453,9 @@ onSendable = (context) => {

};
onAbort = () => {
removeListeners();
rheaSender.close();
const err = utils_1.createAbortError("Create sender request has been cancelled.");
log.error("[%s] [%s]", this.connection.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {

@@ -410,2 +473,10 @@ removeListeners();

waitTimer = setTimeout(actionAfterTimeout, this.connection.options.operationTimeoutInSeconds * 1000);
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
}
else {
abortSignal.addEventListener("abort", onAbort);
}
}
});

@@ -420,3 +491,3 @@ }

_initializeEventListeners() {
for (const eventName in rhea_1.SessionEvents) {
for (const eventName of Object.keys(rhea_1.SessionEvents)) {
this._session.on(rhea_1.SessionEvents[eventName], (context) => {

@@ -433,3 +504,3 @@ const params = {

}
// Add event handlers for *_error and *_close events that can be propogated to the session
// Add event handlers for *_error and *_close events that can be propagated to the session
// object, if they are not handled at their level. * denotes - Sender and Receiver.

@@ -436,0 +507,0 @@ // Sender

@@ -146,2 +146,14 @@ "use strict";

exports.emitEvent = emitEvent;
exports.abortErrorName = "AbortError";
/**
* Helper method to return an Error to be used when an operation is cancelled
* using an AbortSignalLike
* @param errorMessage
*/
function createAbortError(errorMessage) {
const error = new Error(errorMessage);
error.name = exports.abortErrorName;
return error;
}
exports.createAbortError = createAbortError;
//# sourceMappingURL=utils.js.map

@@ -15,2 +15,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

} from "./errorDefinitions";
import { AbortSignalLike, createAbortError } from "./util/utils";

@@ -45,2 +46,16 @@ /**

export interface AwaitableSendOptions {
/**
* The duration in which the promise to send the message should complete (resolve/reject).
* If it is not completed, then the Promise will be rejected after timeout occurs.
* Default: `20 seconds`.
*/
timeoutInSeconds?: number;
/**
* A signal to cancel the send operation. This does not guarantee that the message will not be
* sent. It only stops listening for an acknowledgement from the remote endpoint.
*/
abortSignal?: AbortSignalLike;
}
/**

@@ -175,5 +190,7 @@ * Describes the sender where one can await on the message being sent.

* given message is assumed to be of type Message interface and encoded appropriately.
* @param {AwaitableSendOptions} [options] Options to configure the timeout and cancellation for
* the send operation.
* @returns {Promise<Delivery>} Promise<Delivery> The delivery information about the sent message.
*/
send(msg: Message | Buffer, tag?: Buffer | string, format?: number): Promise<Delivery> {
send(msg: Message | Buffer, tag?: Buffer | string, format?: number, options?: AwaitableSendOptions): Promise<Delivery> {
return new Promise<Delivery>((resolve, reject) => {

@@ -183,3 +200,15 @@ log.sender("[%s] Sender '%s' on amqp session '%s', credit: %d available: %d",

this.session.outgoing.available());
const abortSignal = options && options.abortSignal;
const timeoutInSeconds = options && options.timeoutInSeconds;
if (abortSignal && abortSignal.aborted) {
const err = createAbortError("Send request has been cancelled.");
log.error("[%s] %s", this.connection.id, err.message);
return reject(err);
}
if (this.sendable()) {
let sendTimeoutInSeconds = this.sendTimeoutInSeconds;
if (typeof timeoutInSeconds === "number" && timeoutInSeconds > 0) sendTimeoutInSeconds = timeoutInSeconds;
const timer = setTimeout(() => {

@@ -192,10 +221,38 @@ this.deliveryDispositionMap.delete(delivery.id);

return reject(new OperationTimeoutError(message));
}, this.sendTimeoutInSeconds * 1000);
}, sendTimeoutInSeconds * 1000);
const onAbort = () => {
if (this.deliveryDispositionMap.has(delivery.id)) {
const promise = this.deliveryDispositionMap.get(delivery.id) as PromiseLike;
clearTimeout(promise.timer);
const deleteResult = this.deliveryDispositionMap.delete(delivery.id);
log.sender(
"[%s] Event: 'abort', Successfully deleted the delivery with id %d from the " +
" map of sender '%s' on amqp session '%s' and cleared the timer: %s.",
this.connection.id, delivery.id, this.name, this.session.id, deleteResult
);
const err = createAbortError("Send request has been cancelled.");
log.error("[%s] %s", this.connection.id, err.message);
promise.reject(err);
}
};
const removeAbortListener = () => {
if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); }
};
const delivery = (this._link as RheaSender).send(msg, tag, format);
this.deliveryDispositionMap.set(delivery.id, {
resolve: resolve,
reject: reject,
resolve: (delivery: any) => {
resolve(delivery);
removeAbortListener();
},
reject: (reason?: any) => {
reject(reason);
removeAbortListener();
},
timer: timer
});
if (abortSignal) { abortSignal.addEventListener("abort", onAbort); }
} else {

@@ -202,0 +259,0 @@ // Please send the message after some time.

@@ -12,3 +12,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

import { defaultOperationTimeoutInSeconds } from "./util/constants";
import { Func, EmitParameters, emitEvent } from "./util/utils";
import { Func, EmitParameters, emitEvent, AbortSignalLike, createAbortError } from "./util/utils";
import {

@@ -53,2 +53,32 @@ ConnectionEvents, SessionEvents, SenderEvents, ReceiverEvents, create_connection, websocket_connect,

/**
* Set of options to use when running Connection.open()
*/
export interface ConnectionOpenOptions {
/**
* A signal used to cancel the Connection.open() operation.
*/
abortSignal?: AbortSignalLike;
}
/**
* Set of options to use when running Connection.close()
*/
export interface ConnectionCloseOptions {
/**
* A signal used to cancel the Connection.close() operation.
*/
abortSignal?: AbortSignalLike;
}
/**
* Set of options to use when running Connection.createSession()
*/
export interface SessionCreateOptions {
/**
* A signal used to cancel the Connection.createSession() operation.
*/
abortSignal?: AbortSignalLike;
}
/**
* Describes the options that can be provided while creating an AMQP connection.

@@ -156,3 +186,3 @@ * @interface ConnectionOptions

/**
* Descibes the AQMP Connection.
* Describes the AMQP Connection.
* @class Connection

@@ -270,8 +300,9 @@ */

* Creates a new amqp connection.
* @param options A set of options including a signal used to cancel the operation.
* @return {Promise<Connection>} Promise<Connection>
* - **Resolves** the promise with the Connection object when rhea emits the "connection_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "connection_close" event
* while trying to establish an amqp connection.
* while trying to establish an amqp connection or with an AbortError if the operation was cancelled.
*/
open(): Promise<Connection> {
open(options?: ConnectionOpenOptions): Promise<Connection> {
return new Promise((resolve, reject) => {

@@ -282,2 +313,4 @@ if (!this.isOpen()) {

let onClose: Func<RheaEventContext, void>;
let onAbort: Func<void, void>;
const abortSignal = options && options.abortSignal;
let waitTimer: any;

@@ -291,2 +324,3 @@

this._connection.removeListener(ConnectionEvents.disconnected, onClose);
if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); }
};

@@ -308,2 +342,10 @@

onAbort = () => {
removeListeners();
this._connection.close();
const err = createAbortError("Connection open request has been cancelled.");
log.error("[%s] [%s]", this.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {

@@ -324,2 +366,10 @@ removeListeners();

this.actionInitiated++;
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
} else {
abortSignal.addEventListener("abort", onAbort);
}
}
} else {

@@ -334,8 +384,12 @@ return resolve(this);

* Closes the amqp connection.
* @param options A set of options including a signal used to cancel the operation.
* When the abort signal in the options is fired, the local endpoint is closed.
* This does not guarantee that the remote has closed as well. It only stops listening for
* an acknowledgement that the remote endpoint is closed as well.
* @return {Promise<void>} Promise<void>
* - **Resolves** the promise when rhea emits the "connection_close" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "connection_error" event while
* trying to close an amqp connection.
* trying to close an amqp connection or with an AbortError if the operation was cancelled.
*/
close(): Promise<void> {
close(options?: ConnectionCloseOptions): Promise<void> {
return new Promise<void>((resolve, reject) => {

@@ -347,3 +401,6 @@ log.error("[%s] The connection is open ? -> %s", this.id, this.isOpen());

let onDisconnected: Func<RheaEventContext, void>;
let onAbort: Func<void, void>;
const abortSignal = options && options.abortSignal;
let waitTimer: any;
const removeListeners = () => {

@@ -355,2 +412,3 @@ clearTimeout(waitTimer);

this._connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); }
};

@@ -380,2 +438,9 @@

onAbort = () => {
removeListeners();
const err = createAbortError("Connection close request has been cancelled.");
log.error("[%s] [%s]", this.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {

@@ -395,2 +460,10 @@ removeListeners();

this.actionInitiated++;
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
} else {
abortSignal.addEventListener("abort", onAbort);
}
}
} else {

@@ -468,8 +541,9 @@ return resolve();

* Creates an amqp session on the provided amqp connection.
* @param options A set of options including a signal used to cancel the operation.
* @return {Promise<Session>} Promise<Session>
* - **Resolves** the promise with the Session object when rhea emits the "session_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "session_close" event while
* trying to create an amqp session.
* trying to create an amqp session or with an AbortError if the operation was cancelled.
*/
createSession(): Promise<Session> {
createSession(options?: SessionCreateOptions): Promise<Session> {
return new Promise((resolve, reject) => {

@@ -482,2 +556,4 @@ const rheaSession = this._connection.create_session();

let onDisconnected: Func<RheaEventContext, void>;
let onAbort: Func<void, void>;
const abortSignal = options && options.abortSignal;
let waitTimer: any;

@@ -491,2 +567,3 @@

rheaSession.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); }
};

@@ -517,2 +594,10 @@

onAbort = () => {
removeListeners();
rheaSession.close();
const err = createAbortError("Create session request has been cancelled.");
log.error("[%s] [%s]", this.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {

@@ -532,2 +617,10 @@ removeListeners();

rheaSession.begin();
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
} else {
abortSignal.addEventListener("abort", onAbort);
}
}
});

@@ -538,10 +631,13 @@ }

* Creates an amqp sender link. It either uses the provided session or creates a new one.
* - **Resolves** the promise with the Sender object when rhea emits the "sender_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "sender_close" event while
* trying to create an amqp session or with an AbortError if the operation was cancelled.
* @param {SenderOptionsWithSession} options Optional parameters to create a sender link.
* @return {Promise<Sender>} Promise<Sender>.
*/
async createSender(options?: SenderOptionsWithSession): Promise<Sender> {
async createSender(options?: SenderOptionsWithSession & { abortSignal?: AbortSignalLike; }): Promise<Sender> {
if (options && options.session && options.session.createSender) {
return options.session.createSender(options);
}
const session = await this.createSession();
const session = await this.createSession({ abortSignal: options && options.abortSignal });
return session.createSender(options);

@@ -558,11 +654,11 @@ }

* app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he
* shall be responsible of clearing the `deliveryDispotionMap` of inflight `send()` operation.
* shall be responsible of clearing the `deliveryDispositionMap` of inflight `send()` operation.
*
* @return Promise<AwaitableSender>.
*/
async createAwaitableSender(options?: AwaitableSenderOptionsWithSession): Promise<AwaitableSender> {
async createAwaitableSender(options?: AwaitableSenderOptionsWithSession & { abortSignal?: AbortSignalLike; }): Promise<AwaitableSender> {
if (options && options.session && options.session.createAwaitableSender) {
return options.session.createAwaitableSender(options);
}
const session = await this.createSession();
const session = await this.createSession({ abortSignal: options && options.abortSignal });
return session.createAwaitableSender(options);

@@ -573,10 +669,13 @@ }

* Creates an amqp receiver link. It either uses the provided session or creates a new one.
* - **Resolves** the promise with the Sender object when rhea emits the "receiver_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "receiver_close" event while
* trying to create an amqp session or with an AbortError if the operation was cancelled.
* @param {ReceiverOptionsWithSession} options Optional parameters to create a receiver link.
* @return {Promise<Receiver>} Promise<Receiver>.
*/
async createReceiver(options?: ReceiverOptionsWithSession): Promise<Receiver> {
async createReceiver(options?: ReceiverOptionsWithSession & { abortSignal?: AbortSignalLike; }): Promise<Receiver> {
if (options && options.session && options.session.createReceiver) {
return options.session.createReceiver(options);
}
const session = await this.createSession();
const session = await this.createSession({ abortSignal: options && options.abortSignal });
return session.createReceiver(options);

@@ -596,3 +695,3 @@ }

async createRequestResponseLink(senderOptions: SenderOptions, receiverOptions: ReceiverOptions,
providedSession?: Session): Promise<ReqResLink> {
providedSession?: Session, abortSignal?: AbortSignalLike): Promise<ReqResLink> {
if (!senderOptions) {

@@ -604,6 +703,6 @@ throw new Error(`Please provide sender options.`);

}
const session = providedSession || await this.createSession();
const session = providedSession || await this.createSession({ abortSignal });
const [sender, receiver] = await Promise.all([
session.createSender(senderOptions),
session.createReceiver(receiverOptions)
session.createSender({ ...senderOptions, abortSignal }),
session.createReceiver({ ...receiverOptions, abortSignal })
]);

@@ -626,3 +725,3 @@ log.connection("[%s] Successfully created the sender '%s' and receiver '%s' on the same " +

private _initializeEventListeners(): void {
for (const eventName in ConnectionEvents) {
for (const eventName of Object.keys(ConnectionEvents) as Array<keyof typeof ConnectionEvents>) {
this._connection.on(ConnectionEvents[eventName], (context) => {

@@ -636,3 +735,3 @@ const params: EmitParameters = {

};
if (eventName === ConnectionEvents.protocolError) {
if (ConnectionEvents[eventName] === ConnectionEvents.protocolError) {
log.connection("[%s] ProtocolError is: %O.", this.id, context);

@@ -644,3 +743,3 @@ }

// Add event handlers for *_error and *_close events that can be propogated to the connection
// Add event handlers for *_error and *_close events that can be propagated to the connection
// object, if they are not handled at their level. * denotes - Sender, Receiver, Session

@@ -647,0 +746,0 @@

@@ -0,0 +0,0 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

@@ -0,0 +0,0 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

@@ -0,0 +0,0 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

@@ -0,0 +0,0 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

@@ -0,0 +0,0 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

@@ -235,4 +235,4 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

if (options.closeSession == undefined) options.closeSession = true;
this.removeAllListeners();
await new Promise<void>((resolve, reject) => {
const closePromise = new Promise<void>((resolve, reject) => {
log.error("[%s] The %s '%s' on amqp session '%s' is open ? -> %s",

@@ -269,5 +269,12 @@ this.connection.id, this.type, this.name, this.session.id, this.isOpen());

removeListeners();
let error = context.session!.error;
if (this.type === LinkType.sender && context.sender && context.sender.error) {
error = context.sender.error;
} else if (this.type === LinkType.receiver && context.receiver && context.receiver.error) {
error = context.receiver.error;
}
log.error("[%s] Error occurred while closing %s '%s' on amqp session '%s': %O.",
this.connection.id, this.type, this.name, this.session.id, context.session!.error);
return reject(context.session!.error);
this.connection.id, this.type, this.name, this.session.id, error);
return reject(error);
};

@@ -305,2 +312,8 @@

try {
await closePromise;
} finally {
this.removeAllListeners();
}
if (options.closeSession) {

@@ -321,3 +334,3 @@ log[this.type]("[%s] %s '%s' has been closed, now closing it's amqp session '%s'.",

const events = this.type === LinkType.sender ? SenderEvents : ReceiverEvents;
for (const eventName in events) {
for (const eventName of Object.keys(events) as Array<keyof typeof events>) {
this._link.on(events[eventName],

@@ -324,0 +337,0 @@ (context: RheaEventContext) => {

@@ -0,0 +0,0 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

@@ -0,0 +0,0 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

@@ -0,0 +0,0 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

@@ -12,3 +12,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

} from "rhea";
import { Func, EmitParameters, emitEvent } from "./util/utils";
import { Func, EmitParameters, emitEvent, createAbortError, AbortSignalLike } from "./util/utils";
import { OnAmqpEvent } from "./eventContext";

@@ -28,2 +28,9 @@ import { Entity } from "./entity";

/**
* Set of options to use when running session.close()
*/
export interface SessionCloseOptions {
abortSignal?: AbortSignalLike;
}
/**
* @internal

@@ -143,10 +150,14 @@ */

* handlers added in the rhea-promise library on the session
* @param options A set of options including a signal used to cancel the operation.
* When the abort signal in the options is fired, the local endpoint is closed.
* This does not guarantee that the remote has closed as well. It only stops listening for
* an acknowledgement that the remote endpoint is closed as well.
* @return {Promise<void>} Promise<void>
* - **Resolves** the promise when rhea emits the "session_close" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "session_error" event while trying
* to close an amqp session.
* to close an amqp session or with an AbortError if the operation was cancelled.
*/
close(): Promise<void> {
this.removeAllListeners();
return new Promise<void>((resolve, reject) => {
async close(options?: SessionCloseOptions): Promise<void> {
const closePromise = new Promise<void>((resolve, reject) => {
log.error("[%s] The amqp session '%s' is open ? -> %s", this.connection.id, this.id, this.isOpen());

@@ -157,2 +168,4 @@ if (this.isOpen()) {

let onDisconnected: Func<RheaEventContext, void>;
let onAbort: Func<void, void>;
const abortSignal = options && options.abortSignal;
let waitTimer: any;

@@ -166,2 +179,3 @@

this._session.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); }
};

@@ -192,2 +206,9 @@

onAbort = () => {
removeListeners();
const err = createAbortError("Session close request has been cancelled.");
log.error("[%s] [%s]", this.connection.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {

@@ -208,2 +229,10 @@ removeListeners();

this.actionInitiated++;
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
} else {
abortSignal.addEventListener("abort", onAbort);
}
}
} else {

@@ -213,2 +242,9 @@ return resolve();

});
try {
await closePromise;
} finally {
this.removeAllListeners();
}
}

@@ -225,3 +261,3 @@

*/
createReceiver(options?: ReceiverOptions): Promise<Receiver> {
createReceiver(options?: ReceiverOptions & { abortSignal?: AbortSignalLike; }): Promise<Receiver> {
return new Promise((resolve, reject) => {

@@ -267,2 +303,4 @@ if (options &&

let onDisconnected: Func<RheaEventContext, void>;
let onAbort: Func<void, void>;
const abortSignal = options && options.abortSignal;
let waitTimer: any;

@@ -299,2 +337,3 @@

rheaReceiver.session.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); }
};

@@ -327,2 +366,10 @@

onAbort = () => {
removeListeners();
rheaReceiver.close();
const err = createAbortError("Create receiver request has been cancelled.");
log.error("[%s] [%s]", this.connection.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {

@@ -341,2 +388,10 @@ removeListeners();

waitTimer = setTimeout(actionAfterTimeout, this.connection.options!.operationTimeoutInSeconds! * 1000);
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
} else {
abortSignal.addEventListener("abort", onAbort);
}
}
});

@@ -353,3 +408,3 @@ }

*/
createSender(options?: SenderOptions): Promise<Sender> {
createSender(options?: SenderOptions & { abortSignal?: AbortSignalLike; }): Promise<Sender> {
return this._createSender(SenderType.sender, options) as Promise<Sender>;

@@ -373,3 +428,3 @@ }

*/
createAwaitableSender(options?: AwaitableSenderOptions): Promise<AwaitableSender> {
createAwaitableSender(options?: AwaitableSenderOptions & { abortSignal?: AbortSignalLike; }): Promise<AwaitableSender> {
return this._createSender(SenderType.AwaitableSender, options) as Promise<AwaitableSender>;

@@ -386,3 +441,3 @@ }

type: SenderType,
options?: SenderOptions | AwaitableSenderOptions): Promise<Sender | AwaitableSender> {
options?: (SenderOptions | AwaitableSenderOptions) & { abortSignal?: AbortSignalLike; }): Promise<Sender | AwaitableSender> {
return new Promise((resolve, reject) => {

@@ -413,2 +468,4 @@ // Register session handlers for session_error and session_close if provided.

let onDisconnected: Func<RheaEventContext, void>;
let onAbort: Func<void, void>;
const abortSignal = options && options.abortSignal;
let waitTimer: any;

@@ -447,2 +504,3 @@

rheaSender.session.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); }
};

@@ -475,2 +533,10 @@

onAbort = () => {
removeListeners();
rheaSender.close();
const err = createAbortError("Create sender request has been cancelled.");
log.error("[%s] [%s]", this.connection.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {

@@ -489,2 +555,10 @@ removeListeners();

waitTimer = setTimeout(actionAfterTimeout, this.connection.options!.operationTimeoutInSeconds! * 1000);
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
} else {
abortSignal.addEventListener("abort", onAbort);
}
}
});

@@ -501,3 +575,3 @@ }

for (const eventName in SessionEvents) {
for (const eventName of Object.keys(SessionEvents) as Array<keyof typeof SessionEvents>) {
this._session.on(SessionEvents[eventName],

@@ -516,3 +590,3 @@ (context) => {

// Add event handlers for *_error and *_close events that can be propogated to the session
// Add event handlers for *_error and *_close events that can be propagated to the session
// object, if they are not handled at their level. * denotes - Sender and Receiver.

@@ -519,0 +593,0 @@

@@ -0,0 +0,0 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

@@ -200,1 +200,37 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

}
export interface AbortSignalLike {
/**
* Indicates if the signal has already been aborted.
*/
readonly aborted: boolean;
/**
* Add new "abort" event listener, only support "abort" event.
*/
addEventListener(
type: "abort",
listener: (this: AbortSignalLike, ev: any) => any,
options?: any
): void;
/**
* Remove "abort" event listener, only support "abort" event.
*/
removeEventListener(
type: "abort",
listener: (this: AbortSignalLike, ev: any) => any,
options?: any
): void;
}
export const abortErrorName = "AbortError";
/**
* Helper method to return an Error to be used when an operation is cancelled
* using an AbortSignalLike
* @param errorMessage
*/
export function createAbortError(errorMessage: string): Error {
const error = new Error(errorMessage);
error.name = abortErrorName;
return error;
}
{
"name": "rhea-promise",
"version": "1.0.0",
"version": "1.1.0",
"description": "A Promisified layer over rhea AMQP client",

@@ -10,3 +10,3 @@ "license": "Apache-2.0",

"debug": "^3.1.0",
"rhea": "^1.0.8",
"rhea": "^1.0.24",
"tslib": "^1.10.0"

@@ -22,10 +22,15 @@ },

"devDependencies": {
"@azure/abort-controller": "^1.0.1",
"@types/chai": "^4.2.11",
"@types/debug": "^0.0.31",
"@types/dotenv": "^6.1.1",
"@types/mocha": "^7.0.2",
"@types/node": "^8.0.37",
"@types/dotenv": "^6.1.1",
"chai": "^4.2.0",
"dotenv": "^8.0.0",
"mocha": "^6.2.3",
"rimraf": "^2.6.3",
"ts-node": "^8.2.0",
"tslint": "^5.17.0",
"typescript": "^3.5.1",
"dotenv": "^8.0.0"
"typescript": "3.5.1"
},

@@ -37,3 +42,3 @@ "scripts": {

"build": "npm run tslint && npm run tsc",
"test": "npm run build",
"test": "npm run build && mocha -r ts-node/register ./test/*.spec.ts",
"prepack": "npm i && npm run build"

@@ -40,0 +45,0 @@ },

# rhea-promise
A Promisified layer over [rhea](https://githhub.com/amqp/rhea) AMQP client.
A Promisified layer over [rhea](https://github.com/amqp/rhea) AMQP client.

@@ -49,3 +49,3 @@ ## Pre-requisite ##

### Error propogation to the parent entity
### Error propagation to the parent entity
- In `AMQP`, for two peers to communicate successfully, different entities (Container, Connection, Session, Link) need to be created. There is a relationship between those entities.

@@ -59,3 +59,3 @@ - 1 Container can have 1..* Connections.

- if a session goes down then, all the the links on that session are down.
- When an entity goes down rhea emits \*_error and \*_close events, where * can be "sender", "receiver", "session", "connection". If event listeners for the aforementioned events are not added at the appropriate level, then `rhea` propogates those events to its parent entity.
- When an entity goes down rhea emits \*_error and \*_close events, where * can be "sender", "receiver", "session", "connection". If event listeners for the aforementioned events are not added at the appropriate level, then `rhea` propagates those events to its parent entity.
If they are not handled at the `Container` level (uber parent), then they are transformed into an `error` event. This would cause your

@@ -67,4 +67,4 @@ application to crash if there is no listener added for the `error` event.

event emitter methods like `.once()`, `.on()`, `.prependListeners()`, etc. Since `rhea-promise` add those event listeners on `rhea` objects,
the errors will never be propogated to the parent entity. This can be good as well as bad depending on what you do.
- **Good** - `*_error` events and `*_close` events emitted on an entity will not be propogated to it's parent. Thus ensuring that errors are handled at the right level.
the errors will never be propagated to the parent entity. This can be good as well as bad depending on what you do.
- **Good** - `*_error` events and `*_close` events emitted on an entity will not be propagated to it's parent. Thus ensuring that errors are handled at the right level.
- **Bad** - If you do not add listeners for `*_error` and `*_close` events at the right level, then you will never know why an entity shutdown.

@@ -302,2 +302,2 @@

## AMQP Protocol specification
Amqp protocol specification can be found [here](http://www.amqp.org/sites/amqp.org/files/amqp.pdf).
Amqp protocol specification can be found [here](http://www.amqp.org/sites/amqp.org/files/amqp.pdf).

@@ -7,2 +7,3 @@ /// <reference types="node" />

import { Session } from "./session";
import { AbortSignalLike } from "./util/utils";
/**

@@ -33,2 +34,15 @@ * Describes the interface for the send operation Promise which contains a reference to resolve,

}
export interface AwaitableSendOptions {
/**
* The duration in which the promise to send the message should complete (resolve/reject).
* If it is not completed, then the Promise will be rejected after timeout occurs.
* Default: `20 seconds`.
*/
timeoutInSeconds?: number;
/**
* A signal to cancel the send operation. This does not guarantee that the message will not be
* sent. It only stops listening for an acknowledgement from the remote endpoint.
*/
abortSignal?: AbortSignalLike;
}
/**

@@ -62,6 +76,8 @@ * Describes the sender where one can await on the message being sent.

* given message is assumed to be of type Message interface and encoded appropriately.
* @param {AwaitableSendOptions} [options] Options to configure the timeout and cancellation for
* the send operation.
* @returns {Promise<Delivery>} Promise<Delivery> The delivery information about the sent message.
*/
send(msg: Message | Buffer, tag?: Buffer | string, format?: number): Promise<Delivery>;
send(msg: Message | Buffer, tag?: Buffer | string, format?: number, options?: AwaitableSendOptions): Promise<Delivery>;
}
//# sourceMappingURL=awaitableSender.d.ts.map

@@ -8,2 +8,3 @@ /// <reference types="node" />

import { Container } from "./container";
import { AbortSignalLike } from "./util/utils";
import { ConnectionEvents, ConnectionOptions as RheaConnectionOptions, Connection as RheaConnection, AmqpError, Dictionary, ConnectionError } from "rhea";

@@ -38,2 +39,29 @@ import { OnAmqpEvent } from "./eventContext";

/**
* Set of options to use when running Connection.open()
*/
export interface ConnectionOpenOptions {
/**
* A signal used to cancel the Connection.open() operation.
*/
abortSignal?: AbortSignalLike;
}
/**
* Set of options to use when running Connection.close()
*/
export interface ConnectionCloseOptions {
/**
* A signal used to cancel the Connection.close() operation.
*/
abortSignal?: AbortSignalLike;
}
/**
* Set of options to use when running Connection.createSession()
*/
export interface SessionCreateOptions {
/**
* A signal used to cancel the Connection.createSession() operation.
*/
abortSignal?: AbortSignalLike;
}
/**
* Describes the options that can be provided while creating an AMQP connection.

@@ -129,3 +157,3 @@ * @interface ConnectionOptions

/**
* Descibes the AQMP Connection.
* Describes the AMQP Connection.
* @class Connection

@@ -193,16 +221,21 @@ */

* Creates a new amqp connection.
* @param options A set of options including a signal used to cancel the operation.
* @return {Promise<Connection>} Promise<Connection>
* - **Resolves** the promise with the Connection object when rhea emits the "connection_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "connection_close" event
* while trying to establish an amqp connection.
* while trying to establish an amqp connection or with an AbortError if the operation was cancelled.
*/
open(): Promise<Connection>;
open(options?: ConnectionOpenOptions): Promise<Connection>;
/**
* Closes the amqp connection.
* @param options A set of options including a signal used to cancel the operation.
* When the abort signal in the options is fired, the local endpoint is closed.
* This does not guarantee that the remote has closed as well. It only stops listening for
* an acknowledgement that the remote endpoint is closed as well.
* @return {Promise<void>} Promise<void>
* - **Resolves** the promise when rhea emits the "connection_close" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "connection_error" event while
* trying to close an amqp connection.
* trying to close an amqp connection or with an AbortError if the operation was cancelled.
*/
close(): Promise<void>;
close(options?: ConnectionCloseOptions): Promise<void>;
/**

@@ -247,14 +280,20 @@ * Determines whether the connection is open.

* Creates an amqp session on the provided amqp connection.
* @param options A set of options including a signal used to cancel the operation.
* @return {Promise<Session>} Promise<Session>
* - **Resolves** the promise with the Session object when rhea emits the "session_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "session_close" event while
* trying to create an amqp session.
* trying to create an amqp session or with an AbortError if the operation was cancelled.
*/
createSession(): Promise<Session>;
createSession(options?: SessionCreateOptions): Promise<Session>;
/**
* Creates an amqp sender link. It either uses the provided session or creates a new one.
* - **Resolves** the promise with the Sender object when rhea emits the "sender_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "sender_close" event while
* trying to create an amqp session or with an AbortError if the operation was cancelled.
* @param {SenderOptionsWithSession} options Optional parameters to create a sender link.
* @return {Promise<Sender>} Promise<Sender>.
*/
createSender(options?: SenderOptionsWithSession): Promise<Sender>;
createSender(options?: SenderOptionsWithSession & {
abortSignal?: AbortSignalLike;
}): Promise<Sender>;
/**

@@ -268,13 +307,20 @@ * Creates an awaitable amqp sender. It either uses the provided session or creates a new one.

* app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he
* shall be responsible of clearing the `deliveryDispotionMap` of inflight `send()` operation.
* shall be responsible of clearing the `deliveryDispositionMap` of inflight `send()` operation.
*
* @return Promise<AwaitableSender>.
*/
createAwaitableSender(options?: AwaitableSenderOptionsWithSession): Promise<AwaitableSender>;
createAwaitableSender(options?: AwaitableSenderOptionsWithSession & {
abortSignal?: AbortSignalLike;
}): Promise<AwaitableSender>;
/**
* Creates an amqp receiver link. It either uses the provided session or creates a new one.
* - **Resolves** the promise with the Sender object when rhea emits the "receiver_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "receiver_close" event while
* trying to create an amqp session or with an AbortError if the operation was cancelled.
* @param {ReceiverOptionsWithSession} options Optional parameters to create a receiver link.
* @return {Promise<Receiver>} Promise<Receiver>.
*/
createReceiver(options?: ReceiverOptionsWithSession): Promise<Receiver>;
createReceiver(options?: ReceiverOptionsWithSession & {
abortSignal?: AbortSignalLike;
}): Promise<Receiver>;
/**

@@ -290,3 +336,3 @@ * Creates an amqp sender-receiver link. It either uses the provided session or creates a new one.

*/
createRequestResponseLink(senderOptions: SenderOptions, receiverOptions: ReceiverOptions, providedSession?: Session): Promise<ReqResLink>;
createRequestResponseLink(senderOptions: SenderOptions, receiverOptions: ReceiverOptions, providedSession?: Session, abortSignal?: AbortSignalLike): Promise<ReqResLink>;
/**

@@ -293,0 +339,0 @@ * Adds event listeners for the possible events that can occur on the connection object and

@@ -5,2 +5,3 @@ import { Connection } from "./connection";

import { SessionEvents, AmqpError, Session as RheaSession } from "rhea";
import { AbortSignalLike } from "./util/utils";
import { OnAmqpEvent } from "./eventContext";

@@ -17,2 +18,8 @@ import { Entity } from "./entity";

/**
* Set of options to use when running session.close()
*/
export interface SessionCloseOptions {
abortSignal?: AbortSignalLike;
}
/**
* Describes the session that wraps the rhea session.

@@ -66,8 +73,12 @@ * @class Session

* handlers added in the rhea-promise library on the session
* @param options A set of options including a signal used to cancel the operation.
* When the abort signal in the options is fired, the local endpoint is closed.
* This does not guarantee that the remote has closed as well. It only stops listening for
* an acknowledgement that the remote endpoint is closed as well.
* @return {Promise<void>} Promise<void>
* - **Resolves** the promise when rhea emits the "session_close" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "session_error" event while trying
* to close an amqp session.
* to close an amqp session or with an AbortError if the operation was cancelled.
*/
close(): Promise<void>;
close(options?: SessionCloseOptions): Promise<void>;
/**

@@ -82,3 +93,5 @@ * Creates an amqp receiver on this session.

*/
createReceiver(options?: ReceiverOptions): Promise<Receiver>;
createReceiver(options?: ReceiverOptions & {
abortSignal?: AbortSignalLike;
}): Promise<Receiver>;
/**

@@ -92,3 +105,5 @@ * Creates an amqp sender on this session.

*/
createSender(options?: SenderOptions): Promise<Sender>;
createSender(options?: SenderOptions & {
abortSignal?: AbortSignalLike;
}): Promise<Sender>;
/**

@@ -109,3 +124,5 @@ * Creates an awaitable amqp sender on this session.

*/
createAwaitableSender(options?: AwaitableSenderOptions): Promise<AwaitableSender>;
createAwaitableSender(options?: AwaitableSenderOptions & {
abortSignal?: AbortSignalLike;
}): Promise<AwaitableSender>;
/**

@@ -112,0 +129,0 @@ * Creates the Sender based on the provided type.

@@ -133,2 +133,23 @@ import { EventContext as RheaEventContext } from "rhea";

export declare function emitEvent(params: EmitParameters): void;
export interface AbortSignalLike {
/**
* Indicates if the signal has already been aborted.
*/
readonly aborted: boolean;
/**
* Add new "abort" event listener, only support "abort" event.
*/
addEventListener(type: "abort", listener: (this: AbortSignalLike, ev: any) => any, options?: any): void;
/**
* Remove "abort" event listener, only support "abort" event.
*/
removeEventListener(type: "abort", listener: (this: AbortSignalLike, ev: any) => any, options?: any): void;
}
export declare const abortErrorName = "AbortError";
/**
* Helper method to return an Error to be used when an operation is cancelled
* using an AbortSignalLike
* @param errorMessage
*/
export declare function createAbortError(errorMessage: string): Error;
//# sourceMappingURL=utils.d.ts.map

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc