Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rhea-promise

Package Overview
Dependencies
Maintainers
4
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rhea-promise - npm Package Compare versions

Comparing version 1.2.0 to 1.2.1

4

changelog.md

@@ -0,1 +1,5 @@

### 1.2.1 - (2021-04-15)
- `createSession`, `createReceiver`, and `createSender` methods now only close underlying rhea analogue when cancelled if the resource has already been opened.
### 1.2.0 - 2021-03-25

@@ -2,0 +6,0 @@ - Exposes the `incoming` getter on the `Session` that lets accessing size and capacity of the incoming deliveries [#79](https://github.com/amqp/rhea-promise/pull/79).

53

dist/lib/connection.js

@@ -322,2 +322,38 @@ "use strict";

return new Promise((resolve, reject) => {
const abortSignal = options && options.abortSignal;
let onAbort;
if (abortSignal) {
const rejectOnAbort = () => {
const err = utils_1.createAbortError();
log.error("[%s] [%s]", this.id, err.message);
return reject(err);
};
onAbort = () => {
removeListeners();
if (rheaSession.is_open()) {
// This scenario *shouldn't* be possible because if `is_open()` returns true,
// our `onOpen` handler should have executed and removed this abort listener.
// This is a 'just in case' check in case the operation was cancelled sometime
// between when the session's state was updated and when the sessionOpen
// event was handled.
rheaSession.close();
}
else if (!rheaSession.is_closed()) {
// If the rheaSession isn't closed, then it's possible the peer will still
// attempt to begin the session.
// We can detect that if it occurs and close our session.
rheaSession.once(rhea_1.SessionEvents.sessionOpen, () => {
rheaSession.close();
});
}
return rejectOnAbort();
};
if (abortSignal.aborted) {
// Exit early before we do any work.
return rejectOnAbort();
}
else {
abortSignal.addEventListener("abort", onAbort);
}
}
const rheaSession = this._connection.create_session();

@@ -329,4 +365,2 @@ const session = new session_1.Session(this, rheaSession);

let onDisconnected;
let onAbort;
const abortSignal = options && options.abortSignal;
let waitTimer;

@@ -361,9 +395,2 @@ const removeListeners = () => {

};
onAbort = () => {
removeListeners();
rheaSession.close();
const err = utils_1.createAbortError();
log.error("[%s] [%s]", this.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {

@@ -382,10 +409,2 @@ removeListeners();

rheaSession.begin();
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
}
else {
abortSignal.addEventListener("abort", onAbort);
}
}
});

@@ -392,0 +411,0 @@ }

@@ -234,2 +234,38 @@ "use strict";

}
const abortSignal = options && options.abortSignal;
let onAbort;
if (abortSignal) {
const rejectOnAbort = () => {
const err = utils_1.createAbortError();
log.error("[%s] [%s]", this.connection.id, err.message);
return reject(err);
};
onAbort = () => {
removeListeners();
if (rheaReceiver.is_open()) {
// This scenario *shouldn't* be possible because if `is_open()` returns true,
// our `onOpen` handler should have executed and removed this abort listener.
// This is a 'just in case' check in case the operation was cancelled sometime
// between when the receiver's state was updated and when the receiverOpen
// event was handled.
rheaReceiver.close();
}
else if (!rheaReceiver.is_closed()) {
// If the rheaReceiver isn't closed, then it's possible the peer will still
// attempt to attach the link and open our receiver.
// We can detect that if it occurs and close our receiver.
rheaReceiver.once(rhea_1.ReceiverEvents.receiverOpen, () => {
rheaReceiver.close();
});
}
return rejectOnAbort();
};
if (abortSignal.aborted) {
// Exit early before we do any work.
return rejectOnAbort();
}
else {
abortSignal.addEventListener("abort", onAbort);
}
}
// Register session handlers for session_error and session_close if provided.

@@ -254,4 +290,2 @@ // listeners provided by the user in the options object should be added

let onDisconnected;
let onAbort;
const abortSignal = options && options.abortSignal;
let waitTimer;

@@ -304,9 +338,2 @@ if (options && options.onMessage) {

};
onAbort = () => {
removeListeners();
rheaReceiver.close();
const err = utils_1.createAbortError();
log.error("[%s] [%s]", this.connection.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {

@@ -324,10 +351,2 @@ removeListeners();

waitTimer = setTimeout(actionAfterTimeout, this.connection.options.operationTimeoutInSeconds * 1000);
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
}
else {
abortSignal.addEventListener("abort", onAbort);
}
}
});

@@ -372,2 +391,38 @@ }

return new Promise((resolve, reject) => {
const abortSignal = options && options.abortSignal;
let onAbort;
if (abortSignal) {
const rejectOnAbort = () => {
const err = utils_1.createAbortError();
log.error("[%s] [%s]", this.connection.id, err.message);
return reject(err);
};
onAbort = () => {
removeListeners();
if (rheaSender.is_open()) {
// This scenario *shouldn't* be possible because if `is_open()` returns true,
// our `onOpen` handler should have executed and removed this abort listener.
// This is a 'just in case' check in case the operation was cancelled sometime
// between when the sender's state was updated and when the senderOpen
// event was handled.
rheaSender.close();
}
else if (!rheaSender.is_closed()) {
// If the rheaSender isn't closed, then it's possible the peer will still
// attempt to attach the link and open our sender.
// We can detect that if it occurs and close our sender.
rheaSender.once(rhea_1.SenderEvents.senderOpen, () => {
rheaSender.close();
});
}
return rejectOnAbort();
};
if (abortSignal.aborted) {
// Exit early before we do any work.
return rejectOnAbort();
}
else {
abortSignal.addEventListener("abort", onAbort);
}
}
// Register session handlers for session_error and session_close if provided.

@@ -396,4 +451,2 @@ if (options && options.onSessionError) {

let onDisconnected;
let onAbort;
const abortSignal = options && options.abortSignal;
let waitTimer;

@@ -454,9 +507,2 @@ // listeners provided by the user in the options object should be added

};
onAbort = () => {
removeListeners();
rheaSender.close();
const err = utils_1.createAbortError();
log.error("[%s] [%s]", this.connection.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {

@@ -474,10 +520,2 @@ removeListeners();

waitTimer = setTimeout(actionAfterTimeout, this.connection.options.operationTimeoutInSeconds * 1000);
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
}
else {
abortSignal.addEventListener("abort", onAbort);
}
}
});

@@ -484,0 +522,0 @@ }

@@ -536,2 +536,39 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

return new Promise((resolve, reject) => {
const abortSignal = options && options.abortSignal;
let onAbort: Func<void, void>;
if (abortSignal) {
const rejectOnAbort = () => {
const err = createAbortError();
log.error("[%s] [%s]", this.id, err.message);
return reject(err);
};
onAbort = () => {
removeListeners();
if (rheaSession.is_open()) {
// This scenario *shouldn't* be possible because if `is_open()` returns true,
// our `onOpen` handler should have executed and removed this abort listener.
// This is a 'just in case' check in case the operation was cancelled sometime
// between when the session's state was updated and when the sessionOpen
// event was handled.
rheaSession.close();
} else if (!rheaSession.is_closed()) {
// If the rheaSession isn't closed, then it's possible the peer will still
// attempt to begin the session.
// We can detect that if it occurs and close our session.
rheaSession.once(SessionEvents.sessionOpen, () => {
rheaSession.close();
});
}
return rejectOnAbort();
};
if (abortSignal.aborted) {
// Exit early before we do any work.
return rejectOnAbort();
} else {
abortSignal.addEventListener("abort", onAbort);
}
}
const rheaSession = this._connection.create_session();

@@ -543,4 +580,2 @@ const session = new Session(this, rheaSession);

let onDisconnected: Func<RheaEventContext, void>;
let onAbort: Func<void, void>;
const abortSignal = options && options.abortSignal;
let waitTimer: any;

@@ -580,10 +615,2 @@

onAbort = () => {
removeListeners();
rheaSession.close();
const err = createAbortError();
log.error("[%s] [%s]", this.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {

@@ -603,10 +630,2 @@ removeListeners();

rheaSession.begin();
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
} else {
abortSignal.addEventListener("abort", onAbort);
}
}
});

@@ -613,0 +632,0 @@ }

@@ -278,2 +278,39 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

const abortSignal = options && options.abortSignal;
let onAbort: Func<void, void>;
if (abortSignal) {
const rejectOnAbort = () => {
const err = createAbortError();
log.error("[%s] [%s]", this.connection.id, err.message);
return reject(err);
};
onAbort = () => {
removeListeners();
if (rheaReceiver.is_open()) {
// This scenario *shouldn't* be possible because if `is_open()` returns true,
// our `onOpen` handler should have executed and removed this abort listener.
// This is a 'just in case' check in case the operation was cancelled sometime
// between when the receiver's state was updated and when the receiverOpen
// event was handled.
rheaReceiver.close();
} else if (!rheaReceiver.is_closed()) {
// If the rheaReceiver isn't closed, then it's possible the peer will still
// attempt to attach the link and open our receiver.
// We can detect that if it occurs and close our receiver.
rheaReceiver.once(ReceiverEvents.receiverOpen, () => {
rheaReceiver.close();
});
}
return rejectOnAbort();
};
if (abortSignal.aborted) {
// Exit early before we do any work.
return rejectOnAbort();
} else {
abortSignal.addEventListener("abort", onAbort);
}
}
// Register session handlers for session_error and session_close if provided.

@@ -299,4 +336,2 @@ // listeners provided by the user in the options object should be added

let onDisconnected: Func<RheaEventContext, void>;
let onAbort: Func<void, void>;
const abortSignal = options && options.abortSignal;
let waitTimer: any;

@@ -361,10 +396,2 @@

onAbort = () => {
removeListeners();
rheaReceiver.close();
const err = createAbortError();
log.error("[%s] [%s]", this.connection.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {

@@ -383,10 +410,2 @@ removeListeners();

waitTimer = setTimeout(actionAfterTimeout, this.connection.options!.operationTimeoutInSeconds! * 1000);
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
} else {
abortSignal.addEventListener("abort", onAbort);
}
}
});

@@ -436,2 +455,39 @@ }

return new Promise((resolve, reject) => {
const abortSignal = options && options.abortSignal;
let onAbort: Func<void, void>;
if (abortSignal) {
const rejectOnAbort = () => {
const err = createAbortError();
log.error("[%s] [%s]", this.connection.id, err.message);
return reject(err);
};
onAbort = () => {
removeListeners();
if (rheaSender.is_open()) {
// This scenario *shouldn't* be possible because if `is_open()` returns true,
// our `onOpen` handler should have executed and removed this abort listener.
// This is a 'just in case' check in case the operation was cancelled sometime
// between when the sender's state was updated and when the senderOpen
// event was handled.
rheaSender.close();
} else if (!rheaSender.is_closed()) {
// If the rheaSender isn't closed, then it's possible the peer will still
// attempt to attach the link and open our sender.
// We can detect that if it occurs and close our sender.
rheaSender.once(SenderEvents.senderOpen, () => {
rheaSender.close();
});
}
return rejectOnAbort();
};
if (abortSignal.aborted) {
// Exit early before we do any work.
return rejectOnAbort();
} else {
abortSignal.addEventListener("abort", onAbort);
}
}
// Register session handlers for session_error and session_close if provided.

@@ -461,4 +517,2 @@ if (options && options.onSessionError) {

let onDisconnected: Func<RheaEventContext, void>;
let onAbort: Func<void, void>;
const abortSignal = options && options.abortSignal;
let waitTimer: any;

@@ -525,10 +579,2 @@

onAbort = () => {
removeListeners();
rheaSender.close();
const err = createAbortError();
log.error("[%s] [%s]", this.connection.id, err.message);
return reject(err);
};
const actionAfterTimeout = () => {

@@ -547,10 +593,2 @@ removeListeners();

waitTimer = setTimeout(actionAfterTimeout, this.connection.options!.operationTimeoutInSeconds! * 1000);
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
} else {
abortSignal.addEventListener("abort", onAbort);
}
}
});

@@ -557,0 +595,0 @@ }

{
"name": "rhea-promise",
"version": "1.2.0",
"version": "1.2.1",
"description": "A Promisified layer over rhea AMQP client",

@@ -5,0 +5,0 @@ "license": "Apache-2.0",

@@ -454,2 +454,4 @@ import * as rhea from "rhea";

assert.isTrue(abortErrorThrown, "AbortError should have been thrown.");
const sessionMap = (connection["_connection"] as any)["local_channel_map"];
assert.deepEqual(sessionMap, {});
await connection.close();

@@ -480,2 +482,13 @@ });

assert.isTrue(abortErrorThrown, "AbortError should have been thrown.");
const sessionMap = (connection["_connection"] as any)["local_channel_map"];
// There should be at most 1 session.
const [sessionName] = Object.keys(sessionMap);
const session = sessionName && sessionMap[sessionName];
if (!session.is_closed()) {
await new Promise(resolve => {
session.once(rhea.SessionEvents.sessionClose, resolve);
});
}
assert.isTrue(session.is_closed(), "Session should be closed.");
assert.deepEqual(sessionMap, {});
await connection.close();

@@ -482,0 +495,0 @@ });

@@ -276,2 +276,3 @@ import * as rhea from "rhea";

assert.isTrue(abortErrorThrown, "AbortError should have been thrown.");
assert.isUndefined(extractLink(session), "Expected the session to not have any links.")
await connection.close();

@@ -295,2 +296,4 @@ });

const link = extractLink(session)!;
let abortErrorThrown = false;

@@ -304,2 +307,12 @@ try {

assert.isTrue(abortErrorThrown, "AbortError should have been thrown.");
assert.isFalse(link.is_open(), "Link should not be open.");
// Cancelling link creation should guarantee that the underlying
// link is closed and removed from the session.
if (!link.is_closed()) {
await new Promise(resolve => {
link.once(rhea.SenderEvents.senderClose, resolve);
});
}
assert.isTrue(link.is_closed(), "Link should be closed.");
assert.isUndefined(extractLink(session), "Expected the session to not have any links.")
await connection.close();

@@ -331,2 +344,3 @@ });

assert.isTrue(abortErrorThrown, "AbortError should have been thrown.");
assert.isUndefined(extractLink(session), "Expected the session to not have any links.")
await connection.close();

@@ -350,2 +364,4 @@ });

const link = extractLink(session)!;
let abortErrorThrown = false;

@@ -359,2 +375,12 @@ try {

assert.isTrue(abortErrorThrown, "AbortError should have been thrown.");
assert.isFalse(link.is_open(), "Link should not be open.");
// Cancelling link creation should guarantee that the underlying
// link is closed and removed from the session.
if (!link.is_closed()) {
await new Promise(resolve => {
link.once(rhea.SenderEvents.senderClose, resolve);
});
}
assert.isTrue(link.is_closed(), "Link should be closed.");
assert.isUndefined(extractLink(session), "Expected the session to not have any links.")
await connection.close();

@@ -386,2 +412,3 @@ });

assert.isTrue(abortErrorThrown, "AbortError should have been thrown.");
assert.isUndefined(extractLink(session), "Expected the session to not have any links.")
await connection.close();

@@ -404,2 +431,4 @@ });

abortController.abort();
const link = extractLink(session)!;

@@ -414,5 +443,20 @@ let abortErrorThrown = false;

assert.isTrue(abortErrorThrown, "AbortError should have been thrown.");
assert.isFalse(link.is_open(), "Link should not be open.");
// Cancelling link creation should guarantee that the underlying
// link is closed and removed from the session.
if (!link.is_closed()) {
await new Promise(resolve => {
link.once(rhea.ReceiverEvents.receiverClose, resolve);
});
}
assert.isTrue(link.is_closed(), "Link should be closed.");
assert.isUndefined(extractLink(session), "Expected the session to not have any links.")
await connection.close();
});
});
function extractLink(session: Session) {
return session["_session"].find_link(() => true);
}
});

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc